diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 31c36be9c88..c57274317e3 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -367,13 +367,6 @@ class Recorder(threading.Thread): """Add an executor job from within the event loop.""" return self.hass.loop.run_in_executor(self._db_executor, target, *args) - def _stop_executor(self) -> None: - """Stop the executor.""" - if self._db_executor is None: - return - self._db_executor.shutdown() - self._db_executor = None - @callback def _async_check_queue(self, *_: Any) -> None: """Periodic check of the queue size to ensure we do not exhaust memory. @@ -1501,5 +1494,13 @@ class Recorder(threading.Thread): try: self._end_session() finally: - self._stop_executor() + if self._db_executor: + # We shutdown the executor without forcefully + # joining the threads until after we have tried + # to cleanly close the connection. + self._db_executor.shutdown(join_threads_or_timeout=False) self._close_connection() + if self._db_executor: + # After the connection is closed, we can join the threads + # or forcefully shutdown the threads if they take too long. + self._db_executor.join_threads_or_timeout() diff --git a/homeassistant/util/executor.py b/homeassistant/util/executor.py index 47b6d08a197..5f0fdd5c273 100644 --- a/homeassistant/util/executor.py +++ b/homeassistant/util/executor.py @@ -63,10 +63,18 @@ def join_or_interrupt_threads( class InterruptibleThreadPoolExecutor(ThreadPoolExecutor): """A ThreadPoolExecutor instance that will not deadlock on shutdown.""" - def shutdown(self, *args: Any, **kwargs: Any) -> None: - """Shutdown with interrupt support added.""" + def shutdown( + self, *args: Any, join_threads_or_timeout: bool = True, **kwargs: Any + ) -> None: + """Shutdown with interrupt support added. + + By default shutdown will wait for threads to finish up + to the timeout before forcefully stopping them. This can + be disabled by setting `join_threads_or_timeout` to False. + """ super().shutdown(wait=False, cancel_futures=True) - self.join_threads_or_timeout() + if join_threads_or_timeout: + self.join_threads_or_timeout() def join_threads_or_timeout(self) -> None: """Join threads or timeout.""" diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 72d47515edd..3bbc78e21ce 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -166,11 +166,10 @@ async def test_shutdown_before_startup_finishes( await hass.async_block_till_done() await hass.async_stop() - def _run_information_with_session(): - instance.recorder_and_worker_thread_ids.add(threading.get_ident()) - return run_information_with_session(session) - - run_info = await instance.async_add_executor_job(_run_information_with_session) + # The database executor is shutdown so we must run the + # query in the main thread for testing + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + run_info = run_information_with_session(session) assert run_info.run_id == 1 assert run_info.start is not None @@ -216,8 +215,7 @@ async def test_shutdown_closes_connections( instance = recorder.get_instance(hass) await instance.async_db_ready await hass.async_block_till_done() - pool = instance.engine.pool - pool.shutdown = Mock() + pool = instance.engine def _ensure_connected(): with session_scope(hass=hass, read_only=True) as session: @@ -225,10 +223,11 @@ async def test_shutdown_closes_connections( await instance.async_add_executor_job(_ensure_connected) - hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) - await hass.async_block_till_done() + with patch.object(pool, "dispose", wraps=pool.dispose) as dispose: + hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) + await hass.async_block_till_done() - assert len(pool.shutdown.mock_calls) == 1 + assert len(dispose.mock_calls) == 1 with pytest.raises(RuntimeError): assert instance.get_session()