From 00c1a3fd4e473d415d89447cb2b89326dd8eeac3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 8 Aug 2024 23:19:12 -0500 Subject: [PATCH] Ensure legacy event foreign key is removed from the states table when a previous rebuild failed (#123388) * Ensure legacy event foreign key is removed from the states table If the system ran out of disk space removing the FK, it would fail. #121938 fixed that to try again, however that PR was made ineffective by #122069 since it will never reach the check. To solve this, the migration version is incremented to 2, and the migration is no longer marked as done unless the rebuild /fk removal is successful. * fix logic for mysql * fix test * asserts * coverage * coverage * narrow test * fixes * split tests * should have skipped * fixture must be used --- .../components/recorder/migration.py | 24 +- tests/components/recorder/test_migrate.py | 14 +- .../components/recorder/test_v32_migration.py | 346 ++++++++++++++++++ 3 files changed, 368 insertions(+), 16 deletions(-) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 2932ea484c9..a41de07e243 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -632,7 +632,7 @@ def _update_states_table_with_foreign_key_options( def _drop_foreign_key_constraints( session_maker: Callable[[], Session], engine: Engine, table: str, column: str -) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]: +) -> tuple[bool, list[tuple[str, str, ReflectedForeignKeyConstraint]]]: """Drop foreign key constraints for a table on specific columns.""" inspector = sqlalchemy.inspect(engine) dropped_constraints = [ @@ -649,6 +649,7 @@ def _drop_foreign_key_constraints( if foreign_key["name"] and foreign_key["constrained_columns"] == [column] ] + fk_remove_ok = True for drop in drops: with session_scope(session=session_maker()) as session: try: @@ -660,8 +661,9 @@ def _drop_foreign_key_constraints( TABLE_STATES, column, ) + fk_remove_ok = False - return dropped_constraints + return fk_remove_ok, dropped_constraints def _restore_foreign_key_constraints( @@ -1481,7 +1483,7 @@ class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44): for column in columns for dropped_constraint in _drop_foreign_key_constraints( self.session_maker, self.engine, table, column - ) + )[1] ] _LOGGER.debug("Dropped foreign key constraints: %s", dropped_constraints) @@ -1956,14 +1958,15 @@ def cleanup_legacy_states_event_ids(instance: Recorder) -> bool: if instance.dialect_name == SupportedDialect.SQLITE: # SQLite does not support dropping foreign key constraints # so we have to rebuild the table - rebuild_sqlite_table(session_maker, instance.engine, States) + fk_remove_ok = rebuild_sqlite_table(session_maker, instance.engine, States) else: - _drop_foreign_key_constraints( + fk_remove_ok, _ = _drop_foreign_key_constraints( session_maker, instance.engine, TABLE_STATES, "event_id" ) - _drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX) - instance.use_legacy_events_index = False - _mark_migration_done(session, EventIDPostMigration) + if fk_remove_ok: + _drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX) + instance.use_legacy_events_index = False + _mark_migration_done(session, EventIDPostMigration) return True @@ -2419,6 +2422,7 @@ class EventIDPostMigration(BaseRunTimeMigration): migration_id = "event_id_post_migration" task = MigrationTask + migration_version = 2 @staticmethod def migrate_data(instance: Recorder) -> bool: @@ -2469,7 +2473,7 @@ def _mark_migration_done( def rebuild_sqlite_table( session_maker: Callable[[], Session], engine: Engine, table: type[Base] -) -> None: +) -> bool: """Rebuild an SQLite table. This must only be called after all migrations are complete @@ -2524,8 +2528,10 @@ def rebuild_sqlite_table( # Swallow the exception since we do not want to ever raise # an integrity error as it would cause the database # to be discarded and recreated from scratch + return False else: _LOGGER.warning("Rebuilding SQLite table %s finished", orig_name) + return True finally: with session_scope(session=session_maker()) as session: # Step 12 - Re-enable foreign keys diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index dc99ddefa3b..e55793caad7 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -748,7 +748,7 @@ def test_rebuild_sqlite_states_table(recorder_db_url: str) -> None: session.add(States(state="on")) session.commit() - migration.rebuild_sqlite_table(session_maker, engine, States) + assert migration.rebuild_sqlite_table(session_maker, engine, States) is True with session_scope(session=session_maker()) as session: assert session.query(States).count() == 1 @@ -776,13 +776,13 @@ def test_rebuild_sqlite_states_table_missing_fails( session.connection().execute(text("DROP TABLE states")) session.commit() - migration.rebuild_sqlite_table(session_maker, engine, States) + assert migration.rebuild_sqlite_table(session_maker, engine, States) is False assert "Error recreating SQLite table states" in caplog.text caplog.clear() # Now rebuild the events table to make sure the database did not # get corrupted - migration.rebuild_sqlite_table(session_maker, engine, Events) + assert migration.rebuild_sqlite_table(session_maker, engine, Events) is True with session_scope(session=session_maker()) as session: assert session.query(Events).count() == 1 @@ -812,7 +812,7 @@ def test_rebuild_sqlite_states_table_extra_columns( text("ALTER TABLE states ADD COLUMN extra_column TEXT") ) - migration.rebuild_sqlite_table(session_maker, engine, States) + assert migration.rebuild_sqlite_table(session_maker, engine, States) is True assert "Error recreating SQLite table states" not in caplog.text with session_scope(session=session_maker()) as session: @@ -905,7 +905,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: for table, column in constraints_to_recreate for dropped_constraint in migration._drop_foreign_key_constraints( session_maker, engine, table, column - ) + )[1] ] assert dropped_constraints_1 == expected_dropped_constraints[db_engine] @@ -917,7 +917,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: for table, column in constraints_to_recreate for dropped_constraint in migration._drop_foreign_key_constraints( session_maker, engine, table, column - ) + )[1] ] assert dropped_constraints_2 == [] @@ -936,7 +936,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: for table, column in constraints_to_recreate for dropped_constraint in migration._drop_foreign_key_constraints( session_maker, engine, table, column - ) + )[1] ] assert dropped_constraints_3 == expected_dropped_constraints[db_engine] diff --git a/tests/components/recorder/test_v32_migration.py b/tests/components/recorder/test_v32_migration.py index 9956fec8a09..5266e55851c 100644 --- a/tests/components/recorder/test_v32_migration.py +++ b/tests/components/recorder/test_v32_migration.py @@ -7,6 +7,7 @@ from unittest.mock import patch import pytest from sqlalchemy import create_engine, inspect +from sqlalchemy.exc import OperationalError, SQLAlchemyError from sqlalchemy.orm import Session from homeassistant.components import recorder @@ -444,3 +445,348 @@ async def test_migrate_can_resume_ix_states_event_id_removed( assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None await hass.async_stop() + + +@pytest.mark.usefixtures("skip_by_db_engine") +@pytest.mark.skip_on_db_engine(["mysql", "postgresql"]) +@pytest.mark.parametrize("enable_migrate_event_ids", [True]) +@pytest.mark.parametrize("persistent_database", [True]) +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage +async def test_out_of_disk_space_while_rebuild_states_table( + async_test_recorder: RecorderInstanceGenerator, + caplog: pytest.LogCaptureFixture, + recorder_db_url: str, +) -> None: + """Test that we can recover from out of disk space while rebuilding the states table. + + This case tests the migration still happens if + ix_states_event_id is removed from the states table. + """ + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_second_past = now - timedelta(seconds=1) + mock_state = State( + "sensor.test", + "old", + {"last_reset": now.isoformat()}, + last_changed=one_second_past, + last_updated=now, + ) + state_changed_event = Event( + EVENT_STATE_CHANGED, + { + "entity_id": "sensor.test", + "old_state": None, + "new_state": mock_state, + }, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + custom_event = Event( + "custom_event", + {"entity_id": "sensor.custom"}, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + number_of_migrations = 5 + + def _get_event_id_foreign_keys(): + assert instance.engine is not None + return next( + ( + fk # type: ignore[misc] + for fk in inspect(instance.engine).get_foreign_keys("states") + if fk["constrained_columns"] == ["event_id"] + ), + None, + ) + + def _get_states_index_names(): + with session_scope(hass=hass) as session: + return inspect(session.connection()).get_indexes("states") + + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object( + recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION + ), + patch.object(core, "StatesMeta", old_db_schema.StatesMeta), + patch.object(core, "EventTypes", old_db_schema.EventTypes), + patch.object(core, "EventData", old_db_schema.EventData), + patch.object(core, "States", old_db_schema.States), + patch.object(core, "Events", old_db_schema.Events), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"), + patch( + "homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids" + ), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + def _add_data(): + with session_scope(hass=hass) as session: + session.add(old_db_schema.Events.from_event(custom_event)) + session.add(old_db_schema.States.from_event(state_changed_event)) + + await instance.async_add_executor_job(_add_data) + await hass.async_block_till_done() + await instance.async_block_till_done() + + await instance.async_add_executor_job( + migration._drop_index, + instance.get_session, + "states", + "ix_states_event_id", + ) + + states_indexes = await instance.async_add_executor_job( + _get_states_index_names + ) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is True + assert ( + await instance.async_add_executor_job(_get_event_id_foreign_keys) + is not None + ) + + await hass.async_stop() + await hass.async_block_till_done() + + assert "ix_states_entity_id_last_updated_ts" in states_index_names + + # Simulate out of disk space while rebuilding the states table by + # - patching CreateTable to raise SQLAlchemyError for SQLite + # - patching DropConstraint to raise InternalError for MySQL and PostgreSQL + with ( + patch( + "homeassistant.components.recorder.migration.CreateTable", + side_effect=SQLAlchemyError, + ), + patch( + "homeassistant.components.recorder.migration.DropConstraint", + side_effect=OperationalError( + None, None, OSError("No space left on device") + ), + ), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + + # We need to wait for all the migration tasks to complete + # before we can check the database. + for _ in range(number_of_migrations): + await instance.async_block_till_done() + await async_wait_recording_done(hass) + + states_indexes = await instance.async_add_executor_job( + _get_states_index_names + ) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is True + assert "Error recreating SQLite table states" in caplog.text + assert await instance.async_add_executor_job(_get_event_id_foreign_keys) + + await hass.async_stop() + + # Now run it again to verify the table rebuild tries again + caplog.clear() + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + + # We need to wait for all the migration tasks to complete + # before we can check the database. + for _ in range(number_of_migrations): + await instance.async_block_till_done() + await async_wait_recording_done(hass) + + states_indexes = await instance.async_add_executor_job(_get_states_index_names) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is False + assert "ix_states_entity_id_last_updated_ts" not in states_index_names + assert "ix_states_event_id" not in states_index_names + assert "Rebuilding SQLite table states finished" in caplog.text + assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None + + await hass.async_stop() + + +@pytest.mark.usefixtures("skip_by_db_engine") +@pytest.mark.skip_on_db_engine(["sqlite"]) +@pytest.mark.parametrize("enable_migrate_event_ids", [True]) +@pytest.mark.parametrize("persistent_database", [True]) +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage +async def test_out_of_disk_space_while_removing_foreign_key( + async_test_recorder: RecorderInstanceGenerator, + caplog: pytest.LogCaptureFixture, + recorder_db_url: str, +) -> None: + """Test that we can recover from out of disk space while removing the foreign key. + + This case tests the migration still happens if + ix_states_event_id is removed from the states table. + """ + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_second_past = now - timedelta(seconds=1) + mock_state = State( + "sensor.test", + "old", + {"last_reset": now.isoformat()}, + last_changed=one_second_past, + last_updated=now, + ) + state_changed_event = Event( + EVENT_STATE_CHANGED, + { + "entity_id": "sensor.test", + "old_state": None, + "new_state": mock_state, + }, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + custom_event = Event( + "custom_event", + {"entity_id": "sensor.custom"}, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + number_of_migrations = 5 + + def _get_event_id_foreign_keys(): + assert instance.engine is not None + return next( + ( + fk # type: ignore[misc] + for fk in inspect(instance.engine).get_foreign_keys("states") + if fk["constrained_columns"] == ["event_id"] + ), + None, + ) + + def _get_states_index_names(): + with session_scope(hass=hass) as session: + return inspect(session.connection()).get_indexes("states") + + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object( + recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION + ), + patch.object(core, "StatesMeta", old_db_schema.StatesMeta), + patch.object(core, "EventTypes", old_db_schema.EventTypes), + patch.object(core, "EventData", old_db_schema.EventData), + patch.object(core, "States", old_db_schema.States), + patch.object(core, "Events", old_db_schema.Events), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"), + patch( + "homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids" + ), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + def _add_data(): + with session_scope(hass=hass) as session: + session.add(old_db_schema.Events.from_event(custom_event)) + session.add(old_db_schema.States.from_event(state_changed_event)) + + await instance.async_add_executor_job(_add_data) + await hass.async_block_till_done() + await instance.async_block_till_done() + + await instance.async_add_executor_job( + migration._drop_index, + instance.get_session, + "states", + "ix_states_event_id", + ) + + states_indexes = await instance.async_add_executor_job( + _get_states_index_names + ) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is True + assert ( + await instance.async_add_executor_job(_get_event_id_foreign_keys) + is not None + ) + + await hass.async_stop() + await hass.async_block_till_done() + + assert "ix_states_entity_id_last_updated_ts" in states_index_names + + # Simulate out of disk space while removing the foreign key from the states table by + # - patching DropConstraint to raise InternalError for MySQL and PostgreSQL + with ( + patch( + "homeassistant.components.recorder.migration.DropConstraint", + side_effect=OperationalError( + None, None, OSError("No space left on device") + ), + ), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + + # We need to wait for all the migration tasks to complete + # before we can check the database. + for _ in range(number_of_migrations): + await instance.async_block_till_done() + await async_wait_recording_done(hass) + + states_indexes = await instance.async_add_executor_job( + _get_states_index_names + ) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is True + assert await instance.async_add_executor_job(_get_event_id_foreign_keys) + + await hass.async_stop() + + # Now run it again to verify the table rebuild tries again + caplog.clear() + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + + # We need to wait for all the migration tasks to complete + # before we can check the database. + for _ in range(number_of_migrations): + await instance.async_block_till_done() + await async_wait_recording_done(hass) + + states_indexes = await instance.async_add_executor_job(_get_states_index_names) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is False + assert "ix_states_entity_id_last_updated_ts" not in states_index_names + assert "ix_states_event_id" not in states_index_names + assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None + + await hass.async_stop()