diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 2491c0a9baa..43915c0187b 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -1011,7 +1011,7 @@ class Recorder(threading.Thread): event_type_manager = self.event_type_manager if pending_event_types := event_type_manager.get_pending(event.event_type): dbevent.event_type_rel = pending_event_types - elif event_type_id := event_type_manager.get(event.event_type, session): + elif event_type_id := event_type_manager.get(event.event_type, session, True): dbevent.event_type_id = event_type_id else: event_types = EventTypes(event_type=event.event_type) diff --git a/homeassistant/components/recorder/table_managers/event_types.py b/homeassistant/components/recorder/table_managers/event_types.py index 96ce77d3bff..465e8608661 100644 --- a/homeassistant/components/recorder/table_managers/event_types.py +++ b/homeassistant/components/recorder/table_managers/event_types.py @@ -4,6 +4,7 @@ from __future__ import annotations from collections.abc import Iterable from typing import TYPE_CHECKING, cast +from lru import LRU # pylint: disable=no-name-in-module from sqlalchemy.orm.session import Session from homeassistant.core import Event @@ -12,6 +13,7 @@ from . import BaseLRUTableManager from ..const import SQLITE_MAX_BIND_VARS from ..db_schema import EventTypes from ..queries import find_event_type_ids +from ..tasks import RefreshEventTypesTask from ..util import chunked, execute_stmt_lambda_element if TYPE_CHECKING: @@ -27,6 +29,7 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]): def __init__(self, recorder: Recorder) -> None: """Initialize the event type manager.""" super().__init__(recorder, CACHE_SIZE) + self._non_existent_event_types: LRU = LRU(CACHE_SIZE) def load(self, events: list[Event], session: Session) -> None: """Load the event_type to event_type_ids mapping into memory. @@ -37,9 +40,12 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]): self.get_many( {event.event_type for event in events if event.event_type is not None}, session, + True, ) - def get(self, event_type: str, session: Session) -> int | None: + def get( + self, event_type: str, session: Session, from_recorder: bool = False + ) -> int | None: """Resolve event_type to the event_type_id. This call is not thread-safe and must be called from the @@ -48,7 +54,7 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]): return self.get_many((event_type,), session)[event_type] def get_many( - self, event_types: Iterable[str], session: Session + self, event_types: Iterable[str], session: Session, from_recorder: bool = False ) -> dict[str, int | None]: """Resolve event_types to event_type_ids. @@ -57,9 +63,14 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]): """ results: dict[str, int | None] = {} missing: list[str] = [] + non_existent: list[str] = [] + for event_type in event_types: if (event_type_id := self._id_map.get(event_type)) is None: - missing.append(event_type) + if event_type in self._non_existent_event_types: + results[event_type] = None + else: + missing.append(event_type) results[event_type] = event_type_id @@ -75,6 +86,20 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]): int, event_type_id ) + if non_existent := [ + event_type for event_type in missing if results[event_type] is None + ]: + if from_recorder: + # We are already in the recorder thread so we can update the + # non-existent event types directly. + for event_type in non_existent: + self._non_existent_event_types[event_type] = None + else: + # Queue a task to refresh the event types since its not + # thread-safe to do it here since we are not in the recorder + # thread. + self.recorder.queue_task(RefreshEventTypesTask(non_existent)) + return results def add_pending(self, db_event_type: EventTypes) -> None: @@ -95,6 +120,7 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]): """ for event_type, db_event_types in self._pending.items(): self._id_map[event_type] = db_event_types.event_type_id + self._non_existent_event_types.pop(event_type, None) self._pending.clear() def evict_purged(self, event_types: Iterable[str]) -> None: diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index dfa6ce32d25..07be6202a0c 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -17,7 +17,7 @@ from . import entity_registry, purge, statistics from .const import DOMAIN from .db_schema import Statistics, StatisticsShortTerm from .models import StatisticData, StatisticMetaData -from .util import periodic_db_cleanups +from .util import periodic_db_cleanups, session_scope _LOGGER = logging.getLogger(__name__) @@ -466,3 +466,17 @@ class EventIdMigrationTask(RecorderTask): def run(self, instance: Recorder) -> None: """Clean up the legacy event_id index on states.""" instance._cleanup_legacy_states_event_ids() # pylint: disable=[protected-access] + + +@dataclass(slots=True) +class RefreshEventTypesTask(RecorderTask): + """An object to insert into the recorder queue to refresh event types.""" + + event_types: list[str] + + def run(self, instance: Recorder) -> None: + """Refresh event types.""" + with session_scope(session=instance.get_session(), read_only=True) as session: + instance.event_type_manager.get_many( + self.event_types, session, from_recorder=True + ) diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 390a9e049d1..e017aa384f7 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -358,8 +358,9 @@ def convert_pending_events_to_event_types(instance: Recorder, session: Session) events.add(object) event_type_to_event_type_ids = instance.event_type_manager.get_many( - event_types, session + event_types, session, True ) + manually_added_event_types: list[str] = [] for event in events: event_type = event.event_type @@ -371,8 +372,12 @@ def convert_pending_events_to_event_types(instance: Recorder, session: Session) continue if event_type not in event_types_objects: event_types_objects[event_type] = EventTypes(event_type=event_type) + manually_added_event_types.append(event_type) event.event_type_rel = event_types_objects[event_type] + for event_type in manually_added_event_types: + instance.event_type_manager._non_existent_event_types.pop(event_type, None) + def create_engine_test_for_schema_version_postfix( *args, schema_version_postfix: str, **kwargs diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 0c0d08867df..04635acbcab 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -1055,6 +1055,7 @@ async def test_purge_filtered_events( """Test filtered events are purged.""" config: ConfigType = {"exclude": {"event_types": ["EVENT_PURGE"]}} instance = await async_setup_recorder_instance(hass, config) + await async_wait_recording_done(hass) def _add_db_entries(hass: HomeAssistant) -> None: with session_scope(hass=hass) as session: @@ -1086,14 +1087,14 @@ async def test_purge_filtered_events( convert_pending_states_to_meta(instance, session) service_data = {"keep_days": 10} - _add_db_entries(hass) + await instance.async_add_executor_job(_add_db_entries, hass) + await async_wait_recording_done(hass) - with session_scope(hass=hass) as session: + with session_scope(hass=hass, read_only=True) as session: events_purge = session.query(Events).filter( Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",))) ) states = session.query(States) - assert events_purge.count() == 60 assert states.count() == 10 @@ -1104,7 +1105,7 @@ async def test_purge_filtered_events( await async_recorder_block_till_done(hass) await async_wait_purge_done(hass) - with session_scope(hass=hass) as session: + with session_scope(hass=hass, read_only=True) as session: events_purge = session.query(Events).filter( Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",))) ) @@ -1123,7 +1124,7 @@ async def test_purge_filtered_events( await async_recorder_block_till_done(hass) await async_wait_purge_done(hass) - with session_scope(hass=hass) as session: + with session_scope(hass=hass, read_only=True) as session: events_purge = session.query(Events).filter( Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",))) )