Move recorder EntityIDPostMigrationTask to migration (#125136)

* Move recorder EntityIDPostMigrationTask to migration

* Update test
This commit is contained in:
Erik Montnemery 2024-09-04 08:38:46 +02:00 committed by GitHub
parent 482bed522f
commit 7fc0e36b2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 18 additions and 25 deletions

View File

@ -1283,10 +1283,6 @@ class Recorder(threading.Thread):
self.event_session = self.get_session()
self.event_session.expire_on_commit = False
def _post_migrate_entity_ids(self) -> bool:
"""Post migrate entity_ids if needed."""
return migration.post_migrate_entity_ids(self)
def _send_keep_alive(self) -> None:
"""Send a keep alive to keep the db connection open."""
assert self.event_session is not None

View File

@ -100,7 +100,7 @@ from .queries import (
migrate_single_statistics_row_to_timestamp,
)
from .statistics import cleanup_statistics_timestamp_migration, get_start_time
from .tasks import EntityIDPostMigrationTask, RecorderTask
from .tasks import RecorderTask
from .util import (
database_job_retry_wrapper,
execute_stmt_lambda_element,
@ -2667,6 +2667,17 @@ class EventIDPostMigration(BaseRunTimeMigration):
return NeedsMigrateResult(needs_migrate=False, migration_done=True)
@dataclass(slots=True)
class EntityIDPostMigrationTask(RecorderTask):
"""An object to insert into the recorder queue to cleanup after entity_ids migration."""
def run(self, instance: Recorder) -> None:
"""Run entity_id post migration task."""
if not post_migrate_entity_ids(instance):
# Schedule a new migration task if this one didn't finish
instance.queue_task(EntityIDPostMigrationTask())
def _mark_migration_done(
session: Session, migration: type[BaseRunTimeMigration]
) -> None:

View File

@ -333,19 +333,6 @@ class AdjustLRUSizeTask(RecorderTask):
instance._adjust_lru_size() # noqa: SLF001
@dataclass(slots=True)
class EntityIDPostMigrationTask(RecorderTask):
"""An object to insert into the recorder queue to cleanup after entity_ids migration."""
def run(self, instance: Recorder) -> None:
"""Run entity_id post migration task."""
if (
not instance._post_migrate_entity_ids() # noqa: SLF001
):
# Schedule a new migration task if this one didn't finish
instance.queue_task(EntityIDPostMigrationTask())
@dataclass(slots=True)
class RefreshEventTypesTask(RecorderTask):
"""An object to insert into the recorder queue to refresh event types."""

View File

@ -32,7 +32,6 @@ from homeassistant.components.recorder.queries import (
get_migration_changes,
select_event_type_ids,
)
from homeassistant.components.recorder.tasks import EntityIDPostMigrationTask
from homeassistant.components.recorder.util import (
execute_stmt_lambda_element,
get_index_by_name,
@ -746,7 +745,7 @@ async def test_post_migrate_entity_ids(
await _async_wait_migration_done(hass)
# This is a threadsafe way to add a task to the recorder
recorder_mock.queue_task(EntityIDPostMigrationTask())
recorder_mock.queue_task(migration.EntityIDPostMigrationTask())
await _async_wait_migration_done(hass)
def _fetch_migrated_states():

View File

@ -109,6 +109,7 @@ async def test_migrate_times(
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration, "post_migrate_entity_ids", return_value=False),
patch.object(migration.EventsContextIDMigration, "migrate_data"),
patch.object(migration.StatesContextIDMigration, "migrate_data"),
patch.object(migration.EventTypeIDMigration, "migrate_data"),
@ -120,7 +121,6 @@ async def test_migrate_times(
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_30)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
):
async with (
async_test_home_assistant() as hass,
@ -264,13 +264,13 @@ async def test_migrate_can_resume_entity_id_post_migration(
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(migration, "post_migrate_entity_ids", return_value=False),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
):
async with (
async_test_home_assistant() as hass,
@ -382,13 +382,13 @@ async def test_migrate_can_resume_ix_states_event_id_removed(
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(migration, "post_migrate_entity_ids", return_value=False),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
):
async with (
async_test_home_assistant() as hass,
@ -514,13 +514,13 @@ async def test_out_of_disk_space_while_rebuild_states_table(
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(migration, "post_migrate_entity_ids", return_value=False),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
):
async with (
async_test_home_assistant() as hass,
@ -690,13 +690,13 @@ async def test_out_of_disk_space_while_removing_foreign_key(
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EventIDPostMigration, "migrate_data"),
patch.object(migration, "post_migrate_entity_ids", return_value=False),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test(SCHEMA_MODULE_32)),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
):
async with (
async_test_home_assistant() as hass,