Improve performance of logbook processor humanify (#123157)

This commit is contained in:
J. Nick Koston 2024-08-05 04:33:37 -05:00 committed by GitHub
parent ab811f70b1
commit 5b7fd29797
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@@ -7,7 +7,7 @@ from dataclasses import dataclass
from datetime import datetime as dt from datetime import datetime as dt
import logging import logging
import time import time
from typing import Any from typing import TYPE_CHECKING, Any
from sqlalchemy.engine import Result from sqlalchemy.engine import Result
from sqlalchemy.engine.row import Row from sqlalchemy.engine.row import Row
@@ -211,35 +211,30 @@ def _humanify(
continuous_sensors: dict[str, bool] = {} continuous_sensors: dict[str, bool] = {}
context_lookup = logbook_run.context_lookup context_lookup = logbook_run.context_lookup
external_events = logbook_run.external_events external_events = logbook_run.external_events
event_cache = logbook_run.event_cache event_cache_get = logbook_run.event_cache.get
entity_name_cache = logbook_run.entity_name_cache entity_name_cache_get = logbook_run.entity_name_cache.get
include_entity_name = logbook_run.include_entity_name include_entity_name = logbook_run.include_entity_name
timestamp = logbook_run.timestamp timestamp = logbook_run.timestamp
memoize_new_contexts = logbook_run.memoize_new_contexts memoize_new_contexts = logbook_run.memoize_new_contexts
get_context = context_augmenter.get_context
context_id_bin: bytes
data: dict[str, Any]
# Process rows # Process rows
for row in rows: for row in rows:
context_id_bin: bytes = row[CONTEXT_ID_BIN_POS] context_id_bin = row[CONTEXT_ID_BIN_POS]
if memoize_new_contexts and context_id_bin not in context_lookup: if memoize_new_contexts and context_id_bin not in context_lookup:
context_lookup[context_id_bin] = row context_lookup[context_id_bin] = row
if row[CONTEXT_ONLY_POS]: if row[CONTEXT_ONLY_POS]:
continue continue
event_type = row[EVENT_TYPE_POS] event_type = row[EVENT_TYPE_POS]
if event_type == EVENT_CALL_SERVICE: if event_type == EVENT_CALL_SERVICE:
continue continue
time_fired_ts = row[TIME_FIRED_TS_POS]
if timestamp:
when = time_fired_ts or time.time()
else:
when = process_timestamp_to_utc_isoformat(
dt_util.utc_from_timestamp(time_fired_ts) or dt_util.utcnow()
)
if event_type is PSEUDO_EVENT_STATE_CHANGED: if event_type is PSEUDO_EVENT_STATE_CHANGED:
entity_id = row[ENTITY_ID_POS] entity_id = row[ENTITY_ID_POS]
assert entity_id is not None if TYPE_CHECKING:
assert entity_id is not None
# Skip continuous sensors # Skip continuous sensors
if ( if (
is_continuous := continuous_sensors.get(entity_id) is_continuous := continuous_sensors.get(entity_id)
@@ -250,34 +245,27 @@ def _humanify(
continue continue
data = { data = {
LOGBOOK_ENTRY_WHEN: when,
LOGBOOK_ENTRY_STATE: row[STATE_POS], LOGBOOK_ENTRY_STATE: row[STATE_POS],
LOGBOOK_ENTRY_ENTITY_ID: entity_id, LOGBOOK_ENTRY_ENTITY_ID: entity_id,
} }
if include_entity_name: if include_entity_name:
data[LOGBOOK_ENTRY_NAME] = entity_name_cache.get(entity_id) data[LOGBOOK_ENTRY_NAME] = entity_name_cache_get(entity_id)
if icon := row[ICON_POS]: if icon := row[ICON_POS]:
data[LOGBOOK_ENTRY_ICON] = icon data[LOGBOOK_ENTRY_ICON] = icon
context_augmenter.augment(data, row, context_id_bin)
yield data
elif event_type in external_events: elif event_type in external_events:
domain, describe_event = external_events[event_type] domain, describe_event = external_events[event_type]
try: try:
data = describe_event(event_cache.get(row)) data = describe_event(event_cache_get(row))
except Exception: except Exception:
_LOGGER.exception( _LOGGER.exception(
"Error with %s describe event for %s", domain, event_type "Error with %s describe event for %s", domain, event_type
) )
continue continue
data[LOGBOOK_ENTRY_WHEN] = when
data[LOGBOOK_ENTRY_DOMAIN] = domain data[LOGBOOK_ENTRY_DOMAIN] = domain
context_augmenter.augment(data, row, context_id_bin)
yield data
elif event_type == EVENT_LOGBOOK_ENTRY: elif event_type == EVENT_LOGBOOK_ENTRY:
event = event_cache.get(row) event = event_cache_get(row)
if not (event_data := event.data): if not (event_data := event.data):
continue continue
entry_domain = event_data.get(ATTR_DOMAIN) entry_domain = event_data.get(ATTR_DOMAIN)
@@ -285,14 +273,41 @@ def _humanify(
if entry_domain is None and entry_entity_id is not None: if entry_domain is None and entry_entity_id is not None:
entry_domain = split_entity_id(str(entry_entity_id))[0] entry_domain = split_entity_id(str(entry_entity_id))[0]
data = { data = {
LOGBOOK_ENTRY_WHEN: when,
LOGBOOK_ENTRY_NAME: event_data.get(ATTR_NAME), LOGBOOK_ENTRY_NAME: event_data.get(ATTR_NAME),
LOGBOOK_ENTRY_MESSAGE: event_data.get(ATTR_MESSAGE), LOGBOOK_ENTRY_MESSAGE: event_data.get(ATTR_MESSAGE),
LOGBOOK_ENTRY_DOMAIN: entry_domain, LOGBOOK_ENTRY_DOMAIN: entry_domain,
LOGBOOK_ENTRY_ENTITY_ID: entry_entity_id, LOGBOOK_ENTRY_ENTITY_ID: entry_entity_id,
} }
context_augmenter.augment(data, row, context_id_bin)
yield data else:
continue
time_fired_ts = row[TIME_FIRED_TS_POS]
if timestamp:
when = time_fired_ts or time.time()
else:
when = process_timestamp_to_utc_isoformat(
dt_util.utc_from_timestamp(time_fired_ts) or dt_util.utcnow()
)
data[LOGBOOK_ENTRY_WHEN] = when
if context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS]:
data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(context_user_id_bin)
# Augment context if it's available but not if the context is the same as the row
# or if the context is the parent of the row
if (context_row := get_context(context_id_bin, row)) and not (
(row is context_row or _rows_ids_match(row, context_row))
and (
not (context_parent := row[CONTEXT_PARENT_ID_BIN_POS])
or not (context_row := get_context(context_parent, context_row))
or row is context_row
or _rows_ids_match(row, context_row)
)
):
context_augmenter.augment(data, context_row)
yield data
class ContextAugmenter: class ContextAugmenter:
@@ -306,8 +321,8 @@ class ContextAugmenter:
self.event_cache = logbook_run.event_cache self.event_cache = logbook_run.event_cache
self.include_entity_name = logbook_run.include_entity_name self.include_entity_name = logbook_run.include_entity_name
def _get_context_row( def get_context(
self, context_id_bin: bytes | None, row: Row | EventAsRow self, context_id_bin: bytes | None, row: Row | EventAsRow | None
) -> Row | EventAsRow | None: ) -> Row | EventAsRow | None:
"""Get the context row from the id or row context.""" """Get the context row from the id or row context."""
if context_id_bin is not None and ( if context_id_bin is not None and (
@@ -322,34 +337,8 @@ class ContextAugmenter:
return async_event_to_row(origin_event) return async_event_to_row(origin_event)
return None return None
def augment( def augment(self, data: dict[str, Any], context_row: Row | EventAsRow) -> None:
self, data: dict[str, Any], row: Row | EventAsRow, context_id_bin: bytes | None
) -> None:
"""Augment data from the row and cache.""" """Augment data from the row and cache."""
if context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS]:
data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(context_user_id_bin)
if not (context_row := self._get_context_row(context_id_bin, row)):
return
if row is context_row or _rows_ids_match(row, context_row):
# This is the first event with the given ID. Was it directly caused by
# a parent event?
context_parent_id_bin = row[CONTEXT_PARENT_ID_BIN_POS]
if (
not context_parent_id_bin
or (
context_row := self._get_context_row(
context_parent_id_bin, context_row
)
)
is None
):
return
# Ensure the (parent) context_event exists and is not the root cause of
# this log entry.
if row is context_row or _rows_ids_match(row, context_row):
return
event_type = context_row[EVENT_TYPE_POS] event_type = context_row[EVENT_TYPE_POS]
# State change # State change
if context_entity_id := context_row[ENTITY_ID_POS]: if context_entity_id := context_row[ENTITY_ID_POS]: