Ensure indices needed by data migrators exist (#133367)

* Ensure indices needed by data migrators exist

* Update test

* Improve test

* Ignore index error on char(0) columns

* Adjust tests

* Address review comments

* Add comment motivating magic number
Erik Montnemery, 2024-12-18 21:29:52 +01:00, committed by GitHub
parent 8a8be71f96, commit ff8bc763c3
5 changed files with 340 additions and 136 deletions

File 1 of 5

@@ -6,7 +6,7 @@ from collections.abc import Callable
 from datetime import datetime, timedelta
 import logging
 import time
-from typing import Any, Self, cast
+from typing import Any, Final, Self, cast
 
 import ciso8601
 from fnv_hash_fast import fnv1a_32
@@ -130,7 +130,8 @@ METADATA_ID_LAST_UPDATED_INDEX_TS = "ix_states_metadata_id_last_updated_ts"
 EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
 STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
 LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id"
-LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated_ts"
+LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX = "ix_states_entity_id_last_updated_ts"
+LEGACY_MAX_LENGTH_EVENT_CONTEXT_ID: Final = 36
 CONTEXT_ID_BIN_MAX_LENGTH = 16
 
 MYSQL_COLLATE = "utf8mb4_unicode_ci"
@@ -350,6 +351,17 @@ class Events(Base):
         return None
 
 
+class LegacyEvents(LegacyBase):
+    """Event history data with event_id, used for schema migration."""
+
+    __table_args__ = (_DEFAULT_TABLE_ARGS,)
+    __tablename__ = TABLE_EVENTS
+    event_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
+    context_id: Mapped[str | None] = mapped_column(
+        String(LEGACY_MAX_LENGTH_EVENT_CONTEXT_ID), index=True
+    )
+
+
 class EventData(Base):
     """Event data history."""
 
@@ -575,6 +587,28 @@ class States(Base):
         )
 
 
+class LegacyStates(LegacyBase):
+    """State change history with entity_id, used for schema migration."""
+
+    __table_args__ = (
+        Index(
+            LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX,
+            "entity_id",
+            "last_updated_ts",
+        ),
+        _DEFAULT_TABLE_ARGS,
+    )
+    __tablename__ = TABLE_STATES
+    state_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
+    entity_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
+    last_updated_ts: Mapped[float | None] = mapped_column(
+        TIMESTAMP_TYPE, default=time.time, index=True
+    )
+    context_id: Mapped[str | None] = mapped_column(
+        String(LEGACY_MAX_LENGTH_EVENT_CONTEXT_ID), index=True
+    )
+
+
 class StateAttributes(Base):
     """State attribute change history."""

File 2 of 5

@@ -23,6 +23,7 @@ from sqlalchemy.exc import (
     ProgrammingError,
     SQLAlchemyError,
 )
+from sqlalchemy.orm import DeclarativeBase
 from sqlalchemy.orm.session import Session
 from sqlalchemy.schema import AddConstraint, CreateTable, DropConstraint
 from sqlalchemy.sql.expression import true
@@ -59,7 +60,7 @@ from .db_schema import (
     BIG_INTEGER_SQL,
     CONTEXT_ID_BIN_MAX_LENGTH,
     DOUBLE_PRECISION_TYPE_SQL,
-    LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
+    LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX,
     LEGACY_STATES_EVENT_ID_INDEX,
     MYSQL_COLLATE,
     MYSQL_DEFAULT_CHARSET,
@@ -169,6 +170,24 @@ _COLUMN_TYPES_FOR_DIALECT: dict[SupportedDialect | None, _ColumnTypesForDialect]
 }
 
 
+def _unindexable_legacy_column(
+    instance: Recorder, base: type[DeclarativeBase], err: Exception
+) -> bool:
+    """Ignore index errors on char(0) columns."""
+    # The error code is hard coded because the PyMySQL library may not be
+    # installed when using other database engines than MySQL or MariaDB.
+    # 1167: The used storage engine can't index column '%s'
+    return bool(
+        base == LegacyBase
+        and isinstance(err, OperationalError)
+        and instance.engine
+        and instance.engine.dialect.name == SupportedDialect.MYSQL
+        and isinstance(err.orig, BaseException)
+        and err.orig.args
+        and err.orig.args[0] == 1167
+    )
+
+
 def raise_if_exception_missing_str(ex: Exception, match_substrs: Iterable[str]) -> None:
     """Raise if the exception and cause do not contain the match substrs."""
     lower_ex_strs = [str(ex).lower(), str(ex.__cause__).lower()]
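
To make the magic number concrete: on MySQL/MariaDB, creating an index on a column that an earlier migration shrank to `char(0)` fails with server error 1167, which SQLAlchemy surfaces as an `OperationalError` wrapping the DB-API exception. A hedged sketch with a placeholder DSN and table, requiring a MySQL server plus PyMySQL:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

# Placeholder connection string, for illustration only.
engine = create_engine("mysql+pymysql://user:password@localhost/demo")

try:
    with engine.connect() as conn:
        # Assumes states.entity_id was previously altered to char(0).
        conn.execute(text("CREATE INDEX ix_demo ON states (entity_id)"))
except OperationalError as err:
    # err.orig is the PyMySQL exception; args[0] carries the server code.
    print(err.orig.args[0])  # 1167: the storage engine can't index the column
```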
@@ -471,14 +490,19 @@ def migrate_data_live(
 
 
 def _create_index(
-    session_maker: Callable[[], Session], table_name: str, index_name: str
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    table_name: str,
+    index_name: str,
+    *,
+    base: type[DeclarativeBase] = Base,
 ) -> None:
     """Create an index for the specified table.
 
     The index name should match the name given for the index
     within the table definition described in the models
     """
-    table = Table(table_name, Base.metadata)
+    table = Table(table_name, base.metadata)
     _LOGGER.debug("Looking up index %s for table %s", index_name, table_name)
     # Look up the index object by name from the table in the models
     index_list = [idx for idx in table.indexes if idx.name == index_name]
@@ -498,10 +522,18 @@ def _create_index(
         connection = session.connection()
         index.create(connection)
     except (InternalError, OperationalError, ProgrammingError) as err:
+        if _unindexable_legacy_column(instance, base, err):
+            _LOGGER.debug(
+                "Can't add legacy index %s to column %s, continuing",
+                index_name,
+                table_name,
+            )
+            return
         raise_if_exception_missing_str(err, ["already exists", "duplicate"])
         _LOGGER.warning(
             "Index %s already exists on %s, continuing", index_name, table_name
         )
+        return
 
     _LOGGER.warning("Finished adding index `%s` to table `%s`", index_name, table_name)
@@ -1040,7 +1072,12 @@ class _SchemaVersion2Migrator(_SchemaVersionMigrator, target_version=2):
     def _apply_update(self) -> None:
         """Version specific update method."""
         # Create compound start/end index for recorder_runs
-        _create_index(self.session_maker, "recorder_runs", "ix_recorder_runs_start_end")
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "recorder_runs",
+            "ix_recorder_runs_start_end",
+        )
         # This used to create ix_states_last_updated but it was removed in version 32
@@ -1075,7 +1112,9 @@ class _SchemaVersion5Migrator(_SchemaVersionMigrator, target_version=5):
     def _apply_update(self) -> None:
         """Version specific update method."""
         # Create supporting index for States.event_id foreign key
-        _create_index(self.session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
+        _create_index(
+            self.instance, self.session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX
+        )
 
 
 class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
@@ -1086,7 +1125,9 @@ class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
             "events",
             ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"],
         )
-        _create_index(self.session_maker, "events", "ix_events_context_id")
+        _create_index(
+            self.instance, self.session_maker, "events", "ix_events_context_id"
+        )
         # This used to create ix_events_context_user_id,
         # but it was removed in version 28
         _add_columns(
@@ -1094,7 +1135,9 @@ class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
             "states",
             ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"],
         )
-        _create_index(self.session_maker, "states", "ix_states_context_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_context_id"
+        )
         # This used to create ix_states_context_user_id,
         # but it was removed in version 28
@@ -1148,7 +1191,9 @@ class _SchemaVersion10Migrator(_SchemaVersionMigrator, target_version=10):
 class _SchemaVersion11Migrator(_SchemaVersionMigrator, target_version=11):
     def _apply_update(self) -> None:
         """Version specific update method."""
-        _create_index(self.session_maker, "states", "ix_states_old_state_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_old_state_id"
+        )
 
         # _update_states_table_with_foreign_key_options first drops foreign
         # key constraints, and then re-adds them with the correct settings.
@@ -1390,13 +1435,20 @@ class _SchemaVersion25Migrator(_SchemaVersionMigrator, target_version=25):
             "states",
             [f"attributes_id {self.column_types.big_int_type}"],
         )
-        _create_index(self.session_maker, "states", "ix_states_attributes_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_attributes_id"
+        )
 
 
 class _SchemaVersion26Migrator(_SchemaVersionMigrator, target_version=26):
     def _apply_update(self) -> None:
         """Version specific update method."""
-        _create_index(self.session_maker, "statistics_runs", "ix_statistics_runs_start")
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "statistics_runs",
+            "ix_statistics_runs_start",
+        )
 
 
 class _SchemaVersion27Migrator(_SchemaVersionMigrator, target_version=27):
@@ -1405,7 +1457,7 @@ class _SchemaVersion27Migrator(_SchemaVersionMigrator, target_version=27):
         _add_columns(
             self.session_maker, "events", [f"data_id {self.column_types.big_int_type}"]
         )
-        _create_index(self.session_maker, "events", "ix_events_data_id")
+        _create_index(self.instance, self.session_maker, "events", "ix_events_data_id")
 
 
 class _SchemaVersion28Migrator(_SchemaVersionMigrator, target_version=28):
@@ -1425,7 +1477,9 @@ class _SchemaVersion28Migrator(_SchemaVersionMigrator, target_version=28):
                 "context_parent_id VARCHAR(36)",
             ],
         )
-        _create_index(self.session_maker, "states", "ix_states_context_id")
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_context_id"
+        )
 
         # Once there are no longer any state_changed events
         # in the events table we can drop the index on states.event_id
@@ -1452,7 +1506,10 @@ class _SchemaVersion29Migrator(_SchemaVersionMigrator, target_version=29):
         )
         try:
             _create_index(
-                self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
+                self.instance,
+                self.session_maker,
+                "statistics_meta",
+                "ix_statistics_meta_statistic_id",
             )
         except DatabaseError:
             # There may be duplicated statistics_meta entries, delete duplicates
@@ -1460,7 +1517,10 @@ class _SchemaVersion29Migrator(_SchemaVersionMigrator, target_version=29):
             with session_scope(session=self.session_maker()) as session:
                 delete_statistics_meta_duplicates(self.instance, session)
             _create_index(
-                self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
+                self.instance,
+                self.session_maker,
+                "statistics_meta",
+                "ix_statistics_meta_statistic_id",
             )
@@ -1494,14 +1554,24 @@ class _SchemaVersion31Migrator(_SchemaVersionMigrator, target_version=31):
                 f"last_changed_ts {self.column_types.timestamp_type}",
             ],
         )
-        _create_index(self.session_maker, "events", "ix_events_time_fired_ts")
         _create_index(
-            self.session_maker, "events", "ix_events_event_type_time_fired_ts"
+            self.instance, self.session_maker, "events", "ix_events_time_fired_ts"
         )
         _create_index(
-            self.session_maker, "states", "ix_states_entity_id_last_updated_ts"
+            self.instance,
+            self.session_maker,
+            "events",
+            "ix_events_event_type_time_fired_ts",
+        )
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "states",
+            "ix_states_entity_id_last_updated_ts",
+        )
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_last_updated_ts"
         )
-        _create_index(self.session_maker, "states", "ix_states_last_updated_ts")
 
         _migrate_columns_to_timestamp(self.instance, self.session_maker, self.engine)
@@ -1559,16 +1629,23 @@ class _SchemaVersion34Migrator(_SchemaVersionMigrator, target_version=34):
                 f"last_reset_ts {self.column_types.timestamp_type}",
             ],
         )
-        _create_index(self.session_maker, "statistics", "ix_statistics_start_ts")
         _create_index(
-            self.session_maker, "statistics", "ix_statistics_statistic_id_start_ts"
+            self.instance, self.session_maker, "statistics", "ix_statistics_start_ts"
         )
         _create_index(
+            self.instance,
+            self.session_maker,
+            "statistics",
+            "ix_statistics_statistic_id_start_ts",
+        )
+        _create_index(
+            self.instance,
             self.session_maker,
             "statistics_short_term",
             "ix_statistics_short_term_start_ts",
         )
         _create_index(
+            self.instance,
             self.session_maker,
             "statistics_short_term",
             "ix_statistics_short_term_statistic_id_start_ts",
@@ -1618,8 +1695,12 @@ class _SchemaVersion36Migrator(_SchemaVersionMigrator, target_version=36):
                 f"context_parent_id_bin {self.column_types.context_bin_type}",
             ],
         )
-        _create_index(self.session_maker, "events", "ix_events_context_id_bin")
-        _create_index(self.session_maker, "states", "ix_states_context_id_bin")
+        _create_index(
+            self.instance, self.session_maker, "events", "ix_events_context_id_bin"
+        )
+        _create_index(
+            self.instance, self.session_maker, "states", "ix_states_context_id_bin"
+        )
 
 
 class _SchemaVersion37Migrator(_SchemaVersionMigrator, target_version=37):
@@ -1630,10 +1711,15 @@ class _SchemaVersion37Migrator(_SchemaVersionMigrator, target_version=37):
             "events",
             [f"event_type_id {self.column_types.big_int_type}"],
         )
-        _create_index(self.session_maker, "events", "ix_events_event_type_id")
+        _create_index(
+            self.instance, self.session_maker, "events", "ix_events_event_type_id"
+        )
         _drop_index(self.session_maker, "events", "ix_events_event_type_time_fired_ts")
         _create_index(
-            self.session_maker, "events", "ix_events_event_type_id_time_fired_ts"
+            self.instance,
+            self.session_maker,
+            "events",
+            "ix_events_event_type_id_time_fired_ts",
         )
@@ -1645,9 +1731,14 @@ class _SchemaVersion38Migrator(_SchemaVersionMigrator, target_version=38):
             "states",
             [f"metadata_id {self.column_types.big_int_type}"],
         )
-        _create_index(self.session_maker, "states", "ix_states_metadata_id")
         _create_index(
-            self.session_maker, "states", "ix_states_metadata_id_last_updated_ts"
+            self.instance, self.session_maker, "states", "ix_states_metadata_id"
+        )
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "states",
+            "ix_states_metadata_id_last_updated_ts",
         )
@@ -1731,8 +1822,15 @@ class _SchemaVersion40Migrator(_SchemaVersionMigrator, target_version=40):
 class _SchemaVersion41Migrator(_SchemaVersionMigrator, target_version=41):
     def _apply_update(self) -> None:
         """Version specific update method."""
-        _create_index(self.session_maker, "event_types", "ix_event_types_event_type")
-        _create_index(self.session_maker, "states_meta", "ix_states_meta_entity_id")
+        _create_index(
+            self.instance,
+            self.session_maker,
+            "event_types",
+            "ix_event_types_event_type",
+        )
+        _create_index(
+            self.instance, self.session_maker, "states_meta", "ix_states_meta_entity_id"
+        )
 
 
 class _SchemaVersion42Migrator(_SchemaVersionMigrator, target_version=42):
@@ -2319,7 +2417,7 @@ class DataMigrationStatus:
 class BaseMigration(ABC):
     """Base class for migrations."""
 
-    index_to_drop: tuple[str, str] | None = None
+    index_to_drop: tuple[str, str, type[DeclarativeBase]] | None = None
     required_schema_version = 0  # Schema version required to run migration queries
    max_initial_schema_version: int  # Skip migration if db created after this version
     migration_version = 1
@@ -2349,12 +2447,12 @@ class BaseMigration(ABC):
         """Migrate some data, returns True if migration is completed."""
         status = self.migrate_data_impl(instance)
         if status.migration_done:
-            if self.index_to_drop is not None:
-                table, index = self.index_to_drop
-                _drop_index(instance.get_session, table, index)
             with session_scope(session=instance.get_session()) as session:
                 self.migration_done(instance, session)
                 _mark_migration_done(session, self.__class__)
+            if self.index_to_drop is not None:
+                table, index, _ = self.index_to_drop
+                _drop_index(instance.get_session, table, index)
         return not status.needs_migrate
 
     @abstractmethod
@@ -2393,25 +2491,31 @@ class BaseMigration(ABC):
                 "Data migration '%s' needed, schema too old", self.migration_id
             )
             return True
+        has_needed_index = self._has_needed_index(session)
+        if has_needed_index is True:
+            # The index to be removed by the migration still exists
+            _LOGGER.info(
+                "Data migration '%s' needed, index to drop still exists",
+                self.migration_id,
+            )
+            return True
         if self.migration_changes.get(self.migration_id, -1) >= self.migration_version:
             # The migration changes table indicates that the migration has been done
             _LOGGER.debug(
                 "Data migration '%s' not needed, already completed", self.migration_id
             )
             return False
-        # We do not know if the migration is done from the
-        # migration changes table so we must check the index and data
-        # This is the slow path
-        if (
-            self.index_to_drop is not None
-            and get_index_by_name(session, self.index_to_drop[0], self.index_to_drop[1])
-            is not None
-        ):
+        if has_needed_index is False:
+            # The index to be removed by the migration does not exist, but the migration
+            # changes table indicates that the migration has not been done
             _LOGGER.info(
-                "Data migration '%s' needed, index to drop still exists",
+                "Data migration '%s' needed, index to drop does not exist",
                 self.migration_id,
             )
             return True
+        # We do not know if the migration is done from the
+        # migration changes table or the index so we must check the data
+        # This is the slow path
         needs_migrate = self.needs_migrate_impl(instance, session)
         if needs_migrate.migration_done:
             _mark_migration_done(session, self.__class__)
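
The check above now distinguishes three outcomes of the index probe instead of two. A standalone restatement of the decision order (illustration only, not code from this commit):

```python
def needs_migrate_decision(
    index_exists: bool | None, marked_done: bool
) -> bool | None:
    """Mirror the ordering of the checks in needs_migrate."""
    if index_exists is True:
        return True  # Index to drop still exists: the migration must run.
    if marked_done:
        return False  # migration_changes says the migration completed.
    if index_exists is False:
        return True  # Index gone but not marked done: run to reconcile.
    return None  # No index to consult: fall through to the slow data check.
```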
@@ -2422,6 +2526,13 @@ class BaseMigration(ABC):
         )
         return needs_migrate.needs_migrate
 
+    def _has_needed_index(self, session: Session) -> bool | None:
+        """Check if the index needed by the migration exists."""
+        if self.index_to_drop is None:
+            return None
+        table_name, index_name, _ = self.index_to_drop
+        return get_index_by_name(session, table_name, index_name) is not None
+
 
 class BaseOffLineMigration(BaseMigration):
     """Base class for off line migrations."""
@@ -2435,6 +2546,7 @@ class BaseOffLineMigration(BaseMigration):
             _LOGGER.debug("Migration not needed for '%s'", self.migration_id)
             self.migration_done(instance, session)
             return
+        self._ensure_index_exists(instance)
         _LOGGER.warning(
             "The database is about to do data migration step '%s', %s",
             self.migration_id,
@@ -2449,6 +2561,25 @@ class BaseOffLineMigration(BaseMigration):
         """Migrate some data, returns True if migration is completed."""
         return self._migrate_data(instance)
 
+    def _ensure_index_exists(self, instance: Recorder) -> None:
+        """Ensure the index needed by the migration exists."""
+        if not self.index_to_drop:
+            return
+        table_name, index_name, base = self.index_to_drop
+        with session_scope(session=instance.get_session()) as session:
+            if get_index_by_name(session, table_name, index_name) is not None:
+                return
+        _LOGGER.warning(
+            (
+                "Data migration step '%s' needs index `%s` on table `%s`, but "
+                "it does not exist and will be added now"
+            ),
+            self.migration_id,
+            index_name,
+            table_name,
+        )
+        _create_index(instance, instance.get_session, table_name, index_name, base=base)
+
 
 class BaseRunTimeMigration(BaseMigration):
     """Base class for run time migrations."""
@@ -2492,7 +2623,7 @@ class StatesContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     max_initial_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION - 1
     migration_id = "state_context_id_as_binary"
     migration_version = 2
-    index_to_drop = ("states", "ix_states_context_id")
+    index_to_drop = ("states", "ix_states_context_id", LegacyBase)
 
     def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
         """Migrate states context_ids to use binary format, return True if completed."""
@@ -2536,7 +2667,7 @@ class EventsContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     max_initial_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION - 1
     migration_id = "event_context_id_as_binary"
     migration_version = 2
-    index_to_drop = ("events", "ix_events_context_id")
+    index_to_drop = ("events", "ix_events_context_id", LegacyBase)
 
     def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
         """Migrate events context_ids to use binary format, return True if completed."""
@@ -2814,7 +2945,11 @@ class EntityIDPostMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     migration_id = "entity_id_post_migration"
     max_initial_schema_version = STATES_META_SCHEMA_VERSION - 1
-    index_to_drop = (TABLE_STATES, LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX)
+    index_to_drop = (
+        TABLE_STATES,
+        LEGACY_STATES_ENTITY_ID_LAST_UPDATED_TS_INDEX,
+        LegacyBase,
+    )
 
     def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
         """Migrate some data, returns True if migration is completed."""

File 3 of 5

@@ -600,7 +600,7 @@ async def test_schema_migrate(
             start=self.recorder_runs_manager.recording_start, created=dt_util.utcnow()
         )
 
-    def _sometimes_failing_create_index(*args):
+    def _sometimes_failing_create_index(*args, **kwargs):
         """Make the first index create raise a retryable error to ensure we retry."""
         if recorder_db_url.startswith("mysql://"):
             nonlocal create_calls
@@ -609,7 +609,7 @@ async def test_schema_migrate(
                 mysql_exception = OperationalError("statement", {}, [])
                 mysql_exception.orig = Exception(1205, "retryable")
                 raise mysql_exception
-        real_create_index(*args)
+        real_create_index(*args, **kwargs)
 
     with (
         patch(
@@ -712,7 +712,7 @@ def test_forgiving_add_index(recorder_db_url: str) -> None:
     instance = Mock()
     instance.get_session = Mock(return_value=session)
     migration._create_index(
-        instance.get_session, "states", "ix_states_context_id_bin"
+        instance, instance.get_session, "states", "ix_states_context_id_bin"
     )
     engine.dispose()
@@ -788,7 +788,7 @@ def test_forgiving_add_index_with_other_db_types(
     with patch(
         "homeassistant.components.recorder.migration.Table", return_value=mocked_table
     ):
-        migration._create_index(Mock(), "states", "ix_states_context_id")
+        migration._create_index(Mock(), Mock(), "states", "ix_states_context_id")
 
     assert "already exists on states" in caplog.text
     assert "continuing" in caplog.text

File 4 of 5

@@ -1,6 +1,6 @@
 """Test run time migrations are remembered in the migration_changes table."""
 
-from collections.abc import Callable
+from collections.abc import Callable, Generator
 import importlib
 import sys
 from unittest.mock import Mock, patch
@@ -8,6 +8,7 @@ from unittest.mock import Mock, patch
 import pytest
 from sqlalchemy import create_engine
 from sqlalchemy.orm import Session
+from sqlalchemy.schema import Index
 
 from homeassistant.components import recorder
 from homeassistant.components.recorder import core, migration, statistics
@@ -87,138 +88,165 @@ def _create_engine_test(
 @pytest.mark.usefixtures("hass_storage")  # Prevent test hass from writing to storage
 @pytest.mark.parametrize(
-    ("initial_version", "expected_migrator_calls"),
+    ("initial_version", "expected_migrator_calls", "expected_created_indices"),
+    # expected_migrator_calls is a dict of
+    # migrator_id: (needs_migrate_calls, migrate_data_calls)
     [
         (
             27,
             {
-                "state_context_id_as_binary": 1,
-                "event_context_id_as_binary": 1,
-                "event_type_id_migration": 1,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 1,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 1),
+                "event_context_id_as_binary": (0, 1),
+                "event_type_id_migration": (2, 1),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (1, 1),
+                "entity_id_post_migration": (0, 1),
             },
+            [
+                "ix_states_context_id",
+                "ix_events_context_id",
+                "ix_states_entity_id_last_updated_ts",
+            ],
         ),
         (
             28,
             {
-                "state_context_id_as_binary": 1,
-                "event_context_id_as_binary": 1,
-                "event_type_id_migration": 1,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 1),
+                "event_context_id_as_binary": (0, 1),
+                "event_type_id_migration": (2, 1),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 1),
             },
+            [
+                "ix_states_context_id",
+                "ix_events_context_id",
+                "ix_states_entity_id_last_updated_ts",
+            ],
         ),
         (
             36,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 1,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (2, 1),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 1),
             },
+            ["ix_states_entity_id_last_updated_ts"],
         ),
         (
             37,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 0,
-                "entity_id_migration": 1,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 1,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (0, 0),
+                "entity_id_migration": (2, 1),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 1),
             },
+            ["ix_states_entity_id_last_updated_ts"],
        ),
         (
             38,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 0,
-                "entity_id_migration": 0,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 0,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (0, 0),
+                "entity_id_migration": (0, 0),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 0),
             },
+            [],
         ),
         (
             SCHEMA_VERSION,
             {
-                "state_context_id_as_binary": 0,
-                "event_context_id_as_binary": 0,
-                "event_type_id_migration": 0,
-                "entity_id_migration": 0,
-                "event_id_post_migration": 0,
-                "entity_id_post_migration": 0,
+                "state_context_id_as_binary": (0, 0),
+                "event_context_id_as_binary": (0, 0),
+                "event_type_id_migration": (0, 0),
+                "entity_id_migration": (0, 0),
+                "event_id_post_migration": (0, 0),
+                "entity_id_post_migration": (0, 0),
             },
+            [],
         ),
     ],
 )
-async def test_data_migrator_new_database(
+async def test_data_migrator_logic(
     async_test_recorder: RecorderInstanceGenerator,
     initial_version: int,
-    expected_migrator_calls: dict[str, int],
+    expected_migrator_calls: dict[str, tuple[int, int]],
+    expected_created_indices: list[str],
 ) -> None:
-    """Test that the data migrators are not executed on a new database."""
+    """Test the data migrator logic.
+
+    - The data migrators should not be executed on a new database.
+    - Indices needed by the migrators should be created if missing.
+    """
     config = {recorder.CONF_COMMIT_INTERVAL: 1}
 
-    def needs_migrate_mock() -> Mock:
-        return Mock(
-            spec_set=[],
-            return_value=migration.DataMigrationStatus(
-                needs_migrate=False, migration_done=True
+    def migrator_mock() -> dict[str, Mock]:
+        return {
+            "needs_migrate": Mock(
+                spec_set=[],
+                return_value=migration.DataMigrationStatus(
+                    needs_migrate=True, migration_done=False
+                ),
             ),
-        )
+            "migrate_data": Mock(spec_set=[], return_value=True),
+        }
 
     migrator_mocks = {
-        "state_context_id_as_binary": needs_migrate_mock(),
-        "event_context_id_as_binary": needs_migrate_mock(),
-        "event_type_id_migration": needs_migrate_mock(),
-        "entity_id_migration": needs_migrate_mock(),
-        "event_id_post_migration": needs_migrate_mock(),
-        "entity_id_post_migration": needs_migrate_mock(),
+        "state_context_id_as_binary": migrator_mock(),
+        "event_context_id_as_binary": migrator_mock(),
+        "event_type_id_migration": migrator_mock(),
+        "entity_id_migration": migrator_mock(),
+        "event_id_post_migration": migrator_mock(),
+        "entity_id_post_migration": migrator_mock(),
     }
 
+    def patch_check(
+        migrator_id: str, migrator_class: type[migration.BaseMigration]
+    ) -> Generator[None]:
+        return patch.object(
+            migrator_class,
+            "needs_migrate_impl",
+            side_effect=migrator_mocks[migrator_id]["needs_migrate"],
+        )
+
+    def patch_migrate(
+        migrator_id: str, migrator_class: type[migration.BaseMigration]
+    ) -> Generator[None]:
+        return patch.object(
+            migrator_class,
+            "migrate_data",
+            side_effect=migrator_mocks[migrator_id]["migrate_data"],
+        )
+
     with (
-        patch.object(
-            migration.StatesContextIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["state_context_id_as_binary"],
-        ),
-        patch.object(
-            migration.EventsContextIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["event_context_id_as_binary"],
-        ),
-        patch.object(
-            migration.EventTypeIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["event_type_id_migration"],
-        ),
-        patch.object(
-            migration.EntityIDMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["entity_id_migration"],
-        ),
-        patch.object(
-            migration.EventIDPostMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["event_id_post_migration"],
-        ),
-        patch.object(
-            migration.EntityIDPostMigration,
-            "needs_migrate_impl",
-            side_effect=migrator_mocks["entity_id_post_migration"],
-        ),
+        patch_check("state_context_id_as_binary", migration.StatesContextIDMigration),
+        patch_check("event_context_id_as_binary", migration.EventsContextIDMigration),
+        patch_check("event_type_id_migration", migration.EventTypeIDMigration),
+        patch_check("entity_id_migration", migration.EntityIDMigration),
+        patch_check("event_id_post_migration", migration.EventIDPostMigration),
+        patch_check("entity_id_post_migration", migration.EntityIDPostMigration),
+        patch_migrate("state_context_id_as_binary", migration.StatesContextIDMigration),
+        patch_migrate("event_context_id_as_binary", migration.EventsContextIDMigration),
+        patch_migrate("event_type_id_migration", migration.EventTypeIDMigration),
+        patch_migrate("entity_id_migration", migration.EntityIDMigration),
+        patch_migrate("event_id_post_migration", migration.EventIDPostMigration),
+        patch_migrate("entity_id_post_migration", migration.EntityIDPostMigration),
         patch(
             CREATE_ENGINE_TARGET,
             new=_create_engine_test(
                 SCHEMA_MODULE_CURRENT, initial_version=initial_version
             ),
         ),
+        patch(
+            "sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
+        ) as wrapped_idx_create,
     ):
         async with (
             async_test_home_assistant() as hass,
@@ -231,8 +259,15 @@ async def test_data_migrator_new_database(
         await hass.async_block_till_done()
         await hass.async_stop()
 
+    index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
+    assert index_names == expected_created_indices
+
+    # Check each data migrator's needs_migrate_impl and migrate_data methods were called
+    # the expected number of times.
     for migrator, mock in migrator_mocks.items():
-        assert len(mock.mock_calls) == expected_migrator_calls[migrator]
+        needs_migrate_calls, migrate_data_calls = expected_migrator_calls[migrator]
+        assert len(mock["needs_migrate"].mock_calls) == needs_migrate_calls
+        assert len(mock["migrate_data"].mock_calls) == migrate_data_calls
 
 
 @pytest.mark.parametrize("enable_migrate_state_context_ids", [True])
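
A note on the assertion style used here: because `Index.create` is patched with `autospec=True`, the mock records the unbound call, so the first positional argument of each recorded call is the `Index` instance itself and `call[1][0].name` yields the index name. A minimal standalone illustration of that mechanism:

```python
from unittest.mock import patch


class Demo:
    def create(self, connection):
        """Stand-in for a method like sqlalchemy.schema.Index.create."""


demo = Demo()
with patch.object(Demo, "create", autospec=True) as mock_create:
    demo.create(connection=None)

# autospec patches the function on the class, so `self` is captured as the
# first positional argument of the recorded call.
assert mock_create.mock_calls[0][1][0] is demo
```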

File 5 of 5

@@ -1027,7 +1027,7 @@ async def test_purge_can_mix_legacy_and_new_format(
     def _recreate_legacy_events_index():
         """Recreate the legacy events index since it's no longer created on new instances."""
         migration._create_index(
-            recorder_mock.get_session, "states", "ix_states_event_id"
+            recorder_mock, recorder_mock.get_session, "states", "ix_states_event_id"
         )
         recorder_mock.use_legacy_events_index = True
@@ -1178,7 +1178,7 @@ async def test_purge_can_mix_legacy_and_new_format_with_detached_state(
     def _recreate_legacy_events_index():
         """Recreate the legacy events index since it's no longer created on new instances."""
         migration._create_index(
-            recorder_mock.get_session, "states", "ix_states_event_id"
+            recorder_mock, recorder_mock.get_session, "states", "ix_states_event_id"
         )
         recorder_mock.use_legacy_events_index = True