Use loop.time in DataUpdateCoordinator (#98937)

This commit is contained in:
Erik Montnemery 2023-08-28 17:16:34 +02:00 committed by GitHub
parent d4e72c49fa
commit 9dac6a2948
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 32 deletions

View File

@ -221,7 +221,10 @@ class TomorrowioDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
await self.async_refresh() await self.async_refresh()
self.update_interval = async_set_update_interval(self.hass, self._api) self.update_interval = async_set_update_interval(self.hass, self._api)
self._schedule_refresh() self._next_refresh = None
self._async_unsub_refresh()
if self._listeners:
self._schedule_refresh()
async def async_unload_entry(self, entry: ConfigEntry) -> bool | None: async def async_unload_entry(self, entry: ConfigEntry) -> bool | None:
"""Unload a config entry from coordinator. """Unload a config entry from coordinator.

View File

@ -1434,6 +1434,37 @@ def async_track_point_in_utc_time(
track_point_in_utc_time = threaded_listener_factory(async_track_point_in_utc_time) track_point_in_utc_time = threaded_listener_factory(async_track_point_in_utc_time)
@callback
@bind_hass
def async_call_at(
hass: HomeAssistant,
action: HassJob[[datetime], Coroutine[Any, Any, None] | None]
| Callable[[datetime], Coroutine[Any, Any, None] | None],
loop_time: float,
) -> CALLBACK_TYPE:
"""Add a listener that is called at <loop_time>."""
@callback
def run_action(job: HassJob[[datetime], Coroutine[Any, Any, None] | None]) -> None:
"""Call the action."""
hass.async_run_hass_job(job, time_tracker_utcnow())
job = (
action
if isinstance(action, HassJob)
else HassJob(action, f"call_at {loop_time}")
)
cancel_callback = hass.loop.call_at(loop_time, run_action, job)
@callback
def unsub_call_later_listener() -> None:
"""Cancel the call_later."""
assert cancel_callback is not None
cancel_callback.cancel()
return unsub_call_later_listener
@callback @callback
@bind_hass @bind_hass
def async_call_later( def async_call_later(

View File

@ -81,6 +81,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
self._shutdown_requested = False self._shutdown_requested = False
self.config_entry = config_entries.current_entry.get() self.config_entry = config_entries.current_entry.get()
self.always_update = always_update self.always_update = always_update
self._next_refresh: float | None = None
# It's None before the first successful update. # It's None before the first successful update.
# Components should call async_config_entry_first_refresh # Components should call async_config_entry_first_refresh
@ -89,10 +90,11 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
# when it was already checked during setup. # when it was already checked during setup.
self.data: _DataT = None # type: ignore[assignment] self.data: _DataT = None # type: ignore[assignment]
# Pick a random microsecond to stagger the refreshes # Pick a random microsecond in range 0.05..0.50 to stagger the refreshes
# and avoid a thundering herd. # and avoid a thundering herd.
self._microsecond = randint( self._microsecond = (
event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX)
/ 10**6
) )
self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {} self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
@ -182,6 +184,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
"""Unschedule any pending refresh since there is no longer any listeners.""" """Unschedule any pending refresh since there is no longer any listeners."""
self._async_unsub_refresh() self._async_unsub_refresh()
self._debounced_refresh.async_cancel() self._debounced_refresh.async_cancel()
self._next_refresh = None
def async_contexts(self) -> Generator[Any, None, None]: def async_contexts(self) -> Generator[Any, None, None]:
"""Return all registered contexts.""" """Return all registered contexts."""
@ -214,20 +217,16 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
# than the debouncer cooldown, this would cause the debounce to never be called # than the debouncer cooldown, this would cause the debounce to never be called
self._async_unsub_refresh() self._async_unsub_refresh()
# We _floor_ utcnow to create a schedule on a rounded second, # We use event.async_call_at because DataUpdateCoordinator does
# minimizing the time between the point and the real activation. # not need an exact update interval.
# That way we obtain a constant update frequency, now = self.hass.loop.time()
# as long as the update process takes less than 500ms if self._next_refresh is None or self._next_refresh <= now:
# self._next_refresh = int(now) + self._microsecond
# We do not align everything to happen at microsecond 0 self._next_refresh += self.update_interval.total_seconds()
# since it increases the risk of a thundering herd self._unsub_refresh = event.async_call_at(
# when multiple coordinators are scheduled to update at the same time.
#
# https://github.com/home-assistant/core/issues/82231
self._unsub_refresh = event.async_track_point_in_utc_time(
self.hass, self.hass,
self._job, self._job,
utcnow().replace(microsecond=self._microsecond) + self.update_interval, self._next_refresh,
) )
async def _handle_refresh_interval(self, _now: datetime) -> None: async def _handle_refresh_interval(self, _now: datetime) -> None:
@ -266,6 +265,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
async def async_refresh(self) -> None: async def async_refresh(self) -> None:
"""Refresh data and log errors.""" """Refresh data and log errors."""
self._next_refresh = None
await self._async_refresh(log_failures=True) await self._async_refresh(log_failures=True)
async def _async_refresh( # noqa: C901 async def _async_refresh( # noqa: C901
@ -405,6 +405,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
"""Manually update data, notify listeners and reset refresh interval.""" """Manually update data, notify listeners and reset refresh interval."""
self._async_unsub_refresh() self._async_unsub_refresh()
self._debounced_refresh.async_cancel() self._debounced_refresh.async_cancel()
self._next_refresh = None
self.data = data self.data = data
self.last_update_success = True self.last_update_success = True

View File

@ -412,12 +412,9 @@ def async_fire_time_changed(
else: else:
utc_datetime = dt_util.as_utc(datetime_) utc_datetime = dt_util.as_utc(datetime_)
if utc_datetime.microsecond < event.RANDOM_MICROSECOND_MAX: # Increase the mocked time by 0.5 s to account for up to 0.5 s delay
# Allow up to 500000 microseconds to be added to the time # added to events scheduled by update_coordinator and async_track_time_interval
# to handle update_coordinator's and utc_datetime += timedelta(microseconds=event.RANDOM_MICROSECOND_MAX)
# async_track_time_interval's
# staggering to avoid thundering herd.
utc_datetime = utc_datetime.replace(microsecond=event.RANDOM_MICROSECOND_MAX)
_async_fire_time_changed(hass, utc_datetime, fire_all) _async_fire_time_changed(hass, utc_datetime, fire_all)

View File

@ -4174,27 +4174,27 @@ async def test_periodic_task_entering_dst_2(
) )
freezer.move_to(f"{today} 01:59:59.999999+01:00") freezer.move_to(f"{today} 01:59:59.999999+01:00")
async_fire_time_changed(hass) async_fire_time_changed_exact(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 0 assert len(specific_runs) == 0
freezer.move_to(f"{today} 03:00:00.999999+02:00") freezer.move_to(f"{today} 03:00:00.999999+02:00")
async_fire_time_changed(hass) async_fire_time_changed_exact(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
freezer.move_to(f"{today} 03:00:01.999999+02:00") freezer.move_to(f"{today} 03:00:01.999999+02:00")
async_fire_time_changed(hass) async_fire_time_changed_exact(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
freezer.move_to(f"{tomorrow} 01:59:59.999999+02:00") freezer.move_to(f"{tomorrow} 01:59:59.999999+02:00")
async_fire_time_changed(hass) async_fire_time_changed_exact(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 3 assert len(specific_runs) == 3
freezer.move_to(f"{tomorrow} 02:00:00.999999+02:00") freezer.move_to(f"{tomorrow} 02:00:00.999999+02:00")
async_fire_time_changed(hass) async_fire_time_changed_exact(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 4 assert len(specific_runs) == 4

View File

@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, Mock, patch
import urllib.error import urllib.error
import aiohttp import aiohttp
from freezegun.api import FrozenDateTimeFactory
import pytest import pytest
import requests import requests
@ -329,11 +330,14 @@ async def test_refresh_no_update_method(
async def test_update_interval( async def test_update_interval(
hass: HomeAssistant, crd: update_coordinator.DataUpdateCoordinator[int] hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
crd: update_coordinator.DataUpdateCoordinator[int],
) -> None: ) -> None:
"""Test update interval works.""" """Test update interval works."""
# Test we don't update without subscriber # Test we don't update without subscriber
async_fire_time_changed(hass, utcnow() + crd.update_interval) freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert crd.data is None assert crd.data is None
@ -342,18 +346,21 @@ async def test_update_interval(
unsub = crd.async_add_listener(update_callback) unsub = crd.async_add_listener(update_callback)
# Test twice we update with subscriber # Test twice we update with subscriber
async_fire_time_changed(hass, utcnow() + crd.update_interval) freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert crd.data == 1 assert crd.data == 1
async_fire_time_changed(hass, utcnow() + crd.update_interval) freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
assert crd.data == 2 assert crd.data == 2
# Test removing listener # Test removing listener
unsub() unsub()
async_fire_time_changed(hass, utcnow() + crd.update_interval) freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
# Test we stop updating after we lose last subscriber # Test we stop updating after we lose last subscriber