Remove support for live recorder data migration of context ids (#125309)

This commit is contained in:
Erik Montnemery 2024-10-26 07:19:03 +02:00 committed by GitHub
parent e888a95bd1
commit d8b618f7c3
10 changed files with 350 additions and 133 deletions

View File

@@ -78,16 +78,8 @@ from .db_schema import (
     StatisticsShortTerm,
 )
 from .executor import DBInterruptibleThreadPoolExecutor
-from .migration import (
-    EntityIDMigration,
-    EventIDPostMigration,
-    EventsContextIDMigration,
-    EventTypeIDMigration,
-    StatesContextIDMigration,
-)
 from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
 from .pool import POOL_SIZE, MutexPool, RecorderPool
-from .queries import get_migration_changes
 from .table_managers.event_data import EventDataManager
 from .table_managers.event_types import EventTypeManager
 from .table_managers.recorder_runs import RecorderRunsManager
@@ -120,7 +112,6 @@ from .util import (
     build_mysqldb_conv,
     dburl_to_path,
     end_incomplete_runs,
-    execute_stmt_lambda_element,
     is_second_sunday,
     move_away_broken_database,
     session_scope,
@@ -740,12 +731,17 @@ class Recorder(threading.Thread):
         # First do non-live migration steps, if needed
         if schema_status.migration_needed:
+            # Do non-live schema migration
             result, schema_status = self._migrate_schema_offline(schema_status)
             if not result:
                 self._notify_migration_failed()
                 self.migration_in_progress = False
                 return
             self.schema_version = schema_status.current_version
+
+            # Do non-live data migration
+            migration.migrate_data_non_live(self, self.get_session, schema_status)
+
             # Non-live migration is now completed, remaining steps are live
             self.migration_is_live = True
@@ -801,20 +797,7 @@ class Recorder(threading.Thread):
             # there are a lot of statistics graphs on the frontend.
             self.statistics_meta_manager.load(session)

-            migration_changes: dict[str, int] = {
-                row[0]: row[1]
-                for row in execute_stmt_lambda_element(session, get_migration_changes())
-            }
-            for migrator_cls in (
-                StatesContextIDMigration,
-                EventsContextIDMigration,
-                EventTypeIDMigration,
-                EntityIDMigration,
-                EventIDPostMigration,
-            ):
-                migrator = migrator_cls(schema_status.start_version, migration_changes)
-                migrator.do_migrate(self, session)
+        migration.migrate_data_live(self, self.get_session, schema_status)

         # We must only set the db ready after we have set the table managers
         # to active if there is no data to migrate.
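
Note: below is a minimal sketch of the migration ordering this change establishes in the recorder. The `migration.*` calls and `schema_status` fields come from the diff above; the surrounding Recorder internals are simplified, so this is illustrative rather than the actual implementation.

```python
# Illustrative only: the ordering of schema and data migration after this change.
from homeassistant.components.recorder import migration


def run_migrations(recorder, schema_status) -> None:
    """Sketch of the recorder's migration sequencing (simplified)."""
    if schema_status.migration_needed:
        # Non-live schema migration blocks recorder startup.
        ok, schema_status = recorder._migrate_schema_offline(schema_status)
        if not ok:
            return
        # New in this commit: the context id migrators (NON_LIVE_DATA_MIGRATORS)
        # now also run before startup completes instead of on the live recorder.
        migration.migrate_data_non_live(recorder, recorder.get_session, schema_status)

    # Remaining data migrators are queued as background tasks once the recorder is live.
    migration.migrate_data_live(recorder, recorder.get_session, schema_status)
```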

View File

@@ -91,6 +91,7 @@ from .queries import (
     find_states_context_ids_to_migrate,
     find_unmigrated_short_term_statistics_rows,
     find_unmigrated_statistics_rows,
+    get_migration_changes,
     has_entity_ids_to_migrate,
     has_event_type_to_migrate,
     has_events_context_ids_to_migrate,
@@ -104,6 +105,7 @@ from .statistics import cleanup_statistics_timestamp_migration, get_start_time
 from .tasks import RecorderTask
 from .util import (
     database_job_retry_wrapper,
+    database_job_retry_wrapper_method,
     execute_stmt_lambda_element,
     get_index_by_name,
     retryable_database_job_method,
@@ -233,8 +235,12 @@ def validate_db_schema(
         # columns may otherwise not exist etc.
         schema_errors = _find_schema_errors(hass, instance, session_maker)

+    migration_needed = not is_current or non_live_data_migration_needed(
+        instance, session_maker, current_version
+    )
+
     return SchemaValidationStatus(
-        current_version, not is_current, schema_errors, current_version
+        current_version, migration_needed, schema_errors, current_version
     )
@@ -350,6 +356,68 @@ def migrate_schema_live(
     return schema_status


+def _get_migration_changes(session: Session) -> dict[str, int]:
+    """Return migration changes as a dict."""
+    migration_changes: dict[str, int] = {
+        row[0]: row[1]
+        for row in execute_stmt_lambda_element(session, get_migration_changes())
+    }
+    return migration_changes
+
+
+def non_live_data_migration_needed(
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    schema_version: int,
+) -> bool:
+    """Return True if non-live data migration is needed.
+
+    This must only be called if database schema is current.
+    """
+    migration_needed = False
+    with session_scope(session=session_maker()) as session:
+        migration_changes = _get_migration_changes(session)
+        for migrator_cls in NON_LIVE_DATA_MIGRATORS:
+            migrator = migrator_cls(schema_version, migration_changes)
+            migration_needed |= migrator.needs_migrate(instance, session)
+
+    return migration_needed
+
+
+def migrate_data_non_live(
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    schema_status: SchemaValidationStatus,
+) -> None:
+    """Do non-live data migration.
+
+    This must be called after non-live schema migration is completed.
+    """
+    with session_scope(session=session_maker()) as session:
+        migration_changes = _get_migration_changes(session)
+
+    for migrator_cls in NON_LIVE_DATA_MIGRATORS:
+        migrator = migrator_cls(schema_status.start_version, migration_changes)
+        migrator.migrate_all(instance, session_maker)
+
+
+def migrate_data_live(
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    schema_status: SchemaValidationStatus,
+) -> None:
+    """Queue live schema migration tasks.
+
+    This must be called after live schema migration is completed.
+    """
+    with session_scope(session=session_maker()) as session:
+        migration_changes = _get_migration_changes(session)
+        for migrator_cls in LIVE_DATA_MIGRATORS:
+            migrator = migrator_cls(schema_status.start_version, migration_changes)
+            migrator.queue_migration(instance, session)
+
+
 def _create_index(
     session_maker: Callable[[], Session], table_name: str, index_name: str
 ) -> None:
@@ -2196,29 +2264,24 @@ class DataMigrationStatus:
     migration_done: bool


-class BaseRunTimeMigration(ABC):
-    """Base class for run time migrations."""
+class BaseMigration(ABC):
+    """Base class for migrations."""

     index_to_drop: tuple[str, str] | None = None
     required_schema_version = 0
     migration_version = 1
     migration_id: str
-    task = MigrationTask

     def __init__(self, schema_version: int, migration_changes: dict[str, int]) -> None:
         """Initialize a new BaseRunTimeMigration."""
         self.schema_version = schema_version
         self.migration_changes = migration_changes

-    def do_migrate(self, instance: Recorder, session: Session) -> None:
-        """Start migration if needed."""
-        if self.needs_migrate(instance, session):
-            instance.queue_task(self.task(self))
-        else:
-            self.migration_done(instance, session)
-
-    @retryable_database_job_method("migrate data")
+    @abstractmethod
     def migrate_data(self, instance: Recorder) -> bool:
+        """Migrate some data, return True if migration is completed."""
+
+    def _migrate_data(self, instance: Recorder) -> bool:
         """Migrate some data, returns True if migration is completed."""
         status = self.migrate_data_impl(instance)
         if status.migration_done:
@@ -2273,7 +2336,45 @@ class BaseRunTimeMigration(ABC):
         return needs_migrate.needs_migrate


-class BaseRunTimeMigrationWithQuery(BaseRunTimeMigration):
+class BaseOffLineMigration(BaseMigration):
+    """Base class for off line migrations."""
+
+    def migrate_all(
+        self, instance: Recorder, session_maker: Callable[[], Session]
+    ) -> None:
+        """Migrate all data."""
+        with session_scope(session=session_maker()) as session:
+            if not self.needs_migrate(instance, session):
+                self.migration_done(instance, session)
+                return
+        while not self.migrate_data(instance):
+            pass
+
+    @database_job_retry_wrapper_method("migrate data", 10)
+    def migrate_data(self, instance: Recorder) -> bool:
+        """Migrate some data, returns True if migration is completed."""
+        return self._migrate_data(instance)
+
+
+class BaseRunTimeMigration(BaseMigration):
+    """Base class for run time migrations."""
+
+    task = MigrationTask
+
+    def queue_migration(self, instance: Recorder, session: Session) -> None:
+        """Start migration if needed."""
+        if self.needs_migrate(instance, session):
+            instance.queue_task(self.task(self))
+        else:
+            self.migration_done(instance, session)
+
+    @retryable_database_job_method("migrate data")
+    def migrate_data(self, instance: Recorder) -> bool:
+        """Migrate some data, returns True if migration is completed."""
+        return self._migrate_data(instance)
+
+
+class BaseMigrationWithQuery(BaseMigration):
     """Base class for run time migrations."""

     @abstractmethod
@@ -2290,7 +2391,7 @@ class BaseRunTimeMigrationWithQuery(BaseRunTimeMigration):
     )


-class StatesContextIDMigration(BaseRunTimeMigrationWithQuery):
+class StatesContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     """Migration to migrate states context_ids to binary format."""

     required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
@@ -2333,7 +2434,7 @@ class StatesContextIDMigration(BaseRunTimeMigrationWithQuery):
         return has_states_context_ids_to_migrate()


-class EventsContextIDMigration(BaseRunTimeMigrationWithQuery):
+class EventsContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
     """Migration to migrate events context_ids to binary format."""

     required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
@@ -2376,7 +2477,7 @@ class EventsContextIDMigration(BaseRunTimeMigrationWithQuery):
         return has_events_context_ids_to_migrate()


-class EventTypeIDMigration(BaseRunTimeMigrationWithQuery):
+class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
     """Migration to migrate event_type to event_type_ids."""

     required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION
@@ -2454,7 +2555,7 @@ class EventTypeIDMigration(BaseRunTimeMigrationWithQuery):
         return has_event_type_to_migrate()


-class EntityIDMigration(BaseRunTimeMigrationWithQuery):
+class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
     """Migration to migrate entity_ids to states_meta."""

     required_schema_version = STATES_META_SCHEMA_VERSION
@@ -2542,7 +2643,7 @@ class EntityIDMigration(BaseRunTimeMigrationWithQuery):
         instance.states_meta_manager.active = True
         with contextlib.suppress(SQLAlchemyError):
             migrate = EntityIDPostMigration(self.schema_version, self.migration_changes)
-            migrate.do_migrate(instance, session)
+            migrate.queue_migration(instance, session)

     def needs_migrate_query(self) -> StatementLambdaElement:
         """Check if the data is migrated."""
@@ -2631,7 +2732,7 @@ class EventIDPostMigration(BaseRunTimeMigration):
         return DataMigrationStatus(needs_migrate=False, migration_done=True)


-class EntityIDPostMigration(BaseRunTimeMigrationWithQuery):
+class EntityIDPostMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
     """Migration to remove old entity_id strings from states."""

     migration_id = "entity_id_post_migration"
@@ -2648,9 +2749,19 @@ class EntityIDPostMigration(BaseRunTimeMigrationWithQuery):
         return has_used_states_entity_ids()


-def _mark_migration_done(
-    session: Session, migration: type[BaseRunTimeMigration]
-) -> None:
+NON_LIVE_DATA_MIGRATORS = (
+    StatesContextIDMigration,  # Introduced in HA Core 2023.4
+    EventsContextIDMigration,  # Introduced in HA Core 2023.4
+)
+
+LIVE_DATA_MIGRATORS = (
+    EventTypeIDMigration,
+    EntityIDMigration,
+    EventIDPostMigration,
+)
+
+
+def _mark_migration_done(session: Session, migration: type[BaseMigration]) -> None:
     """Mark a migration as done in the database."""
     session.merge(
         MigrationChanges(
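
Note: a hypothetical sketch of what an offline migrator looks like under the new class split (BaseMigrationWithQuery plus BaseOffLineMigration, mirroring StatesContextIDMigration above). ExampleBinaryIDMigration, its schema version, and _rewrite_one_batch are invented for illustration and are not part of the recorder.

```python
# Hypothetical example only; mirrors the mixin order used by the context id migrators.
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.migration import (
    BaseMigrationWithQuery,
    BaseOffLineMigration,
    DataMigrationStatus,
)


class ExampleBinaryIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
    """Illustrative offline migration that rewrites one batch per call."""

    required_schema_version = 36  # illustrative value
    migration_id = "example_binary_id_migration"

    def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
        # Rewrite up to one batch of rows and report whether anything is left;
        # BaseOffLineMigration.migrate_all() keeps driving this until done.
        rows_left = _rewrite_one_batch(instance)  # hypothetical helper
        return DataMigrationStatus(needs_migrate=rows_left, migration_done=not rows_left)

    def needs_migrate_query(self):
        # Real migrators return a StatementLambdaElement here, e.g.
        # has_states_context_ids_to_migrate(); omitted in this sketch.
        raise NotImplementedError


def _rewrite_one_batch(instance: Recorder) -> bool:
    """Hypothetical helper: return True while unmigrated rows remain."""
    return False
```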

View File

@@ -652,13 +652,13 @@ type _FuncOrMethType[**_P, _R] = Callable[_P, _R]
 def retryable_database_job[**_P](
     description: str,
 ) -> Callable[[_FuncType[_P, bool]], _FuncType[_P, bool]]:
-    """Try to execute a database job.
+    """Execute a database job repeatedly until it succeeds.

     The job should return True if it finished, and False if it needs to be rescheduled.
     """

     def decorator(job: _FuncType[_P, bool]) -> _FuncType[_P, bool]:
-        return _wrap_func_or_meth(job, description, False)
+        return _wrap_retryable_database_job_func_or_meth(job, description, False)

     return decorator
@@ -666,18 +666,18 @@ def retryable_database_job[**_P](
 def retryable_database_job_method[_Self, **_P](
     description: str,
 ) -> Callable[[_MethType[_Self, _P, bool]], _MethType[_Self, _P, bool]]:
-    """Try to execute a database job.
+    """Execute a database job repeatedly until it succeeds.

     The job should return True if it finished, and False if it needs to be rescheduled.
     """

     def decorator(job: _MethType[_Self, _P, bool]) -> _MethType[_Self, _P, bool]:
-        return _wrap_func_or_meth(job, description, True)
+        return _wrap_retryable_database_job_func_or_meth(job, description, True)

     return decorator


-def _wrap_func_or_meth[**_P](
+def _wrap_retryable_database_job_func_or_meth[**_P](
     job: _FuncOrMethType[_P, bool], description: str, method: bool
 ) -> _FuncOrMethType[_P, bool]:
     recorder_pos = 1 if method else 0
@@ -705,10 +705,10 @@ def _wrap_func_or_meth[**_P](
     return wrapper


-def database_job_retry_wrapper[**_P](
-    description: str, attempts: int = 5
-) -> Callable[[_FuncType[_P, None]], _FuncType[_P, None]]:
-    """Try to execute a database job multiple times.
+def database_job_retry_wrapper[**_P, _R](
+    description: str, attempts: int
+) -> Callable[[_FuncType[_P, _R]], _FuncType[_P, _R]]:
+    """Execute a database job repeatedly until it succeeds, at most attempts times.

     This wrapper handles InnoDB deadlocks and lock timeouts.
@@ -717,32 +717,63 @@ def database_job_retry_wrapper[**_P](
     """

     def decorator(
-        job: _FuncType[_P, None],
-    ) -> _FuncType[_P, None]:
-        @functools.wraps(job)
-        def wrapper(instance: Recorder, *args: _P.args, **kwargs: _P.kwargs) -> None:
-            for attempt in range(attempts):
-                try:
-                    job(instance, *args, **kwargs)
-                except OperationalError as err:
-                    if attempt == attempts - 1 or not _is_retryable_error(
-                        instance, err
-                    ):
-                        raise
-                    assert isinstance(err.orig, BaseException)  # noqa: PT017
-                    _LOGGER.info(
-                        "%s; %s failed, retrying", err.orig.args[1], description
-                    )
-                    time.sleep(instance.db_retry_wait)
-                    # Failed with retryable error
-                else:
-                    return
-
-        return wrapper
+        job: _FuncType[_P, _R],
+    ) -> _FuncType[_P, _R]:
+        return _database_job_retry_wrapper_func_or_meth(
+            job, description, attempts, False
+        )

     return decorator


+def database_job_retry_wrapper_method[_Self, **_P, _R](
+    description: str, attempts: int
+) -> Callable[[_MethType[_Self, _P, _R]], _MethType[_Self, _P, _R]]:
+    """Execute a database job repeatedly until it succeeds, at most attempts times.
+
+    This wrapper handles InnoDB deadlocks and lock timeouts.
+
+    This is different from retryable_database_job in that it will retry the job
+    attempts number of times instead of returning False if the job fails.
+    """
+
+    def decorator(
+        job: _MethType[_Self, _P, _R],
+    ) -> _MethType[_Self, _P, _R]:
+        return _database_job_retry_wrapper_func_or_meth(
+            job, description, attempts, True
+        )
+
+    return decorator
+
+
+def _database_job_retry_wrapper_func_or_meth[**_P, _R](
+    job: _FuncOrMethType[_P, _R],
+    description: str,
+    attempts: int,
+    method: bool,
+) -> _FuncOrMethType[_P, _R]:
+    recorder_pos = 1 if method else 0
+
+    @functools.wraps(job)
+    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+        instance: Recorder = args[recorder_pos]  # type: ignore[assignment]
+        for attempt in range(attempts):
+            try:
+                return job(*args, **kwargs)
+            except OperationalError as err:
+                # Failed with retryable error
+                if attempt == attempts - 1 or not _is_retryable_error(instance, err):
+                    raise
+                assert isinstance(err.orig, BaseException)  # noqa: PT017
+                _LOGGER.info("%s; %s failed, retrying", err.orig.args[1], description)
+                time.sleep(instance.db_retry_wait)
+
+        raise ValueError("attempts must be a positive integer")
+
+    return wrapper
+
+
 def periodic_db_cleanups(instance: Recorder) -> None:
     """Run any database cleanups that need to happen periodically.

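Note: a hedged sketch contrasting the two decorator families in this file after the change. SomeMigration and _do_one_batch are invented for illustration; the decorator names and signatures come from the diff above.

```python
# Illustrative only: retry semantics of the two decorator families.
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.util import (
    database_job_retry_wrapper_method,
    retryable_database_job_method,
)


class SomeMigration:
    @database_job_retry_wrapper_method("migrate data", 10)
    def migrate_data_offline(self, instance: Recorder) -> bool:
        # Retries up to 10 times on a retryable OperationalError (deadlock or
        # lock wait timeout), sleeping instance.db_retry_wait between attempts,
        # then re-raises. Suits offline migrations that must run to completion.
        return self._do_one_batch(instance)

    @retryable_database_job_method("migrate data")
    def migrate_data_live(self, instance: Recorder) -> bool:
        # Returns False on a retryable error so the caller can reschedule the
        # task instead of blocking; suits migrations queued on a live recorder.
        return self._do_one_batch(instance)

    def _do_one_batch(self, instance: Recorder) -> bool:
        """Hypothetical work unit: return True when nothing is left to do."""
        return True
```
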
View File

@@ -189,6 +189,9 @@ async def test_delete_metadata_duplicates(
         patch.object(
             recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
         ),
+        patch.object(
+            recorder.migration, "non_live_data_migration_needed", return_value=False
+        ),
         patch(
             "homeassistant.components.recorder.core.create_engine",
             new=_create_engine_28,
@@ -306,6 +309,9 @@ async def test_delete_metadata_duplicates_many(
         patch.object(
             recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
         ),
+        patch.object(
+            recorder.migration, "non_live_data_migration_needed", return_value=False
+        ),
         patch(
             "homeassistant.components.recorder.core.create_engine",
             new=_create_engine_28,

View File

@@ -445,9 +445,8 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
     with (
         patch.object(recorder, "db_schema", old_db_schema),
-        patch.object(
-            recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-        ),
+        patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),
         patch.object(core, "EventData", old_db_schema.EventData),

View File

@@ -105,9 +105,8 @@ def db_schema_32():
     with (
         patch.object(recorder, "db_schema", old_db_schema),
-        patch.object(
-            recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-        ),
+        patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),
         patch.object(core, "EventData", old_db_schema.EventData),
@@ -120,13 +119,13 @@ def db_schema_32():
         yield


+@pytest.mark.parametrize("persistent_database", [True])
 @pytest.mark.parametrize("enable_migrate_event_context_ids", [True])
-@pytest.mark.usefixtures("db_schema_32")
+@pytest.mark.usefixtures("hass_storage")  # Prevent test hass from writing to storage
 async def test_migrate_events_context_ids(
-    hass: HomeAssistant, recorder_mock: Recorder
+    async_test_recorder: RecorderInstanceGenerator,
 ) -> None:
     """Test we can migrate old uuid context ids and ulid context ids to binary format."""
-    await async_wait_recording_done(hass)
     importlib.import_module(SCHEMA_MODULE)
     old_db_schema = sys.modules[SCHEMA_MODULE]
@@ -219,18 +218,28 @@ async def test_migrate_events_context_ids(
             )
         )

-    await recorder_mock.async_add_executor_job(_insert_events)
-
-    await async_wait_recording_done(hass)
-    now = dt_util.utcnow()
-    expected_ulid_fallback_start = ulid_to_bytes(ulid_at_time(now.timestamp()))[0:6]
-    await _async_wait_migration_done(hass)
-
-    with freeze_time(now):
-        # This is a threadsafe way to add a task to the recorder
-        migrator = migration.EventsContextIDMigration(None, None)
-        recorder_mock.queue_task(migrator.task(migrator))
-        await _async_wait_migration_done(hass)
+    # Create database with old schema
+    with (
+        patch.object(recorder, "db_schema", old_db_schema),
+        patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration.EventsContextIDMigration, "migrate_data"),
+        patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
+    ):
+        async with (
+            async_test_home_assistant() as hass,
+            async_test_recorder(hass) as instance,
+        ):
+            await instance.async_add_executor_job(_insert_events)
+
+            await async_wait_recording_done(hass)
+            now = dt_util.utcnow()
+            expected_ulid_fallback_start = ulid_to_bytes(ulid_at_time(now.timestamp()))[
+                0:6
+            ]
+            await _async_wait_migration_done(hass)
+
+        await hass.async_stop()
+        await hass.async_block_till_done()

     def _object_as_dict(obj):
         return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
@@ -256,7 +265,34 @@ async def test_migrate_events_context_ids(
         assert len(events) == 6
         return {event.event_type: _object_as_dict(event) for event in events}

-    events_by_type = await recorder_mock.async_add_executor_job(_fetch_migrated_events)
+    # Run again with new schema, let migration run
+    with freeze_time(now):
+        async with (
+            async_test_home_assistant() as hass,
+            async_test_recorder(hass) as instance,
+        ):
+            instance.recorder_and_worker_thread_ids.add(threading.get_ident())
+
+            await hass.async_block_till_done()
+            await async_wait_recording_done(hass)
+            await async_wait_recording_done(hass)
+
+            events_by_type = await instance.async_add_executor_job(
+                _fetch_migrated_events
+            )
+
+            migration_changes = await instance.async_add_executor_job(
+                _get_migration_id, hass
+            )
+
+            # Check the index which will be removed by the migrator no longer exists
+            with session_scope(hass=hass) as session:
+                assert (
+                    get_index_by_name(session, "events", "ix_events_context_id") is None
+                )
+
+            await hass.async_stop()
+            await hass.async_block_till_done()

     old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
     assert old_uuid_context_id_event["context_id"] is None
@@ -327,18 +363,11 @@ async def test_migrate_events_context_ids(
         event_with_garbage_context_id_no_time_fired_ts["context_parent_id_bin"] is None
     )

-    migration_changes = await recorder_mock.async_add_executor_job(
-        _get_migration_id, hass
-    )
     assert (
         migration_changes[migration.EventsContextIDMigration.migration_id]
         == migration.EventsContextIDMigration.migration_version
     )

-    # Check the index which will be removed by the migrator no longer exists
-    with session_scope(hass=hass) as session:
-        assert get_index_by_name(session, "events", "ix_events_context_id") is None


 @pytest.mark.parametrize("persistent_database", [True])
 @pytest.mark.parametrize("enable_migrate_event_context_ids", [True])
@@ -448,13 +477,13 @@ async def test_finish_migrate_events_context_ids(
     await hass.async_block_till_done()


+@pytest.mark.parametrize("persistent_database", [True])
 @pytest.mark.parametrize("enable_migrate_state_context_ids", [True])
-@pytest.mark.usefixtures("db_schema_32")
+@pytest.mark.usefixtures("hass_storage")  # Prevent test hass from writing to storage
 async def test_migrate_states_context_ids(
-    hass: HomeAssistant, recorder_mock: Recorder
+    async_test_recorder: RecorderInstanceGenerator,
 ) -> None:
     """Test we can migrate old uuid context ids and ulid context ids to binary format."""
-    await async_wait_recording_done(hass)
     importlib.import_module(SCHEMA_MODULE)
     old_db_schema = sys.modules[SCHEMA_MODULE]
@@ -529,13 +558,25 @@ async def test_migrate_states_context_ids(
             )
         )

-    await recorder_mock.async_add_executor_job(_insert_states)
-
-    await async_wait_recording_done(hass)
-    migrator = migration.StatesContextIDMigration(None, None)
-    recorder_mock.queue_task(migrator.task(migrator))
-    await _async_wait_migration_done(hass)
+    # Create database with old schema
+    with (
+        patch.object(recorder, "db_schema", old_db_schema),
+        patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration.StatesContextIDMigration, "migrate_data"),
+        patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
+    ):
+        async with (
+            async_test_home_assistant() as hass,
+            async_test_recorder(hass) as instance,
+        ):
+            await instance.async_add_executor_job(_insert_states)
+
+            await async_wait_recording_done(hass)
+            await _async_wait_migration_done(hass)
+
+        await hass.async_stop()
+        await hass.async_block_till_done()

     def _object_as_dict(obj):
         return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
@@ -560,10 +601,32 @@ async def test_migrate_states_context_ids(
         assert len(events) == 6
         return {state.entity_id: _object_as_dict(state) for state in events}

-    states_by_entity_id = await recorder_mock.async_add_executor_job(
-        _fetch_migrated_states
-    )
+    # Run again with new schema, let migration run
+    async with (
+        async_test_home_assistant() as hass,
+        async_test_recorder(hass) as instance,
+    ):
+        instance.recorder_and_worker_thread_ids.add(threading.get_ident())
+
+        await hass.async_block_till_done()
+        await async_wait_recording_done(hass)
+        await async_wait_recording_done(hass)
+
+        states_by_entity_id = await instance.async_add_executor_job(
+            _fetch_migrated_states
+        )
+
+        migration_changes = await instance.async_add_executor_job(
+            _get_migration_id, hass
+        )
+
+        # Check the index which will be removed by the migrator no longer exists
+        with session_scope(hass=hass) as session:
+            assert get_index_by_name(session, "states", "ix_states_context_id") is None
+
+        await hass.async_stop()
+        await hass.async_block_till_done()

     old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"]
     assert old_uuid_context_id["context_id"] is None
     assert old_uuid_context_id["context_user_id"] is None
@@ -637,18 +700,11 @@ async def test_migrate_states_context_ids(
         == b"\n\xe2\x97\x99\xeeNOE\x81\x16\xf5\x82\xd7\xd3\xeee"
     )

-    migration_changes = await recorder_mock.async_add_executor_job(
-        _get_migration_id, hass
-    )
     assert (
         migration_changes[migration.StatesContextIDMigration.migration_id]
         == migration.StatesContextIDMigration.migration_version
     )

-    # Check the index which will be removed by the migrator no longer exists
-    with session_scope(hass=hass) as session:
-        assert get_index_by_name(session, "states", "ix_states_context_id") is None


 @pytest.mark.parametrize("persistent_database", [True])
 @pytest.mark.parametrize("enable_migrate_state_context_ids", [True])
@@ -1763,6 +1819,7 @@ async def test_migrate_times(
     with (
         patch.object(recorder, "db_schema", old_db_schema),
         patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
     ):
         async with (

View File

@@ -94,9 +94,8 @@ async def test_migration_changes_prevent_trying_to_migrate_again(
     # Start with db schema that needs migration (version 32)
     with (
         patch.object(recorder, "db_schema", old_db_schema),
-        patch.object(
-            recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-        ),
+        patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),
         patch.object(core, "EventData", old_db_schema.EventData),

View File

@@ -168,6 +168,9 @@ async def test_delete_duplicates(
         patch.object(
             recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
         ),
+        patch.object(
+            recorder.migration, "non_live_data_migration_needed", return_value=False
+        ),
         patch(
             CREATE_ENGINE_TARGET,
             new=partial(
@@ -352,6 +355,9 @@ async def test_delete_duplicates_many(
         patch.object(
             recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
         ),
+        patch.object(
+            recorder.migration, "non_live_data_migration_needed", return_value=False
+        ),
         patch(
             CREATE_ENGINE_TARGET,
             new=partial(
@@ -515,6 +521,9 @@ async def test_delete_duplicates_non_identical(
         patch.object(
             recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
         ),
+        patch.object(
+            recorder.migration, "non_live_data_migration_needed", return_value=False
+        ),
         patch(
             CREATE_ENGINE_TARGET,
             new=partial(
@@ -638,6 +647,9 @@ async def test_delete_duplicates_short_term(
         patch.object(
             recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
         ),
+        patch.object(
+            recorder.migration, "non_live_data_migration_needed", return_value=False
+        ),
         patch(
             CREATE_ENGINE_TARGET,
             new=partial(

View File

@@ -1134,19 +1134,32 @@ Retryable = OperationalError(None, None, BaseException(RETRYABLE_MYSQL_ERRORS[0]
 @pytest.mark.parametrize(
-    ("side_effect", "dialect", "expected_result", "num_calls"),
+    ("side_effect", "dialect", "retval", "expected_result", "num_calls"),
     [
-        (None, SupportedDialect.MYSQL, does_not_raise(), 1),
-        (ValueError, SupportedDialect.MYSQL, pytest.raises(ValueError), 1),
-        (NonRetryable, SupportedDialect.MYSQL, pytest.raises(OperationalError), 1),
-        (Retryable, SupportedDialect.MYSQL, pytest.raises(OperationalError), 5),
-        (NonRetryable, SupportedDialect.SQLITE, pytest.raises(OperationalError), 1),
-        (Retryable, SupportedDialect.SQLITE, pytest.raises(OperationalError), 1),
+        (None, SupportedDialect.MYSQL, None, does_not_raise(), 1),
+        (ValueError, SupportedDialect.MYSQL, None, pytest.raises(ValueError), 1),
+        (
+            NonRetryable,
+            SupportedDialect.MYSQL,
+            None,
+            pytest.raises(OperationalError),
+            1,
+        ),
+        (Retryable, SupportedDialect.MYSQL, None, pytest.raises(OperationalError), 5),
+        (
+            NonRetryable,
+            SupportedDialect.SQLITE,
+            None,
+            pytest.raises(OperationalError),
+            1,
+        ),
+        (Retryable, SupportedDialect.SQLITE, None, pytest.raises(OperationalError), 1),
     ],
 )
 def test_database_job_retry_wrapper(
     side_effect: Any,
     dialect: str,
+    retval: Any,
     expected_result: AbstractContextManager,
     num_calls: int,
 ) -> None:
@@ -1157,12 +1170,13 @@ def test_database_job_retry_wrapper(
     instance.engine.dialect.name = dialect
     mock_job = Mock(side_effect=side_effect)

-    @database_job_retry_wrapper(description="test")
+    @database_job_retry_wrapper("test", 5)
     def job(instance, *args, **kwargs) -> None:
         mock_job()
+        return retval

     with expected_result:
-        job(instance)
+        assert job(instance) == retval
     assert len(mock_job.mock_calls) == num_calls

View File

@@ -110,6 +110,7 @@ async def test_migrate_times(
     with (
         patch.object(recorder, "db_schema", old_db_schema),
         patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(migration, "post_migrate_entity_ids", return_value=False),
         patch.object(migration.EventsContextIDMigration, "migrate_data"),
         patch.object(migration.StatesContextIDMigration, "migrate_data"),
@@ -266,6 +267,7 @@ async def test_migrate_can_resume_entity_id_post_migration(
         patch.object(recorder, "db_schema", old_db_schema),
         patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
         patch.object(migration.EventIDPostMigration, "migrate_data"),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(migration, "post_migrate_entity_ids", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),
@@ -385,6 +387,7 @@ async def test_migrate_can_resume_ix_states_event_id_removed(
         patch.object(recorder, "db_schema", old_db_schema),
         patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
         patch.object(migration.EventIDPostMigration, "migrate_data"),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(migration, "post_migrate_entity_ids", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),
@@ -517,6 +520,7 @@ async def test_out_of_disk_space_while_rebuild_states_table(
         patch.object(recorder, "db_schema", old_db_schema),
         patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
         patch.object(migration.EventIDPostMigration, "migrate_data"),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(migration, "post_migrate_entity_ids", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),
@@ -694,6 +698,7 @@ async def test_out_of_disk_space_while_removing_foreign_key(
         patch.object(recorder, "db_schema", old_db_schema),
         patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION),
         patch.object(migration.EventIDPostMigration, "migrate_data"),
+        patch.object(migration, "non_live_data_migration_needed", return_value=False),
         patch.object(migration, "post_migrate_entity_ids", return_value=False),
         patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
         patch.object(core, "EventTypes", old_db_schema.EventTypes),