Teach recorder data migrator base class to remove index (#125168)

* Teach recorder data migrator base class to remove index

* Fix tests
This commit is contained in:
Erik Montnemery 2024-09-04 09:52:41 +02:00 committed by GitHub
parent 7fc0e36b2f
commit 8fd691be69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 15 deletions

View File

@ -2191,8 +2191,6 @@ class MigrationTask(RecorderTask):
if not self.migrator.migrate_data(instance): if not self.migrator.migrate_data(instance):
# Schedule a new migration task if this one didn't finish # Schedule a new migration task if this one didn't finish
instance.queue_task(MigrationTask(self.migrator)) instance.queue_task(MigrationTask(self.migrator))
else:
self.migrator.migration_done(instance, None)
@dataclass(slots=True) @dataclass(slots=True)
@ -2213,6 +2211,7 @@ class NeedsMigrateResult:
class BaseRunTimeMigration(ABC): class BaseRunTimeMigration(ABC):
"""Base class for run time migrations.""" """Base class for run time migrations."""
index_to_drop: tuple[str, str] | None = None
required_schema_version = 0 required_schema_version = 0
migration_version = 1 migration_version = 1
migration_id: str migration_id: str
@ -2230,11 +2229,29 @@ class BaseRunTimeMigration(ABC):
else: else:
self.migration_done(instance, session) self.migration_done(instance, session)
def migrate_data(self, instance: Recorder) -> bool:
"""Migrate some data, returns True if migration is completed."""
if result := self.migrate_data_impl(instance):
if self.index_to_drop is not None:
self._remove_index(instance, self.index_to_drop)
self.migration_done(instance, None)
return result
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def migrate_data(instance: Recorder) -> bool: def migrate_data_impl(instance: Recorder) -> bool:
"""Migrate some data, returns True if migration is completed.""" """Migrate some data, returns True if migration is completed."""
@staticmethod
@database_job_retry_wrapper("remove index")
def _remove_index(instance: Recorder, index_to_drop: tuple[str, str]) -> None:
"""Remove indices.
Called when migration is completed.
"""
table, index = index_to_drop
_drop_index(instance.get_session, table, index)
def migration_done(self, instance: Recorder, session: Session | None) -> None: def migration_done(self, instance: Recorder, session: Session | None) -> None:
"""Will be called after migrate returns True or if migration is not needed.""" """Will be called after migrate returns True or if migration is not needed."""
@ -2260,8 +2277,14 @@ class BaseRunTimeMigration(ABC):
# The migration changes table indicates that the migration has been done # The migration changes table indicates that the migration has been done
return False return False
# We do not know if the migration is done from the # We do not know if the migration is done from the
# migration changes table so we must check the data # migration changes table so we must check the index and data
# This is the slow path # This is the slow path
if (
self.index_to_drop is not None
and get_index_by_name(session, self.index_to_drop[0], self.index_to_drop[1])
is not None
):
return True
needs_migrate = self.needs_migrate_impl(instance, session) needs_migrate = self.needs_migrate_impl(instance, session)
if needs_migrate.migration_done: if needs_migrate.migration_done:
_mark_migration_done(session, self.__class__) _mark_migration_done(session, self.__class__)
@ -2290,10 +2313,11 @@ class StatesContextIDMigration(BaseRunTimeMigrationWithQuery):
required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
migration_id = "state_context_id_as_binary" migration_id = "state_context_id_as_binary"
index_to_drop = ("states", "ix_states_context_id")
@staticmethod @staticmethod
@retryable_database_job("migrate states context_ids to binary format") @retryable_database_job("migrate states context_ids to binary format")
def migrate_data(instance: Recorder) -> bool: def migrate_data_impl(instance: Recorder) -> bool:
"""Migrate states context_ids to use binary format, return True if completed.""" """Migrate states context_ids to use binary format, return True if completed."""
_to_bytes = _context_id_to_bytes _to_bytes = _context_id_to_bytes
session_maker = instance.get_session session_maker = instance.get_session
@ -2323,9 +2347,6 @@ class StatesContextIDMigration(BaseRunTimeMigrationWithQuery):
if is_done := not states: if is_done := not states:
_mark_migration_done(session, StatesContextIDMigration) _mark_migration_done(session, StatesContextIDMigration)
if is_done:
_drop_index(session_maker, "states", "ix_states_context_id")
_LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done) _LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done)
return is_done return is_done
@ -2339,10 +2360,11 @@ class EventsContextIDMigration(BaseRunTimeMigrationWithQuery):
required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
migration_id = "event_context_id_as_binary" migration_id = "event_context_id_as_binary"
index_to_drop = ("events", "ix_events_context_id")
@staticmethod @staticmethod
@retryable_database_job("migrate events context_ids to binary format") @retryable_database_job("migrate events context_ids to binary format")
def migrate_data(instance: Recorder) -> bool: def migrate_data_impl(instance: Recorder) -> bool:
"""Migrate events context_ids to use binary format, return True if completed.""" """Migrate events context_ids to use binary format, return True if completed."""
_to_bytes = _context_id_to_bytes _to_bytes = _context_id_to_bytes
session_maker = instance.get_session session_maker = instance.get_session
@ -2372,9 +2394,6 @@ class EventsContextIDMigration(BaseRunTimeMigrationWithQuery):
if is_done := not events: if is_done := not events:
_mark_migration_done(session, EventsContextIDMigration) _mark_migration_done(session, EventsContextIDMigration)
if is_done:
_drop_index(session_maker, "events", "ix_events_context_id")
_LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done) _LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done)
return is_done return is_done
@ -2395,7 +2414,7 @@ class EventTypeIDMigration(BaseRunTimeMigrationWithQuery):
@staticmethod @staticmethod
@retryable_database_job("migrate events event_types to event_type_ids") @retryable_database_job("migrate events event_types to event_type_ids")
def migrate_data(instance: Recorder) -> bool: def migrate_data_impl(instance: Recorder) -> bool:
"""Migrate event_type to event_type_ids, return True if completed.""" """Migrate event_type to event_type_ids, return True if completed."""
session_maker = instance.get_session session_maker = instance.get_session
_LOGGER.debug("Migrating event_types") _LOGGER.debug("Migrating event_types")
@ -2478,7 +2497,7 @@ class EntityIDMigration(BaseRunTimeMigrationWithQuery):
@staticmethod @staticmethod
@retryable_database_job("migrate states entity_ids to states_meta") @retryable_database_job("migrate states entity_ids to states_meta")
def migrate_data(instance: Recorder) -> bool: def migrate_data_impl(instance: Recorder) -> bool:
"""Migrate entity_ids to states_meta, return True if completed. """Migrate entity_ids to states_meta, return True if completed.
We do this in two steps because we need the history queries to work We do this in two steps because we need the history queries to work
@ -2592,7 +2611,7 @@ class EventIDPostMigration(BaseRunTimeMigration):
@staticmethod @staticmethod
@retryable_database_job("cleanup_legacy_event_ids") @retryable_database_job("cleanup_legacy_event_ids")
def migrate_data(instance: Recorder) -> bool: def migrate_data_impl(instance: Recorder) -> bool:
"""Remove old event_id index from states, returns True if completed. """Remove old event_id index from states, returns True if completed.
We used to link states to events using the event_id column but we no We used to link states to events using the event_id column but we no

View File

@ -219,6 +219,7 @@ async def test_migrate_times(
await hass.async_stop() await hass.async_stop()
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_can_resume_entity_id_post_migration( async def test_migrate_can_resume_entity_id_post_migration(
@ -321,6 +322,7 @@ async def test_migrate_can_resume_entity_id_post_migration(
await hass.async_stop() await hass.async_stop()
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.parametrize("enable_migrate_event_ids", [True]) @pytest.mark.parametrize("enable_migrate_event_ids", [True])
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
@ -625,6 +627,7 @@ async def test_out_of_disk_space_while_rebuild_states_table(
@pytest.mark.usefixtures("skip_by_db_engine") @pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.skip_on_db_engine(["sqlite"]) @pytest.mark.skip_on_db_engine(["sqlite"])
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.parametrize("enable_migrate_event_ids", [True]) @pytest.mark.parametrize("enable_migrate_event_ids", [True])
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage