From 9f59515bb8f300d6cb28830393efb7aa5fd37c8b Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 31 Jan 2021 23:54:39 -1000 Subject: [PATCH] Fix shutdown deadlock with run_callback_threadsafe (#45807) --- homeassistant/core.py | 14 ++++++++++- homeassistant/util/async_.py | 41 ++++++++++++++++++++++++++++++++ tests/test_core.py | 24 +++++++++++++++++++ tests/util/test_async.py | 45 +++++++++++++++++++++++++++++++++++- 4 files changed, 122 insertions(+), 2 deletions(-) diff --git a/homeassistant/core.py b/homeassistant/core.py index 6d187225685..4294eb530a7 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -71,7 +71,11 @@ from homeassistant.exceptions import ( Unauthorized, ) from homeassistant.util import location, network -from homeassistant.util.async_ import fire_coroutine_threadsafe, run_callback_threadsafe +from homeassistant.util.async_ import ( + fire_coroutine_threadsafe, + run_callback_threadsafe, + shutdown_run_callback_threadsafe, +) import homeassistant.util.dt as dt_util from homeassistant.util.timeout import TimeoutManager from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM, UnitSystem @@ -548,6 +552,14 @@ class HomeAssistant: # stage 3 self.state = CoreState.not_running self.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE) + + # Prevent run_callback_threadsafe from scheduling any additional + # callbacks in the event loop as callbacks created on the futures + # it returns will never run after the final `self.async_block_till_done` + # which will cause the futures to block forever when waiting for + # the `result()` which will cause a deadlock when shutting down the executor. + shutdown_run_callback_threadsafe(self.loop) + try: async with self.timeout.async_timeout(30): await self.async_block_till_done() diff --git a/homeassistant/util/async_.py b/homeassistant/util/async_.py index ded44473038..f61225502ee 100644 --- a/homeassistant/util/async_.py +++ b/homeassistant/util/async_.py @@ -10,6 +10,8 @@ from typing import Any, Awaitable, Callable, Coroutine, TypeVar _LOGGER = logging.getLogger(__name__) +_SHUTDOWN_RUN_CALLBACK_THREADSAFE = "_shutdown_run_callback_threadsafe" + T = TypeVar("T") @@ -58,6 +60,28 @@ def run_callback_threadsafe( _LOGGER.warning("Exception on lost future: ", exc_info=True) loop.call_soon_threadsafe(run_callback) + + if hasattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE): + # + # If the final `HomeAssistant.async_block_till_done` in + # `HomeAssistant.async_stop` has already been called, the callback + # will never run and, `future.result()` will block forever which + # will prevent the thread running this code from shutting down which + # will result in a deadlock when the main thread attempts to shutdown + # the executor and `.join()` the thread running this code. + # + # To prevent this deadlock we do the following on shutdown: + # + # 1. Set the _SHUTDOWN_RUN_CALLBACK_THREADSAFE attr on this function + # by calling `shutdown_run_callback_threadsafe` + # 2. Call `hass.async_block_till_done` at least once after shutdown + # to ensure all callbacks have run + # 3. Raise an exception here to ensure `future.result()` can never be + # called and hit the deadlock since once `shutdown_run_callback_threadsafe` + # we cannot promise the callback will be executed. + # + raise RuntimeError("The event loop is in the process of shutting down.") + return future @@ -139,3 +163,20 @@ async def gather_with_concurrency( return await gather( *(sem_task(task) for task in tasks), return_exceptions=return_exceptions ) + + +def shutdown_run_callback_threadsafe(loop: AbstractEventLoop) -> None: + """Call when run_callback_threadsafe should prevent creating new futures. + + We must finish all callbacks before the executor is shutdown + or we can end up in a deadlock state where: + + `executor.result()` is waiting for its `._condition` + and the executor shutdown is trying to `.join()` the + executor thread. + + This function is considered irreversible and should only ever + be called when Home Assistant is going to shutdown and + python is going to exit. + """ + setattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE, True) diff --git a/tests/test_core.py b/tests/test_core.py index dbed2b8c0bf..0bf00d92c45 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -169,6 +169,30 @@ async def test_stage_shutdown(hass): assert len(test_all) == 2 +async def test_shutdown_calls_block_till_done_after_shutdown_run_callback_threadsafe( + hass, +): + """Ensure shutdown_run_callback_threadsafe is called before the final async_block_till_done.""" + stop_calls = [] + + async def _record_block_till_done(): + nonlocal stop_calls + stop_calls.append("async_block_till_done") + + def _record_shutdown_run_callback_threadsafe(loop): + nonlocal stop_calls + stop_calls.append(("shutdown_run_callback_threadsafe", loop)) + + with patch.object(hass, "async_block_till_done", _record_block_till_done), patch( + "homeassistant.core.shutdown_run_callback_threadsafe", + _record_shutdown_run_callback_threadsafe, + ): + await hass.async_stop() + + assert stop_calls[-2] == ("shutdown_run_callback_threadsafe", hass.loop) + assert stop_calls[-1] == "async_block_till_done" + + async def test_pending_sheduler(hass): """Add a coro to pending tasks.""" call_count = [] diff --git a/tests/util/test_async.py b/tests/util/test_async.py index db088ada93e..d4fdce1e912 100644 --- a/tests/util/test_async.py +++ b/tests/util/test_async.py @@ -50,7 +50,8 @@ def test_fire_coroutine_threadsafe_from_inside_event_loop( def test_run_callback_threadsafe_from_inside_event_loop(mock_ident, _): """Testing calling run_callback_threadsafe from inside an event loop.""" callback = MagicMock() - loop = MagicMock() + + loop = Mock(spec=["call_soon_threadsafe"]) loop._thread_ident = None mock_ident.return_value = 5 @@ -168,3 +169,45 @@ async def test_gather_with_concurrency(): ) assert results == [2, 2, -1, -1] + + +async def test_shutdown_run_callback_threadsafe(hass): + """Test we can shutdown run_callback_threadsafe.""" + hasync.shutdown_run_callback_threadsafe(hass.loop) + callback = MagicMock() + + with pytest.raises(RuntimeError): + hasync.run_callback_threadsafe(hass.loop, callback) + + +async def test_run_callback_threadsafe(hass): + """Test run_callback_threadsafe runs code in the event loop.""" + it_ran = False + + def callback(): + nonlocal it_ran + it_ran = True + + assert hasync.run_callback_threadsafe(hass.loop, callback) + assert it_ran is False + + # Verify that async_block_till_done will flush + # out the callback + await hass.async_block_till_done() + assert it_ran is True + + +async def test_callback_is_always_scheduled(hass): + """Test run_callback_threadsafe always calls call_soon_threadsafe before checking for shutdown.""" + # We have to check the shutdown state AFTER the callback is scheduled otherwise + # the function could continue on and the caller call `future.result()` after + # the point in the main thread where callbacks are no longer run. + + callback = MagicMock() + hasync.shutdown_run_callback_threadsafe(hass.loop) + + with patch.object(hass.loop, "call_soon_threadsafe") as mock_call_soon_threadsafe: + with pytest.raises(RuntimeError): + hasync.run_callback_threadsafe(hass.loop, callback) + + mock_call_soon_threadsafe.assert_called_once()