diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py
index d72666f76b2..6bf46efd360 100644
--- a/homeassistant/components/recorder/const.py
+++ b/homeassistant/components/recorder/const.py
@@ -48,6 +48,8 @@
 CONTEXT_ID_AS_BINARY_SCHEMA_VERSION = 36
 EVENT_TYPE_IDS_SCHEMA_VERSION = 37
 STATES_META_SCHEMA_VERSION = 38
+LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION = 28
+
 
 class SupportedDialect(StrEnum):
     """Supported dialects."""
diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index 8d1e8708657..538d07eb4d7 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -46,6 +46,7 @@ from .const import (
     DOMAIN,
     EVENT_TYPE_IDS_SCHEMA_VERSION,
     KEEPALIVE_TIME,
+    LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
     MARIADB_PYMYSQL_URL_PREFIX,
     MARIADB_URL_PREFIX,
     MAX_QUEUE_BACKLOG,
@@ -57,7 +58,9 @@ from .const import (
     SupportedDialect,
 )
 from .db_schema import (
+    LEGACY_STATES_EVENT_ID_INDEX,
     SCHEMA_VERSION,
+    TABLE_STATES,
     Base,
     EventData,
     Events,
@@ -93,6 +96,7 @@ from .tasks import (
     CompileMissingStatisticsTask,
     DatabaseLockTask,
     EntityIDMigrationTask,
+    EventIdMigrationTask,
     EventsContextIDMigrationTask,
     EventTask,
     EventTypeIDMigrationTask,
@@ -113,6 +117,7 @@ from .util import (
     dburl_to_path,
     end_incomplete_runs,
     execute_stmt_lambda_element,
+    get_index_by_name,
     is_second_sunday,
     move_away_broken_database,
     session_scope,
@@ -730,6 +735,15 @@ class Recorder(threading.Thread):
             _LOGGER.debug("Activating states_meta manager as all data is migrated")
             self.states_meta_manager.active = True
 
+        if self.schema_version > LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION:
+            with contextlib.suppress(SQLAlchemyError):
+                # If the index of event_ids on the states table is still present
+                # we need to queue a task to remove it.
+                if get_index_by_name(
+                    session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
+                ):
+                    self.queue_task(EventIdMigrationTask())
+
         # We must only set the db ready after we have set the table managers
         # to active if there is no data to migrate.
         #
@@ -1138,6 +1152,10 @@ class Recorder(threading.Thread):
         """Post migrate entity_ids if needed."""
         return migration.post_migrate_entity_ids(self)
 
+    def _cleanup_legacy_states_event_ids(self) -> bool:
+        """Cleanup legacy event_ids if needed."""
+        return migration.cleanup_legacy_states_event_ids(self)
+
     def _send_keep_alive(self) -> None:
         """Send a keep alive to keep the db connection open."""
         assert self.event_session is not None
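Note on the core.py hunk above: the cleanup is gated twice, on the schema already being past version 28 and on the legacy index still being physically present, so databases that never had the index skip the task entirely. Below is a minimal, self-contained sketch of that detect-then-queue pattern, not the Home Assistant implementation: the in-memory SQLite schema, the `SimpleQueue`, and the string stand-in for the task are all invented for the demo.

```python
from queue import SimpleQueue

from sqlalchemy import Column, Integer, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):
    """Toy stand-in for the recorder's states table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    event_id = Column(Integer, index=True)  # index=True creates ix_states_event_id

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
tasks: SimpleQueue = SimpleQueue()

with Session(engine) as session:
    # Same shape as the core.py check: look the index up by name and only
    # queue the cleanup task when it is still present.
    indexes = inspect(session.connection()).get_indexes("states")
    if any(ix["name"] == "ix_states_event_id" for ix in indexes):
        tasks.put("EventIdMigrationTask")  # stand-in for queue_task(...)

print(tasks.qsize())  # -> 1, the cleanup task was queued
```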
diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py
index 2fa3746a2c8..4826453e4c8 100644
--- a/homeassistant/components/recorder/db_schema.py
+++ b/homeassistant/components/recorder/db_schema.py
@@ -116,6 +116,7 @@
 LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts"
 METADATA_ID_LAST_UPDATED_INDEX_TS = "ix_states_metadata_id_last_updated_ts"
 EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
 STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
+LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id"
 CONTEXT_ID_BIN_MAX_LENGTH = 16
 
 _DEFAULT_TABLE_ARGS = {
@@ -385,9 +386,7 @@ class States(Base):
     attributes: Mapped[str | None] = mapped_column(
         Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
     )  # no longer used for new rows
-    event_id: Mapped[int | None] = mapped_column(  # no longer used for new rows
-        Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
-    )
+    event_id: Mapped[int | None] = mapped_column(Integer)  # no longer used for new rows
     last_changed: Mapped[datetime | None] = mapped_column(
         DATETIME_TYPE
     )  # no longer used for new rows
diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index 08f5f21b896..4619c4531d0 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -30,6 +30,7 @@ from homeassistant.util.ulid import ulid_to_bytes
 from .const import SupportedDialect
 from .db_schema import (
     CONTEXT_ID_BIN_MAX_LENGTH,
+    LEGACY_STATES_EVENT_ID_INDEX,
     SCHEMA_VERSION,
     STATISTICS_TABLES,
     TABLE_STATES,
@@ -51,6 +52,7 @@ from .queries import (
     find_event_type_to_migrate,
     find_events_context_ids_to_migrate,
     find_states_context_ids_to_migrate,
+    has_used_states_event_ids,
 )
 from .statistics import (
     correct_db_schema as statistics_correct_db_schema,
@@ -64,7 +66,12 @@ from .tasks import (
     PostSchemaMigrationTask,
     StatisticsTimestampMigrationCleanupTask,
 )
-from .util import database_job_retry_wrapper, retryable_database_job, session_scope
+from .util import (
+    database_job_retry_wrapper,
+    get_index_by_name,
+    retryable_database_job,
+    session_scope,
+)
 
 if TYPE_CHECKING:
     from . import Recorder
@@ -308,18 +315,8 @@ def _drop_index(
     with session_scope(session=session_maker()) as session, contextlib.suppress(
         SQLAlchemyError
     ):
-        connection = session.connection()
-        inspector = sqlalchemy.inspect(connection)
-        indexes = inspector.get_indexes(table_name)
-        if index_to_drop := next(
-            (
-                possible_index["name"]
-                for possible_index in indexes
-                if possible_index["name"]
-                and possible_index["name"].endswith(f"_{index_name}")
-            ),
-            None,
-        ):
+        if index_to_drop := get_index_by_name(session, table_name, index_name):
+            connection = session.connection()
             connection.execute(text(f"DROP INDEX {index_to_drop}"))
             success = True
@@ -593,7 +589,7 @@ def _apply_update(  # noqa: C901
     # but it was removed in version 32
     elif new_version == 5:
         # Create supporting index for States.event_id foreign key
-        _create_index(session_maker, "states", "ix_states_event_id")
+        _create_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
     elif new_version == 6:
         _add_columns(
             session_maker,
@@ -1529,6 +1525,33 @@ def post_migrate_entity_ids(instance: Recorder) -> bool:
     return is_done
 
 
+@retryable_database_job("cleanup_legacy_event_ids")
+def cleanup_legacy_states_event_ids(instance: Recorder) -> bool:
+    """Remove the old event_id index from states.
+
+    We used to link states to events using the event_id column, but we no
+    longer store state changed events in the events table.
+
+    If all old states have been purged and existing states are in the new
+    format, we can drop the index since it can take up ~10MB per 1M rows.
+    """
+    session_maker = instance.get_session
+    _LOGGER.debug("Cleanup legacy event_ids")
+    with session_scope(session=session_maker()) as session:
+        result = session.execute(has_used_states_event_ids()).scalar()
+        # In the future we may migrate existing states to the new format,
+        # but in practice very few of these still exist in production, and
+        # removing the index is likely all that needs to happen.
+        all_gone = not result
+
+    if all_gone:
+        # Only drop the index if there are no more event_ids in the states
+        # table, i.e. they are all NULL.
+        _drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
+
+    return True
+
+
 def _initialize_database(session: Session) -> bool:
     """Initialize a new database.
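The probe in cleanup_legacy_states_event_ids is a LIMIT 1 existence check rather than a COUNT, so it stays cheap even on multi-million-row tables. Here is a self-contained sketch of the same probe-then-drop flow under assumed names: an in-memory SQLite database and a toy `States` model stand in for the recorder's session helpers and real schema.

```python
from sqlalchemy import Column, Integer, create_engine, inspect, select, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):
    """Toy stand-in for the recorder's states table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    event_id = Column(Integer, index=True)  # legacy, NULL for all new rows

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # LIMIT 1 probe: stops at the first row that still carries an event_id.
    in_use = session.execute(
        select(States.state_id).filter(States.event_id.isnot(None)).limit(1)
    ).scalar()
    if in_use is None:
        # Every event_id is NULL, so the index carries no useful data.
        session.execute(text("DROP INDEX ix_states_event_id"))
        session.commit()

print(inspect(engine).get_indexes("states"))  # -> [], the index is gone
```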
diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py
index 5a2c7040f43..f983224e212 100644
--- a/homeassistant/components/recorder/queries.py
+++ b/homeassistant/components/recorder/queries.py
@@ -745,6 +745,13 @@ def batch_cleanup_entity_ids() -> StatementLambdaElement:
     )
 
 
+def has_used_states_event_ids() -> StatementLambdaElement:
+    """Check if there are used event_ids in the states table."""
+    return lambda_stmt(
+        lambda: select(States.state_id).filter(States.event_id.isnot(None)).limit(1)
+    )
+
+
 def has_events_context_ids_to_migrate() -> StatementLambdaElement:
     """Check if there are events context ids to migrate."""
     return lambda_stmt(
diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py
index 5762a9ab69c..ef8f6a95a7c 100644
--- a/homeassistant/components/recorder/tasks.py
+++ b/homeassistant/components/recorder/tasks.py
@@ -438,3 +438,17 @@ class EntityIDPostMigrationTask(RecorderTask):
         ):  # Schedule a new migration task if this one didn't finish
             instance.queue_task(EntityIDPostMigrationTask())
+
+
+@dataclass
+class EventIdMigrationTask(RecorderTask):
+    """An object to insert into the recorder queue to clean up legacy event_ids in the states table.
+
+    This task should only be queued if the ix_states_event_id index exists,
+    since that index is used to scan the states table, and it will be removed
+    after this task runs if it is no longer needed.
+    """
+
+    def run(self, instance: Recorder) -> None:
+        """Clean up the legacy event_id index on states."""
+        instance._cleanup_legacy_states_event_ids()  # pylint: disable=[protected-access]
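EventIdMigrationTask follows the same shape as the other recorder tasks: a dataclass whose run() delegates back to a private method on the instance, executed when the recorder thread drains the queue. A minimal sketch of that pattern with invented stand-ins (`Instance` plays the role of Recorder here; the real queue and threading are omitted):

```python
from __future__ import annotations

from dataclasses import dataclass
from queue import SimpleQueue


class Instance:
    """Stand-in for Recorder in this sketch."""

    def _cleanup_legacy_states_event_ids(self) -> bool:
        print("dropping the legacy event_id index if it is unused")
        return True


@dataclass
class EventIdMigrationTask:
    """Queue item: run() delegates back to the instance."""

    def run(self, instance: Instance) -> None:
        instance._cleanup_legacy_states_event_ids()


tasks: SimpleQueue = SimpleQueue()
tasks.put(EventIdMigrationTask())
while not tasks.empty():
    tasks.get().run(Instance())  # the recorder thread drains tasks like this
```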
diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py
index ae09f9fd6a2..4ec0a0c4501 100644
--- a/homeassistant/components/recorder/util.py
+++ b/homeassistant/components/recorder/util.py
@@ -18,7 +18,7 @@ from awesomeversion import (
     AwesomeVersionStrategy,
 )
 import ciso8601
-from sqlalchemy import text
+from sqlalchemy import inspect, text
 from sqlalchemy.engine import Result, Row
 from sqlalchemy.exc import OperationalError, SQLAlchemyError
 from sqlalchemy.orm.query import Query
@@ -832,3 +832,22 @@ def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
     From more-itertools
     """
     return iter(partial(take, chunked_num, iter(iterable)), [])
+
+
+def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
+    """Get an index by name."""
+    connection = session.connection()
+    inspector = inspect(connection)
+    indexes = inspector.get_indexes(table_name)
+    return next(
+        (
+            possible_index["name"]
+            for possible_index in indexes
+            if possible_index["name"]
+            and (
+                possible_index["name"] == index_name
+                or possible_index["name"].endswith(f"_{index_name}")
+            )
+        ),
+        None,
+    )
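get_index_by_name accepts either an exact name or a `*_{index_name}` suffix match; the suffix case preserves the behavior of the old inline lookup in _drop_index, which presumably tolerates databases where the index exists under a prefixed name. The matching logic in isolation, with made-up index listings:

```python
def match_index(index_names: list[str], index_name: str) -> str | None:
    """Return the first index that matches exactly or by `_<name>` suffix."""
    return next(
        (
            name
            for name in index_names
            if name == index_name or name.endswith(f"_{index_name}")
        ),
        None,
    )


# Exact match, suffix match, and a miss:
assert match_index(["ix_states_event_id"], "ix_states_event_id") == "ix_states_event_id"
assert match_index(["old_ix_states_event_id"], "ix_states_event_id") == "old_ix_states_event_id"
assert match_index(["ix_states_last_updated_ts"], "ix_states_event_id") is None
```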
diff --git a/tests/components/recorder/test_v32_migration.py b/tests/components/recorder/test_v32_migration.py
index 22aa96f8e2f..dd49d7b21e1 100644
--- a/tests/components/recorder/test_v32_migration.py
+++ b/tests/components/recorder/test_v32_migration.py
@@ -91,6 +91,10 @@ async def test_migrate_times(
     )
     number_of_migrations = 5
 
+    def _get_states_index_names():
+        with session_scope(hass=hass) as session:
+            return inspect(session.connection()).get_indexes("states")
+
     with patch.object(recorder, "db_schema", old_db_schema), patch.object(
         recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
     ), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
@@ -113,6 +117,8 @@
         "homeassistant.components.recorder.Recorder._migrate_entity_ids",
     ), patch(
         "homeassistant.components.recorder.Recorder._post_migrate_entity_ids"
+    ), patch(
+        "homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids"
     ):
         hass = await async_test_home_assistant(asyncio.get_running_loop())
         recorder_helper.async_initialize_recorder(hass)
@@ -132,11 +138,18 @@
         await hass.async_block_till_done()
         await recorder.get_instance(hass).async_block_till_done()
 
+        states_indexes = await recorder.get_instance(hass).async_add_executor_job(
+            _get_states_index_names
+        )
+        states_index_names = {index["name"] for index in states_indexes}
+
         await hass.async_stop()
         await hass.async_block_till_done()
 
         dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
 
+    assert "ix_states_event_id" in states_index_names
+
     # Test that the duplicates are removed during migration from schema 23
     hass = await async_test_home_assistant(asyncio.get_running_loop())
     recorder_helper.async_initialize_recorder(hass)
@@ -186,13 +199,20 @@
         with session_scope(hass=hass) as session:
             return inspect(session.connection()).get_indexes("events")
 
-    indexes = await recorder.get_instance(hass).async_add_executor_job(
+    events_indexes = await recorder.get_instance(hass).async_add_executor_job(
         _get_events_index_names
     )
-    index_names = {index["name"] for index in indexes}
+    events_index_names = {index["name"] for index in events_indexes}
 
-    assert "ix_events_context_id_bin" in index_names
-    assert "ix_events_context_id" not in index_names
+    assert "ix_events_context_id_bin" in events_index_names
+    assert "ix_events_context_id" not in events_index_names
+
+    states_indexes = await recorder.get_instance(hass).async_add_executor_job(
+        _get_states_index_names
+    )
+    states_index_names = {index["name"] for index in states_indexes}
+
+    assert "ix_states_event_id" not in states_index_names
 
     await hass.async_stop()
     dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
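The test's _get_states_index_names helper boils down to listing a table's indexes through the SQLAlchemy inspector and asserting on the resulting name set, once before migration (index present) and once after (index dropped). The same check in isolation, against a throwaway schema with invented names:

```python
from sqlalchemy import Column, Integer, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class States(Base):
    """Toy stand-in for the recorder's states table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    event_id = Column(Integer, index=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

# Same shape as the test helper: collect the index names into a set.
states_index_names = {ix["name"] for ix in inspect(engine).get_indexes("states")}
assert "ix_states_event_id" in states_index_names
```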