Remove support for live recorder data migration of entity IDs (#131952)

This commit is contained in:
Erik Montnemery 2024-12-03 21:43:33 +01:00 committed by GitHub
parent 09d7fed6cd
commit 1a714276cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 141 additions and 81 deletions

View File

@ -2586,15 +2586,11 @@ class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
return has_event_type_to_migrate()
class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
class EntityIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
"""Migration to migrate entity_ids to states_meta."""
required_schema_version = STATES_META_SCHEMA_VERSION
migration_id = "entity_id_migration"
task = CommitBeforeMigrationTask
# We have to commit before to make sure there are
# no new pending states_meta about to be added to
# the db since this happens live
def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
"""Migrate entity_ids to states_meta, return True if completed.
@ -2664,18 +2660,6 @@ class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
_LOGGER.debug("Migrating entity_ids done=%s", is_done)
return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)
def migration_done(self, instance: Recorder, session: Session) -> None:
"""Will be called after migrate returns True."""
# The migration has finished, now we start the post migration
# to remove the old entity_id data from the states table
# at this point we can also start using the StatesMeta table
# so we set active to True
_LOGGER.debug("Activating states_meta manager as all data is migrated")
instance.states_meta_manager.active = True
with contextlib.suppress(SQLAlchemyError):
migrate = EntityIDPostMigration(self.schema_version, self.migration_changes)
migrate.queue_migration(instance, session)
def needs_migrate_query(self) -> StatementLambdaElement:
"""Check if the data is migrated."""
return has_entity_ids_to_migrate()
@ -2786,12 +2770,13 @@ class EntityIDPostMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
NON_LIVE_DATA_MIGRATORS = (
StatesContextIDMigration, # Introduced in HA Core 2023.4
EventsContextIDMigration, # Introduced in HA Core 2023.4
EntityIDMigration, # Introduced in HA Core 2023.4 by PR #89557
)
LIVE_DATA_MIGRATORS = (
EventTypeIDMigration, # Introduced in HA Core 2023.4 by PR #89465
EntityIDMigration, # Introduced in HA Core 2023.4 by PR #89557
EventIDPostMigration, # Introduced in HA Core 2023.4 by PR #89901
EntityIDPostMigration, # Introduced in HA Core 2023.4 by PR #89557
)

View File

@ -24,7 +24,7 @@ CACHE_SIZE = 8192
class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
"""Manage the StatesMeta table."""
active = False
active = True
def __init__(self, recorder: Recorder) -> None:
"""Initialize the states meta manager."""

View File

@ -428,14 +428,6 @@ def get_schema_module_path(schema_version_postfix: str) -> str:
return f"tests.components.recorder.db_schema_{schema_version_postfix}"
@dataclass(slots=True)
class MockMigrationTask(migration.MigrationTask):
"""Mock migration task which does nothing."""
def run(self, instance: Recorder) -> None:
"""Run migration task."""
@contextmanager
def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
"""Fixture to initialize the db with the old schema."""
@ -453,7 +445,6 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch.object(core, "StateAttributes", old_db_schema.StateAttributes),
patch.object(migration.EntityIDMigration, "task", MockMigrationTask),
patch(
CREATE_ENGINE_TARGET,
new=partial(

View File

@ -38,6 +38,17 @@ async def mock_recorder_before_hass(
"""Set up recorder."""
@pytest.fixture
def disable_states_meta_manager():
"""Disable the states meta manager."""
with patch.object(
recorder.table_managers.states_meta.StatesMetaManager,
"active",
False,
):
yield
@pytest.fixture(autouse=True)
def db_schema_32():
"""Fixture to initialize the db with the old schema 32."""
@ -46,7 +57,9 @@ def db_schema_32():
@pytest.fixture(autouse=True)
def setup_recorder(db_schema_32, recorder_mock: Recorder) -> recorder.Recorder:
def setup_recorder(
db_schema_32, disable_states_meta_manager, recorder_mock: Recorder
) -> recorder.Recorder:
"""Set up recorder."""

View File

@ -44,7 +44,6 @@ import homeassistant.util.dt as dt_util
from homeassistant.util.ulid import bytes_to_ulid, ulid_at_time, ulid_to_bytes
from .common import (
MockMigrationTask,
async_attach_db_engine,
async_recorder_block_till_done,
async_wait_recording_done,
@ -114,7 +113,6 @@ def db_schema_32():
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch.object(core, "StateAttributes", old_db_schema.StateAttributes),
patch.object(migration.EntityIDMigration, "task", MockMigrationTask),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
):
yield
@ -919,11 +917,13 @@ async def test_migrate_event_type_ids(
)
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.usefixtures("db_schema_32")
async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) -> None:
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_entity_ids(
async_test_recorder: RecorderInstanceGenerator,
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE_32]
@ -949,14 +949,24 @@ async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder)
)
)
await recorder_mock.async_add_executor_job(_insert_states)
# Create database with old schema
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EntityIDMigration, "migrate_data"),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await instance.async_add_executor_job(_insert_states)
await _async_wait_migration_done(hass)
# This is a threadsafe way to add a task to the recorder
migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {})
recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator))
await _async_wait_migration_done(hass)
await _async_wait_migration_done(hass)
await async_wait_recording_done(hass)
await _async_wait_migration_done(hass)
await hass.async_stop()
await hass.async_block_till_done()
def _fetch_migrated_states():
with session_scope(hass=hass, read_only=True) as session:
@ -982,28 +992,43 @@ async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder)
)
return result
states_by_entity_id = await recorder_mock.async_add_executor_job(
_fetch_migrated_states
)
# Run again with new schema, let migration run
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
states_by_entity_id = await instance.async_add_executor_job(
_fetch_migrated_states
)
migration_changes = await instance.async_add_executor_job(
_get_migration_id, hass
)
await hass.async_stop()
await hass.async_block_till_done()
assert len(states_by_entity_id["sensor.two"]) == 2
assert len(states_by_entity_id["sensor.one"]) == 1
migration_changes = await recorder_mock.async_add_executor_job(
_get_migration_id, hass
)
assert (
migration_changes[migration.EntityIDMigration.migration_id]
== migration.EntityIDMigration.migration_version
)
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.usefixtures("db_schema_32")
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_post_migrate_entity_ids(
hass: HomeAssistant, recorder_mock: Recorder
async_test_recorder: RecorderInstanceGenerator,
) -> None:
"""Test we can post-migrate entity_ids out of the states table."""
await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE_32]
@ -1029,14 +1054,25 @@ async def test_post_migrate_entity_ids(
)
)
await recorder_mock.async_add_executor_job(_insert_events)
# Create database with old schema
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EntityIDMigration, "migrate_data"),
patch.object(migration.EntityIDPostMigration, "migrate_data"),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await instance.async_add_executor_job(_insert_events)
await _async_wait_migration_done(hass)
# This is a threadsafe way to add a task to the recorder
migrator = migration.EntityIDPostMigration(None, None)
recorder_mock.queue_task(migrator.task(migrator))
await _async_wait_migration_done(hass)
await _async_wait_migration_done(hass)
await async_wait_recording_done(hass)
await _async_wait_migration_done(hass)
await hass.async_stop()
await hass.async_block_till_done()
def _fetch_migrated_states():
with session_scope(hass=hass, read_only=True) as session:
@ -1047,19 +1083,34 @@ async def test_post_migrate_entity_ids(
assert len(states) == 3
return {state.state: state.entity_id for state in states}
states_by_state = await recorder_mock.async_add_executor_job(_fetch_migrated_states)
# Run again with new schema, let migration run
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
states_by_state = await instance.async_add_executor_job(_fetch_migrated_states)
await hass.async_stop()
await hass.async_block_till_done()
assert states_by_state["one_1"] is None
assert states_by_state["two_2"] is None
assert states_by_state["two_1"] is None
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.usefixtures("db_schema_32")
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_null_entity_ids(
hass: HomeAssistant, recorder_mock: Recorder
async_test_recorder: RecorderInstanceGenerator,
) -> None:
"""Test we can migrate states with null entity_ids to the StatesMeta table."""
await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE_32]
@ -1088,14 +1139,24 @@ async def test_migrate_null_entity_ids(
),
)
await recorder_mock.async_add_executor_job(_insert_states)
# Create database with old schema
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
patch.object(migration.EntityIDMigration, "migrate_data"),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await instance.async_add_executor_job(_insert_states)
await _async_wait_migration_done(hass)
# This is a threadsafe way to add a task to the recorder
migrator = migration.EntityIDMigration(old_db_schema.SCHEMA_VERSION, {})
recorder_mock.queue_task(migration.CommitBeforeMigrationTask(migrator))
await _async_wait_migration_done(hass)
await _async_wait_migration_done(hass)
await async_wait_recording_done(hass)
await _async_wait_migration_done(hass)
await hass.async_stop()
await hass.async_block_till_done()
def _fetch_migrated_states():
with session_scope(hass=hass, read_only=True) as session:
@ -1121,17 +1182,32 @@ async def test_migrate_null_entity_ids(
)
return result
states_by_entity_id = await recorder_mock.async_add_executor_job(
_fetch_migrated_states
)
assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000
assert len(states_by_entity_id["sensor.one"]) == 2
def _get_migration_id():
with session_scope(hass=hass, read_only=True) as session:
return dict(execute_stmt_lambda_element(session, get_migration_changes()))
migration_changes = await recorder_mock.async_add_executor_job(_get_migration_id)
# Run again with new schema, let migration run
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
states_by_entity_id = await instance.async_add_executor_job(
_fetch_migrated_states
)
migration_changes = await instance.async_add_executor_job(_get_migration_id)
await hass.async_stop()
await hass.async_block_till_done()
assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000
assert len(states_by_entity_id["sensor.one"]) == 2
assert (
migration_changes[migration.EntityIDMigration.migration_id]
== migration.EntityIDMigration.migration_version

View File

@ -19,11 +19,7 @@ from homeassistant.components.recorder.util import (
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from .common import (
MockMigrationTask,
async_recorder_block_till_done,
async_wait_recording_done,
)
from .common import async_recorder_block_till_done, async_wait_recording_done
from tests.common import async_test_home_assistant
from tests.typing import RecorderInstanceGenerator
@ -102,7 +98,6 @@ async def test_migration_changes_prevent_trying_to_migrate_again(
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch.object(core, "StateAttributes", old_db_schema.StateAttributes),
patch.object(migration.EntityIDMigration, "task", MockMigrationTask),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
):
async with (