Optimize august timings to prepare for Yale Doorman support (#97940)

This commit is contained in:
J. Nick Koston 2023-08-07 11:09:32 -10:00 committed by GitHub
parent 7080e0c280
commit c8256d1d3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 159 additions and 103 deletions

View File

@ -3,8 +3,10 @@ from __future__ import annotations
import asyncio import asyncio
from collections.abc import ValuesView from collections.abc import ValuesView
from datetime import datetime
from itertools import chain from itertools import chain
import logging import logging
from typing import Any
from aiohttp import ClientError, ClientResponseError from aiohttp import ClientError, ClientResponseError
from yalexs.const import DEFAULT_BRAND from yalexs.const import DEFAULT_BRAND
@ -238,14 +240,18 @@ class AugustData(AugustSubscriberMixin):
) )
@callback @callback
def async_pubnub_message(self, device_id, date_time, message): def async_pubnub_message(
self, device_id: str, date_time: datetime, message: dict[str, Any]
) -> None:
"""Process a pubnub message.""" """Process a pubnub message."""
device = self.get_device_detail(device_id) device = self.get_device_detail(device_id)
activities = activities_from_pubnub_message(device, date_time, message) activities = activities_from_pubnub_message(device, date_time, message)
activity_stream = self.activity_stream
assert activity_stream is not None
if activities: if activities:
self.activity_stream.async_process_newer_device_activities(activities) activity_stream.async_process_newer_device_activities(activities)
self.async_signal_device_id_update(device.device_id) self.async_signal_device_id_update(device.device_id)
self.activity_stream.async_schedule_house_id_refresh(device.house_id) activity_stream.async_schedule_house_id_refresh(device.house_id)
@callback @callback
def async_stop(self): def async_stop(self):

View File

