Move google coordinator to separate module (#117473)

This commit is contained in:
epenet 2024-05-16 05:40:51 +02:00 committed by GitHub
parent f1e8262db2
commit 465e3d421e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 165 additions and 154 deletions

View File

@ -2,24 +2,15 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime, timedelta from datetime import datetime, timedelta
import itertools
import logging import logging
from typing import Any, cast from typing import Any, cast
from gcal_sync.api import ( from gcal_sync.api import Range, SyncEventsRequest
GoogleCalendarService,
ListEventsRequest,
Range,
SyncEventsRequest,
)
from gcal_sync.exceptions import ApiException from gcal_sync.exceptions import ApiException
from gcal_sync.model import AccessRole, DateOrDatetime, Event from gcal_sync.model import AccessRole, DateOrDatetime, Event
from gcal_sync.store import ScopedCalendarStore from gcal_sync.store import ScopedCalendarStore
from gcal_sync.sync import CalendarEventSyncManager from gcal_sync.sync import CalendarEventSyncManager
from gcal_sync.timeline import Timeline
from ical.iter import SortableItemValue
from homeassistant.components.calendar import ( from homeassistant.components.calendar import (
CREATE_EVENT_SCHEMA, CREATE_EVENT_SCHEMA,
@ -43,11 +34,7 @@ from homeassistant.exceptions import HomeAssistantError, PlatformNotReady
from homeassistant.helpers import entity_platform, entity_registry as er from homeassistant.helpers import entity_platform, entity_registry as er
from homeassistant.helpers.entity import generate_entity_id from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import ( from homeassistant.helpers.update_coordinator import CoordinatorEntity
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from . import ( from . import (
@ -74,14 +61,10 @@ from .const import (
EVENT_START_DATETIME, EVENT_START_DATETIME,
FeatureAccess, FeatureAccess,
) )
from .coordinator import CalendarQueryUpdateCoordinator, CalendarSyncUpdateCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
# Maximum number of upcoming events to consider for state changes between
# coordinator updates.
MAX_UPCOMING_EVENTS = 20
# Avoid syncing super old data on initial syncs. Note that old but active # Avoid syncing super old data on initial syncs. Note that old but active
# recurring events are still included. # recurring events are still included.
SYNC_EVENT_MIN_TIME = timedelta(days=-90) SYNC_EVENT_MIN_TIME = timedelta(days=-90)
@ -249,140 +232,6 @@ async def async_setup_entry(
) )
def _truncate_timeline(timeline: Timeline, max_events: int) -> Timeline:
"""Truncate the timeline to a maximum number of events.
This is used to avoid repeated expansion of recurring events during
state machine updates.
"""
upcoming = timeline.active_after(dt_util.now())
truncated = list(itertools.islice(upcoming, max_events))
return Timeline(
[
SortableItemValue(event.timespan_of(dt_util.DEFAULT_TIME_ZONE), event)
for event in truncated
]
)
class CalendarSyncUpdateCoordinator(DataUpdateCoordinator[Timeline]): # pylint: disable=hass-enforce-coordinator-module
"""Coordinator for calendar RPC calls that use an efficient sync."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
sync: CalendarEventSyncManager,
name: str,
) -> None:
"""Create the CalendarSyncUpdateCoordinator."""
super().__init__(
hass,
_LOGGER,
name=name,
update_interval=MIN_TIME_BETWEEN_UPDATES,
)
self.sync = sync
self._upcoming_timeline: Timeline | None = None
async def _async_update_data(self) -> Timeline:
"""Fetch data from API endpoint."""
try:
await self.sync.run()
except ApiException as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
timeline = await self.sync.store_service.async_get_timeline(
dt_util.DEFAULT_TIME_ZONE
)
self._upcoming_timeline = _truncate_timeline(timeline, MAX_UPCOMING_EVENTS)
return timeline
async def async_get_events(
self, start_date: datetime, end_date: datetime
) -> Iterable[Event]:
"""Get all events in a specific time frame."""
if not self.data:
raise HomeAssistantError(
"Unable to get events: Sync from server has not completed"
)
return self.data.overlapping(
start_date,
end_date,
)
@property
def upcoming(self) -> Iterable[Event] | None:
"""Return upcoming events if any."""
if self._upcoming_timeline:
return self._upcoming_timeline.active_after(dt_util.now())
return None
class CalendarQueryUpdateCoordinator(DataUpdateCoordinator[list[Event]]): # pylint: disable=hass-enforce-coordinator-module
"""Coordinator for calendar RPC calls.
This sends a polling RPC, not using sync, as a workaround
for limitations in the calendar API for supporting search.
"""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
calendar_service: GoogleCalendarService,
name: str,
calendar_id: str,
search: str | None,
) -> None:
"""Create the CalendarQueryUpdateCoordinator."""
super().__init__(
hass,
_LOGGER,
name=name,
update_interval=MIN_TIME_BETWEEN_UPDATES,
)
self.calendar_service = calendar_service
self.calendar_id = calendar_id
self._search = search
async def async_get_events(
self, start_date: datetime, end_date: datetime
) -> Iterable[Event]:
"""Get all events in a specific time frame."""
request = ListEventsRequest(
calendar_id=self.calendar_id,
start_time=start_date,
end_time=end_date,
search=self._search,
)
result_items = []
try:
result = await self.calendar_service.async_list_events(request)
async for result_page in result:
result_items.extend(result_page.items)
except ApiException as err:
self.async_set_update_error(err)
raise HomeAssistantError(str(err)) from err
return result_items
async def _async_update_data(self) -> list[Event]:
"""Fetch data from API endpoint."""
request = ListEventsRequest(calendar_id=self.calendar_id, search=self._search)
try:
result = await self.calendar_service.async_list_events(request)
except ApiException as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
return result.items
@property
def upcoming(self) -> Iterable[Event] | None:
"""Return the next upcoming event if any."""
return self.data
class GoogleCalendarEntity( class GoogleCalendarEntity(
CoordinatorEntity[CalendarSyncUpdateCoordinator | CalendarQueryUpdateCoordinator], CoordinatorEntity[CalendarSyncUpdateCoordinator | CalendarQueryUpdateCoordinator],
CalendarEntity, CalendarEntity,

View File

@ -0,0 +1,162 @@
"""Support for Google Calendar Search binary sensors."""
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime, timedelta
import itertools
import logging
from gcal_sync.api import GoogleCalendarService, ListEventsRequest
from gcal_sync.exceptions import ApiException
from gcal_sync.model import Event
from gcal_sync.sync import CalendarEventSyncManager
from gcal_sync.timeline import Timeline
from ical.iter import SortableItemValue
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
# Maximum number of upcoming events to consider for state changes between
# coordinator updates.
MAX_UPCOMING_EVENTS = 20
def _truncate_timeline(timeline: Timeline, max_events: int) -> Timeline:
"""Truncate the timeline to a maximum number of events.
This is used to avoid repeated expansion of recurring events during
state machine updates.
"""
upcoming = timeline.active_after(dt_util.now())
truncated = list(itertools.islice(upcoming, max_events))
return Timeline(
[
SortableItemValue(event.timespan_of(dt_util.DEFAULT_TIME_ZONE), event)
for event in truncated
]
)
class CalendarSyncUpdateCoordinator(DataUpdateCoordinator[Timeline]):
"""Coordinator for calendar RPC calls that use an efficient sync."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
sync: CalendarEventSyncManager,
name: str,
) -> None:
"""Create the CalendarSyncUpdateCoordinator."""
super().__init__(
hass,
_LOGGER,
name=name,
update_interval=MIN_TIME_BETWEEN_UPDATES,
)
self.sync = sync
self._upcoming_timeline: Timeline | None = None
async def _async_update_data(self) -> Timeline:
"""Fetch data from API endpoint."""
try:
await self.sync.run()
except ApiException as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
timeline = await self.sync.store_service.async_get_timeline(
dt_util.DEFAULT_TIME_ZONE
)
self._upcoming_timeline = _truncate_timeline(timeline, MAX_UPCOMING_EVENTS)
return timeline
async def async_get_events(
self, start_date: datetime, end_date: datetime
) -> Iterable[Event]:
"""Get all events in a specific time frame."""
if not self.data:
raise HomeAssistantError(
"Unable to get events: Sync from server has not completed"
)
return self.data.overlapping(
start_date,
end_date,
)
@property
def upcoming(self) -> Iterable[Event] | None:
"""Return upcoming events if any."""
if self._upcoming_timeline:
return self._upcoming_timeline.active_after(dt_util.now())
return None
class CalendarQueryUpdateCoordinator(DataUpdateCoordinator[list[Event]]):
"""Coordinator for calendar RPC calls.
This sends a polling RPC, not using sync, as a workaround
for limitations in the calendar API for supporting search.
"""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
calendar_service: GoogleCalendarService,
name: str,
calendar_id: str,
search: str | None,
) -> None:
"""Create the CalendarQueryUpdateCoordinator."""
super().__init__(
hass,
_LOGGER,
name=name,
update_interval=MIN_TIME_BETWEEN_UPDATES,
)
self.calendar_service = calendar_service
self.calendar_id = calendar_id
self._search = search
async def async_get_events(
self, start_date: datetime, end_date: datetime
) -> Iterable[Event]:
"""Get all events in a specific time frame."""
request = ListEventsRequest(
calendar_id=self.calendar_id,
start_time=start_date,
end_time=end_date,
search=self._search,
)
result_items = []
try:
result = await self.calendar_service.async_list_events(request)
async for result_page in result:
result_items.extend(result_page.items)
except ApiException as err:
self.async_set_update_error(err)
raise HomeAssistantError(str(err)) from err
return result_items
async def _async_update_data(self) -> list[Event]:
"""Fetch data from API endpoint."""
request = ListEventsRequest(calendar_id=self.calendar_id, search=self._search)
try:
result = await self.calendar_service.async_list_events(request)
except ApiException as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
return result.items
@property
def upcoming(self) -> Iterable[Event] | None:
"""Return the next upcoming event if any."""
return self.data