mirror of
https://github.com/home-assistant/core.git
synced 2025-07-16 01:37:08 +00:00
Raise on database error in recorder.migration._drop_foreign_key_constraints (#123645)
* Raise on database error in recorder.migration._drop_foreign_key_constraints * Fix test * Fix test * Revert "Fix test" This reverts commit 940b8cb506e912826d43d09d7697c10888bdf685. * Update test * Improve test coverage * Disable test for SQLite
This commit is contained in:
parent
e6b3d35cdf
commit
667414a457
@ -644,7 +644,7 @@ def _update_states_table_with_foreign_key_options(
|
||||
|
||||
def _drop_foreign_key_constraints(
|
||||
session_maker: Callable[[], Session], engine: Engine, table: str, column: str
|
||||
) -> tuple[bool, list[tuple[str, str, ReflectedForeignKeyConstraint]]]:
|
||||
) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]:
|
||||
"""Drop foreign key constraints for a table on specific columns.
|
||||
|
||||
This is not supported for SQLite because it does not support
|
||||
@ -671,7 +671,6 @@ def _drop_foreign_key_constraints(
|
||||
if foreign_key["name"] and foreign_key["constrained_columns"] == [column]
|
||||
]
|
||||
|
||||
fk_remove_ok = True
|
||||
for drop in drops:
|
||||
with session_scope(session=session_maker()) as session:
|
||||
try:
|
||||
@ -683,9 +682,9 @@ def _drop_foreign_key_constraints(
|
||||
TABLE_STATES,
|
||||
column,
|
||||
)
|
||||
fk_remove_ok = False
|
||||
raise
|
||||
|
||||
return fk_remove_ok, dropped_constraints
|
||||
return dropped_constraints
|
||||
|
||||
|
||||
def _restore_foreign_key_constraints(
|
||||
@ -2183,9 +2182,14 @@ def cleanup_legacy_states_event_ids(instance: Recorder) -> bool:
|
||||
# so we have to rebuild the table
|
||||
fk_remove_ok = rebuild_sqlite_table(session_maker, instance.engine, States)
|
||||
else:
|
||||
fk_remove_ok, _ = _drop_foreign_key_constraints(
|
||||
session_maker, instance.engine, TABLE_STATES, "event_id"
|
||||
)
|
||||
try:
|
||||
_drop_foreign_key_constraints(
|
||||
session_maker, instance.engine, TABLE_STATES, "event_id"
|
||||
)
|
||||
except (InternalError, OperationalError):
|
||||
fk_remove_ok = False
|
||||
else:
|
||||
fk_remove_ok = True
|
||||
if fk_remove_ok:
|
||||
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
|
||||
instance.use_legacy_events_index = False
|
||||
|
@ -1,12 +1,15 @@
|
||||
"""Fixtures for the recorder component tests."""
|
||||
|
||||
from collections.abc import AsyncGenerator, Generator
|
||||
from collections.abc import Callable, Generator
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from functools import partial
|
||||
import threading
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
from homeassistant.components import recorder
|
||||
from homeassistant.components.recorder import db_schema
|
||||
@ -57,31 +60,69 @@ def recorder_dialect_name(hass: HomeAssistant, db_engine: str) -> Generator[None
|
||||
class InstrumentedMigration:
|
||||
"""Container to aid controlling migration progress."""
|
||||
|
||||
migration_done: threading.Event
|
||||
live_migration_done: threading.Event
|
||||
live_migration_done_stall: threading.Event
|
||||
migration_stall: threading.Event
|
||||
migration_started: threading.Event
|
||||
migration_version: int | None
|
||||
non_live_migration_done: threading.Event
|
||||
non_live_migration_done_stall: threading.Event
|
||||
apply_update_mock: Mock
|
||||
stall_on_schema_version: int
|
||||
apply_update_stalled: threading.Event
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def instrument_migration(
|
||||
@pytest.fixture(name="instrument_migration")
|
||||
def instrument_migration_fixture(
|
||||
hass: HomeAssistant,
|
||||
) -> AsyncGenerator[InstrumentedMigration]:
|
||||
) -> Generator[InstrumentedMigration]:
|
||||
"""Instrument recorder migration."""
|
||||
with instrument_migration(hass) as instrumented_migration:
|
||||
yield instrumented_migration
|
||||
|
||||
|
||||
@contextmanager
|
||||
def instrument_migration(
|
||||
hass: HomeAssistant,
|
||||
) -> Generator[InstrumentedMigration]:
|
||||
"""Instrument recorder migration."""
|
||||
|
||||
real_migrate_schema_live = recorder.migration.migrate_schema_live
|
||||
real_migrate_schema_non_live = recorder.migration.migrate_schema_non_live
|
||||
real_apply_update = recorder.migration._apply_update
|
||||
|
||||
def _instrument_migrate_schema(real_func, *args):
|
||||
def _instrument_migrate_schema_live(real_func, *args):
|
||||
"""Control migration progress and check results."""
|
||||
return _instrument_migrate_schema(
|
||||
real_func,
|
||||
args,
|
||||
instrumented_migration.live_migration_done,
|
||||
instrumented_migration.live_migration_done_stall,
|
||||
)
|
||||
|
||||
def _instrument_migrate_schema_non_live(real_func, *args):
|
||||
"""Control migration progress and check results."""
|
||||
return _instrument_migrate_schema(
|
||||
real_func,
|
||||
args,
|
||||
instrumented_migration.non_live_migration_done,
|
||||
instrumented_migration.non_live_migration_done_stall,
|
||||
)
|
||||
|
||||
def _instrument_migrate_schema(
|
||||
real_func,
|
||||
args,
|
||||
migration_done: threading.Event,
|
||||
migration_done_stall: threading.Event,
|
||||
):
|
||||
"""Control migration progress and check results."""
|
||||
instrumented_migration.migration_started.set()
|
||||
|
||||
try:
|
||||
migration_result = real_func(*args)
|
||||
except Exception:
|
||||
instrumented_migration.migration_done.set()
|
||||
migration_done.set()
|
||||
migration_done_stall.wait()
|
||||
raise
|
||||
|
||||
# Check and report the outcome of the migration; if migration fails
|
||||
@ -93,22 +134,36 @@ async def instrument_migration(
|
||||
.first()
|
||||
)
|
||||
instrumented_migration.migration_version = res.schema_version
|
||||
instrumented_migration.migration_done.set()
|
||||
migration_done.set()
|
||||
migration_done_stall.wait()
|
||||
return migration_result
|
||||
|
||||
def _instrument_apply_update(*args):
|
||||
def _instrument_apply_update(
|
||||
instance: recorder.Recorder,
|
||||
hass: HomeAssistant,
|
||||
engine: Engine,
|
||||
session_maker: Callable[[], Session],
|
||||
new_version: int,
|
||||
old_version: int,
|
||||
):
|
||||
"""Control migration progress."""
|
||||
instrumented_migration.migration_stall.wait()
|
||||
real_apply_update(*args)
|
||||
if new_version == instrumented_migration.stall_on_schema_version:
|
||||
instrumented_migration.apply_update_stalled.set()
|
||||
instrumented_migration.migration_stall.wait()
|
||||
real_apply_update(
|
||||
instance, hass, engine, session_maker, new_version, old_version
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.recorder.migration.migrate_schema_live",
|
||||
wraps=partial(_instrument_migrate_schema, real_migrate_schema_live),
|
||||
wraps=partial(_instrument_migrate_schema_live, real_migrate_schema_live),
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.migration.migrate_schema_non_live",
|
||||
wraps=partial(_instrument_migrate_schema, real_migrate_schema_non_live),
|
||||
wraps=partial(
|
||||
_instrument_migrate_schema_non_live, real_migrate_schema_non_live
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.migration._apply_update",
|
||||
@ -116,11 +171,18 @@ async def instrument_migration(
|
||||
) as apply_update_mock,
|
||||
):
|
||||
instrumented_migration = InstrumentedMigration(
|
||||
migration_done=threading.Event(),
|
||||
live_migration_done=threading.Event(),
|
||||
live_migration_done_stall=threading.Event(),
|
||||
migration_stall=threading.Event(),
|
||||
migration_started=threading.Event(),
|
||||
migration_version=None,
|
||||
non_live_migration_done=threading.Event(),
|
||||
non_live_migration_done_stall=threading.Event(),
|
||||
apply_update_mock=apply_update_mock,
|
||||
stall_on_schema_version=1,
|
||||
apply_update_stalled=threading.Event(),
|
||||
)
|
||||
|
||||
instrumented_migration.live_migration_done_stall.set()
|
||||
instrumented_migration.non_live_migration_done_stall.set()
|
||||
yield instrumented_migration
|
||||
|
@ -257,6 +257,76 @@ async def test_database_migration_failed_step_11(
|
||||
assert len(mock_dismiss.mock_calls) == expected_pn_dismiss
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
(
|
||||
"func_to_patch",
|
||||
"expected_setup_result",
|
||||
"expected_pn_create",
|
||||
"expected_pn_dismiss",
|
||||
),
|
||||
[
|
||||
("DropConstraint", False, 2, 1), # This makes migration to step 44 fail
|
||||
],
|
||||
)
|
||||
@pytest.mark.skip_on_db_engine(["sqlite"])
|
||||
@pytest.mark.usefixtures("skip_by_db_engine")
|
||||
async def test_database_migration_failed_step_44(
|
||||
hass: HomeAssistant,
|
||||
async_setup_recorder_instance: RecorderInstanceGenerator,
|
||||
instrument_migration: InstrumentedMigration,
|
||||
func_to_patch: str,
|
||||
expected_setup_result: bool,
|
||||
expected_pn_create: int,
|
||||
expected_pn_dismiss: int,
|
||||
) -> None:
|
||||
"""Test we notify if the migration fails."""
|
||||
assert recorder.util.async_migration_in_progress(hass) is False
|
||||
instrument_migration.stall_on_schema_version = 44
|
||||
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.recorder.core.create_engine",
|
||||
new=create_engine_test,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.persistent_notification.create",
|
||||
side_effect=pn.create,
|
||||
) as mock_create,
|
||||
patch(
|
||||
"homeassistant.components.persistent_notification.dismiss",
|
||||
side_effect=pn.dismiss,
|
||||
) as mock_dismiss,
|
||||
):
|
||||
await async_setup_recorder_instance(
|
||||
hass,
|
||||
wait_recorder=False,
|
||||
wait_recorder_setup=False,
|
||||
expected_setup_result=expected_setup_result,
|
||||
)
|
||||
# Wait for migration to reach schema version 44
|
||||
await hass.async_add_executor_job(
|
||||
instrument_migration.apply_update_stalled.wait
|
||||
)
|
||||
|
||||
# Make it fail
|
||||
with patch(
|
||||
f"homeassistant.components.recorder.migration.{func_to_patch}",
|
||||
side_effect=OperationalError(
|
||||
None, None, OSError("No space left on device")
|
||||
),
|
||||
):
|
||||
instrument_migration.migration_stall.set()
|
||||
hass.states.async_set("my.entity", "on", {})
|
||||
hass.states.async_set("my.entity", "off", {})
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_add_executor_job(recorder.get_instance(hass).join)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert recorder.util.async_migration_in_progress(hass) is False
|
||||
assert len(mock_create.mock_calls) == expected_pn_create
|
||||
assert len(mock_dismiss.mock_calls) == expected_pn_dismiss
|
||||
|
||||
|
||||
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
|
||||
@pytest.mark.usefixtures("skip_by_db_engine")
|
||||
async def test_live_database_migration_encounters_corruption(
|
||||
@ -611,7 +681,7 @@ async def test_schema_migrate(
|
||||
assert recorder.util.async_migration_is_live(hass) == live
|
||||
instrument_migration.migration_stall.set()
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_add_executor_job(instrument_migration.migration_done.wait)
|
||||
await hass.async_add_executor_job(instrument_migration.live_migration_done.wait)
|
||||
await async_wait_recording_done(hass)
|
||||
assert instrument_migration.migration_version == db_schema.SCHEMA_VERSION
|
||||
assert setup_run.called
|
||||
@ -963,7 +1033,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in migration._drop_foreign_key_constraints(
|
||||
session_maker, engine, table, column
|
||||
)[1]
|
||||
)
|
||||
]
|
||||
assert dropped_constraints_1 == expected_dropped_constraints[db_engine]
|
||||
|
||||
@ -975,7 +1045,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in migration._drop_foreign_key_constraints(
|
||||
session_maker, engine, table, column
|
||||
)[1]
|
||||
)
|
||||
]
|
||||
assert dropped_constraints_2 == []
|
||||
|
||||
@ -994,7 +1064,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in migration._drop_foreign_key_constraints(
|
||||
session_maker, engine, table, column
|
||||
)[1]
|
||||
)
|
||||
]
|
||||
assert dropped_constraints_3 == expected_dropped_constraints[db_engine]
|
||||
|
||||
|
@ -19,6 +19,7 @@ from homeassistant.core import Event, EventOrigin, State
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .common import async_wait_recording_done
|
||||
from .conftest import instrument_migration
|
||||
|
||||
from tests.common import async_test_home_assistant
|
||||
from tests.typing import RecorderInstanceGenerator
|
||||
@ -637,6 +638,10 @@ async def test_out_of_disk_space_while_removing_foreign_key(
|
||||
|
||||
This case tests the migration still happens if
|
||||
ix_states_event_id is removed from the states table.
|
||||
|
||||
Note that the test is somewhat forced; the states.event_id foreign key constraint is
|
||||
removed when migrating to schema version 44, inspecting the schema in
|
||||
cleanup_legacy_states_event_ids is not likely to fail.
|
||||
"""
|
||||
importlib.import_module(SCHEMA_MODULE)
|
||||
old_db_schema = sys.modules[SCHEMA_MODULE]
|
||||
@ -737,36 +742,52 @@ async def test_out_of_disk_space_while_removing_foreign_key(
|
||||
|
||||
assert "ix_states_entity_id_last_updated_ts" in states_index_names
|
||||
|
||||
# Simulate out of disk space while removing the foreign key from the states table by
|
||||
# - patching DropConstraint to raise InternalError for MySQL and PostgreSQL
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.recorder.migration.DropConstraint",
|
||||
side_effect=OperationalError(
|
||||
None, None, OSError("No space left on device")
|
||||
),
|
||||
),
|
||||
):
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
async with async_test_home_assistant() as hass:
|
||||
with instrument_migration(hass) as instrumented_migration:
|
||||
# Allow migration to start, but stall when live migration is completed
|
||||
instrumented_migration.migration_stall.set()
|
||||
instrumented_migration.live_migration_done_stall.clear()
|
||||
|
||||
# We need to wait for all the migration tasks to complete
|
||||
# before we can check the database.
|
||||
for _ in range(number_of_migrations):
|
||||
await instance.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
async with async_test_recorder(hass, wait_recorder=False) as instance:
|
||||
await hass.async_block_till_done()
|
||||
|
||||
states_indexes = await instance.async_add_executor_job(
|
||||
_get_states_index_names
|
||||
)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
assert instance.use_legacy_events_index is True
|
||||
assert await instance.async_add_executor_job(_get_event_id_foreign_keys)
|
||||
# Wait for live migration to complete
|
||||
await hass.async_add_executor_job(
|
||||
instrumented_migration.live_migration_done.wait
|
||||
)
|
||||
|
||||
await hass.async_stop()
|
||||
# Simulate out of disk space while removing the foreign key from the states table by
|
||||
# - patching DropConstraint to raise InternalError for MySQL and PostgreSQL
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.recorder.migration.sqlalchemy.inspect",
|
||||
side_effect=OperationalError(
|
||||
None, None, OSError("No space left on device")
|
||||
),
|
||||
),
|
||||
):
|
||||
instrumented_migration.live_migration_done_stall.set()
|
||||
# We need to wait for all the migration tasks to complete
|
||||
# before we can check the database.
|
||||
for _ in range(number_of_migrations):
|
||||
await instance.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
states_indexes = await instance.async_add_executor_job(
|
||||
_get_states_index_names
|
||||
)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
assert instance.use_legacy_events_index is True
|
||||
# The states.event_id foreign key constraint was removed when
|
||||
# migration to schema version 44
|
||||
assert (
|
||||
await instance.async_add_executor_job(
|
||||
_get_event_id_foreign_keys
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
await hass.async_stop()
|
||||
|
||||
# Now run it again to verify the table rebuild tries again
|
||||
caplog.clear()
|
||||
|
Loading…
x
Reference in New Issue
Block a user