From c8256d1d3d26a2d0d5fa318d0599e1d8c8db051f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 7 Aug 2023 11:09:32 -1000 Subject: [PATCH] Optimize august timings to prepare for Yale Doorman support (#97940) --- homeassistant/components/august/__init__.py | 12 +- homeassistant/components/august/activity.py | 151 +++++++++++------- homeassistant/components/august/subscriber.py | 34 ++-- tests/components/august/conftest.py | 13 ++ tests/components/august/mocks.py | 35 ++-- tests/components/august/test_init.py | 17 +- 6 files changed, 159 insertions(+), 103 deletions(-) create mode 100644 tests/components/august/conftest.py diff --git a/homeassistant/components/august/__init__.py b/homeassistant/components/august/__init__.py index 8738b58dab9..408d6e0be7e 100644 --- a/homeassistant/components/august/__init__.py +++ b/homeassistant/components/august/__init__.py @@ -3,8 +3,10 @@ from __future__ import annotations import asyncio from collections.abc import ValuesView +from datetime import datetime from itertools import chain import logging +from typing import Any from aiohttp import ClientError, ClientResponseError from yalexs.const import DEFAULT_BRAND @@ -238,14 +240,18 @@ class AugustData(AugustSubscriberMixin): ) @callback - def async_pubnub_message(self, device_id, date_time, message): + def async_pubnub_message( + self, device_id: str, date_time: datetime, message: dict[str, Any] + ) -> None: """Process a pubnub message.""" device = self.get_device_detail(device_id) activities = activities_from_pubnub_message(device, date_time, message) + activity_stream = self.activity_stream + assert activity_stream is not None if activities: - self.activity_stream.async_process_newer_device_activities(activities) + activity_stream.async_process_newer_device_activities(activities) self.async_signal_device_id_update(device.device_id) - self.activity_stream.async_schedule_house_id_refresh(device.house_id) + activity_stream.async_schedule_house_id_refresh(device.house_id) @callback def async_stop(self): diff --git a/homeassistant/components/august/activity.py b/homeassistant/components/august/activity.py index ad9045a3d0d..3909e36ded8 100644 --- a/homeassistant/components/august/activity.py +++ b/homeassistant/components/august/activity.py @@ -1,16 +1,24 @@ """Consume the august activity stream.""" import asyncio +from datetime import datetime import logging from aiohttp import ClientError +from yalexs.activity import ( + Activity, + ActivityType, +) +from yalexs.api_async import ApiAsync +from yalexs.pubnub_async import AugustPubNub from yalexs.util import get_latest_activity -from homeassistant.core import callback +from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.helpers.debounce import Debouncer from homeassistant.helpers.event import async_call_later from homeassistant.util.dt import utcnow from .const import ACTIVITY_UPDATE_INTERVAL +from .gateway import AugustGateway from .subscriber import AugustSubscriberMixin _LOGGER = logging.getLogger(__name__) @@ -18,29 +26,50 @@ _LOGGER = logging.getLogger(__name__) ACTIVITY_STREAM_FETCH_LIMIT = 10 ACTIVITY_CATCH_UP_FETCH_LIMIT = 2500 +# If there is a storm of activity (ie lock, unlock, door open, door close, etc) +# we want to debounce the updates so we don't hammer the activity api too much. +ACTIVITY_DEBOUNCE_COOLDOWN = 3 + + +@callback +def _async_cancel_future_scheduled_updates(cancels: list[CALLBACK_TYPE]) -> None: + """Cancel future scheduled updates.""" + for cancel in cancels: + cancel() + cancels.clear() + class ActivityStream(AugustSubscriberMixin): """August activity stream handler.""" - def __init__(self, hass, api, august_gateway, house_ids, pubnub): + def __init__( + self, + hass: HomeAssistant, + api: ApiAsync, + august_gateway: AugustGateway, + house_ids: set[str], + pubnub: AugustPubNub, + ) -> None: """Init August activity stream object.""" super().__init__(hass, ACTIVITY_UPDATE_INTERVAL) self._hass = hass - self._schedule_updates = {} + self._schedule_updates: dict[str, list[CALLBACK_TYPE]] = {} self._august_gateway = august_gateway self._api = api self._house_ids = house_ids - self._latest_activities = {} - self._last_update_time = None + self._latest_activities: dict[str, dict[ActivityType, Activity]] = {} + self._did_first_update = False self.pubnub = pubnub - self._update_debounce = {} + self._update_debounce: dict[str, Debouncer] = {} async def async_setup(self): """Token refresh check and catch up the activity stream.""" - for house_id in self._house_ids: - self._update_debounce[house_id] = self._async_create_debouncer(house_id) - + self._update_debounce = { + house_id: self._async_create_debouncer(house_id) + for house_id in self._house_ids + } await self._async_refresh(utcnow()) + self._did_first_update = True @callback def _async_create_debouncer(self, house_id): @@ -52,7 +81,7 @@ class ActivityStream(AugustSubscriberMixin): return Debouncer( self._hass, _LOGGER, - cooldown=ACTIVITY_UPDATE_INTERVAL.total_seconds(), + cooldown=ACTIVITY_DEBOUNCE_COOLDOWN, immediate=True, function=_async_update_house_id, ) @@ -62,73 +91,73 @@ class ActivityStream(AugustSubscriberMixin): """Cleanup any debounces.""" for debouncer in self._update_debounce.values(): debouncer.async_cancel() - for house_id, updater in self._schedule_updates.items(): - if updater is not None: - updater() - self._schedule_updates[house_id] = None + for cancels in self._schedule_updates.values(): + _async_cancel_future_scheduled_updates(cancels) - def get_latest_device_activity(self, device_id, activity_types): + def get_latest_device_activity( + self, device_id: str, activity_types: set[ActivityType] + ) -> Activity | None: """Return latest activity that is one of the activity_types.""" - if device_id not in self._latest_activities: + if not (latest_device_activities := self._latest_activities.get(device_id)): return None - latest_device_activities = self._latest_activities[device_id] - latest_activity = None + latest_activity: Activity | None = None for activity_type in activity_types: - if activity_type in latest_device_activities: + if activity := latest_device_activities.get(activity_type): if ( - latest_activity is not None - and latest_device_activities[activity_type].activity_start_time + latest_activity + and activity.activity_start_time <= latest_activity.activity_start_time ): continue - latest_activity = latest_device_activities[activity_type] + latest_activity = activity return latest_activity - async def _async_refresh(self, time): + async def _async_refresh(self, time: datetime) -> None: """Update the activity stream from August.""" # This is the only place we refresh the api token await self._august_gateway.async_refresh_access_token_if_needed() if self.pubnub.connected: _LOGGER.debug("Skipping update because pubnub is connected") return - await self._async_update_device_activities(time) - - async def _async_update_device_activities(self, time): _LOGGER.debug("Start retrieving device activities") await asyncio.gather( - *( - self._update_debounce[house_id].async_call() - for house_id in self._house_ids - ) + *(debouncer.async_call() for debouncer in self._update_debounce.values()) ) - self._last_update_time = time @callback - def async_schedule_house_id_refresh(self, house_id): + def async_schedule_house_id_refresh(self, house_id: str) -> None: """Update for a house activities now and once in the future.""" - if self._schedule_updates.get(house_id): - self._schedule_updates[house_id]() - self._schedule_updates[house_id] = None + if cancels := self._schedule_updates.get(house_id): + _async_cancel_future_scheduled_updates(cancels) - async def _update_house_activities(_): - await self._update_debounce[house_id].async_call() + debouncer = self._update_debounce[house_id] - self._hass.async_create_task(self._update_debounce[house_id].async_call()) - # Schedule an update past the debounce to ensure - # we catch the case where the lock operator is - # not updated or the lock failed - self._schedule_updates[house_id] = async_call_later( - self._hass, - ACTIVITY_UPDATE_INTERVAL.total_seconds() + 1, - _update_house_activities, - ) + self._hass.async_create_task(debouncer.async_call()) + # Schedule two updates past the debounce time + # to ensure we catch the case where the activity + # api does not update right away and we need to poll + # it again. Sometimes the lock operator or a doorbell + # will not show up in the activity stream right away. + future_updates = self._schedule_updates.setdefault(house_id, []) - async def _async_update_house_id(self, house_id): + async def _update_house_activities(now: datetime) -> None: + await debouncer.async_call() + + for step in (1, 2): + future_updates.append( + async_call_later( + self._hass, + (step * ACTIVITY_DEBOUNCE_COOLDOWN) + 0.1, + _update_house_activities, + ) + ) + + async def _async_update_house_id(self, house_id: str) -> None: """Update device activities for a house.""" - if self._last_update_time: + if self._did_first_update: limit = ACTIVITY_STREAM_FETCH_LIMIT else: limit = ACTIVITY_CATCH_UP_FETCH_LIMIT @@ -150,36 +179,34 @@ class ActivityStream(AugustSubscriberMixin): _LOGGER.debug( "Completed retrieving device activities for house id %s", house_id ) - - updated_device_ids = self.async_process_newer_device_activities(activities) - - if not updated_device_ids: - return - - for device_id in updated_device_ids: + for device_id in self.async_process_newer_device_activities(activities): _LOGGER.debug( "async_signal_device_id_update (from activity stream): %s", device_id, ) self.async_signal_device_id_update(device_id) - def async_process_newer_device_activities(self, activities): + def async_process_newer_device_activities( + self, activities: list[Activity] + ) -> set[str]: """Process activities if they are newer than the last one.""" updated_device_ids = set() + latest_activities = self._latest_activities for activity in activities: device_id = activity.device_id activity_type = activity.activity_type - device_activities = self._latest_activities.setdefault(device_id, {}) + device_activities = latest_activities.setdefault(device_id, {}) # Ignore activities that are older than the latest one unless it is a non # locking or unlocking activity with the exact same start time. - if ( - get_latest_activity(activity, device_activities.get(activity_type)) - != activity - ): + last_activity = device_activities.get(activity_type) + # The activity stream can have duplicate activities. So we need + # to call get_latest_activity to figure out if if the activity + # is actually newer than the last one. + latest_activity = get_latest_activity(activity, last_activity) + if latest_activity != activity: continue device_activities[activity_type] = activity - updated_device_ids.add(device_id) return updated_device_ids diff --git a/homeassistant/components/august/subscriber.py b/homeassistant/components/august/subscriber.py index 62aef44a9ee..138887ed09e 100644 --- a/homeassistant/components/august/subscriber.py +++ b/homeassistant/components/august/subscriber.py @@ -1,25 +1,30 @@ """Base class for August entity.""" +from abc import abstractmethod +from datetime import datetime, timedelta + from homeassistant.const import EVENT_HOMEASSISTANT_STOP -from homeassistant.core import callback +from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.helpers.event import async_track_time_interval class AugustSubscriberMixin: """Base implementation for a subscriber.""" - def __init__(self, hass, update_interval): + def __init__(self, hass: HomeAssistant, update_interval: timedelta) -> None: """Initialize an subscriber.""" super().__init__() self._hass = hass self._update_interval = update_interval - self._subscriptions = {} - self._unsub_interval = None - self._stop_interval = None + self._subscriptions: dict[str, list[CALLBACK_TYPE]] = {} + self._unsub_interval: CALLBACK_TYPE | None = None + self._stop_interval: CALLBACK_TYPE | None = None @callback - def async_subscribe_device_id(self, device_id, update_callback): + def async_subscribe_device_id( + self, device_id: str, update_callback: CALLBACK_TYPE + ) -> CALLBACK_TYPE: """Add an callback subscriber. Returns a callable that can be used to unsubscribe. @@ -34,8 +39,12 @@ class AugustSubscriberMixin: return _unsubscribe + @abstractmethod + async def _async_refresh(self, time: datetime) -> None: + """Refresh data.""" + @callback - def _async_setup_listeners(self): + def _async_setup_listeners(self) -> None: """Create interval and stop listeners.""" self._unsub_interval = async_track_time_interval( self._hass, @@ -54,7 +63,9 @@ class AugustSubscriberMixin: ) @callback - def async_unsubscribe_device_id(self, device_id, update_callback): + def async_unsubscribe_device_id( + self, device_id: str, update_callback: CALLBACK_TYPE + ) -> None: """Remove a callback subscriber.""" self._subscriptions[device_id].remove(update_callback) if not self._subscriptions[device_id]: @@ -63,14 +74,15 @@ class AugustSubscriberMixin: if self._subscriptions: return - self._unsub_interval() - self._unsub_interval = None + if self._unsub_interval: + self._unsub_interval() + self._unsub_interval = None if self._stop_interval: self._stop_interval() self._stop_interval = None @callback - def async_signal_device_id_update(self, device_id): + def async_signal_device_id_update(self, device_id: str) -> None: """Call the callbacks for a device_id.""" if not self._subscriptions.get(device_id): return diff --git a/tests/components/august/conftest.py b/tests/components/august/conftest.py new file mode 100644 index 00000000000..1cb52966fea --- /dev/null +++ b/tests/components/august/conftest.py @@ -0,0 +1,13 @@ +"""August tests conftest.""" +from unittest.mock import patch + +import pytest + + +@pytest.fixture(name="mock_discovery", autouse=True) +def mock_discovery_fixture(): + """Mock discovery to avoid loading the whole bluetooth stack.""" + with patch( + "homeassistant.components.august.discovery_flow.async_create_flow" + ) as mock_discovery: + yield mock_discovery diff --git a/tests/components/august/mocks.py b/tests/components/august/mocks.py index d5517f64249..910c1d29ed6 100644 --- a/tests/components/august/mocks.py +++ b/tests/components/august/mocks.py @@ -162,24 +162,23 @@ async def _create_august_api_with_devices( # noqa: C901 _mock_door_operation_activity(lock, "dooropen", 0), ] - if "get_lock_detail" not in api_call_side_effects: - api_call_side_effects["get_lock_detail"] = get_lock_detail_side_effect - if "get_doorbell_detail" not in api_call_side_effects: - api_call_side_effects["get_doorbell_detail"] = get_doorbell_detail_side_effect - if "get_operable_locks" not in api_call_side_effects: - api_call_side_effects["get_operable_locks"] = get_operable_locks_side_effect - if "get_doorbells" not in api_call_side_effects: - api_call_side_effects["get_doorbells"] = get_doorbells_side_effect - if "get_house_activities" not in api_call_side_effects: - api_call_side_effects["get_house_activities"] = get_house_activities_side_effect - if "lock_return_activities" not in api_call_side_effects: - api_call_side_effects[ - "lock_return_activities" - ] = lock_return_activities_side_effect - if "unlock_return_activities" not in api_call_side_effects: - api_call_side_effects[ - "unlock_return_activities" - ] = unlock_return_activities_side_effect + api_call_side_effects.setdefault("get_lock_detail", get_lock_detail_side_effect) + api_call_side_effects.setdefault( + "get_doorbell_detail", get_doorbell_detail_side_effect + ) + api_call_side_effects.setdefault( + "get_operable_locks", get_operable_locks_side_effect + ) + api_call_side_effects.setdefault("get_doorbells", get_doorbells_side_effect) + api_call_side_effects.setdefault( + "get_house_activities", get_house_activities_side_effect + ) + api_call_side_effects.setdefault( + "lock_return_activities", lock_return_activities_side_effect + ) + api_call_side_effects.setdefault( + "unlock_return_activities", unlock_return_activities_side_effect + ) api_instance, entry = await _mock_setup_august_with_api_side_effects( hass, api_call_side_effects, pubnub diff --git a/tests/components/august/test_init.py b/tests/components/august/test_init.py index 23ea12a9f82..fe297c97a57 100644 --- a/tests/components/august/test_init.py +++ b/tests/components/august/test_init.py @@ -1,6 +1,6 @@ """The tests for the august platform.""" import asyncio -from unittest.mock import patch +from unittest.mock import Mock, patch from aiohttp import ClientResponseError from yalexs.authenticator_common import AuthenticationState @@ -361,19 +361,18 @@ async def test_load_unload(hass: HomeAssistant) -> None: await hass.async_block_till_done() -async def test_load_triggers_ble_discovery(hass: HomeAssistant) -> None: +async def test_load_triggers_ble_discovery( + hass: HomeAssistant, mock_discovery: Mock +) -> None: """Test that loading a lock that supports offline ble operation passes the keys to yalexe_ble.""" august_lock_with_key = await _mock_lock_with_offline_key(hass) august_lock_without_key = await _mock_operative_august_lock_detail(hass) - with patch( - "homeassistant.components.august.discovery_flow.async_create_flow" - ) as mock_discovery: - config_entry = await _create_august_with_devices( - hass, [august_lock_with_key, august_lock_without_key] - ) - await hass.async_block_till_done() + config_entry = await _create_august_with_devices( + hass, [august_lock_with_key, august_lock_without_key] + ) + await hass.async_block_till_done() assert config_entry.state is ConfigEntryState.LOADED assert len(mock_discovery.mock_calls) == 1