diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index b57498e2987..3e4f11dc95f 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -80,15 +80,14 @@ from .models import (
 from .pool import POOL_SIZE, MutexPool, RecorderPool
 from .queries import (
     find_shared_attributes_id,
-    find_shared_data_id,
     get_shared_attributes,
-    get_shared_event_datas,
     has_entity_ids_to_migrate,
     has_event_type_to_migrate,
     has_events_context_ids_to_migrate,
     has_states_context_ids_to_migrate,
 )
 from .run_history import RunHistory
+from .table_managers.event_data import EventDataManager
 from .table_managers.event_types import EventTypeManager
 from .table_managers.states_meta import StatesMetaManager
 from .tasks import (
@@ -144,7 +143,6 @@ EXPIRE_AFTER_COMMITS = 120
 # - How frequently states with overlapping attributes will change
 # - How much memory our low end hardware has
 STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
-EVENT_DATA_ID_CACHE_SIZE = 2048

 SHUTDOWN_TASK = object()

@@ -220,11 +218,10 @@ class Recorder(threading.Thread):
         self._commits_without_expire = 0
         self._old_states: dict[str | None, States] = {}
         self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
-        self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE)
-        self.event_type_manager = EventTypeManager()
-        self.states_meta_manager = StatesMetaManager()
+        self.event_data_manager = EventDataManager(self)
+        self.event_type_manager = EventTypeManager(self)
+        self.states_meta_manager = StatesMetaManager(self)
         self._pending_state_attributes: dict[str, StateAttributes] = {}
-        self._pending_event_data: dict[str, EventData] = {}
         self._pending_expunge: list[States] = []
         self.event_session: Session | None = None
         self._get_session: Callable[[], Session] | None = None
@@ -780,7 +777,7 @@ class Recorder(threading.Thread):
         assert self.event_session is not None

         self._pre_process_state_change_events(state_change_events)
-        self._pre_process_non_state_change_events(non_state_change_events)
+        self.event_data_manager.load(non_state_change_events, self.event_session)
         self.event_type_manager.load(non_state_change_events, self.event_session)
         self.states_meta_manager.load(state_change_events, self.event_session)

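The `load()` call above replaces the recorder's one-off `_pre_process_non_state_change_events` pass: the manager hashes each queued event's serialized payload and resolves every unknown hash in bulk, so the cache is primed in a handful of queries instead of one lookup per event. A minimal sketch of the idea, assuming a simplified `event_data(data_id, hash, shared_data)` table and stdlib hashing rather than the recorder's real schema:

```python
import json
import sqlite3
from zlib import crc32  # stand-in for the schema's own hash function


def prime_cache(conn: sqlite3.Connection, payloads: list[dict]) -> dict[str, int]:
    """Return a shared_data -> data_id map for payloads already stored."""
    # Hash every distinct serialized payload once ...
    hashes = {crc32(json.dumps(p, sort_keys=True).encode()) for p in payloads}
    if not hashes:
        return {}
    # ... then resolve them all in a single round trip.
    marks = ",".join("?" * len(hashes))
    rows = conn.execute(
        f"SELECT data_id, shared_data FROM event_data WHERE hash IN ({marks})",
        tuple(hashes),
    )
    return {shared_data: data_id for data_id, shared_data in rows}
```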
- """ - assert self.event_session is not None - if hashes := { - EventData.hash_shared_data_bytes(shared_event_bytes) - for event in events - if (shared_event_bytes := self._serialize_event_data_from_event(event)) - }: - with self.event_session.no_autoflush: - for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS): - for id_, shared_data in self.event_session.execute( - get_shared_event_datas(hash_chunk) - ).fetchall(): - self._event_data_ids[shared_data] = id_ - def _guarded_process_one_task_or_recover(self, task: RecorderTask) -> None: """Process a task, guarding against exceptions to ensure the loop does not collapse.""" _LOGGER.debug("Processing task: %s", task) @@ -973,79 +950,51 @@ class Recorder(threading.Thread): return cast(int, attributes_id[0]) return None - def _find_shared_data_in_db(self, data_hash: int, shared_data: str) -> int | None: - """Find shared event data in the db from the hash and shared_attrs.""" - # - # Avoid the event session being flushed since it will - # commit all the pending events and states to the database. - # - # The lookup has already have checked to see if the data is cached - # or going to be written in the next commit so there is no - # need to flush before checking the database. - # - assert self.event_session is not None - with self.event_session.no_autoflush: - if data_id := self.event_session.execute( - find_shared_data_id(data_hash, shared_data) - ).first(): - return cast(int, data_id[0]) - return None - - def _serialize_event_data_from_event(self, event: Event) -> bytes | None: - """Serialize event data.""" - try: - return EventData.shared_data_bytes_from_event(event, self.dialect_name) - except JSON_ENCODE_EXCEPTIONS as ex: - _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex) - return None - def _process_non_state_changed_event_into_session(self, event: Event) -> None: """Process any event into the session except state changed.""" - event_session = self.event_session - assert event_session is not None + session = self.event_session + assert session is not None dbevent = Events.from_event(event) # Map the event_type to the EventTypes table event_type_manager = self.event_type_manager if pending_event_types := event_type_manager.get_pending(event.event_type): dbevent.event_type_rel = pending_event_types - elif event_type_id := event_type_manager.get(event.event_type, event_session): + elif event_type_id := event_type_manager.get(event.event_type, session): dbevent.event_type_id = event_type_id else: event_types = EventTypes(event_type=event.event_type) event_type_manager.add_pending(event_types) - event_session.add(event_types) + session.add(event_types) dbevent.event_type_rel = event_types if not event.data: - event_session.add(dbevent) + session.add(dbevent) return - if not (shared_data_bytes := self._serialize_event_data_from_event(event)): + event_data_manager = self.event_data_manager + if not (shared_data_bytes := event_data_manager.serialize_from_event(event)): return # Map the event data to the EventData table shared_data = shared_data_bytes.decode("utf-8") # Matching attributes found in the pending commit - if pending_event_data := self._pending_event_data.get(shared_data): + if pending_event_data := event_data_manager.get_pending(shared_data): dbevent.event_data_rel = pending_event_data # Matching attributes id found in the cache - elif data_id := self._event_data_ids.get(shared_data): + elif (data_id := event_data_manager.get_from_cache(shared_data)) or ( + (hash_ := 
@@ -1184,9 +1133,7 @@ class Recorder(threading.Thread):
                 state_attr.shared_attrs
             ] = state_attr.attributes_id
         self._pending_state_attributes = {}
-        for event_data in self._pending_event_data.values():
-            self._event_data_ids[event_data.shared_data] = event_data.data_id
-        self._pending_event_data = {}
+        self.event_data_manager.post_commit_pending()
         self.event_type_manager.post_commit_pending()
         self.states_meta_manager.post_commit_pending()

@@ -1212,9 +1159,8 @@ class Recorder(threading.Thread):
         """Close the event session."""
         self._old_states.clear()
         self._state_attributes_ids.clear()
-        self._event_data_ids.clear()
         self._pending_state_attributes.clear()
-        self._pending_event_data.clear()
+        self.event_data_manager.reset()
         self.event_type_manager.reset()
         self.states_meta_manager.reset()
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index c644a17be0b..5dffead5978 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -479,21 +479,6 @@ def _evict_purged_states_from_old_states_cache(
         old_states.pop(old_state_reversed[purged_state_id], None)


-def _evict_purged_data_from_data_cache(
-    instance: Recorder, purged_data_ids: set[int]
-) -> None:
-    """Evict purged data ids from the data ids cache."""
-    # Make a map from data_id to the data json
-    event_data_ids = instance._event_data_ids  # pylint: disable=protected-access
-    event_data_ids_reversed = {
-        data_id: data for data, data_id in event_data_ids.items()
-    }
-
-    # Evict any purged data from the event_data_ids cache
-    for purged_attribute_id in purged_data_ids.intersection(event_data_ids_reversed):
-        event_data_ids.pop(event_data_ids_reversed[purged_attribute_id], None)
-
-
 def _evict_purged_attributes_from_attributes_cache(
     instance: Recorder, purged_attributes_ids: set[int]
 ) -> None:
@@ -539,7 +524,7 @@ def _purge_batch_data_ids(
     _LOGGER.debug("Deleted %s data events", deleted_rows)

     # Evict any entries in the event_data_ids cache referring to a purged state
-    _evict_purged_data_from_data_cache(instance, data_ids)
+    instance.event_data_manager.evict_purged(data_ids)
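`purge.py` now delegates cache eviction to the manager, but the strategy is unchanged: the cache maps `shared_data` to `data_id`, so evicting by id means inverting the map once and popping only the intersecting keys, O(cache + purged) rather than one scan per purged id. A standalone sketch with a usage example:

```python
def evict_purged(id_map: dict[str, int], purged_ids: set[int]) -> None:
    """Drop cache entries whose data_id was just purged from the database."""
    # Invert shared_data -> data_id once, then pop only the ids both sets share.
    reversed_map = {data_id: shared for shared, data_id in id_map.items()}
    for purged_id in purged_ids.intersection(reversed_map):
        id_map.pop(reversed_map[purged_id], None)


cache = {'{"a": 1}': 10, '{"b": 2}': 11}
evict_purged(cache, {11, 99})  # 99 is not cached; only 11 is evicted
assert cache == {'{"a": 1}': 10}
```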
diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py
index 737faf2f7ec..0882da9d48c 100644
--- a/homeassistant/components/recorder/queries.py
+++ b/homeassistant/components/recorder/queries.py
@@ -85,15 +85,6 @@ def find_shared_attributes_id(
     )


-def find_shared_data_id(attr_hash: int, shared_data: str) -> StatementLambdaElement:
-    """Find a data_id by hash and shared_data."""
-    return lambda_stmt(
-        lambda: select(EventData.data_id)
-        .filter(EventData.hash == attr_hash)
-        .filter(EventData.shared_data == shared_data)
-    )
-
-
 def _state_attrs_exist(attr: int | None) -> Select:
     """Check if a state attributes id exists in the states table."""
     # https://github.com/sqlalchemy/sqlalchemy/issues/9189
diff --git a/homeassistant/components/recorder/table_managers/__init__.py b/homeassistant/components/recorder/table_managers/__init__.py
index c011520204b..50ea8f0e11f 100644
--- a/homeassistant/components/recorder/table_managers/__init__.py
+++ b/homeassistant/components/recorder/table_managers/__init__.py
@@ -1 +1,15 @@
 """Managers for each table."""
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from ..core import Recorder
+
+
+class BaseTableManager:
+    """Base class for table managers."""
+
+    def __init__(self, recorder: "Recorder") -> None:
+        """Initialize the table manager."""
+        self.active = False
+        self.recorder = recorder
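Every manager now derives from `BaseTableManager`, giving each one a back-reference to the recorder plus an `active` flag (managers that depend on migrated schema start inactive; `EventDataManager` flips it on immediately). A hypothetical subclass, not part of this PR, showing the shape the base class expects, assuming the `BaseTableManager` added above is importable:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ..core import Recorder


class ToyManager(BaseTableManager):
    """Hypothetical manager used only to illustrate the pattern."""

    def __init__(self, recorder: "Recorder") -> None:
        """Set up the cache, then let the base class wire in the recorder."""
        self._id_map: dict[str, int] = {}
        super().__init__(recorder)  # sets self.active = False, self.recorder

    def get_from_cache(self, key: str) -> int | None:
        """Resolve key to a row id without touching the database."""
        return self._id_map.get(key)
```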
diff --git a/homeassistant/components/recorder/table_managers/event_data.py b/homeassistant/components/recorder/table_managers/event_data.py
new file mode 100644
index 00000000000..975681b4667
--- /dev/null
+++ b/homeassistant/components/recorder/table_managers/event_data.py
@@ -0,0 +1,128 @@
+"""Support managing EventData."""
+from __future__ import annotations
+
+from collections.abc import Iterable
+import logging
+from typing import TYPE_CHECKING, cast
+
+from lru import LRU  # pylint: disable=no-name-in-module
+from sqlalchemy.orm.session import Session
+
+from homeassistant.core import Event
+from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS
+
+from . import BaseTableManager
+from ..const import SQLITE_MAX_BIND_VARS
+from ..db_schema import EventData
+from ..queries import get_shared_event_datas
+from ..util import chunked
+
+if TYPE_CHECKING:
+    from ..core import Recorder
+
+
+CACHE_SIZE = 2048
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class EventDataManager(BaseTableManager):
+    """Manage the EventData table."""
+
+    def __init__(self, recorder: Recorder) -> None:
+        """Initialize the event data manager."""
+        self._id_map: dict[str, int] = LRU(CACHE_SIZE)
+        self._pending: dict[str, EventData] = {}
+        super().__init__(recorder)
+        self.active = True  # always active
+
+    def serialize_from_event(self, event: Event) -> bytes | None:
+        """Serialize event data."""
+        try:
+            return EventData.shared_data_bytes_from_event(
+                event, self.recorder.dialect_name
+            )
+        except JSON_ENCODE_EXCEPTIONS as ex:
+            _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
+            return None
+
+    def load(self, events: list[Event], session: Session) -> None:
+        """Load the shared_datas to data_ids mapping into memory from events."""
+        if hashes := {
+            EventData.hash_shared_data_bytes(shared_event_bytes)
+            for event in events
+            if (shared_event_bytes := self.serialize_from_event(event))
+        }:
+            self._load_from_hashes(hashes, session)
+
+    def get(self, shared_data: str, data_hash: int, session: Session) -> int | None:
+        """Resolve shared_data to the data_id."""
+        return self.get_many(((shared_data, data_hash),), session)[shared_data]
+
+    def get_from_cache(self, shared_data: str) -> int | None:
+        """Resolve shared_data to the data_id without accessing the underlying database."""
+        return self._id_map.get(shared_data)
+
+    def get_many(
+        self, shared_data_data_hashs: Iterable[tuple[str, int]], session: Session
+    ) -> dict[str, int | None]:
+        """Resolve shared_datas to data_ids."""
+        results: dict[str, int | None] = {}
+        missing_hashes: set[int] = set()
+        for shared_data, data_hash in shared_data_data_hashs:
+            if (data_id := self._id_map.get(shared_data)) is None:
+                missing_hashes.add(data_hash)
+
+            results[shared_data] = data_id
+
+        if not missing_hashes:
+            return results
+
+        return results | self._load_from_hashes(missing_hashes, session)
+
+    def _load_from_hashes(
+        self, hashes: Iterable[int], session: Session
+    ) -> dict[str, int | None]:
+        """Load the shared_datas to data_ids mapping into memory from a list of hashes."""
+        results: dict[str, int | None] = {}
+        with session.no_autoflush:
+            for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
+                for data_id, shared_data in session.execute(
+                    get_shared_event_datas(hashs_chunk)
+                ):
+                    results[shared_data] = self._id_map[shared_data] = cast(
+                        int, data_id
+                    )
+
+        return results
+
+    def get_pending(self, shared_data: str) -> EventData | None:
+        """Get pending EventData that have not been assigned ids yet."""
+        return self._pending.get(shared_data)
+
+    def add_pending(self, db_event_data: EventData) -> None:
+        """Add a pending EventData that will be committed at the next interval."""
+        assert db_event_data.shared_data is not None
+        shared_data: str = db_event_data.shared_data
+        self._pending[shared_data] = db_event_data
+
+    def post_commit_pending(self) -> None:
+        """Call after commit to load the data_ids of the new EventData into the LRU."""
+        for shared_data, db_event_data in self._pending.items():
+            self._id_map[shared_data] = db_event_data.data_id
+        self._pending.clear()
+
+    def reset(self) -> None:
+        """Reset the event manager after the database has been reset or changed."""
+        self._id_map.clear()
+        self._pending.clear()
+
+    def evict_purged(self, data_ids: set[int]) -> None:
+        """Evict purged data_ids from the cache when they are no longer used."""
+        id_map = self._id_map
+        event_data_ids_reversed = {
+            data_id: shared_data for shared_data, data_id in id_map.items()
+        }
+        # Evict any purged data from the cache
+        for purged_data_id in data_ids.intersection(event_data_ids_reversed):
+            id_map.pop(event_data_ids_reversed[purged_data_id], None)
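Putting the new manager together, the intended calling pattern looks roughly like the following, assuming a live `manager` (the recorder's `event_data_manager`) and an open SQLAlchemy `session`; `get_many()` returns `None` for payloads that are neither cached nor stored, which tells the caller to stage a new `EventData` row:

```python
shared_data = '{"entity_id": "light.kitchen"}'
data_hash = EventData.hash_shared_data_bytes(shared_data.encode("utf-8"))

# Resolve through the LRU first, falling back to a chunked database lookup.
results = manager.get_many([(shared_data, data_hash)], session)
if results[shared_data] is None:
    # Nothing cached or stored: stage a row; post_commit_pending() will
    # move its freshly assigned data_id into the LRU after the next commit.
    new_row = EventData(shared_data=shared_data, hash=data_hash)
    manager.add_pending(new_row)
    session.add(new_row)
```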
diff --git a/homeassistant/components/recorder/table_managers/event_types.py b/homeassistant/components/recorder/table_managers/event_types.py
index 21bcf78bf1a..cc7490f8d73 100644
--- a/homeassistant/components/recorder/table_managers/event_types.py
+++ b/homeassistant/components/recorder/table_managers/event_types.py
@@ -2,29 +2,34 @@
 from __future__ import annotations

 from collections.abc import Iterable
-from typing import cast
+from typing import TYPE_CHECKING, cast

 from lru import LRU  # pylint: disable=no-name-in-module
 from sqlalchemy.orm.session import Session

 from homeassistant.core import Event

+from . import BaseTableManager
 from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import EventTypes
 from ..queries import find_event_type_ids
 from ..util import chunked

+if TYPE_CHECKING:
+    from ..core import Recorder
+
+
 CACHE_SIZE = 2048


-class EventTypeManager:
+class EventTypeManager(BaseTableManager):
     """Manage the EventTypes table."""

-    def __init__(self) -> None:
+    def __init__(self, recorder: Recorder) -> None:
         """Initialize the event type manager."""
         self._id_map: dict[str, int] = LRU(CACHE_SIZE)
         self._pending: dict[str, EventTypes] = {}
-        self.active = False
+        super().__init__(recorder)

     def load(self, events: list[Event], session: Session) -> None:
         """Load the event_type to event_type_ids mapping into memory."""
diff --git a/homeassistant/components/recorder/table_managers/states_meta.py b/homeassistant/components/recorder/table_managers/states_meta.py
index 8af872ff969..0352587aa60 100644
--- a/homeassistant/components/recorder/table_managers/states_meta.py
+++ b/homeassistant/components/recorder/table_managers/states_meta.py
@@ -2,29 +2,33 @@
 from __future__ import annotations

 from collections.abc import Iterable
-from typing import cast
+from typing import TYPE_CHECKING, cast

 from lru import LRU  # pylint: disable=no-name-in-module
 from sqlalchemy.orm.session import Session

 from homeassistant.core import Event

+from . import BaseTableManager
 from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import StatesMeta
 from ..queries import find_all_states_metadata_ids, find_states_metadata_ids
 from ..util import chunked

+if TYPE_CHECKING:
+    from ..core import Recorder
+
 CACHE_SIZE = 8192


-class StatesMetaManager:
+class StatesMetaManager(BaseTableManager):
     """Manage the StatesMeta table."""

-    def __init__(self) -> None:
+    def __init__(self, recorder: Recorder) -> None:
         """Initialize the states meta manager."""
         self._id_map: dict[str, int] = LRU(CACHE_SIZE)
         self._pending: dict[str, StatesMeta] = {}
-        self.active = False
+        super().__init__(recorder)

     def load(self, events: list[Event], session: Session) -> None:
         """Load the entity_id to metadata_id mapping into memory."""
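All three managers share the same chunked-lookup idiom in their load paths: SQLite caps the number of bound parameters per statement, so hash and key lookups are issued `SQLITE_MAX_BIND_VARS` at a time via `recorder.util.chunked`. An illustrative equivalent of that helper (the constant's value of 998 is assumed here, matching SQLite's default limit of 999 variables):

```python
from collections.abc import Generator, Iterable
from itertools import islice

SQLITE_MAX_BIND_VARS = 998  # assumed value of the recorder.const constant


def chunked(iterable: Iterable, chunk_size: int) -> Generator[list, None, None]:
    """Yield successive fixed-size chunks from any iterable."""
    it = iter(iterable)
    while chunk := list(islice(it, chunk_size)):
        yield chunk


# 2500 hashes become three statements of 998, 998 and 504 bind vars each.
sizes = [len(chunk) for chunk in chunked(range(2500), SQLITE_MAX_BIND_VARS)]
assert sizes == [998, 998, 504]
```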