diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index d68b993444b..205b544f6ab 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -4,7 +4,6 @@ from __future__ import annotations import abc import asyncio from collections.abc import Callable, Iterable -import concurrent.futures from dataclasses import dataclass from datetime import datetime, timedelta import logging @@ -149,6 +148,8 @@ EXPIRE_AFTER_COMMITS = 120 # - How much memory our low end hardware has STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048 +SHUTDOWN_TASK = object() + DB_LOCK_TIMEOUT = 30 DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 @@ -298,6 +299,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: exclude_t=exclude_t, ) instance.async_initialize() + instance.async_register() instance.start() _async_register_services(hass, instance) history.async_setup(hass) @@ -566,6 +568,7 @@ class Recorder(threading.Thread): self.hass = hass self.auto_purge = auto_purge self.keep_days = keep_days + self._hass_started: asyncio.Future[object] = asyncio.Future() self.commit_interval = commit_interval self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue() self.recording_start = dt_util.utcnow() @@ -710,12 +713,10 @@ class Recorder(threading.Thread): self.queue.put(StatisticsTask(start)) @callback - def async_register( - self, shutdown_task: object, hass_started: concurrent.futures.Future - ) -> None: + def async_register(self) -> None: """Post connection initialize.""" - def _empty_queue(event): + def _empty_queue(event: Event) -> None: """Empty the queue if its still present at final write.""" # If the queue is full of events to be processed because @@ -734,26 +735,28 @@ class Recorder(threading.Thread): self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue) - def shutdown(event): + async def _async_shutdown(event: Event) -> None: """Shut down the Recorder.""" - if not hass_started.done(): - hass_started.set_result(shutdown_task) + if not self._hass_started.done(): + self._hass_started.set_result(SHUTDOWN_TASK) self.queue.put(StopTask()) - self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) - self.join() + self._async_stop_queue_watcher_and_event_listener() + await self.hass.async_add_executor_job(self.join) - self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown) if self.hass.state == CoreState.running: - hass_started.set_result(None) + self._hass_started.set_result(None) return @callback - def async_hass_started(event): + def _async_hass_started(event: Event) -> None: """Notify that hass has started.""" - hass_started.set_result(None) + self._hass_started.set_result(None) - self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, async_hass_started) + self.hass.bus.async_listen_once( + EVENT_HOMEASSISTANT_STARTED, _async_hass_started + ) @callback def async_connection_failed(self) -> None: @@ -833,13 +836,18 @@ class Recorder(threading.Thread): self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10 ) + async def _async_wait_for_started(self) -> object | None: + """Wait for the hass started future.""" + return await self._hass_started + + def _wait_startup_or_shutdown(self) -> object | None: + """Wait for startup or shutdown before starting.""" + return asyncio.run_coroutine_threadsafe( + self._async_wait_for_started(), self.hass.loop + ).result() + def run(self) -> None: """Start processing events to save.""" - shutdown_task = object() - hass_started: concurrent.futures.Future = concurrent.futures.Future() - - self.hass.add_job(self.async_register, shutdown_task, hass_started) - current_version = self._setup_recorder() if current_version is None: @@ -853,8 +861,9 @@ class Recorder(threading.Thread): self.migration_in_progress = True self.hass.add_job(self.async_connection_success) + # If shutdown happened before Home Assistant finished starting - if hass_started.result() is shutdown_task: + if self._wait_startup_or_shutdown() is SHUTDOWN_TASK: self.migration_in_progress = False # Make sure we cleanly close the run if # we restart before startup finishes diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 5e837eb36ac..337c0a65c2b 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -299,7 +299,7 @@ async def test_schema_migrate(hass, start_version): assert recorder.util.async_migration_in_progress(hass) is True migration_stall.set() await hass.async_block_till_done() - migration_done.wait() + await hass.async_add_executor_job(migration_done.wait) await async_wait_recording_done_without_instance(hass) assert migration_version == models.SCHEMA_VERSION assert setup_run.called