From d9b43fc43fb6756e57db5c73feafe5d652ae364c Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 25 May 2023 20:44:12 -0500 Subject: [PATCH] Significantly speed up recorder event listener (#93532) * Significantly speed up recorder event listener This code is called every time an event happens since it subscribes to all events. Its our most frequently called listener out of the box. It used to have a seperate filter function but it was later combined after core had some previous refactoring. It was never optimized after that happened. This change reduces the run time by ~70% * decruft --- homeassistant/components/recorder/core.py | 59 ++++++++++++----------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 67d3bff3b2a..5023393dc5e 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -299,9 +299,39 @@ class Recorder(threading.Thread): @callback def async_initialize(self) -> None: """Initialize the recorder.""" + entity_filter = self.entity_filter + exclude_event_types = self.exclude_event_types + queue_put = self._queue.put_nowait + event_task = EventTask + + @callback + def _event_listener(event: Event) -> None: + """Listen for new events and put them in the process queue.""" + if event.event_type in exclude_event_types: + return + + if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: + queue_put(event_task(event)) + return + + if isinstance(entity_id, str): + if entity_filter(entity_id): + queue_put(event_task(event)) + return + + if isinstance(entity_id, list): + for eid in entity_id: + if entity_filter(eid): + queue_put(event_task(event)) + return + return + + # Unknown what it is. + queue_put(event_task(event)) + self._event_listener = self.hass.bus.async_listen( MATCH_ALL, - self.event_listener, + _event_listener, run_immediately=True, ) self._queue_watcher = async_track_time_interval( @@ -412,27 +442,6 @@ class Recorder(threading.Thread): self._periodic_listener() self._periodic_listener = None - @callback - def _async_event_filter(self, event: Event) -> bool: - """Filter events.""" - if event.event_type in self.exclude_event_types: - return False - - if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: - return True - - if isinstance(entity_id, str): - return self.entity_filter(entity_id) - - if isinstance(entity_id, list): - for eid in entity_id: - if self.entity_filter(eid): - return True - return False - - # Unknown what it is. - return True - async def _async_close(self, event: Event) -> None: """Empty the queue if its still present at close.""" @@ -1257,12 +1266,6 @@ class Recorder(threading.Thread): _LOGGER.debug("Sending keepalive") self.event_session.connection().scalar(select(1)) - @callback - def event_listener(self, event: Event) -> None: - """Listen for new events and put them in the process queue.""" - if self._async_event_filter(event): - self.queue_task(EventTask(event)) - async def async_block_till_done(self) -> None: """Async version of block_till_done.""" if self._queue.empty() and not self._event_session_has_pending_writes: