Rewrite the calendar trigger to fix potential bugs (#89918)

Update the calander event trigger logic to have more exhaustive coverage. The
trigger will now use a timespan to create an explicit window for considering
upcoming events. The start/end of the time span is now more explicit, rather
than getting it from the alarm time.

The trigger is now broken into composable pieces:
- A timespan object for more explicitly managing the time window
- A function to get events during a time span
- A function to process upcoming events and determine the trigger times

The existing listener is now just responsible for scheduling alarms and glue.

This fixes bug with DST handling where the conversion back and forth between
UTC and timezone ends up dropping events during the jump forward. In practice,
an event was returned from the scanning, but it was never fired by the trigger
because (1) it was filtered out of the interval and (2) the event list was
previously cleared every iteration so it would get dropped.

Future improvements can bake more invariant checking into this structure.
This commit is contained in:
Allen Porter 2023-03-19 20:42:12 -07:00 committed by GitHub
parent 2039955ef7
commit 9721ba59b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 196 additions and 67 deletions

View File

@ -1,7 +1,8 @@
"""Offer calendar automation rules."""
from __future__ import annotations
from collections.abc import Coroutine
from collections.abc import Awaitable, Callable, Coroutine
from dataclasses import dataclass
import datetime
import logging
from typing import Any
@ -14,7 +15,7 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.event import (
async_track_point_in_utc_time,
async_track_point_in_time,
async_track_time_interval,
)
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
@ -41,34 +42,135 @@ TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
# mypy: disallow-any-generics
@dataclass
class QueuedCalendarEvent:
"""An event that is queued to be fired in the future."""
trigger_time: datetime.datetime
event: CalendarEvent
@dataclass
class Timespan:
"""A time range part of start/end dates, used for considering active events."""
start: datetime.datetime
"""The start datetime of the interval."""
end: datetime.datetime
"""The end datetime (exclusive) of the interval."""
def with_offset(self, offset: datetime.timedelta) -> Timespan:
"""Return a new interval shifted by the specified offset."""
return Timespan(self.start + offset, self.end + offset)
def contains(self, trigger: datetime.datetime) -> bool:
"""Return true if the trigger time is within the time span."""
return self.start <= trigger < self.end
def next_upcoming(
self, now: datetime.datetime, interval: datetime.timedelta
) -> Timespan:
"""Return a subsequent time span following the current time span.
This effectively gives us a cursor like interface for advancing through
time using the interval as a hint. The returned span may have a
different interval than the one specified. For example, time span may
be longer during a daylight saving time transition, or may extend due to
drift if the current interval is old. The returned time span is
adjacent and non-overlapping.
"""
return Timespan(self.end, max(self.end, now) + interval)
def __str__(self) -> str:
"""Return a compact string representation."""
return f"[{self.start}, {self.end})"
EventFetcher = Callable[[Timespan], Awaitable[list[CalendarEvent]]]
QueuedEventFetcher = Callable[[Timespan], Awaitable[list[QueuedCalendarEvent]]]
def event_fetcher(hass: HomeAssistant, entity: CalendarEntity) -> EventFetcher:
"""Build an async_get_events wrapper to fetch events during a time span."""
async def async_get_events(timespan: Timespan) -> list[CalendarEvent]:
"""Return events active in the specified time span."""
# Expand by one second to make the end time exclusive
end_time = timespan.end + datetime.timedelta(seconds=1)
return await entity.async_get_events(hass, timespan.start, end_time)
return async_get_events
def queued_event_fetcher(
fetcher: EventFetcher, event_type: str, offset: datetime.timedelta
) -> QueuedEventFetcher:
"""Build a fetcher that produces a schedule of upcoming trigger events."""
def get_trigger_time(event: CalendarEvent) -> datetime.datetime:
if event_type == EVENT_START:
return event.start_datetime_local
return event.end_datetime_local
async def async_get_events(timespan: Timespan) -> list[QueuedCalendarEvent]:
"""Get calendar event triggers eligible to fire in the time span."""
offset_timespan = timespan.with_offset(-1 * offset)
active_events = await fetcher(offset_timespan)
# Determine the trigger eligibilty of events during this time span.
# Example: For an EVENT_END trigger the event may start during this
# time span, but need to be triggered later when the end happens.
results = []
for trigger_time, event in zip(
map(get_trigger_time, active_events), active_events
):
if not offset_timespan.contains(trigger_time):
continue
results.append(QueuedCalendarEvent(trigger_time + offset, event))
_LOGGER.debug(
"Scan events @ %s%s found %s eligble of %s active",
offset_timespan,
f" (offset={offset})" if offset else "",
len(results),
len(active_events),
)
results.sort(key=lambda x: x.trigger_time)
return results
return async_get_events
class CalendarEventListener:
"""Helper class to listen to calendar events."""
"""Helper class to listen to calendar events.
This listener will poll every UPDATE_INTERVAL to fetch a set of upcoming
calendar events in the upcoming window of time, putting them into a queue.
The queue is drained by scheduling an alarm for the next upcoming event
trigger time, one event at a time.
"""
def __init__(
self,
hass: HomeAssistant,
job: HassJob[..., Coroutine[Any, Any, None]],
trigger_data: dict[str, Any],
entity: CalendarEntity,
event_type: str,
offset: datetime.timedelta,
fetcher: QueuedEventFetcher,
) -> None:
"""Initialize CalendarEventListener."""
self._hass = hass
self._job = job
self._trigger_data = trigger_data
self._entity = entity
self._offset = offset
self._unsub_event: CALLBACK_TYPE | None = None
self._unsub_refresh: CALLBACK_TYPE | None = None
# Upcoming set of events with their trigger time
self._events: list[tuple[datetime.datetime, CalendarEvent]] = []
self._event_type = event_type
self._fetcher = fetcher
now = dt_util.now()
self._timespan = Timespan(now, now + UPDATE_INTERVAL)
self._events: list[QueuedCalendarEvent] = []
async def async_attach(self) -> None:
"""Attach a calendar event listener."""
now = dt_util.utcnow()
await self._fetch_events(now)
self._events.extend(await self._fetcher(self._timespan))
self._unsub_refresh = async_track_time_interval(
self._hass, self._handle_refresh, UPDATE_INTERVAL
)
@ -82,52 +184,19 @@ class CalendarEventListener:
self._unsub_refresh()
self._unsub_refresh = None
async def _fetch_events(self, last_endtime: datetime.datetime) -> None:
"""Update the set of eligible events."""
# Use a sliding window for selecting in scope events in the next interval.
# The event search range is offset, then the fire time of the returned events
# are offset again below. Event time ranges are exclusive so the end time
# is expanded by 1sec.
start_time = last_endtime - self._offset
end_time = start_time + UPDATE_INTERVAL + datetime.timedelta(seconds=1)
_LOGGER.debug(
"Fetching events between %s, %s (offset=%s)",
start_time,
end_time,
self._offset,
)
events = await self._entity.async_get_events(self._hass, start_time, end_time)
# Build list of events and the appropriate time to trigger an alarm. The
# returned events may have already started but matched the start/end time
# filtering above, so exclude any events that have already passed the
# trigger time.
event_list = []
for event in events:
event_fire_time = (
event.start_datetime_local
if self._event_type == EVENT_START
else event.end_datetime_local
)
event_fire_time += self._offset
if event_fire_time > last_endtime:
event_list.append((event_fire_time, event))
event_list.sort(key=lambda x: x[0])
self._events = event_list
_LOGGER.debug("Populated event list %s", self._events)
@callback
def _listen_next_calendar_event(self) -> None:
"""Set up the calendar event listener."""
if not self._events:
return
(event_fire_time, _event) = self._events[0]
_LOGGER.debug("Scheduled alarm for %s", event_fire_time)
self._unsub_event = async_track_point_in_utc_time(
_LOGGER.debug(
"Scheduled next event trigger for %s", self._events[0].trigger_time
)
self._unsub_event = async_track_point_in_time(
self._hass,
self._handle_calendar_event,
event_fire_time,
self._events[0].trigger_time,
)
def _clear_event_listener(self) -> None:
@ -138,29 +207,36 @@ class CalendarEventListener:
async def _handle_calendar_event(self, now: datetime.datetime) -> None:
"""Handle calendar event."""
_LOGGER.debug("Calendar event @ %s", now)
_LOGGER.debug("Calendar event @ %s", dt_util.as_local(now))
self._dispatch_events(now)
self._clear_event_listener()
self._listen_next_calendar_event()
def _dispatch_events(self, now: datetime.datetime) -> None:
"""Dispatch all events that are eligible to fire."""
while self._events and self._events[0][0] <= now:
(_fire_time, event) = self._events.pop(0)
_LOGGER.debug("Event: %s", event)
while self._events and self._events[0].trigger_time <= now:
queued_event = self._events.pop(0)
_LOGGER.debug("Dispatching event: %s", queued_event.event)
self._hass.async_run_hass_job(
self._job,
{"trigger": {**self._trigger_data, "calendar_event": event.as_dict()}},
{
"trigger": {
**self._trigger_data,
"calendar_event": queued_event.event.as_dict(),
}
},
)
async def _handle_refresh(self, now: datetime.datetime) -> None:
async def _handle_refresh(self, now_utc: datetime.datetime) -> None:
"""Handle core config update."""
now = dt_util.as_local(now_utc)
_LOGGER.debug("Refresh events @ %s", now)
# Dispatch any eligible events in the boundary case where refresh
# fires before the calendar event.
self._dispatch_events(now)
self._clear_event_listener()
await self._fetch_events(now)
self._timespan = self._timespan.next_upcoming(now, UPDATE_INTERVAL)
self._events.extend(await self._fetcher(self._timespan))
self._listen_next_calendar_event()
@ -190,7 +266,10 @@ async def async_attach_trigger(
"offset": offset,
}
listener = CalendarEventListener(
hass, HassJob(action), trigger_data, entity, event_type, offset
hass,
HassJob(action),
trigger_data,
queued_event_fetcher(event_fetcher(hass, entity), event_type, offset),
)
await listener.async_attach()
return listener.async_detach

View File

@ -14,6 +14,7 @@ import logging
import secrets
from typing import Any
from unittest.mock import patch
import zoneinfo
from freezegun.api import FrozenDateTimeFactory
import pytest
@ -87,19 +88,17 @@ class FakeSchedule:
"""Get all events in a specific time frame, used by the demo calendar."""
assert start_date < end_date
values = []
local_start_date = dt_util.as_local(start_date)
local_end_date = dt_util.as_local(end_date)
for event in self.events:
if (
event.start_datetime_local < local_end_date
and local_start_date < event.end_datetime_local
):
values.append(event)
if event.start_datetime_local >= end_date:
continue
if event.end_datetime_local < start_date:
continue
values.append(event)
return values
async def fire_time(self, trigger_time: datetime.datetime) -> None:
"""Fire an alarm and wait."""
_LOGGER.debug(f"Firing alarm @ {trigger_time}")
_LOGGER.debug(f"Firing alarm @ {dt_util.as_local(trigger_time)}")
self.freezer.move_to(trigger_time)
async_fire_time_changed(self.hass, trigger_time)
await self.hass.async_block_till_done()
@ -666,3 +665,54 @@ async def test_trigger_timestamp_window_edge(
"calendar_event": event_data,
}
]
async def test_event_start_trigger_dst(
hass: HomeAssistant, calls, fake_schedule, freezer
) -> None:
"""Test a calendar event trigger happening at the start of daylight savings time."""
tzinfo = zoneinfo.ZoneInfo("America/Los_Angeles")
hass.config.set_time_zone("America/Los_Angeles")
freezer.move_to("2023-03-12 01:00:00-08:00")
# Before DST transition starts
event1_data = fake_schedule.create_event(
summary="Event 1",
start=datetime.datetime(2023, 3, 12, 1, 30, tzinfo=tzinfo),
end=datetime.datetime(2023, 3, 12, 1, 45, tzinfo=tzinfo),
)
# During DST transition (Clocks are turned forward at 2am to 3am)
event2_data = fake_schedule.create_event(
summary="Event 2",
start=datetime.datetime(2023, 3, 12, 2, 30, tzinfo=tzinfo),
end=datetime.datetime(2023, 3, 12, 2, 45, tzinfo=tzinfo),
)
# After DST transition has ended
event3_data = fake_schedule.create_event(
summary="Event 3",
start=datetime.datetime(2023, 3, 12, 3, 30, tzinfo=tzinfo),
end=datetime.datetime(2023, 3, 12, 3, 45, tzinfo=tzinfo),
)
await create_automation(hass, EVENT_START)
assert len(calls()) == 0
await fake_schedule.fire_until(
datetime.datetime.fromisoformat("2023-03-12 05:00:00-08:00"),
)
assert calls() == [
{
"platform": "calendar",
"event": EVENT_START,
"calendar_event": event1_data,
},
{
"platform": "calendar",
"event": EVENT_START,
"calendar_event": event2_data,
},
{
"platform": "calendar",
"event": EVENT_START,
"calendar_event": event3_data,
},
]