Test for async_get_hass

This commit is contained in:
jbouwh 2023-07-07 15:36:27 +00:00
parent f205d50ac7
commit dba2295371

View File

@ -9,10 +9,12 @@ import gc
import logging
import os
from tempfile import TemporaryDirectory
import threading
import time
from typing import Any
from unittest.mock import MagicMock, Mock, PropertyMock, patch
import async_timeout
import pytest
import voluptuous as vol
@ -40,6 +42,7 @@ from homeassistant.core import (
ServiceResponse,
State,
SupportsResponse,
callback,
)
from homeassistant.exceptions import (
HomeAssistantError,
@ -202,6 +205,184 @@ def test_async_run_hass_job_delegates_non_async() -> None:
assert len(hass.async_add_hass_job.mock_calls) == 1
async def test_async_get_hass_can_be_called(hass: HomeAssistant) -> None:
"""Test calling async_get_hass via different paths.
The test asserts async_get_hass can be called from:
- Coroutines and callbacks
- Callbacks scheduled from callbacks, coroutines and threads
- Coroutines scheduled from callbacks, coroutines and threads
The test also asserts async_get_hass can not be called from threads
other than the event loop.
"""
task_finished = asyncio.Event()
def can_call_async_get_hass() -> bool:
"""Test if it's possible to call async_get_hass."""
try:
if ha.async_get_hass() is hass:
return True
raise Exception
except (LookupError, HomeAssistantError):
return False
raise Exception
# Test scheduling a coroutine which calls async_get_hass via hass.async_create_task
async def _async_create_task() -> None:
task_finished.set()
assert can_call_async_get_hass()
hass.async_create_task(_async_create_task(), "create_task")
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a callback which calls async_get_hass via hass.async_add_job
@callback
def _add_job() -> None:
assert can_call_async_get_hass()
task_finished.set()
hass.async_add_job(_add_job)
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a callback which calls async_get_hass from a callback
@callback
def _schedule_callback_from_callback() -> None:
@callback
def _callback():
assert can_call_async_get_hass()
task_finished.set()
# Test the scheduled callback itself can call async_get_hass
assert can_call_async_get_hass()
hass.async_add_job(_callback)
_schedule_callback_from_callback()
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a coroutine which calls async_get_hass from a callback
@callback
def _schedule_coroutine_from_callback() -> None:
async def _coroutine():
assert can_call_async_get_hass()
task_finished.set()
# Test the scheduled callback itself can call async_get_hass
assert can_call_async_get_hass()
hass.async_add_job(_coroutine())
_schedule_coroutine_from_callback()
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a callback which calls async_get_hass from a coroutine
async def _schedule_callback_from_coroutine() -> None:
@callback
def _callback():
assert can_call_async_get_hass()
task_finished.set()
# Test the coroutine itself can call async_get_hass
assert can_call_async_get_hass()
hass.async_add_job(_callback)
await _schedule_callback_from_coroutine()
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a coroutine which calls async_get_hass from a coroutine
async def _schedule_callback_from_coroutine() -> None:
async def _coroutine():
assert can_call_async_get_hass()
task_finished.set()
# Test the coroutine itself can call async_get_hass
assert can_call_async_get_hass()
await hass.async_create_task(_coroutine())
await _schedule_callback_from_coroutine()
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a callback which calls async_get_hass from an executor
def _async_add_executor_job_add_job() -> None:
@callback
def _async_add_job():
assert can_call_async_get_hass()
task_finished.set()
# Test the executor itself can not call async_get_hass
assert not can_call_async_get_hass()
hass.add_job(_async_add_job)
await hass.async_add_executor_job(_async_add_executor_job_add_job)
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a coroutine which calls async_get_hass from an executor
def _async_add_executor_job_create_task() -> None:
async def _async_create_task() -> None:
assert can_call_async_get_hass()
task_finished.set()
# Test the executor itself can not call async_get_hass
assert not can_call_async_get_hass()
hass.create_task(_async_create_task())
await hass.async_add_executor_job(_async_add_executor_job_create_task)
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
# Test scheduling a callback which calls async_get_hass from a worker thread
class MyJobAddJob(threading.Thread):
@callback
def _my_threaded_job_add_job(self) -> None:
assert can_call_async_get_hass()
task_finished.set()
def run(self) -> None:
# Test the worker thread itself can not call async_get_hass
assert not can_call_async_get_hass()
hass.add_job(self._my_threaded_job_add_job)
my_job_add_job = MyJobAddJob()
my_job_add_job.start()
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
my_job_add_job.join()
# Test scheduling a coroutine which calls async_get_hass from a worker thread
class MyJobCreateTask(threading.Thread):
async def _my_threaded_job_create_task(self) -> None:
assert can_call_async_get_hass()
task_finished.set()
def run(self) -> None:
# Test the worker thread itself can not call async_get_hass
assert not can_call_async_get_hass()
hass.create_task(self._my_threaded_job_create_task())
my_job_create_task = MyJobCreateTask()
my_job_create_task.start()
async with async_timeout.timeout(1):
await task_finished.wait()
task_finished.clear()
my_job_create_task.join()
async def test_stage_shutdown(hass: HomeAssistant) -> None:
"""Simulate a shutdown, test calling stuff."""
test_stop = async_capture_events(hass, EVENT_HOMEASSISTANT_STOP)