Reduce overhead to update august activities (#98730)

This commit is contained in:
J. Nick Koston 2023-08-21 04:48:54 -04:00 committed by GitHub
parent a713d7585f
commit 976f6582e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,6 +1,7 @@
"""Consume the august activity stream.""" """Consume the august activity stream."""
import asyncio import asyncio
from datetime import datetime from datetime import datetime
from functools import partial
import logging import logging
from aiohttp import ClientError from aiohttp import ClientError
@ -9,7 +10,7 @@ from yalexs.api_async import ApiAsync
from yalexs.pubnub_async import AugustPubNub from yalexs.pubnub_async import AugustPubNub
from yalexs.util import get_latest_activity from yalexs.util import get_latest_activity
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.event import async_call_later from homeassistant.helpers.event import async_call_later
from homeassistant.util.dt import utcnow from homeassistant.util.dt import utcnow
@ -58,33 +59,38 @@ class ActivityStream(AugustSubscriberMixin):
self._did_first_update = False self._did_first_update = False
self.pubnub = pubnub self.pubnub = pubnub
self._update_debounce: dict[str, Debouncer] = {} self._update_debounce: dict[str, Debouncer] = {}
self._update_debounce_jobs: dict[str, HassJob] = {}
async def async_setup(self): async def _async_update_house_id_later(
self, debouncer: Debouncer, _: datetime
) -> None:
"""Call a debouncer from async_call_later."""
await debouncer.async_call()
async def async_setup(self) -> None:
"""Token refresh check and catch up the activity stream.""" """Token refresh check and catch up the activity stream."""
self._update_debounce = { update_debounce = self._update_debounce
house_id: self._async_create_debouncer(house_id) update_debounce_jobs = self._update_debounce_jobs
for house_id in self._house_ids for house_id in self._house_ids:
} debouncer = Debouncer(
await self._async_refresh(utcnow())
self._did_first_update = True
@callback
def _async_create_debouncer(self, house_id):
"""Create a debouncer for the house id."""
async def _async_update_house_id():
await self._async_update_house_id(house_id)
return Debouncer(
self._hass, self._hass,
_LOGGER, _LOGGER,
cooldown=ACTIVITY_DEBOUNCE_COOLDOWN, cooldown=ACTIVITY_DEBOUNCE_COOLDOWN,
immediate=True, immediate=True,
function=_async_update_house_id, function=partial(self._async_update_house_id, house_id),
)
update_debounce[house_id] = debouncer
update_debounce_jobs[house_id] = HassJob(
partial(self._async_update_house_id_later, debouncer),
f"debounced august activity update for {house_id}",
cancel_on_shutdown=True,
) )
await self._async_refresh(utcnow())
self._did_first_update = True
@callback @callback
def async_stop(self): def async_stop(self) -> None:
"""Cleanup any debounces.""" """Cleanup any debounces."""
for debouncer in self._update_debounce.values(): for debouncer in self._update_debounce.values():
debouncer.async_cancel() debouncer.async_cancel()
@ -127,28 +133,23 @@ class ActivityStream(AugustSubscriberMixin):
@callback @callback
def async_schedule_house_id_refresh(self, house_id: str) -> None: def async_schedule_house_id_refresh(self, house_id: str) -> None:
"""Update for a house activities now and once in the future.""" """Update for a house activities now and once in the future."""
if cancels := self._schedule_updates.get(house_id): if future_updates := self._schedule_updates.setdefault(house_id, []):
_async_cancel_future_scheduled_updates(cancels) _async_cancel_future_scheduled_updates(future_updates)
debouncer = self._update_debounce[house_id] debouncer = self._update_debounce[house_id]
self._hass.async_create_task(debouncer.async_call()) self._hass.async_create_task(debouncer.async_call())
# Schedule two updates past the debounce time # Schedule two updates past the debounce time
# to ensure we catch the case where the activity # to ensure we catch the case where the activity
# api does not update right away and we need to poll # api does not update right away and we need to poll
# it again. Sometimes the lock operator or a doorbell # it again. Sometimes the lock operator or a doorbell
# will not show up in the activity stream right away. # will not show up in the activity stream right away.
future_updates = self._schedule_updates.setdefault(house_id, []) job = self._update_debounce_jobs[house_id]
async def _update_house_activities(now: datetime) -> None:
await debouncer.async_call()
for step in (1, 2): for step in (1, 2):
future_updates.append( future_updates.append(
async_call_later( async_call_later(
self._hass, self._hass,
(step * ACTIVITY_DEBOUNCE_COOLDOWN) + 0.1, (step * ACTIVITY_DEBOUNCE_COOLDOWN) + 0.1,
_update_house_activities, job,
) )
) )