diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index fffecff149c..750b4adc563 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -2586,15 +2586,11 @@ class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration): return has_event_type_to_migrate() -class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration): +class EntityIDMigration(BaseMigrationWithQuery, BaseOffLineMigration): """Migration to migrate entity_ids to states_meta.""" required_schema_version = STATES_META_SCHEMA_VERSION migration_id = "entity_id_migration" - task = CommitBeforeMigrationTask - # We have to commit before to make sure there are - # no new pending states_meta about to be added to - # the db since this happens live def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus: """Migrate entity_ids to states_meta, return True if completed. @@ -2664,18 +2660,6 @@ class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration): _LOGGER.debug("Migrating entity_ids done=%s", is_done) return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done) - def migration_done(self, instance: Recorder, session: Session) -> None: - """Will be called after migrate returns True.""" - # The migration has finished, now we start the post migration - # to remove the old entity_id data from the states table - # at this point we can also start using the StatesMeta table - # so we set active to True - _LOGGER.debug("Activating states_meta manager as all data is migrated") - instance.states_meta_manager.active = True - with contextlib.suppress(SQLAlchemyError): - migrate = EntityIDPostMigration(self.schema_version, self.migration_changes) - migrate.queue_migration(instance, session) - def needs_migrate_query(self) -> StatementLambdaElement: """Check if the data is migrated.""" return has_entity_ids_to_migrate() @@ -2786,12 +2770,13 @@ class EntityIDPostMigration(BaseMigrationWithQuery, BaseRunTimeMigration): NON_LIVE_DATA_MIGRATORS = ( StatesContextIDMigration, # Introduced in HA Core 2023.4 EventsContextIDMigration, # Introduced in HA Core 2023.4 + EntityIDMigration, # Introduced in HA Core 2023.4 by PR #89557 ) LIVE_DATA_MIGRATORS = ( EventTypeIDMigration, # Introduced in HA Core 2023.4 by PR #89465 - EntityIDMigration, # Introduced in HA Core 2023.4 by PR #89557 EventIDPostMigration, # Introduced in HA Core 2023.4 by PR #89901 + EntityIDPostMigration, # Introduced in HA Core 2023.4 by PR #89557 ) diff --git a/homeassistant/components/recorder/table_managers/states_meta.py b/homeassistant/components/recorder/table_managers/states_meta.py index 80d20dbec94..75afb6589a1 100644 --- a/homeassistant/components/recorder/table_managers/states_meta.py +++ b/homeassistant/components/recorder/table_managers/states_meta.py @@ -24,7 +24,7 @@ CACHE_SIZE = 8192 class StatesMetaManager(BaseLRUTableManager[StatesMeta]): """Manage the StatesMeta table.""" - active = False + active = True def __init__(self, recorder: Recorder) -> None: """Initialize the states meta manager.""" diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 60168f5e6ef..fbb0991c960 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -428,14 +428,6 @@ def get_schema_module_path(schema_version_postfix: str) -> str: return f"tests.components.recorder.db_schema_{schema_version_postfix}" -@dataclass(slots=True) -class MockMigrationTask(migration.MigrationTask): - """Mock migration task which does nothing.""" - - def run(self, instance: Recorder) -> None: - """Run migration task.""" - - @contextmanager def old_db_schema(schema_version_postfix: str) -> Iterator[None]: """Fixture to initialize the db with the old schema.""" @@ -453,7 +445,6 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]: patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch.object(core, "StateAttributes", old_db_schema.StateAttributes), - patch.object(migration.EntityIDMigration, "task", MockMigrationTask), patch( CREATE_ENGINE_TARGET, new=partial( diff --git a/tests/components/recorder/test_history_db_schema_32.py b/tests/components/recorder/test_history_db_schema_32.py index 3ee6edd8e1e..666626ff688 100644 --- a/tests/components/recorder/test_history_db_schema_32.py +++ b/tests/components/recorder/test_history_db_schema_32.py @@ -38,6 +38,17 @@ async def mock_recorder_before_hass( """Set up recorder.""" +@pytest.fixture +def disable_states_meta_manager(): + """Disable the states meta manager.""" + with patch.object( + recorder.table_managers.states_meta.StatesMetaManager, + "active", + False, + ): + yield + + @pytest.fixture(autouse=True) def db_schema_32(): """Fixture to initialize the db with the old schema 32.""" @@ -46,7 +57,9 @@ def db_schema_32(): @pytest.fixture(autouse=True) -def setup_recorder(db_schema_32, recorder_mock: Recorder) -> recorder.Recorder: +def setup_recorder( + db_schema_32, disable_states_meta_manager, recorder_mock: Recorder +) -> recorder.Recorder: """Set up recorder.""" diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index 6ef97f3bbd1..e77fae7ffad 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -44,7 +44,6 @@ import homeassistant.util.dt as dt_util from homeassistant.util.ulid import bytes_to_ulid, ulid_at_time, ulid_to_bytes from .common import ( - MockMigrationTask, async_attach_db_engine, async_recorder_block_till_done, async_wait_recording_done, @@ -114,7 +113,6 @@ def db_schema_32(): patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch.object(core, "StateAttributes", old_db_schema.StateAttributes), - patch.object(migration.EntityIDMigration, "task", MockMigrationTask), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), ): yield @@ -919,11 +917,13 @@ async def test_migrate_event_type_ids( ) +@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_entity_ids", [True]) -@pytest.mark.usefixtures("db_schema_32") -async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) -> None: +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage +async def test_migrate_entity_ids( + async_test_recorder: RecorderInstanceGenerator, +) -> None: """Test we can migrate entity_ids to the StatesMeta table.""" - await async_wait_recording_done(hass) importlib.import_module(SCHEMA_MODULE_32) old_db_schema = sys.modules[SCHEMA_MODULE_32] @@ -949,14 +949,24 @@ async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) ) ) - await recorder_mock.async_add_executor_job(_insert_states) + # Create database with old schema + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration.EntityIDMigration, "migrate_data"), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await instance.async_add_executor_job(_insert_states) - await _async_wait_migration_done(hass) - # This is a threadsafe way to add a task to the recorder - migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {}) - recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator)) - await _async_wait_migration_done(hass) - await _async_wait_migration_done(hass) + await async_wait_recording_done(hass) + await _async_wait_migration_done(hass) + + await hass.async_stop() + await hass.async_block_till_done() def _fetch_migrated_states(): with session_scope(hass=hass, read_only=True) as session: @@ -982,28 +992,43 @@ async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) ) return result - states_by_entity_id = await recorder_mock.async_add_executor_job( - _fetch_migrated_states - ) + # Run again with new schema, let migration run + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + states_by_entity_id = await instance.async_add_executor_job( + _fetch_migrated_states + ) + migration_changes = await instance.async_add_executor_job( + _get_migration_id, hass + ) + + await hass.async_stop() + await hass.async_block_till_done() + assert len(states_by_entity_id["sensor.two"]) == 2 assert len(states_by_entity_id["sensor.one"]) == 1 - migration_changes = await recorder_mock.async_add_executor_job( - _get_migration_id, hass - ) assert ( migration_changes[migration.EntityIDMigration.migration_id] == migration.EntityIDMigration.migration_version ) +@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_entity_ids", [True]) -@pytest.mark.usefixtures("db_schema_32") +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage async def test_post_migrate_entity_ids( - hass: HomeAssistant, recorder_mock: Recorder + async_test_recorder: RecorderInstanceGenerator, ) -> None: """Test we can migrate entity_ids to the StatesMeta table.""" - await async_wait_recording_done(hass) importlib.import_module(SCHEMA_MODULE_32) old_db_schema = sys.modules[SCHEMA_MODULE_32] @@ -1029,14 +1054,25 @@ async def test_post_migrate_entity_ids( ) ) - await recorder_mock.async_add_executor_job(_insert_events) + # Create database with old schema + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration.EntityIDMigration, "migrate_data"), + patch.object(migration.EntityIDPostMigration, "migrate_data"), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await instance.async_add_executor_job(_insert_events) - await _async_wait_migration_done(hass) - # This is a threadsafe way to add a task to the recorder - migrator = migration.EntityIDPostMigration(None, None) - recorder_mock.queue_task(migrator.task(migrator)) - await _async_wait_migration_done(hass) - await _async_wait_migration_done(hass) + await async_wait_recording_done(hass) + await _async_wait_migration_done(hass) + + await hass.async_stop() + await hass.async_block_till_done() def _fetch_migrated_states(): with session_scope(hass=hass, read_only=True) as session: @@ -1047,19 +1083,34 @@ async def test_post_migrate_entity_ids( assert len(states) == 3 return {state.state: state.entity_id for state in states} - states_by_state = await recorder_mock.async_add_executor_job(_fetch_migrated_states) + # Run again with new schema, let migration run + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + states_by_state = await instance.async_add_executor_job(_fetch_migrated_states) + + await hass.async_stop() + await hass.async_block_till_done() + assert states_by_state["one_1"] is None assert states_by_state["two_2"] is None assert states_by_state["two_1"] is None +@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_entity_ids", [True]) -@pytest.mark.usefixtures("db_schema_32") +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage async def test_migrate_null_entity_ids( - hass: HomeAssistant, recorder_mock: Recorder + async_test_recorder: RecorderInstanceGenerator, ) -> None: """Test we can migrate entity_ids to the StatesMeta table.""" - await async_wait_recording_done(hass) importlib.import_module(SCHEMA_MODULE_32) old_db_schema = sys.modules[SCHEMA_MODULE_32] @@ -1088,14 +1139,24 @@ async def test_migrate_null_entity_ids( ), ) - await recorder_mock.async_add_executor_job(_insert_states) + # Create database with old schema + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration.EntityIDMigration, "migrate_data"), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await instance.async_add_executor_job(_insert_states) - await _async_wait_migration_done(hass) - # This is a threadsafe way to add a task to the recorder - migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {}) - recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator)) - await _async_wait_migration_done(hass) - await _async_wait_migration_done(hass) + await async_wait_recording_done(hass) + await _async_wait_migration_done(hass) + + await hass.async_stop() + await hass.async_block_till_done() def _fetch_migrated_states(): with session_scope(hass=hass, read_only=True) as session: @@ -1121,17 +1182,32 @@ async def test_migrate_null_entity_ids( ) return result - states_by_entity_id = await recorder_mock.async_add_executor_job( - _fetch_migrated_states - ) - assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000 - assert len(states_by_entity_id["sensor.one"]) == 2 - def _get_migration_id(): with session_scope(hass=hass, read_only=True) as session: return dict(execute_stmt_lambda_element(session, get_migration_changes())) - migration_changes = await recorder_mock.async_add_executor_job(_get_migration_id) + # Run again with new schema, let migration run + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + states_by_entity_id = await instance.async_add_executor_job( + _fetch_migrated_states + ) + migration_changes = await instance.async_add_executor_job(_get_migration_id) + + await hass.async_stop() + await hass.async_block_till_done() + + assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000 + assert len(states_by_entity_id["sensor.one"]) == 2 + assert ( migration_changes[migration.EntityIDMigration.migration_id] == migration.EntityIDMigration.migration_version diff --git a/tests/components/recorder/test_migration_run_time_migrations_remember.py b/tests/components/recorder/test_migration_run_time_migrations_remember.py index 93fa16b8364..7a333b0a2f5 100644 --- a/tests/components/recorder/test_migration_run_time_migrations_remember.py +++ b/tests/components/recorder/test_migration_run_time_migrations_remember.py @@ -19,11 +19,7 @@ from homeassistant.components.recorder.util import ( from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.core import HomeAssistant -from .common import ( - MockMigrationTask, - async_recorder_block_till_done, - async_wait_recording_done, -) +from .common import async_recorder_block_till_done, async_wait_recording_done from tests.common import async_test_home_assistant from tests.typing import RecorderInstanceGenerator @@ -102,7 +98,6 @@ async def test_migration_changes_prevent_trying_to_migrate_again( patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch.object(core, "StateAttributes", old_db_schema.StateAttributes), - patch.object(migration.EntityIDMigration, "task", MockMigrationTask), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), ): async with (