Remove recorder PostSchemaMigrationTask (#125076)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Erik Montnemery 2024-09-03 07:51:27 +02:00 committed by GitHub
parent 0c18b2e7ff
commit 7c223db1d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 78 deletions

View File

@ -1283,10 +1283,6 @@ class Recorder(threading.Thread):
self.event_session = self.get_session()
self.event_session.expire_on_commit = False
def _post_schema_migration(self, old_version: int, new_version: int) -> None:
"""Run post schema migration tasks."""
migration.post_schema_migration(self, old_version, new_version)
def _post_migrate_entity_ids(self) -> bool:
"""Post migrate entity_ids if needed."""
return migration.post_migrate_entity_ids(self)

View File

@ -99,14 +99,8 @@ from .queries import (
migrate_single_short_term_statistics_row_to_timestamp,
migrate_single_statistics_row_to_timestamp,
)
from .statistics import get_start_time
from .tasks import (
CommitTask,
EntityIDPostMigrationTask,
PostSchemaMigrationTask,
RecorderTask,
StatisticsTimestampMigrationCleanupTask,
)
from .statistics import cleanup_statistics_timestamp_migration, get_start_time
from .tasks import EntityIDPostMigrationTask, RecorderTask
from .util import (
database_job_retry_wrapper,
execute_stmt_lambda_element,
@ -350,13 +344,6 @@ def migrate_schema_live(
states_correct_db_schema(instance, schema_errors)
events_correct_db_schema(instance, schema_errors)
start_version = schema_status.start_version
if start_version != SCHEMA_VERSION:
instance.queue_task(PostSchemaMigrationTask(start_version, SCHEMA_VERSION))
# Make sure the post schema migration task is committed in case
# the next task does not have commit_before = True
instance.queue_task(CommitTask())
return schema_status
@ -1414,6 +1401,12 @@ class _SchemaVersion32Migrator(_SchemaVersionMigrator, target_version=32):
_drop_index(self.session_maker, "events", "ix_events_event_type_time_fired")
_drop_index(self.session_maker, "states", "ix_states_last_updated")
_drop_index(self.session_maker, "events", "ix_events_time_fired")
with session_scope(session=self.session_maker()) as session:
# In version 31 we migrated all the time_fired, last_updated, and last_changed
# columns to be timestamps. In version 32 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
assert self.instance.engine is not None, "engine should never be None"
_wipe_old_string_time_columns(self.instance, self.instance.engine, session)
class _SchemaVersion33Migrator(_SchemaVersionMigrator, target_version=33):
@ -1492,6 +1485,12 @@ class _SchemaVersion35Migrator(_SchemaVersionMigrator, target_version=35):
# ix_statistics_start and ix_statistics_statistic_id_start are still used
# for the post migration cleanup and can be removed in a future version.
# In version 34 we migrated all the created, start, and last_reset
# columns to be timestamps. In version 35 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
while not cleanup_statistics_timestamp_migration(self.instance):
pass
class _SchemaVersion36Migrator(_SchemaVersionMigrator, target_version=36):
def _apply_update(self) -> None:
@ -1828,40 +1827,6 @@ def _correct_table_character_set_and_collation(
)
def post_schema_migration(
instance: Recorder,
old_version: int,
new_version: int,
) -> None:
"""Post schema migration.
Run any housekeeping tasks after the schema migration has completed.
Post schema migration is run after the schema migration has completed
and the queue has been processed to ensure that we reduce the memory
pressure since events are held in memory until the queue is processed
which is blocked from being processed until the schema migration is
complete.
"""
if old_version < 32 <= new_version:
# In version 31 we migrated all the time_fired, last_updated, and last_changed
# columns to be timestamps. In version 32 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
assert instance.event_session is not None
assert instance.engine is not None
_wipe_old_string_time_columns(instance, instance.engine, instance.event_session)
if old_version < 35 <= new_version:
# In version 34 we migrated all the created, start, and last_reset
# columns to be timestamps. In version 35 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
_wipe_old_string_statistics_columns(instance)
def _wipe_old_string_statistics_columns(instance: Recorder) -> None:
"""Wipe old string statistics columns to save space."""
instance.queue_task(StatisticsTimestampMigrationCleanupTask())
@database_job_retry_wrapper("Wipe old string time columns", 3)
def _wipe_old_string_time_columns(
instance: Recorder, engine: Engine, session: Session

View File

@ -322,31 +322,6 @@ class SynchronizeTask(RecorderTask):
instance.hass.loop.call_soon_threadsafe(self.event.set)
@dataclass(slots=True)
class PostSchemaMigrationTask(RecorderTask):
"""Post migration task to update schema."""
old_version: int
new_version: int
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance._post_schema_migration( # noqa: SLF001
self.old_version, self.new_version
)
@dataclass(slots=True)
class StatisticsTimestampMigrationCleanupTask(RecorderTask):
"""An object to insert into the recorder queue to run a statistics migration cleanup task."""
def run(self, instance: Recorder) -> None:
"""Run statistics timestamp cleanup task."""
if not statistics.cleanup_statistics_timestamp_migration(instance):
# Schedule a new statistics migration task if this one didn't finish
instance.queue_task(StatisticsTimestampMigrationCleanupTask())
@dataclass(slots=True)
class AdjustLRUSizeTask(RecorderTask):
"""An object to insert into the recorder queue to adjust the LRU size."""