diff --git a/homeassistant/components/onvif/__init__.py b/homeassistant/components/onvif/__init__.py index a7c23064f64..36b4a28dffd 100644 --- a/homeassistant/components/onvif/__init__.py +++ b/homeassistant/components/onvif/__init__.py @@ -5,6 +5,7 @@ import logging from httpx import RequestError from onvif.exceptions import ONVIFAuthError, ONVIFError, ONVIFTimeoutError +from onvif.util import is_auth_error, stringify_onvif_error from zeep.exceptions import Fault, TransportError from homeassistant.components.ffmpeg import CONF_EXTRA_ARGUMENTS @@ -21,7 +22,6 @@ from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady from .const import CONF_SNAPSHOT_AUTH, DEFAULT_ARGUMENTS, DOMAIN from .device import ONVIFDevice -from .util import is_auth_error, stringify_onvif_error LOGGER = logging.getLogger(__name__) diff --git a/homeassistant/components/onvif/config_flow.py b/homeassistant/components/onvif/config_flow.py index ca447c71b84..da948787e49 100644 --- a/homeassistant/components/onvif/config_flow.py +++ b/homeassistant/components/onvif/config_flow.py @@ -6,6 +6,7 @@ from pprint import pformat from typing import Any from urllib.parse import urlparse +from onvif.util import is_auth_error, stringify_onvif_error import voluptuous as vol from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery from wsdiscovery.scope import Scope @@ -40,7 +41,6 @@ from .const import ( LOGGER, ) from .device import get_device -from .util import is_auth_error, stringify_onvif_error CONF_MANUAL_INPUT = "Manually configure ONVIF device" diff --git a/homeassistant/components/onvif/device.py b/homeassistant/components/onvif/device.py index ea2325f271c..1152503a718 100644 --- a/homeassistant/components/onvif/device.py +++ b/homeassistant/components/onvif/device.py @@ -195,7 +195,9 @@ class ONVIFDevice: await device_mgmt.SetSystemDateAndTime(dt_param) LOGGER.debug("%s: SetSystemDateAndTime: success", self.name) return - except Fault: + # Some cameras don't support setting the timezone and will throw an IndexError + # if we try to set it. If we get an error, try again without the timezone. + except (IndexError, Fault): if idx == timezone_max_idx: raise @@ -280,7 +282,7 @@ class ONVIFDevice: # Set Date and Time ourselves if Date and Time is set manually in the camera. try: await self.async_manually_set_date_and_time() - except (RequestError, TransportError): + except (RequestError, TransportError, IndexError, Fault): LOGGER.warning("%s: Could not sync date/time on this camera", self.name) async def async_get_device_info(self) -> DeviceInfo: diff --git a/homeassistant/components/onvif/event.py b/homeassistant/components/onvif/event.py index 507eda60097..dbff9660b12 100644 --- a/homeassistant/components/onvif/event.py +++ b/homeassistant/components/onvif/event.py @@ -3,32 +3,30 @@ from __future__ import annotations import asyncio from collections.abc import Callable -from contextlib import suppress import datetime as dt from aiohttp.web import Request from httpx import RemoteProtocolError, RequestError, TransportError from onvif import ONVIFCamera, ONVIFService -from onvif.client import NotificationManager, retry_connection_error +from onvif.client import ( + NotificationManager, + PullPointManager as ONVIFPullPointManager, + retry_connection_error, +) from onvif.exceptions import ONVIFError +from onvif.util import stringify_onvif_error from zeep.exceptions import Fault, ValidationError, XMLParseError from homeassistant.components import webhook from homeassistant.config_entries import ConfigEntry -from homeassistant.core import ( - CALLBACK_TYPE, - CoreState, - HassJob, - HomeAssistant, - callback, -) +from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback +from homeassistant.helpers.device_registry import format_mac from homeassistant.helpers.event import async_call_later from homeassistant.helpers.network import NoURLAvailableError, get_url from .const import DOMAIN, LOGGER from .models import Event, PullPointManagerState, WebHookManagerState from .parsers import PARSERS -from .util import stringify_onvif_error # Topics in this list are ignored because we do not want to create # entities for them. @@ -51,11 +49,6 @@ RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS) # SUBSCRIPTION_TIME = dt.timedelta(minutes=10) -# SUBSCRIPTION_RELATIVE_TIME uses a relative time since the time on the camera -# is not reliable. We use 600 seconds (10 minutes) since some cameras cannot -# parse time in the format "PT10M" (10 minutes). -SUBSCRIPTION_RELATIVE_TIME = "PT600S" - # SUBSCRIPTION_RENEW_INTERVAL Must be less than the # overall timeout of 90 * (SUBSCRIPTION_ATTEMPTS) 2 = 180 seconds # @@ -106,18 +99,13 @@ class EventManager: or self.pullpoint_manager.state == PullPointManagerState.STARTED ) - @property - def has_listeners(self) -> bool: - """Return if there are listeners.""" - return bool(self._listeners) - @callback def async_add_listener(self, update_callback: CALLBACK_TYPE) -> Callable[[], None]: """Listen for data updates.""" - # This is the first listener, set up polling. - if not self._listeners: - self.pullpoint_manager.async_schedule_pull_messages() - + # We always have to listen for events or we will never + # know which sensors to create. In practice we always have + # a listener anyways since binary_sensor and sensor will + # create a listener when they are created. self._listeners.append(update_callback) @callback @@ -133,9 +121,6 @@ class EventManager: if update_callback in self._listeners: self._listeners.remove(update_callback) - if not self._listeners: - self.pullpoint_manager.async_cancel_pull_messages() - async def async_start(self, try_pullpoint: bool, try_webhook: bool) -> bool: """Start polling events.""" # Always start pull point first, since it will populate the event list @@ -255,22 +240,15 @@ class PullPointManager: self._hass = event_manager.hass self._name = event_manager.name - self._pullpoint_subscription: ONVIFService = None self._pullpoint_service: ONVIFService = None - self._pull_lock: asyncio.Lock = asyncio.Lock() + self._pullpoint_manager: ONVIFPullPointManager | None = None self._cancel_pull_messages: CALLBACK_TYPE | None = None - self._cancel_pullpoint_renew: CALLBACK_TYPE | None = None - - self._renew_lock: asyncio.Lock = asyncio.Lock() - self._renew_or_restart_job = HassJob( - self._async_renew_or_restart_pullpoint, - f"{self._name}: renew or restart pullpoint", - ) self._pull_messages_job = HassJob( - self._async_background_pull_messages, + self._async_background_pull_messages_or_reschedule, f"{self._name}: pull messages", ) + self._pull_messages_task: asyncio.Task[None] | None = None async def async_start(self) -> bool: """Start pullpoint subscription.""" @@ -282,6 +260,7 @@ class PullPointManager: self.state = PullPointManagerState.FAILED return False self.state = PullPointManagerState.STARTED + self.async_schedule_pull_messages() return True @callback @@ -291,8 +270,9 @@ class PullPointManager: self.state = PullPointManagerState.PAUSED # Cancel the renew job so we don't renew the subscription # and stop pulling messages. - self._async_cancel_pullpoint_renew() self.async_cancel_pull_messages() + if self._pullpoint_manager: + self._pullpoint_manager.pause() # We do not unsubscribe from the pullpoint subscription and instead # let the subscription expire since some cameras will terminate all # subscriptions if we unsubscribe which will break the webhook. @@ -302,17 +282,150 @@ class PullPointManager: """Resume pullpoint subscription.""" LOGGER.debug("%s: Resuming PullPoint manager", self._name) self.state = PullPointManagerState.STARTED - self.async_schedule_pullpoint_renew(0.0) + if self._pullpoint_manager: + self._pullpoint_manager.resume() + self.async_schedule_pull_messages() - @callback - def async_schedule_pullpoint_renew(self, delay: float) -> None: - """Schedule PullPoint subscription renewal.""" - self._async_cancel_pullpoint_renew() - self._cancel_pullpoint_renew = async_call_later( - self._hass, - delay, - self._renew_or_restart_job, + async def async_stop(self) -> None: + """Unsubscribe from PullPoint and cancel callbacks.""" + self.state = PullPointManagerState.STOPPED + await self._async_cancel_and_unsubscribe() + + async def _async_start_pullpoint(self) -> bool: + """Start pullpoint subscription.""" + try: + await self._async_create_pullpoint_subscription() + except CREATE_ERRORS as err: + LOGGER.debug( + "%s: Device does not support PullPoint service or has too many subscriptions: %s", + self._name, + stringify_onvif_error(err), + ) + return False + return True + + async def _async_cancel_and_unsubscribe(self) -> None: + """Cancel and unsubscribe from PullPoint.""" + self.async_cancel_pull_messages() + if self._pull_messages_task: + self._pull_messages_task.cancel() + await self._async_unsubscribe_pullpoint() + + @retry_connection_error(SUBSCRIPTION_ATTEMPTS) + async def _async_create_pullpoint_subscription(self) -> None: + """Create pullpoint subscription.""" + self._pullpoint_manager = await self._device.create_pullpoint_manager( + SUBSCRIPTION_TIME, self._event_manager.async_mark_events_stale ) + self._pullpoint_service = self._pullpoint_manager.get_service() + await self._pullpoint_manager.set_synchronization_point() + + async def _async_unsubscribe_pullpoint(self) -> None: + """Unsubscribe the pullpoint subscription.""" + if not self._pullpoint_manager or self._pullpoint_manager.closed: + return + LOGGER.debug("%s: Unsubscribing from PullPoint", self._name) + try: + await self._pullpoint_manager.shutdown() + except UNSUBSCRIBE_ERRORS as err: + LOGGER.debug( + ( + "%s: Failed to unsubscribe PullPoint subscription;" + " This is normal if the device restarted: %s" + ), + self._name, + stringify_onvif_error(err), + ) + self._pullpoint_manager = None + + async def _async_pull_messages(self) -> None: + """Pull messages from device.""" + if self._pullpoint_manager is None: + return + assert self._pullpoint_service is not None, "PullPoint service does not exist" + LOGGER.debug( + "%s: Pulling PullPoint messages timeout=%s limit=%s", + self._name, + PULLPOINT_POLL_TIME, + PULLPOINT_MESSAGE_LIMIT, + ) + next_pull_delay = None + response = None + try: + if self._hass.is_running: + response = await self._pullpoint_service.PullMessages( + { + "MessageLimit": PULLPOINT_MESSAGE_LIMIT, + "Timeout": PULLPOINT_POLL_TIME, + } + ) + else: + LOGGER.debug( + "%s: PullPoint skipped because Home Assistant is not running yet", + self._name, + ) + except RemoteProtocolError as err: + # Either a shutdown event or the camera closed the connection. Because + # http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server + # to close the connection at any time, we treat this as a normal. Some + # cameras may close the connection if there are no messages to pull. + LOGGER.debug( + "%s: PullPoint subscription encountered a remote protocol error " + "(this is normal for some cameras): %s", + self._name, + stringify_onvif_error(err), + ) + except Fault as err: + # Device may not support subscriptions so log at debug level + # when we get an XMLParseError + LOGGER.debug( + "%s: Failed to fetch PullPoint subscription messages: %s", + self._name, + stringify_onvif_error(err), + ) + # Treat errors as if the camera restarted. Assume that the pullpoint + # subscription is no longer valid. + self._pullpoint_manager.resume() + except (XMLParseError, RequestError, TimeoutError, TransportError) as err: + LOGGER.debug( + "%s: PullPoint subscription encountered an unexpected error and will be retried " + "(this is normal for some cameras): %s", + self._name, + stringify_onvif_error(err), + ) + # Avoid renewing the subscription too often since it causes problems + # for some cameras, mainly the Tapo ones. + next_pull_delay = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR + finally: + self.async_schedule_pull_messages(next_pull_delay) + + if self.state != PullPointManagerState.STARTED: + # If the webhook became started working during the long poll, + # and we got paused, our data is stale and we should not process it. + LOGGER.debug( + "%s: PullPoint state is %s (likely due to working webhook), skipping PullPoint messages", + self._name, + self.state, + ) + return + + if not response: + return + + # Parse response + event_manager = self._event_manager + if (notification_message := response.NotificationMessage) and ( + number_of_events := len(notification_message) + ): + LOGGER.debug( + "%s: continuous PullMessages: %s event(s)", + self._name, + number_of_events, + ) + await event_manager.async_parse_messages(notification_message) + event_manager.async_callback_listeners() + else: + LOGGER.debug("%s: continuous PullMessages: no events", self._name) @callback def async_cancel_pull_messages(self) -> None: @@ -338,269 +451,23 @@ class PullPointManager: self._hass, when, self._pull_messages_job ) - async def async_stop(self) -> None: - """Unsubscribe from PullPoint and cancel callbacks.""" - self.state = PullPointManagerState.STOPPED - await self._async_cancel_and_unsubscribe() - - async def _async_start_pullpoint(self) -> bool: - """Start pullpoint subscription.""" - try: - started = await self._async_create_pullpoint_subscription() - except CREATE_ERRORS as err: - LOGGER.debug( - "%s: Device does not support PullPoint service or has too many subscriptions: %s", - self._name, - stringify_onvif_error(err), - ) - return False - - if started: - self.async_schedule_pullpoint_renew(SUBSCRIPTION_RENEW_INTERVAL) - - return started - - async def _async_cancel_and_unsubscribe(self) -> None: - """Cancel and unsubscribe from PullPoint.""" - self._async_cancel_pullpoint_renew() - self.async_cancel_pull_messages() - await self._async_unsubscribe_pullpoint() - - async def _async_renew_or_restart_pullpoint( - self, now: dt.datetime | None = None + @callback + def _async_background_pull_messages_or_reschedule( + self, _now: dt.datetime | None = None ) -> None: - """Renew or start pullpoint subscription.""" - if self._hass.is_stopping or self.state != PullPointManagerState.STARTED: - return - if self._renew_lock.locked(): - LOGGER.debug("%s: PullPoint renew already in progress", self._name) - # Renew is already running, another one will be - # scheduled when the current one is done if needed. - return - async with self._renew_lock: - next_attempt = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR - try: - if await self._async_renew_pullpoint(): - next_attempt = SUBSCRIPTION_RENEW_INTERVAL - else: - await self._async_restart_pullpoint() - finally: - self.async_schedule_pullpoint_renew(next_attempt) - - @retry_connection_error(SUBSCRIPTION_ATTEMPTS) - async def _async_create_pullpoint_subscription(self) -> bool: - """Create pullpoint subscription.""" - - if not await self._device.create_pullpoint_subscription( - {"InitialTerminationTime": SUBSCRIPTION_RELATIVE_TIME} - ): - LOGGER.debug("%s: Failed to create PullPoint subscription", self._name) - return False - - # Create subscription manager - self._pullpoint_subscription = await self._device.create_subscription_service( - "PullPointSubscription" - ) - - # Create the service that will be used to pull messages from the device. - self._pullpoint_service = await self._device.create_pullpoint_service() - - # Initialize events - with suppress(*SET_SYNCHRONIZATION_POINT_ERRORS): - sync_result = await self._pullpoint_service.SetSynchronizationPoint() - LOGGER.debug("%s: SetSynchronizationPoint: %s", self._name, sync_result) - - # Always schedule an initial pull messages - self.async_schedule_pull_messages(0.0) - - return True - - @callback - def _async_cancel_pullpoint_renew(self) -> None: - """Cancel the pullpoint renew task.""" - if self._cancel_pullpoint_renew: - self._cancel_pullpoint_renew() - self._cancel_pullpoint_renew = None - - async def _async_restart_pullpoint(self) -> bool: - """Restart the subscription assuming the camera rebooted.""" - self.async_cancel_pull_messages() - await self._async_unsubscribe_pullpoint() - restarted = await self._async_start_pullpoint() - if restarted and self._event_manager.has_listeners: - LOGGER.debug("%s: Restarted PullPoint subscription", self._name) - self.async_schedule_pull_messages(0.0) - return restarted - - async def _async_unsubscribe_pullpoint(self) -> None: - """Unsubscribe the pullpoint subscription.""" - if ( - not self._pullpoint_subscription - or self._pullpoint_subscription.transport.client.is_closed - ): - return - LOGGER.debug("%s: Unsubscribing from PullPoint", self._name) - try: - await self._pullpoint_subscription.Unsubscribe() - except UNSUBSCRIBE_ERRORS as err: - LOGGER.debug( - ( - "%s: Failed to unsubscribe PullPoint subscription;" - " This is normal if the device restarted: %s" - ), - self._name, - stringify_onvif_error(err), - ) - self._pullpoint_subscription = None - - @retry_connection_error(SUBSCRIPTION_ATTEMPTS) - async def _async_call_pullpoint_subscription_renew(self) -> None: - """Call PullPoint subscription Renew.""" - await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME) - - async def _async_renew_pullpoint(self) -> bool: - """Renew the PullPoint subscription.""" - if ( - not self._pullpoint_subscription - or self._pullpoint_subscription.transport.client.is_closed - ): - return False - try: - # The first time we renew, we may get a Fault error so we - # suppress it. The subscription will be restarted in - # async_restart later. - await self._async_call_pullpoint_subscription_renew() - LOGGER.debug("%s: Renewed PullPoint subscription", self._name) - return True - except RENEW_ERRORS as err: - self._event_manager.async_mark_events_stale() - LOGGER.debug( - "%s: Failed to renew PullPoint subscription; %s", - self._name, - stringify_onvif_error(err), - ) - return False - - async def _async_pull_messages_with_lock(self) -> bool: - """Pull messages from device while holding the lock. - - This function must not be called directly, it should only - be called from _async_pull_messages. - - Returns True if the subscription is working. - - Returns False if the subscription is not working and should be restarted. - """ - assert self._pull_lock.locked(), "Pull lock must be held" - assert self._pullpoint_service is not None, "PullPoint service does not exist" - event_manager = self._event_manager - LOGGER.debug( - "%s: Pulling PullPoint messages timeout=%s limit=%s", - self._name, - PULLPOINT_POLL_TIME, - PULLPOINT_MESSAGE_LIMIT, - ) - try: - response = await self._pullpoint_service.PullMessages( - { - "MessageLimit": PULLPOINT_MESSAGE_LIMIT, - "Timeout": PULLPOINT_POLL_TIME, - } - ) - except RemoteProtocolError as err: - # Either a shutdown event or the camera closed the connection. Because - # http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server - # to close the connection at any time, we treat this as a normal. Some - # cameras may close the connection if there are no messages to pull. - LOGGER.debug( - "%s: PullPoint subscription encountered a remote protocol error " - "(this is normal for some cameras): %s", - self._name, - stringify_onvif_error(err), - ) - return True - except Fault as err: - # Device may not support subscriptions so log at debug level - # when we get an XMLParseError - LOGGER.debug( - "%s: Failed to fetch PullPoint subscription messages: %s", - self._name, - stringify_onvif_error(err), - ) - # Treat errors as if the camera restarted. Assume that the pullpoint - # subscription is no longer valid. - return False - except (XMLParseError, RequestError, TimeoutError, TransportError) as err: - LOGGER.debug( - "%s: PullPoint subscription encountered an unexpected error and will be retried " - "(this is normal for some cameras): %s", - self._name, - stringify_onvif_error(err), - ) - # Avoid renewing the subscription too often since it causes problems - # for some cameras, mainly the Tapo ones. - return True - - if self.state != PullPointManagerState.STARTED: - # If the webhook became started working during the long poll, - # and we got paused, our data is stale and we should not process it. - LOGGER.debug( - "%s: PullPoint is paused (likely due to working webhook), skipping PullPoint messages", - self._name, - ) - return True - - # Parse response - if (notification_message := response.NotificationMessage) and ( - number_of_events := len(notification_message) - ): - LOGGER.debug( - "%s: continuous PullMessages: %s event(s)", - self._name, - number_of_events, - ) - await event_manager.async_parse_messages(notification_message) - event_manager.async_callback_listeners() - else: - LOGGER.debug("%s: continuous PullMessages: no events", self._name) - - return True - - @callback - def _async_background_pull_messages(self, _now: dt.datetime | None = None) -> None: """Pull messages from device in the background.""" - self._cancel_pull_messages = None - self._hass.async_create_background_task( + if self._pull_messages_task and not self._pull_messages_task.done(): + LOGGER.debug( + "%s: PullPoint message pull is already in process, skipping pull", + self._name, + ) + self.async_schedule_pull_messages() + return + self._pull_messages_task = self._hass.async_create_background_task( self._async_pull_messages(), f"{self._name} background pull messages", ) - async def _async_pull_messages(self) -> None: - """Pull messages from device.""" - event_manager = self._event_manager - - if self._pull_lock.locked(): - # Pull messages if the lock is not already locked - # any pull will do, so we don't need to wait for the lock - LOGGER.debug( - "%s: PullPoint subscription is already locked, skipping pull", - self._name, - ) - return - - async with self._pull_lock: - # Before we pop out of the lock we always need to schedule the next pull - # or call async_schedule_pullpoint_renew if the pull fails so the pull - # loop continues. - try: - if self._hass.state == CoreState.running: - if not await self._async_pull_messages_with_lock(): - self.async_schedule_pullpoint_renew(0.0) - return - finally: - if event_manager.has_listeners: - self.async_schedule_pull_messages() - class WebHookManager: """Manage ONVIF webhook subscriptions. @@ -617,21 +484,21 @@ class WebHookManager: self._event_manager = event_manager self._device = event_manager.device self._hass = event_manager.hass - self._webhook_unique_id = f"{DOMAIN}_{event_manager.config_entry.entry_id}" + config_entry = event_manager.config_entry + + self._old_webhook_unique_id = f"{DOMAIN}_{config_entry.entry_id}" + # Some cameras have a limit on the length of the webhook URL + # so we use a shorter unique ID for the webhook. + unique_id = config_entry.unique_id + assert unique_id is not None + webhook_id = format_mac(unique_id).replace(":", "").lower() + self._webhook_unique_id = f"{DOMAIN}{webhook_id}" self._name = event_manager.name self._webhook_url: str | None = None - self._webhook_subscription: ONVIFService | None = None self._notification_manager: NotificationManager | None = None - self._cancel_webhook_renew: CALLBACK_TYPE | None = None - self._renew_lock = asyncio.Lock() - self._renew_or_restart_job = HassJob( - self._async_renew_or_restart_webhook, - f"{self._name}: renew or restart webhook", - ) - async def async_start(self) -> bool: """Start polling events.""" LOGGER.debug("%s: Starting webhook manager", self._name) @@ -649,20 +516,9 @@ class WebHookManager: async def async_stop(self) -> None: """Unsubscribe from events.""" self.state = WebHookManagerState.STOPPED - self._async_cancel_webhook_renew() await self._async_unsubscribe_webhook() self._async_unregister_webhook() - @callback - def _async_schedule_webhook_renew(self, delay: float) -> None: - """Schedule webhook subscription renewal.""" - self._async_cancel_webhook_renew() - self._cancel_webhook_renew = async_call_later( - self._hass, - delay, - self._renew_or_restart_job, - ) - @retry_connection_error(SUBSCRIPTION_ATTEMPTS) async def _async_create_webhook_subscription(self) -> None: """Create webhook subscription.""" @@ -671,14 +527,12 @@ class WebHookManager: self._name, self._webhook_url, ) - self._notification_manager = self._device.create_notification_manager( - { - "InitialTerminationTime": SUBSCRIPTION_RELATIVE_TIME, - "ConsumerReference": {"Address": self._webhook_url}, - } - ) try: - self._webhook_subscription = await self._notification_manager.setup() + self._notification_manager = await self._device.create_notification_manager( + address=self._webhook_url, + interval=SUBSCRIPTION_TIME, + subscription_lost_callback=self._event_manager.async_mark_events_stale, + ) except ValidationError as err: # This should only happen if there is a problem with the webhook URL # that is causing it to not be well formed. @@ -688,7 +542,7 @@ class WebHookManager: err, ) raise - await self._notification_manager.start() + await self._notification_manager.set_synchronization_point() LOGGER.debug( "%s: Webhook subscription created with URL: %s", self._name, @@ -707,62 +561,8 @@ class WebHookManager: stringify_onvif_error(err), ) return False - - self._async_schedule_webhook_renew(SUBSCRIPTION_RENEW_INTERVAL) return True - async def _async_restart_webhook(self) -> bool: - """Restart the webhook subscription assuming the camera rebooted.""" - await self._async_unsubscribe_webhook() - return await self._async_start_webhook() - - @retry_connection_error(SUBSCRIPTION_ATTEMPTS) - async def _async_call_webhook_subscription_renew(self) -> None: - """Call PullPoint subscription Renew.""" - assert self._webhook_subscription is not None - await self._webhook_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME) - - async def _async_renew_webhook(self) -> bool: - """Renew webhook subscription.""" - if ( - not self._webhook_subscription - or self._webhook_subscription.transport.client.is_closed - ): - return False - try: - await self._async_call_webhook_subscription_renew() - LOGGER.debug("%s: Renewed Webhook subscription", self._name) - return True - except RENEW_ERRORS as err: - self._event_manager.async_mark_events_stale() - LOGGER.debug( - "%s: Failed to renew webhook subscription %s", - self._name, - stringify_onvif_error(err), - ) - return False - - async def _async_renew_or_restart_webhook( - self, now: dt.datetime | None = None - ) -> None: - """Renew or start webhook subscription.""" - if self._hass.is_stopping or self.state != WebHookManagerState.STARTED: - return - if self._renew_lock.locked(): - LOGGER.debug("%s: Webhook renew already in progress", self._name) - # Renew is already running, another one will be - # scheduled when the current one is done if needed. - return - async with self._renew_lock: - next_attempt = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR - try: - if await self._async_renew_webhook(): - next_attempt = SUBSCRIPTION_RENEW_INTERVAL - else: - await self._async_restart_webhook() - finally: - self._async_schedule_webhook_renew(next_attempt) - @callback def _async_register_webhook(self) -> None: """Register the webhook for motion events.""" @@ -791,6 +591,7 @@ class WebHookManager: LOGGER.debug( "%s: Unregistering webhook %s", self._name, self._webhook_unique_id ) + webhook.async_unregister(self._hass, self._old_webhook_unique_id) webhook.async_unregister(self._hass, self._webhook_unique_id) self._webhook_url = None @@ -842,23 +643,13 @@ class WebHookManager: await event_manager.async_parse_messages(result.NotificationMessage) event_manager.async_callback_listeners() - @callback - def _async_cancel_webhook_renew(self) -> None: - """Cancel the webhook renew task.""" - if self._cancel_webhook_renew: - self._cancel_webhook_renew() - self._cancel_webhook_renew = None - async def _async_unsubscribe_webhook(self) -> None: """Unsubscribe from the webhook.""" - if ( - not self._webhook_subscription - or self._webhook_subscription.transport.client.is_closed - ): + if not self._notification_manager or self._notification_manager.closed: return LOGGER.debug("%s: Unsubscribing from webhook", self._name) try: - await self._webhook_subscription.Unsubscribe() + await self._notification_manager.shutdown() except UNSUBSCRIBE_ERRORS as err: LOGGER.debug( ( @@ -868,4 +659,4 @@ class WebHookManager: self._name, stringify_onvif_error(err), ) - self._webhook_subscription = None + self._notification_manager = None diff --git a/homeassistant/components/onvif/manifest.json b/homeassistant/components/onvif/manifest.json index f29fd562104..a749e59be48 100644 --- a/homeassistant/components/onvif/manifest.json +++ b/homeassistant/components/onvif/manifest.json @@ -8,5 +8,5 @@ "documentation": "https://www.home-assistant.io/integrations/onvif", "iot_class": "local_push", "loggers": ["onvif", "wsdiscovery", "zeep"], - "requirements": ["onvif-zeep-async==2.1.1", "WSDiscovery==2.0.0"] + "requirements": ["onvif-zeep-async==3.1.7", "WSDiscovery==2.0.0"] } diff --git a/requirements_all.txt b/requirements_all.txt index 2648aec0b9a..4edc2275fc5 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1264,7 +1264,7 @@ ondilo==0.2.0 onkyo-eiscp==1.2.7 # homeassistant.components.onvif -onvif-zeep-async==2.1.1 +onvif-zeep-async==3.1.7 # homeassistant.components.opengarage open-garage==0.2.0 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 5c30dc4da57..f7bfe75aaaf 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -954,7 +954,7 @@ omnilogic==0.4.5 ondilo==0.2.0 # homeassistant.components.onvif -onvif-zeep-async==2.1.1 +onvif-zeep-async==3.1.7 # homeassistant.components.opengarage open-garage==0.2.0