@ -1,16 +1,24 @@
"""Consume the august activity stream.""" """Consume the august activity stream."""
import asyncio import asyncio
from datetime import datetime
import logging import logging
from aiohttp import ClientError from aiohttp import ClientError
from yalexs.activity import (
Activity,
ActivityType,
)
from yalexs.api_async import ApiAsync
from yalexs.pubnub_async import AugustPubNub
from yalexs.util import get_latest_activity from yalexs.util import get_latest_activity
from homeassistant.core import callback from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.event import async_call_later from homeassistant.helpers.event import async_call_later
from homeassistant.util.dt import utcnow from homeassistant.util.dt import utcnow
from .const import ACTIVITY_UPDATE_INTERVAL from .const import ACTIVITY_UPDATE_INTERVAL
from .gateway import AugustGateway
from .subscriber import AugustSubscriberMixin from .subscriber import AugustSubscriberMixin
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -18,29 +26,50 @@ _LOGGER = logging.getLogger(__name__)
ACTIVITY_STREAM_FETCH_LIMIT = 10 ACTIVITY_STREAM_FETCH_LIMIT = 10
ACTIVITY_CATCH_UP_FETCH_LIMIT = 2500 ACTIVITY_CATCH_UP_FETCH_LIMIT = 2500
# If there is a storm of activity (ie lock, unlock, door open, door close, etc)
# we want to debounce the updates so we don't hammer the activity api too much.
ACTIVITY_DEBOUNCE_COOLDOWN = 3
@callback
def _async_cancel_future_scheduled_updates(cancels: list[CALLBACK_TYPE]) -> None:
"""Cancel future scheduled updates."""
for cancel in cancels:
cancel()
cancels.clear()
class ActivityStream(AugustSubscriberMixin): class ActivityStream(AugustSubscriberMixin):
"""August activity stream handler.""" """August activity stream handler."""
def __init__(self, hass, api, august_gateway, house_ids, pubnub): def __init__(
self,
hass: HomeAssistant,
api: ApiAsync,
august_gateway: AugustGateway,
house_ids: set[str],
pubnub: AugustPubNub,
) -> None:
"""Init August activity stream object.""" """Init August activity stream object."""
super().__init__(hass, ACTIVITY_UPDATE_INTERVAL) super().__init__(hass, ACTIVITY_UPDATE_INTERVAL)
self._hass = hass self._hass = hass
self._schedule_updates = {} self._schedule_updates: dict[str, list[CALLBACK_TYPE]] = {}
self._august_gateway = august_gateway self._august_gateway = august_gateway
self._api = api self._api = api
self._house_ids = house_ids self._house_ids = house_ids
self._latest_activities = {} self._latest_activities: dict[str, dict[ActivityType, Activity]] = {}
self._last_update_time = None self._did_first_update = False
self.pubnub = pubnub self.pubnub = pubnub
self._update_debounce = {} self._update_debounce: dict[str, Debouncer] = {}
async def async_setup(self): async def async_setup(self):
"""Token refresh check and catch up the activity stream.""" """Token refresh check and catch up the activity stream."""
for house_id in self._house_ids: self._update_debounce = {
self._update_debounce[house_id] = self._async_create_debouncer(house_id) house_id: self._async_create_debouncer(house_id)
for house_id in self._house_ids
}
await self._async_refresh(utcnow()) await self._async_refresh(utcnow())
self._did_first_update = True
@callback @callback
def _async_create_debouncer(self, house_id): def _async_create_debouncer(self, house_id):
@ -52,7 +81,7 @@ class ActivityStream(AugustSubscriberMixin):
return Debouncer( return Debouncer(
self._hass, self._hass,
_LOGGER, _LOGGER,
cooldown=ACTIVITY_UPDATE_INTERVAL.total_seconds(), cooldown=ACTIVITY_DEBOUNCE_COOLDOWN,
immediate=True, immediate=True,
function=_async_update_house_id, function=_async_update_house_id,
) )
@ -62,73 +91,73 @@ class ActivityStream(AugustSubscriberMixin):
"""Cleanup any debounces.""" """Cleanup any debounces."""
for debouncer in self._update_debounce.values(): for debouncer in self._update_debounce.values():
debouncer.async_cancel() debouncer.async_cancel()
for house_id, updater in self._schedule_updates.items(): for cancels in self._schedule_updates.values():
if updater is not None: _async_cancel_future_scheduled_updates(cancels)
updater()
self._schedule_updates[house_id] = None
def get_latest_device_activity(self, device_id, activity_types): def get_latest_device_activity(
self, device_id: str, activity_types: set[ActivityType]
) -> Activity | None:
"""Return latest activity that is one of the activity_types.""" """Return latest activity that is one of the activity_types."""
if device_id not in self._latest_activities: if not (latest_device_activities := self._latest_activities.get(device_id)):
return None return None
latest_device_activities = self._latest_activities[device_id] latest_activity: Activity | None = None
latest_activity = None
for activity_type in activity_types: for activity_type in activity_types:
if activity_type in latest_device_activities: if activity := latest_device_activities.get(activity_type):
if ( if (
latest_activity is not None latest_activity
and latest_device_activities[activity_type].activity_start_time and activity.activity_start_time
<= latest_activity.activity_start_time <= latest_activity.activity_start_time
): ):
continue continue
latest_activity = latest_device_activities[activity_type] latest_activity = activity
return latest_activity return latest_activity
async def _async_refresh(self, time): async def _async_refresh(self, time: datetime) -> None:
"""Update the activity stream from August.""" """Update the activity stream from August."""
# This is the only place we refresh the api token # This is the only place we refresh the api token
await self._august_gateway.async_refresh_access_token_if_needed() await self._august_gateway.async_refresh_access_token_if_needed()
if self.pubnub.connected: if self.pubnub.connected:
_LOGGER.debug("Skipping update because pubnub is connected") _LOGGER.debug("Skipping update because pubnub is connected")
return return
await self._async_update_device_activities(time)
async def _async_update_device_activities(self, time):
_LOGGER.debug("Start retrieving device activities") _LOGGER.debug("Start retrieving device activities")
await asyncio.gather( await asyncio.gather(
*( *(debouncer.async_call() for debouncer in self._update_debounce.values())
self._update_debounce[house_id].async_call()
for house_id in self._house_ids
)
) )
self._last_update_time = time
@callback @callback
def async_schedule_house_id_refresh(self, house_id): def async_schedule_house_id_refresh(self, house_id: str) -> None:
"""Update for a house activities now and once in the future.""" """Update for a house activities now and once in the future."""
if self._schedule_updates.get(house_id): if cancels := self._schedule_updates.get(house_id):
self._schedule_updates[house_id]() _async_cancel_future_scheduled_updates(cancels)
self._schedule_updates[house_id] = None
async def _update_house_activities(_): debouncer = self._update_debounce[house_id]
await self._update_debounce[house_id].async_call()
self._hass.async_create_task(self._update_debounce[house_id].async_call()) self._hass.async_create_task(debouncer.async_call())
# Schedule an update past the debounce to ensure # Schedule two updates past the debounce time
# we catch the case where the lock operator is # to ensure we catch the case where the activity
# not updated or the lock failed # api does not update right away and we need to poll
self._schedule_updates[house_id] = async_call_later( # it again. Sometimes the lock operator or a doorbell
self._hass, # will not show up in the activity stream right away.
ACTIVITY_UPDATE_INTERVAL.total_seconds() + 1, future_updates = self._schedule_updates.setdefault(house_id, [])
_update_house_activities,
)
async def _async_update_house_id(self, house_id): async def _update_house_activities(now: datetime) -> None:
await debouncer.async_call()
for step in (1, 2):
future_updates.append(
async_call_later(
self._hass,
(step * ACTIVITY_DEBOUNCE_COOLDOWN) + 0.1,
_update_house_activities,
)
)
async def _async_update_house_id(self, house_id: str) -> None:
"""Update device activities for a house.""" """Update device activities for a house."""
if self._last_update_time: if self._did_first_update:
limit = ACTIVITY_STREAM_FETCH_LIMIT limit = ACTIVITY_STREAM_FETCH_LIMIT
else: else:
limit = ACTIVITY_CATCH_UP_FETCH_LIMIT limit = ACTIVITY_CATCH_UP_FETCH_LIMIT
@ -150,36 +179,34 @@ class ActivityStream(AugustSubscriberMixin):
_LOGGER.debug( _LOGGER.debug(
"Completed retrieving device activities for house id %s", house_id "Completed retrieving device activities for house id %s", house_id
) )
for device_id in self.async_process_newer_device_activities(activities):
updated_device_ids = self.async_process_newer_device_activities(activities)
if not updated_device_ids:
return
for device_id in updated_device_ids:
_LOGGER.debug( _LOGGER.debug(
"async_signal_device_id_update (from activity stream): %s", "async_signal_device_id_update (from activity stream): %s",
device_id, device_id,
) )
self.async_signal_device_id_update(device_id) self.async_signal_device_id_update(device_id)
def async_process_newer_device_activities(self, activities): def async_process_newer_device_activities(
self, activities: list[Activity]
) -> set[str]:
"""Process activities if they are newer than the last one.""" """Process activities if they are newer than the last one."""
updated_device_ids = set() updated_device_ids = set()
latest_activities = self._latest_activities
for activity in activities: for activity in activities:
device_id = activity.device_id device_id = activity.device_id
activity_type = activity.activity_type activity_type = activity.activity_type
device_activities = self._latest_activities.setdefault(device_id, {}) device_activities = latest_activities.setdefault(device_id, {})
# Ignore activities that are older than the latest one unless it is a non # Ignore activities that are older than the latest one unless it is a non
# locking or unlocking activity with the exact same start time. # locking or unlocking activity with the exact same start time.
if ( last_activity = device_activities.get(activity_type)
get_latest_activity(activity, device_activities.get(activity_type)) # The activity stream can have duplicate activities. So we need
!= activity # to call get_latest_activity to figure out if if the activity
): # is actually newer than the last one.
latest_activity = get_latest_activity(activity, last_activity)
if latest_activity != activity:
continue continue
device_activities[activity_type] = activity device_activities[activity_type] = activity
updated_device_ids.add(device_id) updated_device_ids.add(device_id)
return updated_device_ids return updated_device_ids

