mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 13:17:32 +00:00
Rewrite parts of upnp component (#33108)
* Rewrite parts of upnp component * Linting * Add SCAN_INTERVAL * Get values simultaneously * Move to time related constants, as per #32065 * Linting * Move constant KIBIBYTE to homeassistant.const * Simplify code * Fix tests for #33344 * Changes after review * Update homeassistant/components/upnp/sensor.py * Changes after review * Formatting * Formatting * Use ST from discovery info to avoid swapping device_types if device advertises multiple versions * Linting * Requirements for upnp + dlna_dmr components * Linting * Regen requirements * Changes after review by @MartinHjelmare * Changes after review by @MartinHjelmare * Formatting * Linting * Changes after review by @MartinHjelmare * Changes after review by @MartinHjelmare Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
parent
de3f5e8d69
commit
dfc66b2018
@ -2,6 +2,6 @@
|
|||||||
"domain": "dlna_dmr",
|
"domain": "dlna_dmr",
|
||||||
"name": "DLNA Digital Media Renderer",
|
"name": "DLNA Digital Media Renderer",
|
||||||
"documentation": "https://www.home-assistant.io/integrations/dlna_dmr",
|
"documentation": "https://www.home-assistant.io/integrations/dlna_dmr",
|
||||||
"requirements": ["async-upnp-client==0.14.12"],
|
"requirements": ["async-upnp-client==0.14.13"],
|
||||||
"codeowners": []
|
"codeowners": []
|
||||||
}
|
}
|
||||||
|
@ -1,17 +1,15 @@
|
|||||||
"""Open ports in your router for Home Assistant and provide statistics."""
|
"""Open ports in your router for Home Assistant and provide statistics."""
|
||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
from operator import itemgetter
|
from operator import itemgetter
|
||||||
|
from typing import Mapping
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant import config_entries
|
from homeassistant import config_entries
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||||
from homeassistant.helpers import (
|
from homeassistant.exceptions import ConfigEntryNotReady
|
||||||
config_validation as cv,
|
from homeassistant.helpers import config_validation as cv, device_registry as dr
|
||||||
device_registry as dr,
|
|
||||||
dispatcher,
|
|
||||||
)
|
|
||||||
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
|
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
|
||||||
from homeassistant.util import get_local_ip
|
from homeassistant.util import get_local_ip
|
||||||
|
|
||||||
@ -23,7 +21,6 @@ from .const import (
|
|||||||
CONF_PORTS,
|
CONF_PORTS,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
LOGGER as _LOGGER,
|
LOGGER as _LOGGER,
|
||||||
SIGNAL_REMOVE_SENSOR,
|
|
||||||
)
|
)
|
||||||
from .device import Device
|
from .device import Device
|
||||||
|
|
||||||
@ -37,7 +34,7 @@ CONFIG_SCHEMA = vol.Schema(
|
|||||||
vol.Optional(CONF_ENABLE_PORT_MAPPING, default=False): cv.boolean,
|
vol.Optional(CONF_ENABLE_PORT_MAPPING, default=False): cv.boolean,
|
||||||
vol.Optional(CONF_ENABLE_SENSORS, default=True): cv.boolean,
|
vol.Optional(CONF_ENABLE_SENSORS, default=True): cv.boolean,
|
||||||
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
|
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
|
||||||
vol.Optional(CONF_PORTS): vol.Schema(
|
vol.Optional(CONF_PORTS, default={}): vol.Schema(
|
||||||
{vol.Any(CONF_HASS, cv.port): vol.Any(CONF_HASS, cv.port)}
|
{vol.Any(CONF_HASS, cv.port): vol.Any(CONF_HASS, cv.port)}
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
@ -47,7 +44,7 @@ CONFIG_SCHEMA = vol.Schema(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _substitute_hass_ports(ports, hass_port=None):
|
def _substitute_hass_ports(ports: Mapping, hass_port: int = None) -> Mapping:
|
||||||
"""
|
"""
|
||||||
Substitute 'hass' for the hass_port.
|
Substitute 'hass' for the hass_port.
|
||||||
|
|
||||||
@ -86,8 +83,11 @@ def _substitute_hass_ports(ports, hass_port=None):
|
|||||||
return ports
|
return ports
|
||||||
|
|
||||||
|
|
||||||
async def async_discover_and_construct(hass, udn=None) -> Device:
|
async def async_discover_and_construct(
|
||||||
|
hass: HomeAssistantType, udn: str = None, st: str = None
|
||||||
|
) -> Device:
|
||||||
"""Discovery devices and construct a Device for one."""
|
"""Discovery devices and construct a Device for one."""
|
||||||
|
# pylint: disable=invalid-name
|
||||||
discovery_infos = await Device.async_discover(hass)
|
discovery_infos = await Device.async_discover(hass)
|
||||||
if not discovery_infos:
|
if not discovery_infos:
|
||||||
_LOGGER.info("No UPnP/IGD devices discovered")
|
_LOGGER.info("No UPnP/IGD devices discovered")
|
||||||
@ -95,7 +95,11 @@ async def async_discover_and_construct(hass, udn=None) -> Device:
|
|||||||
|
|
||||||
if udn:
|
if udn:
|
||||||
# get the discovery info with specified UDN
|
# get the discovery info with specified UDN
|
||||||
|
_LOGGER.debug("Discovery_infos: %s", discovery_infos)
|
||||||
filtered = [di for di in discovery_infos if di["udn"] == udn]
|
filtered = [di for di in discovery_infos if di["udn"] == udn]
|
||||||
|
if st:
|
||||||
|
_LOGGER.debug("Filtering on ST: %s", st)
|
||||||
|
filtered = [di for di in discovery_infos if di["st"] == st]
|
||||||
if not filtered:
|
if not filtered:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
'Wanted UPnP/IGD device with UDN "%s" not found, ' "aborting", udn
|
'Wanted UPnP/IGD device with UDN "%s" not found, ' "aborting", udn
|
||||||
@ -125,8 +129,8 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType):
|
|||||||
hass.data[DOMAIN] = {
|
hass.data[DOMAIN] = {
|
||||||
"config": conf,
|
"config": conf,
|
||||||
"devices": {},
|
"devices": {},
|
||||||
"local_ip": config.get(CONF_LOCAL_IP, local_ip),
|
"local_ip": conf.get(CONF_LOCAL_IP, local_ip),
|
||||||
"ports": conf.get("ports", {}),
|
"ports": conf.get(CONF_PORTS),
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf is not None:
|
if conf is not None:
|
||||||
@ -139,21 +143,24 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry):
|
async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry) -> bool:
|
||||||
"""Set up UPnP/IGD device from a config entry."""
|
"""Set up UPnP/IGD device from a config entry."""
|
||||||
domain_data = hass.data[DOMAIN]
|
domain_data = hass.data[DOMAIN]
|
||||||
conf = domain_data["config"]
|
conf = domain_data["config"]
|
||||||
|
|
||||||
# discover and construct
|
# discover and construct
|
||||||
device = await async_discover_and_construct(hass, config_entry.data.get("udn"))
|
udn = config_entry.data.get("udn")
|
||||||
|
st = config_entry.data.get("st") # pylint: disable=invalid-name
|
||||||
|
device = await async_discover_and_construct(hass, udn, st)
|
||||||
if not device:
|
if not device:
|
||||||
_LOGGER.info("Unable to create UPnP/IGD, aborting")
|
_LOGGER.info("Unable to create UPnP/IGD, aborting")
|
||||||
return False
|
raise ConfigEntryNotReady
|
||||||
|
|
||||||
# 'register'/save UDN
|
# 'register'/save UDN + ST
|
||||||
hass.data[DOMAIN]["devices"][device.udn] = device
|
hass.data[DOMAIN]["devices"][device.udn] = device
|
||||||
hass.config_entries.async_update_entry(
|
hass.config_entries.async_update_entry(
|
||||||
entry=config_entry, data={**config_entry.data, "udn": device.udn}
|
entry=config_entry,
|
||||||
|
data={**config_entry.data, "udn": device.udn, "st": device.device_type},
|
||||||
)
|
)
|
||||||
|
|
||||||
# create device registry entry
|
# create device registry entry
|
||||||
@ -179,8 +186,8 @@ async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry):
|
|||||||
# set up port mapping
|
# set up port mapping
|
||||||
if conf.get(CONF_ENABLE_PORT_MAPPING):
|
if conf.get(CONF_ENABLE_PORT_MAPPING):
|
||||||
_LOGGER.debug("Enabling port mapping")
|
_LOGGER.debug("Enabling port mapping")
|
||||||
local_ip = domain_data["local_ip"]
|
local_ip = domain_data[CONF_LOCAL_IP]
|
||||||
ports = conf.get("ports", {})
|
ports = conf.get(CONF_PORTS, {})
|
||||||
|
|
||||||
hass_port = None
|
hass_port = None
|
||||||
if hasattr(hass, "http"):
|
if hasattr(hass, "http"):
|
||||||
@ -200,7 +207,9 @@ async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def async_unload_entry(hass: HomeAssistantType, config_entry: ConfigEntry):
|
async def async_unload_entry(
|
||||||
|
hass: HomeAssistantType, config_entry: ConfigEntry
|
||||||
|
) -> bool:
|
||||||
"""Unload a UPnP/IGD device from a config entry."""
|
"""Unload a UPnP/IGD device from a config entry."""
|
||||||
udn = config_entry.data["udn"]
|
udn = config_entry.data["udn"]
|
||||||
device = hass.data[DOMAIN]["devices"][udn]
|
device = hass.data[DOMAIN]["devices"][udn]
|
||||||
@ -211,6 +220,4 @@ async def async_unload_entry(hass: HomeAssistantType, config_entry: ConfigEntry)
|
|||||||
|
|
||||||
# remove sensors
|
# remove sensors
|
||||||
_LOGGER.debug("Deleting sensors")
|
_LOGGER.debug("Deleting sensors")
|
||||||
dispatcher.async_dispatcher_send(hass, SIGNAL_REMOVE_SENSOR, device)
|
return await hass.config_entries.async_forward_entry_unload(config_entry, "sensor")
|
||||||
|
|
||||||
return True
|
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
"""Constants for the IGD component."""
|
"""Constants for the IGD component."""
|
||||||
|
from datetime import timedelta
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from homeassistant.const import TIME_SECONDS
|
||||||
|
|
||||||
CONF_ENABLE_PORT_MAPPING = "port_mapping"
|
CONF_ENABLE_PORT_MAPPING = "port_mapping"
|
||||||
CONF_ENABLE_SENSORS = "sensors"
|
CONF_ENABLE_SENSORS = "sensors"
|
||||||
CONF_HASS = "hass"
|
CONF_HASS = "hass"
|
||||||
@ -8,4 +11,12 @@ CONF_LOCAL_IP = "local_ip"
|
|||||||
CONF_PORTS = "ports"
|
CONF_PORTS = "ports"
|
||||||
DOMAIN = "upnp"
|
DOMAIN = "upnp"
|
||||||
LOGGER = logging.getLogger(__package__)
|
LOGGER = logging.getLogger(__package__)
|
||||||
SIGNAL_REMOVE_SENSOR = "upnp_remove_sensor"
|
BYTES_RECEIVED = "bytes_received"
|
||||||
|
BYTES_SENT = "bytes_sent"
|
||||||
|
PACKETS_RECEIVED = "packets_received"
|
||||||
|
PACKETS_SENT = "packets_sent"
|
||||||
|
TIMESTAMP = "timestamp"
|
||||||
|
DATA_PACKETS = "packets"
|
||||||
|
DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}"
|
||||||
|
KIBIBYTE = 1024
|
||||||
|
UPDATE_INTERVAL = timedelta(seconds=30)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
"""Home Assistant representation of an UPnP/IGD."""
|
"""Home Assistant representation of an UPnP/IGD."""
|
||||||
import asyncio
|
import asyncio
|
||||||
from ipaddress import IPv4Address
|
from ipaddress import IPv4Address
|
||||||
|
from typing import Mapping
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from async_upnp_client import UpnpError, UpnpFactory
|
from async_upnp_client import UpnpError, UpnpFactory
|
||||||
@ -9,8 +10,18 @@ from async_upnp_client.profiles.igd import IgdDevice
|
|||||||
|
|
||||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||||
from homeassistant.helpers.typing import HomeAssistantType
|
from homeassistant.helpers.typing import HomeAssistantType
|
||||||
|
import homeassistant.util.dt as dt_util
|
||||||
|
|
||||||
from .const import CONF_LOCAL_IP, DOMAIN, LOGGER as _LOGGER
|
from .const import (
|
||||||
|
BYTES_RECEIVED,
|
||||||
|
BYTES_SENT,
|
||||||
|
CONF_LOCAL_IP,
|
||||||
|
DOMAIN,
|
||||||
|
LOGGER as _LOGGER,
|
||||||
|
PACKETS_RECEIVED,
|
||||||
|
PACKETS_SENT,
|
||||||
|
TIMESTAMP,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Device:
|
class Device:
|
||||||
@ -18,7 +29,7 @@ class Device:
|
|||||||
|
|
||||||
def __init__(self, igd_device):
|
def __init__(self, igd_device):
|
||||||
"""Initialize UPnP/IGD device."""
|
"""Initialize UPnP/IGD device."""
|
||||||
self._igd_device = igd_device
|
self._igd_device: IgdDevice = igd_device
|
||||||
self._mapped_ports = []
|
self._mapped_ports = []
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -61,26 +72,37 @@ class Device:
|
|||||||
return cls(igd_device)
|
return cls(igd_device)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def udn(self):
|
def udn(self) -> str:
|
||||||
"""Get the UDN."""
|
"""Get the UDN."""
|
||||||
return self._igd_device.udn
|
return self._igd_device.udn
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def name(self) -> str:
|
||||||
"""Get the name."""
|
"""Get the name."""
|
||||||
return self._igd_device.name
|
return self._igd_device.name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def manufacturer(self):
|
def manufacturer(self) -> str:
|
||||||
"""Get the manufacturer."""
|
"""Get the manufacturer."""
|
||||||
return self._igd_device.manufacturer
|
return self._igd_device.manufacturer
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def model_name(self):
|
def model_name(self) -> str:
|
||||||
"""Get the model name."""
|
"""Get the model name."""
|
||||||
return self._igd_device.model_name
|
return self._igd_device.model_name
|
||||||
|
|
||||||
async def async_add_port_mappings(self, ports, local_ip):
|
@property
|
||||||
|
def device_type(self) -> str:
|
||||||
|
"""Get the device type."""
|
||||||
|
return self._igd_device.device_type
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
"""Get string representation."""
|
||||||
|
return f"IGD Device: {self.name}/{self.udn}"
|
||||||
|
|
||||||
|
async def async_add_port_mappings(
|
||||||
|
self, ports: Mapping[int, int], local_ip: str
|
||||||
|
) -> None:
|
||||||
"""Add port mappings."""
|
"""Add port mappings."""
|
||||||
if local_ip == "127.0.0.1":
|
if local_ip == "127.0.0.1":
|
||||||
_LOGGER.error("Could not create port mapping, our IP is 127.0.0.1")
|
_LOGGER.error("Could not create port mapping, our IP is 127.0.0.1")
|
||||||
@ -93,7 +115,9 @@ class Device:
|
|||||||
await self._async_add_port_mapping(external_port, local_ip, internal_port)
|
await self._async_add_port_mapping(external_port, local_ip, internal_port)
|
||||||
self._mapped_ports.append(external_port)
|
self._mapped_ports.append(external_port)
|
||||||
|
|
||||||
async def _async_add_port_mapping(self, external_port, local_ip, internal_port):
|
async def _async_add_port_mapping(
|
||||||
|
self, external_port: int, local_ip: str, internal_port: int
|
||||||
|
) -> None:
|
||||||
"""Add a port mapping."""
|
"""Add a port mapping."""
|
||||||
# create port mapping
|
# create port mapping
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
@ -123,12 +147,12 @@ class Device:
|
|||||||
internal_port,
|
internal_port,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_delete_port_mappings(self):
|
async def async_delete_port_mappings(self) -> None:
|
||||||
"""Remove a port mapping."""
|
"""Remove port mappings."""
|
||||||
for port in self._mapped_ports:
|
for port in self._mapped_ports:
|
||||||
await self._async_delete_port_mapping(port)
|
await self._async_delete_port_mapping(port)
|
||||||
|
|
||||||
async def _async_delete_port_mapping(self, external_port):
|
async def _async_delete_port_mapping(self, external_port: int) -> None:
|
||||||
"""Remove a port mapping."""
|
"""Remove a port mapping."""
|
||||||
_LOGGER.info("Deleting port mapping %s (TCP)", external_port)
|
_LOGGER.info("Deleting port mapping %s (TCP)", external_port)
|
||||||
try:
|
try:
|
||||||
@ -140,30 +164,31 @@ class Device:
|
|||||||
except (asyncio.TimeoutError, aiohttp.ClientError, UpnpError):
|
except (asyncio.TimeoutError, aiohttp.ClientError, UpnpError):
|
||||||
_LOGGER.error("Could not delete port mapping")
|
_LOGGER.error("Could not delete port mapping")
|
||||||
|
|
||||||
async def async_get_total_bytes_received(self):
|
async def async_get_traffic_data(self) -> Mapping[str, any]:
|
||||||
"""Get total bytes received."""
|
"""
|
||||||
try:
|
Get all traffic data in one go.
|
||||||
return await self._igd_device.async_get_total_bytes_received()
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
_LOGGER.warning("Timeout during get_total_bytes_received")
|
|
||||||
|
|
||||||
async def async_get_total_bytes_sent(self):
|
Traffic data consists of:
|
||||||
"""Get total bytes sent."""
|
- total bytes sent
|
||||||
try:
|
- total bytes received
|
||||||
return await self._igd_device.async_get_total_bytes_sent()
|
- total packets sent
|
||||||
except asyncio.TimeoutError:
|
- total packats received
|
||||||
_LOGGER.warning("Timeout during get_total_bytes_sent")
|
|
||||||
|
|
||||||
async def async_get_total_packets_received(self):
|
Data is timestamped.
|
||||||
"""Get total packets received."""
|
"""
|
||||||
try:
|
_LOGGER.debug("Getting traffic statistics from device: %s", self)
|
||||||
return await self._igd_device.async_get_total_packets_received()
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
_LOGGER.warning("Timeout during get_total_packets_received")
|
|
||||||
|
|
||||||
async def async_get_total_packets_sent(self):
|
values = await asyncio.gather(
|
||||||
"""Get total packets sent."""
|
self._igd_device.async_get_total_bytes_received(),
|
||||||
try:
|
self._igd_device.async_get_total_bytes_sent(),
|
||||||
return await self._igd_device.async_get_total_packets_sent()
|
self._igd_device.async_get_total_packets_received(),
|
||||||
except asyncio.TimeoutError:
|
self._igd_device.async_get_total_packets_sent(),
|
||||||
_LOGGER.warning("Timeout during get_total_packets_sent")
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
TIMESTAMP: dt_util.utcnow(),
|
||||||
|
BYTES_RECEIVED: values[0],
|
||||||
|
BYTES_SENT: values[1],
|
||||||
|
PACKETS_RECEIVED: values[2],
|
||||||
|
PACKETS_SENT: values[3],
|
||||||
|
}
|
||||||
|
@ -3,6 +3,15 @@
|
|||||||
"name": "UPnP",
|
"name": "UPnP",
|
||||||
"config_flow": true,
|
"config_flow": true,
|
||||||
"documentation": "https://www.home-assistant.io/integrations/upnp",
|
"documentation": "https://www.home-assistant.io/integrations/upnp",
|
||||||
"requirements": ["async-upnp-client==0.14.12"],
|
"requirements": ["async-upnp-client==0.14.13"],
|
||||||
"codeowners": ["@StevenLooman"]
|
"dependencies": [],
|
||||||
|
"codeowners": ["@StevenLooman"],
|
||||||
|
"ssdp": [
|
||||||
|
{
|
||||||
|
"st": "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"st": "urn:schemas-upnp-org:device:InternetGatewayDevice:2"
|
||||||
|
}
|
||||||
|
]
|
||||||
}
|
}
|
||||||
|
@ -1,275 +1,247 @@
|
|||||||
"""Support for UPnP/IGD Sensors."""
|
"""Support for UPnP/IGD Sensors."""
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
import logging
|
from typing import Mapping
|
||||||
|
|
||||||
from homeassistant.const import DATA_BYTES, DATA_KIBIBYTES, TIME_SECONDS
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.core import callback
|
from homeassistant.const import DATA_BYTES, DATA_RATE_KIBIBYTES_PER_SECOND
|
||||||
from homeassistant.helpers import device_registry as dr
|
from homeassistant.helpers import device_registry as dr
|
||||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
||||||
from homeassistant.helpers.entity import Entity
|
from homeassistant.helpers.entity import Entity
|
||||||
from homeassistant.helpers.typing import HomeAssistantType
|
from homeassistant.helpers.typing import HomeAssistantType
|
||||||
from homeassistant.util import Throttle
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||||
import homeassistant.util.dt as dt_util
|
|
||||||
|
|
||||||
from .const import DOMAIN as DOMAIN_UPNP, SIGNAL_REMOVE_SENSOR
|
from .const import (
|
||||||
|
BYTES_RECEIVED,
|
||||||
_LOGGER = logging.getLogger(__name__)
|
BYTES_SENT,
|
||||||
|
DATA_PACKETS,
|
||||||
BYTES_RECEIVED = "bytes_received"
|
DATA_RATE_PACKETS_PER_SECOND,
|
||||||
BYTES_SENT = "bytes_sent"
|
DOMAIN,
|
||||||
PACKETS_RECEIVED = "packets_received"
|
KIBIBYTE,
|
||||||
PACKETS_SENT = "packets_sent"
|
LOGGER as _LOGGER,
|
||||||
|
PACKETS_RECEIVED,
|
||||||
|
PACKETS_SENT,
|
||||||
|
TIMESTAMP,
|
||||||
|
UPDATE_INTERVAL,
|
||||||
|
)
|
||||||
|
from .device import Device
|
||||||
|
|
||||||
SENSOR_TYPES = {
|
SENSOR_TYPES = {
|
||||||
BYTES_RECEIVED: {"name": "bytes received", "unit": DATA_BYTES},
|
BYTES_RECEIVED: {
|
||||||
BYTES_SENT: {"name": "bytes sent", "unit": DATA_BYTES},
|
"device_value_key": BYTES_RECEIVED,
|
||||||
PACKETS_RECEIVED: {"name": "packets received", "unit": "packets"},
|
"name": f"{DATA_BYTES} received",
|
||||||
PACKETS_SENT: {"name": "packets sent", "unit": "packets"},
|
"unit": DATA_BYTES,
|
||||||
|
"unique_id": BYTES_RECEIVED,
|
||||||
|
"derived_name": f"{DATA_RATE_KIBIBYTES_PER_SECOND} received",
|
||||||
|
"derived_unit": DATA_RATE_KIBIBYTES_PER_SECOND,
|
||||||
|
"derived_unique_id": "KiB/sec_received",
|
||||||
|
},
|
||||||
|
BYTES_SENT: {
|
||||||
|
"device_value_key": BYTES_SENT,
|
||||||
|
"name": f"{DATA_BYTES} sent",
|
||||||
|
"unit": DATA_BYTES,
|
||||||
|
"unique_id": BYTES_SENT,
|
||||||
|
"derived_name": f"{DATA_RATE_KIBIBYTES_PER_SECOND} sent",
|
||||||
|
"derived_unit": DATA_RATE_KIBIBYTES_PER_SECOND,
|
||||||
|
"derived_unique_id": "KiB/sec_sent",
|
||||||
|
},
|
||||||
|
PACKETS_RECEIVED: {
|
||||||
|
"device_value_key": PACKETS_RECEIVED,
|
||||||
|
"name": f"{DATA_PACKETS} received",
|
||||||
|
"unit": DATA_PACKETS,
|
||||||
|
"unique_id": PACKETS_RECEIVED,
|
||||||
|
"derived_name": f"{DATA_RATE_PACKETS_PER_SECOND} received",
|
||||||
|
"derived_unit": DATA_RATE_PACKETS_PER_SECOND,
|
||||||
|
"derived_unique_id": "packets/sec_received",
|
||||||
|
},
|
||||||
|
PACKETS_SENT: {
|
||||||
|
"device_value_key": PACKETS_SENT,
|
||||||
|
"name": f"{DATA_PACKETS} sent",
|
||||||
|
"unit": DATA_PACKETS,
|
||||||
|
"unique_id": PACKETS_SENT,
|
||||||
|
"derived_name": f"{DATA_RATE_PACKETS_PER_SECOND} sent",
|
||||||
|
"derived_unit": DATA_RATE_PACKETS_PER_SECOND,
|
||||||
|
"derived_unique_id": "packets/sec_sent",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
IN = "received"
|
|
||||||
OUT = "sent"
|
|
||||||
KIBIBYTE = 1024
|
|
||||||
|
|
||||||
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=30)
|
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_platform(
|
async def async_setup_platform(
|
||||||
hass: HomeAssistantType, config, async_add_entities, discovery_info=None
|
hass: HomeAssistantType, config, async_add_entities, discovery_info=None
|
||||||
):
|
) -> None:
|
||||||
"""Old way of setting up UPnP/IGD sensors."""
|
"""Old way of setting up UPnP/IGD sensors."""
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"async_setup_platform: config: %s, discovery: %s", config, discovery_info
|
"async_setup_platform: config: %s, discovery: %s", config, discovery_info
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(hass, config_entry, async_add_entities):
|
async def async_setup_entry(
|
||||||
"""Set up the UPnP/IGD sensor."""
|
hass, config_entry: ConfigEntry, async_add_entities
|
||||||
|
) -> None:
|
||||||
@callback
|
"""Set up the UPnP/IGD sensors."""
|
||||||
def async_add_sensor(device):
|
|
||||||
"""Add sensors from UPnP/IGD device."""
|
|
||||||
# raw sensors + per-second sensors
|
|
||||||
sensors = [
|
|
||||||
RawUPnPIGDSensor(device, name, sensor_type)
|
|
||||||
for name, sensor_type in SENSOR_TYPES.items()
|
|
||||||
]
|
|
||||||
sensors += [
|
|
||||||
KBytePerSecondUPnPIGDSensor(device, IN),
|
|
||||||
KBytePerSecondUPnPIGDSensor(device, OUT),
|
|
||||||
PacketsPerSecondUPnPIGDSensor(device, IN),
|
|
||||||
PacketsPerSecondUPnPIGDSensor(device, OUT),
|
|
||||||
]
|
|
||||||
async_add_entities(sensors, True)
|
|
||||||
|
|
||||||
data = config_entry.data
|
data = config_entry.data
|
||||||
if "udn" in data:
|
if "udn" in data:
|
||||||
udn = data["udn"]
|
udn = data["udn"]
|
||||||
else:
|
else:
|
||||||
# any device will do
|
# any device will do
|
||||||
udn = list(hass.data[DOMAIN_UPNP]["devices"].keys())[0]
|
udn = list(hass.data[DOMAIN]["devices"].keys())[0]
|
||||||
|
|
||||||
device = hass.data[DOMAIN_UPNP]["devices"][udn]
|
device: Device = hass.data[DOMAIN]["devices"][udn]
|
||||||
async_add_sensor(device)
|
|
||||||
|
_LOGGER.debug("Adding sensors")
|
||||||
|
coordinator = DataUpdateCoordinator(
|
||||||
|
hass,
|
||||||
|
_LOGGER,
|
||||||
|
name=device.name,
|
||||||
|
update_method=device.async_get_traffic_data,
|
||||||
|
update_interval=timedelta(seconds=UPDATE_INTERVAL.seconds),
|
||||||
|
)
|
||||||
|
await coordinator.async_refresh()
|
||||||
|
|
||||||
|
sensors = [
|
||||||
|
RawUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_RECEIVED]),
|
||||||
|
RawUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_SENT]),
|
||||||
|
RawUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_RECEIVED]),
|
||||||
|
RawUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_SENT]),
|
||||||
|
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_RECEIVED]),
|
||||||
|
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_SENT]),
|
||||||
|
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_RECEIVED]),
|
||||||
|
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_SENT]),
|
||||||
|
]
|
||||||
|
async_add_entities(sensors, True)
|
||||||
|
|
||||||
|
|
||||||
class UpnpSensor(Entity):
|
class UpnpSensor(Entity):
|
||||||
"""Base class for UPnP/IGD sensors."""
|
"""Base class for UPnP/IGD sensors."""
|
||||||
|
|
||||||
def __init__(self, device):
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: DataUpdateCoordinator,
|
||||||
|
device: Device,
|
||||||
|
sensor_type: Mapping[str, str],
|
||||||
|
) -> None:
|
||||||
"""Initialize the base sensor."""
|
"""Initialize the base sensor."""
|
||||||
|
self._coordinator = coordinator
|
||||||
self._device = device
|
self._device = device
|
||||||
|
self._sensor_type = sensor_type
|
||||||
async def async_added_to_hass(self):
|
|
||||||
"""Subscribe to sensors events."""
|
|
||||||
self.async_on_remove(
|
|
||||||
async_dispatcher_connect(
|
|
||||||
self.hass, SIGNAL_REMOVE_SENSOR, self._upnp_remove_sensor
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def _upnp_remove_sensor(self, device):
|
|
||||||
"""Remove sensor."""
|
|
||||||
if self._device != device:
|
|
||||||
# not for us
|
|
||||||
return
|
|
||||||
|
|
||||||
self.hass.async_create_task(self.async_remove())
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def device_info(self):
|
def should_poll(self) -> bool:
|
||||||
|
"""Inform we should not be polled."""
|
||||||
|
return False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def icon(self) -> str:
|
||||||
|
"""Icon to use in the frontend, if any."""
|
||||||
|
return "mdi:server-network"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def available(self) -> bool:
|
||||||
|
"""Return if entity is available."""
|
||||||
|
device_value_key = self._sensor_type["device_value_key"]
|
||||||
|
return (
|
||||||
|
self._coordinator.last_update_success
|
||||||
|
and device_value_key in self._coordinator.data
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
"""Return the name of the sensor."""
|
||||||
|
return f"{self._device.name} {self._sensor_type['name']}"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unique_id(self) -> str:
|
||||||
|
"""Return an unique ID."""
|
||||||
|
return f"{self._device.udn}_{self._sensor_type['unique_id']}"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unit_of_measurement(self) -> str:
|
||||||
|
"""Return the unit of measurement of this entity, if any."""
|
||||||
|
return self._sensor_type["unit"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_info(self) -> Mapping[str, any]:
|
||||||
"""Get device info."""
|
"""Get device info."""
|
||||||
return {
|
return {
|
||||||
"connections": {(dr.CONNECTION_UPNP, self._device.udn)},
|
"connections": {(dr.CONNECTION_UPNP, self._device.udn)},
|
||||||
"identifiers": {(DOMAIN_UPNP, self._device.udn)},
|
|
||||||
"name": self._device.name,
|
"name": self._device.name,
|
||||||
"manufacturer": self._device.manufacturer,
|
"manufacturer": self._device.manufacturer,
|
||||||
"model": self._device.model_name,
|
"model": self._device.model_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def async_update(self):
|
||||||
|
"""Request an update."""
|
||||||
|
await self._coordinator.async_request_refresh()
|
||||||
|
|
||||||
class RawUPnPIGDSensor(UpnpSensor):
|
async def async_added_to_hass(self) -> None:
|
||||||
|
"""Subscribe to sensors events."""
|
||||||
|
remove_from_coordinator = self._coordinator.async_add_listener(
|
||||||
|
self.async_write_ha_state
|
||||||
|
)
|
||||||
|
self.async_on_remove(remove_from_coordinator)
|
||||||
|
|
||||||
|
|
||||||
|
class RawUpnpSensor(UpnpSensor):
|
||||||
"""Representation of a UPnP/IGD sensor."""
|
"""Representation of a UPnP/IGD sensor."""
|
||||||
|
|
||||||
def __init__(self, device, sensor_type_name, sensor_type):
|
|
||||||
"""Initialize the UPnP/IGD sensor."""
|
|
||||||
super().__init__(device)
|
|
||||||
self._type_name = sensor_type_name
|
|
||||||
self._type = sensor_type
|
|
||||||
self._name = "{} {}".format(device.name, sensor_type["name"])
|
|
||||||
self._state = None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def name(self) -> str:
|
|
||||||
"""Return the name of the sensor."""
|
|
||||||
return self._name
|
|
||||||
|
|
||||||
@property
|
|
||||||
def unique_id(self) -> str:
|
|
||||||
"""Return an unique ID."""
|
|
||||||
return f"{self._device.udn}_{self._type_name}"
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self) -> str:
|
def state(self) -> str:
|
||||||
"""Return the state of the device."""
|
"""Return the state of the device."""
|
||||||
if self._state is None:
|
device_value_key = self._sensor_type["device_value_key"]
|
||||||
return None
|
value = self._coordinator.data[device_value_key]
|
||||||
|
return format(value, "d")
|
||||||
return format(self._state, "d")
|
|
||||||
|
|
||||||
@property
|
|
||||||
def icon(self) -> str:
|
|
||||||
"""Icon to use in the frontend, if any."""
|
|
||||||
return "mdi:server-network"
|
|
||||||
|
|
||||||
@property
|
|
||||||
def unit_of_measurement(self) -> str:
|
|
||||||
"""Return the unit of measurement of this entity, if any."""
|
|
||||||
return self._type["unit"]
|
|
||||||
|
|
||||||
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
|
||||||
async def async_update(self):
|
|
||||||
"""Get the latest information from the IGD."""
|
|
||||||
if self._type_name == BYTES_RECEIVED:
|
|
||||||
self._state = await self._device.async_get_total_bytes_received()
|
|
||||||
elif self._type_name == BYTES_SENT:
|
|
||||||
self._state = await self._device.async_get_total_bytes_sent()
|
|
||||||
elif self._type_name == PACKETS_RECEIVED:
|
|
||||||
self._state = await self._device.async_get_total_packets_received()
|
|
||||||
elif self._type_name == PACKETS_SENT:
|
|
||||||
self._state = await self._device.async_get_total_packets_sent()
|
|
||||||
|
|
||||||
|
|
||||||
class PerSecondUPnPIGDSensor(UpnpSensor):
|
class DerivedUpnpSensor(UpnpSensor):
|
||||||
"""Abstract representation of a X Sent/Received per second sensor."""
|
"""Representation of a UNIT Sent/Received per second sensor."""
|
||||||
|
|
||||||
def __init__(self, device, direction):
|
def __init__(self, coordinator, device, sensor_type) -> None:
|
||||||
"""Initialize sensor."""
|
"""Initialize sensor."""
|
||||||
super().__init__(device)
|
super().__init__(coordinator, device, sensor_type)
|
||||||
self._direction = direction
|
|
||||||
|
|
||||||
self._state = None
|
|
||||||
self._last_value = None
|
self._last_value = None
|
||||||
self._last_update_time = None
|
self._last_timestamp = None
|
||||||
|
|
||||||
@property
|
|
||||||
def unit(self) -> str:
|
|
||||||
"""Get unit we are measuring in."""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
async def _async_fetch_value(self):
|
|
||||||
"""Fetch a value from the IGD."""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def unique_id(self) -> str:
|
|
||||||
"""Return an unique ID."""
|
|
||||||
return f"{self._device.udn}_{self.unit}/sec_{self._direction}"
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self) -> str:
|
def name(self) -> str:
|
||||||
"""Return the name of the sensor."""
|
"""Return the name of the sensor."""
|
||||||
return f"{self._device.name} {self.unit}/sec {self._direction}"
|
return f"{self._device.name} {self._sensor_type['derived_name']}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def icon(self) -> str:
|
def unique_id(self) -> str:
|
||||||
"""Icon to use in the frontend, if any."""
|
"""Return an unique ID."""
|
||||||
return "mdi:server-network"
|
return f"{self._device.udn}_{self._sensor_type['derived_unique_id']}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def unit_of_measurement(self) -> str:
|
def unit_of_measurement(self) -> str:
|
||||||
"""Return the unit of measurement of this entity, if any."""
|
"""Return the unit of measurement of this entity, if any."""
|
||||||
return f"{self.unit}/{TIME_SECONDS}"
|
return self._sensor_type["derived_unit"]
|
||||||
|
|
||||||
def _is_overflowed(self, new_value) -> bool:
|
def _has_overflowed(self, current_value) -> bool:
|
||||||
"""Check if value has overflowed."""
|
"""Check if value has overflowed."""
|
||||||
return new_value < self._last_value
|
return current_value < self._last_value
|
||||||
|
|
||||||
async def async_update(self):
|
|
||||||
"""Get the latest information from the UPnP/IGD."""
|
|
||||||
new_value = await self._async_fetch_value()
|
|
||||||
|
|
||||||
if self._last_value is None:
|
|
||||||
self._last_value = new_value
|
|
||||||
self._last_update_time = dt_util.utcnow()
|
|
||||||
return
|
|
||||||
|
|
||||||
now = dt_util.utcnow()
|
|
||||||
if self._is_overflowed(new_value):
|
|
||||||
self._state = None # temporarily report nothing
|
|
||||||
else:
|
|
||||||
delta_time = (now - self._last_update_time).seconds
|
|
||||||
delta_value = new_value - self._last_value
|
|
||||||
self._state = delta_value / delta_time
|
|
||||||
|
|
||||||
self._last_value = new_value
|
|
||||||
self._last_update_time = now
|
|
||||||
|
|
||||||
|
|
||||||
class KBytePerSecondUPnPIGDSensor(PerSecondUPnPIGDSensor):
|
|
||||||
"""Representation of a KBytes Sent/Received per second sensor."""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def unit(self) -> str:
|
|
||||||
"""Get unit we are measuring in."""
|
|
||||||
return DATA_KIBIBYTES
|
|
||||||
|
|
||||||
async def _async_fetch_value(self) -> float:
|
|
||||||
"""Fetch value from device."""
|
|
||||||
if self._direction == IN:
|
|
||||||
return await self._device.async_get_total_bytes_received()
|
|
||||||
|
|
||||||
return await self._device.async_get_total_bytes_sent()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self) -> str:
|
def state(self) -> str:
|
||||||
"""Return the state of the device."""
|
"""Return the state of the device."""
|
||||||
if self._state is None:
|
# Can't calculate any derivative if we have only one value.
|
||||||
|
device_value_key = self._sensor_type["device_value_key"]
|
||||||
|
current_value = self._coordinator.data[device_value_key]
|
||||||
|
current_timestamp = self._coordinator.data[TIMESTAMP]
|
||||||
|
if self._last_value is None or self._has_overflowed(current_value):
|
||||||
|
self._last_value = current_value
|
||||||
|
self._last_timestamp = current_timestamp
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return format(float(self._state / KIBIBYTE), ".1f")
|
# Calculate derivative.
|
||||||
|
delta_value = current_value - self._last_value
|
||||||
|
if self._sensor_type["unit"] == DATA_BYTES:
|
||||||
class PacketsPerSecondUPnPIGDSensor(PerSecondUPnPIGDSensor):
|
delta_value /= KIBIBYTE
|
||||||
"""Representation of a Packets Sent/Received per second sensor."""
|
delta_time = current_timestamp - self._last_timestamp
|
||||||
|
if delta_time.seconds == 0:
|
||||||
@property
|
# Prevent division by 0.
|
||||||
def unit(self) -> str:
|
|
||||||
"""Get unit we are measuring in."""
|
|
||||||
return "packets"
|
|
||||||
|
|
||||||
async def _async_fetch_value(self) -> float:
|
|
||||||
"""Fetch value from device."""
|
|
||||||
if self._direction == IN:
|
|
||||||
return await self._device.async_get_total_packets_received()
|
|
||||||
|
|
||||||
return await self._device.async_get_total_packets_sent()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def state(self) -> str:
|
|
||||||
"""Return the state of the device."""
|
|
||||||
if self._state is None:
|
|
||||||
return None
|
return None
|
||||||
|
derived = delta_value / delta_time.seconds
|
||||||
|
|
||||||
return format(float(self._state), ".1f")
|
# Store current values for future use.
|
||||||
|
self._last_value = current_value
|
||||||
|
self._last_timestamp = current_timestamp
|
||||||
|
|
||||||
|
return format(derived, ".1f")
|
||||||
|
@ -76,6 +76,14 @@ SSDP = {
|
|||||||
"manufacturer": "Synology"
|
"manufacturer": "Synology"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
"upnp": [
|
||||||
|
{
|
||||||
|
"st": "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"st": "urn:schemas-upnp-org:device:InternetGatewayDevice:2"
|
||||||
|
}
|
||||||
|
],
|
||||||
"wemo": [
|
"wemo": [
|
||||||
{
|
{
|
||||||
"manufacturer": "Belkin International Inc."
|
"manufacturer": "Belkin International Inc."
|
||||||
|
@ -269,7 +269,7 @@ asterisk_mbox==0.5.0
|
|||||||
|
|
||||||
# homeassistant.components.dlna_dmr
|
# homeassistant.components.dlna_dmr
|
||||||
# homeassistant.components.upnp
|
# homeassistant.components.upnp
|
||||||
async-upnp-client==0.14.12
|
async-upnp-client==0.14.13
|
||||||
|
|
||||||
# homeassistant.components.aten_pe
|
# homeassistant.components.aten_pe
|
||||||
atenpdu==0.3.0
|
atenpdu==0.3.0
|
||||||
|
@ -122,7 +122,7 @@ arcam-fmj==0.4.3
|
|||||||
|
|
||||||
# homeassistant.components.dlna_dmr
|
# homeassistant.components.dlna_dmr
|
||||||
# homeassistant.components.upnp
|
# homeassistant.components.upnp
|
||||||
async-upnp-client==0.14.12
|
async-upnp-client==0.14.13
|
||||||
|
|
||||||
# homeassistant.components.stream
|
# homeassistant.components.stream
|
||||||
av==6.1.2
|
av==6.1.2
|
||||||
|
@ -1,14 +1,14 @@
|
|||||||
"""Test UPnP/IGD setup process."""
|
"""Test UPnP/IGD setup process."""
|
||||||
|
|
||||||
from ipaddress import ip_address
|
from ipaddress import IPv4Address
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from homeassistant.components import upnp
|
from homeassistant.components import upnp
|
||||||
from homeassistant.components.upnp.device import Device
|
from homeassistant.components.upnp.device import Device
|
||||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||||
from homeassistant.setup import async_setup_component
|
from homeassistant.setup import async_setup_component
|
||||||
|
|
||||||
from tests.common import MockConfigEntry, MockDependency, mock_coro
|
from tests.common import MockConfigEntry, mock_coro
|
||||||
|
|
||||||
|
|
||||||
class MockDevice(Device):
|
class MockDevice(Device):
|
||||||
@ -16,11 +16,8 @@ class MockDevice(Device):
|
|||||||
|
|
||||||
def __init__(self, udn):
|
def __init__(self, udn):
|
||||||
"""Initialize mock device."""
|
"""Initialize mock device."""
|
||||||
device = MagicMock()
|
igd_device = object()
|
||||||
device.manufacturer = "mock-manuf"
|
super().__init__(igd_device)
|
||||||
device.name = "mock-name"
|
|
||||||
device.model_name = "mock-model-name"
|
|
||||||
super().__init__(device)
|
|
||||||
self._udn = udn
|
self._udn = udn
|
||||||
self.added_port_mappings = []
|
self.added_port_mappings = []
|
||||||
self.removed_port_mappings = []
|
self.removed_port_mappings = []
|
||||||
@ -31,16 +28,38 @@ class MockDevice(Device):
|
|||||||
return cls("UDN")
|
return cls("UDN")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def udn(self):
|
def udn(self) -> str:
|
||||||
"""Get the UDN."""
|
"""Get the UDN."""
|
||||||
return self._udn
|
return self._udn
|
||||||
|
|
||||||
async def _async_add_port_mapping(self, external_port, local_ip, internal_port):
|
@property
|
||||||
|
def manufacturer(self) -> str:
|
||||||
|
"""Get manufacturer."""
|
||||||
|
return "mock-manufacturer"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
"""Get name."""
|
||||||
|
return "mock-name"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def model_name(self) -> str:
|
||||||
|
"""Get the model name."""
|
||||||
|
return "mock-model-name"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_type(self) -> str:
|
||||||
|
"""Get the device type."""
|
||||||
|
return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
||||||
|
|
||||||
|
async def _async_add_port_mapping(
|
||||||
|
self, external_port: int, local_ip: str, internal_port: int
|
||||||
|
) -> None:
|
||||||
"""Add a port mapping."""
|
"""Add a port mapping."""
|
||||||
entry = [external_port, local_ip, internal_port]
|
entry = [external_port, local_ip, internal_port]
|
||||||
self.added_port_mappings.append(entry)
|
self.added_port_mappings.append(entry)
|
||||||
|
|
||||||
async def _async_delete_port_mapping(self, external_port):
|
async def _async_delete_port_mapping(self, external_port: int) -> None:
|
||||||
"""Remove a port mapping."""
|
"""Remove a port mapping."""
|
||||||
entry = external_port
|
entry = external_port
|
||||||
self.removed_port_mappings.append(entry)
|
self.removed_port_mappings.append(entry)
|
||||||
@ -52,18 +71,11 @@ async def test_async_setup_entry_default(hass):
|
|||||||
entry = MockConfigEntry(domain=upnp.DOMAIN)
|
entry = MockConfigEntry(domain=upnp.DOMAIN)
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
"http": {},
|
|
||||||
"discovery": {},
|
|
||||||
# no upnp
|
# no upnp
|
||||||
}
|
}
|
||||||
with MockDependency("netdisco.discovery"), patch(
|
with patch.object(Device, "async_create_device") as create_device, patch.object(
|
||||||
"homeassistant.components.upnp.get_local_ip", return_value="192.168.1.10"
|
|
||||||
), patch.object(Device, "async_create_device") as create_device, patch.object(
|
|
||||||
Device, "async_create_device"
|
|
||||||
) as create_device, patch.object(
|
|
||||||
Device, "async_discover", return_value=mock_coro([])
|
Device, "async_discover", return_value=mock_coro([])
|
||||||
) as async_discover:
|
) as async_discover:
|
||||||
await async_setup_component(hass, "http", config)
|
|
||||||
await async_setup_component(hass, "upnp", config)
|
await async_setup_component(hass, "upnp", config)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
@ -97,12 +109,13 @@ async def test_async_setup_entry_port_mapping(hass):
|
|||||||
|
|
||||||
config = {
|
config = {
|
||||||
"http": {},
|
"http": {},
|
||||||
"discovery": {},
|
"upnp": {
|
||||||
"upnp": {"port_mapping": True, "ports": {"hass": "hass"}},
|
"local_ip": "192.168.1.10",
|
||||||
|
"port_mapping": True,
|
||||||
|
"ports": {"hass": "hass"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
with MockDependency("netdisco.discovery"), patch(
|
with patch.object(Device, "async_create_device") as create_device, patch.object(
|
||||||
"homeassistant.components.upnp.get_local_ip", return_value="192.168.1.10"
|
|
||||||
), patch.object(Device, "async_create_device") as create_device, patch.object(
|
|
||||||
Device, "async_discover", return_value=mock_coro([])
|
Device, "async_discover", return_value=mock_coro([])
|
||||||
) as async_discover:
|
) as async_discover:
|
||||||
await async_setup_component(hass, "http", config)
|
await async_setup_component(hass, "http", config)
|
||||||
@ -124,7 +137,7 @@ async def test_async_setup_entry_port_mapping(hass):
|
|||||||
|
|
||||||
# ensure add-port-mapping-methods called
|
# ensure add-port-mapping-methods called
|
||||||
assert mock_device.added_port_mappings == [
|
assert mock_device.added_port_mappings == [
|
||||||
[8123, ip_address("192.168.1.10"), 8123]
|
[8123, IPv4Address("192.168.1.10"), 8123]
|
||||||
]
|
]
|
||||||
|
|
||||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user