From adc8a13f93920d1f47a22b5dadc144022cb3fffb Mon Sep 17 00:00:00 2001 From: starkillerOG Date: Sat, 15 Apr 2023 03:05:22 +0200 Subject: [PATCH] Reolink prevent ONVIF push being lost due to ConnectionResetError (#91070) * Make "Connection lost" error less likely * Handle connection loss during ONVIF event reading * tweak * fix styling * catch asyncio.CancelledError from request.text() * missing () * re-raise cancelation for proper cleanup * Simplify * Also set webhook_reachable if connection lost * fix styntax * Send HTTP_OK directly after data read done * protect agains garbage collection * Protect shielded task (inner) not shielded future (outer) * fix black * Make sure exceptions are logged * fix spelling * fix black * fix spelling * Simplify using hass.async_create_task * clarify comment * Eleborate comment * Update homeassistant/components/reolink/host.py Co-authored-by: J. Nick Koston * Apply suggestions from bdraco --------- Co-authored-by: J. Nick Koston --- homeassistant/components/reolink/host.py | 66 ++++++++++++++++++------ 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/homeassistant/components/reolink/host.py b/homeassistant/components/reolink/host.py index e6c90343229..214c36230da 100644 --- a/homeassistant/components/reolink/host.py +++ b/homeassistant/components/reolink/host.py @@ -361,34 +361,70 @@ class ReolinkHost: async def handle_webhook( self, hass: HomeAssistant, webhook_id: str, request: Request - ): + ) -> None: """Shield the incoming webhook callback from cancellation.""" - await asyncio.shield(self.handle_webhook_shielded(hass, webhook_id, request)) - - async def handle_webhook_shielded( - self, hass: HomeAssistant, webhook_id: str, request: Request - ): - """Handle incoming webhook from Reolink for inbound messages and calls.""" - + shielded_future = asyncio.shield( + self._handle_webhook(hass, webhook_id, request) + ) _LOGGER.debug("Webhook '%s' called", webhook_id) if not self._webhook_reachable.is_set(): self._webhook_reachable.set() + ir.async_delete_issue(self._hass, DOMAIN, "webhook_url") + await shielded_future - if not request.body_exists: - _LOGGER.debug("Webhook '%s' triggered without payload", webhook_id) + async def _handle_webhook( + self, hass: HomeAssistant, webhook_id: str, request: Request + ) -> None: + """Handle incoming webhook from Reolink for inbound messages and calls.""" + try: + data = await request.text() + except ConnectionResetError: + # We lost the connection before reading the message, fallback to polling + # No need for a background task here as we already know the connection is lost + _LOGGER.debug( + "Webhook '%s' called, but lost connection before reading message, issuing poll", + webhook_id, + ) + if not await self._api.get_motion_state_all_ch(): + _LOGGER.error( + "Could not poll motion state after losing connection during receiving ONVIF event" + ) + return + async_dispatcher_send(hass, f"{webhook_id}_all", {}) return - data = await request.text() if not data: _LOGGER.debug( "Webhook '%s' triggered with unknown payload: %s", webhook_id, data ) return - channels = await self._api.ONVIF_event_callback(data) + # We received the data but we want handle_webhook to return as soon as possible + # so we process the data in the background + hass.async_create_background_task( + self._process_webhook_data(hass, webhook_id, data), + "Process Reolink webhook", + ) + + async def _process_webhook_data( + self, hass: HomeAssistant, webhook_id: str, data: str + ) -> None: + """Process the data from the webhook.""" + # This task is executed in the background so we need to catch exceptions + # and log them + try: + channels = await self._api.ONVIF_event_callback(data) + except Exception as ex: # pylint: disable=broad-except + _LOGGER.exception( + "Error processing ONVIF event for Reolink %s: %s", + self._api.nvr_name, + ex, + ) + return if channels is None: async_dispatcher_send(hass, f"{webhook_id}_all", {}) - else: - for channel in channels: - async_dispatcher_send(hass, f"{webhook_id}_{channel}", {}) + return + + for channel in channels: + async_dispatcher_send(hass, f"{webhook_id}_{channel}", {})