Reolink ONVIF long polling (#94770)

This commit is contained in:
starkillerOG 2023-06-26 21:54:40 +02:00 committed by GitHub
parent f9707cc87b
commit 0bec93fa37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 105 additions and 20 deletions

View File

@ -25,9 +25,11 @@ from .const import CONF_PROTOCOL, CONF_USE_HTTPS, DOMAIN
from .exceptions import ReolinkSetupException, ReolinkWebhookException, UserNotAdmin
DEFAULT_TIMEOUT = 60
FIRST_ONVIF_TIMEOUT = 15
FIRST_ONVIF_TIMEOUT = 10
SUBSCRIPTION_RENEW_THRESHOLD = 300
POLL_INTERVAL_NO_PUSH = 5
LONG_POLL_COOLDOWN = 0.75
LONG_POLL_ERROR_COOLDOWN = 30
_LOGGER = logging.getLogger(__name__)
@ -60,10 +62,14 @@ class ReolinkHost:
self.webhook_id: str | None = None
self._base_url: str = ""
self._webhook_url: str = ""
self._webhook_reachable: asyncio.Event = asyncio.Event()
self._webhook_reachable: bool = False
self._long_poll_received: bool = False
self._long_poll_error: bool = False
self._cancel_poll: CALLBACK_TYPE | None = None
self._cancel_onvif_check: CALLBACK_TYPE | None = None
self._cancel_long_poll_check: CALLBACK_TYPE | None = None
self._poll_job = HassJob(self._async_poll_all_motion, cancel_on_shutdown=True)
self._long_poll_task: asyncio.Task | None = None
self._lost_subscription: bool = False
@property
@ -185,15 +191,32 @@ class ReolinkHost:
async def _async_check_onvif(self, *_) -> None:
"""Check the ONVIF subscription."""
if (
self._api.supported(None, "initial_ONVIF_state")
and not self._webhook_reachable.is_set()
):
if self._webhook_reachable:
ir.async_delete_issue(self._hass, DOMAIN, "webhook_url")
self._cancel_onvif_check = None
return
if self._api.supported(None, "initial_ONVIF_state"):
_LOGGER.debug(
"Did not receive initial ONVIF state on webhook '%s' after %i seconds",
self._webhook_url,
FIRST_ONVIF_TIMEOUT,
)
# ONVIF push is not received, start long polling and schedule check
await self._async_start_long_polling()
self._cancel_long_poll_check = async_call_later(
self._hass, FIRST_ONVIF_TIMEOUT, self._async_check_onvif_long_poll
)
self._cancel_onvif_check = None
async def _async_check_onvif_long_poll(self, *_) -> None:
"""Check if ONVIF long polling is working."""
if not self._long_poll_received:
_LOGGER.debug(
"Did not receive state through ONVIF long polling after %i seconds",
FIRST_ONVIF_TIMEOUT,
)
ir.async_create_issue(
self._hass,
DOMAIN,
@ -210,10 +233,10 @@ class ReolinkHost:
else:
ir.async_delete_issue(self._hass, DOMAIN, "webhook_url")
# If no ONVIF push is received, start fast polling
# If no ONVIF push or long polling state is received, start fast polling
await self._async_poll_all_motion()
self._cancel_onvif_check = None
self._cancel_long_poll_check = None
async def update_states(self) -> None:
"""Call the API of the camera device to update the internal states."""
@ -241,6 +264,20 @@ class ReolinkHost:
str(err),
)
async def _async_start_long_polling(self):
"""Start ONVIF long polling task."""
if self._long_poll_task is None:
await self._api.subscribe(sub_type=SubType.long_poll)
self._long_poll_task = asyncio.create_task(self._async_long_polling())
async def _async_stop_long_polling(self):
"""Stop ONVIF long polling task."""
if self._long_poll_task is not None:
self._long_poll_task.cancel()
self._long_poll_task = None
await self._api.unsubscribe(sub_type=SubType.long_poll)
async def stop(self, event=None):
"""Disconnect the API."""
if self._cancel_poll is not None:
@ -249,6 +286,10 @@ class ReolinkHost:
if self._cancel_onvif_check is not None:
self._cancel_onvif_check()
self._cancel_onvif_check = None
if self._cancel_long_poll_check is not None:
self._cancel_long_poll_check()
self._cancel_long_poll_check = None
await self._async_stop_long_polling()
self.unregister_webhook()
await self.disconnect()
@ -277,6 +318,8 @@ class ReolinkHost:
"""Renew the subscription of motion events (lease time is 15 minutes)."""
try:
await self._renew(SubType.push)
if self._long_poll_task is not None:
await self._renew(SubType.long_poll)
except SubscriptionError as err:
if not self._lost_subscription:
self._lost_subscription = True
@ -297,7 +340,10 @@ class ReolinkHost:
self._api.host,
sub_type,
)
await self.subscribe()
if sub_type == SubType.push:
await self.subscribe()
else:
await self._api.subscribe(self._webhook_url, sub_type)
return
timer = self._api.renewtimer(sub_type)
@ -386,10 +432,44 @@ class ReolinkHost:
webhook.async_unregister(self._hass, self.webhook_id)
self.webhook_id = None
async def _async_long_polling(self, *_) -> None:
"""Use ONVIF long polling to immediately receive events."""
# This task will be cancelled once _async_stop_long_polling is called
while True:
if self._webhook_reachable:
self._long_poll_task = None
await self._async_stop_long_polling()
return
try:
channels = await self._api.pull_point_request()
except ReolinkError as ex:
if not self._long_poll_error:
_LOGGER.error("Error while requesting ONVIF pull point: %s", ex)
await self._api.unsubscribe(sub_type=SubType.long_poll)
self._long_poll_error = True
await asyncio.sleep(LONG_POLL_ERROR_COOLDOWN)
continue
except Exception as ex:
_LOGGER.exception("Error while requesting ONVIF pull point: %s", ex)
await self._api.unsubscribe(sub_type=SubType.long_poll)
raise ex
self._long_poll_error = False
if not self._long_poll_received and channels != []:
self._long_poll_received = True
ir.async_delete_issue(self._hass, DOMAIN, "webhook_url")
self._signal_write_ha_state(channels)
# Cooldown to prevent CPU over usage on camera freezes
await asyncio.sleep(LONG_POLL_COOLDOWN)
async def _async_poll_all_motion(self, *_) -> None:
"""Poll motion and AI states until the first ONVIF push is received."""
if self._webhook_reachable.is_set():
# ONVIF push is working, stop polling
if self._webhook_reachable or self._long_poll_received:
# ONVIF push or long polling is working, stop fast polling
self._cancel_poll = None
return
@ -409,10 +489,7 @@ class ReolinkHost:
self._hass, POLL_INTERVAL_NO_PUSH, self._poll_job
)
# After receiving the new motion states in the upstream lib,
# update the binary sensors with async_write_ha_state
# The same dispatch as for the webhook can be used
async_dispatcher_send(self._hass, f"{self.webhook_id}_all", {})
self._signal_write_ha_state(None)
async def handle_webhook(
self, hass: HomeAssistant, webhook_id: str, request: Request
@ -460,8 +537,8 @@ class ReolinkHost:
"""Process the data from the Reolink webhook."""
# This task is executed in the background so we need to catch exceptions
# and log them
if not self._webhook_reachable.is_set():
self._webhook_reachable.set()
if not self._webhook_reachable:
self._webhook_reachable = True
ir.async_delete_issue(self._hass, DOMAIN, "webhook_url")
try:
@ -484,9 +561,13 @@ class ReolinkHost:
)
return
self._signal_write_ha_state(channels)
def _signal_write_ha_state(self, channels: list[int] | None) -> None:
"""Update the binary sensors with async_write_ha_state."""
if channels is None:
async_dispatcher_send(hass, f"{webhook_id}_all", {})
async_dispatcher_send(self._hass, f"{self.webhook_id}_all", {})
return
for channel in channels:
async_dispatcher_send(hass, f"{webhook_id}_{channel}", {})
async_dispatcher_send(self._hass, f"{self.webhook_id}_{channel}", {})

View File

@ -148,7 +148,11 @@ async def test_webhook_repair_issue(
hass: HomeAssistant, config_entry: MockConfigEntry
) -> None:
"""Test repairs issue is raised when the webhook url is unreachable."""
with patch("homeassistant.components.reolink.host.FIRST_ONVIF_TIMEOUT", new=0):
with patch(
"homeassistant.components.reolink.host.FIRST_ONVIF_TIMEOUT", new=0
), patch(
"homeassistant.components.reolink.host.ReolinkHost._async_long_polling",
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()