Drop violating rows before adding foreign constraints in DB schema 44 migration (#123454)

* Drop violating rows before adding foreign constraints

* Don't delete rows with NULL references

* Only delete rows when IntegrityError is caught

* Move restore of dropped foreign key constraints to a separate migration step

* Use aliases for tables

* Update homeassistant/components/recorder/migration.py

* Update test

* Don't use alias for table we're deleting from, improve test

* Fix MySQL

* Update instead of deleting in case of self-references

* Improve log messages

* Batch updates

* Add workaround for unsupported LIMIT in PostgreSQL (see the sketch after the changed-files summary below)

* Simplify

---------

Co-authored-by: J. Nick Koston <nick@koston.org>
Erik Montnemery 2024-08-14 09:31:37 +02:00 committed by GitHub
parent 29887c2a17
commit b7bbc938d3
3 changed files with 304 additions and 52 deletions
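The "Batch updates" and "Add workaround for unsupported LIMIT in PostgreSQL" bullets above come down to the same loop: re-run a size-capped cleanup statement until it reports zero affected rows, with the LIMIT pushed into a subquery because PostgreSQL accepts no LIMIT clause on DELETE or UPDATE. The sketch below shows only that shape and is not the PR's code; the child/parent table and column names and the plain Session handling are hypothetical.

from collections.abc import Callable

from sqlalchemy import text
from sqlalchemy.orm import Session


def delete_orphans_in_batches(session_maker: Callable[[], Session]) -> None:
    """Delete child rows whose parent_id points at a missing parent, 100,000 at a time."""
    result = None
    while result is None or result.rowcount > 0:
        # Commit between batches so no single transaction grows too large.
        with session_maker() as session, session.begin():
            # PostgreSQL rejects LIMIT on DELETE itself, so the limit lives
            # in the subquery that selects the violating rows.
            result = session.connection().execute(
                text(
                    "DELETE FROM child "
                    "WHERE parent_id IN ("
                    "  SELECT parent_id FROM child AS t1 "
                    "  WHERE t1.parent_id IS NOT NULL "
                    "  AND NOT EXISTS ("
                    "    SELECT 1 FROM parent AS t2 "
                    "    WHERE t2.id = t1.parent_id) "
                    "  LIMIT 100000)"
                )
            )

The migration itself additionally branches per dialect and, for self-referencing tables, switches from DELETE to UPDATE ... SET column = NULL, as the migration.py diff below shows.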

View File

@@ -77,7 +77,7 @@ class LegacyBase(DeclarativeBase):
"""Base class for tables, used for schema migration."""
SCHEMA_VERSION = 44
SCHEMA_VERSION = 45
_LOGGER = logging.getLogger(__name__)

View File

@@ -669,33 +669,177 @@ def _drop_foreign_key_constraints(
def _restore_foreign_key_constraints(
session_maker: Callable[[], Session],
engine: Engine,
dropped_constraints: list[tuple[str, str, ReflectedForeignKeyConstraint]],
foreign_columns: list[tuple[str, str, str | None, str | None]],
) -> None:
"""Restore foreign key constraints."""
for table, column, dropped_constraint in dropped_constraints:
for table, column, foreign_table, foreign_column in foreign_columns:
constraints = Base.metadata.tables[table].foreign_key_constraints
for constraint in constraints:
if constraint.column_keys == [column]:
break
else:
_LOGGER.info(
"Did not find a matching constraint for %s", dropped_constraint
)
_LOGGER.info("Did not find a matching constraint for %s.%s", table, column)
continue
if TYPE_CHECKING:
assert foreign_table is not None
assert foreign_column is not None
# AddConstraint mutates the constraint passed to it; we need to
# undo that to avoid changing the behavior of the table schema.
# https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/sql/ddl.py#L746-L748
create_rule = constraint._create_rule # noqa: SLF001
add_constraint = AddConstraint(constraint) # type: ignore[no-untyped-call]
constraint._create_rule = create_rule # noqa: SLF001
try:
_add_constraint(session_maker, add_constraint, table, column)
except IntegrityError:
_LOGGER.exception(
(
"Could not update foreign options in %s table, will delete "
"violations and try again"
),
table,
)
_delete_foreign_key_violations(
session_maker, engine, table, column, foreign_table, foreign_column
)
_add_constraint(session_maker, add_constraint, table, column)
with session_scope(session=session_maker()) as session:
try:
connection = session.connection()
connection.execute(add_constraint)
except (InternalError, OperationalError):
_LOGGER.exception("Could not update foreign options in %s table", table)

def _add_constraint(
session_maker: Callable[[], Session],
add_constraint: AddConstraint,
table: str,
column: str,
) -> None:
"""Add a foreign key constraint."""
_LOGGER.warning(
"Adding foreign key constraint to %s.%s. "
"Note: this can take several minutes on large databases and slow "
"machines. Please be patient!",
table,
column,
)
with session_scope(session=session_maker()) as session:
try:
connection = session.connection()
connection.execute(add_constraint)
except (InternalError, OperationalError):
_LOGGER.exception("Could not update foreign options in %s table", table)

def _delete_foreign_key_violations(
session_maker: Callable[[], Session],
engine: Engine,
table: str,
column: str,
foreign_table: str,
foreign_column: str,
) -> None:
"""Remove rows which violate the constraints."""
if engine.dialect.name not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
raise RuntimeError(
f"_delete_foreign_key_violations not supported for {engine.dialect.name}"
)
_LOGGER.warning(
"Rows in table %s where %s references non existing %s.%s will be %s. "
"Note: this can take several minutes on large databases and slow "
"machines. Please be patient!",
table,
column,
foreign_table,
foreign_column,
"set to NULL" if table == foreign_table else "deleted",
)
result: CursorResult | None = None
if table == foreign_table:
# In case of a foreign reference to the same table, we set invalid
# references to NULL instead of deleting, as deleting rows may
# create additional invalid references. This is to handle
# old_state_id referencing a missing state.
if engine.dialect.name == SupportedDialect.MYSQL:
while result is None or result.rowcount > 0:
with session_scope(session=session_maker()) as session:
# The subquery (SELECT {foreign_column} from {foreign_table}) keeps
# the statement compatible with old MySQL versions, which do not
# allow referencing the table being updated in the WHERE clause.
result = session.connection().execute(
text(
f"UPDATE {table} as t1 " # noqa: S608
f"SET {column} = NULL "
"WHERE ("
f"t1.{column} IS NOT NULL AND "
"NOT EXISTS "
"(SELECT 1 "
f"FROM (SELECT {foreign_column} from {foreign_table}) AS t2 "
f"WHERE t2.{foreign_column} = t1.{column})) "
"LIMIT 100000;"
)
)
elif engine.dialect.name == SupportedDialect.POSTGRESQL:
while result is None or result.rowcount > 0:
with session_scope(session=session_maker()) as session:
# PostgreSQL does not support LIMIT in UPDATE clauses, so we
# update matches from a limited subquery instead.
result = session.connection().execute(
text(
f"UPDATE {table} " # noqa: S608
f"SET {column} = NULL "
f"WHERE {column} in "
f"(SELECT {column} from {table} as t1 "
"WHERE ("
f"t1.{column} IS NOT NULL AND "
"NOT EXISTS "
"(SELECT 1 "
f"FROM {foreign_table} AS t2 "
f"WHERE t2.{foreign_column} = t1.{column})) "
"LIMIT 100000);"
)
)
return
if engine.dialect.name == SupportedDialect.MYSQL:
while result is None or result.rowcount > 0:
with session_scope(session=session_maker()) as session:
result = session.connection().execute(
# We don't use an alias for the table we're deleting from;
# support for the form `DELETE FROM table AS t1` was added in
# MariaDB 11.6 and is not supported by MySQL. Those engines
# instead support the form `DELETE t1 FROM table AS t1`, which
# is not supported by PostgreSQL and undocumented for MariaDB.
text(
f"DELETE FROM {table} " # noqa: S608
"WHERE ("
f"{table}.{column} IS NOT NULL AND "
"NOT EXISTS "
"(SELECT 1 "
f"FROM {foreign_table} AS t2 "
f"WHERE t2.{foreign_column} = {table}.{column})) "
"LIMIT 100000;"
)
)
elif engine.dialect.name == SupportedDialect.POSTGRESQL:
while result is None or result.rowcount > 0:
with session_scope(session=session_maker()) as session:
# PostgreSQL does not support LIMIT in DELETE clauses, so we
# delete matches from a limited subquery instead.
result = session.connection().execute(
text(
f"DELETE FROM {table} " # noqa: S608
f"WHERE {column} in "
f"(SELECT {column} from {table} as t1 "
"WHERE ("
f"t1.{column} IS NOT NULL AND "
"NOT EXISTS "
"(SELECT 1 "
f"FROM {foreign_table} AS t2 "
f"WHERE t2.{foreign_column} = t1.{column})) "
"LIMIT 100000);"
)
)

@database_job_retry_wrapper("Apply migration update", 10)
@@ -1459,6 +1603,38 @@ class _SchemaVersion43Migrator(_SchemaVersionMigrator, target_version=43):
)

FOREIGN_COLUMNS = (
(
"events",
("data_id", "event_type_id"),
(
("data_id", "event_data", "data_id"),
("event_type_id", "event_types", "event_type_id"),
),
),
(
"states",
("event_id", "old_state_id", "attributes_id", "metadata_id"),
(
("event_id", None, None),
("old_state_id", "states", "state_id"),
("attributes_id", "state_attributes", "attributes_id"),
("metadata_id", "states_meta", "metadata_id"),
),
),
(
"statistics",
("metadata_id",),
(("metadata_id", "statistics_meta", "id"),),
),
(
"statistics_short_term",
("metadata_id",),
(("metadata_id", "statistics_meta", "id"),),
),
)

class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44):
def _apply_update(self) -> None:
"""Version specific update method."""
@@ -1471,24 +1647,14 @@ class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44):
else ""
)
# First drop foreign key constraints
foreign_columns = (
("events", ("data_id", "event_type_id")),
("states", ("event_id", "old_state_id", "attributes_id", "metadata_id")),
("statistics", ("metadata_id",)),
("statistics_short_term", ("metadata_id",)),
)
dropped_constraints = [
dropped_constraint
for table, columns in foreign_columns
for column in columns
for dropped_constraint in _drop_foreign_key_constraints(
self.session_maker, self.engine, table, column
)[1]
]
_LOGGER.debug("Dropped foreign key constraints: %s", dropped_constraints)
for table, columns, _ in FOREIGN_COLUMNS:
for column in columns:
_drop_foreign_key_constraints(
self.session_maker, self.engine, table, column
)
# Then modify the constrained columns
for table, columns in foreign_columns:
for table, columns, _ in FOREIGN_COLUMNS:
_modify_columns(
self.session_maker,
self.engine,
@@ -1518,9 +1684,24 @@ class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44):
table,
[f"{column} {BIG_INTEGER_SQL} {identity_sql}"],
)
# Finally restore dropped constraints

class _SchemaVersion45Migrator(_SchemaVersionMigrator, target_version=45):
def _apply_update(self) -> None:
"""Version specific update method."""
# We skip this step for SQLite; it doesn't have differently sized integers
if self.engine.dialect.name == SupportedDialect.SQLITE:
return
# Restore constraints dropped in migration to schema version 44
_restore_foreign_key_constraints(
self.session_maker, self.engine, dropped_constraints
self.session_maker,
self.engine,
[
(table, column, foreign_table, foreign_column)
for table, _, foreign_mappings in FOREIGN_COLUMNS
for column, foreign_table, foreign_column in foreign_mappings
],
)
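An aside on the self-reference branch above: it nulls out invalid references instead of deleting rows, and the inner SELECT is wrapped in a derived table because older MySQL versions refuse to reference the table being updated inside the WHERE clause. Here is a standalone sketch of that statement shape against a hypothetical self-referencing tree table (not the recorder schema, and not the PR's code):

from sqlalchemy import text
from sqlalchemy.orm import Session


def null_invalid_self_references(session: Session) -> int:
    """Set parent_id to NULL where it points at a missing row; return the rowcount."""
    result = session.connection().execute(
        text(
            "UPDATE tree AS t1 "
            "SET parent_id = NULL "
            "WHERE t1.parent_id IS NOT NULL "
            "AND NOT EXISTS ("
            # The derived table (SELECT id FROM tree) sidesteps MySQL's
            # restriction on referencing the table being updated.
            "  SELECT 1 "
            "  FROM (SELECT id FROM tree) AS t2 "
            "  WHERE t2.id = t1.parent_id) "
            "LIMIT 100000"
        )
    )
    return result.rowcount

Called in a loop until it returns 0, this mirrors the batching used by the migration while keeping each transaction bounded.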

View File

@@ -831,9 +831,9 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
"""
constraints_to_recreate = (
("events", "data_id"),
("states", "event_id"), # This won't be found
("states", "old_state_id"),
("events", "data_id", "event_data", "data_id"),
("states", "event_id", None, None), # This won't be found
("states", "old_state_id", "states", "state_id"),
)
db_engine = recorder_db_url.partition("://")[0]
@@ -902,7 +902,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
session_maker = Mock(return_value=session)
dropped_constraints_1 = [
dropped_constraint
for table, column in constraints_to_recreate
for table, column, _, _ in constraints_to_recreate
for dropped_constraint in migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)[1]
@@ -914,7 +914,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
session_maker = Mock(return_value=session)
dropped_constraints_2 = [
dropped_constraint
for table, column in constraints_to_recreate
for table, column, _, _ in constraints_to_recreate
for dropped_constraint in migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)[1]
@@ -925,7 +925,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
with Session(engine) as session:
session_maker = Mock(return_value=session)
migration._restore_foreign_key_constraints(
session_maker, engine, dropped_constraints_1
session_maker, engine, constraints_to_recreate
)
# Check we do find the constrained columns again (they are restored)
@@ -933,7 +933,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None:
session_maker = Mock(return_value=session)
dropped_constraints_3 = [
dropped_constraint
for table, column in constraints_to_recreate
for table, column, _, _ in constraints_to_recreate
for dropped_constraint in migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)[1]
@@ -951,21 +951,7 @@ def test_restore_foreign_key_constraints_with_error(
This is not supported on SQLite
"""
constraints_to_restore = [
(
"events",
"data_id",
{
"comment": None,
"constrained_columns": ["data_id"],
"name": "events_data_id_fkey",
"options": {},
"referred_columns": ["data_id"],
"referred_schema": None,
"referred_table": "event_data",
},
),
]
constraints_to_restore = [("events", "data_id", "event_data", "data_id")]
connection = Mock()
connection.execute = Mock(side_effect=InternalError(None, None, None))
@@ -981,3 +967,88 @@
)
assert "Could not update foreign options in events table" in caplog.text

@pytest.mark.skip_on_db_engine(["sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
def test_restore_foreign_key_constraints_with_integrity_error(
recorder_db_url: str,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test we can drop and then restore foreign keys.
This is not supported on SQLite
"""
constraints = (
("events", "data_id", "event_data", "data_id", Events),
("states", "old_state_id", "states", "state_id", States),
)
engine = create_engine(recorder_db_url)
db_schema.Base.metadata.create_all(engine)
# Drop constraints
with Session(engine) as session:
session_maker = Mock(return_value=session)
for table, column, _, _, _ in constraints:
migration._drop_foreign_key_constraints(
session_maker, engine, table, column
)
# Add rows violating the constraints
with Session(engine) as session:
for _, column, _, _, table_class in constraints:
session.add(table_class(**{column: 123}))
session.add(table_class())
# Insert a States row referencing the row with an invalid foreign reference
session.add(States(old_state_id=1))
session.commit()
# Check we could insert the rows
with Session(engine) as session:
assert session.query(Events).count() == 2
assert session.query(States).count() == 3
# Restore constraints
to_restore = [
(table, column, foreign_table, foreign_column)
for table, column, foreign_table, foreign_column, _ in constraints
]
with Session(engine) as session:
session_maker = Mock(return_value=session)
migration._restore_foreign_key_constraints(session_maker, engine, to_restore)
# Check the violating row has been deleted from the Events table
with Session(engine) as session:
assert session.query(Events).count() == 1
assert session.query(States).count() == 3
engine.dispose()
assert (
"Could not update foreign options in events table, "
"will delete violations and try again"
) in caplog.text

def test_delete_foreign_key_violations_unsupported_engine(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test calling _delete_foreign_key_violations with an unsupported engine."""
connection = Mock()
connection.execute = Mock(side_effect=InternalError(None, None, None))
session = Mock()
session.connection = Mock(return_value=connection)
instance = Mock()
instance.get_session = Mock(return_value=session)
engine = Mock()
engine.dialect = Mock()
engine.dialect.name = "sqlite"
session_maker = Mock(return_value=session)
with pytest.raises(
RuntimeError, match="_delete_foreign_key_violations not supported for sqlite"
):
migration._delete_foreign_key_violations(session_maker, engine, "", "", "", "")
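A final detail worth isolating: the restore path exercised by these tests saves and restores the constraint's private _create_rule around AddConstraint, because constructing AddConstraint mutates the constraint it wraps. A minimal sketch of that save/restore pattern with hypothetical parent/child tables (not the recorder schema):

from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table
from sqlalchemy.schema import AddConstraint

metadata = MetaData()
parent = Table("parent", metadata, Column("id", Integer, primary_key=True))
child = Table(
    "child",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("parent_id", Integer, ForeignKey("parent.id")),
)

constraint = next(iter(child.foreign_key_constraints))
create_rule = constraint._create_rule  # save the private attribute
add_constraint = AddConstraint(constraint)  # constructing this replaces _create_rule
constraint._create_rule = create_rule  # put it back so later CREATE TABLE output is unchanged
# add_constraint can now be executed on a connection to re-add the foreign key.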