Move upnp derived sensors to library, be more robust about failing getting some data (#79955)

This commit is contained in:
Steven Looman 2022-10-26 21:34:44 +02:00 committed by GitHub
parent 00f72f8b2a
commit d50795af2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 274 additions and 344 deletions

View File

@ -1,28 +1,17 @@
"""Open ports in your router for Home Assistant and provide statistics.""" """UPnP/IGD integration."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
from typing import Any
from async_upnp_client.exceptions import UpnpCommunicationError, UpnpConnectionError from async_upnp_client.exceptions import UpnpConnectionError
from homeassistant.components import ssdp from homeassistant.components import ssdp
from homeassistant.components.binary_sensor import BinarySensorEntityDescription
from homeassistant.components.sensor import SensorEntityDescription
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform from homeassistant.const import Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr from homeassistant.helpers import config_validation, device_registry
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import ( from .const import (
CONFIG_ENTRY_HOST, CONFIG_ENTRY_HOST,
@ -36,14 +25,15 @@ from .const import (
IDENTIFIER_SERIAL_NUMBER, IDENTIFIER_SERIAL_NUMBER,
LOGGER, LOGGER,
) )
from .device import Device, async_create_device from .coordinator import UpnpDataUpdateCoordinator
from .device import async_create_device
NOTIFICATION_ID = "upnp_notification" NOTIFICATION_ID = "upnp_notification"
NOTIFICATION_TITLE = "UPnP/IGD Setup" NOTIFICATION_TITLE = "UPnP/IGD Setup"
PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR] PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR]
CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False) CONFIG_SCHEMA = config_validation.removed(DOMAIN, raise_if_present=False)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
@ -126,12 +116,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
if device.serial_number: if device.serial_number:
identifiers.add((IDENTIFIER_SERIAL_NUMBER, device.serial_number)) identifiers.add((IDENTIFIER_SERIAL_NUMBER, device.serial_number))
connections = {(dr.CONNECTION_UPNP, device.udn)} connections = {(device_registry.CONNECTION_UPNP, device.udn)}
if device_mac_address: if device_mac_address:
connections.add((dr.CONNECTION_NETWORK_MAC, device_mac_address)) connections.add((device_registry.CONNECTION_NETWORK_MAC, device_mac_address))
device_registry = dr.async_get(hass) dev_registry = device_registry.async_get(hass)
device_entry = device_registry.async_get_device( device_entry = dev_registry.async_get_device(
identifiers=identifiers, connections=connections identifiers=identifiers, connections=connections
) )
if device_entry: if device_entry:
@ -142,7 +132,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
) )
if not device_entry: if not device_entry:
# No device found, create new device entry. # No device found, create new device entry.
device_entry = device_registry.async_get_or_create( device_entry = dev_registry.async_get_or_create(
config_entry_id=entry.entry_id, config_entry_id=entry.entry_id,
connections=connections, connections=connections,
identifiers=identifiers, identifiers=identifiers,
@ -155,7 +145,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
) )
else: else:
# Update identifier. # Update identifier.
device_entry = device_registry.async_update_device( device_entry = dev_registry.async_update_device(
device_entry.id, device_entry.id,
new_identifiers=identifiers, new_identifiers=identifiers,
) )
@ -191,96 +181,3 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
del hass.data[DOMAIN][entry.entry_id] del hass.data[DOMAIN][entry.entry_id]
return unload_ok return unload_ok
@dataclass
class UpnpBinarySensorEntityDescription(BinarySensorEntityDescription):
"""A class that describes UPnP entities."""
format: str = "s"
unique_id: str | None = None
@dataclass
class UpnpSensorEntityDescription(SensorEntityDescription):
"""A class that describes a sensor UPnP entities."""
format: str = "s"
unique_id: str | None = None
class UpnpDataUpdateCoordinator(DataUpdateCoordinator):
"""Define an object to update data from UPNP device."""
def __init__(
self,
hass: HomeAssistant,
device: Device,
device_entry: dr.DeviceEntry,
update_interval: timedelta,
) -> None:
"""Initialize."""
self.device = device
self.device_entry = device_entry
super().__init__(
hass,
LOGGER,
name=device.name,
update_interval=update_interval,
)
async def _async_update_data(self) -> Mapping[str, Any]:
"""Update data."""
try:
update_values = await asyncio.gather(
self.device.async_get_traffic_data(),
self.device.async_get_status(),
)
except UpnpCommunicationError as exception:
LOGGER.debug(
"Caught exception when updating device: %s, exception: %s",
self.device,
exception,
)
raise UpdateFailed(
f"Unable to communicate with IGD at: {self.device.device_url}"
) from exception
return {
**update_values[0],
**update_values[1],
}
class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]):
"""Base class for UPnP/IGD entities."""
entity_description: UpnpSensorEntityDescription | UpnpBinarySensorEntityDescription
def __init__(
self,
coordinator: UpnpDataUpdateCoordinator,
entity_description: UpnpSensorEntityDescription
| UpnpBinarySensorEntityDescription,
) -> None:
"""Initialize the base entities."""
super().__init__(coordinator)
self._device = coordinator.device
self.entity_description = entity_description
self._attr_name = f"{coordinator.device.name} {entity_description.name}"
self._attr_unique_id = f"{coordinator.device.original_udn}_{entity_description.unique_id or entity_description.key}"
self._attr_device_info = DeviceInfo(
connections=coordinator.device_entry.connections,
name=coordinator.device_entry.name,
manufacturer=coordinator.device_entry.manufacturer,
model=coordinator.device_entry.model,
configuration_url=coordinator.device_entry.configuration_url,
)
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and (
self.coordinator.data.get(self.entity_description.key) is not None
)

View File

@ -1,19 +1,31 @@
"""Support for UPnP/IGD Binary Sensors.""" """Support for UPnP/IGD Binary Sensors."""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
from homeassistant.components.binary_sensor import ( from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass, BinarySensorDeviceClass,
BinarySensorEntity, BinarySensorEntity,
BinarySensorEntityDescription,
) )
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import UpnpBinarySensorEntityDescription, UpnpDataUpdateCoordinator, UpnpEntity from . import UpnpDataUpdateCoordinator
from .const import DOMAIN, LOGGER, WAN_STATUS from .const import DOMAIN, LOGGER, WAN_STATUS
from .entity import UpnpEntity, UpnpEntityDescription
BINARYSENSOR_ENTITY_DESCRIPTIONS: tuple[UpnpBinarySensorEntityDescription, ...] = (
@dataclass
class UpnpBinarySensorEntityDescription(
UpnpEntityDescription, BinarySensorEntityDescription
):
"""A class that describes binary sensor UPnP entities."""
SENSOR_DESCRIPTIONS: tuple[UpnpBinarySensorEntityDescription, ...] = (
UpnpBinarySensorEntityDescription( UpnpBinarySensorEntityDescription(
key=WAN_STATUS, key=WAN_STATUS,
name="wan status", name="wan status",
@ -29,14 +41,14 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback,
) -> None: ) -> None:
"""Set up the UPnP/IGD sensors.""" """Set up the UPnP/IGD sensors."""
coordinator = hass.data[DOMAIN][config_entry.entry_id] coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
entities = [ entities = [
UpnpStatusBinarySensor( UpnpStatusBinarySensor(
coordinator=coordinator, coordinator=coordinator,
entity_description=entity_description, entity_description=entity_description,
) )
for entity_description in BINARYSENSOR_ENTITY_DESCRIPTIONS for entity_description in SENSOR_DESCRIPTIONS
if coordinator.data.get(entity_description.key) is not None if coordinator.data.get(entity_description.key) is not None
] ]
LOGGER.debug("Adding binary_sensor entities: %s", entities) LOGGER.debug("Adding binary_sensor entities: %s", entities)

View File

@ -78,7 +78,6 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# Paths: # Paths:
# - ssdp(discovery_info) --> ssdp_confirm(None) --> ssdp_confirm({}) --> create_entry() # - ssdp(discovery_info) --> ssdp_confirm(None) --> ssdp_confirm({}) --> create_entry()
# - user(None): scan --> user({...}) --> create_entry() # - user(None): scan --> user({...}) --> create_entry()
# - import(None) --> create_entry()
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize the UPnP/IGD config flow.""" """Initialize the UPnP/IGD config flow."""

View File

@ -11,13 +11,16 @@ BYTES_RECEIVED = "bytes_received"
BYTES_SENT = "bytes_sent" BYTES_SENT = "bytes_sent"
PACKETS_RECEIVED = "packets_received" PACKETS_RECEIVED = "packets_received"
PACKETS_SENT = "packets_sent" PACKETS_SENT = "packets_sent"
KIBIBYTES_PER_SEC_RECEIVED = "kibibytes_per_sec_received"
KIBIBYTES_PER_SEC_SENT = "kibibytes_per_sec_sent"
PACKETS_PER_SEC_RECEIVED = "packets_per_sec_received"
PACKETS_PER_SEC_SENT = "packets_per_sec_sent"
TIMESTAMP = "timestamp" TIMESTAMP = "timestamp"
DATA_PACKETS = "packets" DATA_PACKETS = "packets"
DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}" DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}"
WAN_STATUS = "wan_status" WAN_STATUS = "wan_status"
ROUTER_IP = "ip" ROUTER_IP = "ip"
ROUTER_UPTIME = "uptime" ROUTER_UPTIME = "uptime"
KIBIBYTE = 1024
CONFIG_ENTRY_ST = "st" CONFIG_ENTRY_ST = "st"
CONFIG_ENTRY_UDN = "udn" CONFIG_ENTRY_UDN = "udn"
CONFIG_ENTRY_ORIGINAL_UDN = "original_udn" CONFIG_ENTRY_ORIGINAL_UDN = "original_udn"

