diff --git a/homeassistant/runner.py b/homeassistant/runner.py index 5eae0b1b2da..3fd4118a25b 100644 --- a/homeassistant/runner.py +++ b/homeassistant/runner.py @@ -26,6 +26,9 @@ from homeassistant.util.thread import deadlock_safe_shutdown # use case. # MAX_EXECUTOR_WORKERS = 64 +TASK_CANCELATION_TIMEOUT = 5 + +_LOGGER = logging.getLogger(__name__) @dataclasses.dataclass @@ -105,4 +108,69 @@ async def setup_and_run_hass(runtime_config: RuntimeConfig) -> int: def run(runtime_config: RuntimeConfig) -> int: """Run Home Assistant.""" asyncio.set_event_loop_policy(HassEventLoopPolicy(runtime_config.debug)) - return asyncio.run(setup_and_run_hass(runtime_config)) + # Backport of cpython 3.9 asyncio.run with a _cancel_all_tasks that times out + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete(setup_and_run_hass(runtime_config)) + finally: + try: + _cancel_all_tasks_with_timeout(loop, TASK_CANCELATION_TIMEOUT) + loop.run_until_complete(loop.shutdown_asyncgens()) + # Once cpython 3.8 is no longer supported we can use the + # the built-in loop.shutdown_default_executor + loop.run_until_complete(_shutdown_default_executor(loop)) + finally: + asyncio.set_event_loop(None) + loop.close() + + +def _cancel_all_tasks_with_timeout( + loop: asyncio.AbstractEventLoop, timeout: int +) -> None: + """Adapted _cancel_all_tasks from python 3.9 with a timeout.""" + to_cancel = asyncio.all_tasks(loop) + if not to_cancel: + return + + for task in to_cancel: + task.cancel() + + loop.run_until_complete(asyncio.wait(to_cancel, timeout=timeout)) + + for task in to_cancel: + if task.cancelled(): + continue + if not task.done(): + _LOGGER.warning( + "Task could not be canceled and was still running after shutdown: %s", + task, + ) + continue + if task.exception() is not None: + loop.call_exception_handler( + { + "message": "unhandled exception during shutdown", + "exception": task.exception(), + "task": task, + } + ) + + +async def _shutdown_default_executor(loop: asyncio.AbstractEventLoop) -> None: + """Backport of cpython 3.9 schedule the shutdown of the default executor.""" + future = loop.create_future() + + def _do_shutdown() -> None: + try: + loop._default_executor.shutdown(wait=True) # type: ignore # pylint: disable=protected-access + loop.call_soon_threadsafe(future.set_result, None) + except Exception as ex: # pylint: disable=broad-except + loop.call_soon_threadsafe(future.set_exception, ex) + + thread = threading.Thread(target=_do_shutdown) + thread.start() + try: + await future + finally: + thread.join() diff --git a/tests/test_runner.py b/tests/test_runner.py index 7bbe96dd077..0e38cef0fff 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,8 +1,11 @@ """Test the runner.""" +import asyncio import threading from unittest.mock import patch +import pytest + from homeassistant import core, runner from homeassistant.util import executor, thread @@ -37,3 +40,80 @@ async def test_setup_and_run_hass(hass, tmpdir): assert threading._shutdown == thread.deadlock_safe_shutdown assert mock_run.called + + +def test_run(hass, tmpdir): + """Test we can run.""" + test_dir = tmpdir.mkdir("config") + default_config = runner.RuntimeConfig(test_dir) + + with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), patch( + "homeassistant.bootstrap.async_setup_hass", return_value=hass + ), patch("threading._shutdown"), patch( + "homeassistant.core.HomeAssistant.async_run" + ) as mock_run: + runner.run(default_config) + + assert mock_run.called + + +def test_run_executor_shutdown_throws(hass, tmpdir): + """Test we can run and we still shutdown if the executor shutdown throws.""" + test_dir = tmpdir.mkdir("config") + default_config = runner.RuntimeConfig(test_dir) + + with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), pytest.raises( + RuntimeError + ), patch("homeassistant.bootstrap.async_setup_hass", return_value=hass), patch( + "threading._shutdown" + ), patch( + "homeassistant.runner.InterruptibleThreadPoolExecutor.shutdown", + side_effect=RuntimeError, + ) as mock_shutdown, patch( + "homeassistant.core.HomeAssistant.async_run" + ) as mock_run: + runner.run(default_config) + + assert mock_shutdown.called + assert mock_run.called + + +def test_run_does_not_block_forever_with_shielded_task(hass, tmpdir, caplog): + """Test we can shutdown and not block forever.""" + test_dir = tmpdir.mkdir("config") + default_config = runner.RuntimeConfig(test_dir) + created_tasks = False + + async def _async_create_tasks(*_): + nonlocal created_tasks + + async def async_raise(*_): + try: + await asyncio.sleep(2) + except asyncio.CancelledError: + raise Exception + + async def async_shielded(*_): + try: + await asyncio.sleep(2) + except asyncio.CancelledError: + await asyncio.sleep(2) + + asyncio.ensure_future(asyncio.shield(async_shielded())) + asyncio.ensure_future(asyncio.sleep(2)) + asyncio.ensure_future(async_raise()) + await asyncio.sleep(0.1) + created_tasks = True + return 0 + + with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), patch( + "homeassistant.bootstrap.async_setup_hass", return_value=hass + ), patch("threading._shutdown"), patch( + "homeassistant.core.HomeAssistant.async_run", _async_create_tasks + ): + runner.run(default_config) + + assert created_tasks is True + assert ( + "Task could not be canceled and was still running after shutdown" in caplog.text + )