diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 089baea6759..17edf1a59fc 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -576,7 +576,13 @@ def _update_states_table_with_foreign_key_options( connection.execute(DropConstraint(alter["old_fk"])) # type: ignore[no-untyped-call] for fkc in states_key_constraints: if fkc.column_keys == alter["columns"]: - connection.execute(AddConstraint(fkc)) # type: ignore[no-untyped-call] + # AddConstraint mutates the constraint passed to it, we need to + # undo that to avoid changing the behavior of the table schema. + # https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/sql/ddl.py#L746-L748 + create_rule = fkc._create_rule # noqa: SLF001 + add_constraint = AddConstraint(fkc) # type: ignore[no-untyped-call] + fkc._create_rule = create_rule # noqa: SLF001 + connection.execute(add_constraint) except (InternalError, OperationalError): _LOGGER.exception( "Could not update foreign options in %s table", TABLE_STATES @@ -634,10 +640,17 @@ def _restore_foreign_key_constraints( ) continue + # AddConstraint mutates the constraint passed to it, we need to + # undo that to avoid changing the behavior of the table schema. + # https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/sql/ddl.py#L746-L748 + create_rule = constraint._create_rule # noqa: SLF001 + add_constraint = AddConstraint(constraint) # type: ignore[no-untyped-call] + constraint._create_rule = create_rule # noqa: SLF001 + with session_scope(session=session_maker()) as session: try: connection = session.connection() - connection.execute(AddConstraint(constraint)) # type: ignore[no-untyped-call] + connection.execute(add_constraint) except (InternalError, OperationalError): _LOGGER.exception("Could not update foreign options in %s table", table) diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 795c8d020dc..682c0a55767 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -4,7 +4,7 @@ import datetime import importlib import sqlite3 import sys -from unittest.mock import Mock, PropertyMock, call, patch +from unittest.mock import ANY, Mock, PropertyMock, call, patch import pytest from sqlalchemy import create_engine, text @@ -688,6 +688,127 @@ def test_rebuild_sqlite_states_table_extra_columns( engine.dispose() +@pytest.mark.skip_on_db_engine(["sqlite"]) +@pytest.mark.usefixtures("skip_by_db_engine") +def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: + """Test we can drop and then restore foreign keys. + + This is not supported on SQLite + """ + + constraints_to_recreate = ( + ("events", "data_id"), + ("states", "event_id"), # This won't be found + ("states", "old_state_id"), + ) + + db_engine = recorder_db_url.partition("://")[0] + + expected_dropped_constraints = { + "mysql": [ + ( + "events", + "data_id", + { + "constrained_columns": ["data_id"], + "name": ANY, + "options": {}, + "referred_columns": ["data_id"], + "referred_schema": None, + "referred_table": "event_data", + }, + ), + ( + "states", + "old_state_id", + { + "constrained_columns": ["old_state_id"], + "name": ANY, + "options": {}, + "referred_columns": ["state_id"], + "referred_schema": None, + "referred_table": "states", + }, + ), + ], + "postgresql": [ + ( + "events", + "data_id", + { + "comment": None, + "constrained_columns": ["data_id"], + "name": "events_data_id_fkey", + "options": {}, + "referred_columns": ["data_id"], + "referred_schema": None, + "referred_table": "event_data", + }, + ), + ( + "states", + "old_state_id", + { + "comment": None, + "constrained_columns": ["old_state_id"], + "name": "states_old_state_id_fkey", + "options": {}, + "referred_columns": ["state_id"], + "referred_schema": None, + "referred_table": "states", + }, + ), + ], + } + + engine = create_engine(recorder_db_url) + db_schema.Base.metadata.create_all(engine) + + with Session(engine) as session: + session_maker = Mock(return_value=session) + dropped_constraints_1 = [ + dropped_constraint + for table, column in constraints_to_recreate + for dropped_constraint in migration._drop_foreign_key_constraints( + session_maker, engine, table, column + ) + ] + assert dropped_constraints_1 == expected_dropped_constraints[db_engine] + + # Check we don't find the constrained columns again (they are removed) + with Session(engine) as session: + session_maker = Mock(return_value=session) + dropped_constraints_2 = [ + dropped_constraint + for table, column in constraints_to_recreate + for dropped_constraint in migration._drop_foreign_key_constraints( + session_maker, engine, table, column + ) + ] + assert dropped_constraints_2 == [] + + # Restore the constraints + with Session(engine) as session: + session_maker = Mock(return_value=session) + migration._restore_foreign_key_constraints( + session_maker, engine, dropped_constraints_1 + ) + + # Check we do find the constrained columns again (they are restored) + with Session(engine) as session: + session_maker = Mock(return_value=session) + dropped_constraints_3 = [ + dropped_constraint + for table, column in constraints_to_recreate + for dropped_constraint in migration._drop_foreign_key_constraints( + session_maker, engine, table, column + ) + ] + assert dropped_constraints_3 == expected_dropped_constraints[db_engine] + + engine.dispose() + + def test_restore_foreign_key_constraints_with_error( caplog: pytest.LogCaptureFixture, ) -> None: