Migrate EventData management to a table manager (#89716)

J. Nick Koston 2023-03-14 21:40:59 -10:00 committed by GitHub
parent bf8c4cae27
commit 59de7f3057
7 changed files with 184 additions and 111 deletions

homeassistant/components/recorder/core.py

@@ -80,15 +80,14 @@ from .models import (
from .pool import POOL_SIZE, MutexPool, RecorderPool
from .queries import (
find_shared_attributes_id,
find_shared_data_id,
get_shared_attributes,
get_shared_event_datas,
has_entity_ids_to_migrate,
has_event_type_to_migrate,
has_events_context_ids_to_migrate,
has_states_context_ids_to_migrate,
)
from .run_history import RunHistory
from .table_managers.event_data import EventDataManager
from .table_managers.event_types import EventTypeManager
from .table_managers.states_meta import StatesMetaManager
from .tasks import (
@@ -144,7 +143,6 @@ EXPIRE_AFTER_COMMITS = 120
# - How frequently states with overlapping attributes will change
# - How much memory our low end hardware has
STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
EVENT_DATA_ID_CACHE_SIZE = 2048
SHUTDOWN_TASK = object()
@@ -220,11 +218,10 @@ class Recorder(threading.Thread):
self._commits_without_expire = 0
self._old_states: dict[str | None, States] = {}
self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE)
self.event_type_manager = EventTypeManager()
self.states_meta_manager = StatesMetaManager()
self.event_data_manager = EventDataManager(self)
self.event_type_manager = EventTypeManager(self)
self.states_meta_manager = StatesMetaManager(self)
self._pending_state_attributes: dict[str, StateAttributes] = {}
self._pending_event_data: dict[str, EventData] = {}
self._pending_expunge: list[States] = []
self.event_session: Session | None = None
self._get_session: Callable[[], Session] | None = None
@@ -780,7 +777,7 @@ class Recorder(threading.Thread):
assert self.event_session is not None
self._pre_process_state_change_events(state_change_events)
self._pre_process_non_state_change_events(non_state_change_events)
self.event_data_manager.load(non_state_change_events, self.event_session)
self.event_type_manager.load(non_state_change_events, self.event_session)
self.states_meta_manager.load(state_change_events, self.event_session)
@@ -807,26 +804,6 @@
).fetchall():
self._state_attributes_ids[shared_attrs] = id_
def _pre_process_non_state_change_events(self, events: list[Event]) -> None:
"""Load startup event attributes from the database.
Since the _event_data_ids cache is empty at startup
we restore it from the database to avoid having to look up
the data in the database for every event until its primed.
"""
assert self.event_session is not None
if hashes := {
EventData.hash_shared_data_bytes(shared_event_bytes)
for event in events
if (shared_event_bytes := self._serialize_event_data_from_event(event))
}:
with self.event_session.no_autoflush:
for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
for id_, shared_data in self.event_session.execute(
get_shared_event_datas(hash_chunk)
).fetchall():
self._event_data_ids[shared_data] = id_
def _guarded_process_one_task_or_recover(self, task: RecorderTask) -> None:
"""Process a task, guarding against exceptions to ensure the loop does not collapse."""
_LOGGER.debug("Processing task: %s", task)
@@ -973,79 +950,51 @@
return cast(int, attributes_id[0])
return None
def _find_shared_data_in_db(self, data_hash: int, shared_data: str) -> int | None:
"""Find shared event data in the db from the hash and shared_attrs."""
#
# Avoid the event session being flushed since it will
# commit all the pending events and states to the database.
#
# The lookup has already checked to see if the data is cached
# or going to be written in the next commit so there is no
# need to flush before checking the database.
#
assert self.event_session is not None
with self.event_session.no_autoflush:
if data_id := self.event_session.execute(
find_shared_data_id(data_hash, shared_data)
).first():
return cast(int, data_id[0])
return None
def _serialize_event_data_from_event(self, event: Event) -> bytes | None:
"""Serialize event data."""
try:
return EventData.shared_data_bytes_from_event(event, self.dialect_name)
except JSON_ENCODE_EXCEPTIONS as ex:
_LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
return None
def _process_non_state_changed_event_into_session(self, event: Event) -> None:
"""Process any event into the session except state changed."""
event_session = self.event_session
assert event_session is not None
session = self.event_session
assert session is not None
dbevent = Events.from_event(event)
# Map the event_type to the EventTypes table
event_type_manager = self.event_type_manager
if pending_event_types := event_type_manager.get_pending(event.event_type):
dbevent.event_type_rel = pending_event_types
elif event_type_id := event_type_manager.get(event.event_type, event_session):
elif event_type_id := event_type_manager.get(event.event_type, session):
dbevent.event_type_id = event_type_id
else:
event_types = EventTypes(event_type=event.event_type)
event_type_manager.add_pending(event_types)
event_session.add(event_types)
session.add(event_types)
dbevent.event_type_rel = event_types
if not event.data:
event_session.add(dbevent)
session.add(dbevent)
return
if not (shared_data_bytes := self._serialize_event_data_from_event(event)):
event_data_manager = self.event_data_manager
if not (shared_data_bytes := event_data_manager.serialize_from_event(event)):
return
# Map the event data to the EventData table
shared_data = shared_data_bytes.decode("utf-8")
# Matching event data found in the pending commit
if pending_event_data := self._pending_event_data.get(shared_data):
if pending_event_data := event_data_manager.get_pending(shared_data):
dbevent.event_data_rel = pending_event_data
# Matching event data id found in the cache
elif data_id := self._event_data_ids.get(shared_data):
elif (data_id := event_data_manager.get_from_cache(shared_data)) or (
(hash_ := EventData.hash_shared_data_bytes(shared_data_bytes))
and (data_id := event_data_manager.get(shared_data, hash_, session))
):
dbevent.data_id = data_id
else:
data_hash = EventData.hash_shared_data_bytes(shared_data_bytes)
# Matching attributes found in the database
if data_id := self._find_shared_data_in_db(data_hash, shared_data):
self._event_data_ids[shared_data] = dbevent.data_id = data_id
# No matching attributes found, save them in the DB
else:
dbevent_data = EventData(shared_data=shared_data, hash=data_hash)
dbevent.event_data_rel = self._pending_event_data[
shared_data
] = dbevent_data
event_session.add(dbevent_data)
dbevent_data = EventData(shared_data=shared_data, hash=hash_)
event_data_manager.add_pending(dbevent_data)
session.add(dbevent_data)
dbevent.event_data_rel = dbevent_data
event_session.add(dbevent)
session.add(dbevent)
def _serialize_state_attributes_from_event(self, event: Event) -> bytes | None:
"""Serialize state changed event data."""
@@ -1184,9 +1133,7 @@
state_attr.shared_attrs
] = state_attr.attributes_id
self._pending_state_attributes = {}
for event_data in self._pending_event_data.values():
self._event_data_ids[event_data.shared_data] = event_data.data_id
self._pending_event_data = {}
self.event_data_manager.post_commit_pending()
self.event_type_manager.post_commit_pending()
self.states_meta_manager.post_commit_pending()
@@ -1212,9 +1159,8 @@
"""Close the event session."""
self._old_states.clear()
self._state_attributes_ids.clear()
self._event_data_ids.clear()
self._pending_state_attributes.clear()
self._pending_event_data.clear()
self.event_data_manager.reset()
self.event_type_manager.reset()
self.states_meta_manager.reset()
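
For context, the lookup order the recorder now delegates to EventDataManager is: a pending row queued for the next commit, then the in-memory LRU cache, then the database, and only then a brand new EventData row. A minimal standalone sketch of that cascade (LookupSketch, db_lookup, and resolve are illustrative names, not recorder API):

class LookupSketch:
    """Illustrative pending -> cache -> database resolution order."""

    def __init__(self, db_lookup):
        self._pending = {}            # key -> row queued but not yet committed
        self._cache = {}              # key -> id, primed after each commit
        self._db_lookup = db_lookup   # callable returning an id or None

    def resolve(self, key):
        if (row := self._pending.get(key)) is not None:
            return row                # reuse the row already waiting to commit
        if (id_ := self._cache.get(key)) is not None:
            return id_                # cache hit, no database round trip
        if (id_ := self._db_lookup(key)) is not None:
            self._cache[key] = id_    # prime the cache for later events
            return id_
        row = {"key": key}            # stand-in for a new EventData row
        self._pending[key] = row      # receives its id after the next commit
        return row

sketch = LookupSketch(db_lookup=lambda key: None)
assert sketch.resolve("a") is sketch.resolve("a")  # second call reuses the pending row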

homeassistant/components/recorder/purge.py

@@ -479,21 +479,6 @@ def _evict_purged_states_from_old_states_cache(
old_states.pop(old_state_reversed[purged_state_id], None)
def _evict_purged_data_from_data_cache(
instance: Recorder, purged_data_ids: set[int]
) -> None:
"""Evict purged data ids from the data ids cache."""
# Make a map from data_id to the data json
event_data_ids = instance._event_data_ids # pylint: disable=protected-access
event_data_ids_reversed = {
data_id: data for data, data_id in event_data_ids.items()
}
# Evict any purged data from the event_data_ids cache
for purged_attribute_id in purged_data_ids.intersection(event_data_ids_reversed):
event_data_ids.pop(event_data_ids_reversed[purged_attribute_id], None)
def _evict_purged_attributes_from_attributes_cache(
instance: Recorder, purged_attributes_ids: set[int]
) -> None:
@@ -539,7 +524,7 @@
_LOGGER.debug("Deleted %s data events", deleted_rows)
# Evict any entries in the event_data_ids cache referring to a purged state
_evict_purged_data_from_data_cache(instance, data_ids)
instance.event_data_manager.evict_purged(data_ids)
def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
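
The eviction itself still works by inverting the cache once per purge batch and intersecting, rather than scanning the whole LRU for every purged id. A standalone sketch of that reverse-map technique, with a plain dict standing in for the LRU:

def evict_purged_sketch(id_map: dict[str, int], purged_ids: set[int]) -> None:
    """Drop cache entries whose data_id was just purged from the database."""
    reversed_map = {data_id: shared_data for shared_data, data_id in id_map.items()}
    for purged_id in purged_ids.intersection(reversed_map):
        id_map.pop(reversed_map[purged_id], None)

cache = {'{"a": 1}': 10, '{"b": 2}': 11}
evict_purged_sketch(cache, {11, 99})  # 99 was never cached and is ignored
assert cache == {'{"a": 1}': 10}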

homeassistant/components/recorder/queries.py

@@ -85,15 +85,6 @@ def find_shared_attributes_id(
)
def find_shared_data_id(attr_hash: int, shared_data: str) -> StatementLambdaElement:
"""Find a data_id by hash and shared_data."""
return lambda_stmt(
lambda: select(EventData.data_id)
.filter(EventData.hash == attr_hash)
.filter(EventData.shared_data == shared_data)
)
def _state_attrs_exist(attr: int | None) -> Select:
"""Check if a state attributes id exists in the states table."""
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
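
The one-row find_shared_data_id lookup is superseded by the batched get_shared_event_datas that the manager calls with chunks of hashes (each chunk capped at SQLITE_MAX_BIND_VARS). That query is not shown in this diff; a plausible shape, assuming the same lambda_stmt style as the queries around it, would be:

from collections.abc import Iterable

from sqlalchemy import lambda_stmt, select
from sqlalchemy.sql.lambdas import StatementLambdaElement

from .db_schema import EventData

def get_shared_event_datas(data_hashes: Iterable[int]) -> StatementLambdaElement:
    """Load (data_id, shared_data) rows for a chunk of hashes in one round trip."""
    return lambda_stmt(
        lambda: select(EventData.data_id, EventData.shared_data).filter(
            EventData.hash.in_(data_hashes)
        )
    )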

homeassistant/components/recorder/table_managers/__init__.py

@@ -1 +1,15 @@
"""Managers for each table."""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..core import Recorder
class BaseTableManager:
"""Base class for table managers."""
def __init__(self, recorder: "Recorder") -> None:
"""Initialize the table manager."""
self.active = False
self.recorder = recorder
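
A hypothetical subclass, to illustrate the pattern every concrete manager below follows (DemoManager and its fields are invented for illustration):

class DemoManager(BaseTableManager):
    """Track a name-to-id mapping with a pending buffer, like the real managers."""

    def __init__(self, recorder: "Recorder") -> None:
        """Initialize the demo manager."""
        self._id_map: dict[str, int] = {}      # committed rows: name -> id
        self._pending: dict[str, object] = {}  # rows queued for the next commit
        super().__init__(recorder)             # sets self.active and self.recorder

    def reset(self) -> None:
        """Clear cached state after the database is reset or swapped."""
        self._id_map.clear()
        self._pending.clear()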

homeassistant/components/recorder/table_managers/event_data.py

@@ -0,0 +1,128 @@
"""Support managing EventData."""
from __future__ import annotations
from collections.abc import Iterable
import logging
from typing import TYPE_CHECKING, cast
from lru import LRU # pylint: disable=no-name-in-module
from sqlalchemy.orm.session import Session
from homeassistant.core import Event
from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS
from . import BaseTableManager
from ..const import SQLITE_MAX_BIND_VARS
from ..db_schema import EventData
from ..queries import get_shared_event_datas
from ..util import chunked
if TYPE_CHECKING:
from ..core import Recorder
CACHE_SIZE = 2048
_LOGGER = logging.getLogger(__name__)
class EventDataManager(BaseTableManager):
"""Manage the EventData table."""
def __init__(self, recorder: Recorder) -> None:
"""Initialize the event type manager."""
self._id_map: dict[str, int] = LRU(CACHE_SIZE)
self._pending: dict[str, EventData] = {}
super().__init__(recorder)
self.active = True # always active
def serialize_from_event(self, event: Event) -> bytes | None:
"""Serialize event data."""
try:
return EventData.shared_data_bytes_from_event(
event, self.recorder.dialect_name
)
except JSON_ENCODE_EXCEPTIONS as ex:
_LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
return None
def load(self, events: list[Event], session: Session) -> None:
"""Load the shared_datas to data_ids mapping into memory from events."""
if hashes := {
EventData.hash_shared_data_bytes(shared_event_bytes)
for event in events
if (shared_event_bytes := self.serialize_from_event(event))
}:
self._load_from_hashes(hashes, session)
def get(self, shared_data: str, data_hash: int, session: Session) -> int | None:
"""Resolve shared_datas to the data_id."""
return self.get_many(((shared_data, data_hash),), session)[shared_data]
def get_from_cache(self, shared_data: str) -> int | None:
"""Resolve shared_data to the data_id without accessing the underlying database."""
return self._id_map.get(shared_data)
def get_many(
self, shared_data_data_hashs: Iterable[tuple[str, int]], session: Session
) -> dict[str, int | None]:
"""Resolve shared_datas to data_ids."""
results: dict[str, int | None] = {}
missing_hashes: set[int] = set()
for shared_data, data_hash in shared_data_data_hashs:
if (data_id := self._id_map.get(shared_data)) is None:
missing_hashes.add(data_hash)
results[shared_data] = data_id
if not missing_hashes:
return results
return results | self._load_from_hashes(missing_hashes, session)
def _load_from_hashes(
self, hashes: Iterable[int], session: Session
) -> dict[str, int | None]:
"""Load the shared_datas to data_ids mapping into memory from a list of hashes."""
results: dict[str, int | None] = {}
with session.no_autoflush:
for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
for data_id, shared_data in session.execute(
get_shared_event_datas(hashs_chunk)
):
results[shared_data] = self._id_map[shared_data] = cast(
int, data_id
)
return results
def get_pending(self, shared_data: str) -> EventData | None:
"""Get pending EventData that have not be assigned ids yet."""
return self._pending.get(shared_data)
def add_pending(self, db_event_data: EventData) -> None:
"""Add a pending EventData that will be committed at the next interval."""
assert db_event_data.shared_data is not None
shared_data: str = db_event_data.shared_data
self._pending[shared_data] = db_event_data
def post_commit_pending(self) -> None:
"""Call after commit to load the data_ids of the new EventData into the LRU."""
for shared_data, db_event_data in self._pending.items():
self._id_map[shared_data] = db_event_data.data_id
self._pending.clear()
def reset(self) -> None:
"""Reset the event manager after the database has been reset or changed."""
self._id_map.clear()
self._pending.clear()
def evict_purged(self, data_ids: set[int]) -> None:
"""Evict purged data_ids from the cache when they are no longer used."""
id_map = self._id_map
event_data_ids_reversed = {
data_id: shared_data for shared_data, data_id in id_map.items()
}
# Evict any purged data from the cache
for purged_data_id in data_ids.intersection(event_data_ids_reversed):
id_map.pop(event_data_ids_reversed[purged_data_id], None)
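
A hedged usage sketch of the manager's read path: get_many() answers from the LRU where it can and issues a single chunked SELECT for hashes it has never seen (recorder, session, and the payloads below are illustrative):

manager = EventDataManager(recorder)
pairs = [
    (shared, EventData.hash_shared_data_bytes(shared.encode("utf-8")))
    for shared in ('{"domain": "light"}', '{"domain": "switch"}')
]
ids = manager.get_many(pairs, session)  # cache misses become one chunked query
ids = manager.get_many(pairs, session)  # rows found the first time now come from the LRU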

homeassistant/components/recorder/table_managers/event_types.py

@@ -2,29 +2,34 @@
from __future__ import annotations
from collections.abc import Iterable
from typing import cast
from typing import TYPE_CHECKING, cast
from lru import LRU # pylint: disable=no-name-in-module
from sqlalchemy.orm.session import Session
from homeassistant.core import Event
from . import BaseTableManager
from ..const import SQLITE_MAX_BIND_VARS
from ..db_schema import EventTypes
from ..queries import find_event_type_ids
from ..util import chunked
if TYPE_CHECKING:
from ..core import Recorder
CACHE_SIZE = 2048
class EventTypeManager:
class EventTypeManager(BaseTableManager):
"""Manage the EventTypes table."""
def __init__(self) -> None:
def __init__(self, recorder: Recorder) -> None:
"""Initialize the event type manager."""
self._id_map: dict[str, int] = LRU(CACHE_SIZE)
self._pending: dict[str, EventTypes] = {}
self.active = False
super().__init__(recorder)
def load(self, events: list[Event], session: Session) -> None:
"""Load the event_type to event_type_ids mapping into memory."""

homeassistant/components/recorder/table_managers/states_meta.py

@@ -2,29 +2,33 @@
from __future__ import annotations
from collections.abc import Iterable
from typing import cast
from typing import TYPE_CHECKING, cast
from lru import LRU # pylint: disable=no-name-in-module
from sqlalchemy.orm.session import Session
from homeassistant.core import Event
from . import BaseTableManager
from ..const import SQLITE_MAX_BIND_VARS
from ..db_schema import StatesMeta
from ..queries import find_all_states_metadata_ids, find_states_metadata_ids
from ..util import chunked
if TYPE_CHECKING:
from ..core import Recorder
CACHE_SIZE = 8192
class StatesMetaManager:
class StatesMetaManager(BaseTableManager):
"""Manage the StatesMeta table."""
def __init__(self) -> None:
def __init__(self, recorder: Recorder) -> None:
"""Initialize the states meta manager."""
self._id_map: dict[str, int] = LRU(CACHE_SIZE)
self._pending: dict[str, StatesMeta] = {}
self.active = False
super().__init__(recorder)
def load(self, events: list[Event], session: Session) -> None:
"""Load the entity_id to metadata_id mapping into memory."""