diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 574129ca019..d0beb4f9895 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -1863,217 +1863,6 @@ def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes: return ulid_to_bytes(ulid_at_time(timestamp or time())) -@retryable_database_job("migrate states context_ids to binary format") -def migrate_states_context_ids(instance: Recorder) -> bool: - """Migrate states context_ids to use binary format.""" - _to_bytes = _context_id_to_bytes - session_maker = instance.get_session - _LOGGER.debug("Migrating states context_ids to binary format") - with session_scope(session=session_maker()) as session: - if states := session.execute( - find_states_context_ids_to_migrate(instance.max_bind_vars) - ).all(): - session.execute( - update(States), - [ - { - "state_id": state_id, - "context_id": None, - "context_id_bin": _to_bytes(context_id) - or _generate_ulid_bytes_at_time(last_updated_ts), - "context_user_id": None, - "context_user_id_bin": _to_bytes(context_user_id), - "context_parent_id": None, - "context_parent_id_bin": _to_bytes(context_parent_id), - } - for state_id, last_updated_ts, context_id, context_user_id, context_parent_id in states - ], - ) - # If there is more work to do return False - # so that we can be called again - if is_done := not states: - _mark_migration_done(session, StatesContextIDMigration) - - if is_done: - _drop_index(session_maker, "states", "ix_states_context_id") - - _LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done) - return is_done - - -@retryable_database_job("migrate events context_ids to binary format") -def migrate_events_context_ids(instance: Recorder) -> bool: - """Migrate events context_ids to use binary format.""" - _to_bytes = _context_id_to_bytes - session_maker = instance.get_session - _LOGGER.debug("Migrating context_ids to binary format") - with session_scope(session=session_maker()) as session: - if events := session.execute( - find_events_context_ids_to_migrate(instance.max_bind_vars) - ).all(): - session.execute( - update(Events), - [ - { - "event_id": event_id, - "context_id": None, - "context_id_bin": _to_bytes(context_id) - or _generate_ulid_bytes_at_time(time_fired_ts), - "context_user_id": None, - "context_user_id_bin": _to_bytes(context_user_id), - "context_parent_id": None, - "context_parent_id_bin": _to_bytes(context_parent_id), - } - for event_id, time_fired_ts, context_id, context_user_id, context_parent_id in events - ], - ) - # If there is more work to do return False - # so that we can be called again - if is_done := not events: - _mark_migration_done(session, EventsContextIDMigration) - - if is_done: - _drop_index(session_maker, "events", "ix_events_context_id") - - _LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done) - return is_done - - -@retryable_database_job("migrate events event_types to event_type_ids") -def migrate_event_type_ids(instance: Recorder) -> bool: - """Migrate event_type to event_type_ids.""" - session_maker = instance.get_session - _LOGGER.debug("Migrating event_types") - event_type_manager = instance.event_type_manager - with session_scope(session=session_maker()) as session: - if events := session.execute( - find_event_type_to_migrate(instance.max_bind_vars) - ).all(): - event_types = {event_type for _, event_type in events} - if None in event_types: - # event_type should never be None but we need to be defensive - # so we don't fail the migration because of a bad state - event_types.remove(None) - event_types.add(_EMPTY_EVENT_TYPE) - - event_type_to_id = event_type_manager.get_many(event_types, session) - if missing_event_types := { - event_type - for event_type, event_id in event_type_to_id.items() - if event_id is None - }: - missing_db_event_types = [ - EventTypes(event_type=event_type) - for event_type in missing_event_types - ] - session.add_all(missing_db_event_types) - session.flush() # Assign ids - for db_event_type in missing_db_event_types: - # We cannot add the assigned ids to the event_type_manager - # because the commit could get rolled back - assert ( - db_event_type.event_type is not None - ), "event_type should never be None" - event_type_to_id[db_event_type.event_type] = ( - db_event_type.event_type_id - ) - event_type_manager.clear_non_existent(db_event_type.event_type) - - session.execute( - update(Events), - [ - { - "event_id": event_id, - "event_type": None, - "event_type_id": event_type_to_id[ - _EMPTY_EVENT_TYPE if event_type is None else event_type - ], - } - for event_id, event_type in events - ], - ) - - # If there is more work to do return False - # so that we can be called again - if is_done := not events: - _mark_migration_done(session, EventTypeIDMigration) - - _LOGGER.debug("Migrating event_types done=%s", is_done) - return is_done - - -@retryable_database_job("migrate states entity_ids to states_meta") -def migrate_entity_ids(instance: Recorder) -> bool: - """Migrate entity_ids to states_meta. - - We do this in two steps because we need the history queries to work - while we are migrating. - - 1. Link the states to the states_meta table - 2. Remove the entity_id column from the states table (in post_migrate_entity_ids) - """ - _LOGGER.debug("Migrating entity_ids") - states_meta_manager = instance.states_meta_manager - with session_scope(session=instance.get_session()) as session: - if states := session.execute( - find_entity_ids_to_migrate(instance.max_bind_vars) - ).all(): - entity_ids = {entity_id for _, entity_id in states} - if None in entity_ids: - # entity_id should never be None but we need to be defensive - # so we don't fail the migration because of a bad state - entity_ids.remove(None) - entity_ids.add(_EMPTY_ENTITY_ID) - - entity_id_to_metadata_id = states_meta_manager.get_many( - entity_ids, session, True - ) - if missing_entity_ids := { - entity_id - for entity_id, metadata_id in entity_id_to_metadata_id.items() - if metadata_id is None - }: - missing_states_metadata = [ - StatesMeta(entity_id=entity_id) for entity_id in missing_entity_ids - ] - session.add_all(missing_states_metadata) - session.flush() # Assign ids - for db_states_metadata in missing_states_metadata: - # We cannot add the assigned ids to the event_type_manager - # because the commit could get rolled back - assert ( - db_states_metadata.entity_id is not None - ), "entity_id should never be None" - entity_id_to_metadata_id[db_states_metadata.entity_id] = ( - db_states_metadata.metadata_id - ) - - session.execute( - update(States), - [ - { - "state_id": state_id, - # We cannot set "entity_id": None yet since - # the history queries still need to work while the - # migration is in progress and we will do this in - # post_migrate_entity_ids - "metadata_id": entity_id_to_metadata_id[ - _EMPTY_ENTITY_ID if entity_id is None else entity_id - ], - } - for state_id, entity_id in states - ], - ) - - # If there is more work to do return False - # so that we can be called again - if is_done := not states: - _mark_migration_done(session, EntityIDMigration) - - _LOGGER.debug("Migrating entity_ids done=%s", is_done) - return is_done - - @retryable_database_job("post migrate states entity_ids to states_meta") def post_migrate_entity_ids(instance: Recorder) -> bool: """Remove old entity_id strings from states. @@ -2262,9 +2051,42 @@ class StatesContextIDMigration(BaseRunTimeMigration): migration_id = "state_context_id_as_binary" @staticmethod + @retryable_database_job("migrate states context_ids to binary format") def migrate_data(instance: Recorder) -> bool: - """Migrate some data, returns True if migration is completed.""" - return migrate_states_context_ids(instance) + """Migrate states context_ids to use binary format, return True if completed.""" + _to_bytes = _context_id_to_bytes + session_maker = instance.get_session + _LOGGER.debug("Migrating states context_ids to binary format") + with session_scope(session=session_maker()) as session: + if states := session.execute( + find_states_context_ids_to_migrate(instance.max_bind_vars) + ).all(): + session.execute( + update(States), + [ + { + "state_id": state_id, + "context_id": None, + "context_id_bin": _to_bytes(context_id) + or _generate_ulid_bytes_at_time(last_updated_ts), + "context_user_id": None, + "context_user_id_bin": _to_bytes(context_user_id), + "context_parent_id": None, + "context_parent_id_bin": _to_bytes(context_parent_id), + } + for state_id, last_updated_ts, context_id, context_user_id, context_parent_id in states + ], + ) + # If there is more work to do return False + # so that we can be called again + if is_done := not states: + _mark_migration_done(session, StatesContextIDMigration) + + if is_done: + _drop_index(session_maker, "states", "ix_states_context_id") + + _LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done) + return is_done def needs_migrate_query(self) -> StatementLambdaElement: """Return the query to check if the migration needs to run.""" @@ -2278,9 +2100,42 @@ class EventsContextIDMigration(BaseRunTimeMigration): migration_id = "event_context_id_as_binary" @staticmethod + @retryable_database_job("migrate events context_ids to binary format") def migrate_data(instance: Recorder) -> bool: - """Migrate some data, returns True if migration is completed.""" - return migrate_events_context_ids(instance) + """Migrate events context_ids to use binary format, return True if completed.""" + _to_bytes = _context_id_to_bytes + session_maker = instance.get_session + _LOGGER.debug("Migrating context_ids to binary format") + with session_scope(session=session_maker()) as session: + if events := session.execute( + find_events_context_ids_to_migrate(instance.max_bind_vars) + ).all(): + session.execute( + update(Events), + [ + { + "event_id": event_id, + "context_id": None, + "context_id_bin": _to_bytes(context_id) + or _generate_ulid_bytes_at_time(time_fired_ts), + "context_user_id": None, + "context_user_id_bin": _to_bytes(context_user_id), + "context_parent_id": None, + "context_parent_id_bin": _to_bytes(context_parent_id), + } + for event_id, time_fired_ts, context_id, context_user_id, context_parent_id in events + ], + ) + # If there is more work to do return False + # so that we can be called again + if is_done := not events: + _mark_migration_done(session, EventsContextIDMigration) + + if is_done: + _drop_index(session_maker, "events", "ix_events_context_id") + + _LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done) + return is_done def needs_migrate_query(self) -> StatementLambdaElement: """Return the query to check if the migration needs to run.""" @@ -2298,9 +2153,67 @@ class EventTypeIDMigration(BaseRunTimeMigration): # the db since this happens live @staticmethod + @retryable_database_job("migrate events event_types to event_type_ids") def migrate_data(instance: Recorder) -> bool: - """Migrate some data, returns True if migration is completed.""" - return migrate_event_type_ids(instance) + """Migrate event_type to event_type_ids, return True if completed.""" + session_maker = instance.get_session + _LOGGER.debug("Migrating event_types") + event_type_manager = instance.event_type_manager + with session_scope(session=session_maker()) as session: + if events := session.execute( + find_event_type_to_migrate(instance.max_bind_vars) + ).all(): + event_types = {event_type for _, event_type in events} + if None in event_types: + # event_type should never be None but we need to be defensive + # so we don't fail the migration because of a bad state + event_types.remove(None) + event_types.add(_EMPTY_EVENT_TYPE) + + event_type_to_id = event_type_manager.get_many(event_types, session) + if missing_event_types := { + event_type + for event_type, event_id in event_type_to_id.items() + if event_id is None + }: + missing_db_event_types = [ + EventTypes(event_type=event_type) + for event_type in missing_event_types + ] + session.add_all(missing_db_event_types) + session.flush() # Assign ids + for db_event_type in missing_db_event_types: + # We cannot add the assigned ids to the event_type_manager + # because the commit could get rolled back + assert ( + db_event_type.event_type is not None + ), "event_type should never be None" + event_type_to_id[db_event_type.event_type] = ( + db_event_type.event_type_id + ) + event_type_manager.clear_non_existent(db_event_type.event_type) + + session.execute( + update(Events), + [ + { + "event_id": event_id, + "event_type": None, + "event_type_id": event_type_to_id[ + _EMPTY_EVENT_TYPE if event_type is None else event_type + ], + } + for event_id, event_type in events + ], + ) + + # If there is more work to do return False + # so that we can be called again + if is_done := not events: + _mark_migration_done(session, EventTypeIDMigration) + + _LOGGER.debug("Migrating event_types done=%s", is_done) + return is_done def migration_done(self, instance: Recorder) -> None: """Will be called after migrate returns True.""" @@ -2323,9 +2236,77 @@ class EntityIDMigration(BaseRunTimeMigration): # the db since this happens live @staticmethod + @retryable_database_job("migrate states entity_ids to states_meta") def migrate_data(instance: Recorder) -> bool: - """Migrate some data, returns True if migration is completed.""" - return migrate_entity_ids(instance) + """Migrate entity_ids to states_meta, return True if completed. + + We do this in two steps because we need the history queries to work + while we are migrating. + + 1. Link the states to the states_meta table + 2. Remove the entity_id column from the states table (in post_migrate_entity_ids) + """ + _LOGGER.debug("Migrating entity_ids") + states_meta_manager = instance.states_meta_manager + with session_scope(session=instance.get_session()) as session: + if states := session.execute( + find_entity_ids_to_migrate(instance.max_bind_vars) + ).all(): + entity_ids = {entity_id for _, entity_id in states} + if None in entity_ids: + # entity_id should never be None but we need to be defensive + # so we don't fail the migration because of a bad state + entity_ids.remove(None) + entity_ids.add(_EMPTY_ENTITY_ID) + + entity_id_to_metadata_id = states_meta_manager.get_many( + entity_ids, session, True + ) + if missing_entity_ids := { + entity_id + for entity_id, metadata_id in entity_id_to_metadata_id.items() + if metadata_id is None + }: + missing_states_metadata = [ + StatesMeta(entity_id=entity_id) + for entity_id in missing_entity_ids + ] + session.add_all(missing_states_metadata) + session.flush() # Assign ids + for db_states_metadata in missing_states_metadata: + # We cannot add the assigned ids to the event_type_manager + # because the commit could get rolled back + assert ( + db_states_metadata.entity_id is not None + ), "entity_id should never be None" + entity_id_to_metadata_id[db_states_metadata.entity_id] = ( + db_states_metadata.metadata_id + ) + + session.execute( + update(States), + [ + { + "state_id": state_id, + # We cannot set "entity_id": None yet since + # the history queries still need to work while the + # migration is in progress and we will do this in + # post_migrate_entity_ids + "metadata_id": entity_id_to_metadata_id[ + _EMPTY_ENTITY_ID if entity_id is None else entity_id + ], + } + for state_id, entity_id in states + ], + ) + + # If there is more work to do return False + # so that we can be called again + if is_done := not states: + _mark_migration_done(session, EntityIDMigration) + + _LOGGER.debug("Migrating entity_ids done=%s", is_done) + return is_done def migration_done(self, instance: Recorder) -> None: """Will be called after migrate returns True.""" diff --git a/tests/conftest.py b/tests/conftest.py index 594d71fa165..1e4396313d2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1451,16 +1451,22 @@ async def async_test_recorder( else None ) migrate_states_context_ids = ( - migration.migrate_states_context_ids if enable_migrate_context_ids else None + migration.StatesContextIDMigration.migrate_data + if enable_migrate_context_ids + else None ) migrate_events_context_ids = ( - migration.migrate_events_context_ids if enable_migrate_context_ids else None + migration.EventsContextIDMigration.migrate_data + if enable_migrate_context_ids + else None ) migrate_event_type_ids = ( - migration.migrate_event_type_ids if enable_migrate_event_type_ids else None + migration.EventTypeIDMigration.migrate_data + if enable_migrate_event_type_ids + else None ) migrate_entity_ids = ( - migration.migrate_entity_ids if enable_migrate_entity_ids else None + migration.EntityIDMigration.migrate_data if enable_migrate_entity_ids else None ) legacy_event_id_foreign_key_exists = ( recorder.Recorder._legacy_event_id_foreign_key_exists @@ -1484,22 +1490,22 @@ async def async_test_recorder( autospec=True, ), patch( - "homeassistant.components.recorder.migration.migrate_events_context_ids", + "homeassistant.components.recorder.migration.EventsContextIDMigration.migrate_data", side_effect=migrate_events_context_ids, autospec=True, ), patch( - "homeassistant.components.recorder.migration.migrate_states_context_ids", + "homeassistant.components.recorder.migration.StatesContextIDMigration.migrate_data", side_effect=migrate_states_context_ids, autospec=True, ), patch( - "homeassistant.components.recorder.migration.migrate_event_type_ids", + "homeassistant.components.recorder.migration.EventTypeIDMigration.migrate_data", side_effect=migrate_event_type_ids, autospec=True, ), patch( - "homeassistant.components.recorder.migration.migrate_entity_ids", + "homeassistant.components.recorder.migration.EntityIDMigration.migrate_data", side_effect=migrate_entity_ids, autospec=True, ),