Unifi websocket manager (#111041)

* Move hub into .hub.hub

* Move websocket to own module

* Minor shuffle
This commit is contained in:
Robert Svensson 2024-02-24 22:20:59 +01:00 committed by GitHub
parent d796085923
commit efc89cd34f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 154 additions and 90 deletions

View File

@ -53,7 +53,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
if len(hass.data[UNIFI_DOMAIN]) == 1: if len(hass.data[UNIFI_DOMAIN]) == 1:
async_setup_services(hass) async_setup_services(hass)
hub.start_websocket() hub.websocket.start()
config_entry.async_on_unload( config_entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, hub.shutdown) hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, hub.shutdown)

View File

@ -25,13 +25,14 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
import homeassistant.helpers.entity_registry as er import homeassistant.helpers.entity_registry as er
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from .const import DOMAIN as UNIFI_DOMAIN
from .entity import ( from .entity import (
HandlerT, HandlerT,
UnifiEntity, UnifiEntity,
UnifiEntityDescription, UnifiEntityDescription,
async_device_available_fn, async_device_available_fn,
) )
from .hub import UNIFI_DOMAIN, UnifiHub from .hub import UnifiHub
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)

View File

@ -0,0 +1,3 @@
"""Internal functionality not part of HA infrastructure."""
from .hub import UnifiHub, get_unifi_api # noqa: F401

View File

