From c23866e5e59cfec3085faa37a0cc5a56135a7064 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 1 May 2022 21:01:17 -0500 Subject: [PATCH] De-duplicate event data into a new event_data table (#71135) --- homeassistant/components/logbook/__init__.py | 59 ++-- homeassistant/components/recorder/__init__.py | 68 +++- .../components/recorder/migration.py | 6 +- homeassistant/components/recorder/models.py | 68 +++- homeassistant/components/recorder/purge.py | 322 +++++++++++++++++- tests/components/recorder/test_init.py | 53 ++- tests/components/recorder/test_models.py | 11 +- 7 files changed, 526 insertions(+), 61 deletions(-) diff --git a/homeassistant/components/logbook/__init__.py b/homeassistant/components/logbook/__init__.py index b136c7330ac..96e6b975e32 100644 --- a/homeassistant/components/logbook/__init__.py +++ b/homeassistant/components/logbook/__init__.py @@ -28,6 +28,7 @@ from homeassistant.components.history import ( from homeassistant.components.http import HomeAssistantView from homeassistant.components.recorder import get_instance from homeassistant.components.recorder.models import ( + EventData, Events, StateAttributes, States, @@ -512,6 +513,7 @@ def _get_events( # When entity_matches_only is provided, contexts and events that do not # contain the entity_ids are not included in the logbook response. query = _apply_event_entity_id_matchers(query, entity_ids) + query = query.outerjoin(EventData, (Events.data_id == EventData.data_id)) query = query.union_all( _generate_states_query( @@ -535,6 +537,8 @@ def _get_events( if context_id is not None: query = query.filter(Events.context_id == context_id) + query = query.outerjoin(EventData, (Events.data_id == EventData.data_id)) + query = query.order_by(Events.time_fired) return list( @@ -545,6 +549,18 @@ def _get_events( def _generate_events_query(session: Session) -> Query: return session.query( *EVENT_COLUMNS, + EventData.shared_data, + States.state, + States.entity_id, + States.attributes, + StateAttributes.shared_attrs, + ) + + +def _generate_events_query_without_data(session: Session) -> Query: + return session.query( + *EVENT_COLUMNS, + literal(value=None, type_=sqlalchemy.Text).label("shared_data"), States.state, States.entity_id, States.attributes, @@ -555,6 +571,7 @@ def _generate_events_query(session: Session) -> Query: def _generate_events_query_without_states(session: Session) -> Query: return session.query( *EVENT_COLUMNS, + EventData.shared_data, literal(value=None, type_=sqlalchemy.String).label("state"), literal(value=None, type_=sqlalchemy.String).label("entity_id"), literal(value=None, type_=sqlalchemy.Text).label("attributes"), @@ -570,7 +587,7 @@ def _generate_states_query( entity_ids: Iterable[str], ) -> Query: return ( - _generate_events_query(session) + _generate_events_query_without_data(session) .outerjoin(Events, (States.event_id == Events.event_id)) .outerjoin(old_state, (States.old_state_id == old_state.state_id)) .filter(_missing_state_matcher(old_state)) @@ -671,14 +688,12 @@ def _apply_event_types_filter( def _apply_event_entity_id_matchers( events_query: Query, entity_ids: Iterable[str] ) -> Query: - return events_query.filter( - sqlalchemy.or_( - *( - Events.event_data.like(ENTITY_ID_JSON_TEMPLATE.format(entity_id)) - for entity_id in entity_ids - ) - ) - ) + ors = [] + for entity_id in entity_ids: + like = ENTITY_ID_JSON_TEMPLATE.format(entity_id) + ors.append(Events.event_data.like(like)) + ors.append(EventData.shared_data.like(like)) + return events_query.filter(sqlalchemy.or_(*ors)) def 
_keep_event(
@@ -840,7 +855,17 @@ class LazyEventPartialState:
         if self._event_data:
             return self._event_data.get(ATTR_ENTITY_ID)
 
-        result = ENTITY_ID_JSON_EXTRACT.search(self._row.event_data)
+        result = ENTITY_ID_JSON_EXTRACT.search(
+            self._row.shared_data or self._row.event_data or ""
+        )
+        return result.group(1) if result else None
+
+    @property
+    def data_domain(self) -> str | None:
+        """Extract the domain from the decoded data or json."""
+        result = DOMAIN_JSON_EXTRACT.search(
+            self._row.shared_data or self._row.event_data or ""
+        )
         return result.group(1) if result else None
 
     @property
@@ -851,18 +876,14 @@ class LazyEventPartialState:
         )
         return result.group(1) if result else None
 
-    @property
-    def data_domain(self) -> str | None:
-        """Extract the domain from the decoded data or json."""
-        result = DOMAIN_JSON_EXTRACT.search(self._row.event_data)
-        return result.group(1) if result else None
-
     @property
     def data(self) -> dict[str, Any]:
         """Event data."""
-        if not self._event_data:
-            source: str = self._row.event_data
-            if event_data := self._event_data_cache.get(source):
+        if self._event_data is None:
+            source: str = self._row.shared_data or self._row.event_data
+            if not source:
+                self._event_data = {}
+            elif event_data := self._event_data_cache.get(source):
                 self._event_data = event_data
             else:
                 self._event_data = self._event_data_cache[source] = cast(
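After this change a logbook row can carry its event JSON in either of two places: the legacy `events.event_data` column for rows written before the migration, or the joined `event_data.shared_data` column for de-duplicated rows, which is why every accessor above reads `shared_data or event_data`. A minimal sketch of that fallback-and-cache pattern, with invented names (`decode_event_data`, `_decode_cache`) standing in for `LazyEventPartialState` and its `_event_data_cache`:

```python
import json
from typing import Any

# Shared across rows, like the logbook's _event_data_cache: identical JSON
# payloads are decoded once and the resulting dict is reused.
_decode_cache: dict[str, dict[str, Any]] = {}


def decode_event_data(shared_data: str | None, event_data: str | None) -> dict[str, Any]:
    """Decode a row's event JSON, preferring the de-duplicated column."""
    source = shared_data or event_data
    if not source:  # events recorded with empty data have neither column set
        return {}
    if (cached := _decode_cache.get(source)) is not None:
        return cached
    decoded = _decode_cache[source] = json.loads(source)
    return decoded
```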
diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index ef53721efd2..d190ebd0a99 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -80,6 +80,7 @@ from .const import (
 from .executor import DBInterruptibleThreadPoolExecutor
 from .models import (
     Base,
+    EventData,
     Events,
     StateAttributes,
     States,
@@ -158,6 +159,7 @@ EXPIRE_AFTER_COMMITS = 120
 # - How frequently states with overlapping attributes will change
 # - How much memory our low end hardware has
 STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
+EVENT_DATA_ID_CACHE_SIZE = 2048
 
 SHUTDOWN_TASK = object()
 
@@ -639,10 +641,13 @@ class Recorder(threading.Thread):
         self._commits_without_expire = 0
         self._old_states: dict[str, States] = {}
         self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
+        self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE)
         self._pending_state_attributes: dict[str, StateAttributes] = {}
+        self._pending_event_data: dict[str, EventData] = {}
         self._pending_expunge: list[States] = []
         self._bakery = bakery
         self._find_shared_attr_query: Query | None = None
+        self._find_shared_data_query: Query | None = None
         self.event_session: Session | None = None
         self.get_session: Callable[[], Session] | None = None
         self._completed_first_database_setup: bool | None = None
@@ -1162,17 +1167,61 @@ class Recorder(threading.Thread):
             return cast(int, attributes[0])
         return None
 
+    def _find_shared_data_in_db(self, data_hash: int, shared_data: str) -> int | None:
+        """Find shared event data in the db from the hash and shared_data."""
+        #
+        # Avoid the event session being flushed since it will
+        # commit all the pending events and states to the database.
+        #
+        # The lookup has already checked to see if the data is cached
+        # or going to be written in the next commit so there is no
+        # need to flush before checking the database.
+        #
+        assert self.event_session is not None
+        if self._find_shared_data_query is None:
+            self._find_shared_data_query = self._bakery(
+                lambda session: session.query(EventData.data_id)
+                .filter(EventData.hash == bindparam("data_hash"))
+                .filter(EventData.shared_data == bindparam("shared_data"))
+            )
+        with self.event_session.no_autoflush:
+            if (
+                data_id := self._find_shared_data_query(self.event_session)
+                .params(data_hash=data_hash, shared_data=shared_data)
+                .first()
+            ):
+                return cast(int, data_id[0])
+        return None
+
     def _process_event_into_session(self, event: Event) -> None:
         assert self.event_session is not None
+        dbevent = Events.from_event(event)
 
-        try:
-            if event.event_type == EVENT_STATE_CHANGED:
-                dbevent = Events.from_event(event, event_data=None)
+        if event.event_type != EVENT_STATE_CHANGED and event.data:
+            try:
+                shared_data = EventData.shared_data_from_event(event)
+            except (TypeError, ValueError):
+                _LOGGER.warning("Event is not JSON serializable: %s", event)
+                return
+
+            # Matching event data found in the pending commit
+            if pending_event_data := self._pending_event_data.get(shared_data):
+                dbevent.event_data_rel = pending_event_data
+            # Matching data_id found in the cache
+            elif data_id := self._event_data_ids.get(shared_data):
+                dbevent.data_id = data_id
             else:
-                dbevent = Events.from_event(event)
-        except (TypeError, ValueError):
-            _LOGGER.warning("Event is not JSON serializable: %s", event)
-            return
+                data_hash = EventData.hash_shared_data(shared_data)
+                # Matching event data found in the database
+                if data_id := self._find_shared_data_in_db(data_hash, shared_data):
+                    self._event_data_ids[shared_data] = dbevent.data_id = data_id
+                # No matching event data found, save it in the DB
+                else:
+                    dbevent_data = EventData(shared_data=shared_data, hash=data_hash)
+                    dbevent.event_data_rel = self._pending_event_data[
+                        shared_data
+                    ] = dbevent_data
+                    self.event_session.add(dbevent_data)
 
         self.event_session.add(dbevent)
         if event.event_type != EVENT_STATE_CHANGED:
@@ -1286,6 +1335,9 @@ class Recorder(threading.Thread):
                 state_attr.shared_attrs
             ] = state_attr.attributes_id
         self._pending_state_attributes = {}
+        for event_data in self._pending_event_data.values():
+            self._event_data_ids[event_data.shared_data] = event_data.data_id
+        self._pending_event_data = {}
 
         # Expire is an expensive operation (frequently more expensive
         # than the flush and commit itself) so we only
@@ -1307,7 +1359,9 @@ class Recorder(threading.Thread):
         """Close the event session."""
         self._old_states = {}
         self._state_attributes_ids = {}
+        self._event_data_ids = {}
         self._pending_state_attributes = {}
+        self._pending_event_data = {}
 
         if not self.event_session:
             return
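Before it touches the database, `_process_event_into_session` resolves each serialized payload through three tiers: event data already queued in the current commit (`_pending_event_data`), a data_id remembered in the `_event_data_ids` LRU, and finally the baked hash lookup in `_find_shared_data_in_db`; only a genuinely new payload inserts an `event_data` row. A condensed sketch of that resolution order, with invented names (`DedupStore`, `resolve`, the callables), not the recorder's actual API; the real code attaches the result to the `Events` row via `data_id` or `event_data_rel`:

```python
from collections.abc import Callable
from dataclasses import dataclass, field


@dataclass
class DedupStore:
    """Three-tier lookup: pending commit -> id cache -> database."""

    pending: dict[str, object] = field(default_factory=dict)  # shared_data -> uncommitted row
    cached_ids: dict[str, int] = field(default_factory=dict)  # shared_data -> data_id (an LRU in HA)

    def resolve(
        self,
        shared_data: str,
        find_in_db: Callable[[str], int | None],
        make_row: Callable[[str], object],
    ) -> object | int:
        # 1. Same payload already queued in this commit: reuse the pending row.
        if (row := self.pending.get(shared_data)) is not None:
            return row
        # 2. Recently seen payload: reuse its primary key, no DB round trip.
        if (data_id := self.cached_ids.get(shared_data)) is not None:
            return data_id
        # 3. Indexed hash lookup in the database (see _find_shared_data_in_db).
        if (data_id := find_in_db(shared_data)) is not None:
            self.cached_ids[shared_data] = data_id
            return data_id
        # 4. Genuinely new payload: queue one new event_data row for this commit.
        row = self.pending[shared_data] = make_row(shared_data)
        return row
```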
diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index 94614fe8cff..2ef51824737 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -380,6 +380,8 @@ def _apply_update(instance, new_version, old_version):  # noqa: C901
     """Perform operations to bring schema up to date."""
     engine = instance.engine
     dialect = engine.dialect.name
+    big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER"
+
     if new_version == 1:
         _create_index(instance, "events", "ix_events_time_fired")
     elif new_version == 2:
@@ -643,11 +645,13 @@ def _apply_update(instance, new_version, old_version):  # noqa: C901
             "ix_statistics_short_term_statistic_id_start",
         )
     elif new_version == 25:
-        big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER"
         _add_columns(instance, "states", [f"attributes_id {big_int}"])
         _create_index(instance, "states", "ix_states_attributes_id")
     elif new_version == 26:
         _create_index(instance, "statistics_runs", "ix_statistics_runs_start")
+    elif new_version == 27:
+        _add_columns(instance, "events", [f"data_id {big_int}"])
+        _create_index(instance, "events", "ix_events_data_id")
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
 
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
index 5b402dec7a3..212c0c7e7d4 100644
--- a/homeassistant/components/recorder/models.py
+++ b/homeassistant/components/recorder/models.py
@@ -35,7 +35,6 @@ from homeassistant.const import (
     MAX_LENGTH_STATE_STATE,
 )
 from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
-from homeassistant.helpers.typing import UNDEFINED, UndefinedType
 import homeassistant.util.dt as dt_util
 
 from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
@@ -44,13 +43,14 @@ from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
 # pylint: disable=invalid-name
 Base = declarative_base()
 
-SCHEMA_VERSION = 26
+SCHEMA_VERSION = 27
 
 _LOGGER = logging.getLogger(__name__)
 
 DB_TIMEZONE = "+00:00"
 
 TABLE_EVENTS = "events"
+TABLE_EVENT_DATA = "event_data"
 TABLE_STATES = "states"
 TABLE_STATE_ATTRIBUTES = "state_attributes"
 TABLE_RECORDER_RUNS = "recorder_runs"
@@ -60,6 +60,9 @@ TABLE_STATISTICS_META = "statistics_meta"
 TABLE_STATISTICS_RUNS = "statistics_runs"
 TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
 
+# Only add TABLE_STATE_ATTRIBUTES and TABLE_EVENT_DATA
+# to the below list once we want to check for their
+# presence in the sanity check.
 ALL_TABLES = [
     TABLE_STATES,
     TABLE_EVENTS,
@@ -103,24 +106,24 @@ class Events(Base):  # type: ignore[misc,valid-type]
     context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
     context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
     context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
+    data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
+    event_data_rel = relationship("EventData")
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
         return (
             f"<recorder.Events("
             f"id={self.event_id}, type='{self.event_type}', "
             f"origin='{self.origin}', time_fired='{self.time_fired}'"
-            f")>"
+            f", data_id={self.data_id})>"
         )
 
     @staticmethod
-    def from_event(
-        event: Event, event_data: UndefinedType | None = UNDEFINED
-    ) -> Events:
+    def from_event(event: Event) -> Events:
         """Create an event database object from a native event."""
         return Events(
             event_type=event.event_type,
-            event_data=JSON_DUMP(event.data) if event_data is UNDEFINED else event_data,
+            event_data=None,
             origin=str(event.origin.value),
             time_fired=event.time_fired,
             context_id=event.context.id,
@@ -138,7 +141,7 @@ class Events(Base):  # type: ignore[misc,valid-type]
         try:
             return Event(
                 self.event_type,
-                json.loads(self.event_data),
+                json.loads(self.event_data) if self.event_data else {},
                 EventOrigin(self.origin),
                 process_timestamp(self.time_fired),
                 context=context,
@@ -149,6 +152,53 @@ class Events(Base):  # type: ignore[misc,valid-type]
         return None
 
 
+class EventData(Base):  # type: ignore[misc,valid-type]
+    """Event data history."""
+
+    __table_args__ = (
+        {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
+    )
+    __tablename__ = TABLE_EVENT_DATA
+    data_id = Column(Integer, Identity(), primary_key=True)
+    hash = Column(BigInteger, index=True)
+    # Note that this is not named attributes to avoid confusion with the states table
+    shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
+
+    def __repr__(self) -> str:
+        """Return string representation of instance for debugging."""
+        return (
+            f"<recorder.EventData("
+            f"id={self.data_id}, hash='{self.hash}', data='{self.shared_data}'"
+            f")>"
+        )
+
+    @staticmethod
+    def from_event(event: Event) -> EventData:
+        """Create object from an event."""
+        shared_data = JSON_DUMP(event.data)
+        return EventData(
+            shared_data=shared_data, hash=EventData.hash_shared_data(shared_data)
+        )
+
+    @staticmethod
+    def shared_data_from_event(event: Event) -> str:
+        """Create shared_data from an event."""
+        return JSON_DUMP(event.data)
+
+    @staticmethod
+    def hash_shared_data(shared_data: str) -> int:
+        """Return the hash of json encoded shared data."""
+        return cast(int, fnv1a_32(shared_data.encode("utf-8")))
+
+    def to_native(self) -> dict[str, Any]:
+        """Convert to a native event data dictionary."""
+        try:
+            return cast(dict[str, Any], json.loads(self.shared_data))
+        except ValueError:
+            _LOGGER.exception("Error converting row to event data: %s", self)
+            return {}
+
+
 class States(Base):  # type: ignore[misc,valid-type]
     """State change history."""
 
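The `event_data` table stores a 32-bit FNV-1a hash of the JSON next to the payload itself: the indexed `hash` column narrows the candidate rows cheaply, and the exact `shared_data` comparison in `_find_shared_data_in_db` rules out hash collisions. A self-contained illustration of that pairing; the pure-Python `fnv1a_32` below is a stand-in for the library function the model already imports for `StateAttributes`:

```python
def fnv1a_32(data: bytes) -> int:
    """32-bit FNV-1a; stand-in for the imported library implementation."""
    h = 0x811C9DC5  # FNV offset basis
    for byte in data:
        # xor the byte in, multiply by the FNV prime, mask to 32 bits
        h = ((h ^ byte) * 0x01000193) & 0xFFFFFFFF
    return h


shared_data = '{"entity_id": "light.kitchen", "brightness": 255}'
data_hash = fnv1a_32(shared_data.encode("utf-8"))

# Mirrors the baked query in _find_shared_data_in_db: filter on the indexed
# 32-bit hash first (cheap), then compare the full JSON to rule out collisions.
#   session.query(EventData.data_id)
#       .filter(EventData.hash == data_hash)
#       .filter(EventData.shared_data == shared_data)
```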
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index d4061a69bab..4aad3a28a88 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -17,6 +17,7 @@ from homeassistant.const import EVENT_STATE_CHANGED
 
 from .const import MAX_ROWS_TO_PURGE
 from .models import (
+    EventData,
     Events,
     RecorderRuns,
     StateAttributes,
@@ -53,7 +54,8 @@ def purge_old_data(
             event_ids,
             state_ids,
             attributes_ids,
-        ) = _select_event_state_and_attributes_ids_to_purge(session, purge_before)
+            data_ids,
+        ) = _select_event_state_attributes_ids_data_ids_to_purge(session, purge_before)
         statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
         short_term_statistics = _select_short_term_statistics_to_purge(
             session, purge_before
@@ -70,6 +72,11 @@
         if event_ids:
             _purge_event_ids(session, event_ids)
 
+        if unused_data_ids_set := _select_unused_event_data_ids(
+            session, data_ids, using_sqlite
+        ):
+            _purge_event_data_ids(instance, session, unused_data_ids_set)
+
         if statistics_runs:
             _purge_statistics_runs(session, statistics_runs)
 
@@ -91,12 +98,14 @@
     return True
 
 
-def _select_event_state_and_attributes_ids_to_purge(
+def _select_event_state_attributes_ids_data_ids_to_purge(
     session: Session, purge_before: datetime
-) -> tuple[set[int], set[int], set[int]]:
-    """Return a list of event, state, and attribute ids to purge."""
+) -> tuple[set[int], set[int], set[int], set[int]]:
+    """Return sets of event, state, attribute, and data ids to purge."""
     events = (
-        session.query(Events.event_id, States.state_id, States.attributes_id)
+        session.query(
+            Events.event_id, Events.data_id, States.state_id, States.attributes_id
+        )
         .outerjoin(States, Events.event_id == States.event_id)
         .filter(Events.time_fired < purge_before)
         .limit(MAX_ROWS_TO_PURGE)
         .all()
     )
     event_ids = set()
     state_ids = set()
     attributes_ids = set()
+    data_ids = set()
     for event in events:
         event_ids.add(event.event_id)
         if event.state_id:
             state_ids.add(event.state_id)
         if event.attributes_id:
             attributes_ids.add(event.attributes_id)
-    return event_ids, state_ids, attributes_ids
+        if event.data_id:
+            data_ids.add(event.data_id)
+    return event_ids, state_ids, attributes_ids, data_ids
 
 
 def _state_attrs_exist(attr: int | None) -> Select:
@@ -400,6 +412,255 @@ def _select_unused_attributes_ids(
     return to_remove
 
 
+def _event_data_id_exist(data_id: int | None) -> Select:
+    """Check if an event data 
id exists in the events table.""" + return select(func.min(Events.data_id)).where(Events.data_id == data_id) + + +def _generate_find_data_id_lambda( + id1: int, + id2: int | None, + id3: int | None, + id4: int | None, + id5: int | None, + id6: int | None, + id7: int | None, + id8: int | None, + id9: int | None, + id10: int | None, + id11: int | None, + id12: int | None, + id13: int | None, + id14: int | None, + id15: int | None, + id16: int | None, + id17: int | None, + id18: int | None, + id19: int | None, + id20: int | None, + id21: int | None, + id22: int | None, + id23: int | None, + id24: int | None, + id25: int | None, + id26: int | None, + id27: int | None, + id28: int | None, + id29: int | None, + id30: int | None, + id31: int | None, + id32: int | None, + id33: int | None, + id34: int | None, + id35: int | None, + id36: int | None, + id37: int | None, + id38: int | None, + id39: int | None, + id40: int | None, + id41: int | None, + id42: int | None, + id43: int | None, + id44: int | None, + id45: int | None, + id46: int | None, + id47: int | None, + id48: int | None, + id49: int | None, + id50: int | None, + id51: int | None, + id52: int | None, + id53: int | None, + id54: int | None, + id55: int | None, + id56: int | None, + id57: int | None, + id58: int | None, + id59: int | None, + id60: int | None, + id61: int | None, + id62: int | None, + id63: int | None, + id64: int | None, + id65: int | None, + id66: int | None, + id67: int | None, + id68: int | None, + id69: int | None, + id70: int | None, + id71: int | None, + id72: int | None, + id73: int | None, + id74: int | None, + id75: int | None, + id76: int | None, + id77: int | None, + id78: int | None, + id79: int | None, + id80: int | None, + id81: int | None, + id82: int | None, + id83: int | None, + id84: int | None, + id85: int | None, + id86: int | None, + id87: int | None, + id88: int | None, + id89: int | None, + id90: int | None, + id91: int | None, + id92: int | None, + id93: int | None, + id94: int | None, + id95: int | None, + id96: int | None, + id97: int | None, + id98: int | None, + id99: int | None, + id100: int | None, +) -> StatementLambdaElement: + """Generate the find event data select only once. 
+ + https://docs.sqlalchemy.org/en/14/core/connections.html#quick-guidelines-for-lambdas + """ + return lambda_stmt( + lambda: union_all( + _event_data_id_exist(id1), + _event_data_id_exist(id2), + _event_data_id_exist(id3), + _event_data_id_exist(id4), + _event_data_id_exist(id5), + _event_data_id_exist(id6), + _event_data_id_exist(id7), + _event_data_id_exist(id8), + _event_data_id_exist(id9), + _event_data_id_exist(id10), + _event_data_id_exist(id11), + _event_data_id_exist(id12), + _event_data_id_exist(id13), + _event_data_id_exist(id14), + _event_data_id_exist(id15), + _event_data_id_exist(id16), + _event_data_id_exist(id17), + _event_data_id_exist(id18), + _event_data_id_exist(id19), + _event_data_id_exist(id20), + _event_data_id_exist(id21), + _event_data_id_exist(id22), + _event_data_id_exist(id23), + _event_data_id_exist(id24), + _event_data_id_exist(id25), + _event_data_id_exist(id26), + _event_data_id_exist(id27), + _event_data_id_exist(id28), + _event_data_id_exist(id29), + _event_data_id_exist(id30), + _event_data_id_exist(id31), + _event_data_id_exist(id32), + _event_data_id_exist(id33), + _event_data_id_exist(id34), + _event_data_id_exist(id35), + _event_data_id_exist(id36), + _event_data_id_exist(id37), + _event_data_id_exist(id38), + _event_data_id_exist(id39), + _event_data_id_exist(id40), + _event_data_id_exist(id41), + _event_data_id_exist(id42), + _event_data_id_exist(id43), + _event_data_id_exist(id44), + _event_data_id_exist(id45), + _event_data_id_exist(id46), + _event_data_id_exist(id47), + _event_data_id_exist(id48), + _event_data_id_exist(id49), + _event_data_id_exist(id50), + _event_data_id_exist(id51), + _event_data_id_exist(id52), + _event_data_id_exist(id53), + _event_data_id_exist(id54), + _event_data_id_exist(id55), + _event_data_id_exist(id56), + _event_data_id_exist(id57), + _event_data_id_exist(id58), + _event_data_id_exist(id59), + _event_data_id_exist(id60), + _event_data_id_exist(id61), + _event_data_id_exist(id62), + _event_data_id_exist(id63), + _event_data_id_exist(id64), + _event_data_id_exist(id65), + _event_data_id_exist(id66), + _event_data_id_exist(id67), + _event_data_id_exist(id68), + _event_data_id_exist(id69), + _event_data_id_exist(id70), + _event_data_id_exist(id71), + _event_data_id_exist(id72), + _event_data_id_exist(id73), + _event_data_id_exist(id74), + _event_data_id_exist(id75), + _event_data_id_exist(id76), + _event_data_id_exist(id77), + _event_data_id_exist(id78), + _event_data_id_exist(id79), + _event_data_id_exist(id80), + _event_data_id_exist(id81), + _event_data_id_exist(id82), + _event_data_id_exist(id83), + _event_data_id_exist(id84), + _event_data_id_exist(id85), + _event_data_id_exist(id86), + _event_data_id_exist(id87), + _event_data_id_exist(id88), + _event_data_id_exist(id89), + _event_data_id_exist(id90), + _event_data_id_exist(id91), + _event_data_id_exist(id92), + _event_data_id_exist(id93), + _event_data_id_exist(id94), + _event_data_id_exist(id95), + _event_data_id_exist(id96), + _event_data_id_exist(id97), + _event_data_id_exist(id98), + _event_data_id_exist(id99), + _event_data_id_exist(id100), + ) + ) + + +def _select_unused_event_data_ids( + session: Session, data_ids: set[int], using_sqlite: bool +) -> set[int]: + """Return a set of event data ids that are not used by any events in the database.""" + if not data_ids: + return set() + + # See _select_unused_attributes_ids for why this function + # branches for non-sqlite databases. 
+    if using_sqlite:
+        seen_ids = {
+            row[0]
+            for row in session.query(distinct(Events.data_id))
+            .filter(Events.data_id.in_(data_ids))
+            .all()
+        }
+    else:
+        seen_ids = set()
+        groups = [iter(data_ids)] * 100
+        for data_ids_group in zip_longest(*groups, fillvalue=None):
+            seen_ids |= {
+                row[0]
+                for row in session.execute(
+                    _generate_find_data_id_lambda(*data_ids_group)
+                ).all()
+                if row[0] is not None
+            }
+    to_remove = data_ids - seen_ids
+    _LOGGER.debug("Selected %s shared event data to remove", len(to_remove))
+    return to_remove
+
+
 def _select_statistics_runs_to_purge(
     session: Session, purge_before: datetime
 ) -> list[int]:
@@ -477,6 +738,21 @@
         old_states.pop(old_state_reversed[purged_state_id], None)
 
 
+def _evict_purged_data_from_data_cache(
+    instance: Recorder, purged_data_ids: set[int]
+) -> None:
+    """Evict purged data ids from the data ids cache."""
+    # Make a map from data_id to the data json
+    event_data_ids = instance._event_data_ids  # pylint: disable=protected-access
+    event_data_ids_reversed = {
+        data_id: data for data, data_id in event_data_ids.items()
+    }
+
+    # Evict any purged data from the event_data_ids cache
+    for purged_data_id in purged_data_ids.intersection(event_data_ids_reversed):
+        event_data_ids.pop(event_data_ids_reversed[purged_data_id], None)
+
+
 def _evict_purged_attributes_from_attributes_cache(
     instance: Recorder, purged_attributes_ids: set[int]
 ) -> None:
@@ -515,6 +791,22 @@
     _evict_purged_attributes_from_attributes_cache(instance, attributes_ids)
 
 
+def _purge_event_data_ids(
+    instance: Recorder, session: Session, data_ids: set[int]
+) -> None:
+    """Delete old event data ids."""
+
+    deleted_rows = (
+        session.query(EventData)
+        .filter(EventData.data_id.in_(data_ids))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s event data rows", deleted_rows)
+
+    # Evict any entries in the event_data_ids cache referring to purged event data
+    _evict_purged_data_from_data_cache(instance, data_ids)
+
+
 def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
     """Delete by run_id."""
     deleted_rows = (
@@ -623,15 +915,15 @@ def _purge_filtered_events(
     instance: Recorder, session: Session, excluded_event_types: list[str]
 ) -> None:
     """Remove filtered events and linked states."""
-    events: list[Events] = (
-        session.query(Events.event_id)
-        .filter(Events.event_type.in_(excluded_event_types))
-        .limit(MAX_ROWS_TO_PURGE)
-        .all()
-    )
-    event_ids: list[int] = [
-        event.event_id for event in events if event.event_id is not None
-    ]
+    using_sqlite = instance.using_sqlite()
+    event_id_data_id_rows = (
+        session.query(Events.event_id, Events.data_id)
+        .filter(Events.event_type.in_(excluded_event_types))
+        .limit(MAX_ROWS_TO_PURGE)
+        .all()
+    )
+    # Iterate the rows rather than unpacking zip(*rows), which would raise
+    # ValueError when no filtered events remain.
+    event_ids = [row.event_id for row in event_id_data_id_rows]
+    data_ids = {row.data_id for row in event_id_data_id_rows if row.data_id}
     _LOGGER.debug(
         "Selected %s event_ids to remove that should be filtered", len(event_ids)
     )
     state_ids: set[int] = {state.state_id for state in states}
     _purge_state_ids(instance, session, state_ids)
     _purge_event_ids(session, event_ids)
+    if unused_data_ids_set := _select_unused_event_data_ids(
+        session, data_ids, using_sqlite
+    ):
+        _purge_event_data_ids(instance, session, unused_data_ids_set)
     if EVENT_STATE_CHANGED in excluded_event_types:
         session.query(StateAttributes).delete(synchronize_session=False)
         instance._state_attributes_ids = {}  # pylint: disable=protected-access
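For non-SQLite databases, `_select_unused_event_data_ids` pads every batch to exactly 100 ids with `None` so that `_generate_find_data_id_lambda` always sees the same statement shape: the lambda statement, and the database's prepared plan, is compiled once and reused for every batch. The padding is the standard `zip_longest` grouper recipe; a small standalone sketch (`fixed_width_batches` is an invented name for illustration):

```python
from itertools import zip_longest


def fixed_width_batches(ids: set[int], width: int = 100):
    """Yield tuples of exactly `width` ids, padded at the end with None."""
    # The *same* iterator repeated `width` times: zip_longest consumes it
    # round-robin, so each tuple takes the next `width` ids.
    groups = [iter(ids)] * width
    yield from zip_longest(*groups, fillvalue=None)


# 250 ids become three 100-wide tuples; the last is padded with 50 Nones.
# Each padded None matches no row and comes back as a NULL the caller discards.
batches = list(fixed_width_batches(set(range(250))))
assert len(batches) == 3
assert all(len(batch) == 100 for batch in batches)
```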
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 77bd3992e72..c3d22255ccc 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -6,6 +6,7 @@ import asyncio
 from datetime import datetime, timedelta
 import sqlite3
 import threading
+from typing import cast
 from unittest.mock import Mock, patch
 
 import pytest
@@ -31,6 +32,7 @@ from homeassistant.components.recorder import (
 )
 from homeassistant.components.recorder.const import DATA_INSTANCE
 from homeassistant.components.recorder.models import (
+    EventData,
     Events,
     RecorderRuns,
     StateAttributes,
@@ -47,7 +49,7 @@ from homeassistant.const import (
     STATE_LOCKED,
     STATE_UNLOCKED,
 )
-from homeassistant.core import Context, CoreState, HomeAssistant, callback
+from homeassistant.core import Context, CoreState, Event, HomeAssistant, callback
 from homeassistant.setup import async_setup_component, setup_component
 from homeassistant.util import dt as dt_util
 
@@ -363,14 +365,25 @@ def test_saving_event(hass, hass_recorder):
     wait_recording_done(hass)
 
     assert len(events) == 1
-    event = events[0]
+    event: Event = events[0]
 
     hass.data[DATA_INSTANCE].block_till_done()
+    events: list[Event] = []
 
     with session_scope(hass=hass) as session:
-        db_events = list(session.query(Events).filter_by(event_type=event_type))
-        assert len(db_events) == 1
-        db_event = db_events[0].to_native()
+        for select_event, event_data in (
+            session.query(Events, EventData)
+            .filter_by(event_type=event_type)
+            .outerjoin(EventData, Events.data_id == EventData.data_id)
+        ):
+            select_event = cast(Events, select_event)
+            event_data = cast(EventData, event_data)
+
+            native_event = select_event.to_native()
+            native_event.data = event_data.to_native()
+            events.append(native_event)
+
+    db_event = events[0]
 
     assert event.event_type == db_event.event_type
     assert event.data == db_event.data
@@ -427,7 +440,18 @@ def _add_events(hass, events):
         wait_recording_done(hass)
 
     with session_scope(hass=hass) as session:
-        return [ev.to_native() for ev in session.query(Events)]
+        events = []
+        for event, event_data in session.query(Events, EventData).outerjoin(
+            EventData, Events.data_id == EventData.data_id
+        ):
+            event = cast(Events, event)
+            event_data = cast(EventData, event_data)
+
+            native_event = event.to_native()
+            if event_data:
+                native_event.data = event_data.to_native()
+            events.append(native_event)
+        return events
 
 
 def _state_empty_context(hass, entity_id):
@@ -1073,11 +1097,22 @@ def test_service_disable_events_not_recording(hass, hass_recorder):
     assert events[0] != events[1]
     assert events[0].data != events[1].data
 
+    db_events = []
     with session_scope(hass=hass) as session:
-        db_events = list(session.query(Events).filter_by(event_type=event_type))
-        assert len(db_events) == 1
-        db_event = db_events[0].to_native()
+        for select_event, event_data in (
+            session.query(Events, EventData)
+            .filter_by(event_type=event_type)
+            .outerjoin(EventData, Events.data_id == EventData.data_id)
+        ):
+            select_event = cast(Events, select_event)
+            event_data = cast(EventData, event_data)
+
+            native_event = select_event.to_native()
+            native_event.data = event_data.to_native()
+            db_events.append(native_event)
+
+    assert len(db_events) == 1
+    db_event = db_events[0]
     event = events[1]
 
     assert event.event_type == db_event.event_type
diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py
index 8382afe4353..3bdb7992c7c 100644
--- a/tests/components/recorder/test_models.py
+++ b/tests/components/recorder/test_models.py
@@ -8,6 +8,7 @@ from 
sqlalchemy.orm import scoped_session, sessionmaker from homeassistant.components.recorder.models import ( Base, + EventData, Events, LazyState, RecorderRuns, @@ -25,7 +26,9 @@ from homeassistant.util import dt, dt as dt_util def test_from_event_to_db_event(): """Test converting event to db event.""" event = ha.Event("test_event", {"some_data": 15}) - assert event == Events.from_event(event).to_native() + db_event = Events.from_event(event) + db_event.event_data = EventData.from_event(event).shared_data + assert event == db_event.to_native() def test_from_event_to_db_state(): @@ -231,10 +234,12 @@ async def test_event_to_db_model(): event = ha.Event( "state_changed", {"some": "attr"}, ha.EventOrigin.local, dt_util.utcnow() ) - native = Events.from_event(event).to_native() + db_event = Events.from_event(event) + db_event.event_data = EventData.from_event(event).shared_data + native = db_event.to_native() assert native == event - native = Events.from_event(event, event_data="{}").to_native() + native = Events.from_event(event).to_native() event.data = {} assert native == event
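Taken together, the updated tests show the new read path: a native `Event` is rebuilt by joining `events` to `event_data` and restoring `.data` from the shared JSON, with an outer join so legacy rows that still inline `events.event_data` keep working. A condensed sketch of that pattern (the `fetch_native_events` helper is invented for illustration):

```python
from sqlalchemy.orm import Session

from homeassistant.components.recorder.models import EventData, Events


def fetch_native_events(session: Session, event_type: str) -> list:
    """Rebuild native events for one event_type from the two tables."""
    natives = []
    for db_event, db_event_data in (
        session.query(Events, EventData)
        .filter_by(event_type=event_type)
        .outerjoin(EventData, Events.data_id == EventData.data_id)
    ):
        native = db_event.to_native()
        # db_event_data is None for legacy rows; to_native() then falls back
        # to the JSON still inlined in events.event_data.
        if db_event_data is not None:
            native.data = db_event_data.to_native()
        natives.append(native)
    return natives
```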