De-duplicate event data into a new event_data table (#71135)

J. Nick Koston, 2022-05-01 21:01:17 -05:00, committed by GitHub
parent b8442d9340
commit c23866e5e5
7 changed files with 526 additions and 61 deletions

homeassistant/components/logbook/__init__.py

@@ -28,6 +28,7 @@ from homeassistant.components.history import (
 from homeassistant.components.http import HomeAssistantView
 from homeassistant.components.recorder import get_instance
 from homeassistant.components.recorder.models import (
+    EventData,
     Events,
     StateAttributes,
     States,
@@ -512,6 +513,7 @@ def _get_events(
             # When entity_matches_only is provided, contexts and events that do not
             # contain the entity_ids are not included in the logbook response.
             query = _apply_event_entity_id_matchers(query, entity_ids)
+        query = query.outerjoin(EventData, (Events.data_id == EventData.data_id))
         query = query.union_all(
             _generate_states_query(
@@ -535,6 +537,8 @@ def _get_events(
         if context_id is not None:
             query = query.filter(Events.context_id == context_id)
+        query = query.outerjoin(EventData, (Events.data_id == EventData.data_id))
+
         query = query.order_by(Events.time_fired)
 
     return list(
@@ -545,6 +549,18 @@ def _get_events(
 def _generate_events_query(session: Session) -> Query:
     return session.query(
         *EVENT_COLUMNS,
+        EventData.shared_data,
+        States.state,
+        States.entity_id,
+        States.attributes,
+        StateAttributes.shared_attrs,
+    )
+
+
+def _generate_events_query_without_data(session: Session) -> Query:
+    return session.query(
+        *EVENT_COLUMNS,
+        literal(value=None, type_=sqlalchemy.Text).label("shared_data"),
         States.state,
         States.entity_id,
         States.attributes,
@@ -555,6 +571,7 @@ def _generate_events_query(session: Session) -> Query:
 def _generate_events_query_without_states(session: Session) -> Query:
     return session.query(
         *EVENT_COLUMNS,
+        EventData.shared_data,
         literal(value=None, type_=sqlalchemy.String).label("state"),
         literal(value=None, type_=sqlalchemy.String).label("entity_id"),
         literal(value=None, type_=sqlalchemy.Text).label("attributes"),
@@ -570,7 +587,7 @@ def _generate_states_query(
     entity_ids: Iterable[str],
 ) -> Query:
     return (
-        _generate_events_query(session)
+        _generate_events_query_without_data(session)
         .outerjoin(Events, (States.event_id == Events.event_id))
         .outerjoin(old_state, (States.old_state_id == old_state.state_id))
         .filter(_missing_state_matcher(old_state))
@@ -671,14 +688,12 @@ def _apply_event_types_filter(
 def _apply_event_entity_id_matchers(
     events_query: Query, entity_ids: Iterable[str]
 ) -> Query:
-    return events_query.filter(
-        sqlalchemy.or_(
-            *(
-                Events.event_data.like(ENTITY_ID_JSON_TEMPLATE.format(entity_id))
-                for entity_id in entity_ids
-            )
-        )
-    )
+    ors = []
+    for entity_id in entity_ids:
+        like = ENTITY_ID_JSON_TEMPLATE.format(entity_id)
+        ors.append(Events.event_data.like(like))
+        ors.append(EventData.shared_data.like(like))
+    return events_query.filter(sqlalchemy.or_(*ors))
 
 
 def _keep_event(
@@ -840,7 +855,17 @@ class LazyEventPartialState:
         if self._event_data:
             return self._event_data.get(ATTR_ENTITY_ID)
 
-        result = ENTITY_ID_JSON_EXTRACT.search(self._row.event_data)
+        result = ENTITY_ID_JSON_EXTRACT.search(
+            self._row.shared_data or self._row.event_data or ""
+        )
+        return result.group(1) if result else None
+
+    @property
+    def data_domain(self) -> str | None:
+        """Extract the domain from the decoded data or json."""
+        result = DOMAIN_JSON_EXTRACT.search(
+            self._row.shared_data or self._row.event_data or ""
+        )
         return result.group(1) if result else None
 
     @property
@@ -851,18 +876,14 @@ class LazyEventPartialState:
         )
         return result.group(1) if result else None
 
-    @property
-    def data_domain(self) -> str | None:
-        """Extract the domain from the decoded data or json."""
-        result = DOMAIN_JSON_EXTRACT.search(self._row.event_data)
-        return result.group(1) if result else None
-
     @property
     def data(self) -> dict[str, Any]:
         """Event data."""
-        if not self._event_data:
-            source: str = self._row.event_data
-            if event_data := self._event_data_cache.get(source):
+        if self._event_data is None:
+            source: str = self._row.shared_data or self._row.event_data
+            if not source:
+                self._event_data = {}
+            elif event_data := self._event_data_cache.get(source):
                 self._event_data = event_data
             else:
                 self._event_data = self._event_data_cache[source] = cast(
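
Note on the read path above: after this change a logbook row can carry its payload in two places, the shared_data column joined in from the new event_data table (rows the recorder de-duplicated) or the legacy events.event_data column (rows written before the migration), so every consumer falls back from the former to the latter. A minimal sketch of that fallback plus the decode cache LazyEventPartialState relies on; the function name and the plain dict cache are illustrative, not part of the commit:

import json
from typing import Any


def decode_event_payload(row: Any, cache: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Decode a joined logbook row, preferring the de-duplicated column."""
    # Rows written after the migration populate shared_data via the
    # outerjoin on event_data; older rows still carry events.event_data.
    source = row.shared_data or row.event_data
    if not source:
        return {}
    # Identical JSON strings decode once and are shared across rows,
    # mirroring the _event_data_cache keyed on the raw string.
    if (cached := cache.get(source)) is not None:
        return cached
    decoded: dict[str, Any] = json.loads(source)
    cache[source] = decoded
    return decoded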

homeassistant/components/recorder/__init__.py

@@ -80,6 +80,7 @@ from .const import (
 from .executor import DBInterruptibleThreadPoolExecutor
 from .models import (
     Base,
+    EventData,
     Events,
     StateAttributes,
     States,
@@ -158,6 +159,7 @@ EXPIRE_AFTER_COMMITS = 120
 # - How frequently states with overlapping attributes will change
 # - How much memory our low end hardware has
 STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
+EVENT_DATA_ID_CACHE_SIZE = 2048
 
 SHUTDOWN_TASK = object()
@@ -639,10 +641,13 @@ class Recorder(threading.Thread):
         self._commits_without_expire = 0
         self._old_states: dict[str, States] = {}
         self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
+        self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE)
         self._pending_state_attributes: dict[str, StateAttributes] = {}
+        self._pending_event_data: dict[str, EventData] = {}
         self._pending_expunge: list[States] = []
         self._bakery = bakery
         self._find_shared_attr_query: Query | None = None
+        self._find_shared_data_query: Query | None = None
         self.event_session: Session | None = None
         self.get_session: Callable[[], Session] | None = None
         self._completed_first_database_setup: bool | None = None
@@ -1162,18 +1167,62 @@ class Recorder(threading.Thread):
             return cast(int, attributes[0])
         return None
 
+    def _find_shared_data_in_db(self, data_hash: int, shared_data: str) -> int | None:
+        """Find shared event data in the db from the hash and shared_data."""
+        #
+        # Avoid the event session being flushed since it will
+        # commit all the pending events and states to the database.
+        #
+        # The lookup has already checked to see if the data is cached
+        # or going to be written in the next commit so there is no
+        # need to flush before checking the database.
+        #
+        assert self.event_session is not None
+        if self._find_shared_data_query is None:
+            self._find_shared_data_query = self._bakery(
+                lambda session: session.query(EventData.data_id)
+                .filter(EventData.hash == bindparam("data_hash"))
+                .filter(EventData.shared_data == bindparam("shared_data"))
+            )
+        with self.event_session.no_autoflush:
+            if (
+                data_id := self._find_shared_data_query(self.event_session)
+                .params(data_hash=data_hash, shared_data=shared_data)
+                .first()
+            ):
+                return cast(int, data_id[0])
+        return None
+
     def _process_event_into_session(self, event: Event) -> None:
         assert self.event_session is not None
-        try:
-            if event.event_type == EVENT_STATE_CHANGED:
-                dbevent = Events.from_event(event, event_data=None)
-            else:
-                dbevent = Events.from_event(event)
-        except (TypeError, ValueError):
-            _LOGGER.warning("Event is not JSON serializable: %s", event)
-            return
+
+        dbevent = Events.from_event(event)
+
+        if event.event_type != EVENT_STATE_CHANGED and event.data:
+            try:
+                shared_data = EventData.shared_data_from_event(event)
+            except (TypeError, ValueError):
+                _LOGGER.warning("Event is not JSON serializable: %s", event)
+                return
+
+            # Matching attributes found in the pending commit
+            if pending_event_data := self._pending_event_data.get(shared_data):
+                dbevent.event_data_rel = pending_event_data
+            # Matching attributes id found in the cache
+            elif data_id := self._event_data_ids.get(shared_data):
+                dbevent.data_id = data_id
+            else:
+                data_hash = EventData.hash_shared_data(shared_data)
+                # Matching attributes found in the database
+                if data_id := self._find_shared_data_in_db(data_hash, shared_data):
+                    self._event_data_ids[shared_data] = dbevent.data_id = data_id
+                # No matching attributes found, save them in the DB
+                else:
+                    dbevent_data = EventData(shared_data=shared_data, hash=data_hash)
+                    dbevent.event_data_rel = self._pending_event_data[
+                        shared_data
+                    ] = dbevent_data
+                    self.event_session.add(dbevent_data)
 
         self.event_session.add(dbevent)
         if event.event_type != EVENT_STATE_CHANGED:
             return
@@ -1286,6 +1335,9 @@ class Recorder(threading.Thread):
                     state_attr.shared_attrs
                 ] = state_attr.attributes_id
             self._pending_state_attributes = {}
+            for event_data in self._pending_event_data.values():
+                self._event_data_ids[event_data.shared_data] = event_data.data_id
+            self._pending_event_data = {}
 
         # Expire is an expensive operation (frequently more expensive
         # than the flush and commit itself) so we only
@@ -1307,7 +1359,9 @@ class Recorder(threading.Thread):
         """Close the event session."""
         self._old_states = {}
         self._state_attributes_ids = {}
+        self._event_data_ids = {}
         self._pending_state_attributes = {}
+        self._pending_event_data = {}
         if not self.event_session:
             return
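
The write path in _process_event_into_session now resolves each payload through three tiers before inserting a new event_data row: EventData objects pending in the current commit, the LRU of recently committed ids, then the baked database query (run with autoflush disabled so the probe cannot trigger a partial flush). A simplified, self-contained sketch of that ordering, with plain dicts and callables standing in for the LRU, the bakery, and the ORM; fnv1a_32 from the fnvhash package is an assumption, matching the function name used in models.py:

from collections.abc import Callable

from fnvhash import fnv1a_32  # assumption: the fnv1a_32 referenced in models.py


def resolve_data_id(
    shared_data: str,
    pending: dict[str, int],  # payloads added in this commit, not yet committed
    id_cache: dict[str, int],  # an LRU in the real Recorder
    find_in_db: Callable[[int, str], int | None],
    insert_row: Callable[[str, int], int],
) -> int:
    """Return a data_id for a JSON payload, de-duplicating at each tier."""
    if (data_id := pending.get(shared_data)) is not None:
        return data_id
    if (data_id := id_cache.get(shared_data)) is not None:
        return data_id
    # The indexed hash narrows the DB probe; exact string equality in
    # find_in_db resolves any hash collisions.
    data_hash = fnv1a_32(shared_data.encode("utf-8"))
    if (data_id := find_in_db(data_hash, shared_data)) is not None:
        id_cache[shared_data] = data_id
        return data_id
    data_id = insert_row(shared_data, data_hash)
    pending[shared_data] = data_id
    return data_id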

homeassistant/components/recorder/migration.py

@@ -380,6 +380,8 @@ def _apply_update(instance, new_version, old_version):  # noqa: C901
     """Perform operations to bring schema up to date."""
     engine = instance.engine
     dialect = engine.dialect.name
+    big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER"
+
     if new_version == 1:
         _create_index(instance, "events", "ix_events_time_fired")
     elif new_version == 2:
@@ -643,11 +645,13 @@ def _apply_update(instance, new_version, old_version):  # noqa: C901
             "ix_statistics_short_term_statistic_id_start",
         )
     elif new_version == 25:
-        big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER"
         _add_columns(instance, "states", [f"attributes_id {big_int}"])
         _create_index(instance, "states", "ix_states_attributes_id")
     elif new_version == 26:
         _create_index(instance, "statistics_runs", "ix_statistics_runs_start")
+    elif new_version == 27:
+        _add_columns(instance, "events", [f"data_id {big_int}"])
+        _create_index(instance, "events", "ix_events_data_id")
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
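
Schema version 27 only adds a nullable events.data_id column and an index; existing rows keep data_id NULL and are never rewritten, which is why the readers in this commit tolerate both storage locations. A hypothetical stand-in for what the branch amounts to on SQLite, assuming _add_columns and _create_index emit standard DDL as their version-25 usage suggests (MySQL gets "INTEGER(20)"):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id INTEGER PRIMARY KEY, event_type TEXT)")
# The two operations performed by the new_version == 27 branch:
conn.execute("ALTER TABLE events ADD COLUMN data_id INTEGER")
conn.execute("CREATE INDEX ix_events_data_id ON events (data_id)")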

homeassistant/components/recorder/models.py

@@ -35,7 +35,6 @@ from homeassistant.const import (
     MAX_LENGTH_STATE_STATE,
 )
 from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
-from homeassistant.helpers.typing import UNDEFINED, UndefinedType
 import homeassistant.util.dt as dt_util
 
 from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
@@ -44,13 +43,14 @@ from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
 # pylint: disable=invalid-name
 Base = declarative_base()
 
-SCHEMA_VERSION = 26
+SCHEMA_VERSION = 27
 
 _LOGGER = logging.getLogger(__name__)
 
 DB_TIMEZONE = "+00:00"
 
 TABLE_EVENTS = "events"
+TABLE_EVENT_DATA = "event_data"
 TABLE_STATES = "states"
 TABLE_STATE_ATTRIBUTES = "state_attributes"
 TABLE_RECORDER_RUNS = "recorder_runs"
@@ -60,6 +60,9 @@ TABLE_STATISTICS_META = "statistics_meta"
 TABLE_STATISTICS_RUNS = "statistics_runs"
 TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
 
+# Only add TABLE_STATE_ATTRIBUTES and TABLE_EVENT_DATA
+# to the below list once we want to check for their
+# presence in the sanity check.
 ALL_TABLES = [
     TABLE_STATES,
     TABLE_EVENTS,
@@ -103,24 +106,24 @@ class Events(Base):  # type: ignore[misc,valid-type]
     context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
     context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
     context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
+    data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
+    event_data_rel = relationship("EventData")
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
         return (
             f"<recorder.Events("
-            f"id={self.event_id}, type='{self.event_type}', data='{self.event_data}', "
+            f"id={self.event_id}, type='{self.event_type}', "
             f"origin='{self.origin}', time_fired='{self.time_fired}'"
-            f")>"
+            f", data_id={self.data_id})>"
         )
 
     @staticmethod
-    def from_event(
-        event: Event, event_data: UndefinedType | None = UNDEFINED
-    ) -> Events:
+    def from_event(event: Event) -> Events:
         """Create an event database object from a native event."""
         return Events(
             event_type=event.event_type,
-            event_data=JSON_DUMP(event.data) if event_data is UNDEFINED else event_data,
+            event_data=None,
             origin=str(event.origin.value),
             time_fired=event.time_fired,
             context_id=event.context.id,
@@ -138,7 +141,7 @@ class Events(Base):  # type: ignore[misc,valid-type]
         try:
             return Event(
                 self.event_type,
-                json.loads(self.event_data),
+                json.loads(self.event_data) if self.event_data else {},
                 EventOrigin(self.origin),
                 process_timestamp(self.time_fired),
                 context=context,
@@ -149,6 +152,53 @@ class Events(Base):  # type: ignore[misc,valid-type]
             return None
 
 
+class EventData(Base):  # type: ignore[misc,valid-type]
+    """Event data history."""
+
+    __table_args__ = (
+        {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
+    )
+    __tablename__ = TABLE_EVENT_DATA
+    data_id = Column(Integer, Identity(), primary_key=True)
+    hash = Column(BigInteger, index=True)
+    # Note that this is not named attributes to avoid confusion with the states table
+    shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
+
+    def __repr__(self) -> str:
+        """Return string representation of instance for debugging."""
+        return (
+            f"<recorder.EventData("
+            f"id={self.data_id}, hash='{self.hash}', data='{self.shared_data}'"
+            f")>"
+        )
+
+    @staticmethod
+    def from_event(event: Event) -> EventData:
+        """Create object from an event."""
+        shared_data = JSON_DUMP(event.data)
+        return EventData(
+            shared_data=shared_data, hash=EventData.hash_shared_data(shared_data)
+        )
+
+    @staticmethod
+    def shared_data_from_event(event: Event) -> str:
+        """Create shared_data from an event."""
+        return JSON_DUMP(event.data)
+
+    @staticmethod
+    def hash_shared_data(shared_data: str) -> int:
+        """Return the hash of json encoded shared data."""
+        return cast(int, fnv1a_32(shared_data.encode("utf-8")))
+
+    def to_native(self) -> dict[str, Any]:
+        """Convert to an event data dictionary."""
+        try:
+            return cast(dict[str, Any], json.loads(self.shared_data))
+        except ValueError:
+            _LOGGER.exception("Error converting row to event data: %s", self)
+            return {}
+
+
 class States(Base):  # type: ignore[misc,valid-type]
     """State change history."""

homeassistant/components/recorder/purge.py

@@ -17,6 +17,7 @@ from homeassistant.const import EVENT_STATE_CHANGED
 
 from .const import MAX_ROWS_TO_PURGE
 from .models import (
+    EventData,
     Events,
     RecorderRuns,
     StateAttributes,
@@ -53,7 +54,8 @@ def purge_old_data(
             event_ids,
             state_ids,
             attributes_ids,
-        ) = _select_event_state_and_attributes_ids_to_purge(session, purge_before)
+            data_ids,
+        ) = _select_event_state_attributes_ids_data_ids_to_purge(session, purge_before)
         statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
         short_term_statistics = _select_short_term_statistics_to_purge(
             session, purge_before
@@ -70,6 +72,11 @@ def purge_old_data(
         if event_ids:
             _purge_event_ids(session, event_ids)
 
+        if unused_data_ids_set := _select_unused_event_data_ids(
+            session, data_ids, using_sqlite
+        ):
+            _purge_event_data_ids(instance, session, unused_data_ids_set)
+
         if statistics_runs:
             _purge_statistics_runs(session, statistics_runs)
@@ -91,12 +98,14 @@ def purge_old_data(
     return True
 
 
-def _select_event_state_and_attributes_ids_to_purge(
+def _select_event_state_attributes_ids_data_ids_to_purge(
     session: Session, purge_before: datetime
-) -> tuple[set[int], set[int], set[int]]:
+) -> tuple[set[int], set[int], set[int], set[int]]:
     """Return a list of event, state, and attribute ids to purge."""
     events = (
-        session.query(Events.event_id, States.state_id, States.attributes_id)
+        session.query(
+            Events.event_id, Events.data_id, States.state_id, States.attributes_id
+        )
         .outerjoin(States, Events.event_id == States.event_id)
         .filter(Events.time_fired < purge_before)
         .limit(MAX_ROWS_TO_PURGE)
@@ -106,13 +115,16 @@ def _select_event_state_attributes_ids_data_ids_to_purge(
     event_ids = set()
     state_ids = set()
     attributes_ids = set()
+    data_ids = set()
     for event in events:
         event_ids.add(event.event_id)
         if event.state_id:
             state_ids.add(event.state_id)
         if event.attributes_id:
             attributes_ids.add(event.attributes_id)
-    return event_ids, state_ids, attributes_ids
+        if event.data_id:
+            data_ids.add(event.data_id)
+    return event_ids, state_ids, attributes_ids, data_ids
 
 
 def _state_attrs_exist(attr: int | None) -> Select:
@@ -400,6 +412,255 @@ def _select_unused_attributes_ids(
     return to_remove
 
 
+def _event_data_id_exist(data_id: int | None) -> Select:
+    """Check if an event data id exists in the events table."""
+    return select(func.min(Events.data_id)).where(Events.data_id == data_id)
+
+
+def _generate_find_data_id_lambda(
+    id1: int,
+    id2: int | None,
+    id3: int | None,
+    id4: int | None,
+    id5: int | None,
+    id6: int | None,
+    id7: int | None,
+    id8: int | None,
+    id9: int | None,
+    id10: int | None,
+    id11: int | None,
+    id12: int | None,
+    id13: int | None,
+    id14: int | None,
+    id15: int | None,
+    id16: int | None,
+    id17: int | None,
+    id18: int | None,
+    id19: int | None,
+    id20: int | None,
+    id21: int | None,
+    id22: int | None,
+    id23: int | None,
+    id24: int | None,
+    id25: int | None,
+    id26: int | None,
+    id27: int | None,
+    id28: int | None,
+    id29: int | None,
+    id30: int | None,
+    id31: int | None,
+    id32: int | None,
+    id33: int | None,
+    id34: int | None,
+    id35: int | None,
+    id36: int | None,
+    id37: int | None,
+    id38: int | None,
+    id39: int | None,
+    id40: int | None,
+    id41: int | None,
+    id42: int | None,
+    id43: int | None,
+    id44: int | None,
+    id45: int | None,
+    id46: int | None,
+    id47: int | None,
+    id48: int | None,
+    id49: int | None,
+    id50: int | None,
+    id51: int | None,
+    id52: int | None,
+    id53: int | None,
+    id54: int | None,
+    id55: int | None,
+    id56: int | None,
+    id57: int | None,
+    id58: int | None,
+    id59: int | None,
+    id60: int | None,
+    id61: int | None,
+    id62: int | None,
+    id63: int | None,
+    id64: int | None,
+    id65: int | None,
+    id66: int | None,
+    id67: int | None,
+    id68: int | None,
+    id69: int | None,
+    id70: int | None,
+    id71: int | None,
+    id72: int | None,
+    id73: int | None,
+    id74: int | None,
+    id75: int | None,
+    id76: int | None,
+    id77: int | None,
+    id78: int | None,
+    id79: int | None,
+    id80: int | None,
+    id81: int | None,
+    id82: int | None,
+    id83: int | None,
+    id84: int | None,
+    id85: int | None,
+    id86: int | None,
+    id87: int | None,
+    id88: int | None,
+    id89: int | None,
+    id90: int | None,
+    id91: int | None,
+    id92: int | None,
+    id93: int | None,
+    id94: int | None,
+    id95: int | None,
+    id96: int | None,
+    id97: int | None,
+    id98: int | None,
+    id99: int | None,
+    id100: int | None,
+) -> StatementLambdaElement:
+    """Generate the find event data select only once.
+
+    https://docs.sqlalchemy.org/en/14/core/connections.html#quick-guidelines-for-lambdas
+    """
+    return lambda_stmt(
+        lambda: union_all(
+            _event_data_id_exist(id1),
+            _event_data_id_exist(id2),
+            _event_data_id_exist(id3),
+            _event_data_id_exist(id4),
+            _event_data_id_exist(id5),
+            _event_data_id_exist(id6),
+            _event_data_id_exist(id7),
+            _event_data_id_exist(id8),
+            _event_data_id_exist(id9),
+            _event_data_id_exist(id10),
+            _event_data_id_exist(id11),
+            _event_data_id_exist(id12),
+            _event_data_id_exist(id13),
+            _event_data_id_exist(id14),
+            _event_data_id_exist(id15),
+            _event_data_id_exist(id16),
+            _event_data_id_exist(id17),
+            _event_data_id_exist(id18),
+            _event_data_id_exist(id19),
+            _event_data_id_exist(id20),
+            _event_data_id_exist(id21),
+            _event_data_id_exist(id22),
+            _event_data_id_exist(id23),
+            _event_data_id_exist(id24),
+            _event_data_id_exist(id25),
+            _event_data_id_exist(id26),
+            _event_data_id_exist(id27),
+            _event_data_id_exist(id28),
+            _event_data_id_exist(id29),
+            _event_data_id_exist(id30),
+            _event_data_id_exist(id31),
+            _event_data_id_exist(id32),
+            _event_data_id_exist(id33),
+            _event_data_id_exist(id34),
+            _event_data_id_exist(id35),
+            _event_data_id_exist(id36),
+            _event_data_id_exist(id37),
+            _event_data_id_exist(id38),
+            _event_data_id_exist(id39),
+            _event_data_id_exist(id40),
+            _event_data_id_exist(id41),
+            _event_data_id_exist(id42),
+            _event_data_id_exist(id43),
+            _event_data_id_exist(id44),
+            _event_data_id_exist(id45),
+            _event_data_id_exist(id46),
+            _event_data_id_exist(id47),
+            _event_data_id_exist(id48),
+            _event_data_id_exist(id49),
+            _event_data_id_exist(id50),
+            _event_data_id_exist(id51),
+            _event_data_id_exist(id52),
+            _event_data_id_exist(id53),
+            _event_data_id_exist(id54),
+            _event_data_id_exist(id55),
+            _event_data_id_exist(id56),
+            _event_data_id_exist(id57),
+            _event_data_id_exist(id58),
+            _event_data_id_exist(id59),
+            _event_data_id_exist(id60),
+            _event_data_id_exist(id61),
+            _event_data_id_exist(id62),
+            _event_data_id_exist(id63),
+            _event_data_id_exist(id64),
+            _event_data_id_exist(id65),
+            _event_data_id_exist(id66),
+            _event_data_id_exist(id67),
+            _event_data_id_exist(id68),
+            _event_data_id_exist(id69),
+            _event_data_id_exist(id70),
+            _event_data_id_exist(id71),
+            _event_data_id_exist(id72),
+            _event_data_id_exist(id73),
+            _event_data_id_exist(id74),
+            _event_data_id_exist(id75),
+            _event_data_id_exist(id76),
+            _event_data_id_exist(id77),
+            _event_data_id_exist(id78),
+            _event_data_id_exist(id79),
+            _event_data_id_exist(id80),
+            _event_data_id_exist(id81),
+            _event_data_id_exist(id82),
+            _event_data_id_exist(id83),
+            _event_data_id_exist(id84),
+            _event_data_id_exist(id85),
+            _event_data_id_exist(id86),
+            _event_data_id_exist(id87),
+            _event_data_id_exist(id88),
+            _event_data_id_exist(id89),
+            _event_data_id_exist(id90),
+            _event_data_id_exist(id91),
+            _event_data_id_exist(id92),
+            _event_data_id_exist(id93),
+            _event_data_id_exist(id94),
+            _event_data_id_exist(id95),
+            _event_data_id_exist(id96),
+            _event_data_id_exist(id97),
+            _event_data_id_exist(id98),
+            _event_data_id_exist(id99),
+            _event_data_id_exist(id100),
+        )
+    )
+
+
+def _select_unused_event_data_ids(
+    session: Session, data_ids: set[int], using_sqlite: bool
+) -> set[int]:
+    """Return a set of event data ids that are not used by any events in the database."""
+    if not data_ids:
+        return set()
+
+    # See _select_unused_attributes_ids for why this function
+    # branches for non-sqlite databases.
+    if using_sqlite:
+        seen_ids = {
+            state[0]
+            for state in session.query(distinct(Events.data_id))
+            .filter(Events.data_id.in_(data_ids))
+            .all()
+        }
+    else:
+        seen_ids = set()
+        groups = [iter(data_ids)] * 100
+        for data_ids_group in zip_longest(*groups, fillvalue=None):
+            seen_ids |= {
+                state[0]
+                for state in session.execute(
+                    _generate_find_data_id_lambda(*data_ids_group)
+                ).all()
+                if state[0] is not None
+            }
+    to_remove = data_ids - seen_ids
+    _LOGGER.debug("Selected %s shared event data to remove", len(to_remove))
+    return to_remove
+
+
 def _select_statistics_runs_to_purge(
     session: Session, purge_before: datetime
 ) -> list[int]:
@@ -477,6 +738,21 @@ def _evict_purged_states_from_old_states_cache(
         old_states.pop(old_state_reversed[purged_state_id], None)
 
 
+def _evict_purged_data_from_data_cache(
+    instance: Recorder, purged_data_ids: set[int]
+) -> None:
+    """Evict purged data ids from the data ids cache."""
+    # Make a map from data_id to the data json
+    event_data_ids = instance._event_data_ids  # pylint: disable=protected-access
+    event_data_ids_reversed = {
+        data_id: data for data, data_id in event_data_ids.items()
+    }
+
+    # Evict any purged data from the event_data_ids cache
+    for purged_attribute_id in purged_data_ids.intersection(event_data_ids_reversed):
+        event_data_ids.pop(event_data_ids_reversed[purged_attribute_id], None)
+
+
 def _evict_purged_attributes_from_attributes_cache(
     instance: Recorder, purged_attributes_ids: set[int]
 ) -> None:
@@ -515,6 +791,22 @@ def _purge_attributes_ids(
     _evict_purged_attributes_from_attributes_cache(instance, attributes_ids)
 
 
+def _purge_event_data_ids(
+    instance: Recorder, session: Session, data_ids: set[int]
+) -> None:
+    """Delete old event data ids."""
+    deleted_rows = (
+        session.query(EventData)
+        .filter(EventData.data_id.in_(data_ids))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s data events", deleted_rows)
+
+    # Evict any entries in the event_data_ids cache referring to purged event data
+    _evict_purged_data_from_data_cache(instance, data_ids)
+
+
 def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
     """Delete by run_id."""
     deleted_rows = (
@@ -623,15 +915,15 @@ def _purge_filtered_events(
     instance: Recorder, session: Session, excluded_event_types: list[str]
 ) -> None:
     """Remove filtered events and linked states."""
-    events: list[Events] = (
-        session.query(Events.event_id)
-        .filter(Events.event_type.in_(excluded_event_types))
-        .limit(MAX_ROWS_TO_PURGE)
-        .all()
-    )
-    event_ids: list[int] = [
-        event.event_id for event in events if event.event_id is not None
-    ]
+    using_sqlite = instance.using_sqlite()
+    event_ids, data_ids = zip(
+        *(
+            session.query(Events.event_id, Events.data_id)
+            .filter(Events.event_type.in_(excluded_event_types))
+            .limit(MAX_ROWS_TO_PURGE)
+            .all()
+        )
+    )
     _LOGGER.debug(
         "Selected %s event_ids to remove that should be filtered", len(event_ids)
     )
@@ -641,6 +933,10 @@ def _purge_filtered_events(
     state_ids: set[int] = {state.state_id for state in states}
     _purge_state_ids(instance, session, state_ids)
     _purge_event_ids(session, event_ids)
+    if unused_data_ids_set := _select_unused_event_data_ids(
+        session, set(data_ids), using_sqlite
+    ):
+        _purge_event_data_ids(instance, session, unused_data_ids_set)
     if EVENT_STATE_CHANGED in excluded_event_types:
         session.query(StateAttributes).delete(synchronize_session=False)
         instance._state_attributes_ids = {}  # pylint: disable=protected-access
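
On non-SQLite databases the purge path checks candidate data_ids in fixed groups of 100, padding the final group with None, so _generate_find_data_id_lambda always sees the same statement shape and SQLAlchemy can reuse one cached lambda statement instead of compiling a new query per batch size; that is the reason for the hundred explicit parameters above. The chunking itself, isolated from the ORM:

from itertools import zip_longest


def chunk_ids(data_ids: set[int], size: int = 100) -> list[tuple[int | None, ...]]:
    """Split ids into fixed-width groups padded with None."""
    # [iter(...)] * size hands the same iterator to zip_longest `size`
    # times, so each pass consumes the next `size` ids into one group.
    groups = [iter(data_ids)] * size
    return list(zip_longest(*groups, fillvalue=None))


# e.g. chunk_ids({1, 2, 3}, size=5) -> [(1, 2, 3, None, None)]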

tests/components/recorder/test_init.py

@@ -6,6 +6,7 @@ import asyncio
 from datetime import datetime, timedelta
 import sqlite3
 import threading
+from typing import cast
 from unittest.mock import Mock, patch
 
 import pytest
@@ -31,6 +32,7 @@ from homeassistant.components.recorder import (
 )
 from homeassistant.components.recorder.const import DATA_INSTANCE
 from homeassistant.components.recorder.models import (
+    EventData,
     Events,
     RecorderRuns,
     StateAttributes,
@@ -47,7 +49,7 @@ from homeassistant.const import (
     STATE_LOCKED,
     STATE_UNLOCKED,
 )
-from homeassistant.core import Context, CoreState, HomeAssistant, callback
+from homeassistant.core import Context, CoreState, Event, HomeAssistant, callback
 from homeassistant.setup import async_setup_component, setup_component
 from homeassistant.util import dt as dt_util
@@ -363,14 +365,25 @@ def test_saving_event(hass, hass_recorder):
     wait_recording_done(hass)
 
     assert len(events) == 1
-    event = events[0]
+    event: Event = events[0]
 
     hass.data[DATA_INSTANCE].block_till_done()
+    events: list[Event] = []
 
     with session_scope(hass=hass) as session:
-        db_events = list(session.query(Events).filter_by(event_type=event_type))
-        assert len(db_events) == 1
-        db_event = db_events[0].to_native()
+        for select_event, event_data in (
+            session.query(Events, EventData)
+            .filter_by(event_type=event_type)
+            .outerjoin(EventData, Events.data_id == EventData.data_id)
+        ):
+            select_event = cast(Events, select_event)
+            event_data = cast(EventData, event_data)
+
+            native_event = select_event.to_native()
+            native_event.data = event_data.to_native()
+            events.append(native_event)
+
+    db_event = events[0]
 
     assert event.event_type == db_event.event_type
     assert event.data == db_event.data
@@ -427,7 +440,18 @@ def _add_events(hass, events):
         wait_recording_done(hass)
 
     with session_scope(hass=hass) as session:
-        return [ev.to_native() for ev in session.query(Events)]
+        events = []
+        for event, event_data in session.query(Events, EventData).outerjoin(
+            EventData, Events.data_id == EventData.data_id
+        ):
+            event = cast(Events, event)
+            event_data = cast(EventData, event_data)
+
+            native_event = event.to_native()
+            if event_data:
+                native_event.data = event_data.to_native()
+            events.append(native_event)
+        return events
 
 
 def _state_empty_context(hass, entity_id):
@@ -1073,11 +1097,22 @@ def test_service_disable_events_not_recording(hass, hass_recorder):
     assert events[0] != events[1]
     assert events[0].data != events[1].data
 
+    db_events = []
     with session_scope(hass=hass) as session:
-        db_events = list(session.query(Events).filter_by(event_type=event_type))
-        assert len(db_events) == 1
-        db_event = db_events[0].to_native()
+        for select_event, event_data in (
+            session.query(Events, EventData)
+            .filter_by(event_type=event_type)
+            .outerjoin(EventData, Events.data_id == EventData.data_id)
+        ):
+            select_event = cast(Events, select_event)
+            event_data = cast(EventData, event_data)
+
+            native_event = select_event.to_native()
+            native_event.data = event_data.to_native()
+            db_events.append(native_event)
+
+    assert len(db_events) == 1
+    db_event = db_events[0]
 
     event = events[1]
 
     assert event.event_type == db_event.event_type

tests/components/recorder/test_models.py

@@ -8,6 +8,7 @@ from sqlalchemy.orm import scoped_session, sessionmaker
 
 from homeassistant.components.recorder.models import (
     Base,
+    EventData,
     Events,
     LazyState,
     RecorderRuns,
@@ -25,7 +26,9 @@ from homeassistant.util import dt, dt as dt_util
 def test_from_event_to_db_event():
     """Test converting event to db event."""
     event = ha.Event("test_event", {"some_data": 15})
-    assert event == Events.from_event(event).to_native()
+    db_event = Events.from_event(event)
+    db_event.event_data = EventData.from_event(event).shared_data
+    assert event == db_event.to_native()
 
 
 def test_from_event_to_db_state():
@@ -231,10 +234,12 @@ async def test_event_to_db_model():
     event = ha.Event(
         "state_changed", {"some": "attr"}, ha.EventOrigin.local, dt_util.utcnow()
     )
-    native = Events.from_event(event).to_native()
+    db_event = Events.from_event(event)
+    db_event.event_data = EventData.from_event(event).shared_data
+    native = db_event.to_native()
     assert native == event
 
-    native = Events.from_event(event, event_data="{}").to_native()
+    native = Events.from_event(event).to_native()
     event.data = {}
     assert native == event