@ -9,7 +9,6 @@ import ssl
from types import MappingProxyType from types import MappingProxyType
from typing import Any, Literal from typing import Any, Literal
import aiohttp
from aiohttp import CookieJar from aiohttp import CookieJar
import aiounifi import aiounifi
from aiounifi.interfaces.api_handlers import ItemEvent from aiounifi.interfaces.api_handlers import ItemEvent
@ -45,7 +44,7 @@ from homeassistant.helpers.entity_registry import async_entries_for_config_entry
from homeassistant.helpers.event import async_call_later, async_track_time_interval from homeassistant.helpers.event import async_call_later, async_track_time_interval
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from .const import ( from ..const import (
ATTR_MANUFACTURER, ATTR_MANUFACTURER,
CONF_ALLOW_BANDWIDTH_SENSORS, CONF_ALLOW_BANDWIDTH_SENSORS,
CONF_ALLOW_UPTIME_SENSORS, CONF_ALLOW_UPTIME_SENSORS,
@ -72,12 +71,11 @@ from .const import (
PLATFORMS, PLATFORMS,
UNIFI_WIRELESS_CLIENTS, UNIFI_WIRELESS_CLIENTS,
) )
from .entity import UnifiEntity, UnifiEntityDescription from ..entity import UnifiEntity, UnifiEntityDescription
from .errors import AuthenticationRequired, CannotConnect from ..errors import AuthenticationRequired, CannotConnect
from .websocket import UnifiWebsocket
RETRY_TIMER = 15
CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1) CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1)
CHECK_WEBSOCKET_INTERVAL = timedelta(minutes=1)
class UnifiHub: class UnifiHub:
@ -90,11 +88,8 @@ class UnifiHub:
self.hass = hass self.hass = hass
self.config_entry = config_entry self.config_entry = config_entry
self.api = api self.api = api
self.websocket = UnifiWebsocket(hass, api, self.signal_reachable)
self.ws_task: asyncio.Task | None = None
self._cancel_websocket_check: CALLBACK_TYPE | None = None
self.available = True
self.wireless_clients = hass.data[UNIFI_WIRELESS_CLIENTS] self.wireless_clients = hass.data[UNIFI_WIRELESS_CLIENTS]
self.site = config_entry.data[CONF_SITE_ID] self.site = config_entry.data[CONF_SITE_ID]
@ -169,6 +164,11 @@ class UnifiHub:
host: str = self.config_entry.data[CONF_HOST] host: str = self.config_entry.data[CONF_HOST]
return host return host
@property
def available(self) -> bool:
"""Websocket connection state."""
return self.websocket.available
@callback @callback
@staticmethod @staticmethod
def register_platform( def register_platform(
@ -292,9 +292,6 @@ class UnifiHub:
self._cancel_heartbeat_check = async_track_time_interval( self._cancel_heartbeat_check = async_track_time_interval(
self.hass, self._async_check_for_stale, CHECK_HEARTBEAT_INTERVAL self.hass, self._async_check_for_stale, CHECK_HEARTBEAT_INTERVAL
) )
self._cancel_websocket_check = async_track_time_interval(
self.hass, self._async_watch_websocket, CHECK_WEBSOCKET_INTERVAL
)
@callback @callback
def async_heartbeat( def async_heartbeat(
@ -389,64 +386,13 @@ class UnifiHub:
hub.load_config_entry_options() hub.load_config_entry_options()
async_dispatcher_send(hass, hub.signal_options_update) async_dispatcher_send(hass, hub.signal_options_update)
@callback
def start_websocket(self) -> None:
"""Start up connection to websocket."""
async def _websocket_runner() -> None:
"""Start websocket."""
try:
await self.api.start_websocket()
except (aiohttp.ClientConnectorError, aiounifi.WebsocketError):
LOGGER.error("Websocket disconnected")
self.available = False
async_dispatcher_send(self.hass, self.signal_reachable)
self.hass.loop.call_later(RETRY_TIMER, self.reconnect, True)
self.ws_task = self.hass.loop.create_task(_websocket_runner())
@callback
def reconnect(self, log: bool = False) -> None:
"""Prepare to reconnect UniFi session."""
if log:
LOGGER.info("Will try to reconnect to UniFi Network")
self.hass.loop.create_task(self.async_reconnect())
async def async_reconnect(self) -> None:
"""Try to reconnect UniFi Network session."""
try:
async with asyncio.timeout(5):
await self.api.login()
self.start_websocket()
if not self.available:
self.available = True
async_dispatcher_send(self.hass, self.signal_reachable)
except (
TimeoutError,
aiounifi.BadGateway,
aiounifi.ServiceUnavailable,
aiounifi.AiounifiException,
):
self.hass.loop.call_later(RETRY_TIMER, self.reconnect)
@callback
def _async_watch_websocket(self, now: datetime) -> None:
"""Watch timestamp for last received websocket message."""
LOGGER.debug(
"Last received websocket timestamp: %s",
self.api.connectivity.ws_message_received,
)
@callback @callback
def shutdown(self, event: Event) -> None: def shutdown(self, event: Event) -> None:
"""Wrap the call to unifi.close. """Wrap the call to unifi.close.
Used as an argument to EventBus.async_listen_once. Used as an argument to EventBus.async_listen_once.
""" """
if self.ws_task is not None: self.websocket.stop()
self.ws_task.cancel()
async def async_reset(self) -> bool: async def async_reset(self) -> bool:
"""Reset this hub to default state. """Reset this hub to default state.
@ -454,18 +400,7 @@ class UnifiHub:
Will cancel any scheduled setup retry and will unload Will cancel any scheduled setup retry and will unload
the config entry. the config entry.
""" """
if self.ws_task is not None: await self.websocket.stop_and_wait()
self.ws_task.cancel()
_, pending = await asyncio.wait([self.ws_task], timeout=10)
if pending:
LOGGER.warning(
"Unloading %s (%s) config entry. Task %s did not complete in time",
self.config_entry.title,
self.config_entry.domain,
self.ws_task,
)
unload_ok = await self.hass.config_entries.async_unload_platforms( unload_ok = await self.hass.config_entries.async_unload_platforms(
self.config_entry, PLATFORMS self.config_entry, PLATFORMS
@ -478,10 +413,6 @@ class UnifiHub:
self._cancel_heartbeat_check() self._cancel_heartbeat_check()
self._cancel_heartbeat_check = None self._cancel_heartbeat_check = None
if self._cancel_websocket_check:
self._cancel_websocket_check()
self._cancel_websocket_check = None
if self._cancel_poe_command: if self._cancel_poe_command:
self._cancel_poe_command() self._cancel_poe_command()
self._cancel_poe_command = None self._cancel_poe_command = None

View File

@ -0,0 +1,129 @@
"""Websocket handler for UniFi Network integration."""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
import aiohttp
import aiounifi
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from ..const import LOGGER
RETRY_TIMER = 15
CHECK_WEBSOCKET_INTERVAL = timedelta(minutes=1)
class UnifiWebsocket:
"""Manages a single UniFi Network instance."""
def __init__(
self, hass: HomeAssistant, api: aiounifi.Controller, signal: str
) -> None:
"""Initialize the system."""
self.hass = hass
self.api = api
self.signal = signal
self.ws_task: asyncio.Task | None = None
self._cancel_websocket_check: CALLBACK_TYPE | None = None
self.available = True
@callback
def start(self) -> None:
"""Start websocket handler."""
self._cancel_websocket_check = async_track_time_interval(
self.hass, self._async_watch_websocket, CHECK_WEBSOCKET_INTERVAL
)
self.start_websocket()
@callback
def stop(self) -> None:
"""Stop websocket handler."""
if self._cancel_websocket_check:
self._cancel_websocket_check()
self._cancel_websocket_check = None
if self.ws_task is not None:
self.ws_task.cancel()
async def stop_and_wait(self) -> None:
"""Stop websocket handler and await tasks."""
if self._cancel_websocket_check:
self._cancel_websocket_check()
self._cancel_websocket_check = None
if self.ws_task is not None:
self.stop()
_, pending = await asyncio.wait([self.ws_task], timeout=10)
if pending:
LOGGER.warning(
"Unloading UniFi Network (%s). Task %s did not complete in time",
self.api.connectivity.config.host,
self.ws_task,
)
@callback
def start_websocket(self) -> None:
"""Start up connection to websocket."""
async def _websocket_runner() -> None:
"""Start websocket."""
try:
await self.api.start_websocket()
except (aiohttp.ClientConnectorError, aiohttp.WSServerHandshakeError):
LOGGER.error("Websocket setup failed")
except aiounifi.WebsocketError:
LOGGER.error("Websocket disconnected")
self.available = False
async_dispatcher_send(self.hass, self.signal)
self.hass.loop.call_later(RETRY_TIMER, self.reconnect, True)
if not self.available:
self.available = True
async_dispatcher_send(self.hass, self.signal)
self.ws_task = self.hass.loop.create_task(_websocket_runner())
@callback
def reconnect(self, log: bool = False) -> None:
"""Prepare to reconnect UniFi session."""
async def _reconnect() -> None:
"""Try to reconnect UniFi Network session."""
try:
async with asyncio.timeout(5):
await self.api.login()
except (
TimeoutError,
aiounifi.BadGateway,
aiounifi.ServiceUnavailable,
aiounifi.AiounifiException,
) as exc:
LOGGER.debug("Schedule reconnect to UniFi Network '%s'", exc)
self.hass.loop.call_later(RETRY_TIMER, self.reconnect)
else:
self.start_websocket()
if log:
LOGGER.info("Will try to reconnect to UniFi Network")
self.hass.loop.create_task(_reconnect())
@callback
def _async_watch_websocket(self, now: datetime) -> None:
"""Watch timestamp for last received websocket message."""
LOGGER.debug(
"Last received websocket timestamp: %s",
self.api.connectivity.ws_message_received,
)

View File

@ -44,7 +44,7 @@ from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
import homeassistant.helpers.entity_registry as er import homeassistant.helpers.entity_registry as er
from .const import ATTR_MANUFACTURER from .const import ATTR_MANUFACTURER, DOMAIN as UNIFI_DOMAIN
from .entity import ( from .entity import (
HandlerT, HandlerT,
SubscriptionT, SubscriptionT,
@ -55,7 +55,7 @@ from .entity import (
async_device_device_info_fn, async_device_device_info_fn,
async_wlan_device_info_fn, async_wlan_device_info_fn,
) )
from .hub import UNIFI_DOMAIN, UnifiHub from .hub import UnifiHub
CLIENT_BLOCKED = (EventKey.WIRED_CLIENT_BLOCKED, EventKey.WIRELESS_CLIENT_BLOCKED) CLIENT_BLOCKED = (EventKey.WIRED_CLIENT_BLOCKED, EventKey.WIRELESS_CLIENT_BLOCKED)
CLIENT_UNBLOCKED = (EventKey.WIRED_CLIENT_UNBLOCKED, EventKey.WIRELESS_CLIENT_UNBLOCKED) CLIENT_UNBLOCKED = (EventKey.WIRED_CLIENT_UNBLOCKED, EventKey.WIRELESS_CLIENT_UNBLOCKED)

View File

@ -9,7 +9,7 @@ from aiounifi.models.message import MessageKey
import pytest import pytest
from homeassistant.components.unifi.const import DOMAIN as UNIFI_DOMAIN from homeassistant.components.unifi.const import DOMAIN as UNIFI_DOMAIN
from homeassistant.components.unifi.hub import RETRY_TIMER from homeassistant.components.unifi.hub.websocket import RETRY_TIMER
from homeassistant.const import CONTENT_TYPE_JSON from homeassistant.const import CONTENT_TYPE_JSON
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr from homeassistant.helpers import device_registry as dr

View File

@ -391,7 +391,7 @@ async def test_reauth_flow_update_configuration(
"""Verify reauth flow can update hub configuration.""" """Verify reauth flow can update hub configuration."""
config_entry = await setup_unifi_integration(hass, aioclient_mock) config_entry = await setup_unifi_integration(hass, aioclient_mock)
hub = hass.data[UNIFI_DOMAIN][config_entry.entry_id] hub = hass.data[UNIFI_DOMAIN][config_entry.entry_id]
hub.available = False hub.websocket.available = False
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
UNIFI_DOMAIN, UNIFI_DOMAIN,

View File

@ -434,7 +434,7 @@ async def test_reconnect_mechanism_exceptions(
await setup_unifi_integration(hass, aioclient_mock) await setup_unifi_integration(hass, aioclient_mock)
with patch("aiounifi.Controller.login", side_effect=exception), patch( with patch("aiounifi.Controller.login", side_effect=exception), patch(
"homeassistant.components.unifi.hub.UnifiHub.reconnect" "homeassistant.components.unifi.hub.hub.UnifiWebsocket.reconnect"
) as mock_reconnect: ) as mock_reconnect:
await websocket_mock.disconnect() await websocket_mock.disconnect()

View File

@ -144,7 +144,7 @@ async def test_reconnect_client_hub_unavailable(
hass, aioclient_mock, clients_response=clients hass, aioclient_mock, clients_response=clients
) )
hub = hass.data[UNIFI_DOMAIN][config_entry.entry_id] hub = hass.data[UNIFI_DOMAIN][config_entry.entry_id]
hub.available = False hub.websocket.available = False
aioclient_mock.clear_requests() aioclient_mock.clear_requests()
aioclient_mock.post( aioclient_mock.post(
@ -292,7 +292,7 @@ async def test_remove_clients_hub_unavailable(
hass, aioclient_mock, clients_all_response=clients hass, aioclient_mock, clients_all_response=clients
) )
hub = hass.data[UNIFI_DOMAIN][config_entry.entry_id] hub = hass.data[UNIFI_DOMAIN][config_entry.entry_id]
hub.available = False hub.websocket.available = False
aioclient_mock.clear_requests() aioclient_mock.clear_requests()