View File

@ -0,0 +1,50 @@
"""UPnP/IGD coordinator."""
from collections.abc import Mapping
from datetime import timedelta
from typing import Any
from async_upnp_client.exceptions import UpnpCommunicationError
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER
from .device import Device
class UpnpDataUpdateCoordinator(DataUpdateCoordinator):
"""Define an object to update data from UPNP device."""
def __init__(
self,
hass: HomeAssistant,
device: Device,
device_entry: DeviceEntry,
update_interval: timedelta,
) -> None:
"""Initialize."""
self.device = device
self.device_entry = device_entry
super().__init__(
hass,
LOGGER,
name=device.name,
update_interval=update_interval,
)
async def _async_update_data(self) -> Mapping[str, Any]:
"""Update data."""
try:
return await self.device.async_get_data()
except UpnpCommunicationError as exception:
LOGGER.debug(
"Caught exception when updating device: %s, exception: %s",
self.device,
exception,
)
raise UpdateFailed(
f"Unable to communicate with IGD at: {self.device.device_url}"
) from exception

View File

@ -1,7 +1,6 @@
"""Home Assistant representation of an UPnP/IGD.""" """Home Assistant representation of an UPnP/IGD."""
from __future__ import annotations from __future__ import annotations
import asyncio
from collections.abc import Mapping from collections.abc import Mapping
from functools import partial from functools import partial
from ipaddress import ip_address from ipaddress import ip_address
@ -10,19 +9,21 @@ from urllib.parse import urlparse
from async_upnp_client.aiohttp import AiohttpSessionRequester from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.client_factory import UpnpFactory from async_upnp_client.client_factory import UpnpFactory
from async_upnp_client.exceptions import UpnpError from async_upnp_client.profiles.igd import IgdDevice
from async_upnp_client.profiles.igd import IgdDevice, StatusInfo
from getmac import get_mac_address from getmac import get_mac_address
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util.dt import utcnow
from .const import ( from .const import (
BYTES_RECEIVED, BYTES_RECEIVED,
BYTES_SENT, BYTES_SENT,
KIBIBYTES_PER_SEC_RECEIVED,
KIBIBYTES_PER_SEC_SENT,
LOGGER as _LOGGER, LOGGER as _LOGGER,
PACKETS_PER_SEC_RECEIVED,
PACKETS_PER_SEC_SENT,
PACKETS_RECEIVED, PACKETS_RECEIVED,
PACKETS_SENT, PACKETS_SENT,
ROUTER_IP, ROUTER_IP,
@ -51,7 +52,7 @@ async def async_create_device(hass: HomeAssistant, ssdp_location: str) -> Device
session = async_get_clientsession(hass, verify_ssl=False) session = async_get_clientsession(hass, verify_ssl=False)
requester = AiohttpSessionRequester(session, with_sleep=True, timeout=20) requester = AiohttpSessionRequester(session, with_sleep=True, timeout=20)
factory = UpnpFactory(requester, disable_state_variable_validation=True) factory = UpnpFactory(requester, non_strict=True)
upnp_device = await factory.async_create_device(ssdp_location) upnp_device = await factory.async_create_device(ssdp_location)
# Create profile wrapper. # Create profile wrapper.
@ -134,69 +135,35 @@ class Device:
"""Get string representation.""" """Get string representation."""
return f"IGD Device: {self.name}/{self.udn}::{self.device_type}" return f"IGD Device: {self.name}/{self.udn}::{self.device_type}"
async def async_get_traffic_data(self) -> Mapping[str, Any]: async def async_get_data(self) -> Mapping[str, Any]:
""" """Get all data from device."""
Get all traffic data in one go. _LOGGER.debug("Getting data for device: %s", self)
igd_state = await self._igd_device.async_get_traffic_and_status_data()
status_info = igd_state.status_info
if status_info is not None and not isinstance(status_info, Exception):
wan_status = status_info.connection_status
router_uptime = status_info.uptime
else:
wan_status = None
router_uptime = None
Traffic data consists of: def get_value(value: Any) -> Any:
- total bytes sent if value is None or isinstance(value, Exception):
- total bytes received return None
- total packets sent
- total packats received
Data is timestamped. return value
"""
_LOGGER.debug("Getting traffic statistics from device: %s", self)
values = await asyncio.gather(
self._igd_device.async_get_total_bytes_received(),
self._igd_device.async_get_total_bytes_sent(),
self._igd_device.async_get_total_packets_received(),
self._igd_device.async_get_total_packets_sent(),
)
return { return {
TIMESTAMP: utcnow(), TIMESTAMP: igd_state.timestamp,
BYTES_RECEIVED: values[0], BYTES_RECEIVED: get_value(igd_state.bytes_received),
BYTES_SENT: values[1], BYTES_SENT: get_value(igd_state.bytes_sent),
PACKETS_RECEIVED: values[2], PACKETS_RECEIVED: get_value(igd_state.packets_received),
PACKETS_SENT: values[3], PACKETS_SENT: get_value(igd_state.packets_sent),
} WAN_STATUS: wan_status,
ROUTER_UPTIME: router_uptime,
async def async_get_status(self) -> Mapping[str, Any]: ROUTER_IP: get_value(igd_state.external_ip_address),
"""Get connection status, uptime, and external IP.""" KIBIBYTES_PER_SEC_RECEIVED: igd_state.kibibytes_per_sec_received,
_LOGGER.debug("Getting status for device: %s", self) KIBIBYTES_PER_SEC_SENT: igd_state.kibibytes_per_sec_sent,
PACKETS_PER_SEC_RECEIVED: igd_state.packets_per_sec_received,
values = await asyncio.gather( PACKETS_PER_SEC_SENT: igd_state.packets_per_sec_sent,
self._igd_device.async_get_status_info(),
self._igd_device.async_get_external_ip_address(),
return_exceptions=True,
)
status_info: StatusInfo | None = None
router_ip: str | None = None
for idx, value in enumerate(values):
if isinstance(value, UpnpError):
# Not all routers support some of these items although based
# on defined standard they should.
_LOGGER.debug(
"Exception occurred while trying to get status %s for device %s: %s",
"status" if idx == 1 else "external IP address",
self,
str(value),
)
continue
if isinstance(value, Exception):
raise value
if isinstance(value, StatusInfo):
status_info = value
elif isinstance(value, str):
router_ip = value
return {
WAN_STATUS: status_info[0] if status_info is not None else None,
ROUTER_UPTIME: status_info[2] if status_info is not None else None,
ROUTER_IP: router_ip,
} }

View File

@ -0,0 +1,54 @@
"""Entity for UPnP/IGD."""
from __future__ import annotations
from dataclasses import dataclass
from homeassistant.helpers.entity import DeviceInfo, EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .coordinator import UpnpDataUpdateCoordinator
@dataclass
class UpnpEntityDescription(EntityDescription):
"""UPnP entity description."""
format: str = "s"
unique_id: str | None = None
value_key: str | None = None
def __post_init__(self):
"""Post initialize."""
self.value_key = self.value_key or self.key
class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]):
"""Base class for UPnP/IGD entities."""
entity_description: UpnpEntityDescription
def __init__(
self,
coordinator: UpnpDataUpdateCoordinator,
entity_description: UpnpEntityDescription,
) -> None:
"""Initialize the base entities."""
super().__init__(coordinator)
self._device = coordinator.device
self.entity_description = entity_description
self._attr_name = f"{coordinator.device.name} {entity_description.name}"
self._attr_unique_id = f"{coordinator.device.original_udn}_{entity_description.unique_id or entity_description.key}"
self._attr_device_info = DeviceInfo(
connections=coordinator.device_entry.connections,
name=coordinator.device_entry.name,
manufacturer=coordinator.device_entry.manufacturer,
model=coordinator.device_entry.model,
configuration_url=coordinator.device_entry.configuration_url,
)
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and (
self.coordinator.data.get(self.entity_description.key) is not None
)

