From 66ab90b518aabf8b5f6a5d88d8a1c813b8a221b8 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Fri, 27 Sep 2024 14:58:34 +0200 Subject: [PATCH] Add EntityIDPostMigration data migrator class (#125307) --- .../components/recorder/migration.py | 41 ++++++++----------- homeassistant/components/recorder/queries.py | 7 ++++ .../recorder/test_migration_from_schema_32.py | 7 ++-- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 6bfba613c01..85455d109e5 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -95,6 +95,7 @@ from .queries import ( has_event_type_to_migrate, has_events_context_ids_to_migrate, has_states_context_ids_to_migrate, + has_used_states_entity_ids, has_used_states_event_ids, migrate_single_short_term_statistics_row_to_timestamp, migrate_single_statistics_row_to_timestamp, @@ -2107,7 +2108,6 @@ def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes: return ulid_to_bytes(ulid_at_time(timestamp or time())) -@retryable_database_job("post migrate states entity_ids to states_meta") def post_migrate_entity_ids(instance: Recorder) -> bool: """Remove old entity_id strings from states. @@ -2122,10 +2122,6 @@ def post_migrate_entity_ids(instance: Recorder) -> bool: # If there is more work to do return False # so that we can be called again - if is_done: - # Drop the old indexes since they are no longer needed - _drop_index(session_maker, "states", LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX) - _LOGGER.debug("Cleanup legacy entity_ids done=%s", is_done) return is_done @@ -2546,17 +2542,8 @@ class EntityIDMigration(BaseRunTimeMigrationWithQuery): _LOGGER.debug("Activating states_meta manager as all data is migrated") instance.states_meta_manager.active = True with contextlib.suppress(SQLAlchemyError): - # If ix_states_entity_id_last_updated_ts still exists - # on the states table it means the entity id migration - # finished by the EntityIDPostMigrationTask did not - # complete because they restarted in the middle of it. We need - # to pick back up where we left off. - if get_index_by_name( - session, - TABLE_STATES, - LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX, - ): - instance.queue_task(EntityIDPostMigrationTask()) + migrate = EntityIDPostMigration(self.schema_version, self.migration_changes) + migrate.do_migrate(instance, session) def needs_migrate_query(self) -> StatementLambdaElement: """Check if the data is migrated.""" @@ -2645,15 +2632,21 @@ class EventIDPostMigration(BaseRunTimeMigration): return DataMigrationStatus(needs_migrate=False, migration_done=True) -@dataclass(slots=True) -class EntityIDPostMigrationTask(RecorderTask): - """An object to insert into the recorder queue to cleanup after entity_ids migration.""" +class EntityIDPostMigration(BaseRunTimeMigrationWithQuery): + """Migration to remove old entity_id strings from states.""" - def run(self, instance: Recorder) -> None: - """Run entity_id post migration task.""" - if not post_migrate_entity_ids(instance): - # Schedule a new migration task if this one didn't finish - instance.queue_task(EntityIDPostMigrationTask()) + migration_id = "entity_id_post_migration" + task = MigrationTask + index_to_drop = (TABLE_STATES, LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX) + + def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus: + """Migrate some data, returns True if migration is completed.""" + is_done = post_migrate_entity_ids(instance) + return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done) + + def needs_migrate_query(self) -> StatementLambdaElement: + """Check if the data is migrated.""" + return has_used_states_entity_ids() def _mark_migration_done( diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index a5be5dffe10..4acf43a491e 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -763,6 +763,13 @@ def batch_cleanup_entity_ids() -> StatementLambdaElement: ) +def has_used_states_entity_ids() -> StatementLambdaElement: + """Check if there are used entity_ids in the states table.""" + return lambda_stmt( + lambda: select(States.state_id).filter(States.entity_id.isnot(None)).limit(1) + ) + + def has_used_states_event_ids() -> StatementLambdaElement: """Check if there are used event_ids in the states table.""" return lambda_stmt( diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index 17f6e24e228..8a54a752989 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -884,7 +884,7 @@ async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) await _async_wait_migration_done(hass) # This is a threadsafe way to add a task to the recorder - migrator = migration.EntityIDMigration(None, None) + migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {}) recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator)) await _async_wait_migration_done(hass) @@ -963,7 +963,8 @@ async def test_post_migrate_entity_ids( await _async_wait_migration_done(hass) # This is a threadsafe way to add a task to the recorder - recorder_mock.queue_task(migration.EntityIDPostMigrationTask()) + migrator = migration.EntityIDPostMigration(None, None) + recorder_mock.queue_task(migrator.task(migrator)) await _async_wait_migration_done(hass) def _fetch_migrated_states(): @@ -1020,7 +1021,7 @@ async def test_migrate_null_entity_ids( await _async_wait_migration_done(hass) # This is a threadsafe way to add a task to the recorder - migrator = migration.EntityIDMigration(None, None) + migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {}) recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator)) await _async_wait_migration_done(hass)