Refactor live history and logbook to avoid unnecessary task creation for recorder sync (#143244)

This commit is contained in:
J. Nick Koston 2025-04-18 22:55:11 -10:00 committed by GitHub
parent f873219d25
commit aef266b940
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 35 additions and 20 deletions

View File

@ -52,7 +52,7 @@ class HistoryLiveStream:
subscriptions: list[CALLBACK_TYPE]
end_time_unsub: CALLBACK_TYPE | None = None
task: asyncio.Task | None = None
wait_sync_task: asyncio.Task | None = None
wait_sync_future: asyncio.Future[None] | None = None
@callback
@ -491,8 +491,8 @@ async def ws_stream(
subscriptions.clear()
if live_stream.task:
live_stream.task.cancel()
if live_stream.wait_sync_task:
live_stream.wait_sync_task.cancel()
if live_stream.wait_sync_future:
live_stream.wait_sync_future.cancel()
if live_stream.end_time_unsub:
live_stream.end_time_unsub()
live_stream.end_time_unsub = None
@ -554,10 +554,12 @@ async def ws_stream(
)
)
live_stream.wait_sync_task = create_eager_task(
get_instance(hass).async_block_till_done()
)
await live_stream.wait_sync_task
if sync_future := get_instance(hass).async_get_commit_future():
# Set the future so we can cancel it if the client
# unsubscribes before the commit is done so we don't
# query the database needlessly
live_stream.wait_sync_future = sync_future
await live_stream.wait_sync_future
#
# Fetch any states from the database that have

View File

@ -47,7 +47,7 @@ class LogbookLiveStream:
subscriptions: list[CALLBACK_TYPE]
end_time_unsub: CALLBACK_TYPE | None = None
task: asyncio.Task | None = None
wait_sync_task: asyncio.Task | None = None
wait_sync_future: asyncio.Future[None] | None = None
@callback
@ -329,8 +329,8 @@ async def ws_event_stream(
subscriptions.clear()
if live_stream.task:
live_stream.task.cancel()
if live_stream.wait_sync_task:
live_stream.wait_sync_task.cancel()
if live_stream.wait_sync_future:
live_stream.wait_sync_future.cancel()
if live_stream.end_time_unsub:
live_stream.end_time_unsub()
live_stream.end_time_unsub = None
@ -399,10 +399,12 @@ async def ws_event_stream(
)
)
live_stream.wait_sync_task = create_eager_task(
get_instance(hass).async_block_till_done()
)
await live_stream.wait_sync_task
if sync_future := get_instance(hass).async_get_commit_future():
# Set the future so we can cancel it if the client
# unsubscribes before the commit is done so we don't
# query the database needlessly
live_stream.wait_sync_future = sync_future
await live_stream.wait_sync_future
#
# Fetch any events from the database that have

View File

@ -1307,11 +1307,17 @@ class Recorder(threading.Thread):
async def async_block_till_done(self) -> None:
"""Async version of block_till_done."""
if future := self.async_get_commit_future():
await future
@callback
def async_get_commit_future(self) -> asyncio.Future[None] | None:
"""Return a future that will wait for the next commit or None if nothing pending."""
if self._queue.empty() and not self._event_session_has_pending_writes:
return
event = asyncio.Event()
self.queue_task(SynchronizeTask(event))
await event.wait()
return None
future: asyncio.Future[None] = self.hass.loop.create_future()
self.queue_task(SynchronizeTask(future))
return future
def block_till_done(self) -> None:
"""Block till all events processed.

View File

@ -317,13 +317,18 @@ class SynchronizeTask(RecorderTask):
"""Ensure all pending data has been committed."""
# commit_before is the default
event: asyncio.Event
future: asyncio.Future
def run(self, instance: Recorder) -> None:
"""Handle the task."""
# Does not use a tracked task to avoid
# blocking shutdown if the recorder is broken
instance.hass.loop.call_soon_threadsafe(self.event.set)
instance.hass.loop.call_soon_threadsafe(self._set_result_if_not_done)
def _set_result_if_not_done(self) -> None:
"""Set the result if not done."""
if not self.future.done():
self.future.set_result(None)
@dataclass(slots=True)