View File

@ -15,5 +15,6 @@
} }
], ],
"iot_class": "local_polling", "iot_class": "local_polling",
"loggers": ["async_upnp_client"] "loggers": ["async_upnp_client"],
"integration_type": "device"
} }

View File

@ -1,31 +1,46 @@
"""Support for UPnP/IGD Sensors.""" """Support for UPnP/IGD Sensors."""
from __future__ import annotations from __future__ import annotations
from homeassistant.components.sensor import SensorEntity from dataclasses import dataclass
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import DATA_BYTES, DATA_RATE_KIBIBYTES_PER_SECOND, TIME_SECONDS from homeassistant.const import DATA_BYTES, DATA_RATE_KIBIBYTES_PER_SECOND, TIME_SECONDS
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import UpnpDataUpdateCoordinator, UpnpEntity, UpnpSensorEntityDescription
from .const import ( from .const import (
BYTES_RECEIVED, BYTES_RECEIVED,
BYTES_SENT, BYTES_SENT,
DATA_PACKETS, DATA_PACKETS,
DATA_RATE_PACKETS_PER_SECOND, DATA_RATE_PACKETS_PER_SECOND,
DOMAIN, DOMAIN,
KIBIBYTE, KIBIBYTES_PER_SEC_RECEIVED,
KIBIBYTES_PER_SEC_SENT,
LOGGER, LOGGER,
PACKETS_PER_SEC_RECEIVED,
PACKETS_PER_SEC_SENT,
PACKETS_RECEIVED, PACKETS_RECEIVED,
PACKETS_SENT, PACKETS_SENT,
ROUTER_IP, ROUTER_IP,
ROUTER_UPTIME, ROUTER_UPTIME,
TIMESTAMP,
WAN_STATUS, WAN_STATUS,
) )
from .coordinator import UpnpDataUpdateCoordinator
from .entity import UpnpEntity, UpnpEntityDescription
RAW_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
@dataclass
class UpnpSensorEntityDescription(UpnpEntityDescription, SensorEntityDescription):
"""A class that describes a sensor UPnP entities."""
SENSOR_DESCRIPTIONS: tuple[UpnpSensorEntityDescription, ...] = (
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=BYTES_RECEIVED, key=BYTES_RECEIVED,
name=f"{DATA_BYTES} received", name=f"{DATA_BYTES} received",
@ -33,6 +48,7 @@ RAW_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
native_unit_of_measurement=DATA_BYTES, native_unit_of_measurement=DATA_BYTES,
format="d", format="d",
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
state_class=SensorStateClass.TOTAL_INCREASING,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=BYTES_SENT, key=BYTES_SENT,
@ -41,6 +57,7 @@ RAW_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
native_unit_of_measurement=DATA_BYTES, native_unit_of_measurement=DATA_BYTES,
format="d", format="d",
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
state_class=SensorStateClass.TOTAL_INCREASING,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=PACKETS_RECEIVED, key=PACKETS_RECEIVED,
@ -49,6 +66,7 @@ RAW_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
native_unit_of_measurement=DATA_PACKETS, native_unit_of_measurement=DATA_PACKETS,
format="d", format="d",
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
state_class=SensorStateClass.TOTAL_INCREASING,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=PACKETS_SENT, key=PACKETS_SENT,
@ -57,11 +75,13 @@ RAW_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
native_unit_of_measurement=DATA_PACKETS, native_unit_of_measurement=DATA_PACKETS,
format="d", format="d",
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
state_class=SensorStateClass.TOTAL_INCREASING,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=ROUTER_IP, key=ROUTER_IP,
name="External IP", name="External IP",
icon="mdi:server-network", icon="mdi:server-network",
entity_category=EntityCategory.DIAGNOSTIC,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=ROUTER_UPTIME, key=ROUTER_UPTIME,
@ -79,42 +99,47 @@ RAW_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
entity_category=EntityCategory.DIAGNOSTIC, entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
), ),
)
DERIVED_SENSORS: tuple[UpnpSensorEntityDescription, ...] = (
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=BYTES_RECEIVED, key=BYTES_RECEIVED,
value_key=KIBIBYTES_PER_SEC_RECEIVED,
unique_id="KiB/sec_received", unique_id="KiB/sec_received",
name=f"{DATA_RATE_KIBIBYTES_PER_SECOND} received", name=f"{DATA_RATE_KIBIBYTES_PER_SECOND} received",
icon="mdi:server-network", icon="mdi:server-network",
native_unit_of_measurement=DATA_RATE_KIBIBYTES_PER_SECOND, native_unit_of_measurement=DATA_RATE_KIBIBYTES_PER_SECOND,
format=".1f", format=".1f",
state_class=SensorStateClass.MEASUREMENT,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=BYTES_SENT, key=BYTES_SENT,
value_key=KIBIBYTES_PER_SEC_SENT,
unique_id="KiB/sec_sent", unique_id="KiB/sec_sent",
name=f"{DATA_RATE_KIBIBYTES_PER_SECOND} sent", name=f"{DATA_RATE_KIBIBYTES_PER_SECOND} sent",
icon="mdi:server-network", icon="mdi:server-network",
native_unit_of_measurement=DATA_RATE_KIBIBYTES_PER_SECOND, native_unit_of_measurement=DATA_RATE_KIBIBYTES_PER_SECOND,
format=".1f", format=".1f",
state_class=SensorStateClass.MEASUREMENT,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=PACKETS_RECEIVED, key=PACKETS_RECEIVED,
value_key=PACKETS_PER_SEC_RECEIVED,
unique_id="packets/sec_received", unique_id="packets/sec_received",
name=f"{DATA_RATE_PACKETS_PER_SECOND} received", name=f"{DATA_RATE_PACKETS_PER_SECOND} received",
icon="mdi:server-network", icon="mdi:server-network",
native_unit_of_measurement=DATA_RATE_PACKETS_PER_SECOND, native_unit_of_measurement=DATA_RATE_PACKETS_PER_SECOND,
format=".1f", format=".1f",
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
), ),
UpnpSensorEntityDescription( UpnpSensorEntityDescription(
key=PACKETS_SENT, key=PACKETS_SENT,
value_key=PACKETS_PER_SEC_SENT,
unique_id="packets/sec_sent", unique_id="packets/sec_sent",
name=f"{DATA_RATE_PACKETS_PER_SECOND} sent", name=f"{DATA_RATE_PACKETS_PER_SECOND} sent",
icon="mdi:server-network", icon="mdi:server-network",
native_unit_of_measurement=DATA_RATE_PACKETS_PER_SECOND, native_unit_of_measurement=DATA_RATE_PACKETS_PER_SECOND,
format=".1f", format=".1f",
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
), ),
) )
@ -125,26 +150,16 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback,
) -> None: ) -> None:
"""Set up the UPnP/IGD sensors.""" """Set up the UPnP/IGD sensors."""
coordinator = hass.data[DOMAIN][config_entry.entry_id] coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
entities: list[UpnpSensor] = [ entities: list[UpnpSensor] = [
RawUpnpSensor( UpnpSensor(
coordinator=coordinator, coordinator=coordinator,
entity_description=entity_description, entity_description=entity_description,
) )
for entity_description in RAW_SENSORS for entity_description in SENSOR_DESCRIPTIONS
if coordinator.data.get(entity_description.key) is not None if coordinator.data.get(entity_description.key) is not None
] ]
entities.extend(
[
DerivedUpnpSensor(
coordinator=coordinator,
entity_description=entity_description,
)
for entity_description in DERIVED_SENSORS
if coordinator.data.get(entity_description.key) is not None
]
)
LOGGER.debug("Adding sensor entities: %s", entities) LOGGER.debug("Adding sensor entities: %s", entities)
async_add_entities(entities) async_add_entities(entities)
@ -155,64 +170,10 @@ class UpnpSensor(UpnpEntity, SensorEntity):
entity_description: UpnpSensorEntityDescription entity_description: UpnpSensorEntityDescription
class RawUpnpSensor(UpnpSensor):
"""Representation of a UPnP/IGD sensor."""
@property @property
def native_value(self) -> str | None: def native_value(self) -> str | None:
"""Return the state of the device.""" """Return the state of the device."""
value = self.coordinator.data[self.entity_description.key] value = self.coordinator.data[self.entity_description.value_key]
if value is None: if value is None:
return None return None
return format(value, self.entity_description.format) return format(value, self.entity_description.format)
class DerivedUpnpSensor(UpnpSensor):
"""Representation of a UNIT Sent/Received per second sensor."""
def __init__(
self,
coordinator: UpnpDataUpdateCoordinator,
entity_description: UpnpSensorEntityDescription,
) -> None:
"""Initialize sensor."""
super().__init__(coordinator=coordinator, entity_description=entity_description)
self._last_value = None
self._last_timestamp = None
def _has_overflowed(self, current_value) -> bool:
"""Check if value has overflowed."""
return current_value < self._last_value
@property
def native_value(self) -> str | None:
"""Return the state of the device."""
# Can't calculate any derivative if we have only one value.
current_value = self.coordinator.data[self.entity_description.key]
if current_value is None:
return None
current_timestamp = self.coordinator.data[TIMESTAMP]
if self._last_value is None or self._has_overflowed(current_value):
self._last_value = current_value
self._last_timestamp = current_timestamp
return None
# Calculate derivative.
delta_value = current_value - self._last_value
if (
self.entity_description.native_unit_of_measurement
== DATA_RATE_KIBIBYTES_PER_SECOND
):
delta_value /= KIBIBYTE
delta_time = current_timestamp - self._last_timestamp
if delta_time.total_seconds() == 0:
# Prevent division by 0.
return None
derived = delta_value / delta_time.total_seconds()
# Store current values for future use.
self._last_value = current_value
self._last_timestamp = current_timestamp
return format(derived, self.entity_description.format)

