Remove direct usage of concurrent.futures from recorder (#68593)

This commit is contained in:
J. Nick Koston 2022-03-24 09:48:49 -10:00 committed by GitHub
parent 3777fa52f0
commit e911936a0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 22 deletions

View File

@ -4,7 +4,6 @@ from __future__ import annotations
import abc import abc
import asyncio import asyncio
from collections.abc import Callable, Iterable from collections.abc import Callable, Iterable
import concurrent.futures
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
@ -149,6 +148,8 @@ EXPIRE_AFTER_COMMITS = 120
# - How much memory our low end hardware has # - How much memory our low end hardware has
STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048 STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
SHUTDOWN_TASK = object()
DB_LOCK_TIMEOUT = 30 DB_LOCK_TIMEOUT = 30
DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
@ -298,6 +299,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
exclude_t=exclude_t, exclude_t=exclude_t,
) )
instance.async_initialize() instance.async_initialize()
instance.async_register()
instance.start() instance.start()
_async_register_services(hass, instance) _async_register_services(hass, instance)
history.async_setup(hass) history.async_setup(hass)
@ -566,6 +568,7 @@ class Recorder(threading.Thread):
self.hass = hass self.hass = hass
self.auto_purge = auto_purge self.auto_purge = auto_purge
self.keep_days = keep_days self.keep_days = keep_days
self._hass_started: asyncio.Future[object] = asyncio.Future()
self.commit_interval = commit_interval self.commit_interval = commit_interval
self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue() self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
self.recording_start = dt_util.utcnow() self.recording_start = dt_util.utcnow()
@ -710,12 +713,10 @@ class Recorder(threading.Thread):
self.queue.put(StatisticsTask(start)) self.queue.put(StatisticsTask(start))
@callback @callback
def async_register( def async_register(self) -> None:
self, shutdown_task: object, hass_started: concurrent.futures.Future
) -> None:
"""Post connection initialize.""" """Post connection initialize."""
def _empty_queue(event): def _empty_queue(event: Event) -> None:
"""Empty the queue if its still present at final write.""" """Empty the queue if its still present at final write."""
# If the queue is full of events to be processed because # If the queue is full of events to be processed because
@ -734,26 +735,28 @@ class Recorder(threading.Thread):
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue) self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue)
def shutdown(event): async def _async_shutdown(event: Event) -> None:
"""Shut down the Recorder.""" """Shut down the Recorder."""
if not hass_started.done(): if not self._hass_started.done():
hass_started.set_result(shutdown_task) self._hass_started.set_result(SHUTDOWN_TASK)
self.queue.put(StopTask()) self.queue.put(StopTask())
self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) self._async_stop_queue_watcher_and_event_listener()
self.join() await self.hass.async_add_executor_job(self.join)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown)
if self.hass.state == CoreState.running: if self.hass.state == CoreState.running:
hass_started.set_result(None) self._hass_started.set_result(None)
return return
@callback @callback
def async_hass_started(event): def _async_hass_started(event: Event) -> None:
"""Notify that hass has started.""" """Notify that hass has started."""
hass_started.set_result(None) self._hass_started.set_result(None)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, async_hass_started) self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, _async_hass_started
)
@callback @callback
def async_connection_failed(self) -> None: def async_connection_failed(self) -> None:
@ -833,13 +836,18 @@ class Recorder(threading.Thread):
self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10 self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10
) )
async def _async_wait_for_started(self) -> object | None:
"""Wait for the hass started future."""
return await self._hass_started
def _wait_startup_or_shutdown(self) -> object | None:
"""Wait for startup or shutdown before starting."""
return asyncio.run_coroutine_threadsafe(
self._async_wait_for_started(), self.hass.loop
).result()
def run(self) -> None: def run(self) -> None:
"""Start processing events to save.""" """Start processing events to save."""
shutdown_task = object()
hass_started: concurrent.futures.Future = concurrent.futures.Future()
self.hass.add_job(self.async_register, shutdown_task, hass_started)
current_version = self._setup_recorder() current_version = self._setup_recorder()
if current_version is None: if current_version is None:
@ -853,8 +861,9 @@ class Recorder(threading.Thread):
self.migration_in_progress = True self.migration_in_progress = True
self.hass.add_job(self.async_connection_success) self.hass.add_job(self.async_connection_success)
# If shutdown happened before Home Assistant finished starting # If shutdown happened before Home Assistant finished starting
if hass_started.result() is shutdown_task: if self._wait_startup_or_shutdown() is SHUTDOWN_TASK:
self.migration_in_progress = False self.migration_in_progress = False
# Make sure we cleanly close the run if # Make sure we cleanly close the run if
# we restart before startup finishes # we restart before startup finishes

View File

@ -299,7 +299,7 @@ async def test_schema_migrate(hass, start_version):
assert recorder.util.async_migration_in_progress(hass) is True assert recorder.util.async_migration_in_progress(hass) is True
migration_stall.set() migration_stall.set()
await hass.async_block_till_done() await hass.async_block_till_done()
migration_done.wait() await hass.async_add_executor_job(migration_done.wait)
await async_wait_recording_done_without_instance(hass) await async_wait_recording_done_without_instance(hass)
assert migration_version == models.SCHEMA_VERSION assert migration_version == models.SCHEMA_VERSION
assert setup_run.called assert setup_run.called