View File

@ -1,25 +1,30 @@
"""Base class for August entity.""" """Base class for August entity."""
from abc import abstractmethod
from datetime import datetime, timedelta
from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import callback from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.event import async_track_time_interval
class AugustSubscriberMixin: class AugustSubscriberMixin:
"""Base implementation for a subscriber.""" """Base implementation for a subscriber."""
def __init__(self, hass, update_interval): def __init__(self, hass: HomeAssistant, update_interval: timedelta) -> None:
"""Initialize an subscriber.""" """Initialize an subscriber."""
super().__init__() super().__init__()
self._hass = hass self._hass = hass
self._update_interval = update_interval self._update_interval = update_interval
self._subscriptions = {} self._subscriptions: dict[str, list[CALLBACK_TYPE]] = {}
self._unsub_interval = None self._unsub_interval: CALLBACK_TYPE | None = None
self._stop_interval = None self._stop_interval: CALLBACK_TYPE | None = None
@callback @callback
def async_subscribe_device_id(self, device_id, update_callback): def async_subscribe_device_id(
self, device_id: str, update_callback: CALLBACK_TYPE
) -> CALLBACK_TYPE:
"""Add an callback subscriber. """Add an callback subscriber.
Returns a callable that can be used to unsubscribe. Returns a callable that can be used to unsubscribe.
@ -34,8 +39,12 @@ class AugustSubscriberMixin:
return _unsubscribe return _unsubscribe
@abstractmethod
async def _async_refresh(self, time: datetime) -> None:
"""Refresh data."""
@callback @callback
def _async_setup_listeners(self): def _async_setup_listeners(self) -> None:
"""Create interval and stop listeners.""" """Create interval and stop listeners."""
self._unsub_interval = async_track_time_interval( self._unsub_interval = async_track_time_interval(
self._hass, self._hass,
@ -54,7 +63,9 @@ class AugustSubscriberMixin:
) )
@callback @callback
def async_unsubscribe_device_id(self, device_id, update_callback): def async_unsubscribe_device_id(
self, device_id: str, update_callback: CALLBACK_TYPE
) -> None:
"""Remove a callback subscriber.""" """Remove a callback subscriber."""
self._subscriptions[device_id].remove(update_callback) self._subscriptions[device_id].remove(update_callback)
if not self._subscriptions[device_id]: if not self._subscriptions[device_id]:
@ -63,14 +74,15 @@ class AugustSubscriberMixin:
if self._subscriptions: if self._subscriptions:
return return
self._unsub_interval() if self._unsub_interval:
self._unsub_interval = None self._unsub_interval()
self._unsub_interval = None
if self._stop_interval: if self._stop_interval:
self._stop_interval() self._stop_interval()
self._stop_interval = None self._stop_interval = None
@callback @callback
def async_signal_device_id_update(self, device_id): def async_signal_device_id_update(self, device_id: str) -> None:
"""Call the callbacks for a device_id.""" """Call the callbacks for a device_id."""
if not self._subscriptions.get(device_id): if not self._subscriptions.get(device_id):
return return