View File

@ -5627,7 +5627,7 @@
}, },
"upnp": { "upnp": {
"name": "UPnP/IGD", "name": "UPnP/IGD",
"integration_type": "hub", "integration_type": "device",
"config_flow": true, "config_flow": true,
"iot_class": "local_polling" "iot_class": "local_polling"
}, },

View File

@ -1,11 +1,12 @@
"""Configuration for SSDP tests.""" """Configuration for SSDP tests."""
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, PropertyMock, create_autospec, patch from unittest.mock import AsyncMock, MagicMock, PropertyMock, create_autospec, patch
from urllib.parse import urlparse from urllib.parse import urlparse
from async_upnp_client.client import UpnpDevice from async_upnp_client.client import UpnpDevice
from async_upnp_client.profiles.igd import IgdDevice, StatusInfo from async_upnp_client.profiles.igd import IgdDevice, IgdState, StatusInfo
import pytest import pytest
from homeassistant.components import ssdp from homeassistant.components import ssdp
@ -65,16 +66,23 @@ def mock_igd_device() -> IgdDevice:
mock_igd_device.udn = TEST_DISCOVERY.ssdp_udn mock_igd_device.udn = TEST_DISCOVERY.ssdp_udn
mock_igd_device.device = mock_upnp_device mock_igd_device.device = mock_upnp_device
mock_igd_device.async_get_total_bytes_received.return_value = 0 mock_igd_device.async_get_traffic_and_status_data.return_value = IgdState(
mock_igd_device.async_get_total_bytes_sent.return_value = 0 timestamp=datetime.now(),
mock_igd_device.async_get_total_packets_received.return_value = 0 bytes_received=0,
mock_igd_device.async_get_total_packets_sent.return_value = 0 bytes_sent=0,
mock_igd_device.async_get_status_info.return_value = StatusInfo( packets_received=0,
packets_sent=0,
status_info=StatusInfo(
"Connected", "Connected",
"", "",
10, 10,
),
external_ip_address="8.9.10.11",
kibibytes_per_sec_received=None,
kibibytes_per_sec_sent=None,
packets_per_sec_received=None,
packets_per_sec_sent=None,
) )
mock_igd_device.async_get_external_ip_address.return_value = "8.9.10.11"
with patch( with patch(
"homeassistant.components.upnp.device.UpnpFactory.async_create_device" "homeassistant.components.upnp.device.UpnpFactory.async_create_device"

View File

@ -1,8 +1,8 @@
"""Tests for UPnP/IGD binary_sensor.""" """Tests for UPnP/IGD binary_sensor."""
from datetime import timedelta from datetime import datetime, timedelta
from async_upnp_client.profiles.igd import StatusInfo from async_upnp_client.profiles.igd import IgdDevice, IgdState, StatusInfo
from homeassistant.components.upnp.const import DEFAULT_SCAN_INTERVAL from homeassistant.components.upnp.const import DEFAULT_SCAN_INTERVAL
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@ -20,11 +20,23 @@ async def test_upnp_binary_sensors(
assert wan_status_state.state == "on" assert wan_status_state.state == "on"
# Second poll. # Second poll.
mock_igd_device = mock_config_entry.igd_device mock_igd_device: IgdDevice = mock_config_entry.igd_device
mock_igd_device.async_get_status_info.return_value = StatusInfo( mock_igd_device.async_get_traffic_and_status_data.return_value = IgdState(
timestamp=datetime.now(),
bytes_received=0,
bytes_sent=0,
packets_received=0,
packets_sent=0,
status_info=StatusInfo(
"Disconnected", "Disconnected",
"", "",
40, 40,
),
external_ip_address="8.9.10.11",
kibibytes_per_sec_received=None,
kibibytes_per_sec_sent=None,
packets_per_sec_received=None,
packets_per_sec_sent=None,
) )
async_fire_time_changed( async_fire_time_changed(

View File

@ -1,10 +1,8 @@
"""Tests for UPnP/IGD sensor.""" """Tests for UPnP/IGD sensor."""
from datetime import timedelta from datetime import datetime, timedelta
from unittest.mock import patch
from async_upnp_client.profiles.igd import StatusInfo from async_upnp_client.profiles.igd import IgdDevice, IgdState, StatusInfo
import pytest
from homeassistant.components.upnp.const import DEFAULT_SCAN_INTERVAL from homeassistant.components.upnp.const import DEFAULT_SCAN_INTERVAL
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@ -14,7 +12,7 @@ from tests.common import MockConfigEntry, async_fire_time_changed
async def test_upnp_sensors(hass: HomeAssistant, mock_config_entry: MockConfigEntry): async def test_upnp_sensors(hass: HomeAssistant, mock_config_entry: MockConfigEntry):
"""Test normal sensors.""" """Test sensors."""
# First poll. # First poll.
assert hass.states.get("sensor.mock_name_b_received").state == "0" assert hass.states.get("sensor.mock_name_b_received").state == "0"
assert hass.states.get("sensor.mock_name_b_sent").state == "0" assert hass.states.get("sensor.mock_name_b_sent").state == "0"
@ -22,19 +20,30 @@ async def test_upnp_sensors(hass: HomeAssistant, mock_config_entry: MockConfigEn
assert hass.states.get("sensor.mock_name_packets_sent").state == "0" assert hass.states.get("sensor.mock_name_packets_sent").state == "0"
assert hass.states.get("sensor.mock_name_external_ip").state == "8.9.10.11" assert hass.states.get("sensor.mock_name_external_ip").state == "8.9.10.11"
assert hass.states.get("sensor.mock_name_wan_status").state == "Connected" assert hass.states.get("sensor.mock_name_wan_status").state == "Connected"
assert hass.states.get("sensor.mock_name_kib_s_received").state == "unknown"
assert hass.states.get("sensor.mock_name_kib_s_sent").state == "unknown"
assert hass.states.get("sensor.mock_name_packets_s_received").state == "unknown"
assert hass.states.get("sensor.mock_name_packets_s_sent").state == "unknown"
# Second poll. # Second poll.
mock_igd_device = mock_config_entry.igd_device mock_igd_device: IgdDevice = mock_config_entry.igd_device
mock_igd_device.async_get_total_bytes_received.return_value = 10240 mock_igd_device.async_get_traffic_and_status_data.return_value = IgdState(
mock_igd_device.async_get_total_bytes_sent.return_value = 20480 timestamp=datetime.now(),
mock_igd_device.async_get_total_packets_received.return_value = 30 bytes_received=10240,
mock_igd_device.async_get_total_packets_sent.return_value = 40 bytes_sent=20480,
mock_igd_device.async_get_status_info.return_value = StatusInfo( packets_received=30,
packets_sent=40,
status_info=StatusInfo(
"Disconnected", "Disconnected",
"", "",
40, 40,
),
external_ip_address="",
kibibytes_per_sec_received=10.0,
kibibytes_per_sec_sent=20.0,
packets_per_sec_received=30.0,
packets_per_sec_sent=40.0,
) )
mock_igd_device.async_get_external_ip_address.return_value = ""
now = dt_util.utcnow() now = dt_util.utcnow()
async_fire_time_changed(hass, now + timedelta(seconds=DEFAULT_SCAN_INTERVAL)) async_fire_time_changed(hass, now + timedelta(seconds=DEFAULT_SCAN_INTERVAL))
@ -46,50 +55,7 @@ async def test_upnp_sensors(hass: HomeAssistant, mock_config_entry: MockConfigEn
assert hass.states.get("sensor.mock_name_packets_sent").state == "40" assert hass.states.get("sensor.mock_name_packets_sent").state == "40"
assert hass.states.get("sensor.mock_name_external_ip").state == "" assert hass.states.get("sensor.mock_name_external_ip").state == ""
assert hass.states.get("sensor.mock_name_wan_status").state == "Disconnected" assert hass.states.get("sensor.mock_name_wan_status").state == "Disconnected"
assert hass.states.get("sensor.mock_name_kib_s_received").state == "10.0"
assert hass.states.get("sensor.mock_name_kib_s_sent").state == "20.0"
async def test_derived_upnp_sensors( assert hass.states.get("sensor.mock_name_packets_s_received").state == "30.0"
hass: HomeAssistant, mock_config_entry: MockConfigEntry assert hass.states.get("sensor.mock_name_packets_s_sent").state == "40.0"
):
"""Test derived sensors."""
# First poll.
assert hass.states.get("sensor.mock_name_kib_s_received").state == "unknown"
assert hass.states.get("sensor.mock_name_kib_s_sent").state == "unknown"
assert hass.states.get("sensor.mock_name_packets_s_received").state == "unknown"
assert hass.states.get("sensor.mock_name_packets_s_sent").state == "unknown"
# Second poll.
mock_igd_device = mock_config_entry.igd_device
mock_igd_device.async_get_total_bytes_received.return_value = int(
10240 * DEFAULT_SCAN_INTERVAL
)
mock_igd_device.async_get_total_bytes_sent.return_value = int(
20480 * DEFAULT_SCAN_INTERVAL
)
mock_igd_device.async_get_total_packets_received.return_value = int(
30 * DEFAULT_SCAN_INTERVAL
)
mock_igd_device.async_get_total_packets_sent.return_value = int(
40 * DEFAULT_SCAN_INTERVAL
)
now = dt_util.utcnow()
with patch(
"homeassistant.components.upnp.device.utcnow",
return_value=now + timedelta(seconds=DEFAULT_SCAN_INTERVAL),
):
async_fire_time_changed(hass, now + timedelta(seconds=DEFAULT_SCAN_INTERVAL))
await hass.async_block_till_done()
assert float(
hass.states.get("sensor.mock_name_kib_s_received").state
) == pytest.approx(10.0, rel=0.1)
assert float(
hass.states.get("sensor.mock_name_kib_s_sent").state
) == pytest.approx(20.0, rel=0.1)
assert float(
hass.states.get("sensor.mock_name_packets_s_received").state
) == pytest.approx(30.0, rel=0.1)
assert float(
hass.states.get("sensor.mock_name_packets_s_sent").state
) == pytest.approx(40.0, rel=0.1)