diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index 8d4cc29d9be..dd293ed6bc2 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -77,7 +77,7 @@ class LegacyBase(DeclarativeBase): """Base class for tables, used for schema migration.""" -SCHEMA_VERSION = 44 +SCHEMA_VERSION = 45 _LOGGER = logging.getLogger(__name__) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index a41de07e243..55856dcf449 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -669,33 +669,177 @@ def _drop_foreign_key_constraints( def _restore_foreign_key_constraints( session_maker: Callable[[], Session], engine: Engine, - dropped_constraints: list[tuple[str, str, ReflectedForeignKeyConstraint]], + foreign_columns: list[tuple[str, str, str | None, str | None]], ) -> None: """Restore foreign key constraints.""" - for table, column, dropped_constraint in dropped_constraints: + for table, column, foreign_table, foreign_column in foreign_columns: constraints = Base.metadata.tables[table].foreign_key_constraints for constraint in constraints: if constraint.column_keys == [column]: break else: - _LOGGER.info( - "Did not find a matching constraint for %s", dropped_constraint - ) + _LOGGER.info("Did not find a matching constraint for %s.%s", table, column) continue + if TYPE_CHECKING: + assert foreign_table is not None + assert foreign_column is not None + # AddConstraint mutates the constraint passed to it, we need to # undo that to avoid changing the behavior of the table schema. 
# https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/sql/ddl.py#L746-L748 create_rule = constraint._create_rule # noqa: SLF001 add_constraint = AddConstraint(constraint) # type: ignore[no-untyped-call] constraint._create_rule = create_rule # noqa: SLF001 + try: + _add_constraint(session_maker, add_constraint, table, column) + except IntegrityError: + _LOGGER.exception( + ( + "Could not update foreign options in %s table, will delete " + "violations and try again" + ), + table, + ) + _delete_foreign_key_violations( + session_maker, engine, table, column, foreign_table, foreign_column + ) + _add_constraint(session_maker, add_constraint, table, column) - with session_scope(session=session_maker()) as session: - try: - connection = session.connection() - connection.execute(add_constraint) - except (InternalError, OperationalError): - _LOGGER.exception("Could not update foreign options in %s table", table) + +def _add_constraint( + session_maker: Callable[[], Session], + add_constraint: AddConstraint, + table: str, + column: str, +) -> None: + """Add a foreign key constraint.""" + _LOGGER.warning( + "Adding foreign key constraint to %s.%s. " + "Note: this can take several minutes on large databases and slow " + "machines. 
Please be patient!", + table, + column, + ) + with session_scope(session=session_maker()) as session: + try: + connection = session.connection() + connection.execute(add_constraint) + except (InternalError, OperationalError): + _LOGGER.exception("Could not update foreign options in %s table", table) + + +def _delete_foreign_key_violations( + session_maker: Callable[[], Session], + engine: Engine, + table: str, + column: str, + foreign_table: str, + foreign_column: str, +) -> None: + """Remove rows which violate the constraints.""" + if engine.dialect.name not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL): + raise RuntimeError( + f"_delete_foreign_key_violations not supported for {engine.dialect.name}" + ) + + _LOGGER.warning( + "Rows in table %s where %s references non existing %s.%s will be %s. " + "Note: this can take several minutes on large databases and slow " + "machines. Please be patient!", + table, + column, + foreign_table, + foreign_column, + "set to NULL" if table == foreign_table else "deleted", + ) + + result: CursorResult | None = None + if table == foreign_table: + # In case of a foreign reference to the same table, we set invalid + # references to NULL instead of deleting as deleting rows may + # cause additional invalid references to be created. This is to handle + # old_state_id referencing a missing state. + if engine.dialect.name == SupportedDialect.MYSQL: + while result is None or result.rowcount > 0: + with session_scope(session=session_maker()) as session: + # The subquery (SELECT {foreign_column} from {foreign_table}) is + # to be compatible with old MySQL versions which do not allow + # referencing the table being updated in the WHERE clause. 
+ result = session.connection().execute( + text( + f"UPDATE {table} as t1 " # noqa: S608 + f"SET {column} = NULL " + "WHERE (" + f"t1.{column} IS NOT NULL AND " + "NOT EXISTS " + "(SELECT 1 " + f"FROM (SELECT {foreign_column} from {foreign_table}) AS t2 " + f"WHERE t2.{foreign_column} = t1.{column})) " + "LIMIT 100000;" + ) + ) + elif engine.dialect.name == SupportedDialect.POSTGRESQL: + while result is None or result.rowcount > 0: + with session_scope(session=session_maker()) as session: + # PostgreSQL does not support LIMIT in UPDATE clauses, so we + # update matches from a limited subquery instead. + result = session.connection().execute( + text( + f"UPDATE {table} " # noqa: S608 + f"SET {column} = NULL " + f"WHERE {column} in " + f"(SELECT {column} from {table} as t1 " + "WHERE (" + f"t1.{column} IS NOT NULL AND " + "NOT EXISTS " + "(SELECT 1 " + f"FROM {foreign_table} AS t2 " + f"WHERE t2.{foreign_column} = t1.{column})) " + "LIMIT 100000);" + ) + ) + return + + if engine.dialect.name == SupportedDialect.MYSQL: + while result is None or result.rowcount > 0: + with session_scope(session=session_maker()) as session: + result = session.connection().execute( + # We don't use an alias for the table we're deleting from, + # support of the form `DELETE FROM table AS t1` was added in + # MariaDB 11.6 and is not supported by MySQL. Those engines + # instead support the form `DELETE t1 from table AS t1` which + # is not supported by PostgreSQL and undocumented for MariaDB. 
+ text( + f"DELETE FROM {table} " # noqa: S608 + "WHERE (" + f"{table}.{column} IS NOT NULL AND " + "NOT EXISTS " + "(SELECT 1 " + f"FROM {foreign_table} AS t2 " + f"WHERE t2.{foreign_column} = {table}.{column})) " + "LIMIT 100000;" + ) + ) + elif engine.dialect.name == SupportedDialect.POSTGRESQL: + while result is None or result.rowcount > 0: + with session_scope(session=session_maker()) as session: + # PostgreSQL does not support LIMIT in DELETE clauses, so we + # delete matches from a limited subquery instead. + result = session.connection().execute( + text( + f"DELETE FROM {table} " # noqa: S608 + f"WHERE {column} in " + f"(SELECT {column} from {table} as t1 " + "WHERE (" + f"t1.{column} IS NOT NULL AND " + "NOT EXISTS " + "(SELECT 1 " + f"FROM {foreign_table} AS t2 " + f"WHERE t2.{foreign_column} = t1.{column})) " + "LIMIT 100000);" + ) + ) @database_job_retry_wrapper("Apply migration update", 10) @@ -1459,6 +1603,38 @@ class _SchemaVersion43Migrator(_SchemaVersionMigrator, target_version=43): ) +FOREIGN_COLUMNS = ( + ( + "events", + ("data_id", "event_type_id"), + ( + ("data_id", "event_data", "data_id"), + ("event_type_id", "event_types", "event_type_id"), + ), + ), + ( + "states", + ("event_id", "old_state_id", "attributes_id", "metadata_id"), + ( + ("event_id", None, None), + ("old_state_id", "states", "state_id"), + ("attributes_id", "state_attributes", "attributes_id"), + ("metadata_id", "states_meta", "metadata_id"), + ), + ), + ( + "statistics", + ("metadata_id",), + (("metadata_id", "statistics_meta", "id"),), + ), + ( + "statistics_short_term", + ("metadata_id",), + (("metadata_id", "statistics_meta", "id"),), + ), +) + + class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44): def _apply_update(self) -> None: """Version specific update method.""" @@ -1471,24 +1647,14 @@ class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44): else "" ) # First drop foreign key constraints - foreign_columns = ( - ("events", 
("data_id", "event_type_id")), - ("states", ("event_id", "old_state_id", "attributes_id", "metadata_id")), - ("statistics", ("metadata_id",)), - ("statistics_short_term", ("metadata_id",)), - ) - dropped_constraints = [ - dropped_constraint - for table, columns in foreign_columns - for column in columns - for dropped_constraint in _drop_foreign_key_constraints( - self.session_maker, self.engine, table, column - )[1] - ] - _LOGGER.debug("Dropped foreign key constraints: %s", dropped_constraints) + for table, columns, _ in FOREIGN_COLUMNS: + for column in columns: + _drop_foreign_key_constraints( + self.session_maker, self.engine, table, column + ) # Then modify the constrained columns - for table, columns in foreign_columns: + for table, columns, _ in FOREIGN_COLUMNS: _modify_columns( self.session_maker, self.engine, @@ -1518,9 +1684,24 @@ class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44): table, [f"{column} {BIG_INTEGER_SQL} {identity_sql}"], ) - # Finally restore dropped constraints + + +class _SchemaVersion45Migrator(_SchemaVersionMigrator, target_version=45): + def _apply_update(self) -> None: + """Version specific update method.""" + # We skip this step for SQLITE, it doesn't have differently sized integers + if self.engine.dialect.name == SupportedDialect.SQLITE: + return + + # Restore constraints dropped in migration to schema version 44 _restore_foreign_key_constraints( - self.session_maker, self.engine, dropped_constraints + self.session_maker, + self.engine, + [ + (table, column, foreign_table, foreign_column) + for table, _, foreign_mappings in FOREIGN_COLUMNS + for column, foreign_table, foreign_column in foreign_mappings + ], ) diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index e55793caad7..988eade29b6 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -831,9 +831,9 @@ def 
test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: """ constraints_to_recreate = ( - ("events", "data_id"), - ("states", "event_id"), # This won't be found - ("states", "old_state_id"), + ("events", "data_id", "event_data", "data_id"), + ("states", "event_id", None, None), # This won't be found + ("states", "old_state_id", "states", "state_id"), ) db_engine = recorder_db_url.partition("://")[0] @@ -902,7 +902,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: session_maker = Mock(return_value=session) dropped_constraints_1 = [ dropped_constraint - for table, column in constraints_to_recreate + for table, column, _, _ in constraints_to_recreate for dropped_constraint in migration._drop_foreign_key_constraints( session_maker, engine, table, column )[1] @@ -914,7 +914,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: session_maker = Mock(return_value=session) dropped_constraints_2 = [ dropped_constraint - for table, column in constraints_to_recreate + for table, column, _, _ in constraints_to_recreate for dropped_constraint in migration._drop_foreign_key_constraints( session_maker, engine, table, column )[1] @@ -925,7 +925,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: with Session(engine) as session: session_maker = Mock(return_value=session) migration._restore_foreign_key_constraints( - session_maker, engine, dropped_constraints_1 + session_maker, engine, constraints_to_recreate ) # Check we do find the constrained columns again (they are restored) @@ -933,7 +933,7 @@ def test_drop_restore_foreign_key_constraints(recorder_db_url: str) -> None: session_maker = Mock(return_value=session) dropped_constraints_3 = [ dropped_constraint - for table, column in constraints_to_recreate + for table, column, _, _ in constraints_to_recreate for dropped_constraint in migration._drop_foreign_key_constraints( session_maker, engine, table, column )[1] @@ -951,21 
+951,7 @@ def test_restore_foreign_key_constraints_with_error( This is not supported on SQLite """ - constraints_to_restore = [ - ( - "events", - "data_id", - { - "comment": None, - "constrained_columns": ["data_id"], - "name": "events_data_id_fkey", - "options": {}, - "referred_columns": ["data_id"], - "referred_schema": None, - "referred_table": "event_data", - }, - ), - ] + constraints_to_restore = [("events", "data_id", "event_data", "data_id")] connection = Mock() connection.execute = Mock(side_effect=InternalError(None, None, None)) @@ -981,3 +967,88 @@ def test_restore_foreign_key_constraints_with_error( ) assert "Could not update foreign options in events table" in caplog.text + + +@pytest.mark.skip_on_db_engine(["sqlite"]) +@pytest.mark.usefixtures("skip_by_db_engine") +def test_restore_foreign_key_constraints_with_integrity_error( + recorder_db_url: str, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test we can drop and then restore foreign keys. + + This is not supported on SQLite + """ + + constraints = ( + ("events", "data_id", "event_data", "data_id", Events), + ("states", "old_state_id", "states", "state_id", States), + ) + + engine = create_engine(recorder_db_url) + db_schema.Base.metadata.create_all(engine) + + # Drop constraints + with Session(engine) as session: + session_maker = Mock(return_value=session) + for table, column, _, _, _ in constraints: + migration._drop_foreign_key_constraints( + session_maker, engine, table, column + ) + + # Add rows violating the constraints + with Session(engine) as session: + for _, column, _, _, table_class in constraints: + session.add(table_class(**{column: 123})) + session.add(table_class()) + # Insert a States row referencing the row with an invalid foreign reference + session.add(States(old_state_id=1)) + session.commit() + + # Check we could insert the rows + with Session(engine) as session: + assert session.query(Events).count() == 2 + assert session.query(States).count() == 3 + + # Restore 
constraints + to_restore = [ + (table, column, foreign_table, foreign_column) + for table, column, foreign_table, foreign_column, _ in constraints + ] + with Session(engine) as session: + session_maker = Mock(return_value=session) + migration._restore_foreign_key_constraints(session_maker, engine, to_restore) + + # Check the violating row has been deleted from the Events table + with Session(engine) as session: + assert session.query(Events).count() == 1 + assert session.query(States).count() == 3 + + engine.dispose() + + assert ( + "Could not update foreign options in events table, " + "will delete violations and try again" + ) in caplog.text + + +def test_delete_foreign_key_violations_unsupported_engine( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test calling _delete_foreign_key_violations with an unsupported engine.""" + + connection = Mock() + connection.execute = Mock(side_effect=InternalError(None, None, None)) + session = Mock() + session.connection = Mock(return_value=connection) + instance = Mock() + instance.get_session = Mock(return_value=session) + engine = Mock() + engine.dialect = Mock() + engine.dialect.name = "sqlite" + + session_maker = Mock(return_value=session) + with pytest.raises( + RuntimeError, match="_delete_foreign_key_violations not supported for sqlite" + ): + migration._delete_foreign_key_violations(session_maker, engine, "", "", "", "")