From 0bec93fa3755b6d490a044af6ae9c413a6e2c5c2 Mon Sep 17 00:00:00 2001 From: starkillerOG Date: Mon, 26 Jun 2023 21:54:40 +0200 Subject: [PATCH] Reolink ONVIF long polling (#94770) --- homeassistant/components/reolink/host.py | 119 +++++++++++++++++++---- tests/components/reolink/test_init.py | 6 +- 2 files changed, 105 insertions(+), 20 deletions(-) diff --git a/homeassistant/components/reolink/host.py b/homeassistant/components/reolink/host.py index ec4ca304d49..81fbda63fef 100644 --- a/homeassistant/components/reolink/host.py +++ b/homeassistant/components/reolink/host.py @@ -25,9 +25,11 @@ from .const import CONF_PROTOCOL, CONF_USE_HTTPS, DOMAIN from .exceptions import ReolinkSetupException, ReolinkWebhookException, UserNotAdmin DEFAULT_TIMEOUT = 60 -FIRST_ONVIF_TIMEOUT = 15 +FIRST_ONVIF_TIMEOUT = 10 SUBSCRIPTION_RENEW_THRESHOLD = 300 POLL_INTERVAL_NO_PUSH = 5 +LONG_POLL_COOLDOWN = 0.75 +LONG_POLL_ERROR_COOLDOWN = 30 _LOGGER = logging.getLogger(__name__) @@ -60,10 +62,14 @@ class ReolinkHost: self.webhook_id: str | None = None self._base_url: str = "" self._webhook_url: str = "" - self._webhook_reachable: asyncio.Event = asyncio.Event() + self._webhook_reachable: bool = False + self._long_poll_received: bool = False + self._long_poll_error: bool = False self._cancel_poll: CALLBACK_TYPE | None = None self._cancel_onvif_check: CALLBACK_TYPE | None = None + self._cancel_long_poll_check: CALLBACK_TYPE | None = None self._poll_job = HassJob(self._async_poll_all_motion, cancel_on_shutdown=True) + self._long_poll_task: asyncio.Task | None = None self._lost_subscription: bool = False @property @@ -185,15 +191,32 @@ class ReolinkHost: async def _async_check_onvif(self, *_) -> None: """Check the ONVIF subscription.""" - if ( - self._api.supported(None, "initial_ONVIF_state") - and not self._webhook_reachable.is_set() - ): + if self._webhook_reachable: + ir.async_delete_issue(self._hass, DOMAIN, "webhook_url") + self._cancel_onvif_check = None + return + if self._api.supported(None, "initial_ONVIF_state"): _LOGGER.debug( "Did not receive initial ONVIF state on webhook '%s' after %i seconds", self._webhook_url, FIRST_ONVIF_TIMEOUT, ) + + # ONVIF push is not received, start long polling and schedule check + await self._async_start_long_polling() + self._cancel_long_poll_check = async_call_later( + self._hass, FIRST_ONVIF_TIMEOUT, self._async_check_onvif_long_poll + ) + + self._cancel_onvif_check = None + + async def _async_check_onvif_long_poll(self, *_) -> None: + """Check if ONVIF long polling is working.""" + if not self._long_poll_received: + _LOGGER.debug( + "Did not receive state through ONVIF long polling after %i seconds", + FIRST_ONVIF_TIMEOUT, + ) ir.async_create_issue( self._hass, DOMAIN, @@ -210,10 +233,10 @@ class ReolinkHost: else: ir.async_delete_issue(self._hass, DOMAIN, "webhook_url") - # If no ONVIF push is received, start fast polling + # If no ONVIF push or long polling state is received, start fast polling await self._async_poll_all_motion() - self._cancel_onvif_check = None + self._cancel_long_poll_check = None async def update_states(self) -> None: """Call the API of the camera device to update the internal states.""" @@ -241,6 +264,20 @@ class ReolinkHost: str(err), ) + async def _async_start_long_polling(self): + """Start ONVIF long polling task.""" + if self._long_poll_task is None: + await self._api.subscribe(sub_type=SubType.long_poll) + self._long_poll_task = asyncio.create_task(self._async_long_polling()) + + async def _async_stop_long_polling(self): + """Stop ONVIF long polling task.""" + if self._long_poll_task is not None: + self._long_poll_task.cancel() + self._long_poll_task = None + + await self._api.unsubscribe(sub_type=SubType.long_poll) + async def stop(self, event=None): """Disconnect the API.""" if self._cancel_poll is not None: @@ -249,6 +286,10 @@ class ReolinkHost: if self._cancel_onvif_check is not None: self._cancel_onvif_check() self._cancel_onvif_check = None + if self._cancel_long_poll_check is not None: + self._cancel_long_poll_check() + self._cancel_long_poll_check = None + await self._async_stop_long_polling() self.unregister_webhook() await self.disconnect() @@ -277,6 +318,8 @@ class ReolinkHost: """Renew the subscription of motion events (lease time is 15 minutes).""" try: await self._renew(SubType.push) + if self._long_poll_task is not None: + await self._renew(SubType.long_poll) except SubscriptionError as err: if not self._lost_subscription: self._lost_subscription = True @@ -297,7 +340,10 @@ class ReolinkHost: self._api.host, sub_type, ) - await self.subscribe() + if sub_type == SubType.push: + await self.subscribe() + else: + await self._api.subscribe(self._webhook_url, sub_type) return timer = self._api.renewtimer(sub_type) @@ -386,10 +432,44 @@ class ReolinkHost: webhook.async_unregister(self._hass, self.webhook_id) self.webhook_id = None + async def _async_long_polling(self, *_) -> None: + """Use ONVIF long polling to immediately receive events.""" + # This task will be cancelled once _async_stop_long_polling is called + while True: + if self._webhook_reachable: + self._long_poll_task = None + await self._async_stop_long_polling() + return + + try: + channels = await self._api.pull_point_request() + except ReolinkError as ex: + if not self._long_poll_error: + _LOGGER.error("Error while requesting ONVIF pull point: %s", ex) + await self._api.unsubscribe(sub_type=SubType.long_poll) + self._long_poll_error = True + await asyncio.sleep(LONG_POLL_ERROR_COOLDOWN) + continue + except Exception as ex: + _LOGGER.exception("Error while requesting ONVIF pull point: %s", ex) + await self._api.unsubscribe(sub_type=SubType.long_poll) + raise ex + + self._long_poll_error = False + + if not self._long_poll_received and channels != []: + self._long_poll_received = True + ir.async_delete_issue(self._hass, DOMAIN, "webhook_url") + + self._signal_write_ha_state(channels) + + # Cooldown to prevent CPU over usage on camera freezes + await asyncio.sleep(LONG_POLL_COOLDOWN) + async def _async_poll_all_motion(self, *_) -> None: """Poll motion and AI states until the first ONVIF push is received.""" - if self._webhook_reachable.is_set(): - # ONVIF push is working, stop polling + if self._webhook_reachable or self._long_poll_received: + # ONVIF push or long polling is working, stop fast polling self._cancel_poll = None return @@ -409,10 +489,7 @@ class ReolinkHost: self._hass, POLL_INTERVAL_NO_PUSH, self._poll_job ) - # After receiving the new motion states in the upstream lib, - # update the binary sensors with async_write_ha_state - # The same dispatch as for the webhook can be used - async_dispatcher_send(self._hass, f"{self.webhook_id}_all", {}) + self._signal_write_ha_state(None) async def handle_webhook( self, hass: HomeAssistant, webhook_id: str, request: Request @@ -460,8 +537,8 @@ class ReolinkHost: """Process the data from the Reolink webhook.""" # This task is executed in the background so we need to catch exceptions # and log them - if not self._webhook_reachable.is_set(): - self._webhook_reachable.set() + if not self._webhook_reachable: + self._webhook_reachable = True ir.async_delete_issue(self._hass, DOMAIN, "webhook_url") try: @@ -484,9 +561,13 @@ class ReolinkHost: ) return + self._signal_write_ha_state(channels) + + def _signal_write_ha_state(self, channels: list[int] | None) -> None: + """Update the binary sensors with async_write_ha_state.""" if channels is None: - async_dispatcher_send(hass, f"{webhook_id}_all", {}) + async_dispatcher_send(self._hass, f"{self.webhook_id}_all", {}) return for channel in channels: - async_dispatcher_send(hass, f"{webhook_id}_{channel}", {}) + async_dispatcher_send(self._hass, f"{self.webhook_id}_{channel}", {}) diff --git a/tests/components/reolink/test_init.py b/tests/components/reolink/test_init.py index 8dd6db270fb..1e588d5e3a1 100644 --- a/tests/components/reolink/test_init.py +++ b/tests/components/reolink/test_init.py @@ -148,7 +148,11 @@ async def test_webhook_repair_issue( hass: HomeAssistant, config_entry: MockConfigEntry ) -> None: """Test repairs issue is raised when the webhook url is unreachable.""" - with patch("homeassistant.components.reolink.host.FIRST_ONVIF_TIMEOUT", new=0): + with patch( + "homeassistant.components.reolink.host.FIRST_ONVIF_TIMEOUT", new=0 + ), patch( + "homeassistant.components.reolink.host.ReolinkHost._async_long_polling", + ): assert await hass.config_entries.async_setup(config_entry.entry_id) await hass.async_block_till_done()