mirror of
https://github.com/home-assistant/core.git
synced 2025-07-15 09:17:10 +00:00
Remove support for live recorder data migration of event type IDs (#131826)
This commit is contained in:
parent
d26d483a2f
commit
552613d949
@ -2508,15 +2508,11 @@ class EventsContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
|
||||
return has_events_context_ids_to_migrate()
|
||||
|
||||
|
||||
class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
|
||||
class EventTypeIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
|
||||
"""Migration to migrate event_type to event_type_ids."""
|
||||
|
||||
required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION
|
||||
migration_id = "event_type_id_migration"
|
||||
task = CommitBeforeMigrationTask
|
||||
# We have to commit before to make sure there are
|
||||
# no new pending event_types about to be added to
|
||||
# the db since this happens live
|
||||
|
||||
def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
|
||||
"""Migrate event_type to event_type_ids, return True if completed."""
|
||||
@ -2576,11 +2572,6 @@ class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
|
||||
_LOGGER.debug("Migrating event_types done=%s", is_done)
|
||||
return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)
|
||||
|
||||
def migration_done(self, instance: Recorder, session: Session) -> None:
|
||||
"""Will be called after migrate returns True."""
|
||||
_LOGGER.debug("Activating event_types manager as all data is migrated")
|
||||
instance.event_type_manager.active = True
|
||||
|
||||
def needs_migrate_query(self) -> StatementLambdaElement:
|
||||
"""Check if the data is migrated."""
|
||||
return has_event_type_to_migrate()
|
||||
@ -2770,11 +2761,11 @@ class EntityIDPostMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
|
||||
NON_LIVE_DATA_MIGRATORS = (
|
||||
StatesContextIDMigration, # Introduced in HA Core 2023.4
|
||||
EventsContextIDMigration, # Introduced in HA Core 2023.4
|
||||
EventTypeIDMigration, # Introduced in HA Core 2023.4 by PR #89465
|
||||
EntityIDMigration, # Introduced in HA Core 2023.4 by PR #89557
|
||||
)
|
||||
|
||||
LIVE_DATA_MIGRATORS = (
|
||||
EventTypeIDMigration, # Introduced in HA Core 2023.4 by PR #89465
|
||||
EventIDPostMigration, # Introduced in HA Core 2023.4 by PR #89901
|
||||
EntityIDPostMigration, # Introduced in HA Core 2023.4 by PR #89557
|
||||
)
|
||||
|
@ -116,8 +116,7 @@ def purge_old_data(
|
||||
|
||||
# This purge cycle is finished, clean up old event types and
|
||||
# recorder runs
|
||||
if instance.event_type_manager.active:
|
||||
_purge_old_event_types(instance, session)
|
||||
_purge_old_event_types(instance, session)
|
||||
|
||||
if instance.states_meta_manager.active:
|
||||
_purge_old_entity_ids(instance, session)
|
||||
|
@ -28,8 +28,6 @@ CACHE_SIZE = 2048
|
||||
class EventTypeManager(BaseLRUTableManager[EventTypes]):
|
||||
"""Manage the EventTypes table."""
|
||||
|
||||
active = False
|
||||
|
||||
def __init__(self, recorder: Recorder) -> None:
|
||||
"""Initialize the event type manager."""
|
||||
super().__init__(recorder, CACHE_SIZE)
|
||||
|
@ -824,13 +824,13 @@ async def test_finish_migrate_states_context_ids(
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("persistent_database", [True])
|
||||
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
|
||||
@pytest.mark.usefixtures("db_schema_32")
|
||||
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
|
||||
async def test_migrate_event_type_ids(
|
||||
hass: HomeAssistant, recorder_mock: Recorder
|
||||
async_test_recorder: RecorderInstanceGenerator,
|
||||
) -> None:
|
||||
"""Test we can migrate event_types to the EventTypes table."""
|
||||
await async_wait_recording_done(hass)
|
||||
importlib.import_module(SCHEMA_MODULE_32)
|
||||
old_db_schema = sys.modules[SCHEMA_MODULE_32]
|
||||
|
||||
@ -856,14 +856,24 @@ async def test_migrate_event_type_ids(
|
||||
)
|
||||
)
|
||||
|
||||
await recorder_mock.async_add_executor_job(_insert_events)
|
||||
# Create database with old schema
|
||||
with (
|
||||
patch.object(recorder, "db_schema", old_db_schema),
|
||||
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
|
||||
patch.object(migration.EventTypeIDMigration, "migrate_data"),
|
||||
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
|
||||
):
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
await instance.async_add_executor_job(_insert_events)
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
# This is a threadsafe way to add a task to the recorder
|
||||
migrator = migration.EventTypeIDMigration(None, None)
|
||||
recorder_mock.queue_task(migrator.task(migrator))
|
||||
await _async_wait_migration_done(hass)
|
||||
await _async_wait_migration_done(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
await _async_wait_migration_done(hass)
|
||||
|
||||
await hass.async_stop()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
def _fetch_migrated_events():
|
||||
with session_scope(hass=hass, read_only=True) as session:
|
||||
@ -894,23 +904,38 @@ async def test_migrate_event_type_ids(
|
||||
)
|
||||
return result
|
||||
|
||||
events_by_type = await recorder_mock.async_add_executor_job(_fetch_migrated_events)
|
||||
assert len(events_by_type["event_type_one"]) == 2
|
||||
assert len(events_by_type["event_type_two"]) == 1
|
||||
|
||||
def _get_many():
|
||||
with session_scope(hass=hass, read_only=True) as session:
|
||||
return recorder_mock.event_type_manager.get_many(
|
||||
return instance.event_type_manager.get_many(
|
||||
("event_type_one", "event_type_two"), session
|
||||
)
|
||||
|
||||
mapped = await recorder_mock.async_add_executor_job(_get_many)
|
||||
# Run again with new schema, let migration run
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
|
||||
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
|
||||
mapped = await instance.async_add_executor_job(_get_many)
|
||||
migration_changes = await instance.async_add_executor_job(
|
||||
_get_migration_id, hass
|
||||
)
|
||||
|
||||
await hass.async_stop()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(events_by_type["event_type_one"]) == 2
|
||||
assert len(events_by_type["event_type_two"]) == 1
|
||||
|
||||
assert mapped["event_type_one"] is not None
|
||||
assert mapped["event_type_two"] is not None
|
||||
|
||||
migration_changes = await recorder_mock.async_add_executor_job(
|
||||
_get_migration_id, hass
|
||||
)
|
||||
assert (
|
||||
migration_changes[migration.EventTypeIDMigration.migration_id]
|
||||
== migration.EventTypeIDMigration.migration_version
|
||||
@ -1214,13 +1239,13 @@ async def test_migrate_null_entity_ids(
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("persistent_database", [True])
|
||||
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
|
||||
@pytest.mark.usefixtures("db_schema_32")
|
||||
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
|
||||
async def test_migrate_null_event_type_ids(
|
||||
hass: HomeAssistant, recorder_mock: Recorder
|
||||
async_test_recorder: RecorderInstanceGenerator,
|
||||
) -> None:
|
||||
"""Test we can migrate event_types to the EventTypes table when the event_type is NULL."""
|
||||
await async_wait_recording_done(hass)
|
||||
importlib.import_module(SCHEMA_MODULE_32)
|
||||
old_db_schema = sys.modules[SCHEMA_MODULE_32]
|
||||
|
||||
@ -1249,14 +1274,24 @@ async def test_migrate_null_event_type_ids(
|
||||
),
|
||||
)
|
||||
|
||||
await recorder_mock.async_add_executor_job(_insert_events)
|
||||
# Create database with old schema
|
||||
with (
|
||||
patch.object(recorder, "db_schema", old_db_schema),
|
||||
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
|
||||
patch.object(migration.EventTypeIDMigration, "migrate_data"),
|
||||
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
|
||||
):
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
await instance.async_add_executor_job(_insert_events)
|
||||
|
||||
await _async_wait_migration_done(hass)
|
||||
# This is a threadsafe way to add a task to the recorder
|
||||
migrator = migration.EventTypeIDMigration(None, None)
|
||||
recorder_mock.queue_task(migrator.task(migrator))
|
||||
await _async_wait_migration_done(hass)
|
||||
await _async_wait_migration_done(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
await _async_wait_migration_done(hass)
|
||||
|
||||
await hass.async_stop()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
def _fetch_migrated_events():
|
||||
with session_scope(hass=hass, read_only=True) as session:
|
||||
@ -1287,15 +1322,29 @@ async def test_migrate_null_event_type_ids(
|
||||
)
|
||||
return result
|
||||
|
||||
events_by_type = await recorder_mock.async_add_executor_job(_fetch_migrated_events)
|
||||
assert len(events_by_type["event_type_one"]) == 2
|
||||
assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000
|
||||
|
||||
def _get_migration_id():
|
||||
with session_scope(hass=hass, read_only=True) as session:
|
||||
return dict(execute_stmt_lambda_element(session, get_migration_changes()))
|
||||
|
||||
migration_changes = await recorder_mock.async_add_executor_job(_get_migration_id)
|
||||
# Run again with new schema, let migration run
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
|
||||
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
|
||||
migration_changes = await instance.async_add_executor_job(_get_migration_id)
|
||||
|
||||
await hass.async_stop()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(events_by_type["event_type_one"]) == 2
|
||||
assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000
|
||||
assert (
|
||||
migration_changes[migration.EventTypeIDMigration.migration_id]
|
||||
== migration.EventTypeIDMigration.migration_version
|
||||
|
@ -1930,8 +1930,6 @@ async def test_purge_old_events_purges_the_event_type_ids(
|
||||
hass: HomeAssistant, recorder_mock: Recorder
|
||||
) -> None:
|
||||
"""Test deleting old events purges event type ids."""
|
||||
assert recorder_mock.event_type_manager.active is True
|
||||
|
||||
utcnow = dt_util.utcnow()
|
||||
five_days_ago = utcnow - timedelta(days=5)
|
||||
eleven_days_ago = utcnow - timedelta(days=11)
|
||||
|
Loading…
x
Reference in New Issue
Block a user