View File

@ -0,0 +1,13 @@
"""August tests conftest."""
from unittest.mock import patch
import pytest
@pytest.fixture(name="mock_discovery", autouse=True)
def mock_discovery_fixture():
"""Mock discovery to avoid loading the whole bluetooth stack."""
with patch(
"homeassistant.components.august.discovery_flow.async_create_flow"
) as mock_discovery:
yield mock_discovery

View File

@ -162,24 +162,23 @@ async def _create_august_api_with_devices( # noqa: C901
_mock_door_operation_activity(lock, "dooropen", 0), _mock_door_operation_activity(lock, "dooropen", 0),
] ]
if "get_lock_detail" not in api_call_side_effects: api_call_side_effects.setdefault("get_lock_detail", get_lock_detail_side_effect)
api_call_side_effects["get_lock_detail"] = get_lock_detail_side_effect api_call_side_effects.setdefault(
if "get_doorbell_detail" not in api_call_side_effects: "get_doorbell_detail", get_doorbell_detail_side_effect
api_call_side_effects["get_doorbell_detail"] = get_doorbell_detail_side_effect )
if "get_operable_locks" not in api_call_side_effects: api_call_side_effects.setdefault(
api_call_side_effects["get_operable_locks"] = get_operable_locks_side_effect "get_operable_locks", get_operable_locks_side_effect
if "get_doorbells" not in api_call_side_effects: )
api_call_side_effects["get_doorbells"] = get_doorbells_side_effect api_call_side_effects.setdefault("get_doorbells", get_doorbells_side_effect)
if "get_house_activities" not in api_call_side_effects: api_call_side_effects.setdefault(
api_call_side_effects["get_house_activities"] = get_house_activities_side_effect "get_house_activities", get_house_activities_side_effect
if "lock_return_activities" not in api_call_side_effects: )
api_call_side_effects[ api_call_side_effects.setdefault(
"lock_return_activities" "lock_return_activities", lock_return_activities_side_effect
] = lock_return_activities_side_effect )
if "unlock_return_activities" not in api_call_side_effects: api_call_side_effects.setdefault(
api_call_side_effects[ "unlock_return_activities", unlock_return_activities_side_effect
"unlock_return_activities" )
] = unlock_return_activities_side_effect
api_instance, entry = await _mock_setup_august_with_api_side_effects( api_instance, entry = await _mock_setup_august_with_api_side_effects(
hass, api_call_side_effects, pubnub hass, api_call_side_effects, pubnub

View File

@ -1,6 +1,6 @@
"""The tests for the august platform.""" """The tests for the august platform."""
import asyncio import asyncio
from unittest.mock import patch from unittest.mock import Mock, patch
from aiohttp import ClientResponseError from aiohttp import ClientResponseError
from yalexs.authenticator_common import AuthenticationState from yalexs.authenticator_common import AuthenticationState
@ -361,19 +361,18 @@ async def test_load_unload(hass: HomeAssistant) -> None:
await hass.async_block_till_done() await hass.async_block_till_done()
async def test_load_triggers_ble_discovery(hass: HomeAssistant) -> None: async def test_load_triggers_ble_discovery(
hass: HomeAssistant, mock_discovery: Mock
) -> None:
"""Test that loading a lock that supports offline ble operation passes the keys to yalexe_ble.""" """Test that loading a lock that supports offline ble operation passes the keys to yalexe_ble."""
august_lock_with_key = await _mock_lock_with_offline_key(hass) august_lock_with_key = await _mock_lock_with_offline_key(hass)
august_lock_without_key = await _mock_operative_august_lock_detail(hass) august_lock_without_key = await _mock_operative_august_lock_detail(hass)
with patch( config_entry = await _create_august_with_devices(
"homeassistant.components.august.discovery_flow.async_create_flow" hass, [august_lock_with_key, august_lock_without_key]
) as mock_discovery: )
config_entry = await _create_august_with_devices( await hass.async_block_till_done()
hass, [august_lock_with_key, august_lock_without_key]
)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.LOADED assert config_entry.state is ConfigEntryState.LOADED
assert len(mock_discovery.mock_calls) == 1 assert len(mock_discovery.mock_calls) == 1