Refactor recorder EventIDPostMigration data migrator (#125126)

This commit is contained in:
Erik Montnemery 2024-09-03 22:37:50 +02:00 committed by GitHub
parent 4aa86a574f
commit cc3d059783
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 53 additions and 74 deletions

View File

@ -2137,50 +2137,6 @@ def post_migrate_entity_ids(instance: Recorder) -> bool:
return is_done
@retryable_database_job("cleanup_legacy_event_ids")
def cleanup_legacy_states_event_ids(instance: Recorder) -> bool:
"""Remove old event_id index from states.
We used to link states to events using the event_id column but we no
longer store state changed events in the events table.
If all old states have been purged and existing states are in the new
format we can drop the index since it can take up ~10MB per 1M rows.
"""
session_maker = instance.get_session
_LOGGER.debug("Cleanup legacy entity_ids")
with session_scope(session=session_maker()) as session:
result = session.execute(has_used_states_event_ids()).scalar()
# In the future we may migrate existing states to the new format
# but in practice very few of these still exist in production and
# removing the index is the likely all that needs to happen.
all_gone = not result
if all_gone:
# Only drop the index if there are no more event_ids in the states table
# ex all NULL
assert instance.engine is not None, "engine should never be None"
if instance.dialect_name == SupportedDialect.SQLITE:
# SQLite does not support dropping foreign key constraints
# so we have to rebuild the table
fk_remove_ok = rebuild_sqlite_table(session_maker, instance.engine, States)
else:
try:
_drop_foreign_key_constraints(
session_maker, instance.engine, TABLE_STATES, "event_id"
)
except (InternalError, OperationalError):
fk_remove_ok = False
else:
fk_remove_ok = True
if fk_remove_ok:
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
instance.use_legacy_events_index = False
_mark_migration_done(session, EventIDPostMigration)
return True
def _initialize_database(session: Session) -> bool:
"""Initialize a new database.
@ -2635,9 +2591,50 @@ class EventIDPostMigration(BaseRunTimeMigration):
migration_version = 2
@staticmethod
@retryable_database_job("cleanup_legacy_event_ids")
def migrate_data(instance: Recorder) -> bool:
"""Migrate some data, returns True if migration is completed."""
return cleanup_legacy_states_event_ids(instance)
"""Remove old event_id index from states, returns True if completed.
We used to link states to events using the event_id column but we no
longer store state changed events in the events table.
If all old states have been purged and existing states are in the new
format we can drop the index since it can take up ~10MB per 1M rows.
"""
session_maker = instance.get_session
_LOGGER.debug("Cleanup legacy entity_ids")
with session_scope(session=session_maker()) as session:
result = session.execute(has_used_states_event_ids()).scalar()
# In the future we may migrate existing states to the new format
# but in practice very few of these still exist in production and
# removing the index is the likely all that needs to happen.
all_gone = not result
if all_gone:
# Only drop the index if there are no more event_ids in the states table
# ex all NULL
assert instance.engine is not None, "engine should never be None"
if instance.dialect_name == SupportedDialect.SQLITE:
# SQLite does not support dropping foreign key constraints
# so we have to rebuild the table
fk_remove_ok = rebuild_sqlite_table(
session_maker, instance.engine, States
)
else:
try:
_drop_foreign_key_constraints(
session_maker, instance.engine, TABLE_STATES, "event_id"
)
except (InternalError, OperationalError):
fk_remove_ok = False
else:
fk_remove_ok = True
if fk_remove_ok:
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
instance.use_legacy_events_index = False
_mark_migration_done(session, EventIDPostMigration)
return True
@staticmethod
def _legacy_event_id_foreign_key_exists(instance: Recorder) -> bool:

View File

@ -113,6 +113,7 @@ async def test_migrate_times(
patch.object(migration.StatesContextIDMigration, "migrate_data"),
patch.object(migration.EventTypeIDMigration, "migrate_data"),
patch.object(migration.EntityIDMigration, "migrate_data"),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
@ -120,9 +121,6 @@ async def test_migrate_times(
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_30)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,
@ -264,9 +262,8 @@ async def test_migrate_can_resume_entity_id_post_migration(
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
@ -274,9 +271,6 @@ async def test_migrate_can_resume_entity_id_post_migration(
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,
@ -386,9 +380,8 @@ async def test_migrate_can_resume_ix_states_event_id_removed(
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
@ -396,9 +389,6 @@ async def test_migrate_can_resume_ix_states_event_id_removed(
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,
@ -522,9 +512,8 @@ async def test_out_of_disk_space_while_rebuild_states_table(
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
@ -532,9 +521,6 @@ async def test_out_of_disk_space_while_rebuild_states_table(
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,
@ -654,7 +640,7 @@ async def test_out_of_disk_space_while_removing_foreign_key(
Note that the test is somewhat forced; the states.event_id foreign key constraint is
removed when migrating to schema version 46, inspecting the schema in
cleanup_legacy_states_event_ids is not likely to fail.
EventIDPostMigration.migrate_data, is not likely to fail.
"""
importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE_32]
@ -702,9 +688,8 @@ async def test_out_of_disk_space_while_removing_foreign_key(
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
@ -712,9 +697,6 @@ async def test_out_of_disk_space_while_removing_foreign_key(
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,