diff --git a/homeassistant/components/history/websocket_api.py b/homeassistant/components/history/websocket_api.py index c57e766eaed..3761c935992 100644 --- a/homeassistant/components/history/websocket_api.py +++ b/homeassistant/components/history/websocket_api.py @@ -52,7 +52,7 @@ class HistoryLiveStream: subscriptions: list[CALLBACK_TYPE] end_time_unsub: CALLBACK_TYPE | None = None task: asyncio.Task | None = None - wait_sync_task: asyncio.Task | None = None + wait_sync_future: asyncio.Future[None] | None = None @callback @@ -491,8 +491,8 @@ async def ws_stream( subscriptions.clear() if live_stream.task: live_stream.task.cancel() - if live_stream.wait_sync_task: - live_stream.wait_sync_task.cancel() + if live_stream.wait_sync_future: + live_stream.wait_sync_future.cancel() if live_stream.end_time_unsub: live_stream.end_time_unsub() live_stream.end_time_unsub = None @@ -554,10 +554,12 @@ async def ws_stream( ) ) - live_stream.wait_sync_task = create_eager_task( - get_instance(hass).async_block_till_done() - ) - await live_stream.wait_sync_task + if sync_future := get_instance(hass).async_get_commit_future(): + # Set the future so we can cancel it if the client + # unsubscribes before the commit is done so we don't + # query the database needlessly + live_stream.wait_sync_future = sync_future + await live_stream.wait_sync_future # # Fetch any states from the database that have diff --git a/homeassistant/components/logbook/websocket_api.py b/homeassistant/components/logbook/websocket_api.py index e3d0d8a29fa..4b767f66d69 100644 --- a/homeassistant/components/logbook/websocket_api.py +++ b/homeassistant/components/logbook/websocket_api.py @@ -47,7 +47,7 @@ class LogbookLiveStream: subscriptions: list[CALLBACK_TYPE] end_time_unsub: CALLBACK_TYPE | None = None task: asyncio.Task | None = None - wait_sync_task: asyncio.Task | None = None + wait_sync_future: asyncio.Future[None] | None = None @callback @@ -329,8 +329,8 @@ async def ws_event_stream( subscriptions.clear() if live_stream.task: live_stream.task.cancel() - if live_stream.wait_sync_task: - live_stream.wait_sync_task.cancel() + if live_stream.wait_sync_future: + live_stream.wait_sync_future.cancel() if live_stream.end_time_unsub: live_stream.end_time_unsub() live_stream.end_time_unsub = None @@ -399,10 +399,12 @@ async def ws_event_stream( ) ) - live_stream.wait_sync_task = create_eager_task( - get_instance(hass).async_block_till_done() - ) - await live_stream.wait_sync_task + if sync_future := get_instance(hass).async_get_commit_future(): + # Set the future so we can cancel it if the client + # unsubscribes before the commit is done so we don't + # query the database needlessly + live_stream.wait_sync_future = sync_future + await live_stream.wait_sync_future # # Fetch any events from the database that have diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 7b8043b9201..34fa6a62d44 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -1307,11 +1307,17 @@ class Recorder(threading.Thread): async def async_block_till_done(self) -> None: """Async version of block_till_done.""" + if future := self.async_get_commit_future(): + await future + + @callback + def async_get_commit_future(self) -> asyncio.Future[None] | None: + """Return a future that will wait for the next commit or None if nothing pending.""" if self._queue.empty() and not self._event_session_has_pending_writes: - return - event = asyncio.Event() - self.queue_task(SynchronizeTask(event)) - await event.wait() + return None + future: asyncio.Future[None] = self.hass.loop.create_future() + self.queue_task(SynchronizeTask(future)) + return future def block_till_done(self) -> None: """Block till all events processed. diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index 4eb9547ee9d..f5ad7f2a3d9 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -317,13 +317,18 @@ class SynchronizeTask(RecorderTask): """Ensure all pending data has been committed.""" # commit_before is the default - event: asyncio.Event + future: asyncio.Future def run(self, instance: Recorder) -> None: """Handle the task.""" # Does not use a tracked task to avoid # blocking shutdown if the recorder is broken - instance.hass.loop.call_soon_threadsafe(self.event.set) + instance.hass.loop.call_soon_threadsafe(self._set_result_if_not_done) + + def _set_result_if_not_done(self) -> None: + """Set the result if not done.""" + if not self.future.done(): + self.future.set_result(None) @dataclass(slots=True)