From 7656ca8313a7b423ad49fdbc677d66e6edf90582 Mon Sep 17 00:00:00 2001 From: Guido Schmitz Date: Thu, 30 Jun 2022 21:12:12 +0200 Subject: [PATCH] Add presence detection to devolo_home_network (#72030) --- .../components/devolo_home_network/const.py | 13 +- .../devolo_home_network/device_tracker.py | 159 ++++++++++++++++++ .../devolo_home_network/__init__.py | 4 +- tests/components/devolo_home_network/const.py | 4 + .../test_device_tracker.py | 107 ++++++++++++ 5 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 homeassistant/components/devolo_home_network/device_tracker.py create mode 100644 tests/components/devolo_home_network/test_device_tracker.py diff --git a/homeassistant/components/devolo_home_network/const.py b/homeassistant/components/devolo_home_network/const.py index 2dfdd3c1d9a..1a49beb5d02 100644 --- a/homeassistant/components/devolo_home_network/const.py +++ b/homeassistant/components/devolo_home_network/const.py @@ -5,8 +5,9 @@ from datetime import timedelta from homeassistant.const import Platform DOMAIN = "devolo_home_network" -PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR] +PLATFORMS = [Platform.BINARY_SENSOR, Platform.DEVICE_TRACKER, Platform.SENSOR] +MAC_ADDRESS = "mac_address" PRODUCT = "product" SERIAL_NUMBER = "serial_number" TITLE = "title" @@ -15,6 +16,16 @@ LONG_UPDATE_INTERVAL = timedelta(minutes=5) SHORT_UPDATE_INTERVAL = timedelta(seconds=15) CONNECTED_PLC_DEVICES = "connected_plc_devices" +CONNECTED_STATIONS = "connected_stations" CONNECTED_TO_ROUTER = "connected_to_router" CONNECTED_WIFI_CLIENTS = "connected_wifi_clients" NEIGHBORING_WIFI_NETWORKS = "neighboring_wifi_networks" + +WIFI_APTYPE = { + "WIFI_VAP_MAIN_AP": "Main", + "WIFI_VAP_GUEST_AP": "Guest", +} +WIFI_BANDS = { + "WIFI_BAND_2G": 2.4, + "WIFI_BAND_5G": 5, +} diff --git a/homeassistant/components/devolo_home_network/device_tracker.py b/homeassistant/components/devolo_home_network/device_tracker.py new file mode 100644 index 00000000000..9dffeef7db9 --- /dev/null +++ b/homeassistant/components/devolo_home_network/device_tracker.py @@ -0,0 +1,159 @@ +"""Platform for device tracker integration.""" +from __future__ import annotations + +from typing import Any + +from devolo_plc_api.device import Device + +from homeassistant.components.device_tracker import ( + DOMAIN as DEVICE_TRACKER_DOMAIN, + SOURCE_TYPE_ROUTER, +) +from homeassistant.components.device_tracker.config_entry import ScannerEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import FREQUENCY_GIGAHERTZ, STATE_UNKNOWN +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers import entity_registry +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import ( + CoordinatorEntity, + DataUpdateCoordinator, +) + +from .const import ( + CONNECTED_STATIONS, + CONNECTED_WIFI_CLIENTS, + DOMAIN, + MAC_ADDRESS, + WIFI_APTYPE, + WIFI_BANDS, +) + + +async def async_setup_entry( + hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: + """Get all devices and sensors and setup them via config entry.""" + device: Device = hass.data[DOMAIN][entry.entry_id]["device"] + coordinators: dict[str, DataUpdateCoordinator] = hass.data[DOMAIN][entry.entry_id][ + "coordinators" + ] + registry = entity_registry.async_get(hass) + tracked = set() + + @callback + def new_device_callback() -> None: + """Add new devices if needed.""" + new_entities = [] + for station in coordinators[CONNECTED_WIFI_CLIENTS].data[CONNECTED_STATIONS]: + if station[MAC_ADDRESS] in tracked: + continue + + new_entities.append( + DevoloScannerEntity( + coordinators[CONNECTED_WIFI_CLIENTS], device, station[MAC_ADDRESS] + ) + ) + tracked.add(station[MAC_ADDRESS]) + if new_entities: + async_add_entities(new_entities) + + @callback + def restore_entities() -> None: + """Restore clients that are not a part of active clients list.""" + missing = [] + for entity in entity_registry.async_entries_for_config_entry( + registry, entry.entry_id + ): + if ( + entity.platform == DOMAIN + and entity.domain == DEVICE_TRACKER_DOMAIN + and ( + mac_address := entity.unique_id.replace( + f"{device.serial_number}_", "" + ) + ) + not in tracked + ): + missing.append( + DevoloScannerEntity( + coordinators[CONNECTED_WIFI_CLIENTS], device, mac_address + ) + ) + tracked.add(mac_address) + + if missing: + async_add_entities(missing) + + if device.device and "wifi1" in device.device.features: + restore_entities() + entry.async_on_unload( + coordinators[CONNECTED_WIFI_CLIENTS].async_add_listener(new_device_callback) + ) + + +class DevoloScannerEntity(CoordinatorEntity, ScannerEntity): + """Representation of a devolo device tracker.""" + + def __init__( + self, coordinator: DataUpdateCoordinator, device: Device, mac: str + ) -> None: + """Initialize entity.""" + super().__init__(coordinator) + self._device = device + self._mac = mac + + @property + def extra_state_attributes(self) -> dict[str, str]: + """Return the attributes.""" + attrs: dict[str, str] = {} + if not self.coordinator.data[CONNECTED_STATIONS]: + return {} + + station: dict[str, Any] = next( + ( + station + for station in self.coordinator.data[CONNECTED_STATIONS] + if station[MAC_ADDRESS] == self.mac_address + ), + {}, + ) + if station: + attrs["wifi"] = WIFI_APTYPE.get(station["vap_type"], STATE_UNKNOWN) + attrs["band"] = ( + f"{WIFI_BANDS.get(station['band'])} {FREQUENCY_GIGAHERTZ}" + if WIFI_BANDS.get(station["band"]) + else STATE_UNKNOWN + ) + return attrs + + @property + def icon(self) -> str: + """Return device icon.""" + if self.is_connected: + return "mdi:lan-connect" + return "mdi:lan-disconnect" + + @property + def is_connected(self) -> bool: + """Return true if the device is connected to the network.""" + return any( + station + for station in self.coordinator.data[CONNECTED_STATIONS] + if station[MAC_ADDRESS] == self.mac_address + ) + + @property + def mac_address(self) -> str: + """Return mac_address.""" + return self._mac + + @property + def source_type(self) -> str: + """Return tracker source type.""" + return SOURCE_TYPE_ROUTER + + @property + def unique_id(self) -> str: + """Return unique ID of the entity.""" + return f"{self._device.serial_number}_{self._mac}" diff --git a/tests/components/devolo_home_network/__init__.py b/tests/components/devolo_home_network/__init__.py index 1c10d7a59ef..bb861081517 100644 --- a/tests/components/devolo_home_network/__init__.py +++ b/tests/components/devolo_home_network/__init__.py @@ -28,6 +28,8 @@ def configure_integration(hass: HomeAssistant) -> MockConfigEntry: async def async_connect(self, session_instance: Any = None): """Give a mocked device the needed properties.""" - self.mac = DISCOVERY_INFO.properties["PlcMacAddress"] self.plcnet = PlcNetApi(IP, None, dataclasses.asdict(DISCOVERY_INFO)) self.device = DeviceApi(IP, None, dataclasses.asdict(DISCOVERY_INFO)) + self.mac = DISCOVERY_INFO.properties["PlcMacAddress"] + self.product = DISCOVERY_INFO.properties["Product"] + self.serial_number = DISCOVERY_INFO.properties["SN"] diff --git a/tests/components/devolo_home_network/const.py b/tests/components/devolo_home_network/const.py index 516a19f3421..aec27ce6a8b 100644 --- a/tests/components/devolo_home_network/const.py +++ b/tests/components/devolo_home_network/const.py @@ -16,6 +16,10 @@ CONNECTED_STATIONS = { ], } +NO_CONNECTED_STATIONS = { + "connected_stations": [], +} + DISCOVERY_INFO = zeroconf.ZeroconfServiceInfo( host=IP, addresses=[IP], diff --git a/tests/components/devolo_home_network/test_device_tracker.py b/tests/components/devolo_home_network/test_device_tracker.py new file mode 100644 index 00000000000..233a480b5e3 --- /dev/null +++ b/tests/components/devolo_home_network/test_device_tracker.py @@ -0,0 +1,107 @@ +"""Tests for the devolo Home Network device tracker.""" +from unittest.mock import AsyncMock, patch + +from devolo_plc_api.exceptions.device import DeviceUnavailable +import pytest + +from homeassistant.components.device_tracker import DOMAIN as PLATFORM +from homeassistant.components.devolo_home_network.const import ( + DOMAIN, + LONG_UPDATE_INTERVAL, + WIFI_APTYPE, + WIFI_BANDS, +) +from homeassistant.const import ( + FREQUENCY_GIGAHERTZ, + STATE_HOME, + STATE_NOT_HOME, + STATE_UNAVAILABLE, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers import entity_registry +from homeassistant.util import dt + +from . import configure_integration +from .const import CONNECTED_STATIONS, DISCOVERY_INFO, NO_CONNECTED_STATIONS + +from tests.common import async_fire_time_changed + +STATION = CONNECTED_STATIONS["connected_stations"][0] +SERIAL = DISCOVERY_INFO.properties["SN"] + + +@pytest.mark.usefixtures("mock_device") +async def test_device_tracker(hass: HomeAssistant): + """Test device tracker states.""" + state_key = f"{PLATFORM}.{DOMAIN}_{SERIAL}_{STATION['mac_address'].lower().replace(':', '_')}" + entry = configure_integration(hass) + er = entity_registry.async_get(hass) + await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt.utcnow() + LONG_UPDATE_INTERVAL) + await hass.async_block_till_done() + + # Enable entity + er.async_update_entity(state_key, disabled_by=None) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt.utcnow() + LONG_UPDATE_INTERVAL) + await hass.async_block_till_done() + + state = hass.states.get(state_key) + assert state is not None + assert state.state == STATE_HOME + assert state.attributes["wifi"] == WIFI_APTYPE[STATION["vap_type"]] + assert ( + state.attributes["band"] + == f"{WIFI_BANDS[STATION['band']]} {FREQUENCY_GIGAHERTZ}" + ) + + # Emulate state change + with patch( + "devolo_plc_api.device_api.deviceapi.DeviceApi.async_get_wifi_connected_station", + new=AsyncMock(return_value=NO_CONNECTED_STATIONS), + ): + async_fire_time_changed(hass, dt.utcnow() + LONG_UPDATE_INTERVAL) + await hass.async_block_till_done() + + state = hass.states.get(state_key) + assert state is not None + assert state.state == STATE_NOT_HOME + + # Emulate device failure + with patch( + "devolo_plc_api.device_api.deviceapi.DeviceApi.async_get_wifi_connected_station", + side_effect=DeviceUnavailable, + ): + async_fire_time_changed(hass, dt.utcnow() + LONG_UPDATE_INTERVAL) + await hass.async_block_till_done() + + state = hass.states.get(state_key) + assert state is not None + assert state.state == STATE_UNAVAILABLE + + await hass.config_entries.async_unload(entry.entry_id) + + +@pytest.mark.usefixtures("mock_device") +async def test_restoring_clients(hass: HomeAssistant): + """Test restoring existing device_tracker entities.""" + state_key = f"{PLATFORM}.{DOMAIN}_{SERIAL}_{STATION['mac_address'].lower().replace(':', '_')}" + entry = configure_integration(hass) + er = entity_registry.async_get(hass) + er.async_get_or_create( + PLATFORM, + DOMAIN, + f"{SERIAL}_{STATION['mac_address']}", + config_entry=entry, + ) + + with patch( + "devolo_plc_api.device_api.deviceapi.DeviceApi.async_get_wifi_connected_station", + new=AsyncMock(return_value=NO_CONNECTED_STATIONS), + ): + await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + state = hass.states.get(state_key) + assert state is not None + assert state.state == STATE_NOT_HOME