diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py index 28eb45e5273..3d4e79c1ca2 100644 --- a/homeassistant/bootstrap.py +++ b/homeassistant/bootstrap.py @@ -877,7 +877,7 @@ async def _async_set_up_integrations( _LOGGER.debug("Waiting for startup to wrap up") try: async with hass.timeout.async_timeout(WRAP_UP_TIMEOUT, cool_down=COOLDOWN_TIME): - await hass.async_block_till_done() + await hass.async_block_till_done(wait_periodic_tasks=False) except TimeoutError: _LOGGER.warning( "Setup timed out for bootstrap waiting on %s - moving forward", diff --git a/homeassistant/config_entries.py b/homeassistant/config_entries.py index d9023e5e11a..6976d6d36a8 100644 --- a/homeassistant/config_entries.py +++ b/homeassistant/config_entries.py @@ -17,6 +17,7 @@ from contextvars import ContextVar from copy import deepcopy from enum import Enum, StrEnum import functools +from itertools import chain import logging from random import randint from types import MappingProxyType @@ -377,6 +378,7 @@ class ConfigEntry: self._tasks: set[asyncio.Future[Any]] = set() self._background_tasks: set[asyncio.Future[Any]] = set() + self._periodic_tasks: set[asyncio.Future[Any]] = set() self._integration_for_domain: loader.Integration | None = None self._tries = 0 @@ -854,15 +856,15 @@ class ConfigEntry: if job := self._on_unload.pop()(): self.async_create_task(hass, job) - if not self._tasks and not self._background_tasks: + if not self._tasks and not self._background_tasks and not self._periodic_tasks: return cancel_message = f"Config entry {self.title} with {self.domain} unloading" - for task in self._background_tasks: + for task in chain(self._background_tasks, self._periodic_tasks): task.cancel(cancel_message) _, pending = await asyncio.wait( - [*self._tasks, *self._background_tasks], timeout=10 + [*self._tasks, *self._background_tasks, *self._periodic_tasks], timeout=10 ) for task in pending: @@ -1026,7 +1028,13 @@ class ConfigEntry: Background tasks are automatically canceled when config entry is unloaded. - target: target to call. + A background task is different from a normal task: + + - Will not block startup + - Will be automatically cancelled on shutdown + - Calls to async_block_till_done will not wait for completion + + This method must be run in the event loop. """ task = hass.async_create_background_task(target, name, eager_start) if task.done(): @@ -1035,6 +1043,35 @@ class ConfigEntry: task.add_done_callback(self._background_tasks.remove) return task + @callback + def async_create_periodic_task( + self, + hass: HomeAssistant, + target: Coroutine[Any, Any, _R], + name: str, + eager_start: bool = False, + ) -> asyncio.Task[_R]: + """Create a periodic task tied to the config entry lifecycle. + + Periodic tasks are automatically canceled when config entry is unloaded. + + This type of task is typically used for polling. + + A periodic task is different from a normal task: + + - Will not block startup + - Will be automatically cancelled on shutdown + - Calls to async_block_till_done will wait for completion by default + + This method must be run in the event loop. + """ + task = hass.async_create_periodic_task(target, name, eager_start) + if task.done(): + return task + self._periodic_tasks.add(task) + task.add_done_callback(self._periodic_tasks.remove) + return task + current_entry: ContextVar[ConfigEntry | None] = ContextVar( "current_entry", default=None diff --git a/homeassistant/core.py b/homeassistant/core.py index 3a60e6f1170..b78be4ff3ad 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -382,6 +382,7 @@ class HomeAssistant: self.loop = asyncio.get_running_loop() self._tasks: set[asyncio.Future[Any]] = set() self._background_tasks: set[asyncio.Future[Any]] = set() + self._periodic_tasks: set[asyncio.Future[Any]] = set() self.bus = EventBus(self) self.services = ServiceRegistry(self) self.states = StateMachine(self.bus, self.loop) @@ -640,6 +641,56 @@ class HomeAssistant: return task + @overload + @callback + def async_run_periodic_hass_job( + self, hassjob: HassJob[..., Coroutine[Any, Any, _R]], *args: Any + ) -> asyncio.Future[_R] | None: + ... + + @overload + @callback + def async_run_periodic_hass_job( + self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any + ) -> asyncio.Future[_R] | None: + ... + + @callback + def async_run_periodic_hass_job( + self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any + ) -> asyncio.Future[_R] | None: + """Add a periodic HassJob from within the event loop. + + This method must be run in the event loop. + hassjob: HassJob to call. + args: parameters for method to call. + """ + task: asyncio.Future[_R] + # This code path is performance sensitive and uses + # if TYPE_CHECKING to avoid the overhead of constructing + # the type used for the cast. For history see: + # https://github.com/home-assistant/core/pull/71960 + if hassjob.job_type is HassJobType.Coroutinefunction: + if TYPE_CHECKING: + hassjob.target = cast( + Callable[..., Coroutine[Any, Any, _R]], hassjob.target + ) + task = create_eager_task(hassjob.target(*args), name=hassjob.name) + elif hassjob.job_type is HassJobType.Callback: + if TYPE_CHECKING: + hassjob.target = cast(Callable[..., _R], hassjob.target) + hassjob.target(*args) + return None + else: + if TYPE_CHECKING: + hassjob.target = cast(Callable[..., _R], hassjob.target) + task = self.loop.run_in_executor(None, hassjob.target, *args) + + self._periodic_tasks.add(task) + task.add_done_callback(self._periodic_tasks.remove) + + return task + def create_task( self, target: Coroutine[Any, Any, Any], name: str | None = None ) -> None: @@ -681,9 +732,17 @@ class HomeAssistant: ) -> asyncio.Task[_R]: """Create a task from within the event loop. - This is a background task which will not block startup and will be - automatically cancelled on shutdown. If you are using this in your - integration, use the create task methods on the config entry instead. + This type of task is for background tasks that usually run for + the lifetime of Home Assistant or an integration's setup. + + A background task is different from a normal task: + + - Will not block startup + - Will be automatically cancelled on shutdown + - Calls to async_block_till_done will not wait for completion + + If you are using this in your integration, use the create task + methods on the config entry instead. This method must be run in the event loop. """ @@ -699,6 +758,37 @@ class HomeAssistant: task.add_done_callback(self._background_tasks.remove) return task + @callback + def async_create_periodic_task( + self, target: Coroutine[Any, Any, _R], name: str, eager_start: bool = False + ) -> asyncio.Task[_R]: + """Create a task from within the event loop. + + This type of task is typically used for polling. + + A periodic task is different from a normal task: + + - Will not block startup + - Will be automatically cancelled on shutdown + - Calls to async_block_till_done will wait for completion by default + + If you are using this in your integration, use the create task + methods on the config entry instead. + + This method must be run in the event loop. + """ + if eager_start: + task = create_eager_task(target, name=name, loop=self.loop) + if task.done(): + return task + else: + # Use loop.create_task + # to avoid the extra function call in asyncio.create_task. + task = self.loop.create_task(target, name=name) + self._periodic_tasks.add(task) + task.add_done_callback(self._periodic_tasks.remove) + return task + @callback def async_add_executor_job( self, target: Callable[..., _T], *args: Any @@ -808,16 +898,19 @@ class HomeAssistant: self.async_block_till_done(), self.loop ).result() - async def async_block_till_done(self) -> None: + async def async_block_till_done(self, wait_periodic_tasks: bool = True) -> None: """Block until all pending work is done.""" # To flush out any call_soon_threadsafe await asyncio.sleep(0) start_time: float | None = None current_task = asyncio.current_task() - while tasks := [ task - for task in self._tasks + for task in ( + self._tasks | self._periodic_tasks + if wait_periodic_tasks + else self._tasks + ) if task is not current_task and not cancelling(task) ]: await self._await_and_log_pending(tasks) @@ -948,7 +1041,7 @@ class HomeAssistant: self._tasks = set() # Cancel all background tasks - for task in self._background_tasks: + for task in self._background_tasks | self._periodic_tasks: self._tasks.add(task) task.add_done_callback(self._tasks.remove) task.cancel("Home Assistant is stopping") @@ -960,7 +1053,7 @@ class HomeAssistant: self.bus.async_fire(EVENT_HOMEASSISTANT_STOP) try: async with self.timeout.async_timeout(STOP_STAGE_SHUTDOWN_TIMEOUT): - await self.async_block_till_done() + await self.async_block_till_done(wait_periodic_tasks=False) except TimeoutError: _LOGGER.warning( "Timed out waiting for integrations to stop, the shutdown will" @@ -973,7 +1066,7 @@ class HomeAssistant: self.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) try: async with self.timeout.async_timeout(FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT): - await self.async_block_till_done() + await self.async_block_till_done(wait_periodic_tasks=False) except TimeoutError: _LOGGER.warning( "Timed out waiting for final writes to complete, the shutdown will" @@ -1025,7 +1118,7 @@ class HomeAssistant: try: async with self.timeout.async_timeout(CLOSE_STAGE_SHUTDOWN_TIMEOUT): - await self.async_block_till_done() + await self.async_block_till_done(wait_periodic_tasks=False) except TimeoutError: _LOGGER.warning( "Timed out waiting for close event to be processed, the shutdown will" diff --git a/homeassistant/helpers/entity_platform.py b/homeassistant/helpers/entity_platform.py index 3a441e75e84..e9258f8d1c7 100644 --- a/homeassistant/helpers/entity_platform.py +++ b/homeassistant/helpers/entity_platform.py @@ -641,7 +641,19 @@ class EntityPlatform: @callback def _async_handle_interval_callback(self, now: datetime) -> None: """Update all the entity states in a single platform.""" - self.hass.async_create_task(self._update_entity_states(now), eager_start=True) + if self.config_entry: + self.config_entry.async_create_periodic_task( + self.hass, + self._update_entity_states(now), + name=f"EntityPlatform poll {self.domain}.{self.platform_name}", + eager_start=True, + ) + else: + self.hass.async_create_periodic_task( + self._update_entity_states(now), + name=f"EntityPlatform poll {self.domain}.{self.platform_name}", + eager_start=True, + ) def _entity_id_already_exists(self, entity_id: str) -> tuple[bool, bool]: """Check if an entity_id already exists. diff --git a/homeassistant/helpers/event.py b/homeassistant/helpers/event.py index 3c0aa4b9e34..0233eea37d6 100644 --- a/homeassistant/helpers/event.py +++ b/homeassistant/helpers/event.py @@ -1609,7 +1609,7 @@ class _TrackTimeInterval: self._track_job, hass.loop.time() + self.seconds, ) - hass.async_run_hass_job(self._run_job, now) + hass.async_run_periodic_hass_job(self._run_job, now) @callback def async_cancel(self) -> None: @@ -1694,7 +1694,7 @@ class SunListener: """Handle solar event.""" self._unsub_sun = None self._listen_next_sun_event() - self.hass.async_run_hass_job(self.job) + self.hass.async_run_periodic_hass_job(self.job) @callback def _handle_config_event(self, _event: Any) -> None: @@ -1780,7 +1780,7 @@ class _TrackUTCTimeChange: # time when the timer was scheduled utc_now = time_tracker_utcnow() localized_now = dt_util.as_local(utc_now) if self.local else utc_now - hass.async_run_hass_job(self.job, localized_now) + hass.async_run_periodic_hass_job(self.job, localized_now) if TYPE_CHECKING: assert self._pattern_time_change_listener_job is not None self._cancel_callback = async_track_point_in_utc_time( diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index 018ba1d13e4..5fe2071e853 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -253,7 +253,19 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): @callback def __wrap_handle_refresh_interval(self) -> None: """Handle a refresh interval occurrence.""" - self.hass.async_create_task(self._handle_refresh_interval(), eager_start=True) + if self.config_entry: + self.config_entry.async_create_periodic_task( + self.hass, + self._handle_refresh_interval(), + name=f"{self.name} - {self.config_entry.title} - refresh", + eager_start=True, + ) + else: + self.hass.async_create_periodic_task( + self._handle_refresh_interval(), + name=f"{self.name} - refresh", + eager_start=True, + ) async def _handle_refresh_interval(self, _now: datetime | None = None) -> None: """Handle a refresh interval occurrence.""" diff --git a/tests/components/hassio/test_sensor.py b/tests/components/hassio/test_sensor.py index fbc6f08a1f5..9dbcb5d0e5d 100644 --- a/tests/components/hassio/test_sensor.py +++ b/tests/components/hassio/test_sensor.py @@ -3,14 +3,17 @@ from datetime import timedelta import os from unittest.mock import patch +from freezegun.api import FrozenDateTimeFactory import pytest +from homeassistant import config_entries from homeassistant.components.hassio import ( DOMAIN, HASSIO_UPDATE_INTERVAL, HassioAPIError, ) from homeassistant.components.hassio.const import REQUEST_REFRESH_DELAY +from homeassistant.const import STATE_UNAVAILABLE from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry as er from homeassistant.setup import async_setup_component @@ -264,6 +267,7 @@ async def test_sensor( ("sensor.test_memory_percent", "4.59"), ], ) +@patch.dict(os.environ, MOCK_ENVIRON) async def test_stats_addon_sensor( hass: HomeAssistant, entity_id, @@ -271,18 +275,17 @@ async def test_stats_addon_sensor( aioclient_mock: AiohttpClientMocker, entity_registry: er.EntityRegistry, caplog: pytest.LogCaptureFixture, + freezer: FrozenDateTimeFactory, ) -> None: """Test stats addons sensor.""" config_entry = MockConfigEntry(domain=DOMAIN, data={}, unique_id=DOMAIN) config_entry.add_to_hass(hass) - with patch.dict(os.environ, MOCK_ENVIRON): - result = await async_setup_component( - hass, - "hassio", - {"http": {"server_port": 9999, "server_host": "127.0.0.1"}, "hassio": {}}, - ) - assert result + assert await async_setup_component( + hass, + "hassio", + {"http": {"server_port": 9999, "server_host": "127.0.0.1"}, "hassio": {}}, + ) await hass.async_block_till_done() # Verify that the entity is disabled by default. @@ -292,9 +295,8 @@ async def test_stats_addon_sensor( _install_default_mocks(aioclient_mock) _install_test_addon_stats_failure_mock(aioclient_mock) - async_fire_time_changed( - hass, dt_util.utcnow() + HASSIO_UPDATE_INTERVAL + timedelta(seconds=1) - ) + freezer.tick(HASSIO_UPDATE_INTERVAL + timedelta(seconds=1)) + async_fire_time_changed(hass) await hass.async_block_till_done() assert "Could not fetch stats" not in caplog.text @@ -303,22 +305,31 @@ async def test_stats_addon_sensor( _install_default_mocks(aioclient_mock) _install_test_addon_stats_mock(aioclient_mock) - async_fire_time_changed( - hass, dt_util.utcnow() + HASSIO_UPDATE_INTERVAL + timedelta(seconds=1) - ) + freezer.tick(HASSIO_UPDATE_INTERVAL + timedelta(seconds=1)) + async_fire_time_changed(hass) await hass.async_block_till_done() - # Enable the entity. + assert "Could not fetch stats" not in caplog.text + + # Enable the entity and wait for the reload to complete. entity_registry.async_update_entity(entity_id, disabled_by=None) - await hass.config_entries.async_reload(config_entry.entry_id) + freezer.tick(config_entries.RELOAD_AFTER_UPDATE_DELAY) + async_fire_time_changed(hass) + await hass.async_block_till_done() + assert config_entry.state is config_entries.ConfigEntryState.LOADED + # Verify the entity is still enabled + assert entity_registry.async_get(entity_id).disabled_by is None + + # The config entry just reloaded, so we need to wait for the next update + freezer.tick(HASSIO_UPDATE_INTERVAL + timedelta(seconds=1)) + async_fire_time_changed(hass) await hass.async_block_till_done() - # There is a REQUEST_REFRESH_DELAYs cooldown on the debouncer - async_fire_time_changed( - hass, dt_util.now() + timedelta(seconds=REQUEST_REFRESH_DELAY) - ) - await hass.async_block_till_done() + assert hass.states.get(entity_id) is not None + freezer.tick(HASSIO_UPDATE_INTERVAL + timedelta(seconds=1)) + async_fire_time_changed(hass) + await hass.async_block_till_done() # Verify that the entity have the expected state. state = hass.states.get(entity_id) assert state.state == expected @@ -327,9 +338,10 @@ async def test_stats_addon_sensor( _install_default_mocks(aioclient_mock) _install_test_addon_stats_failure_mock(aioclient_mock) - async_fire_time_changed( - hass, dt_util.utcnow() + HASSIO_UPDATE_INTERVAL + timedelta(seconds=1) - ) + freezer.tick(HASSIO_UPDATE_INTERVAL + timedelta(seconds=1)) + async_fire_time_changed(hass) await hass.async_block_till_done() + state = hass.states.get(entity_id) + assert state.state == STATE_UNAVAILABLE assert "Could not fetch stats" in caplog.text diff --git a/tests/components/homewizard/test_sensor.py b/tests/components/homewizard/test_sensor.py index 243e8f542e2..a7d018ea35f 100644 --- a/tests/components/homewizard/test_sensor.py +++ b/tests/components/homewizard/test_sensor.py @@ -2,7 +2,7 @@ from unittest.mock import MagicMock -from homewizard_energy.errors import DisabledError, RequestError +from homewizard_energy.errors import RequestError from homewizard_energy.models import Data import pytest from syrupy.assertion import SnapshotAssertion @@ -375,7 +375,7 @@ async def test_disabled_by_default_sensors( assert entry.disabled_by is er.RegistryEntryDisabler.INTEGRATION -@pytest.mark.parametrize("exception", [RequestError, DisabledError]) +@pytest.mark.parametrize("exception", [RequestError]) async def test_sensors_unreachable( hass: HomeAssistant, mock_homewizardenergy: MagicMock, diff --git a/tests/components/homewizard/test_switch.py b/tests/components/homewizard/test_switch.py index bfc23264340..85c2bee709c 100644 --- a/tests/components/homewizard/test_switch.py +++ b/tests/components/homewizard/test_switch.py @@ -192,7 +192,7 @@ async def test_switch_entities( @pytest.mark.parametrize("device_fixture", ["HWE-SKT"]) -@pytest.mark.parametrize("exception", [RequestError, DisabledError, UnsupportedError]) +@pytest.mark.parametrize("exception", [RequestError, UnsupportedError]) @pytest.mark.parametrize( ("entity_id", "method"), [ diff --git a/tests/helpers/test_entity_platform.py b/tests/helpers/test_entity_platform.py index 07ecd7844da..f6fc5888c1c 100644 --- a/tests/helpers/test_entity_platform.py +++ b/tests/helpers/test_entity_platform.py @@ -200,7 +200,7 @@ async def test_set_scan_interval_via_platform( component = EntityComponent(_LOGGER, DOMAIN, hass) - component.setup({DOMAIN: {"platform": "platform"}}) + await component.async_setup({DOMAIN: {"platform": "platform"}}) await hass.async_block_till_done() assert mock_track.called diff --git a/tests/test_config_entries.py b/tests/test_config_entries.py index d6c5d8bdc5c..089d0f6b21b 100644 --- a/tests/test_config_entries.py +++ b/tests/test_config_entries.py @@ -4329,10 +4329,23 @@ async def test_task_tracking(hass: HomeAssistant) -> None: entry.async_create_background_task( hass, test_task(), "background-task-name", eager_start=False ) + entry.async_create_periodic_task( + hass, test_task(), "periodic-task-name", eager_start=False + ) + entry.async_create_periodic_task( + hass, test_task(), "periodic-task-name", eager_start=True + ) await asyncio.sleep(0) hass.loop.call_soon(event.set) await entry._async_process_on_unload(hass) - assert results == ["on_unload", "background", "background", "normal"] + assert results == [ + "on_unload", + "background", + "background", + "background", + "background", + "normal", + ] async def test_preview_supported( diff --git a/tests/test_core.py b/tests/test_core.py index 75d06a7c61f..9960e8a1671 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -223,6 +223,47 @@ async def test_async_create_task_schedule_coroutine_with_name() -> None: assert "named task" in str(task) +async def test_async_run_periodic_hass_job_calls_callback() -> None: + """Test that the callback annotation is respected.""" + hass = MagicMock() + calls = [] + + def job(): + asyncio.get_running_loop() # ensure we are in the event loop + calls.append(1) + + ha.HomeAssistant.async_run_periodic_hass_job(hass, ha.HassJob(ha.callback(job))) + assert len(calls) == 1 + + +async def test_async_run_periodic_hass_job_calls_coro_function() -> None: + """Test running coros from async_run_periodic_hass_job.""" + hass = MagicMock() + calls = [] + + async def job(): + calls.append(1) + + await ha.HomeAssistant.async_run_periodic_hass_job(hass, ha.HassJob(job)) + assert len(calls) == 1 + + +async def test_async_run_periodic_hass_job_calls_executor_function() -> None: + """Test running in the executor from async_run_periodic_hass_job.""" + hass = MagicMock() + hass.loop = asyncio.get_running_loop() + calls = [] + + def job(): + try: + asyncio.get_running_loop() # ensure we are not in the event loop + except RuntimeError: + calls.append(1) + + await ha.HomeAssistant.async_run_periodic_hass_job(hass, ha.HassJob(job)) + assert len(calls) == 1 + + async def test_async_run_hass_job_calls_callback() -> None: """Test that the callback annotation is respected.""" hass = MagicMock() @@ -514,7 +555,7 @@ async def test_shutdown_calls_block_till_done_after_shutdown_run_callback_thread """Ensure shutdown_run_callback_threadsafe is called before the final async_block_till_done.""" stop_calls = [] - async def _record_block_till_done(): + async def _record_block_till_done(wait_periodic_tasks: bool = True): nonlocal stop_calls stop_calls.append("async_block_till_done") @@ -2098,9 +2139,9 @@ async def test_chained_logging_hits_log_timeout( return hass.async_create_task(_task_chain_1()) - with patch.object(ha, "BLOCK_LOG_TIMEOUT", 0.0001): + with patch.object(ha, "BLOCK_LOG_TIMEOUT", 0.0): hass.async_create_task(_task_chain_1()) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_periodic_tasks=False) assert "_task_chain_" in caplog.text @@ -2654,6 +2695,27 @@ async def test_background_task(hass: HomeAssistant, eager_start: bool) -> None: assert result.result() == ha.CoreState.stopping +@pytest.mark.parametrize("eager_start", (True, False)) +async def test_periodic_task(hass: HomeAssistant, eager_start: bool) -> None: + """Test periodic tasks being quit.""" + result = asyncio.Future() + + async def test_task(): + try: + await asyncio.sleep(1) + except asyncio.CancelledError: + result.set_result(hass.state) + raise + + task = hass.async_create_periodic_task( + test_task(), "happy task", eager_start=eager_start + ) + assert "happy task" in str(task) + await asyncio.sleep(0) + await hass.async_stop() + assert result.result() == ha.CoreState.stopping + + async def test_shutdown_does_not_block_on_normal_tasks( hass: HomeAssistant, ) -> None: