From 377dff5ee4d15e413665945c133eef3ef70fd460 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 17 Mar 2023 10:45:58 -1000 Subject: [PATCH] Ensure all recorder session executes use retries or the execute helper (#89888) --- homeassistant/components/recorder/const.py | 6 +++ homeassistant/components/recorder/core.py | 40 ++++++++++++------- .../recorder/table_managers/states_meta.py | 11 ++++- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py index effecf15a8b..d72666f76b2 100644 --- a/homeassistant/components/recorder/const.py +++ b/homeassistant/components/recorder/const.py @@ -43,6 +43,12 @@ KEEPALIVE_TIME = 30 EXCLUDE_ATTRIBUTES = f"{DOMAIN}_exclude_attributes_by_domain" +STATISTICS_ROWS_SCHEMA_VERSION = 23 +CONTEXT_ID_AS_BINARY_SCHEMA_VERSION = 36 +EVENT_TYPE_IDS_SCHEMA_VERSION = 37 +STATES_META_SCHEMA_VERSION = 38 + + class SupportedDialect(StrEnum): """Supported dialects.""" diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index db44baee06c..8d1e8708657 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -41,8 +41,10 @@ from homeassistant.util.enum import try_parse_enum from . import migration, statistics from .const import ( + CONTEXT_ID_AS_BINARY_SCHEMA_VERSION, DB_WORKER_PREFIX, DOMAIN, + EVENT_TYPE_IDS_SCHEMA_VERSION, KEEPALIVE_TIME, MARIADB_PYMYSQL_URL_PREFIX, MARIADB_URL_PREFIX, @@ -50,6 +52,8 @@ from .const import ( MYSQLDB_PYMYSQL_URL_PREFIX, MYSQLDB_URL_PREFIX, SQLITE_URL_PREFIX, + STATES_META_SCHEMA_VERSION, + STATISTICS_ROWS_SCHEMA_VERSION, SupportedDialect, ) from .db_schema import ( @@ -108,6 +112,7 @@ from .util import ( build_mysqldb_conv, dburl_to_path, end_incomplete_runs, + execute_stmt_lambda_element, is_second_sunday, move_away_broken_database, session_scope, @@ -688,24 +693,28 @@ class Recorder(threading.Thread): # since we want the frontend queries to avoid a thundering # herd of queries to find the statistics meta data if # there are a lot of statistics graphs on the frontend. - if self.schema_version >= 23: + if self.schema_version >= STATISTICS_ROWS_SCHEMA_VERSION: self.statistics_meta_manager.load(session) if ( - self.schema_version < 36 - or session.execute(has_events_context_ids_to_migrate()).scalar() + self.schema_version < CONTEXT_ID_AS_BINARY_SCHEMA_VERSION + or execute_stmt_lambda_element( + session, has_events_context_ids_to_migrate() + ) ): self.queue_task(StatesContextIDMigrationTask()) if ( - self.schema_version < 36 - or session.execute(has_states_context_ids_to_migrate()).scalar() + self.schema_version < CONTEXT_ID_AS_BINARY_SCHEMA_VERSION + or execute_stmt_lambda_element( + session, has_states_context_ids_to_migrate() + ) ): self.queue_task(EventsContextIDMigrationTask()) if ( - self.schema_version < 37 - or session.execute(has_event_type_to_migrate()).scalar() + self.schema_version < EVENT_TYPE_IDS_SCHEMA_VERSION + or execute_stmt_lambda_element(session, has_event_type_to_migrate()) ): self.queue_task(EventTypeIDMigrationTask()) else: @@ -713,8 +722,8 @@ class Recorder(threading.Thread): self.event_type_manager.active = True if ( - self.schema_version < 38 - or session.execute(has_entity_ids_to_migrate()).scalar() + self.schema_version < STATES_META_SCHEMA_VERSION + or execute_stmt_lambda_element(session, has_entity_ids_to_migrate()) ): self.queue_task(EntityIDMigrationTask()) else: @@ -1002,7 +1011,7 @@ class Recorder(threading.Thread): if states_meta_manager.active: dbstate.entity_id = None - self.event_session.add(dbstate) + session.add(dbstate) def _handle_database_error(self, err: Exception) -> bool: """Handle a database error that may result in moving away the corrupt db.""" @@ -1015,9 +1024,9 @@ class Recorder(threading.Thread): return False def _event_session_has_pending_writes(self) -> bool: - return bool( - self.event_session and (self.event_session.new or self.event_session.dirty) - ) + """Return True if there are pending writes in the event session.""" + session = self.event_session + return bool(session and (session.new or session.dirty)) def _commit_event_session_or_retry(self) -> None: """Commit the event session if there is work to do.""" @@ -1043,9 +1052,10 @@ class Recorder(threading.Thread): def _commit_event_session(self) -> None: assert self.event_session is not None + session = self.event_session self._commits_without_expire += 1 - self.event_session.commit() + session.commit() # We just committed the state attributes to the database # and we now know the attributes_ids. We can save # many selects for matching attributes by loading them @@ -1061,7 +1071,7 @@ class Recorder(threading.Thread): # do it after EXPIRE_AFTER_COMMITS commits if self._commits_without_expire >= EXPIRE_AFTER_COMMITS: self._commits_without_expire = 0 - self.event_session.expire_all() + session.expire_all() def _handle_sqlite_corruption(self) -> None: """Handle the sqlite3 database being corrupt.""" diff --git a/homeassistant/components/recorder/table_managers/states_meta.py b/homeassistant/components/recorder/table_managers/states_meta.py index 76b748d4697..c9c1ba90280 100644 --- a/homeassistant/components/recorder/table_managers/states_meta.py +++ b/homeassistant/components/recorder/table_managers/states_meta.py @@ -1,7 +1,7 @@ """Support managing StatesMeta.""" from __future__ import annotations -from collections.abc import Iterable +from collections.abc import Iterable, Sequence from typing import TYPE_CHECKING, cast from sqlalchemy.orm.session import Session @@ -63,7 +63,14 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]): This call is always thread-safe. """ with session.no_autoflush: - return dict(tuple(session.execute(find_all_states_metadata_ids()))) # type: ignore[arg-type] + return dict( + cast( + Sequence[tuple[int, str]], + execute_stmt_lambda_element( + session, find_all_states_metadata_ids() + ), + ) + ) def get_many( self, entity_ids: Iterable[str], session: Session, from_recorder: bool