diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 17edf1a59fc..e93a3677e74 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -9,7 +9,7 @@ from dataclasses import dataclass, replace as dataclass_replace from datetime import timedelta import logging from time import time -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast, final from uuid import UUID import sqlalchemy @@ -656,7 +656,7 @@ def _restore_foreign_key_constraints( @database_job_retry_wrapper("Apply migration update", 10) -def _apply_update( # noqa: C901 +def _apply_update( instance: Recorder, hass: HomeAssistant, engine: Engine, @@ -665,66 +665,148 @@ def _apply_update( # noqa: C901 old_version: int, ) -> None: """Perform operations to bring schema up to date.""" - assert engine.dialect.name is not None, "Dialect name must be set" - dialect = try_parse_enum(SupportedDialect, engine.dialect.name) - _column_types = _COLUMN_TYPES_FOR_DIALECT.get(dialect, _SQLITE_COLUMN_TYPES) - if new_version == 1: + migrator_cls = _SchemaVersionMigrator.get_migrator(new_version) + migrator_cls(instance, hass, engine, session_maker, old_version).apply_update() + + +class _SchemaVersionMigrator(ABC): + """Perform operations to bring schema up to date.""" + + __migrators: dict[int, type[_SchemaVersionMigrator]] = {} + + def __init_subclass__(cls, target_version: int, **kwargs: Any) -> None: + """Post initialisation processing.""" + super().__init_subclass__(**kwargs) + if target_version in _SchemaVersionMigrator.__migrators: + raise ValueError("Duplicated version") + _SchemaVersionMigrator.__migrators[target_version] = cls + + def __init__( + self, + instance: Recorder, + hass: HomeAssistant, + engine: Engine, + session_maker: Callable[[], Session], + old_version: int, + ) -> None: + """Initialize.""" + self.instance = instance + self.hass = hass + self.engine = engine + self.session_maker = session_maker + self.old_version = old_version + assert engine.dialect.name is not None, "Dialect name must be set" + dialect = try_parse_enum(SupportedDialect, engine.dialect.name) + self.column_types = _COLUMN_TYPES_FOR_DIALECT.get(dialect, _SQLITE_COLUMN_TYPES) + + @classmethod + def get_migrator(cls, target_version: int) -> type[_SchemaVersionMigrator]: + """Return a migrator for a specific schema version.""" + try: + return cls.__migrators[target_version] + except KeyError as err: + raise ValueError( + f"No migrator for schema version {target_version}" + ) from err + + @final + def apply_update(self) -> None: + """Perform operations to bring schema up to date.""" + self._apply_update() + + @abstractmethod + def _apply_update(self) -> None: + """Version specific update method.""" + + +class _SchemaVersion1Migrator(_SchemaVersionMigrator, target_version=1): + def _apply_update(self) -> None: + """Version specific update method.""" # This used to create ix_events_time_fired, but it was removed in version 32 - pass - elif new_version == 2: + + +class _SchemaVersion2Migrator(_SchemaVersionMigrator, target_version=2): + def _apply_update(self) -> None: + """Version specific update method.""" # Create compound start/end index for recorder_runs - _create_index(session_maker, "recorder_runs", "ix_recorder_runs_start_end") + _create_index(self.session_maker, "recorder_runs", "ix_recorder_runs_start_end") # This used to create ix_states_last_updated bit it was removed in version 32 - elif new_version == 3: + + +class _SchemaVersion3Migrator(_SchemaVersionMigrator, target_version=3): + def _apply_update(self) -> None: + """Version specific update method.""" # There used to be a new index here, but it was removed in version 4. - pass - elif new_version == 4: + + +class _SchemaVersion4Migrator(_SchemaVersionMigrator, target_version=4): + def _apply_update(self) -> None: + """Version specific update method.""" # Queries were rewritten in this schema release. Most indexes from # earlier versions of the schema are no longer needed. - if old_version == 3: + if self.old_version == 3: # Remove index that was added in version 3 - _drop_index(session_maker, "states", "ix_states_created_domain") - if old_version == 2: + _drop_index(self.session_maker, "states", "ix_states_created_domain") + if self.old_version == 2: # Remove index that was added in version 2 - _drop_index(session_maker, "states", "ix_states_entity_id_created") + _drop_index(self.session_maker, "states", "ix_states_entity_id_created") # Remove indexes that were added in version 0 - _drop_index(session_maker, "states", "states__state_changes") - _drop_index(session_maker, "states", "states__significant_changes") - _drop_index(session_maker, "states", "ix_states_entity_id_created") + _drop_index(self.session_maker, "states", "states__state_changes") + _drop_index(self.session_maker, "states", "states__significant_changes") + _drop_index(self.session_maker, "states", "ix_states_entity_id_created") # This used to create ix_states_entity_id_last_updated, # but it was removed in version 32 - elif new_version == 5: + + +class _SchemaVersion5Migrator(_SchemaVersionMigrator, target_version=5): + def _apply_update(self) -> None: + """Version specific update method.""" # Create supporting index for States.event_id foreign key - _create_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX) - elif new_version == 6: + _create_index(self.session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX) + + +class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6): + def _apply_update(self) -> None: + """Version specific update method.""" _add_columns( - session_maker, + self.session_maker, "events", ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"], ) - _create_index(session_maker, "events", "ix_events_context_id") + _create_index(self.session_maker, "events", "ix_events_context_id") # This used to create ix_events_context_user_id, # but it was removed in version 28 _add_columns( - session_maker, + self.session_maker, "states", ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"], ) - _create_index(session_maker, "states", "ix_states_context_id") + _create_index(self.session_maker, "states", "ix_states_context_id") # This used to create ix_states_context_user_id, # but it was removed in version 28 - elif new_version == 7: + + +class _SchemaVersion7Migrator(_SchemaVersionMigrator, target_version=7): + def _apply_update(self) -> None: + """Version specific update method.""" # There used to be a ix_states_entity_id index here, # but it was removed in later schema - pass - elif new_version == 8: - _add_columns(session_maker, "events", ["context_parent_id CHARACTER(36)"]) - _add_columns(session_maker, "states", ["old_state_id INTEGER"]) + + +class _SchemaVersion8Migrator(_SchemaVersionMigrator, target_version=8): + def _apply_update(self) -> None: + """Version specific update method.""" + _add_columns(self.session_maker, "events", ["context_parent_id CHARACTER(36)"]) + _add_columns(self.session_maker, "states", ["old_state_id INTEGER"]) # This used to create ix_events_context_parent_id, # but it was removed in version 28 - elif new_version == 9: + + +class _SchemaVersion9Migrator(_SchemaVersionMigrator, target_version=9): + def _apply_update(self) -> None: + """Version specific update method.""" # We now get the context from events with a join # since its always there on state_changed events # @@ -734,37 +816,56 @@ def _apply_update( # noqa: C901 # sqlalchemy alembic to make that work # # no longer dropping ix_states_context_id since its recreated in 28 - _drop_index(session_maker, "states", "ix_states_context_user_id") + _drop_index(self.session_maker, "states", "ix_states_context_user_id") # This index won't be there if they were not running # nightly but we don't treat that as a critical issue - _drop_index(session_maker, "states", "ix_states_context_parent_id") + _drop_index(self.session_maker, "states", "ix_states_context_parent_id") # Redundant keys on composite index: # We already have ix_states_entity_id_last_updated - _drop_index(session_maker, "states", "ix_states_entity_id") + _drop_index(self.session_maker, "states", "ix_states_entity_id") # This used to create ix_events_event_type_time_fired, # but it was removed in version 32 - _drop_index(session_maker, "events", "ix_events_event_type") - elif new_version == 10: + _drop_index(self.session_maker, "events", "ix_events_event_type") + + +class _SchemaVersion10Migrator(_SchemaVersionMigrator, target_version=10): + def _apply_update(self) -> None: + """Version specific update method.""" # Now done in step 11 - pass - elif new_version == 11: - _create_index(session_maker, "states", "ix_states_old_state_id") - _update_states_table_with_foreign_key_options(session_maker, engine) - elif new_version == 12: - if engine.dialect.name == SupportedDialect.MYSQL: - _modify_columns(session_maker, engine, "events", ["event_data LONGTEXT"]) - _modify_columns(session_maker, engine, "states", ["attributes LONGTEXT"]) - elif new_version == 13: - if engine.dialect.name == SupportedDialect.MYSQL: + + +class _SchemaVersion11Migrator(_SchemaVersionMigrator, target_version=11): + def _apply_update(self) -> None: + """Version specific update method.""" + _create_index(self.session_maker, "states", "ix_states_old_state_id") + _update_states_table_with_foreign_key_options(self.session_maker, self.engine) + + +class _SchemaVersion12Migrator(_SchemaVersionMigrator, target_version=12): + def _apply_update(self) -> None: + """Version specific update method.""" + if self.engine.dialect.name == SupportedDialect.MYSQL: _modify_columns( - session_maker, - engine, + self.session_maker, self.engine, "events", ["event_data LONGTEXT"] + ) + _modify_columns( + self.session_maker, self.engine, "states", ["attributes LONGTEXT"] + ) + + +class _SchemaVersion13Migrator(_SchemaVersionMigrator, target_version=13): + def _apply_update(self) -> None: + """Version specific update method.""" + if self.engine.dialect.name == SupportedDialect.MYSQL: + _modify_columns( + self.session_maker, + self.engine, "events", ["time_fired DATETIME(6)", "created DATETIME(6)"], ) _modify_columns( - session_maker, - engine, + self.session_maker, + self.engine, "states", [ "last_changed DATETIME(6)", @@ -772,19 +873,39 @@ def _apply_update( # noqa: C901 "created DATETIME(6)", ], ) - elif new_version == 14: - _modify_columns(session_maker, engine, "events", ["event_type VARCHAR(64)"]) - elif new_version == 15: - # This dropped the statistics table, done again in version 18. - pass - elif new_version == 16: - _drop_foreign_key_constraints( - session_maker, engine, TABLE_STATES, "old_state_id" + + +class _SchemaVersion14Migrator(_SchemaVersionMigrator, target_version=14): + def _apply_update(self) -> None: + """Version specific update method.""" + _modify_columns( + self.session_maker, self.engine, "events", ["event_type VARCHAR(64)"] ) - elif new_version == 17: + + +class _SchemaVersion15Migrator(_SchemaVersionMigrator, target_version=15): + def _apply_update(self) -> None: + """Version specific update method.""" # This dropped the statistics table, done again in version 18. - pass - elif new_version == 18: + + +class _SchemaVersion16Migrator(_SchemaVersionMigrator, target_version=16): + def _apply_update(self) -> None: + """Version specific update method.""" + _drop_foreign_key_constraints( + self.session_maker, self.engine, TABLE_STATES, "old_state_id" + ) + + +class _SchemaVersion17Migrator(_SchemaVersionMigrator, target_version=17): + def _apply_update(self) -> None: + """Version specific update method.""" + # This dropped the statistics table, done again in version 18. + + +class _SchemaVersion18Migrator(_SchemaVersionMigrator, target_version=18): + def _apply_update(self) -> None: + """Version specific update method.""" # Recreate the statistics and statistics meta tables. # # Order matters! Statistics and StatisticsShortTerm have a relation with @@ -794,7 +915,7 @@ def _apply_update( # noqa: C901 # We need to cast __table__ to Table, explanation in # https://github.com/sqlalchemy/sqlalchemy/issues/9130 Base.metadata.drop_all( - bind=engine, + bind=self.engine, tables=[ cast(Table, StatisticsShortTerm.__table__), cast(Table, Statistics.__table__), @@ -802,42 +923,61 @@ def _apply_update( # noqa: C901 ], ) - cast(Table, StatisticsMeta.__table__).create(engine) - cast(Table, StatisticsShortTerm.__table__).create(engine) - cast(Table, Statistics.__table__).create(engine) - elif new_version == 19: + cast(Table, StatisticsMeta.__table__).create(self.engine) + cast(Table, StatisticsShortTerm.__table__).create(self.engine) + cast(Table, Statistics.__table__).create(self.engine) + + +class _SchemaVersion19Migrator(_SchemaVersionMigrator, target_version=19): + def _apply_update(self) -> None: + """Version specific update method.""" # This adds the statistic runs table, insert a fake run to prevent duplicating # statistics. - with session_scope(session=session_maker()) as session: + with session_scope(session=self.session_maker()) as session: session.add(StatisticsRuns(start=get_start_time())) - elif new_version == 20: + + +class _SchemaVersion20Migrator(_SchemaVersionMigrator, target_version=20): + def _apply_update(self) -> None: + """Version specific update method.""" # This changed the precision of statistics from float to double - if engine.dialect.name in [SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL]: + if self.engine.dialect.name in [ + SupportedDialect.MYSQL, + SupportedDialect.POSTGRESQL, + ]: _modify_columns( - session_maker, - engine, + self.session_maker, + self.engine, "statistics", [ f"{column} {DOUBLE_PRECISION_TYPE_SQL}" for column in ("max", "mean", "min", "state", "sum") ], ) - elif new_version == 21: + + +class _SchemaVersion21Migrator(_SchemaVersionMigrator, target_version=21): + def _apply_update(self) -> None: + """Version specific update method.""" # Try to change the character set of the statistic_meta table - if engine.dialect.name == SupportedDialect.MYSQL: + if self.engine.dialect.name == SupportedDialect.MYSQL: for table in ("events", "states", "statistics_meta"): - _correct_table_character_set_and_collation(table, session_maker) - elif new_version == 22: + _correct_table_character_set_and_collation(table, self.session_maker) + + +class _SchemaVersion22Migrator(_SchemaVersionMigrator, target_version=22): + def _apply_update(self) -> None: + """Version specific update method.""" # Recreate the all statistics tables for Oracle DB with Identity columns # # Order matters! Statistics has a relation with StatisticsMeta, # so statistics need to be deleted before meta (or in pair depending # on the SQL backend); and meta needs to be created before statistics. - if engine.dialect.name == "oracle": + if self.engine.dialect.name == "oracle": # We need to cast __table__ to Table, explanation in # https://github.com/sqlalchemy/sqlalchemy/issues/9130 Base.metadata.drop_all( - bind=engine, + bind=self.engine, tables=[ cast(Table, StatisticsShortTerm.__table__), cast(Table, Statistics.__table__), @@ -846,15 +986,15 @@ def _apply_update( # noqa: C901 ], ) - cast(Table, StatisticsRuns.__table__).create(engine) - cast(Table, StatisticsMeta.__table__).create(engine) - cast(Table, StatisticsShortTerm.__table__).create(engine) - cast(Table, Statistics.__table__).create(engine) + cast(Table, StatisticsRuns.__table__).create(self.engine) + cast(Table, StatisticsMeta.__table__).create(self.engine) + cast(Table, StatisticsShortTerm.__table__).create(self.engine) + cast(Table, Statistics.__table__).create(self.engine) # Block 5-minute statistics for one hour from the last run, or it will overlap # with existing hourly statistics. Don't block on a database with no existing # statistics. - with session_scope(session=session_maker()) as session: + with session_scope(session=self.session_maker()) as session: if session.query(Statistics.id).count() and ( last_run_string := session.query( func.max(StatisticsRuns.start) @@ -870,7 +1010,7 @@ def _apply_update( # noqa: C901 # When querying the database, be careful to only explicitly query for columns # which were present in schema version 22. If querying the table, SQLAlchemy # will refer to future columns. - with session_scope(session=session_maker()) as session: + with session_scope(session=self.session_maker()) as session: for sum_statistic in session.query(StatisticsMeta.id).filter_by( has_sum=true() ): @@ -895,31 +1035,58 @@ def _apply_update( # noqa: C901 sum=last_statistic.sum, ) ) - elif new_version == 23: + + +class _SchemaVersion23Migrator(_SchemaVersionMigrator, target_version=23): + def _apply_update(self) -> None: + """Version specific update method.""" # Add name column to StatisticsMeta - _add_columns(session_maker, "statistics_meta", ["name VARCHAR(255)"]) - elif new_version == 24: + _add_columns(self.session_maker, "statistics_meta", ["name VARCHAR(255)"]) + + +class _SchemaVersion24Migrator(_SchemaVersionMigrator, target_version=24): + def _apply_update(self) -> None: + """Version specific update method.""" # This used to create the unique indices for start and statistic_id # but we changed the format in schema 34 which will now take care # of removing any duplicate if they still exist. - pass - elif new_version == 25: + + +class _SchemaVersion25Migrator(_SchemaVersionMigrator, target_version=25): + def _apply_update(self) -> None: + """Version specific update method.""" _add_columns( - session_maker, "states", [f"attributes_id {_column_types.big_int_type}"] + self.session_maker, + "states", + [f"attributes_id {self.column_types.big_int_type}"], ) - _create_index(session_maker, "states", "ix_states_attributes_id") - elif new_version == 26: - _create_index(session_maker, "statistics_runs", "ix_statistics_runs_start") - elif new_version == 27: - _add_columns(session_maker, "events", [f"data_id {_column_types.big_int_type}"]) - _create_index(session_maker, "events", "ix_events_data_id") - elif new_version == 28: - _add_columns(session_maker, "events", ["origin_idx INTEGER"]) - # We never use the user_id or parent_id index - _drop_index(session_maker, "events", "ix_events_context_user_id") - _drop_index(session_maker, "events", "ix_events_context_parent_id") + _create_index(self.session_maker, "states", "ix_states_attributes_id") + + +class _SchemaVersion26Migrator(_SchemaVersionMigrator, target_version=26): + def _apply_update(self) -> None: + """Version specific update method.""" + _create_index(self.session_maker, "statistics_runs", "ix_statistics_runs_start") + + +class _SchemaVersion27Migrator(_SchemaVersionMigrator, target_version=27): + def _apply_update(self) -> None: + """Version specific update method.""" _add_columns( - session_maker, + self.session_maker, "events", [f"data_id {self.column_types.big_int_type}"] + ) + _create_index(self.session_maker, "events", "ix_events_data_id") + + +class _SchemaVersion28Migrator(_SchemaVersionMigrator, target_version=28): + def _apply_update(self) -> None: + """Version specific update method.""" + _add_columns(self.session_maker, "events", ["origin_idx INTEGER"]) + # We never use the user_id or parent_id index + _drop_index(self.session_maker, "events", "ix_events_context_user_id") + _drop_index(self.session_maker, "events", "ix_events_context_parent_id") + _add_columns( + self.session_maker, "states", [ "origin_idx INTEGER", @@ -928,18 +1095,24 @@ def _apply_update( # noqa: C901 "context_parent_id VARCHAR(36)", ], ) - _create_index(session_maker, "states", "ix_states_context_id") + _create_index(self.session_maker, "states", "ix_states_context_id") # Once there are no longer any state_changed events # in the events table we can drop the index on states.event_id - elif new_version == 29: + + +class _SchemaVersion29Migrator(_SchemaVersionMigrator, target_version=29): + def _apply_update(self) -> None: + """Version specific update method.""" # Recreate statistics_meta index to block duplicated statistic_id - _drop_index(session_maker, "statistics_meta", "ix_statistics_meta_statistic_id") - if engine.dialect.name == SupportedDialect.MYSQL: + _drop_index( + self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id" + ) + if self.engine.dialect.name == SupportedDialect.MYSQL: # Ensure the row format is dynamic or the index # unique will be too large with ( contextlib.suppress(SQLAlchemyError), - session_scope(session=session_maker()) as session, + session_scope(session=self.session_maker()) as session, ): connection = session.connection() # This is safe to run multiple times and fast @@ -949,58 +1122,82 @@ def _apply_update( # noqa: C901 ) try: _create_index( - session_maker, "statistics_meta", "ix_statistics_meta_statistic_id" + self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id" ) except DatabaseError: # There may be duplicated statistics_meta entries, delete duplicates # and try again - with session_scope(session=session_maker()) as session: - delete_statistics_meta_duplicates(instance, session) + with session_scope(session=self.session_maker()) as session: + delete_statistics_meta_duplicates(self.instance, session) _create_index( - session_maker, "statistics_meta", "ix_statistics_meta_statistic_id" + self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id" ) - elif new_version == 30: + + +class _SchemaVersion30Migrator(_SchemaVersionMigrator, target_version=30): + def _apply_update(self) -> None: + """Version specific update method.""" # This added a column to the statistics_meta table, removed again before # release of HA Core 2022.10.0 # SQLite 3.31.0 does not support dropping columns. # Once we require SQLite >= 3.35.5, we should drop the column: # ALTER TABLE statistics_meta DROP COLUMN state_unit_of_measurement - pass - elif new_version == 31: + + +class _SchemaVersion31Migrator(_SchemaVersionMigrator, target_version=31): + def _apply_update(self) -> None: + """Version specific update method.""" # Once we require SQLite >= 3.35.5, we should drop the column: # ALTER TABLE events DROP COLUMN time_fired # ALTER TABLE states DROP COLUMN last_updated # ALTER TABLE states DROP COLUMN last_changed _add_columns( - session_maker, "events", [f"time_fired_ts {_column_types.timestamp_type}"] + self.session_maker, + "events", + [f"time_fired_ts {self.column_types.timestamp_type}"], ) _add_columns( - session_maker, + self.session_maker, "states", [ - f"last_updated_ts {_column_types.timestamp_type}", - f"last_changed_ts {_column_types.timestamp_type}", + f"last_updated_ts {self.column_types.timestamp_type}", + f"last_changed_ts {self.column_types.timestamp_type}", ], ) - _create_index(session_maker, "events", "ix_events_time_fired_ts") - _create_index(session_maker, "events", "ix_events_event_type_time_fired_ts") - _create_index(session_maker, "states", "ix_states_entity_id_last_updated_ts") - _create_index(session_maker, "states", "ix_states_last_updated_ts") - _migrate_columns_to_timestamp(instance, session_maker, engine) - elif new_version == 32: + _create_index(self.session_maker, "events", "ix_events_time_fired_ts") + _create_index( + self.session_maker, "events", "ix_events_event_type_time_fired_ts" + ) + _create_index( + self.session_maker, "states", "ix_states_entity_id_last_updated_ts" + ) + _create_index(self.session_maker, "states", "ix_states_last_updated_ts") + _migrate_columns_to_timestamp(self.instance, self.session_maker, self.engine) + + +class _SchemaVersion32Migrator(_SchemaVersionMigrator, target_version=32): + def _apply_update(self) -> None: + """Version specific update method.""" # Migration is done in two steps to ensure we can start using # the new columns before we wipe the old ones. - _drop_index(session_maker, "states", "ix_states_entity_id_last_updated") - _drop_index(session_maker, "events", "ix_events_event_type_time_fired") - _drop_index(session_maker, "states", "ix_states_last_updated") - _drop_index(session_maker, "events", "ix_events_time_fired") - elif new_version == 33: + _drop_index(self.session_maker, "states", "ix_states_entity_id_last_updated") + _drop_index(self.session_maker, "events", "ix_events_event_type_time_fired") + _drop_index(self.session_maker, "states", "ix_states_last_updated") + _drop_index(self.session_maker, "events", "ix_events_time_fired") + + +class _SchemaVersion33Migrator(_SchemaVersionMigrator, target_version=33): + def _apply_update(self) -> None: + """Version specific update method.""" # This index is no longer used and can cause MySQL to use the wrong index # when querying the states table. # https://github.com/home-assistant/core/issues/83787 # There was an index cleanup here but its now done in schema 39 - pass - elif new_version == 34: + + +class _SchemaVersion34Migrator(_SchemaVersionMigrator, target_version=34): + def _apply_update(self) -> None: + """Version specific update method.""" # Once we require SQLite >= 3.35.5, we should drop the columns: # ALTER TABLE statistics DROP COLUMN created # ALTER TABLE statistics DROP COLUMN start @@ -1009,150 +1206,225 @@ def _apply_update( # noqa: C901 # ALTER TABLE statistics_short_term DROP COLUMN start # ALTER TABLE statistics_short_term DROP COLUMN last_reset _add_columns( - session_maker, + self.session_maker, "statistics", [ - f"created_ts {_column_types.timestamp_type}", - f"start_ts {_column_types.timestamp_type}", - f"last_reset_ts {_column_types.timestamp_type}", + f"created_ts {self.column_types.timestamp_type}", + f"start_ts {self.column_types.timestamp_type}", + f"last_reset_ts {self.column_types.timestamp_type}", ], ) _add_columns( - session_maker, + self.session_maker, "statistics_short_term", [ - f"created_ts {_column_types.timestamp_type}", - f"start_ts {_column_types.timestamp_type}", - f"last_reset_ts {_column_types.timestamp_type}", + f"created_ts {self.column_types.timestamp_type}", + f"start_ts {self.column_types.timestamp_type}", + f"last_reset_ts {self.column_types.timestamp_type}", ], ) - _create_index(session_maker, "statistics", "ix_statistics_start_ts") + _create_index(self.session_maker, "statistics", "ix_statistics_start_ts") _create_index( - session_maker, "statistics", "ix_statistics_statistic_id_start_ts" + self.session_maker, "statistics", "ix_statistics_statistic_id_start_ts" ) _create_index( - session_maker, "statistics_short_term", "ix_statistics_short_term_start_ts" + self.session_maker, + "statistics_short_term", + "ix_statistics_short_term_start_ts", ) _create_index( - session_maker, + self.session_maker, "statistics_short_term", "ix_statistics_short_term_statistic_id_start_ts", ) _migrate_statistics_columns_to_timestamp_removing_duplicates( - hass, instance, session_maker, engine + self.hass, self.instance, self.session_maker, self.engine ) - elif new_version == 35: + + +class _SchemaVersion35Migrator(_SchemaVersionMigrator, target_version=35): + def _apply_update(self) -> None: + """Version specific update method.""" # Migration is done in two steps to ensure we can start using # the new columns before we wipe the old ones. _drop_index( - session_maker, "statistics", "ix_statistics_statistic_id_start", quiet=True + self.session_maker, + "statistics", + "ix_statistics_statistic_id_start", + quiet=True, ) _drop_index( - session_maker, + self.session_maker, "statistics_short_term", "ix_statistics_short_term_statistic_id_start", quiet=True, ) # ix_statistics_start and ix_statistics_statistic_id_start are still used # for the post migration cleanup and can be removed in a future version. - elif new_version == 36: + + +class _SchemaVersion36Migrator(_SchemaVersionMigrator, target_version=36): + def _apply_update(self) -> None: + """Version specific update method.""" for table in ("states", "events"): _add_columns( - session_maker, + self.session_maker, table, [ - f"context_id_bin {_column_types.context_bin_type}", - f"context_user_id_bin {_column_types.context_bin_type}", - f"context_parent_id_bin {_column_types.context_bin_type}", + f"context_id_bin {self.column_types.context_bin_type}", + f"context_user_id_bin {self.column_types.context_bin_type}", + f"context_parent_id_bin {self.column_types.context_bin_type}", ], ) - _create_index(session_maker, "events", "ix_events_context_id_bin") - _create_index(session_maker, "states", "ix_states_context_id_bin") - elif new_version == 37: + _create_index(self.session_maker, "events", "ix_events_context_id_bin") + _create_index(self.session_maker, "states", "ix_states_context_id_bin") + + +class _SchemaVersion37Migrator(_SchemaVersionMigrator, target_version=37): + def _apply_update(self) -> None: + """Version specific update method.""" _add_columns( - session_maker, "events", [f"event_type_id {_column_types.big_int_type}"] + self.session_maker, + "events", + [f"event_type_id {self.column_types.big_int_type}"], ) - _create_index(session_maker, "events", "ix_events_event_type_id") - _drop_index(session_maker, "events", "ix_events_event_type_time_fired_ts") - _create_index(session_maker, "events", "ix_events_event_type_id_time_fired_ts") - elif new_version == 38: + _create_index(self.session_maker, "events", "ix_events_event_type_id") + _drop_index(self.session_maker, "events", "ix_events_event_type_time_fired_ts") + _create_index( + self.session_maker, "events", "ix_events_event_type_id_time_fired_ts" + ) + + +class _SchemaVersion38Migrator(_SchemaVersionMigrator, target_version=38): + def _apply_update(self) -> None: + """Version specific update method.""" _add_columns( - session_maker, "states", [f"metadata_id {_column_types.big_int_type}"] + self.session_maker, + "states", + [f"metadata_id {self.column_types.big_int_type}"], ) - _create_index(session_maker, "states", "ix_states_metadata_id") - _create_index(session_maker, "states", "ix_states_metadata_id_last_updated_ts") - elif new_version == 39: + _create_index(self.session_maker, "states", "ix_states_metadata_id") + _create_index( + self.session_maker, "states", "ix_states_metadata_id_last_updated_ts" + ) + + +class _SchemaVersion39Migrator(_SchemaVersionMigrator, target_version=39): + def _apply_update(self) -> None: + """Version specific update method.""" # Dropping indexes with PostgreSQL never worked correctly if there was a prefix # so we need to cleanup leftover indexes. _drop_index( - session_maker, "events", "ix_events_event_type_time_fired_ts", quiet=True + self.session_maker, + "events", + "ix_events_event_type_time_fired_ts", + quiet=True, ) - _drop_index(session_maker, "events", "ix_events_event_type", quiet=True) + _drop_index(self.session_maker, "events", "ix_events_event_type", quiet=True) _drop_index( - session_maker, "events", "ix_events_event_type_time_fired", quiet=True + self.session_maker, "events", "ix_events_event_type_time_fired", quiet=True ) - _drop_index(session_maker, "events", "ix_events_time_fired", quiet=True) - _drop_index(session_maker, "events", "ix_events_context_user_id", quiet=True) - _drop_index(session_maker, "events", "ix_events_context_parent_id", quiet=True) + _drop_index(self.session_maker, "events", "ix_events_time_fired", quiet=True) _drop_index( - session_maker, "states", "ix_states_entity_id_last_updated", quiet=True - ) - _drop_index(session_maker, "states", "ix_states_last_updated", quiet=True) - _drop_index(session_maker, "states", "ix_states_entity_id", quiet=True) - _drop_index(session_maker, "states", "ix_states_context_user_id", quiet=True) - _drop_index(session_maker, "states", "ix_states_context_parent_id", quiet=True) - _drop_index(session_maker, "states", "ix_states_created_domain", quiet=True) - _drop_index(session_maker, "states", "ix_states_entity_id_created", quiet=True) - _drop_index(session_maker, "states", "states__state_changes", quiet=True) - _drop_index(session_maker, "states", "states__significant_changes", quiet=True) - _drop_index(session_maker, "states", "ix_states_entity_id_created", quiet=True) - _drop_index( - session_maker, "statistics", "ix_statistics_statistic_id_start", quiet=True + self.session_maker, "events", "ix_events_context_user_id", quiet=True ) _drop_index( - session_maker, + self.session_maker, "events", "ix_events_context_parent_id", quiet=True + ) + _drop_index( + self.session_maker, "states", "ix_states_entity_id_last_updated", quiet=True + ) + _drop_index(self.session_maker, "states", "ix_states_last_updated", quiet=True) + _drop_index(self.session_maker, "states", "ix_states_entity_id", quiet=True) + _drop_index( + self.session_maker, "states", "ix_states_context_user_id", quiet=True + ) + _drop_index( + self.session_maker, "states", "ix_states_context_parent_id", quiet=True + ) + _drop_index( + self.session_maker, "states", "ix_states_created_domain", quiet=True + ) + _drop_index( + self.session_maker, "states", "ix_states_entity_id_created", quiet=True + ) + _drop_index(self.session_maker, "states", "states__state_changes", quiet=True) + _drop_index( + self.session_maker, "states", "states__significant_changes", quiet=True + ) + _drop_index( + self.session_maker, "states", "ix_states_entity_id_created", quiet=True + ) + _drop_index( + self.session_maker, + "statistics", + "ix_statistics_statistic_id_start", + quiet=True, + ) + _drop_index( + self.session_maker, "statistics_short_term", "ix_statistics_short_term_statistic_id_start", quiet=True, ) - elif new_version == 40: + + +class _SchemaVersion40Migrator(_SchemaVersionMigrator, target_version=40): + def _apply_update(self) -> None: + """Version specific update method.""" # ix_events_event_type_id is a left-prefix of ix_events_event_type_id_time_fired_ts - _drop_index(session_maker, "events", "ix_events_event_type_id") + _drop_index(self.session_maker, "events", "ix_events_event_type_id") # ix_states_metadata_id is a left-prefix of ix_states_metadata_id_last_updated_ts - _drop_index(session_maker, "states", "ix_states_metadata_id") + _drop_index(self.session_maker, "states", "ix_states_metadata_id") # ix_statistics_metadata_id is a left-prefix of ix_statistics_statistic_id_start_ts - _drop_index(session_maker, "statistics", "ix_statistics_metadata_id") + _drop_index(self.session_maker, "statistics", "ix_statistics_metadata_id") # ix_statistics_short_term_metadata_id is a left-prefix of ix_statistics_short_term_statistic_id_start_ts _drop_index( - session_maker, + self.session_maker, "statistics_short_term", "ix_statistics_short_term_metadata_id", ) - elif new_version == 41: - _create_index(session_maker, "event_types", "ix_event_types_event_type") - _create_index(session_maker, "states_meta", "ix_states_meta_entity_id") - elif new_version == 42: + + +class _SchemaVersion41Migrator(_SchemaVersionMigrator, target_version=41): + def _apply_update(self) -> None: + """Version specific update method.""" + _create_index(self.session_maker, "event_types", "ix_event_types_event_type") + _create_index(self.session_maker, "states_meta", "ix_states_meta_entity_id") + + +class _SchemaVersion42Migrator(_SchemaVersionMigrator, target_version=42): + def _apply_update(self) -> None: + """Version specific update method.""" # If the user had a previously failed migration, or they # downgraded from 2023.3.x to an older version we will have # unmigrated statistics columns so we want to clean this up # one last time since compiling the statistics will be slow # or fail if we have unmigrated statistics. _migrate_statistics_columns_to_timestamp_removing_duplicates( - hass, instance, session_maker, engine + self.hass, self.instance, self.session_maker, self.engine ) - elif new_version == 43: + + +class _SchemaVersion43Migrator(_SchemaVersionMigrator, target_version=43): + def _apply_update(self) -> None: + """Version specific update method.""" _add_columns( - session_maker, + self.session_maker, "states", - [f"last_reported_ts {_column_types.timestamp_type}"], + [f"last_reported_ts {self.column_types.timestamp_type}"], ) - elif new_version == 44: + + +class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44): + def _apply_update(self) -> None: + """Version specific update method.""" # We skip this step for SQLITE, it doesn't have differently sized integers - if engine.dialect.name == SupportedDialect.SQLITE: + if self.engine.dialect.name == SupportedDialect.SQLITE: return identity_sql = ( "NOT NULL AUTO_INCREMENT" - if engine.dialect.name == SupportedDialect.MYSQL + if self.engine.dialect.name == SupportedDialect.MYSQL else "" ) # First drop foreign key constraints @@ -1167,7 +1439,7 @@ def _apply_update( # noqa: C901 for table, columns in foreign_columns for column in columns for dropped_constraint in _drop_foreign_key_constraints( - session_maker, engine, table, column + self.session_maker, self.engine, table, column ) ] _LOGGER.debug("Dropped foreign key constraints: %s", dropped_constraints) @@ -1175,8 +1447,8 @@ def _apply_update( # noqa: C901 # Then modify the constrained columns for table, columns in foreign_columns: _modify_columns( - session_maker, - engine, + self.session_maker, + self.engine, table, [f"{column} {BIG_INTEGER_SQL}" for column in columns], ) @@ -1198,16 +1470,15 @@ def _apply_update( # noqa: C901 ) for table, column in id_columns: _modify_columns( - session_maker, - engine, + self.session_maker, + self.engine, table, [f"{column} {BIG_INTEGER_SQL} {identity_sql}"], ) # Finally restore dropped constraints - _restore_foreign_key_constraints(session_maker, engine, dropped_constraints) - - else: - raise ValueError(f"No schema migration defined for version {new_version}") + _restore_foreign_key_constraints( + self.session_maker, self.engine, dropped_constraints + ) def _migrate_statistics_columns_to_timestamp_removing_duplicates(