diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index db9f4239480..5d7d81f38d2 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -16,7 +16,14 @@ import time from typing import TYPE_CHECKING, Any, cast import psutil_home_assistant as ha_psutil -from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update +from sqlalchemy import ( + create_engine, + event as sqlalchemy_event, + exc, + inspect, + select, + update, +) from sqlalchemy.engine import Engine from sqlalchemy.engine.interfaces import DBAPIConnection from sqlalchemy.exc import SQLAlchemyError @@ -820,7 +827,7 @@ class Recorder(threading.Thread): # If ix_states_entity_id_last_updated_ts still exists # on the states table it means the entity id migration # finished by the EntityIDPostMigrationTask did not - # because they restarted in the middle of it. We need + # complete because they restarted in the middle of it. We need # to pick back up where we left off. if get_index_by_name( session, @@ -832,9 +839,13 @@ class Recorder(threading.Thread): if self.schema_version > LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION: with contextlib.suppress(SQLAlchemyError): # If the index of event_ids on the states table is still present - # we need to queue a task to remove it. - if get_index_by_name( - session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX + # or the event_id foreign key still exists we need to queue a + # task to remove it. + if ( + get_index_by_name( + session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX + ) + or self._legacy_event_id_foreign_key_exists() ): self.queue_task(EventIdMigrationTask()) self.use_legacy_events_index = True @@ -1285,6 +1296,21 @@ class Recorder(threading.Thread): """Run post schema migration tasks.""" migration.post_schema_migration(self, old_version, new_version) + def _legacy_event_id_foreign_key_exists(self) -> bool: + """Check if the legacy event_id foreign key exists.""" + engine = self.engine + assert engine is not None + return bool( + next( + ( + fk + for fk in inspect(engine).get_foreign_keys(TABLE_STATES) + if fk["constrained_columns"] == ["event_id"] + ), + None, + ) + ) + def _migrate_states_context_ids(self) -> bool: """Migrate states context ids if needed.""" return migration.migrate_states_context_ids(self) diff --git a/tests/components/recorder/test_v32_migration.py b/tests/components/recorder/test_v32_migration.py index 4e809d02446..666629d4bcf 100644 --- a/tests/components/recorder/test_v32_migration.py +++ b/tests/components/recorder/test_v32_migration.py @@ -3,14 +3,14 @@ from datetime import timedelta import importlib import sys -from unittest.mock import patch +from unittest.mock import DEFAULT, patch import pytest from sqlalchemy import create_engine, inspect from sqlalchemy.orm import Session from homeassistant.components import recorder -from homeassistant.components.recorder import core, statistics +from homeassistant.components.recorder import core, migration, statistics from homeassistant.components.recorder.queries import select_event_type_ids from homeassistant.components.recorder.util import session_scope from homeassistant.core import EVENT_STATE_CHANGED, Event, EventOrigin, State @@ -104,21 +104,14 @@ async def test_migrate_times( patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), - patch( - "homeassistant.components.recorder.Recorder._migrate_events_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_states_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_event_type_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_entity_ids", - ), - patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"), - patch( - "homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids" + patch.multiple( + "homeassistant.components.recorder.Recorder", + _migrate_events_context_ids=DEFAULT, + _migrate_states_context_ids=DEFAULT, + _migrate_event_type_ids=DEFAULT, + _migrate_entity_ids=DEFAULT, + _post_migrate_entity_ids=DEFAULT, + _cleanup_legacy_states_event_ids=DEFAULT, ), ): async with ( @@ -267,21 +260,14 @@ async def test_migrate_can_resume_entity_id_post_migration( patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), - patch( - "homeassistant.components.recorder.Recorder._migrate_events_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_states_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_event_type_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_entity_ids", - ), - patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"), - patch( - "homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids" + patch.multiple( + "homeassistant.components.recorder.Recorder", + _migrate_events_context_ids=DEFAULT, + _migrate_states_context_ids=DEFAULT, + _migrate_event_type_ids=DEFAULT, + _migrate_entity_ids=DEFAULT, + _post_migrate_entity_ids=DEFAULT, + _cleanup_legacy_states_event_ids=DEFAULT, ), ): async with ( @@ -328,5 +314,143 @@ async def test_migrate_can_resume_entity_id_post_migration( states_indexes = await instance.async_add_executor_job(_get_states_index_names) states_index_names = {index["name"] for index in states_indexes} assert "ix_states_entity_id_last_updated_ts" not in states_index_names + assert "ix_states_event_id" not in states_index_names + + await hass.async_stop() + + +@pytest.mark.parametrize("persistent_database", [True]) +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage +async def test_migrate_can_resume_ix_states_event_id_removed( + async_test_recorder: RecorderInstanceGenerator, + caplog: pytest.LogCaptureFixture, + recorder_db_url: str, +) -> None: + """Test we resume the entity id post migration after a restart. + + This case tests the migration still happens if + ix_states_event_id is removed from the states table. + """ + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_second_past = now - timedelta(seconds=1) + mock_state = State( + "sensor.test", + "old", + {"last_reset": now.isoformat()}, + last_changed=one_second_past, + last_updated=now, + ) + state_changed_event = Event( + EVENT_STATE_CHANGED, + { + "entity_id": "sensor.test", + "old_state": None, + "new_state": mock_state, + }, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + custom_event = Event( + "custom_event", + {"entity_id": "sensor.custom"}, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + number_of_migrations = 5 + + def _get_event_id_foreign_keys(): + assert instance.engine is not None + return next( + ( + fk # type: ignore[misc] + for fk in inspect(instance.engine).get_foreign_keys("states") + if fk["constrained_columns"] == ["event_id"] + ), + None, + ) + + def _get_states_index_names(): + with session_scope(hass=hass) as session: + return inspect(session.connection()).get_indexes("states") + + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object( + recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION + ), + patch.object(core, "StatesMeta", old_db_schema.StatesMeta), + patch.object(core, "EventTypes", old_db_schema.EventTypes), + patch.object(core, "EventData", old_db_schema.EventData), + patch.object(core, "States", old_db_schema.States), + patch.object(core, "Events", old_db_schema.Events), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + patch.multiple( + "homeassistant.components.recorder.Recorder", + _migrate_events_context_ids=DEFAULT, + _migrate_states_context_ids=DEFAULT, + _migrate_event_type_ids=DEFAULT, + _migrate_entity_ids=DEFAULT, + _post_migrate_entity_ids=DEFAULT, + _cleanup_legacy_states_event_ids=DEFAULT, + ), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + def _add_data(): + with session_scope(hass=hass) as session: + session.add(old_db_schema.Events.from_event(custom_event)) + session.add(old_db_schema.States.from_event(state_changed_event)) + + await instance.async_add_executor_job(_add_data) + await hass.async_block_till_done() + await instance.async_block_till_done() + + await instance.async_add_executor_job( + migration._drop_index, + instance.get_session, + "states", + "ix_states_event_id", + ) + + states_indexes = await instance.async_add_executor_job( + _get_states_index_names + ) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is True + assert ( + await instance.async_add_executor_job(_get_event_id_foreign_keys) + is not None + ) + + await hass.async_stop() + await hass.async_block_till_done() + + assert "ix_states_entity_id_last_updated_ts" in states_index_names + + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + + # We need to wait for all the migration tasks to complete + # before we can check the database. + for _ in range(number_of_migrations): + await instance.async_block_till_done() + await async_wait_recording_done(hass) + + states_indexes = await instance.async_add_executor_job(_get_states_index_names) + states_index_names = {index["name"] for index in states_indexes} + assert "ix_states_entity_id_last_updated_ts" not in states_index_names + assert "ix_states_event_id" not in states_index_names + assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None await hass.async_stop() diff --git a/tests/conftest.py b/tests/conftest.py index b96bd783331..85f4671f6c0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1300,6 +1300,16 @@ def enable_migrate_entity_ids() -> bool: return False +@pytest.fixture +def enable_migrate_event_ids() -> bool: + """Fixture to control enabling of recorder's event id migration. + + To enable context id migration, tests can be marked with: + @pytest.mark.parametrize("enable_migrate_event_ids", [True]) + """ + return False + + @pytest.fixture def recorder_config() -> dict[str, Any] | None: """Fixture to override recorder config. @@ -1416,6 +1426,7 @@ async def async_test_recorder( enable_migrate_context_ids: bool, enable_migrate_event_type_ids: bool, enable_migrate_entity_ids: bool, + enable_migrate_event_ids: bool, ) -> AsyncGenerator[RecorderInstanceGenerator]: """Yield context manager to setup recorder instance.""" # pylint: disable-next=import-outside-toplevel @@ -1457,6 +1468,11 @@ async def async_test_recorder( migrate_entity_ids = ( recorder.Recorder._migrate_entity_ids if enable_migrate_entity_ids else None ) + legacy_event_id_foreign_key_exists = ( + recorder.Recorder._legacy_event_id_foreign_key_exists + if enable_migrate_event_ids + else None + ) with ( patch( "homeassistant.components.recorder.Recorder.async_nightly_tasks", @@ -1493,6 +1509,11 @@ async def async_test_recorder( side_effect=migrate_entity_ids, autospec=True, ), + patch( + "homeassistant.components.recorder.Recorder._legacy_event_id_foreign_key_exists", + side_effect=legacy_event_id_foreign_key_exists, + autospec=True, + ), patch( "homeassistant.components.recorder.Recorder._schedule_compile_missing_statistics", side_effect=compile_missing,