Ensure legacy event foreign key is removed from the states table when a previous rebuild failed (#123388)

* Ensure legacy event foreign key is removed from the states table

If the system ran out of disk space while removing the FK, the removal
would fail. #121938 fixed that by trying again, but that fix was made
ineffective by #122069, since the code can never reach the retry check.

To solve this, the migration version is incremented to 2, and the
migration is no longer marked as done unless the rebuild / FK removal
is successful.
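
As a minimal sketch of that gating pattern (hypothetical names, not the
recorder's actual API):

    class FakeMigrationStore:
        """Stands in for the recorder's stored migration state."""

        def __init__(self) -> None:
            self.done_versions: dict[str, int] = {}

        def needs_migrate(self, migration_id: str, version: int) -> bool:
            # Re-run when the stored version is older than the current one,
            # e.g. after this commit bumps migration_version to 2.
            return self.done_versions.get(migration_id, 0) < version

        def mark_done(self, migration_id: str, version: int) -> None:
            self.done_versions[migration_id] = version

    def run_migration(store: FakeMigrationStore, migration_id: str,
                      version: int, do_work) -> None:
        if not store.needs_migrate(migration_id, version):
            return
        fk_remove_ok = do_work()  # rebuild the table / drop the FK constraint
        if fk_remove_ok:
            # Only record success; a failed run stays pending and is
            # retried on the next start.
            store.mark_done(migration_id, version)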

* fix logic for mysql

* fix test

* asserts

* coverage

* coverage

* narrow test

* fixes

* split tests

* should have skipped

* fixture must be used
J. Nick Koston, 2024-08-08 23:19:12 -05:00, committed by GitHub
3 changed files with 368 additions and 16 deletions


@@ -632,7 +632,7 @@ def _update_states_table_with_foreign_key_options(
def _drop_foreign_key_constraints(
session_maker: Callable[[], Session], engine: Engine, table: str, column: str
) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]:
) -> tuple[bool, list[tuple[str, str, ReflectedForeignKeyConstraint]]]:
"""Drop foreign key constraints for a table on specific columns."""
inspector = sqlalchemy.inspect(engine)
dropped_constraints = [
@@ -649,6 +649,7 @@ def _drop_foreign_key_constraints(
if foreign_key["name"] and foreign_key["constrained_columns"] == [column]
]
fk_remove_ok = True
for drop in drops:
with session_scope(session=session_maker()) as session:
try:
@@ -660,8 +661,9 @@
TABLE_STATES,
column,
)
fk_remove_ok = False
return dropped_constraints
return fk_remove_ok, dropped_constraints
def _restore_foreign_key_constraints(
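
The signature change above is the heart of the fix: callers now receive a
success flag alongside the dropped constraints instead of assuming the drop
worked. A self-contained sketch of the same pattern (assumes SQLAlchemy 2.x;
a simplification, not the recorder helper itself):

    from sqlalchemy import MetaData, Table
    from sqlalchemy.engine import Engine
    from sqlalchemy.exc import OperationalError
    from sqlalchemy.schema import DropConstraint

    def drop_fk_constraints(
        engine: Engine, table: str, column: str
    ) -> tuple[bool, list[str]]:
        """Drop FK constraints on table.column; return (ok, dropped names)."""
        reflected = Table(table, MetaData(), autoload_with=engine)
        ok = True
        dropped: list[str] = []
        for fk in list(reflected.foreign_key_constraints):
            if not fk.name or [c.name for c in fk.columns] != [column]:
                continue
            try:
                with engine.begin() as conn:
                    conn.execute(DropConstraint(fk))
                dropped.append(fk.name)
            except OperationalError:
                # e.g. the driver wrapping OSError("No space left on device")
                ok = False
        return ok, dropped

Callers that only want the constraint list can index the tuple, which is why
the call sites below gain a trailing [1].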
@@ -1481,7 +1483,7 @@ class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44):
for column in columns
for dropped_constraint in _drop_foreign_key_constraints(
self.session_maker, self.engine, table, column
)
)[1]
]
_LOGGER.debug("Dropped foreign key constraints: %s", dropped_constraints)
@@ -1956,14 +1958,15 @@ def cleanup_legacy_states_event_ids(instance: Recorder) -> bool:
if instance.dialect_name == SupportedDialect.SQLITE:
# SQLite does not support dropping foreign key constraints
# so we have to rebuild the table
rebuild_sqlite_table(session_maker, instance.engine, States)
fk_remove_ok = rebuild_sqlite_table(session_maker, instance.engine, States)
else:
_drop_foreign_key_constraints(
fk_remove_ok, _ = _drop_foreign_key_constraints(
session_maker, instance.engine, TABLE_STATES, "event_id"
)
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
instance.use_legacy_events_index = False
_mark_migration_done(session, EventIDPostMigration)
if fk_remove_ok:
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
instance.use_legacy_events_index = False
_mark_migration_done(session, EventIDPostMigration)
return True
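
The resulting control flow, as a hedged sketch (the callables are hypothetical
stand-ins for the helpers in this file):

    def cleanup(dialect: str, rebuild, drop_fk, drop_index, mark_done) -> bool:
        if dialect == "sqlite":
            # SQLite cannot drop FK constraints, so the table is rebuilt.
            fk_remove_ok = rebuild()
        else:
            fk_remove_ok, _ = drop_fk()
        if fk_remove_ok:
            drop_index()
            mark_done()
        # When fk_remove_ok is False nothing is marked done, so the
        # migration stays pending and is retried on the next start.
        return True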
@@ -2419,6 +2422,7 @@ class EventIDPostMigration(BaseRunTimeMigration):
migration_id = "event_id_post_migration"
task = MigrationTask
migration_version = 2
@staticmethod
def migrate_data(instance: Recorder) -> bool:
@@ -2469,7 +2473,7 @@ def _mark_migration_done(
def rebuild_sqlite_table(
session_maker: Callable[[], Session], engine: Engine, table: type[Base]
) -> None:
) -> bool:
"""Rebuild an SQLite table.
This must only be called after all migrations are complete
@@ -2524,8 +2528,10 @@ def rebuild_sqlite_table(
# Swallow the exception since we do not want to ever raise
# an integrity error as it would cause the database
# to be discarded and recreated from scratch
return False
else:
_LOGGER.warning("Rebuilding SQLite table %s finished", orig_name)
return True
finally:
with session_scope(session=session_maker()) as session:
# Step 12 - Re-enable foreign keys
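
For reference, the overall shape of a rebuild that reports success, as a
self-contained sketch (stdlib sqlite3; unlike the real helper it omits the
schema copy, index recreation, and foreign_keys pragma handling):

    import sqlite3

    def rebuild_table(conn: sqlite3.Connection, name: str) -> bool:
        """Rebuild a table via copy/drop/rename; True only on success."""
        try:
            conn.execute(f"CREATE TABLE {name}_new AS SELECT * FROM {name}")
            conn.execute(f"DROP TABLE {name}")
            conn.execute(f"ALTER TABLE {name}_new RENAME TO {name}")
            conn.commit()
            return True
        except sqlite3.Error:
            # An out-of-disk error lands here; returning False lets the
            # caller leave the migration pending instead of marking it done.
            conn.rollback()
            return False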


@@ -748,7 +748,7 @@ def test_rebuild_sqlite_states_table(recorder_db_url: str) -> None:
session.add(States(state="on"))
session.commit()
migration.rebuild_sqlite_table(session_maker, engine, States)
assert migration.rebuild_sqlite_table(session_maker, engine, States) is True
with session_scope(session=session_maker()) as session:
assert session.query(States).count() == 1
@@ -776,13 +776,13 @@ def test_rebuild_sqlite_states_table_missing_fails(
session.connection().execute(text("DROP TABLE states"))
session.commit()
migration.rebuild_sqlite_table(session_maker, engine, States)
assert migration.rebuild_sqlite_table(session_maker, engine, States) is False
assert "Error recreating SQLite table states" in caplog.text
caplog.clear()
# Now rebuild the events table to make sure the database did not
# get corrupted
migration.rebuild_sqlite_table(session_maker, engine, Events)
assert migration.rebuild_sqlite_table(session_maker, engine, Events) is True
with session_scope(session=session_maker()) as session:
assert session.query(Events).count() == 1
@@ -812,7 +812,7 @@ def test_rebuild_sqlite_states_table_extra_columns(
text("ALTER TABLE states ADD COLUMN extra_column TEXT")
)
migration.rebuild_sqlite_table(session_maker, engine, States)
assert migration.rebuild_sqlite_table(session_maker, engine, States) is True
assert "Error recreating SQLite table states" not in caplog.text
with session_scope(session=session_maker()) as session:
@@ -905,7 +905,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
for table, column in constraints_to_recreate
for dropped_constraint in migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)
)[1]
]
assert dropped_constraints_1 == expected_dropped_constraints[db_engine]
@@ -917,7 +917,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
for table, column in constraints_to_recreate
for dropped_constraint in migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)
)[1]
]
assert dropped_constraints_2 == []
@@ -936,7 +936,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
for table, column in constraints_to_recreate
for dropped_constraint in migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)
)[1]
]
assert dropped_constraints_3 == expected_dropped_constraints[db_engine]


@@ -7,6 +7,7 @@ from unittest.mock import patch
import pytest
from sqlalchemy import create_engine, inspect
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm import Session
from homeassistant.components import recorder
@@ -444,3 +445,348 @@ async def test_migrate_can_resume_ix_states_event_id_removed(
assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None
await hass.async_stop()
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.parametrize("enable_migrate_event_ids", [True])
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_out_of_disk_space_while_rebuild_states_table(
async_test_recorder: RecorderInstanceGenerator,
caplog: pytest.LogCaptureFixture,
recorder_db_url: str,
) -> None:
"""Test that we can recover from out of disk space while rebuilding the states table.
This case tests the migration still happens if
ix_states_event_id is removed from the states table.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
now = dt_util.utcnow()
one_second_past = now - timedelta(seconds=1)
mock_state = State(
"sensor.test",
"old",
{"last_reset": now.isoformat()},
last_changed=one_second_past,
last_updated=now,
)
state_changed_event = Event(
EVENT_STATE_CHANGED,
{
"entity_id": "sensor.test",
"old_state": None,
"new_state": mock_state,
},
EventOrigin.local,
time_fired_timestamp=now.timestamp(),
)
custom_event = Event(
"custom_event",
{"entity_id": "sensor.custom"},
EventOrigin.local,
time_fired_timestamp=now.timestamp(),
)
number_of_migrations = 5
def _get_event_id_foreign_keys():
assert instance.engine is not None
return next(
(
fk # type: ignore[misc]
for fk in inspect(instance.engine).get_foreign_keys("states")
if fk["constrained_columns"] == ["event_id"]
),
None,
)
def _get_states_index_names():
with session_scope(hass=hass) as session:
return inspect(session.connection()).get_indexes("states")
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
def _add_data():
with session_scope(hass=hass) as session:
session.add(old_db_schema.Events.from_event(custom_event))
session.add(old_db_schema.States.from_event(state_changed_event))
await instance.async_add_executor_job(_add_data)
await hass.async_block_till_done()
await instance.async_block_till_done()
await instance.async_add_executor_job(
migration._drop_index,
instance.get_session,
"states",
"ix_states_event_id",
)
states_indexes = await instance.async_add_executor_job(
_get_states_index_names
)
states_index_names = {index["name"] for index in states_indexes}
assert instance.use_legacy_events_index is True
assert (
await instance.async_add_executor_job(_get_event_id_foreign_keys)
is not None
)
await hass.async_stop()
await hass.async_block_till_done()
assert "ix_states_entity_id_last_updated_ts" in states_index_names
# Simulate out of disk space while rebuilding the states table by
# - patching CreateTable to raise SQLAlchemyError for SQLite
# - patching DropConstraint to raise OperationalError for MySQL and PostgreSQL
with (
patch(
"homeassistant.components.recorder.migration.CreateTable",
side_effect=SQLAlchemyError,
),
patch(
"homeassistant.components.recorder.migration.DropConstraint",
side_effect=OperationalError(
None, None, OSError("No space left on device")
),
),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await hass.async_block_till_done()
# We need to wait for all the migration tasks to complete
# before we can check the database.
for _ in range(number_of_migrations):
await instance.async_block_till_done()
await async_wait_recording_done(hass)
states_indexes = await instance.async_add_executor_job(
_get_states_index_names
)
states_index_names = {index["name"] for index in states_indexes}
assert instance.use_legacy_events_index is True
assert "Error recreating SQLite table states" in caplog.text
assert await instance.async_add_executor_job(_get_event_id_foreign_keys)
await hass.async_stop()
# Now run it again to verify the table rebuild tries again
caplog.clear()
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await hass.async_block_till_done()
# We need to wait for all the migration tasks to complete
# before we can check the database.
for _ in range(number_of_migrations):
await instance.async_block_till_done()
await async_wait_recording_done(hass)
states_indexes = await instance.async_add_executor_job(_get_states_index_names)
states_index_names = {index["name"] for index in states_indexes}
assert instance.use_legacy_events_index is False
assert "ix_states_entity_id_last_updated_ts" not in states_index_names
assert "ix_states_event_id" not in states_index_names
assert "Rebuilding SQLite table states finished" in caplog.text
assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None
await hass.async_stop()
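
Both this test and the next rely on the same failure injection: SQLAlchemy's
OperationalError wraps a DBAPI error (positional arguments: statement, params,
orig), so a full disk can be faked by wrapping OSError. The mechanism in
isolation:

    from unittest.mock import patch

    from sqlalchemy.exc import OperationalError

    no_space = OperationalError(None, None, OSError("No space left on device"))

    with patch(
        "homeassistant.components.recorder.migration.DropConstraint",
        side_effect=no_space,
    ):
        ...  # any migration step that emits DropConstraint now fails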
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.skip_on_db_engine(["sqlite"])
@pytest.mark.parametrize("enable_migrate_event_ids", [True])
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_out_of_disk_space_while_removing_foreign_key(
async_test_recorder: RecorderInstanceGenerator,
caplog: pytest.LogCaptureFixture,
recorder_db_url: str,
) -> None:
"""Test that we can recover from out of disk space while removing the foreign key.
This case tests the migration still happens if
ix_states_event_id is removed from the states table.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
now = dt_util.utcnow()
one_second_past = now - timedelta(seconds=1)
mock_state = State(
"sensor.test",
"old",
{"last_reset": now.isoformat()},
last_changed=one_second_past,
last_updated=now,
)
state_changed_event = Event(
EVENT_STATE_CHANGED,
{
"entity_id": "sensor.test",
"old_state": None,
"new_state": mock_state,
},
EventOrigin.local,
time_fired_timestamp=now.timestamp(),
)
custom_event = Event(
"custom_event",
{"entity_id": "sensor.custom"},
EventOrigin.local,
time_fired_timestamp=now.timestamp(),
)
number_of_migrations = 5
def _get_event_id_foreign_keys():
assert instance.engine is not None
return next(
(
fk # type: ignore[misc]
for fk in inspect(instance.engine).get_foreign_keys("states")
if fk["constrained_columns"] == ["event_id"]
),
None,
)
def _get_states_index_names():
with session_scope(hass=hass) as session:
return inspect(session.connection()).get_indexes("states")
with (
patch.object(recorder, "db_schema", old_db_schema),
patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
),
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
patch.object(core, "EventTypes", old_db_schema.EventTypes),
patch.object(core, "EventData", old_db_schema.EventData),
patch.object(core, "States", old_db_schema.States),
patch.object(core, "Events", old_db_schema.Events),
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
patch(
"homeassistant.components.recorder.migration.cleanup_legacy_states_event_ids"
),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
def _add_data():
with session_scope(hass=hass) as session:
session.add(old_db_schema.Events.from_event(custom_event))
session.add(old_db_schema.States.from_event(state_changed_event))
await instance.async_add_executor_job(_add_data)
await hass.async_block_till_done()
await instance.async_block_till_done()
await instance.async_add_executor_job(
migration._drop_index,
instance.get_session,
"states",
"ix_states_event_id",
)
states_indexes = await instance.async_add_executor_job(
_get_states_index_names
)
states_index_names = {index["name"] for index in states_indexes}
assert instance.use_legacy_events_index is True
assert (
await instance.async_add_executor_job(_get_event_id_foreign_keys)
is not None
)
await hass.async_stop()
await hass.async_block_till_done()
assert "ix_states_entity_id_last_updated_ts" in states_index_names
# Simulate out of disk space while removing the foreign key from the states table by
# - patching DropConstraint to raise OperationalError for MySQL and PostgreSQL
with (
patch(
"homeassistant.components.recorder.migration.DropConstraint",
side_effect=OperationalError(
None, None, OSError("No space left on device")
),
),
):
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await hass.async_block_till_done()
# We need to wait for all the migration tasks to complete
# before we can check the database.
for _ in range(number_of_migrations):
await instance.async_block_till_done()
await async_wait_recording_done(hass)
states_indexes = await instance.async_add_executor_job(
_get_states_index_names
)
states_index_names = {index["name"] for index in states_indexes}
assert instance.use_legacy_events_index is True
assert await instance.async_add_executor_job(_get_event_id_foreign_keys)
await hass.async_stop()
# Now run it again to verify the constraint removal tries again
caplog.clear()
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
await hass.async_block_till_done()
# We need to wait for all the migration tasks to complete
# before we can check the database.
for _ in range(number_of_migrations):
await instance.async_block_till_done()
await async_wait_recording_done(hass)
states_indexes = await instance.async_add_executor_job(_get_states_index_names)
states_index_names = {index["name"] for index in states_indexes}
assert instance.use_legacy_events_index is False
assert "ix_states_entity_id_last_updated_ts" not in states_index_names
assert "ix_states_event_id" not in states_index_names
assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None
await hass.async_stop()