From ff8bc763c3f667039b3034d972c871e7439a6833 Mon Sep 17 00:00:00 2001
From: Erik Montnemery
Date: Wed, 18 Dec 2024 21:29:52 +0100
Subject: [PATCH] Ensure indices needed by data migrators exist (#133367)

* Ensure indices needed by data migrators exist

* Update test

* Improve test

* Ignore index error on char(0) columns

* Adjust tests

* Address review comments

* Add comment motivating magic number
---
 .../components/recorder/db_schema.py          |  38 ++-
 .../components/recorder/migration.py          | 223 ++++++++++++++----
 tests/components/recorder/test_migrate.py     |   8 +-
 ..._migration_run_time_migrations_remember.py | 203 +++++++++-------
 .../recorder/test_purge_v32_schema.py         |   4 +-
 5 files changed, 340 insertions(+), 136 deletions(-)

diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py
index fb57a1c73e2..fa4162f4183 100644
--- a/homeassistant/components/recorder/db_schema.py
+++ b/homeassistant/components/recorder/db_schema.py
@@ -6,7 +6,7 @@ from collections.abc import Callable
 from datetime import datetime, timedelta
 import logging
 import time
-from typing import Any, Self, cast
+from typing import Any, Final, Self, cast

 import ciso8601
 from fnv_hash_fast import fnv1a_32
@@ -130,7 +130,8 @@ METADATA_ID_LAST_UPDATED_INDEX_TS = "ix_states_metadata_id_last_updated_ts"
 EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
 STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
 LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id"
-LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated_ts"
+LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX = "ix_states_entity_id_last_updated_ts"
+LEGACY_MAX_LENGTH_EVENT_CONTEXT_ID: Final = 36
 CONTEXT_ID_BIN_MAX_LENGTH = 16

 MYSQL_COLLATE = "utf8mb4_unicode_ci"
@@ -350,6 +351,17 @@ class Events(Base):
         return None


+class LegacyEvents(LegacyBase):
+    """Event history data with event_id, used for schema migration."""
+
+    __table_args__ = (_DEFAULT_TABLE_ARGS,)
+    __tablename__ = TABLE_EVENTS
+    event_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
+    context_id: Mapped[str | None] = mapped_column(
+        String(LEGACY_MAX_LENGTH_EVENT_CONTEXT_ID), index=True
+    )
+
+
 class EventData(Base):
     """Event data history."""

@@ -575,6 +587,28 @@ class States(Base):
         )


+class LegacyStates(LegacyBase):
+    """State change history with entity_id, used for schema migration."""
+
+    __table_args__ = (
+        Index(
+            LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX,
+            "entity_id",
+            "last_updated_ts",
+        ),
+        _DEFAULT_TABLE_ARGS,
+    )
+    __tablename__ = TABLE_STATES
+    state_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
+    entity_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
+    last_updated_ts: Mapped[float | None] = mapped_column(
+        TIMESTAMP_TYPE, default=time.time, index=True
+    )
+    context_id: Mapped[str | None] = mapped_column(
+        String(LEGACY_MAX_LENGTH_EVENT_CONTEXT_ID), index=True
+    )
+
+
 class StateAttributes(Base):
     """State attribute change history."""

diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index 74e3b08f51c..33790ec65b2 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -23,6 +23,7 @@ from sqlalchemy.exc import (
     ProgrammingError,
     SQLAlchemyError,
 )
+from sqlalchemy.orm import DeclarativeBase
 from sqlalchemy.orm.session import Session
 from sqlalchemy.schema import AddConstraint, CreateTable, DropConstraint
 from sqlalchemy.sql.expression import true
@@ -59,7 +60,7 @@ from .db_schema import (
     BIG_INTEGER_SQL,
     CONTEXT_ID_BIN_MAX_LENGTH,
     DOUBLE_PRECISION_TYPE_SQL,
-    LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
+    LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX,
     LEGACY_STATES_EVENT_ID_INDEX,
     MYSQL_COLLATE,
     MYSQL_DEFAULT_CHARSET,
@@ -169,6 +170,24 @@ _COLUMN_TYPES_FOR_DIALECT: dict[SupportedDialect | None, _ColumnTypesForDialect]
 }


+def _unindexable_legacy_column(
+    instance: Recorder, base: type[DeclarativeBase], err: Exception
+) -> bool:
+    """Ignore index errors on char(0) columns."""
+    # The error code is hard coded because the PyMySQL library may not be
+    # installed when using other database engines than MySQL or MariaDB.
+    # 1167: The used storage engine can't index column '%s'
+    return bool(
+        base == LegacyBase
+        and isinstance(err, OperationalError)
+        and instance.engine
+        and instance.engine.dialect.name == SupportedDialect.MYSQL
+        and isinstance(err.orig, BaseException)
+        and err.orig.args
+        and err.orig.args[0] == 1167
+    )
+
+
 def raise_if_exception_missing_str(ex: Exception, match_substrs: Iterable[str]) -> None:
     """Raise if the exception and cause do not contain the match substrs."""
     lower_ex_strs = [str(ex).lower(), str(ex.__cause__).lower()]
@@ -471,14 +490,19 @@ def migrate_data_live(


 def _create_index(
-    session_maker: Callable[[], Session], table_name: str, index_name: str
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    table_name: str,
+    index_name: str,
+    *,
+    base: type[DeclarativeBase] = Base,
 ) -> None:
     """Create an index for the specified table.

     The index name should match the name given for the index
     within the table definition described in the models
     """
-    table = Table(table_name, Base.metadata)
+    table = Table(table_name, base.metadata)
     _LOGGER.debug("Looking up index %s for table %s", index_name, table_name)
     # Look up the index object by name from the table is the models
     index_list = [idx for idx in table.indexes if idx.name == index_name]
@@ -498,10 +522,18 @@ def _create_index(
             connection = session.connection()
             index.create(connection)
     except (InternalError, OperationalError, ProgrammingError) as err:
+        if _unindexable_legacy_column(instance, base, err):
+            _LOGGER.debug(
+                "Can't add legacy index %s to column %s, continuing",
+                index_name,
+                table_name,
+            )
+            return
         raise_if_exception_missing_str(err, ["already exists", "duplicate"])
         _LOGGER.warning(
             "Index %s already exists on %s, continuing", index_name, table_name
         )
+        return

     _LOGGER.warning("Finished adding index `%s` to table `%s`", index_name, table_name)

@@ -1040,7 +1072,12 @@ class _SchemaVersion2Migrator(_SchemaVersionMigrator, target_version=2):
     def _apply_update(self) -> None:
         """Version specific update method."""
         # Create compound start/end index for recorder_runs
-        _create_index(self.session_maker, "recorder_runs", "ix_recorder_runs_start_end")
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "recorder_runs",
+            "ix_recorder_runs_start_end",
+        )
         # This used to create ix_states_last_updated bit it was removed in version 32


@@ -1075,7 +1112,9 @@ class _SchemaVersion5Migrator(_SchemaVersionMigrator, target_version=5):
     def _apply_update(self) -> None:
         """Version specific update method."""
         # Create supporting index for States.event_id foreign key
-        _create_index(self.session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
+        _create_index(
+            self.instance, self.session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX
+        )


 class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
@@ -1086,7 +1125,9 @@ class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
             "events",
             ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"],
         )
-        _create_index(self.session_maker, "events", "ix_events_context_id")
+        _create_index(
+            self.instance, self.session_maker, "events", "ix_events_context_id"
+        )
         # This used to create ix_events_context_user_id,
         # but it was removed in version 28
         _add_columns(
@@ -1094,7 +1135,9 @@ class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
             "states",
             ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"],
         )
-        _create_index(self.session_maker, "states", "ix_states_context_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_context_id"
+        )
         # This used to create ix_states_context_user_id,
         # but it was removed in version 28

@@ -1148,7 +1191,9 @@ class _SchemaVersion10Migrator(_SchemaVersionMigrator, target_version=10):
 class _SchemaVersion11Migrator(_SchemaVersionMigrator, target_version=11):
     def _apply_update(self) -> None:
         """Version specific update method."""
-        _create_index(self.session_maker, "states", "ix_states_old_state_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_old_state_id"
+        )

         # _update_states_table_with_foreign_key_options first drops foreign
         # key constraints, and then re-adds them with the correct settings.
@@ -1390,13 +1435,20 @@ class _SchemaVersion25Migrator(_SchemaVersionMigrator, target_version=25):
             "states",
             [f"attributes_id {self.column_types.big_int_type}"],
         )
-        _create_index(self.session_maker, "states", "ix_states_attributes_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_attributes_id"
+        )


 class _SchemaVersion26Migrator(_SchemaVersionMigrator, target_version=26):
     def _apply_update(self) -> None:
         """Version specific update method."""
-        _create_index(self.session_maker, "statistics_runs", "ix_statistics_runs_start")
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "statistics_runs",
+            "ix_statistics_runs_start",
+        )


 class _SchemaVersion27Migrator(_SchemaVersionMigrator, target_version=27):
@@ -1405,7 +1457,7 @@ class _SchemaVersion27Migrator(_SchemaVersionMigrator, target_version=27):
         _add_columns(
             self.session_maker, "events", [f"data_id {self.column_types.big_int_type}"]
         )
-        _create_index(self.session_maker, "events", "ix_events_data_id")
+        _create_index(self.instance, self.session_maker, "events", "ix_events_data_id")


 class _SchemaVersion28Migrator(_SchemaVersionMigrator, target_version=28):
@@ -1425,7 +1477,9 @@ class _SchemaVersion28Migrator(_SchemaVersionMigrator, target_version=28):
                 "context_parent_id VARCHAR(36)",
             ],
         )
-        _create_index(self.session_maker, "states", "ix_states_context_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_context_id"
+        )
         # Once there are no longer any state_changed events
         # in the events table we can drop the index on states.event_id

@@ -1452,7 +1506,10 @@ class _SchemaVersion29Migrator(_SchemaVersionMigrator, target_version=29):
         )
         try:
             _create_index(
-                self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
+                self.instance,
+                self.session_maker,
+                "statistics_meta",
+                "ix_statistics_meta_statistic_id",
             )
         except DatabaseError:
             # There may be duplicated statistics_meta entries, delete duplicates
             with session_scope(session=self.session_maker()) as session:
                 delete_statistics_meta_duplicates(self.instance, session)
             _create_index(
-                self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
+                self.instance,
+                self.session_maker,
+                "statistics_meta",
+                "ix_statistics_meta_statistic_id",
             )

@@ -1494,14 +1554,24 @@ class _SchemaVersion31Migrator(_SchemaVersionMigrator, target_version=31):
                 f"last_changed_ts {self.column_types.timestamp_type}",
             ],
         )
-        _create_index(self.session_maker, "events", "ix_events_time_fired_ts")
         _create_index(
-            self.session_maker, "events", "ix_events_event_type_time_fired_ts"
+            self.instance, self.session_maker, "events", "ix_events_time_fired_ts"
         )
         _create_index(
-            self.session_maker, "states", "ix_states_entity_id_last_updated_ts"
+            self.instance,
+            self.session_maker,
+            "events",
+            "ix_events_event_type_time_fired_ts",
+        )
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "states",
+            "ix_states_entity_id_last_updated_ts",
+        )
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_last_updated_ts"
         )
-        _create_index(self.session_maker, "states", "ix_states_last_updated_ts")

         _migrate_columns_to_timestamp(self.instance, self.session_maker, self.engine)

@@ -1559,16 +1629,23 @@ class _SchemaVersion34Migrator(_SchemaVersionMigrator, target_version=34):
                 f"last_reset_ts {self.column_types.timestamp_type}",
             ],
         )
-        _create_index(self.session_maker, "statistics", "ix_statistics_start_ts")
         _create_index(
-            self.session_maker, "statistics", "ix_statistics_statistic_id_start_ts"
+            self.instance, self.session_maker, "statistics", "ix_statistics_start_ts"
         )
         _create_index(
+            self.instance,
+            self.session_maker,
+            "statistics",
+            "ix_statistics_statistic_id_start_ts",
+        )
+        _create_index(
+            self.instance,
             self.session_maker,
             "statistics_short_term",
             "ix_statistics_short_term_start_ts",
         )
         _create_index(
+            self.instance,
             self.session_maker,
             "statistics_short_term",
             "ix_statistics_short_term_statistic_id_start_ts",
         )
@@ -1618,8 +1695,12 @@ class _SchemaVersion36Migrator(_SchemaVersionMigrator, target_version=36):
                 f"context_parent_id_bin {self.column_types.context_bin_type}",
             ],
         )
-        _create_index(self.session_maker, "events", "ix_events_context_id_bin")
-        _create_index(self.session_maker, "states", "ix_states_context_id_bin")
+        _create_index(
+            self.instance, self.session_maker, "events", "ix_events_context_id_bin"
+        )
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_context_id_bin"
+        )


 class _SchemaVersion37Migrator(_SchemaVersionMigrator, target_version=37):
@@ -1630,10 +1711,15 @@ class _SchemaVersion37Migrator(_SchemaVersionMigrator, target_version=37):
             "events",
             [f"event_type_id {self.column_types.big_int_type}"],
         )
-        _create_index(self.session_maker, "events", "ix_events_event_type_id")
+        _create_index(
+            self.instance, self.session_maker, "events", "ix_events_event_type_id"
+        )
         _drop_index(self.session_maker, "events", "ix_events_event_type_time_fired_ts")
         _create_index(
-            self.session_maker, "events", "ix_events_event_type_id_time_fired_ts"
+            self.instance,
+            self.session_maker,
+            "events",
+            "ix_events_event_type_id_time_fired_ts",
         )


@@ -1645,9 +1731,14 @@ class _SchemaVersion38Migrator(_SchemaVersionMigrator, target_version=38):
             "states",
             [f"metadata_id {self.column_types.big_int_type}"],
         )
-        _create_index(self.session_maker, "states", "ix_states_metadata_id")
         _create_index(
-            self.session_maker, "states", "ix_states_metadata_id_last_updated_ts"
+            self.instance, self.session_maker, "states", "ix_states_metadata_id"
+        )
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "states",
+            "ix_states_metadata_id_last_updated_ts",
         )

@@ -1731,8 +1822,15 @@ class _SchemaVersion40Migrator(_SchemaVersionMigrator, target_version=40):
 class _SchemaVersion41Migrator(_SchemaVersionMigrator, target_version=41):
     def _apply_update(self) -> None:
         """Version specific update method."""
-        _create_index(self.session_maker, "event_types", "ix_event_types_event_type")
-        _create_index(self.session_maker, "states_meta", "ix_states_meta_entity_id")
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "event_types",
+            "ix_event_types_event_type",
+        )
+        _create_index(
+            self.instance, self.session_maker, "states_meta", "ix_states_meta_entity_id"
+        )


 class _SchemaVersion42Migrator(_SchemaVersionMigrator, target_version=42):
@@ -2319,7 +2417,7 @@ class DataMigrationStatus:
 class BaseMigration(ABC):
     """Base class for migrations."""

-    index_to_drop: tuple[str, str] | None = None
+    index_to_drop: tuple[str, str, type[DeclarativeBase]] | None = None
     required_schema_version = 0  # Schema version required to run migration queries
     max_initial_schema_version: int  # Skip migration if db created after this version
     migration_version = 1
@@ -2349,12 +2447,12 @@ class BaseMigration(ABC):
         """Migrate some data, returns True if migration is completed."""
         status = self.migrate_data_impl(instance)
         if status.migration_done:
-            if self.index_to_drop is not None:
-                table, index = self.index_to_drop
-                _drop_index(instance.get_session, table, index)
             with session_scope(session=instance.get_session()) as session:
                 self.migration_done(instance, session)
                 _mark_migration_done(session, self.__class__)
+            if self.index_to_drop is not None:
+                table, index, _ = self.index_to_drop
+                _drop_index(instance.get_session, table, index)
         return not status.needs_migrate

     @abstractmethod
@@ -2393,25 +2491,31 @@ class BaseMigration(ABC):
                 "Data migration '%s' needed, schema too old", self.migration_id
             )
             return True
+        has_needed_index = self._has_needed_index(session)
+        if has_needed_index is True:
+            # The index to be removed by the migration still exists
+            _LOGGER.info(
+                "Data migration '%s' needed, index to drop still exists",
+                self.migration_id,
+            )
+            return True
         if self.migration_changes.get(self.migration_id, -1) >= self.migration_version:
             # The migration changes table indicates that the migration has been done
             _LOGGER.debug(
                 "Data migration '%s' not needed, already completed", self.migration_id
             )
             return False
-        # We do not know if the migration is done from the
-        # migration changes table so we must check the index and data
-        # This is the slow path
-        if (
-            self.index_to_drop is not None
-            and get_index_by_name(session, self.index_to_drop[0], self.index_to_drop[1])
-            is not None
-        ):
+        if has_needed_index is False:
+            # The index to be removed by the migration does not exist, but the migration
+            # changes table indicates that the migration has not been done
             _LOGGER.info(
-                "Data migration '%s' needed, index to drop still exists",
+                "Data migration '%s' needed, index to drop does not exist",
                 self.migration_id,
             )
             return True
+        # We do not know if the migration is done from the
+        # migration changes table or the index so we must check the data
+        # This is the slow path
         needs_migrate = self.needs_migrate_impl(instance, session)
         if needs_migrate.migration_done:
             _mark_migration_done(session, self.__class__)
@@ -2422,6 +2526,13 @@ class BaseMigration(ABC):
         )
         return needs_migrate.needs_migrate

+    def _has_needed_index(self, session: Session) -> bool | None:
+        """Check if the index needed by the migration exists."""
+        if self.index_to_drop is None:
+            return None
+        table_name, index_name, _ = self.index_to_drop
+        return get_index_by_name(session, table_name, index_name) is not None
+

 class BaseOffLineMigration(BaseMigration):
     """Base class for off line migrations."""
@@ -2435,6 +2546,7 @@ class BaseOffLineMigration(BaseMigration):
                 _LOGGER.debug("Migration not needed for '%s'", self.migration_id)
                 self.migration_done(instance, session)
                 return
+        self._ensure_index_exists(instance)
         _LOGGER.warning(
             "The database is about to do data migration step '%s', %s",
             self.migration_id,
@@ -2449,6 +2561,25 @@ class BaseOffLineMigration(BaseMigration):
         """Migrate some data, returns True if migration is completed."""
         return self._migrate_data(instance)

+    def _ensure_index_exists(self, instance: Recorder) -> None:
+        """Ensure the index needed by the migration exists."""
+        if not self.index_to_drop:
+            return
+        table_name, index_name, base = self.index_to_drop
+        with session_scope(session=instance.get_session()) as session:
+            if get_index_by_name(session, table_name, index_name) is not None:
+                return
+        _LOGGER.warning(
+            (
+                "Data migration step '%s' needs index `%s` on table `%s`, but "
+                "it does not exist and will be added now"
+            ),
+            self.migration_id,
+            index_name,
+            table_name,
+        )
+        _create_index(instance, instance.get_session, table_name, index_name, base=base)
+

 class BaseRunTimeMigration(BaseMigration):
     """Base class for run time migrations."""
@@ -2492,7 +2623,7 @@ class StatesContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     max_initial_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION - 1
     migration_id = "state_context_id_as_binary"
     migration_version = 2
-    index_to_drop = ("states", "ix_states_context_id")
+    index_to_drop = ("states", "ix_states_context_id", LegacyBase)

     def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
         """Migrate states context_ids to use binary format, return True if completed."""
@@ -2536,7 +2667,7 @@ class EventsContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     max_initial_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION - 1
     migration_id = "event_context_id_as_binary"
     migration_version = 2
-    index_to_drop = ("events", "ix_events_context_id")
+    index_to_drop = ("events", "ix_events_context_id", LegacyBase)

     def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
         """Migrate events context_ids to use binary format, return True if completed."""
@@ -2814,7 +2945,11 @@ class EntityIDPostMigration(BaseMigrationWithQuery, BaseOffLineMigration):

     migration_id = "entity_id_post_migration"
     max_initial_schema_version = STATES_META_SCHEMA_VERSION - 1
-    index_to_drop = (TABLE_STATES, LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX)
+    index_to_drop = (
+        TABLE_STATES,
+        LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX,
+        LegacyBase,
+    )

     def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
         """Migrate some data, returns True if migration is completed."""
diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py
index 462db70496a..052e9202715 100644
--- a/tests/components/recorder/test_migrate.py
+++ b/tests/components/recorder/test_migrate.py
@@ -600,7 +600,7 @@ async def test_schema_migrate(
             start=self.recorder_runs_manager.recording_start, created=dt_util.utcnow()
         )

-    def _sometimes_failing_create_index(*args):
+    def _sometimes_failing_create_index(*args, **kwargs):
         """Make the first index create raise a retryable error to ensure we retry."""
         if recorder_db_url.startswith("mysql://"):
             nonlocal create_calls
@@ -609,7 +609,7 @@ async def test_schema_migrate(
                 mysql_exception = OperationalError("statement", {}, [])
                 mysql_exception.orig = Exception(1205, "retryable")
                 raise mysql_exception
-        real_create_index(*args)
+        real_create_index(*args, **kwargs)

     with (
         patch(
@@ -712,7 +712,7 @@ def test_forgiving_add_index(recorder_db_url: str) -> None:
     instance = Mock()
     instance.get_session = Mock(return_value=session)
     migration._create_index(
-        instance.get_session, "states", "ix_states_context_id_bin"
+        instance, instance.get_session, "states", "ix_states_context_id_bin"
    )
     engine.dispose()
@@ -788,7 +788,7 @@ def test_forgiving_add_index_with_other_db_types(
     with patch(
         "homeassistant.components.recorder.migration.Table", return_value=mocked_table
     ):
-        migration._create_index(Mock(), "states", "ix_states_context_id")
+        migration._create_index(Mock(), Mock(), "states", "ix_states_context_id")

     assert "already exists on states" in caplog.text
     assert "continuing" in caplog.text
diff --git a/tests/components/recorder/test_migration_run_time_migrations_remember.py b/tests/components/recorder/test_migration_run_time_migrations_remember.py
index fa14570bc6b..677abd6083c 100644
--- a/tests/components/recorder/test_migration_run_time_migrations_remember.py
+++ b/tests/components/recorder/test_migration_run_time_migrations_remember.py
@@ -1,6 +1,6 @@
 """Test run time migrations are remembered in the migration_changes table."""

-from collections.abc import Callable
+from collections.abc import Callable, Generator
 import importlib
 import sys
 from unittest.mock import Mock, patch
@@ -8,6 +8,7 @@
 import pytest
 from sqlalchemy import create_engine
 from sqlalchemy.orm import Session
+from sqlalchemy.schema import Index

 from homeassistant.components import recorder
 from homeassistant.components.recorder import core, migration, statistics
@@ -87,138 +88,165 @@ def _create_engine_test(

 @pytest.mark.usefixtures("hass_storage")  # Prevent test hass from writing to storage
 @pytest.mark.parametrize(
-    ("initial_version", "expected_migrator_calls"),
+    ("initial_version", "expected_migrator_calls", "expected_created_indices"),
+    # expected_migrator_calls is a dict of
+    # migrator_id: (needs_migrate_calls, migrate_data_calls)
     [
         (
             27,
             {
-                "state_context_id_as_binary": 1,
-                "event_context_id_as_binary": 1,
-                "event_type_id_migration": 1,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 1,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 1),
+                "event_context_id_as_binary": (0, 1),
+                "event_type_id_migration": (2, 1),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (1, 1),
+                "entity_id_post_migration": (0, 1),
             },
+            [
+                "ix_states_context_id",
+                "ix_events_context_id",
+                "ix_states_entity_id_last_updated_ts",
+            ],
         ),
         (
             28,
             {
-                "state_context_id_as_binary": 1,
-                "event_context_id_as_binary": 1,
-                "event_type_id_migration": 1,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 1),
+                "event_context_id_as_binary": (0, 1),
+                "event_type_id_migration": (2, 1),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 1),
             },
+            [
+                "ix_states_context_id",
+                "ix_events_context_id",
+                "ix_states_entity_id_last_updated_ts",
+            ],
         ),
         (
             36,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 1,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (2, 1),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 1),
             },
+            ["ix_states_entity_id_last_updated_ts"],
         ),
         (
             37,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 0,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (0, 0),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 1),
             },
+            ["ix_states_entity_id_last_updated_ts"],
        ),
         (
             38,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 0,
-                "entity_id_migration": 0,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 0,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (0, 0),
+                "entity_id_migration": (0, 0),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 0),
             },
+            [],
         ),
         (
             SCHEMA_VERSION,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 0,
-                "entity_id_migration": 0,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 0,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (0, 0),
+                "entity_id_migration": (0, 0),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 0),
             },
+            [],
         ),
     ],
 )
-async def test_data_migrator_new_database(
+async def test_data_migrator_logic(
     async_test_recorder: RecorderInstanceGenerator,
     initial_version: int,
-    expected_migrator_calls: dict[str, int],
+    expected_migrator_calls: dict[str, tuple[int, int]],
+    expected_created_indices: list[str],
 ) -> None:
-    """Test that the data migrators are not executed on a new database."""
+    """Test the data migrator logic.
+
+    - The data migrators should not be executed on a new database.
+    - Indices needed by the migrators should be created if missing.
+    """
     config = {recorder.CONF_COMMIT_INTERVAL: 1}

-    def needs_migrate_mock() -> Mock:
-        return Mock(
-            spec_set=[],
-            return_value=migration.DataMigrationStatus(
-                needs_migrate=False, migration_done=True
+    def migrator_mock() -> dict[str, Mock]:
+        return {
+            "needs_migrate": Mock(
+                spec_set=[],
+                return_value=migration.DataMigrationStatus(
+                    needs_migrate=True, migration_done=False
+                ),
             ),
-        )
+            "migrate_data": Mock(spec_set=[], return_value=True),
+        }

     migrator_mocks = {
-        "state_context_id_as_binary": needs_migrate_mock(),
-        "event_context_id_as_binary": needs_migrate_mock(),
-        "event_type_id_migration": needs_migrate_mock(),
-        "entity_id_migration": needs_migrate_mock(),
-        "event_id_post_migration": needs_migrate_mock(),
-        "entity_id_post_migration": needs_migrate_mock(),
+        "state_context_id_as_binary": migrator_mock(),
+        "event_context_id_as_binary": migrator_mock(),
+        "event_type_id_migration": migrator_mock(),
+        "entity_id_migration": migrator_mock(),
+        "event_id_post_migration": migrator_mock(),
+        "entity_id_post_migration": migrator_mock(),
     }

+    def patch_check(
+        migrator_id: str, migrator_class: type[migration.BaseMigration]
+    ) -> Generator[None]:
+        return patch.object(
+            migrator_class,
+            "needs_migrate_impl",
+            side_effect=migrator_mocks[migrator_id]["needs_migrate"],
+        )
+
+    def patch_migrate(
+        migrator_id: str, migrator_class: type[migration.BaseMigration]
+    ) -> Generator[None]:
+        return patch.object(
+            migrator_class,
+            "migrate_data",
+            side_effect=migrator_mocks[migrator_id]["migrate_data"],
+        )
+
     with (
-        patch.object(
-            migration.StatesContextIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["state_context_id_as_binary"],
-        ),
-        patch.object(
-            migration.EventsContextIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["event_context_id_as_binary"],
-        ),
-        patch.object(
-            migration.EventTypeIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["event_type_id_migration"],
-        ),
-        patch.object(
-            migration.EntityIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["entity_id_migration"],
-        ),
-        patch.object(
-            migration.EventIDPostMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["event_id_post_migration"],
-        ),
-        patch.object(
-            migration.EntityIDPostMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["entity_id_post_migration"],
-        ),
+        patch_check("state_context_id_as_binary", migration.StatesContextIDMigration),
+        patch_check("event_context_id_as_binary", migration.EventsContextIDMigration),
+        patch_check("event_type_id_migration", migration.EventTypeIDMigration),
+        patch_check("entity_id_migration", migration.EntityIDMigration),
+        patch_check("event_id_post_migration", migration.EventIDPostMigration),
+        patch_check("entity_id_post_migration", migration.EntityIDPostMigration),
+        patch_migrate("state_context_id_as_binary", migration.StatesContextIDMigration),
+        patch_migrate("event_context_id_as_binary", migration.EventsContextIDMigration),
+        patch_migrate("event_type_id_migration", migration.EventTypeIDMigration),
+        patch_migrate("entity_id_migration", migration.EntityIDMigration),
+        patch_migrate("event_id_post_migration", migration.EventIDPostMigration),
+        patch_migrate("entity_id_post_migration", migration.EntityIDPostMigration),
         patch(
             CREATE_ENGINE_TARGET,
             new=_create_engine_test(
                 SCHEMA_MODULE_CURRENT, initial_version=initial_version
             ),
         ),
+        patch(
+            "sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
+        ) as wrapped_idx_create,
     ):
         async with (
             async_test_home_assistant() as hass,
@@ -231,8 +259,15 @@ async def test_data_migrator_new_database(
         await hass.async_block_till_done()
         await hass.async_stop()

+    index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
+    assert index_names == expected_created_indices
+
+    # Check each data migrator's needs_migrate_impl and migrate_data methods were called
+    # the expected number of times.
     for migrator, mock in migrator_mocks.items():
-        assert len(mock.mock_calls) == expected_migrator_calls[migrator]
+        needs_migrate_calls, migrate_data_calls = expected_migrator_calls[migrator]
+        assert len(mock["needs_migrate"].mock_calls) == needs_migrate_calls
+        assert len(mock["migrate_data"].mock_calls) == migrate_data_calls


 @pytest.mark.parametrize("enable_migrate_state_context_ids", [True])
diff --git a/tests/components/recorder/test_purge_v32_schema.py b/tests/components/recorder/test_purge_v32_schema.py
index 2bd1e7fd7f7..d68d1550268 100644
--- a/tests/components/recorder/test_purge_v32_schema.py
+++ b/tests/components/recorder/test_purge_v32_schema.py
@@ -1027,7 +1027,7 @@ async def test_purge_can_mix_legacy_and_new_format(
     def _recreate_legacy_events_index():
         """Recreate the legacy events index since its no longer created on new instances."""
         migration._create_index(
-            recorder_mock.get_session, "states", "ix_states_event_id"
+            recorder_mock, recorder_mock.get_session, "states", "ix_states_event_id"
         )
         recorder_mock.use_legacy_events_index = True

@@ -1178,7 +1178,7 @@ async def test_purge_can_mix_legacy_and_new_format_with_detached_state(
     def _recreate_legacy_events_index():
         """Recreate the legacy events index since its no longer created on new instances."""
         migration._create_index(
-            recorder_mock.get_session, "states", "ix_states_event_id"
+            recorder_mock, recorder_mock.get_session, "states", "ix_states_event_id"
         )
         recorder_mock.use_legacy_events_index = True
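
Reviewer note: the sketches below are illustrative only and are not part of the patch; any name that does not appear in the diff (the needs_migrate shim, MYSQL_CANT_INDEX_COLUMN, LegacyThing, and so on) is invented for the examples.

The first sketch restates the tri-state decision the patch adds to BaseMigration.needs_migrate: _has_needed_index returns None when a migrator declares no index_to_drop, and True/False for whether that index still exists. An existing index forces a migration, the migration_changes bookkeeping can then declare it done, a missing index with no bookkeeping entry also forces a migration, and only the remaining case falls through to the slow data inspection. The real method additionally short-circuits on required_schema_version and ends in needs_migrate_impl; this standalone shim keeps only the ordering.

# Illustrative sketch, not part of the patch: the tri-state fast path.
def needs_migrate(
    has_needed_index: bool | None,
    migration_changes_say_done: bool,
) -> bool | None:
    """Return True/False when the fast path decides, None for the slow path."""
    if has_needed_index is True:
        # The index to drop still exists: the migration has not finished.
        return True
    if migration_changes_say_done:
        # The bookkeeping table says the migration already completed.
        return False
    if has_needed_index is False:
        # Index is gone but bookkeeping says not done: migrate to be safe.
        return True
    # No index to consult: fall back to inspecting the data itself.
    return None

assert needs_migrate(True, False) is True
assert needs_migrate(None, True) is False
assert needs_migrate(False, False) is True
assert needs_migrate(None, False) is None  # slow path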
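
The second sketch shows the shape of the _unindexable_legacy_column guard. Per the patch's own comment, MySQL error code 1167 ("The used storage engine can't index column '%s'") is matched on the raw DBAPI error args because PyMySQL may not be installed when another database engine is in use; the patch hits this case when an index on a char(0) legacy column cannot be created. The predicate below keeps only the error-shape checks (the real one also verifies base and the active dialect) and, like the patch's own test, fabricates the error object by hand.

# Illustrative sketch, not part of the patch: matching MySQL error 1167.
from sqlalchemy.exc import OperationalError

MYSQL_CANT_INDEX_COLUMN = 1167  # "The used storage engine can't index column '%s'"

def is_unindexable_column_error(err: Exception) -> bool:
    """Return True for MySQL's 'can't index column' error, False otherwise."""
    return (
        isinstance(err, OperationalError)
        and isinstance(err.orig, BaseException)
        and bool(err.orig.args)
        and err.orig.args[0] == MYSQL_CANT_INDEX_COLUMN
    )

# Mimics how the recorder test suite fabricates dialect errors.
fake = OperationalError("statement", {}, Exception(1167, "can't index column"))
assert is_unindexable_column_error(fake)
assert not is_unindexable_column_error(ValueError("boom"))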
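
The last sketch illustrates why the patch introduces LegacyEvents/LegacyStates on LegacyBase and threads base= through _create_index: an index an offline migrator may need to recreate (for example ix_states_entity_id_last_updated_ts) is no longer declared on the live models, so its definition can only be found in a legacy-only declarative base's metadata. The model below is a made-up stand-in; the lookup mirrors _create_index's name match over table.indexes, though the real function resolves the Table via Table(table_name, base.metadata).

# Illustrative sketch, not part of the patch: finding a legacy-only index.
from sqlalchemy import Column, Index, Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase


class LegacyOnlyBase(DeclarativeBase):
    """Stand-in for db_schema.LegacyBase."""


class LegacyThing(LegacyOnlyBase):
    __tablename__ = "things"
    __table_args__ = (Index("ix_things_name_ts", "name", "ts"),)
    thing_id = Column(Integer, primary_key=True)
    name = Column(String(64))
    ts = Column(Integer)


def find_index(base: type[DeclarativeBase], table_name: str, index_name: str) -> Index:
    """Mirror the lookup: Table from the base's metadata, then match by name."""
    table = base.metadata.tables[table_name]
    return next(idx for idx in table.indexes if idx.name == index_name)


engine = create_engine("sqlite://")
LegacyOnlyBase.metadata.create_all(engine)
index = find_index(LegacyOnlyBase, "things", "ix_things_name_ts")
with engine.begin() as connection:
    index.drop(connection)    # simulate a database where the index is gone
    index.create(connection)  # what _ensure_index_exists restores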