mirror of
https://github.com/home-assistant/core.git
synced 2025-07-24 21:57:51 +00:00
Add EntityIDPostMigration data migrator class (#125307)
This commit is contained in:
parent
8bdd81ff24
commit
66ab90b518
@ -95,6 +95,7 @@ from .queries import (
|
|||||||
has_event_type_to_migrate,
|
has_event_type_to_migrate,
|
||||||
has_events_context_ids_to_migrate,
|
has_events_context_ids_to_migrate,
|
||||||
has_states_context_ids_to_migrate,
|
has_states_context_ids_to_migrate,
|
||||||
|
has_used_states_entity_ids,
|
||||||
has_used_states_event_ids,
|
has_used_states_event_ids,
|
||||||
migrate_single_short_term_statistics_row_to_timestamp,
|
migrate_single_short_term_statistics_row_to_timestamp,
|
||||||
migrate_single_statistics_row_to_timestamp,
|
migrate_single_statistics_row_to_timestamp,
|
||||||
@ -2107,7 +2108,6 @@ def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes:
|
|||||||
return ulid_to_bytes(ulid_at_time(timestamp or time()))
|
return ulid_to_bytes(ulid_at_time(timestamp or time()))
|
||||||
|
|
||||||
|
|
||||||
@retryable_database_job("post migrate states entity_ids to states_meta")
|
|
||||||
def post_migrate_entity_ids(instance: Recorder) -> bool:
|
def post_migrate_entity_ids(instance: Recorder) -> bool:
|
||||||
"""Remove old entity_id strings from states.
|
"""Remove old entity_id strings from states.
|
||||||
|
|
||||||
@ -2122,10 +2122,6 @@ def post_migrate_entity_ids(instance: Recorder) -> bool:
|
|||||||
# If there is more work to do return False
|
# If there is more work to do return False
|
||||||
# so that we can be called again
|
# so that we can be called again
|
||||||
|
|
||||||
if is_done:
|
|
||||||
# Drop the old indexes since they are no longer needed
|
|
||||||
_drop_index(session_maker, "states", LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX)
|
|
||||||
|
|
||||||
_LOGGER.debug("Cleanup legacy entity_ids done=%s", is_done)
|
_LOGGER.debug("Cleanup legacy entity_ids done=%s", is_done)
|
||||||
return is_done
|
return is_done
|
||||||
|
|
||||||
@ -2546,17 +2542,8 @@ class EntityIDMigration(BaseRunTimeMigrationWithQuery):
|
|||||||
_LOGGER.debug("Activating states_meta manager as all data is migrated")
|
_LOGGER.debug("Activating states_meta manager as all data is migrated")
|
||||||
instance.states_meta_manager.active = True
|
instance.states_meta_manager.active = True
|
||||||
with contextlib.suppress(SQLAlchemyError):
|
with contextlib.suppress(SQLAlchemyError):
|
||||||
# If ix_states_entity_id_last_updated_ts still exists
|
migrate = EntityIDPostMigration(self.schema_version, self.migration_changes)
|
||||||
# on the states table it means the entity id migration
|
migrate.do_migrate(instance, session)
|
||||||
# finished by the EntityIDPostMigrationTask did not
|
|
||||||
# complete because they restarted in the middle of it. We need
|
|
||||||
# to pick back up where we left off.
|
|
||||||
if get_index_by_name(
|
|
||||||
session,
|
|
||||||
TABLE_STATES,
|
|
||||||
LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
|
|
||||||
):
|
|
||||||
instance.queue_task(EntityIDPostMigrationTask())
|
|
||||||
|
|
||||||
def needs_migrate_query(self) -> StatementLambdaElement:
|
def needs_migrate_query(self) -> StatementLambdaElement:
|
||||||
"""Check if the data is migrated."""
|
"""Check if the data is migrated."""
|
||||||
@ -2645,15 +2632,21 @@ class EventIDPostMigration(BaseRunTimeMigration):
|
|||||||
return DataMigrationStatus(needs_migrate=False, migration_done=True)
|
return DataMigrationStatus(needs_migrate=False, migration_done=True)
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
class EntityIDPostMigration(BaseRunTimeMigrationWithQuery):
|
||||||
class EntityIDPostMigrationTask(RecorderTask):
|
"""Migration to remove old entity_id strings from states."""
|
||||||
"""An object to insert into the recorder queue to cleanup after entity_ids migration."""
|
|
||||||
|
|
||||||
def run(self, instance: Recorder) -> None:
|
migration_id = "entity_id_post_migration"
|
||||||
"""Run entity_id post migration task."""
|
task = MigrationTask
|
||||||
if not post_migrate_entity_ids(instance):
|
index_to_drop = (TABLE_STATES, LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX)
|
||||||
# Schedule a new migration task if this one didn't finish
|
|
||||||
instance.queue_task(EntityIDPostMigrationTask())
|
def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
|
||||||
|
"""Migrate some data, returns True if migration is completed."""
|
||||||
|
is_done = post_migrate_entity_ids(instance)
|
||||||
|
return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)
|
||||||
|
|
||||||
|
def needs_migrate_query(self) -> StatementLambdaElement:
|
||||||
|
"""Check if the data is migrated."""
|
||||||
|
return has_used_states_entity_ids()
|
||||||
|
|
||||||
|
|
||||||
def _mark_migration_done(
|
def _mark_migration_done(
|
||||||
|
@ -763,6 +763,13 @@ def batch_cleanup_entity_ids() -> StatementLambdaElement:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def has_used_states_entity_ids() -> StatementLambdaElement:
|
||||||
|
"""Check if there are used entity_ids in the states table."""
|
||||||
|
return lambda_stmt(
|
||||||
|
lambda: select(States.state_id).filter(States.entity_id.isnot(None)).limit(1)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def has_used_states_event_ids() -> StatementLambdaElement:
|
def has_used_states_event_ids() -> StatementLambdaElement:
|
||||||
"""Check if there are used event_ids in the states table."""
|
"""Check if there are used event_ids in the states table."""
|
||||||
return lambda_stmt(
|
return lambda_stmt(
|
||||||
|
@ -884,7 +884,7 @@ async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder)
|
|||||||
|
|
||||||
await _async_wait_migration_done(hass)
|
await _async_wait_migration_done(hass)
|
||||||
# This is a threadsafe way to add a task to the recorder
|
# This is a threadsafe way to add a task to the recorder
|
||||||
migrator = migration.EntityIDMigration(None, None)
|
migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {})
|
||||||
recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator))
|
recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator))
|
||||||
await _async_wait_migration_done(hass)
|
await _async_wait_migration_done(hass)
|
||||||
|
|
||||||
@ -963,7 +963,8 @@ async def test_post_migrate_entity_ids(
|
|||||||
|
|
||||||
await _async_wait_migration_done(hass)
|
await _async_wait_migration_done(hass)
|
||||||
# This is a threadsafe way to add a task to the recorder
|
# This is a threadsafe way to add a task to the recorder
|
||||||
recorder_mock.queue_task(migration.EntityIDPostMigrationTask())
|
migrator = migration.EntityIDPostMigration(None, None)
|
||||||
|
recorder_mock.queue_task(migrator.task(migrator))
|
||||||
await _async_wait_migration_done(hass)
|
await _async_wait_migration_done(hass)
|
||||||
|
|
||||||
def _fetch_migrated_states():
|
def _fetch_migrated_states():
|
||||||
@ -1020,7 +1021,7 @@ async def test_migrate_null_entity_ids(
|
|||||||
|
|
||||||
await _async_wait_migration_done(hass)
|
await _async_wait_migration_done(hass)
|
||||||
# This is a threadsafe way to add a task to the recorder
|
# This is a threadsafe way to add a task to the recorder
|
||||||
migrator = migration.EntityIDMigration(None, None)
|
migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {})
|
||||||
recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator))
|
recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator))
|
||||||
await _async_wait_migration_done(hass)
|
await _async_wait_migration_done(hass)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user