From 8bf3c87336f9d04602c0bff3feb5021eedae2ff0 Mon Sep 17 00:00:00 2001 From: Robert Svensson Date: Tue, 23 Apr 2024 21:45:20 +0200 Subject: [PATCH] Breakout heartbeat monitor and poe command queue in UniFi (#112529) * Split out entity helper functionality to own class * Split out heartbeat to own class * Break out poe command * Make more parts private * Make more things private and simplify naming * Sort initialize * Fix ruff --- .../components/unifi/device_tracker.py | 12 +- .../components/unifi/hub/entity_helper.py | 156 ++++++++++++++++++ homeassistant/components/unifi/hub/hub.py | 110 ++++-------- homeassistant/components/unifi/sensor.py | 4 +- homeassistant/components/unifi/switch.py | 2 +- 5 files changed, 193 insertions(+), 91 deletions(-) create mode 100644 homeassistant/components/unifi/hub/entity_helper.py diff --git a/homeassistant/components/unifi/device_tracker.py b/homeassistant/components/unifi/device_tracker.py index a41d1942536..dc48b9c31fe 100644 --- a/homeassistant/components/unifi/device_tracker.py +++ b/homeassistant/components/unifi/device_tracker.py @@ -240,7 +240,7 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity): self._ignore_events = False self._is_connected = description.is_connected_fn(self.hub, self._obj_id) if self.is_connected: - self.hub.async_heartbeat( + self.hub.update_heartbeat( self.unique_id, dt_util.utcnow() + description.heartbeat_timedelta_fn(self.hub, self._obj_id), @@ -301,12 +301,12 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity): # From unifi.entity.async_signal_reachable_callback # Controller connection state has changed and entity is unavailable # Cancel heartbeat - self.hub.async_heartbeat(self.unique_id) + self.hub.remove_heartbeat(self.unique_id) return if is_connected := description.is_connected_fn(self.hub, self._obj_id): self._is_connected = is_connected - self.hub.async_heartbeat( + self.hub.update_heartbeat( self.unique_id, dt_util.utcnow() + description.heartbeat_timedelta_fn(self.hub, self._obj_id), @@ -319,12 +319,12 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity): return if event.key in self._event_is_on: - self.hub.async_heartbeat(self.unique_id) + self.hub.remove_heartbeat(self.unique_id) self._is_connected = True self.async_write_ha_state() return - self.hub.async_heartbeat( + self.hub.update_heartbeat( self.unique_id, dt_util.utcnow() + self.entity_description.heartbeat_timedelta_fn(self.hub, self._obj_id), @@ -344,7 +344,7 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity): async def async_will_remove_from_hass(self) -> None: """Disconnect object when removed.""" await super().async_will_remove_from_hass() - self.hub.async_heartbeat(self.unique_id) + self.hub.remove_heartbeat(self.unique_id) @property def extra_state_attributes(self) -> Mapping[str, Any] | None: diff --git a/homeassistant/components/unifi/hub/entity_helper.py b/homeassistant/components/unifi/hub/entity_helper.py new file mode 100644 index 00000000000..c4bcf237386 --- /dev/null +++ b/homeassistant/components/unifi/hub/entity_helper.py @@ -0,0 +1,156 @@ +"""UniFi Network entity helper.""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +import aiounifi +from aiounifi.models.device import DeviceSetPoePortModeRequest + +from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback +from homeassistant.helpers.dispatcher import async_dispatcher_send +from homeassistant.helpers.event import async_call_later, async_track_time_interval +import homeassistant.util.dt as dt_util + + +class UnifiEntityHelper: + """UniFi Network integration handling platforms for entity registration.""" + + def __init__(self, hass: HomeAssistant, api: aiounifi.Controller) -> None: + """Initialize the UniFi entity loader.""" + self.hass = hass + self.api = api + + self._device_command = UnifiDeviceCommand(hass, api) + self._heartbeat = UnifiEntityHeartbeat(hass) + + @callback + def reset(self) -> None: + """Cancel timers.""" + self._device_command.reset() + self._heartbeat.reset() + + @callback + def initialize(self) -> None: + """Initialize entity helper.""" + self._heartbeat.initialize() + + @property + def signal_heartbeat(self) -> str: + """Event to signal new heartbeat missed.""" + return self._heartbeat.signal + + @callback + def update_heartbeat(self, unique_id: str, heartbeat_expire_time: datetime) -> None: + """Update device time in heartbeat monitor.""" + self._heartbeat.update(unique_id, heartbeat_expire_time) + + @callback + def remove_heartbeat(self, unique_id: str) -> None: + """Update device time in heartbeat monitor.""" + self._heartbeat.remove(unique_id) + + @callback + def queue_poe_port_command( + self, device_id: str, port_idx: int, poe_mode: str + ) -> None: + """Queue commands to execute them together per device.""" + self._device_command.queue_poe_command(device_id, port_idx, poe_mode) + + +class UnifiEntityHeartbeat: + """UniFi entity heartbeat monitor.""" + + CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1) + + def __init__(self, hass: HomeAssistant) -> None: + """Initialize the heartbeat monitor.""" + self.hass = hass + + self._cancel_heartbeat_check: CALLBACK_TYPE | None = None + self._heartbeat_time: dict[str, datetime] = {} + + @callback + def reset(self) -> None: + """Cancel timers.""" + if self._cancel_heartbeat_check: + self._cancel_heartbeat_check() + self._cancel_heartbeat_check = None + + @callback + def initialize(self) -> None: + """Initialize heartbeat monitor.""" + self._cancel_heartbeat_check = async_track_time_interval( + self.hass, self._check_for_stale, self.CHECK_HEARTBEAT_INTERVAL + ) + + @property + def signal(self) -> str: + """Event to signal new heartbeat missed.""" + return "unifi-heartbeat-missed" + + @callback + def update(self, unique_id: str, heartbeat_expire_time: datetime) -> None: + """Update device time in heartbeat monitor.""" + self._heartbeat_time[unique_id] = heartbeat_expire_time + + @callback + def remove(self, unique_id: str) -> None: + """Remove device from heartbeat monitor.""" + self._heartbeat_time.pop(unique_id, None) + + @callback + def _check_for_stale(self, *_: datetime) -> None: + """Check for any devices scheduled to be marked disconnected.""" + now = dt_util.utcnow() + + unique_ids_to_remove = [] + for unique_id, heartbeat_expire_time in self._heartbeat_time.items(): + if now > heartbeat_expire_time: + async_dispatcher_send(self.hass, f"{self.signal}_{unique_id}") + unique_ids_to_remove.append(unique_id) + + for unique_id in unique_ids_to_remove: + del self._heartbeat_time[unique_id] + + +class UnifiDeviceCommand: + """UniFi Device command helper class.""" + + COMMAND_DELAY = 5 + + def __init__(self, hass: HomeAssistant, api: aiounifi.Controller) -> None: + """Initialize device command helper.""" + self.hass = hass + self.api = api + + self._command_queue: dict[str, dict[int, str]] = {} + self._cancel_command: CALLBACK_TYPE | None = None + + @callback + def reset(self) -> None: + """Cancel timers.""" + if self._cancel_command: + self._cancel_command() + self._cancel_command = None + + @callback + def queue_poe_command(self, device_id: str, port_idx: int, poe_mode: str) -> None: + """Queue commands to execute them together per device.""" + self.reset() + + device_queue = self._command_queue.setdefault(device_id, {}) + device_queue[port_idx] = poe_mode + + async def _command(now: datetime) -> None: + """Execute previously queued commands.""" + queue = self._command_queue.copy() + self._command_queue.clear() + for device_id, device_commands in queue.items(): + device = self.api.devices[device_id] + commands = list(device_commands.items()) + await self.api.request( + DeviceSetPoePortModeRequest.create(device, targets=commands) + ) + + self._cancel_command = async_call_later(self.hass, self.COMMAND_DELAY, _command) diff --git a/homeassistant/components/unifi/hub/hub.py b/homeassistant/components/unifi/hub/hub.py index df91584f267..f8c1f2517a2 100644 --- a/homeassistant/components/unifi/hub/hub.py +++ b/homeassistant/components/unifi/hub/hub.py @@ -2,13 +2,12 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime import aiounifi -from aiounifi.models.device import DeviceSetPoePortModeRequest from homeassistant.config_entries import ConfigEntry -from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback +from homeassistant.core import Event, HomeAssistant, callback from homeassistant.helpers import device_registry as dr from homeassistant.helpers.device_registry import ( DeviceEntry, @@ -16,16 +15,13 @@ from homeassistant.helpers.device_registry import ( DeviceInfo, ) from homeassistant.helpers.dispatcher import async_dispatcher_send -from homeassistant.helpers.event import async_call_later, async_track_time_interval -import homeassistant.util.dt as dt_util from ..const import ATTR_MANUFACTURER, CONF_SITE_ID, DOMAIN as UNIFI_DOMAIN, PLATFORMS from .config import UnifiConfig +from .entity_helper import UnifiEntityHelper from .entity_loader import UnifiEntityLoader from .websocket import UnifiWebsocket -CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1) - class UnifiHub: """Manages a single UniFi Network instance.""" @@ -38,17 +34,12 @@ class UnifiHub: self.api = api self.config = UnifiConfig.from_config_entry(config_entry) self.entity_loader = UnifiEntityLoader(self) + self._entity_helper = UnifiEntityHelper(hass, api) self.websocket = UnifiWebsocket(hass, api, self.signal_reachable) self.site = config_entry.data[CONF_SITE_ID] self.is_admin = False - self._cancel_heartbeat_check: CALLBACK_TYPE | None = None - self._heartbeat_time: dict[str, datetime] = {} - - self.poe_command_queue: dict[str, dict[int, str]] = {} - self._cancel_poe_command: CALLBACK_TYPE | None = None - @callback @staticmethod def get_hub(hass: HomeAssistant, config_entry: ConfigEntry) -> UnifiHub: @@ -61,6 +52,28 @@ class UnifiHub: """Websocket connection state.""" return self.websocket.available + @property + def signal_heartbeat_missed(self) -> str: + """Event to signal new heartbeat missed.""" + return self._entity_helper.signal_heartbeat + + @callback + def update_heartbeat(self, unique_id: str, heartbeat_expire_time: datetime) -> None: + """Update device time in heartbeat monitor.""" + self._entity_helper.update_heartbeat(unique_id, heartbeat_expire_time) + + @callback + def remove_heartbeat(self, unique_id: str) -> None: + """Update device time in heartbeat monitor.""" + self._entity_helper.remove_heartbeat(unique_id) + + @callback + def queue_poe_port_command( + self, device_id: str, port_idx: int, poe_mode: str + ) -> None: + """Queue commands to execute them together per device.""" + self._entity_helper.queue_poe_port_command(device_id, port_idx, poe_mode) + @property def signal_reachable(self) -> str: """Integration specific event to signal a change in connection status.""" @@ -71,77 +84,16 @@ class UnifiHub: """Event specific per UniFi entry to signal new options.""" return f"unifi-options-{self.config.entry.entry_id}" - @property - def signal_heartbeat_missed(self) -> str: - """Event specific per UniFi device tracker to signal new heartbeat missed.""" - return "unifi-heartbeat-missed" - async def initialize(self) -> None: """Set up a UniFi Network instance.""" await self.entity_loader.initialize() + self._entity_helper.initialize() assert self.config.entry.unique_id is not None self.is_admin = self.api.sites[self.config.entry.unique_id].role == "admin" self.config.entry.add_update_listener(self.async_config_entry_updated) - self._cancel_heartbeat_check = async_track_time_interval( - self.hass, self._async_check_for_stale, CHECK_HEARTBEAT_INTERVAL - ) - - @callback - def async_heartbeat( - self, unique_id: str, heartbeat_expire_time: datetime | None = None - ) -> None: - """Signal when a device has fresh home state.""" - if heartbeat_expire_time is not None: - self._heartbeat_time[unique_id] = heartbeat_expire_time - return - - if unique_id in self._heartbeat_time: - del self._heartbeat_time[unique_id] - - @callback - def _async_check_for_stale(self, *_: datetime) -> None: - """Check for any devices scheduled to be marked disconnected.""" - now = dt_util.utcnow() - - unique_ids_to_remove = [] - for unique_id, heartbeat_expire_time in self._heartbeat_time.items(): - if now > heartbeat_expire_time: - async_dispatcher_send( - self.hass, f"{self.signal_heartbeat_missed}_{unique_id}" - ) - unique_ids_to_remove.append(unique_id) - - for unique_id in unique_ids_to_remove: - del self._heartbeat_time[unique_id] - - @callback - def async_queue_poe_port_command( - self, device_id: str, port_idx: int, poe_mode: str - ) -> None: - """Queue commands to execute them together per device.""" - if self._cancel_poe_command: - self._cancel_poe_command() - self._cancel_poe_command = None - - device_queue = self.poe_command_queue.setdefault(device_id, {}) - device_queue[port_idx] = poe_mode - - async def async_execute_command(now: datetime) -> None: - """Execute previously queued commands.""" - queue = self.poe_command_queue.copy() - self.poe_command_queue.clear() - for device_id, device_commands in queue.items(): - device = self.api.devices[device_id] - commands = list(device_commands.items()) - await self.api.request( - DeviceSetPoePortModeRequest.create(device, targets=commands) - ) - - self._cancel_poe_command = async_call_later(self.hass, 5, async_execute_command) - @property def device_info(self) -> DeviceInfo: """UniFi Network device info.""" @@ -205,12 +157,6 @@ class UnifiHub: if not unload_ok: return False - if self._cancel_heartbeat_check: - self._cancel_heartbeat_check() - self._cancel_heartbeat_check = None - - if self._cancel_poe_command: - self._cancel_poe_command() - self._cancel_poe_command = None + self._entity_helper.reset() return True diff --git a/homeassistant/components/unifi/sensor.py b/homeassistant/components/unifi/sensor.py index cec87b36416..17b3cae93fd 100644 --- a/homeassistant/components/unifi/sensor.py +++ b/homeassistant/components/unifi/sensor.py @@ -460,7 +460,7 @@ class UnifiSensorEntity(UnifiEntity[HandlerT, ApiItemT], SensorEntity): if description.is_connected_fn is not None: # Send heartbeat if client is connected if description.is_connected_fn(self.hub, self._obj_id): - self.hub.async_heartbeat( + self.hub.update_heartbeat( self._attr_unique_id, dt_util.utcnow() + self.hub.config.option_detection_time, ) @@ -485,4 +485,4 @@ class UnifiSensorEntity(UnifiEntity[HandlerT, ApiItemT], SensorEntity): if self.entity_description.is_connected_fn is not None: # Remove heartbeat registration - self.hub.async_heartbeat(self._attr_unique_id) + self.hub.remove_heartbeat(self._attr_unique_id) diff --git a/homeassistant/components/unifi/switch.py b/homeassistant/components/unifi/switch.py index 6e073a655a5..45357dd67d2 100644 --- a/homeassistant/components/unifi/switch.py +++ b/homeassistant/components/unifi/switch.py @@ -147,7 +147,7 @@ async def async_poe_port_control_fn(hub: UnifiHub, obj_id: str, target: bool) -> port = hub.api.ports[obj_id] on_state = "auto" if port.raw["poe_caps"] != 8 else "passthrough" state = on_state if target else "off" - hub.async_queue_poe_port_command(mac, int(index), state) + hub.queue_poe_port_command(mac, int(index), state) async def async_port_forward_control_fn(