From dba22953719071f85c347efe4afa9d411c752108 Mon Sep 17 00:00:00 2001 From: jbouwh Date: Fri, 7 Jul 2023 15:36:27 +0000 Subject: [PATCH] Test for async_get_hass --- tests/test_core.py | 181 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index 8b63eab7b42..9f720f6b3b9 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -9,10 +9,12 @@ import gc import logging import os from tempfile import TemporaryDirectory +import threading import time from typing import Any from unittest.mock import MagicMock, Mock, PropertyMock, patch +import async_timeout import pytest import voluptuous as vol @@ -40,6 +42,7 @@ from homeassistant.core import ( ServiceResponse, State, SupportsResponse, + callback, ) from homeassistant.exceptions import ( HomeAssistantError, @@ -202,6 +205,184 @@ def test_async_run_hass_job_delegates_non_async() -> None: assert len(hass.async_add_hass_job.mock_calls) == 1 +async def test_async_get_hass_can_be_called(hass: HomeAssistant) -> None: + """Test calling async_get_hass via different paths. + + The test asserts async_get_hass can be called from: + - Coroutines and callbacks + - Callbacks scheduled from callbacks, coroutines and threads + - Coroutines scheduled from callbacks, coroutines and threads + + The test also asserts async_get_hass can not be called from threads + other than the event loop. + """ + task_finished = asyncio.Event() + + def can_call_async_get_hass() -> bool: + """Test if it's possible to call async_get_hass.""" + try: + if ha.async_get_hass() is hass: + return True + raise Exception + except (LookupError, HomeAssistantError): + return False + + raise Exception + + # Test scheduling a coroutine which calls async_get_hass via hass.async_create_task + async def _async_create_task() -> None: + task_finished.set() + assert can_call_async_get_hass() + + hass.async_create_task(_async_create_task(), "create_task") + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a callback which calls async_get_hass via hass.async_add_job + @callback + def _add_job() -> None: + assert can_call_async_get_hass() + task_finished.set() + + hass.async_add_job(_add_job) + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a callback which calls async_get_hass from a callback + @callback + def _schedule_callback_from_callback() -> None: + @callback + def _callback(): + assert can_call_async_get_hass() + task_finished.set() + + # Test the scheduled callback itself can call async_get_hass + assert can_call_async_get_hass() + hass.async_add_job(_callback) + + _schedule_callback_from_callback() + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a coroutine which calls async_get_hass from a callback + @callback + def _schedule_coroutine_from_callback() -> None: + async def _coroutine(): + assert can_call_async_get_hass() + task_finished.set() + + # Test the scheduled callback itself can call async_get_hass + assert can_call_async_get_hass() + hass.async_add_job(_coroutine()) + + _schedule_coroutine_from_callback() + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a callback which calls async_get_hass from a coroutine + async def _schedule_callback_from_coroutine() -> None: + @callback + def _callback(): + assert can_call_async_get_hass() + task_finished.set() + + # Test the coroutine itself can call async_get_hass + assert can_call_async_get_hass() + hass.async_add_job(_callback) + + await _schedule_callback_from_coroutine() + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a coroutine which calls async_get_hass from a coroutine + async def _schedule_callback_from_coroutine() -> None: + async def _coroutine(): + assert can_call_async_get_hass() + task_finished.set() + + # Test the coroutine itself can call async_get_hass + assert can_call_async_get_hass() + await hass.async_create_task(_coroutine()) + + await _schedule_callback_from_coroutine() + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a callback which calls async_get_hass from an executor + def _async_add_executor_job_add_job() -> None: + @callback + def _async_add_job(): + assert can_call_async_get_hass() + task_finished.set() + + # Test the executor itself can not call async_get_hass + assert not can_call_async_get_hass() + hass.add_job(_async_add_job) + + await hass.async_add_executor_job(_async_add_executor_job_add_job) + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a coroutine which calls async_get_hass from an executor + def _async_add_executor_job_create_task() -> None: + async def _async_create_task() -> None: + assert can_call_async_get_hass() + task_finished.set() + + # Test the executor itself can not call async_get_hass + assert not can_call_async_get_hass() + hass.create_task(_async_create_task()) + + await hass.async_add_executor_job(_async_add_executor_job_create_task) + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + + # Test scheduling a callback which calls async_get_hass from a worker thread + class MyJobAddJob(threading.Thread): + @callback + def _my_threaded_job_add_job(self) -> None: + assert can_call_async_get_hass() + task_finished.set() + + def run(self) -> None: + # Test the worker thread itself can not call async_get_hass + assert not can_call_async_get_hass() + hass.add_job(self._my_threaded_job_add_job) + + my_job_add_job = MyJobAddJob() + my_job_add_job.start() + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + my_job_add_job.join() + + # Test scheduling a coroutine which calls async_get_hass from a worker thread + class MyJobCreateTask(threading.Thread): + async def _my_threaded_job_create_task(self) -> None: + assert can_call_async_get_hass() + task_finished.set() + + def run(self) -> None: + # Test the worker thread itself can not call async_get_hass + assert not can_call_async_get_hass() + hass.create_task(self._my_threaded_job_create_task()) + + my_job_create_task = MyJobCreateTask() + my_job_create_task.start() + async with async_timeout.timeout(1): + await task_finished.wait() + task_finished.clear() + my_job_create_task.join() + + async def test_stage_shutdown(hass: HomeAssistant) -> None: """Simulate a shutdown, test calling stuff.""" test_stop = async_capture_events(hass, EVENT_HOMEASSISTANT_STOP)