mirror of
https://github.com/home-assistant/core.git
synced 2025-07-15 01:07:10 +00:00
Prevent duplicating constraints during schema migration (#124616)
This commit is contained in:
parent
156948c496
commit
b9aaba0432
@ -695,6 +695,18 @@ def _restore_foreign_key_constraints(
|
||||
_LOGGER.info("Did not find a matching constraint for %s.%s", table, column)
|
||||
continue
|
||||
|
||||
inspector = sqlalchemy.inspect(engine)
|
||||
if any(
|
||||
foreign_key["name"] and foreign_key["constrained_columns"] == [column]
|
||||
for foreign_key in inspector.get_foreign_keys(table)
|
||||
):
|
||||
_LOGGER.info(
|
||||
"The database already has a matching constraint for %s.%s",
|
||||
table,
|
||||
column,
|
||||
)
|
||||
continue
|
||||
|
||||
if TYPE_CHECKING:
|
||||
assert foreign_table is not None
|
||||
assert foreign_column is not None
|
||||
|
@ -1028,6 +1028,287 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
|
||||
engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.skip_on_db_engine(["sqlite"])
|
||||
@pytest.mark.usefixtures("skip_by_db_engine")
|
||||
def test_restore_foreign_key_constraints_twice(recorder_db_url: str) -> None:
|
||||
"""Test we can drop and then restore foreign keys.
|
||||
|
||||
This is not supported on SQLite
|
||||
"""
|
||||
|
||||
constraints_to_recreate = (
|
||||
("events", "data_id", "event_data", "data_id"),
|
||||
("states", "event_id", None, None), # This won't be found
|
||||
("states", "old_state_id", "states", "state_id"),
|
||||
)
|
||||
|
||||
db_engine = recorder_db_url.partition("://")[0]
|
||||
|
||||
expected_dropped_constraints = {
|
||||
"mysql": [
|
||||
(
|
||||
"events",
|
||||
"data_id",
|
||||
{
|
||||
"constrained_columns": ["data_id"],
|
||||
"name": ANY,
|
||||
"options": {},
|
||||
"referred_columns": ["data_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "event_data",
|
||||
},
|
||||
),
|
||||
(
|
||||
"states",
|
||||
"old_state_id",
|
||||
{
|
||||
"constrained_columns": ["old_state_id"],
|
||||
"name": ANY,
|
||||
"options": {},
|
||||
"referred_columns": ["state_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "states",
|
||||
},
|
||||
),
|
||||
],
|
||||
"postgresql": [
|
||||
(
|
||||
"events",
|
||||
"data_id",
|
||||
{
|
||||
"comment": None,
|
||||
"constrained_columns": ["data_id"],
|
||||
"name": "events_data_id_fkey",
|
||||
"options": {},
|
||||
"referred_columns": ["data_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "event_data",
|
||||
},
|
||||
),
|
||||
(
|
||||
"states",
|
||||
"old_state_id",
|
||||
{
|
||||
"comment": None,
|
||||
"constrained_columns": ["old_state_id"],
|
||||
"name": "states_old_state_id_fkey",
|
||||
"options": {},
|
||||
"referred_columns": ["state_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "states",
|
||||
},
|
||||
),
|
||||
],
|
||||
}
|
||||
|
||||
def find_constraints(
|
||||
engine: Engine, table: str, column: str
|
||||
) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]:
|
||||
inspector = inspect(engine)
|
||||
return [
|
||||
(table, column, foreign_key)
|
||||
for foreign_key in inspector.get_foreign_keys(table)
|
||||
if foreign_key["name"] and foreign_key["constrained_columns"] == [column]
|
||||
]
|
||||
|
||||
engine = create_engine(recorder_db_url)
|
||||
db_schema.Base.metadata.create_all(engine)
|
||||
|
||||
matching_constraints_1 = [
|
||||
dropped_constraint
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in find_constraints(engine, table, column)
|
||||
]
|
||||
assert matching_constraints_1 == expected_dropped_constraints[db_engine]
|
||||
|
||||
with Session(engine) as session:
|
||||
session_maker = Mock(return_value=session)
|
||||
for table, column, _, _ in constraints_to_recreate:
|
||||
migration._drop_foreign_key_constraints(
|
||||
session_maker, engine, table, column
|
||||
)
|
||||
|
||||
# Check we don't find the constrained columns again (they are removed)
|
||||
matching_constraints_2 = [
|
||||
dropped_constraint
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in find_constraints(engine, table, column)
|
||||
]
|
||||
assert matching_constraints_2 == []
|
||||
|
||||
# Restore the constraints
|
||||
with Session(engine) as session:
|
||||
session_maker = Mock(return_value=session)
|
||||
migration._restore_foreign_key_constraints(
|
||||
session_maker, engine, constraints_to_recreate
|
||||
)
|
||||
|
||||
# Restore the constraints again
|
||||
with Session(engine) as session:
|
||||
session_maker = Mock(return_value=session)
|
||||
migration._restore_foreign_key_constraints(
|
||||
session_maker, engine, constraints_to_recreate
|
||||
)
|
||||
|
||||
# Check we do find a single the constrained columns again (they are restored
|
||||
# only once, even though we called _restore_foreign_key_constraints twice)
|
||||
matching_constraints_3 = [
|
||||
dropped_constraint
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in find_constraints(engine, table, column)
|
||||
]
|
||||
assert matching_constraints_3 == expected_dropped_constraints[db_engine]
|
||||
|
||||
engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.skip_on_db_engine(["sqlite"])
|
||||
@pytest.mark.usefixtures("skip_by_db_engine")
|
||||
def test_drop_duplicated_foreign_key_constraints(recorder_db_url: str) -> None:
|
||||
"""Test we can drop and then restore foreign keys.
|
||||
|
||||
This is not supported on SQLite
|
||||
"""
|
||||
|
||||
constraints_to_recreate = (
|
||||
("events", "data_id", "event_data", "data_id"),
|
||||
("states", "event_id", None, None), # This won't be found
|
||||
("states", "old_state_id", "states", "state_id"),
|
||||
)
|
||||
|
||||
db_engine = recorder_db_url.partition("://")[0]
|
||||
|
||||
expected_dropped_constraints = {
|
||||
"mysql": [
|
||||
(
|
||||
"events",
|
||||
"data_id",
|
||||
{
|
||||
"constrained_columns": ["data_id"],
|
||||
"name": ANY,
|
||||
"options": {},
|
||||
"referred_columns": ["data_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "event_data",
|
||||
},
|
||||
),
|
||||
(
|
||||
"states",
|
||||
"old_state_id",
|
||||
{
|
||||
"constrained_columns": ["old_state_id"],
|
||||
"name": ANY,
|
||||
"options": {},
|
||||
"referred_columns": ["state_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "states",
|
||||
},
|
||||
),
|
||||
],
|
||||
"postgresql": [
|
||||
(
|
||||
"events",
|
||||
"data_id",
|
||||
{
|
||||
"comment": None,
|
||||
"constrained_columns": ["data_id"],
|
||||
"name": ANY,
|
||||
"options": {},
|
||||
"referred_columns": ["data_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "event_data",
|
||||
},
|
||||
),
|
||||
(
|
||||
"states",
|
||||
"old_state_id",
|
||||
{
|
||||
"comment": None,
|
||||
"constrained_columns": ["old_state_id"],
|
||||
"name": ANY,
|
||||
"options": {},
|
||||
"referred_columns": ["state_id"],
|
||||
"referred_schema": None,
|
||||
"referred_table": "states",
|
||||
},
|
||||
),
|
||||
],
|
||||
}
|
||||
|
||||
def find_constraints(
|
||||
engine: Engine, table: str, column: str
|
||||
) -> list[tuple[str, str, ReflectedForeignKeyConstraint]]:
|
||||
inspector = inspect(engine)
|
||||
return [
|
||||
(table, column, foreign_key)
|
||||
for foreign_key in inspector.get_foreign_keys(table)
|
||||
if foreign_key["name"] and foreign_key["constrained_columns"] == [column]
|
||||
]
|
||||
|
||||
engine = create_engine(recorder_db_url)
|
||||
db_schema.Base.metadata.create_all(engine)
|
||||
|
||||
# Create a duplicate of the constraints
|
||||
inspector = Mock(name="inspector")
|
||||
inspector.get_foreign_keys = Mock(name="get_foreign_keys", return_value=[])
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.recorder.migration.sqlalchemy.inspect",
|
||||
return_value=inspector,
|
||||
),
|
||||
Session(engine) as session,
|
||||
):
|
||||
session_maker = Mock(return_value=session)
|
||||
migration._restore_foreign_key_constraints(
|
||||
session_maker, engine, constraints_to_recreate
|
||||
)
|
||||
|
||||
matching_constraints_1 = [
|
||||
dropped_constraint
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in find_constraints(engine, table, column)
|
||||
]
|
||||
_expected_dropped_constraints = [
|
||||
_dropped_constraint
|
||||
for dropped_constraint in expected_dropped_constraints[db_engine]
|
||||
for _dropped_constraint in (dropped_constraint, dropped_constraint)
|
||||
]
|
||||
assert matching_constraints_1 == _expected_dropped_constraints
|
||||
|
||||
with Session(engine) as session:
|
||||
session_maker = Mock(return_value=session)
|
||||
for table, column, _, _ in constraints_to_recreate:
|
||||
migration._drop_foreign_key_constraints(
|
||||
session_maker, engine, table, column
|
||||
)
|
||||
|
||||
# Check we don't find the constrained columns again (they are removed)
|
||||
matching_constraints_2 = [
|
||||
dropped_constraint
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in find_constraints(engine, table, column)
|
||||
]
|
||||
assert matching_constraints_2 == []
|
||||
|
||||
# Restore the constraints
|
||||
with Session(engine) as session:
|
||||
session_maker = Mock(return_value=session)
|
||||
migration._restore_foreign_key_constraints(
|
||||
session_maker, engine, constraints_to_recreate
|
||||
)
|
||||
|
||||
# Check we do find a single the constrained columns again (they are restored
|
||||
# only once, even though we called _restore_foreign_key_constraints twice)
|
||||
matching_constraints_3 = [
|
||||
dropped_constraint
|
||||
for table, column, _, _ in constraints_to_recreate
|
||||
for dropped_constraint in find_constraints(engine, table, column)
|
||||
]
|
||||
assert matching_constraints_3 == expected_dropped_constraints[db_engine]
|
||||
|
||||
engine.dispose()
|
||||
|
||||
|
||||
def test_restore_foreign_key_constraints_with_error(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
@ -1045,6 +1326,9 @@ def test_restore_foreign_key_constraints_with_error(
|
||||
instance = Mock()
|
||||
instance.get_session = Mock(return_value=session)
|
||||
engine = Mock()
|
||||
inspector = Mock(name="inspector")
|
||||
inspector.get_foreign_keys = Mock(name="get_foreign_keys", return_value=[])
|
||||
engine._sa_instance_state = inspector
|
||||
|
||||
session_maker = Mock(return_value=session)
|
||||
with pytest.raises(InternalError):
|
||||
|
Loading…
x
Reference in New Issue
Block a user