diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 9a19ff7084a..185be02e9aa 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -15,7 +15,6 @@ from uuid import UUID import sqlalchemy from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text, update from sqlalchemy.engine import CursorResult, Engine -from sqlalchemy.engine.interfaces import ReflectedForeignKeyConstraint from sqlalchemy.exc import ( DatabaseError, IntegrityError, @@ -645,7 +644,7 @@ def _update_states_table_with_foreign_key_options( def _drop_foreign_key_constraints( session_maker: Callable[[], Session], engine: Engine, table: str, column: str -) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]: +) -> None: """Drop foreign key constraints for a table on specific columns. This is not supported for SQLite because it does not support @@ -658,11 +657,6 @@ def _drop_foreign_key_constraints( ) inspector = sqlalchemy.inspect(engine) - dropped_constraints = [ - (table, column, foreign_key) - for foreign_key in inspector.get_foreign_keys(table) - if foreign_key["name"] and foreign_key["constrained_columns"] == [column] - ] ## Find matching named constraints and bind the ForeignKeyConstraints to the table tmp_table = Table(table, MetaData()) @@ -685,8 +679,6 @@ def _drop_foreign_key_constraints( ) raise - return dropped_constraints - def _restore_foreign_key_constraints( session_maker: Callable[[], Session], diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 4bc317bdaa7..b56dfe3e189 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -7,7 +7,9 @@ import sys from unittest.mock import ANY, Mock, PropertyMock, call, patch import pytest -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine, inspect, text +from sqlalchemy.engine import Engine +from sqlalchemy.engine.interfaces import ReflectedForeignKeyConstraint from sqlalchemy.exc import ( DatabaseError, InternalError, @@ -973,31 +975,40 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: ], } + def find_constraints( + engine: Engine, table: str, column: str + ) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]: + inspector = inspect(engine) + return [ + (table, column, foreign_key) + for foreign_key in inspector.get_foreign_keys(table) + if foreign_key["name"] and foreign_key["constrained_columns"] == [column] + ] + engine = create_engine(recorder_db_url) db_schema.Base.metadata.create_all(engine) + matching_constraints_1 = [ + dropped_constraint + for table, column, _, _ in constraints_to_recreate + for dropped_constraint in find_constraints(engine, table, column) + ] + assert matching_constraints_1 == expected_dropped_constraints[db_engine] + with Session(engine) as session: session_maker = Mock(return_value=session) - dropped_constraints_1 = [ - dropped_constraint - for table, column, _, _ in constraints_to_recreate - for dropped_constraint in migration._drop_foreign_key_constraints( + for table, column, _, _ in constraints_to_recreate: + migration._drop_foreign_key_constraints( session_maker, engine, table, column ) - ] - assert dropped_constraints_1 == expected_dropped_constraints[db_engine] # Check we don't find the constrained columns again (they are removed) - with Session(engine) as session: - session_maker = Mock(return_value=session) - dropped_constraints_2 = [ - dropped_constraint - for table, column, _, _ in constraints_to_recreate - for dropped_constraint in migration._drop_foreign_key_constraints( - session_maker, engine, table, column - ) - ] - assert dropped_constraints_2 == [] + matching_constraints_2 = [ + dropped_constraint + for table, column, _, _ in constraints_to_recreate + for dropped_constraint in find_constraints(engine, table, column) + ] + assert matching_constraints_2 == [] # Restore the constraints with Session(engine) as session: @@ -1007,16 +1018,12 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: ) # Check we do find the constrained columns again (they are restored) - with Session(engine) as session: - session_maker = Mock(return_value=session) - dropped_constraints_3 = [ - dropped_constraint - for table, column, _, _ in constraints_to_recreate - for dropped_constraint in migration._drop_foreign_key_constraints( - session_maker, engine, table, column - ) - ] - assert dropped_constraints_3 == expected_dropped_constraints[db_engine] + matching_constraints_3 = [ + dropped_constraint + for table, column, _, _ in constraints_to_recreate + for dropped_constraint in find_constraints(engine, table, column) + ] + assert matching_constraints_3 == expected_dropped_constraints[db_engine] engine.dispose()