diff --git a/.coveragerc b/.coveragerc index fdd36f55925..7941bd5708f 100644 --- a/.coveragerc +++ b/.coveragerc @@ -695,6 +695,7 @@ omit = homeassistant/components/sensor/haveibeenpwned.py homeassistant/components/sensor/hp_ilo.py homeassistant/components/sensor/htu21d.py + homeassistant/components/sensor/upnp.py homeassistant/components/sensor/imap_email_content.py homeassistant/components/sensor/imap.py homeassistant/components/sensor/influxdb.py @@ -781,7 +782,6 @@ omit = homeassistant/components/sensor/travisci.py homeassistant/components/sensor/twitch.py homeassistant/components/sensor/uber.py - homeassistant/components/sensor/upnp.py homeassistant/components/sensor/ups.py homeassistant/components/sensor/uscis.py homeassistant/components/sensor/vasttrafik.py diff --git a/CODEOWNERS b/CODEOWNERS index 4c75e764b20..6736b3abd51 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -82,7 +82,6 @@ homeassistant/components/sensor/sma.py @kellerza homeassistant/components/sensor/sql.py @dgomes homeassistant/components/sensor/sytadin.py @gautric homeassistant/components/sensor/tibber.py @danielhiversen -homeassistant/components/sensor/upnp.py @dgomes homeassistant/components/sensor/waqi.py @andrey-git homeassistant/components/switch/tplink.py @rytilahti homeassistant/components/vacuum/roomba.py @pschmitt diff --git a/homeassistant/components/discovery.py b/homeassistant/components/discovery.py index 9377db52609..0640eb262cd 100644 --- a/homeassistant/components/discovery.py +++ b/homeassistant/components/discovery.py @@ -50,6 +50,7 @@ CONFIG_ENTRY_HANDLERS = { SERVICE_HUE: 'hue', SERVICE_IKEA_TRADFRI: 'tradfri', 'sonos': 'sonos', + 'igd': 'upnp', } SERVICE_HANDLERS = { diff --git a/homeassistant/components/sensor/upnp.py b/homeassistant/components/sensor/upnp.py index d021312d15c..c05e2ce0ade 100644 --- a/homeassistant/components/sensor/upnp.py +++ b/homeassistant/components/sensor/upnp.py @@ -1,87 +1,268 @@ """ -Support for UPnP Sensors (IGD). +Support for UPnP/IGD Sensors. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/sensor.upnp/ """ +from datetime import datetime import logging -from homeassistant.components.upnp import DATA_UPNP, UNITS, CIC_SERVICE +from homeassistant.core import callback +from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.entity import Entity +from homeassistant.components.upnp.const import DOMAIN as DATA_UPNP +from homeassistant.components.upnp.const import SIGNAL_REMOVE_SENSOR + _LOGGER = logging.getLogger(__name__) DEPENDENCIES = ['upnp'] -BYTES_RECEIVED = 1 -BYTES_SENT = 2 -PACKETS_RECEIVED = 3 -PACKETS_SENT = 4 +BYTES_RECEIVED = 'bytes_received' +BYTES_SENT = 'bytes_sent' +PACKETS_RECEIVED = 'packets_received' +PACKETS_SENT = 'packets_sent' -# sensor_type: [friendly_name, convert_unit, icon] SENSOR_TYPES = { - BYTES_RECEIVED: ['received bytes', True, 'mdi:server-network'], - BYTES_SENT: ['sent bytes', True, 'mdi:server-network'], - PACKETS_RECEIVED: ['packets received', False, 'mdi:server-network'], - PACKETS_SENT: ['packets sent', False, 'mdi:server-network'], + BYTES_RECEIVED: { + 'name': 'bytes received', + 'unit': 'bytes', + }, + BYTES_SENT: { + 'name': 'bytes sent', + 'unit': 'bytes', + }, + PACKETS_RECEIVED: { + 'name': 'packets received', + 'unit': 'packets', + }, + PACKETS_SENT: { + 'name': 'packets sent', + 'unit': 'packets', + }, } +IN = 'received' +OUT = 'sent' +KBYTE = 1024 + async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): - """Set up the IGD sensors.""" - if discovery_info is None: - return - - device = hass.data[DATA_UPNP] - service = device.find_first_service(CIC_SERVICE) - unit = discovery_info['unit'] - async_add_entities([ - IGDSensor(service, t, unit if SENSOR_TYPES[t][1] else '#') - for t in SENSOR_TYPES], True) + """Old way of setting up UPnP/IGD sensors.""" + _LOGGER.debug('async_setup_platform: config: %s, discovery: %s', + config, discovery_info) -class IGDSensor(Entity): - """Representation of a UPnP IGD sensor.""" +async def async_setup_entry(hass, config_entry, async_add_entities): + """Set up the UPnP/IGD sensor.""" + @callback + def async_add_sensor(device): + """Add sensors from UPnP/IGD device.""" + # raw sensors + per-second sensors + sensors = [ + RawUPnPIGDSensor(device, name, sensor_type) + for name, sensor_type in SENSOR_TYPES.items() + ] + sensors += [ + KBytePerSecondUPnPIGDSensor(device, IN), + KBytePerSecondUPnPIGDSensor(device, OUT), + PacketsPerSecondUPnPIGDSensor(device, IN), + PacketsPerSecondUPnPIGDSensor(device, OUT), + ] + async_add_entities(sensors, True) - def __init__(self, service, sensor_type, unit=None): - """Initialize the IGD sensor.""" - self._service = service - self.type = sensor_type - self.unit = unit - self.unit_factor = UNITS[unit] if unit in UNITS else 1 - self._name = 'IGD {}'.format(SENSOR_TYPES[sensor_type][0]) + data = config_entry.data + udn = data['udn'] + device = hass.data[DATA_UPNP]['devices'][udn] + async_add_sensor(device) + + +class UpnpSensor(Entity): + """Base class for UPnP/IGD sensors.""" + + def __init__(self, device): + """Initialize the base sensor.""" + self._device = device + + async def async_added_to_hass(self): + """Subscribe to sensors events.""" + async_dispatcher_connect(self.hass, + SIGNAL_REMOVE_SENSOR, + self._upnp_remove_sensor) + + @callback + def _upnp_remove_sensor(self, device): + """Remove sensor.""" + if self._device != device: + # not for us + return + + self.hass.async_create_task(self.async_remove()) + + +class RawUPnPIGDSensor(UpnpSensor): + """Representation of a UPnP/IGD sensor.""" + + def __init__(self, device, sensor_type_name, sensor_type): + """Initialize the UPnP/IGD sensor.""" + super().__init__(device) + self._type_name = sensor_type_name + self._type = sensor_type + self._name = '{} {}'.format(device.name, sensor_type['name']) self._state = None @property - def name(self): + def name(self) -> str: """Return the name of the sensor.""" return self._name @property - def state(self): + def unique_id(self) -> str: + """Return an unique ID.""" + return '{}_{}'.format(self._device.udn, self._type_name) + + @property + def state(self) -> str: """Return the state of the device.""" - if self._state: - return format(float(self._state) / self.unit_factor, '.1f') - return self._state + return format(self._state, 'd') @property - def icon(self): + def icon(self) -> str: """Icon to use in the frontend, if any.""" - return SENSOR_TYPES[self.type][2] + return 'mdi:server-network' @property - def unit_of_measurement(self): + def unit_of_measurement(self) -> str: """Return the unit of measurement of this entity, if any.""" - return self.unit + return self._type['unit'] async def async_update(self): """Get the latest information from the IGD.""" - if self.type == BYTES_RECEIVED: - self._state = await self._service.get_total_bytes_received() - elif self.type == BYTES_SENT: - self._state = await self._service.get_total_bytes_sent() - elif self.type == PACKETS_RECEIVED: - self._state = await self._service.get_total_packets_received() - elif self.type == PACKETS_SENT: - self._state = await self._service.get_total_packets_sent() + if self._type_name == BYTES_RECEIVED: + self._state = await self._device.async_get_total_bytes_received() + elif self._type_name == BYTES_SENT: + self._state = await self._device.async_get_total_bytes_sent() + elif self._type_name == PACKETS_RECEIVED: + self._state = await self._device.async_get_total_packets_received() + elif self._type_name == PACKETS_SENT: + self._state = await self._device.async_get_total_packets_sent() + + +class PerSecondUPnPIGDSensor(UpnpSensor): + """Abstract representation of a X Sent/Received per second sensor.""" + + def __init__(self, device, direction): + """Initializer.""" + super().__init__(device) + self._direction = direction + + self._state = None + self._last_value = None + self._last_update_time = None + + @property + def unit(self) -> str: + """Get unit we are measuring in.""" + raise NotImplementedError() + + @property + def _async_fetch_value(self): + """Fetch a value from the IGD.""" + raise NotImplementedError() + + @property + def unique_id(self) -> str: + """Return an unique ID.""" + return '{}_{}/sec_{}'.format(self._device.udn, + self.unit, + self._direction) + + @property + def name(self) -> str: + """Return the name of the sensor.""" + return '{} {}/sec {}'.format(self._device.name, + self.unit, + self._direction) + + @property + def icon(self) -> str: + """Icon to use in the frontend, if any.""" + return 'mdi:server-network' + + @property + def unit_of_measurement(self) -> str: + """Return the unit of measurement of this entity, if any.""" + return '{}/sec'.format(self.unit) + + def _is_overflowed(self, new_value) -> bool: + """Check if value has overflowed.""" + return new_value < self._last_value + + async def async_update(self): + """Get the latest information from the UPnP/IGD.""" + new_value = await self._async_fetch_value() + + if self._last_value is None: + self._last_value = new_value + self._last_update_time = datetime.now() + return + + now = datetime.now() + if self._is_overflowed(new_value): + self._state = None # temporarily report nothing + else: + delta_time = (now - self._last_update_time).seconds + delta_value = new_value - self._last_value + self._state = (delta_value / delta_time) + + self._last_value = new_value + self._last_update_time = now + + +class KBytePerSecondUPnPIGDSensor(PerSecondUPnPIGDSensor): + """Representation of a KBytes Sent/Received per second sensor.""" + + @property + def unit(self) -> str: + """Get unit we are measuring in.""" + return 'kbyte' + + async def _async_fetch_value(self) -> float: + """Fetch value from device.""" + if self._direction == IN: + return await self._device.async_get_total_bytes_received() + + return await self._device.async_get_total_bytes_sent() + + @property + def state(self) -> str: + """Return the state of the device.""" + if self._state is None: + return None + + return format(float(self._state / KBYTE), '.1f') + + +class PacketsPerSecondUPnPIGDSensor(PerSecondUPnPIGDSensor): + """Representation of a Packets Sent/Received per second sensor.""" + + @property + def unit(self) -> str: + """Get unit we are measuring in.""" + return 'packets' + + async def _async_fetch_value(self) -> float: + """Fetch value from device.""" + if self._direction == IN: + return await self._device.async_get_total_packets_received() + + return await self._device.async_get_total_packets_sent() + + @property + def state(self) -> str: + """Return the state of the device.""" + if self._state is None: + return None + + return format(float(self._state), '.1f') diff --git a/homeassistant/components/upnp.py b/homeassistant/components/upnp.py deleted file mode 100644 index 2bf0572d498..00000000000 --- a/homeassistant/components/upnp.py +++ /dev/null @@ -1,144 +0,0 @@ -""" -Will open a port in your router for Home Assistant and provide statistics. - -For more details about this component, please refer to the documentation at -https://home-assistant.io/components/upnp/ -""" -from ipaddress import ip_address -import logging -import asyncio - -import voluptuous as vol - -from homeassistant.const import (EVENT_HOMEASSISTANT_STOP) -from homeassistant.helpers import config_validation as cv -from homeassistant.helpers import discovery -from homeassistant.util import get_local_ip - -REQUIREMENTS = ['pyupnp-async==0.1.1.1'] -DEPENDENCIES = ['http'] - -_LOGGER = logging.getLogger(__name__) - -DEPENDENCIES = ['api'] -DOMAIN = 'upnp' - -DATA_UPNP = 'upnp_device' - -CONF_LOCAL_IP = 'local_ip' -CONF_ENABLE_PORT_MAPPING = 'port_mapping' -CONF_PORTS = 'ports' -CONF_UNITS = 'unit' -CONF_HASS = 'hass' - -NOTIFICATION_ID = 'upnp_notification' -NOTIFICATION_TITLE = 'UPnP Setup' - -IGD_DEVICE = 'urn:schemas-upnp-org:device:InternetGatewayDevice:1' -PPP_SERVICE = 'urn:schemas-upnp-org:service:WANPPPConnection:1' -IP_SERVICE = 'urn:schemas-upnp-org:service:WANIPConnection:1' -IP_SERVICE2 = 'urn:schemas-upnp-org:service:WANIPConnection:2' -CIC_SERVICE = 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1' - -UNITS = { - "Bytes": 1, - "KBytes": 1024, - "MBytes": 1024**2, - "GBytes": 1024**3, -} - -CONFIG_SCHEMA = vol.Schema({ - DOMAIN: vol.Schema({ - vol.Optional(CONF_ENABLE_PORT_MAPPING, default=False): cv.boolean, - vol.Optional(CONF_UNITS, default="MBytes"): vol.In(UNITS), - vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string), - vol.Optional(CONF_PORTS): - vol.Schema({vol.Any(CONF_HASS, cv.positive_int): cv.positive_int}) - }), -}, extra=vol.ALLOW_EXTRA) - - -async def async_setup(hass, config): - """Register a port mapping for Home Assistant via UPnP.""" - config = config[DOMAIN] - host = config.get(CONF_LOCAL_IP) - - if host is None: - host = get_local_ip() - - if host == '127.0.0.1': - _LOGGER.error( - 'Unable to determine local IP. Add it to your configuration.') - return False - - import pyupnp_async - from pyupnp_async.error import UpnpSoapError - - service = None - resp = await pyupnp_async.msearch_first(search_target=IGD_DEVICE) - if not resp: - return False - - try: - device = await resp.get_device() - hass.data[DATA_UPNP] = device - for _service in device.services: - if _service['serviceType'] == PPP_SERVICE: - service = device.find_first_service(PPP_SERVICE) - if _service['serviceType'] == IP_SERVICE: - service = device.find_first_service(IP_SERVICE) - if _service['serviceType'] == IP_SERVICE2: - service = device.find_first_service(IP_SERVICE2) - if _service['serviceType'] == CIC_SERVICE: - unit = config[CONF_UNITS] - hass.async_create_task(discovery.async_load_platform( - hass, 'sensor', DOMAIN, {'unit': unit}, config)) - except UpnpSoapError as error: - _LOGGER.error(error) - return False - - if not service: - _LOGGER.warning("Could not find any UPnP IGD") - return False - - port_mapping = config[CONF_ENABLE_PORT_MAPPING] - if not port_mapping: - return True - - internal_port = hass.http.server_port - - ports = config.get(CONF_PORTS) - if ports is None: - ports = {CONF_HASS: internal_port} - - registered = [] - for internal, external in ports.items(): - if internal == CONF_HASS: - internal = internal_port - try: - await service.add_port_mapping(internal, external, host, 'TCP', - desc='Home Assistant') - registered.append(external) - _LOGGER.debug("Mapping external TCP port %s -> %s @ %s", - external, internal, host) - except UpnpSoapError as error: - _LOGGER.error(error) - hass.components.persistent_notification.create( - 'ERROR: tcp port {} is already mapped in your router.' - '
Please disable port_mapping in the upnp ' - 'configuration section.
' - 'You will need to restart hass after fixing.' - ''.format(external), - title=NOTIFICATION_TITLE, - notification_id=NOTIFICATION_ID) - - async def deregister_port(event): - """De-register the UPnP port mapping.""" - tasks = [service.delete_port_mapping(external, 'TCP') - for external in registered] - if tasks: - await asyncio.wait(tasks) - - hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, deregister_port) - - return True diff --git a/homeassistant/components/upnp/.translations/en.json b/homeassistant/components/upnp/.translations/en.json new file mode 100644 index 00000000000..682150b7ddd --- /dev/null +++ b/homeassistant/components/upnp/.translations/en.json @@ -0,0 +1,25 @@ +{ + "config": { + "title": "UPnP/IGD", + "step": { + "init": { + "title": "UPnP/IGD" + }, + "user": { + "title": "Configuration options for the UPnP/IGD", + "data":{ + "igd": "UPnP/IGD", + "enable_sensors": "Add traffic sensors", + "enable_port_mapping": "Enable port mapping for Home Assistant" + } + } + }, + "error": { + }, + "abort": { + "no_devices_discovered": "No UPnP/IGDs discovered", + "already_configured": "UPnP/IGD is already configured", + "no_sensors_or_port_mapping": "Enable at least sensors or port mapping" + } + } +} diff --git a/homeassistant/components/upnp/.translations/nl.json b/homeassistant/components/upnp/.translations/nl.json new file mode 100644 index 00000000000..55a94a8ea6f --- /dev/null +++ b/homeassistant/components/upnp/.translations/nl.json @@ -0,0 +1,25 @@ +{ + "config": { + "title": "UPnP/IGD", + "step": { + "init": { + "title": "UPnP/IGD" + }, + "user": { + "title": "Extra configuratie options voor UPnP/IGD", + "data":{ + "igd": "UPnP/IGD", + "enable_sensors": "Verkeer sensors toevoegen", + "enable_port_mapping": "Maak port mapping voor Home Assistant" + } + } + }, + "error": { + }, + "abort": { + "no_devices_discovered": "Geen UPnP/IGDs gevonden", + "already_configured": "UPnP/IGD is reeds geconfigureerd", + "no_sensors_or_port_mapping": "Kies ten minste sensors of port mapping" + } + } +} diff --git a/homeassistant/components/upnp/__init__.py b/homeassistant/components/upnp/__init__.py new file mode 100644 index 00000000000..f70fbcc4d20 --- /dev/null +++ b/homeassistant/components/upnp/__init__.py @@ -0,0 +1,169 @@ +""" +Will open a port in your router for Home Assistant and provide statistics. + +For more details about this component, please refer to the documentation at +https://home-assistant.io/components/upnp/ +""" +import asyncio +from ipaddress import ip_address + +import aiohttp +import voluptuous as vol + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.helpers import config_validation as cv +from homeassistant.helpers import dispatcher +from homeassistant.helpers.typing import ConfigType +from homeassistant.helpers.typing import HomeAssistantType +from homeassistant.components.discovery import DOMAIN as DISCOVERY_DOMAIN + +from .const import ( + CONF_ENABLE_PORT_MAPPING, CONF_ENABLE_SENSORS, + CONF_HASS, CONF_LOCAL_IP, CONF_PORTS, + CONF_UDN, CONF_SSDP_DESCRIPTION, + SIGNAL_REMOVE_SENSOR, +) +from .const import DOMAIN +from .const import LOGGER as _LOGGER +from .config_flow import ensure_domain_data +from .device import Device + + +REQUIREMENTS = ['async-upnp-client==0.12.4'] +DEPENDENCIES = ['http'] + +NOTIFICATION_ID = 'upnp_notification' +NOTIFICATION_TITLE = 'UPnP/IGD Setup' + +CONFIG_SCHEMA = vol.Schema({ + DOMAIN: vol.Schema({ + vol.Optional(CONF_ENABLE_PORT_MAPPING, default=False): cv.boolean, + vol.Optional(CONF_ENABLE_SENSORS, default=True): cv.boolean, + vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string), + vol.Optional(CONF_PORTS): + vol.Schema({ + vol.Any(CONF_HASS, cv.positive_int): + vol.Any(CONF_HASS, cv.positive_int) + }) + }), +}, extra=vol.ALLOW_EXTRA) + + +def _substitute_hass_ports(ports, hass_port): + """Substitute 'hass' for the hass_port.""" + ports = ports.copy() + + # substitute 'hass' for hass_port, both keys and values + if CONF_HASS in ports: + ports[hass_port] = ports[CONF_HASS] + del ports[CONF_HASS] + + for port in ports: + if ports[port] == CONF_HASS: + ports[port] = hass_port + + return ports + + +# config +async def async_setup(hass: HomeAssistantType, config: ConfigType): + """Register a port mapping for Home Assistant via UPnP.""" + ensure_domain_data(hass) + + # ensure sane config + if DOMAIN not in config: + return True + + if DISCOVERY_DOMAIN not in config: + _LOGGER.warning('UPNP needs discovery, please enable it') + return False + + # overridden local ip + upnp_config = config[DOMAIN] + if CONF_LOCAL_IP in upnp_config: + hass.data[DOMAIN]['local_ip'] = upnp_config[CONF_LOCAL_IP] + + # determine ports + ports = {CONF_HASS: CONF_HASS} # default, port_mapping disabled by default + if CONF_PORTS in upnp_config: + # copy from config + ports = upnp_config[CONF_PORTS] + + hass.data[DOMAIN]['auto_config'] = { + 'active': True, + 'enable_sensors': upnp_config[CONF_ENABLE_SENSORS], + 'enable_port_mapping': upnp_config[CONF_ENABLE_PORT_MAPPING], + 'ports': ports, + } + + return True + + +# config flow +async def async_setup_entry(hass: HomeAssistantType, + config_entry: ConfigEntry): + """Set up UPnP/IGD-device from a config entry.""" + ensure_domain_data(hass) + data = config_entry.data + + # build UPnP/IGD device + ssdp_description = data[CONF_SSDP_DESCRIPTION] + try: + device = await Device.async_create_device(hass, ssdp_description) + except (asyncio.TimeoutError, aiohttp.ClientError): + _LOGGER.error('Unable to create upnp-device') + return False + + hass.data[DOMAIN]['devices'][device.udn] = device + + # port mapping + if data.get(CONF_ENABLE_PORT_MAPPING): + local_ip = hass.data[DOMAIN].get('local_ip') + ports = hass.data[DOMAIN]['auto_config']['ports'] + _LOGGER.debug('Enabling port mappings: %s', ports) + + hass_port = hass.http.server_port + ports = _substitute_hass_ports(ports, hass_port) + await device.async_add_port_mappings(ports, local_ip=local_ip) + + # sensors + if data.get(CONF_ENABLE_SENSORS): + _LOGGER.debug('Enabling sensors') + + # register sensor setup handlers + hass.async_create_task(hass.config_entries.async_forward_entry_setup( + config_entry, 'sensor')) + + async def unload_entry(event): + """Unload entry on quit.""" + await async_unload_entry(hass, config_entry) + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, unload_entry) + + return True + + +async def async_unload_entry(hass: HomeAssistantType, + config_entry: ConfigEntry): + """Unload a config entry.""" + data = config_entry.data + udn = data[CONF_UDN] + + if udn not in hass.data[DOMAIN]['devices']: + return True + device = hass.data[DOMAIN]['devices'][udn] + + # port mapping + if data.get(CONF_ENABLE_PORT_MAPPING): + _LOGGER.debug('Deleting port mappings') + await device.async_delete_port_mappings() + + # sensors + if data.get(CONF_ENABLE_SENSORS): + _LOGGER.debug('Deleting sensors') + dispatcher.async_dispatcher_send(hass, SIGNAL_REMOVE_SENSOR, device) + + # clear stored device + del hass.data[DOMAIN]['devices'][udn] + + return True diff --git a/homeassistant/components/upnp/config_flow.py b/homeassistant/components/upnp/config_flow.py new file mode 100644 index 00000000000..885f2f64211 --- /dev/null +++ b/homeassistant/components/upnp/config_flow.py @@ -0,0 +1,160 @@ +"""Config flow for UPNP.""" +from collections import OrderedDict + +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant import data_entry_flow + +from .const import ( + CONF_ENABLE_PORT_MAPPING, CONF_ENABLE_SENSORS, + CONF_SSDP_DESCRIPTION, CONF_UDN +) +from .const import DOMAIN + + +def ensure_domain_data(hass): + """Ensure hass.data is filled properly.""" + hass.data[DOMAIN] = hass.data.get(DOMAIN, {}) + hass.data[DOMAIN]['devices'] = hass.data[DOMAIN].get('devices', {}) + hass.data[DOMAIN]['discovered'] = hass.data[DOMAIN].get('discovered', {}) + hass.data[DOMAIN]['auto_config'] = hass.data[DOMAIN].get('auto_config', { + 'active': False, + 'enable_sensors': False, + 'enable_port_mapping': False, + 'ports': {'hass': 'hass'}, + }) + + +@config_entries.HANDLERS.register(DOMAIN) +class UpnpFlowHandler(data_entry_flow.FlowHandler): + """Handle a UPnP/IGD config flow.""" + + VERSION = 1 + CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL + + @property + def _configured_upnp_igds(self): + """Get all configured IGDs.""" + return { + entry.data[CONF_UDN]: { + 'udn': entry.data[CONF_UDN], + } + for entry in self.hass.config_entries.async_entries(DOMAIN) + } + + @property + def _discovered_upnp_igds(self): + """Get all discovered entries.""" + return self.hass.data[DOMAIN]['discovered'] + + def _store_discovery_info(self, discovery_info): + """Add discovery info.""" + udn = discovery_info['udn'] + self.hass.data[DOMAIN]['discovered'][udn] = discovery_info + + def _auto_config_settings(self): + """Check if auto_config has been enabled.""" + return self.hass.data[DOMAIN]['auto_config'] + + async def async_step_discovery(self, discovery_info): + """ + Handle a discovered UPnP/IGD. + + This flow is triggered by the discovery component. It will check if the + host is already configured and delegate to the import step if not. + """ + ensure_domain_data(self.hass) + + # store discovered device + discovery_info['friendly_name'] = \ + '{} ({})'.format(discovery_info['host'], discovery_info['name']) + self._store_discovery_info(discovery_info) + + # ensure not already discovered/configured + udn = discovery_info['udn'] + if udn in self._configured_upnp_igds: + return self.async_abort(reason='already_configured') + + # auto config? + auto_config = self._auto_config_settings() + if auto_config['active']: + import_info = { + 'name': discovery_info['friendly_name'], + 'enable_sensors': auto_config['enable_sensors'], + 'enable_port_mapping': auto_config['enable_port_mapping'], + } + + return await self._async_save_entry(import_info) + + return await self.async_step_user() + + async def async_step_user(self, user_input=None): + """Manual set up.""" + ensure_domain_data(self.hass) + + # if user input given, handle it + user_input = user_input or {} + if 'name' in user_input: + if not user_input['enable_sensors'] and \ + not user_input['enable_port_mapping']: + return self.async_abort(reason='no_sensors_or_port_mapping') + + # ensure not already configured + configured_names = [ + entry['friendly_name'] + for udn, entry in self._discovered_upnp_igds.items() + if udn in self._configured_upnp_igds + ] + if user_input['name'] in configured_names: + return self.async_abort(reason='already_configured') + + return await self._async_save_entry(user_input) + + # let user choose from all discovered, non-configured, UPnP/IGDs + names = [ + entry['friendly_name'] + for udn, entry in self._discovered_upnp_igds.items() + if udn not in self._configured_upnp_igds + ] + if not names: + return self.async_abort(reason='no_devices_discovered') + + return self.async_show_form( + step_id='user', + data_schema=vol.Schema( + OrderedDict([ + (vol.Required('name'), vol.In(names)), + (vol.Optional('enable_sensors'), bool), + (vol.Optional('enable_port_mapping'), bool), + ]) + )) + + async def async_step_import(self, import_info): + """Import a new UPnP/IGD as a config entry.""" + ensure_domain_data(self.hass) + + return await self._async_save_entry(import_info) + + async def _async_save_entry(self, import_info): + """Store UPNP/IGD as new entry.""" + ensure_domain_data(self.hass) + + # ensure we know the host + name = import_info['name'] + discovery_infos = [info + for info in self._discovered_upnp_igds.values() + if info['friendly_name'] == name] + if not discovery_infos: + return self.async_abort(reason='host_not_found') + discovery_info = discovery_infos[0] + + return self.async_create_entry( + title=discovery_info['name'], + data={ + CONF_SSDP_DESCRIPTION: discovery_info['ssdp_description'], + CONF_UDN: discovery_info['udn'], + CONF_ENABLE_SENSORS: import_info['enable_sensors'], + CONF_ENABLE_PORT_MAPPING: import_info['enable_port_mapping'], + }, + ) diff --git a/homeassistant/components/upnp/const.py b/homeassistant/components/upnp/const.py new file mode 100644 index 00000000000..7a906ae02be --- /dev/null +++ b/homeassistant/components/upnp/const.py @@ -0,0 +1,14 @@ +"""Constants for the IGD component.""" +import logging + + +CONF_ENABLE_PORT_MAPPING = 'port_mapping' +CONF_ENABLE_SENSORS = 'sensors' +CONF_HASS = 'hass' +CONF_LOCAL_IP = 'local_ip' +CONF_PORTS = 'ports' +CONF_SSDP_DESCRIPTION = 'ssdp_description' +CONF_UDN = 'udn' +DOMAIN = 'upnp' +LOGGER = logging.getLogger('homeassistant.components.upnp') +SIGNAL_REMOVE_SENSOR = 'upnp_remove_sensor' diff --git a/homeassistant/components/upnp/device.py b/homeassistant/components/upnp/device.py new file mode 100644 index 00000000000..4a444aa3087 --- /dev/null +++ b/homeassistant/components/upnp/device.py @@ -0,0 +1,131 @@ +"""Hass representation of an UPnP/IGD.""" +import asyncio +from ipaddress import IPv4Address + +import aiohttp + +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.helpers.typing import HomeAssistantType +from homeassistant.util import get_local_ip + +from .const import LOGGER as _LOGGER + + +class Device: + """Hass representation of an UPnP/IGD.""" + + def __init__(self, igd_device): + """Initializer.""" + self._igd_device = igd_device + self._mapped_ports = [] + + @classmethod + async def async_create_device(cls, + hass: HomeAssistantType, + ssdp_description: str): + """Create UPnP/IGD device.""" + # build async_upnp_client requester + from async_upnp_client.aiohttp import AiohttpSessionRequester + session = async_get_clientsession(hass) + requester = AiohttpSessionRequester(session, True) + + # create async_upnp_client device + from async_upnp_client import UpnpFactory + factory = UpnpFactory(requester, + disable_state_variable_validation=True) + upnp_device = await factory.async_create_device(ssdp_description) + + # wrap with async_upnp_client IgdDevice + from async_upnp_client.igd import IgdDevice + igd_device = IgdDevice(upnp_device, None) + + return cls(igd_device) + + @property + def udn(self): + """Get the UDN.""" + return self._igd_device.udn + + @property + def name(self): + """Get the name.""" + return self._igd_device.name + + async def async_add_port_mappings(self, ports, local_ip=None): + """Add port mappings.""" + # determine local ip, ensure sane IP + if local_ip is None: + local_ip = get_local_ip() + + if local_ip == '127.0.0.1': + _LOGGER.error( + 'Could not create port mapping, our IP is 127.0.0.1') + local_ip = IPv4Address(local_ip) + + # create port mappings + for external_port, internal_port in ports.items(): + await self._async_add_port_mapping(external_port, + local_ip, + internal_port) + self._mapped_ports.append(external_port) + + async def _async_add_port_mapping(self, + external_port, + local_ip, + internal_port): + """Add a port mapping.""" + # create port mapping + from async_upnp_client import UpnpError + _LOGGER.info('Creating port mapping %s:%s:%s (TCP)', + external_port, local_ip, internal_port) + try: + await self._igd_device.async_add_port_mapping( + remote_host=None, + external_port=external_port, + protocol='TCP', + internal_port=internal_port, + internal_client=local_ip, + enabled=True, + description="Home Assistant", + lease_duration=None) + + self._mapped_ports.append(external_port) + except (asyncio.TimeoutError, aiohttp.ClientError, UpnpError): + _LOGGER.error('Could not add port mapping: %s:%s:%s', + external_port, local_ip, internal_port) + + async def async_delete_port_mappings(self): + """Remove a port mapping.""" + for port in self._mapped_ports: + await self._async_delete_port_mapping(port) + + async def _async_delete_port_mapping(self, external_port): + """Remove a port mapping.""" + from async_upnp_client import UpnpError + _LOGGER.info('Deleting port mapping %s (TCP)', external_port) + try: + await self._igd_device.async_delete_port_mapping( + remote_host=None, + external_port=external_port, + protocol='TCP') + + self._mapped_ports.remove(external_port) + except (asyncio.TimeoutError, aiohttp.ClientError, UpnpError): + _LOGGER.error('Could not delete port mapping') + + async def async_get_total_bytes_received(self): + """Get total bytes received.""" + return await self._igd_device.async_get_total_bytes_received() + + async def async_get_total_bytes_sent(self): + """Get total bytes sent.""" + return await self._igd_device.async_get_total_bytes_sent() + + async def async_get_total_packets_received(self): + """Get total packets received.""" + # pylint: disable=invalid-name + return await self._igd_device.async_get_total_packets_received() + + async def async_get_total_packets_sent(self): + """Get total packets sent.""" + return await self._igd_device.async_get_total_packets_sent() diff --git a/homeassistant/components/upnp/strings.json b/homeassistant/components/upnp/strings.json new file mode 100644 index 00000000000..682150b7ddd --- /dev/null +++ b/homeassistant/components/upnp/strings.json @@ -0,0 +1,25 @@ +{ + "config": { + "title": "UPnP/IGD", + "step": { + "init": { + "title": "UPnP/IGD" + }, + "user": { + "title": "Configuration options for the UPnP/IGD", + "data":{ + "igd": "UPnP/IGD", + "enable_sensors": "Add traffic sensors", + "enable_port_mapping": "Enable port mapping for Home Assistant" + } + } + }, + "error": { + }, + "abort": { + "no_devices_discovered": "No UPnP/IGDs discovered", + "already_configured": "UPnP/IGD is already configured", + "no_sensors_or_port_mapping": "Enable at least sensors or port mapping" + } + } +} diff --git a/homeassistant/config_entries.py b/homeassistant/config_entries.py index fcc8a1f92ac..f3649aca453 100644 --- a/homeassistant/config_entries.py +++ b/homeassistant/config_entries.py @@ -149,6 +149,7 @@ FLOWS = [ 'sonos', 'tradfri', 'zone', + 'upnp', ] diff --git a/requirements_all.txt b/requirements_all.txt index bf76a78541d..0e610dcc521 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -145,6 +145,7 @@ aqualogic==1.0 # homeassistant.components.asterisk_mbox asterisk_mbox==0.5.0 +# homeassistant.components.upnp # homeassistant.components.media_player.dlna_dmr async-upnp-client==0.12.4 @@ -1224,9 +1225,6 @@ pytrafikverket==0.1.5.8 # homeassistant.components.device_tracker.unifi pyunifi==2.13 -# homeassistant.components.upnp -pyupnp-async==0.1.1.1 - # homeassistant.components.binary_sensor.uptimerobot pyuptimerobot==0.0.5 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index e01cddc7243..b5f83b5a55e 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -190,9 +190,6 @@ pytradfri[async]==5.6.0 # homeassistant.components.device_tracker.unifi pyunifi==2.13 -# homeassistant.components.upnp -pyupnp-async==0.1.1.1 - # homeassistant.components.notify.html5 pywebpush==1.6.0 diff --git a/tests/components/test_upnp.py b/tests/components/test_upnp.py deleted file mode 100644 index 6089e6859f2..00000000000 --- a/tests/components/test_upnp.py +++ /dev/null @@ -1,183 +0,0 @@ -"""Test the UPNP component.""" -from collections import OrderedDict -from unittest.mock import patch, MagicMock - -import pytest - -from homeassistant.const import EVENT_HOMEASSISTANT_STOP -from homeassistant.setup import async_setup_component -from homeassistant.components.upnp import IP_SERVICE, DATA_UPNP - - -class MockService(MagicMock): - """Mock upnp IP service.""" - - async def add_port_mapping(self, *args, **kwargs): - """Original function.""" - self.mock_add_port_mapping(*args, **kwargs) - - async def delete_port_mapping(self, *args, **kwargs): - """Original function.""" - self.mock_delete_port_mapping(*args, **kwargs) - - -class MockDevice(MagicMock): - """Mock upnp device.""" - - def find_first_service(self, *args, **kwargs): - """Original function.""" - self._service = MockService() - return self._service - - def peep_first_service(self): - """Access Mock first service.""" - return self._service - - -class MockResp(MagicMock): - """Mock upnp msearch response.""" - - async def get_device(self, *args, **kwargs): - """Original function.""" - device = MockDevice() - service = {'serviceType': IP_SERVICE} - device.services = [service] - return device - - -@pytest.fixture -def mock_msearch_first(*args, **kwargs): - """Wrap async mock msearch_first.""" - async def async_mock_msearch_first(*args, **kwargs): - """Mock msearch_first.""" - return MockResp(*args, **kwargs) - - with patch('pyupnp_async.msearch_first', new=async_mock_msearch_first): - yield - - -@pytest.fixture -def mock_async_exception(*args, **kwargs): - """Wrap async mock exception.""" - async def async_mock_exception(*args, **kwargs): - return Exception - - with patch('pyupnp_async.msearch_first', new=async_mock_exception): - yield - - -@pytest.fixture -def mock_local_ip(): - """Mock get_local_ip.""" - with patch('homeassistant.components.upnp.get_local_ip', - return_value='192.168.0.10'): - yield - - -async def test_setup_fail_if_no_ip(hass): - """Test setup fails if we can't find a local IP.""" - with patch('homeassistant.components.upnp.get_local_ip', - return_value='127.0.0.1'): - result = await async_setup_component(hass, 'upnp', { - 'upnp': {} - }) - - assert not result - - -async def test_setup_fail_if_cannot_select_igd(hass, - mock_local_ip, - mock_async_exception): - """Test setup fails if we can't find an UPnP IGD.""" - result = await async_setup_component(hass, 'upnp', { - 'upnp': {} - }) - - assert not result - - -async def test_setup_succeeds_if_specify_ip(hass, mock_msearch_first): - """Test setup succeeds if we specify IP and can't find a local IP.""" - with patch('homeassistant.components.upnp.get_local_ip', - return_value='127.0.0.1'): - result = await async_setup_component(hass, 'upnp', { - 'upnp': { - 'local_ip': '192.168.0.10', - 'port_mapping': 'True' - } - }) - - assert result - mock_service = hass.data[DATA_UPNP].peep_first_service() - assert len(mock_service.mock_add_port_mapping.mock_calls) == 1 - mock_service.mock_add_port_mapping.assert_called_once_with( - 8123, 8123, '192.168.0.10', 'TCP', desc='Home Assistant') - - -async def test_no_config_maps_hass_local_to_remote_port(hass, - mock_local_ip, - mock_msearch_first): - """Test by default we map local to remote port.""" - result = await async_setup_component(hass, 'upnp', { - 'upnp': { - 'port_mapping': 'True' - } - }) - - assert result - mock_service = hass.data[DATA_UPNP].peep_first_service() - assert len(mock_service.mock_add_port_mapping.mock_calls) == 1 - mock_service.mock_add_port_mapping.assert_called_once_with( - 8123, 8123, '192.168.0.10', 'TCP', desc='Home Assistant') - - -async def test_map_hass_to_remote_port(hass, - mock_local_ip, - mock_msearch_first): - """Test mapping hass to remote port.""" - result = await async_setup_component(hass, 'upnp', { - 'upnp': { - 'port_mapping': 'True', - 'ports': { - 'hass': 1000 - } - } - }) - - assert result - mock_service = hass.data[DATA_UPNP].peep_first_service() - assert len(mock_service.mock_add_port_mapping.mock_calls) == 1 - mock_service.mock_add_port_mapping.assert_called_once_with( - 8123, 1000, '192.168.0.10', 'TCP', desc='Home Assistant') - - -async def test_map_internal_to_remote_ports(hass, - mock_local_ip, - mock_msearch_first): - """Test mapping local to remote ports.""" - ports = OrderedDict() - ports['hass'] = 1000 - ports[1883] = 3883 - - result = await async_setup_component(hass, 'upnp', { - 'upnp': { - 'port_mapping': 'True', - 'ports': ports - } - }) - - assert result - mock_service = hass.data[DATA_UPNP].peep_first_service() - assert len(mock_service.mock_add_port_mapping.mock_calls) == 2 - - mock_service.mock_add_port_mapping.assert_any_call( - 8123, 1000, '192.168.0.10', 'TCP', desc='Home Assistant') - mock_service.mock_add_port_mapping.assert_any_call( - 1883, 3883, '192.168.0.10', 'TCP', desc='Home Assistant') - - hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) - await hass.async_block_till_done() - assert len(mock_service.mock_delete_port_mapping.mock_calls) == 2 - - mock_service.mock_delete_port_mapping.assert_any_call(1000, 'TCP') - mock_service.mock_delete_port_mapping.assert_any_call(3883, 'TCP') diff --git a/tests/components/upnp/__init__.py b/tests/components/upnp/__init__.py new file mode 100644 index 00000000000..4fcc4167e5b --- /dev/null +++ b/tests/components/upnp/__init__.py @@ -0,0 +1 @@ +"""Tests for the IGD component.""" diff --git a/tests/components/upnp/test_config_flow.py b/tests/components/upnp/test_config_flow.py new file mode 100644 index 00000000000..3ff1316975f --- /dev/null +++ b/tests/components/upnp/test_config_flow.py @@ -0,0 +1,240 @@ +"""Tests for UPnP/IGD config flow.""" + +from homeassistant.components import upnp +from homeassistant.components.upnp import config_flow as upnp_config_flow + +from tests.common import MockConfigEntry + + +async def test_flow_none_discovered(hass): + """Test no device discovered flow.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + hass.data[upnp.DOMAIN] = { + 'discovered': {} + } + + result = await flow.async_step_user() + assert result['type'] == 'abort' + assert result['reason'] == 'no_devices_discovered' + + +async def test_flow_already_configured(hass): + """Test device already configured flow.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # discovered device + udn = 'uuid:device_1' + hass.data[upnp.DOMAIN] = { + 'discovered': { + udn: { + 'friendly_name': '192.168.1.1 (Test device)', + 'host': '192.168.1.1', + 'udn': udn, + }, + }, + } + + # configured entry + MockConfigEntry(domain=upnp.DOMAIN, data={ + 'udn': udn, + 'host': '192.168.1.1', + }).add_to_hass(hass) + + result = await flow.async_step_user({ + 'name': '192.168.1.1 (Test device)', + 'enable_sensors': True, + 'enable_port_mapping': False, + }) + assert result['type'] == 'abort' + assert result['reason'] == 'already_configured' + + +async def test_flow_no_sensors_no_port_mapping(hass): + """Test single device, no sensors, no port_mapping.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # discovered device + udn = 'uuid:device_1' + hass.data[upnp.DOMAIN] = { + 'discovered': { + udn: { + 'friendly_name': '192.168.1.1 (Test device)', + 'host': '192.168.1.1', + 'udn': udn, + }, + }, + } + + # configured entry + MockConfigEntry(domain=upnp.DOMAIN, data={ + 'udn': udn, + 'host': '192.168.1.1', + }).add_to_hass(hass) + + result = await flow.async_step_user({ + 'name': '192.168.1.1 (Test device)', + 'enable_sensors': False, + 'enable_port_mapping': False, + }) + assert result['type'] == 'abort' + assert result['reason'] == 'no_sensors_or_port_mapping' + + +async def test_flow_discovered_form(hass): + """Test single device discovered, show form flow.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # discovered device + udn = 'uuid:device_1' + hass.data[upnp.DOMAIN] = { + 'discovered': { + udn: { + 'friendly_name': '192.168.1.1 (Test device)', + 'host': '192.168.1.1', + 'udn': udn, + }, + }, + } + + result = await flow.async_step_user() + assert result['type'] == 'form' + assert result['step_id'] == 'user' + + +async def test_flow_two_discovered_form(hass): + """Test two devices discovered, show form flow with two devices.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # discovered device + udn_1 = 'uuid:device_1' + udn_2 = 'uuid:device_2' + hass.data[upnp.DOMAIN] = { + 'discovered': { + udn_1: { + 'friendly_name': '192.168.1.1 (Test device)', + 'host': '192.168.1.1', + 'udn': udn_1, + }, + udn_2: { + 'friendly_name': '192.168.2.1 (Test device)', + 'host': '192.168.2.1', + 'udn': udn_2, + }, + }, + } + + result = await flow.async_step_user() + assert result['type'] == 'form' + assert result['step_id'] == 'user' + assert result['data_schema']({ + 'name': '192.168.1.1 (Test device)', + 'enable_sensors': True, + 'enable_port_mapping': False, + }) + assert result['data_schema']({ + 'name': '192.168.2.1 (Test device)', + 'enable_sensors': True, + 'enable_port_mapping': False, + }) + + +async def test_config_entry_created(hass): + """Test config entry is created.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # discovered device + hass.data[upnp.DOMAIN] = { + 'discovered': { + 'uuid:device_1': { + 'friendly_name': '192.168.1.1 (Test device)', + 'name': 'Test device 1', + 'host': '192.168.1.1', + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': 'uuid:device_1', + }, + }, + } + + result = await flow.async_step_user({ + 'name': '192.168.1.1 (Test device)', + 'enable_sensors': True, + 'enable_port_mapping': False, + }) + assert result['type'] == 'create_entry' + assert result['data'] == { + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': 'uuid:device_1', + 'port_mapping': False, + 'sensors': True, + } + assert result['title'] == 'Test device 1' + + +async def test_flow_discovery_auto_config_sensors(hass): + """Test creation of device with auto_config.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # auto_config active + hass.data[upnp.DOMAIN] = { + 'auto_config': { + 'active': True, + 'enable_port_mapping': False, + 'enable_sensors': True, + }, + } + + # discovered device + result = await flow.async_step_discovery({ + 'name': 'Test device 1', + 'host': '192.168.1.1', + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': 'uuid:device_1', + }) + + assert result['type'] == 'create_entry' + assert result['data'] == { + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': 'uuid:device_1', + 'sensors': True, + 'port_mapping': False, + } + assert result['title'] == 'Test device 1' + + +async def test_flow_discovery_auto_config_sensors_port_mapping(hass): + """Test creation of device with auto_config, with port mapping.""" + flow = upnp_config_flow.UpnpFlowHandler() + flow.hass = hass + + # auto_config active, with port_mapping + hass.data[upnp.DOMAIN] = { + 'auto_config': { + 'active': True, + 'enable_port_mapping': True, + 'enable_sensors': True, + }, + } + + # discovered device + result = await flow.async_step_discovery({ + 'name': 'Test device 1', + 'host': '192.168.1.1', + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': 'uuid:device_1', + }) + + assert result['type'] == 'create_entry' + assert result['data'] == { + 'udn': 'uuid:device_1', + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'sensors': True, + 'port_mapping': True, + } + assert result['title'] == 'Test device 1' diff --git a/tests/components/upnp/test_init.py b/tests/components/upnp/test_init.py new file mode 100644 index 00000000000..ce4656032a6 --- /dev/null +++ b/tests/components/upnp/test_init.py @@ -0,0 +1,188 @@ +"""Test UPnP/IGD setup process.""" + +from ipaddress import ip_address +from unittest.mock import patch, MagicMock + +from homeassistant.setup import async_setup_component +from homeassistant.components import upnp +from homeassistant.components.upnp.device import Device +from homeassistant.const import EVENT_HOMEASSISTANT_STOP + +from tests.common import MockConfigEntry +from tests.common import mock_coro + + +class MockDevice(Device): + """Mock device for Device.""" + + def __init__(self, udn): + """Initializer.""" + super().__init__(None) + self._udn = udn + self.added_port_mappings = [] + self.removed_port_mappings = [] + + @classmethod + async def async_create_device(cls, hass, ssdp_description): + """Return self.""" + return cls() + + @property + def udn(self): + """Get the UDN.""" + return self._udn + + async def _async_add_port_mapping(self, + external_port, + local_ip, + internal_port): + """Add a port mapping.""" + entry = [external_port, local_ip, internal_port] + self.added_port_mappings.append(entry) + + async def _async_delete_port_mapping(self, external_port): + """Remove a port mapping.""" + entry = external_port + self.removed_port_mappings.append(entry) + + +async def test_async_setup_no_auto_config(hass): + """Test async_setup.""" + # setup component, enable auto_config + await async_setup_component(hass, 'upnp') + + assert hass.data[upnp.DOMAIN]['auto_config'] == { + 'active': False, + 'enable_sensors': False, + 'enable_port_mapping': False, + 'ports': {'hass': 'hass'}, + } + + +async def test_async_setup_auto_config(hass): + """Test async_setup.""" + # setup component, enable auto_config + await async_setup_component(hass, 'upnp', {'upnp': {}, 'discovery': {}}) + + assert hass.data[upnp.DOMAIN]['auto_config'] == { + 'active': True, + 'enable_sensors': True, + 'enable_port_mapping': False, + 'ports': {'hass': 'hass'}, + } + + +async def test_async_setup_auto_config_port_mapping(hass): + """Test async_setup.""" + # setup component, enable auto_config + await async_setup_component(hass, 'upnp', { + 'upnp': { + 'port_mapping': True, + 'ports': {'hass': 'hass'}, + }, + 'discovery': {}}) + + assert hass.data[upnp.DOMAIN]['auto_config'] == { + 'active': True, + 'enable_sensors': True, + 'enable_port_mapping': True, + 'ports': {'hass': 'hass'}, + } + + +async def test_async_setup_auto_config_no_sensors(hass): + """Test async_setup.""" + # setup component, enable auto_config + await async_setup_component(hass, 'upnp', { + 'upnp': {'sensors': False}, + 'discovery': {}}) + + assert hass.data[upnp.DOMAIN]['auto_config'] == { + 'active': True, + 'enable_sensors': False, + 'enable_port_mapping': False, + 'ports': {'hass': 'hass'}, + } + + +async def test_async_setup_entry_default(hass): + """Test async_setup_entry.""" + udn = 'uuid:device_1' + entry = MockConfigEntry(domain=upnp.DOMAIN, data={ + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': udn, + 'sensors': True, + 'port_mapping': False, + }) + + # ensure hass.http is available + await async_setup_component(hass, 'upnp') + + # mock homeassistant.components.upnp.device.Device + mock_device = MagicMock() + mock_device.udn = udn + mock_device.async_add_port_mappings.return_value = mock_coro() + mock_device.async_delete_port_mappings.return_value = mock_coro() + with patch.object(Device, 'async_create_device') as mock_create_device: + mock_create_device.return_value = mock_coro( + return_value=mock_device) + with patch('homeassistant.components.upnp.device.get_local_ip', + return_value='192.168.1.10'): + assert await upnp.async_setup_entry(hass, entry) is True + + # ensure device is stored/used + assert hass.data[upnp.DOMAIN]['devices'][udn] == mock_device + + hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + await hass.async_block_till_done() + + # ensure cleaned up + assert udn not in hass.data[upnp.DOMAIN]['devices'] + + # ensure no port-mapping-methods called + assert len(mock_device.async_add_port_mappings.mock_calls) == 0 + assert len(mock_device.async_delete_port_mappings.mock_calls) == 0 + + +async def test_async_setup_entry_port_mapping(hass): + """Test async_setup_entry.""" + udn = 'uuid:device_1' + entry = MockConfigEntry(domain=upnp.DOMAIN, data={ + 'ssdp_description': 'http://192.168.1.1/desc.xml', + 'udn': udn, + 'sensors': False, + 'port_mapping': True, + }) + + # ensure hass.http is available + await async_setup_component(hass, 'upnp', { + 'upnp': { + 'port_mapping': True, + 'ports': {'hass': 'hass'}, + }, + 'discovery': {}, + }) + + mock_device = MockDevice(udn) + with patch.object(Device, 'async_create_device') as mock_create_device: + mock_create_device.return_value = mock_coro(return_value=mock_device) + with patch('homeassistant.components.upnp.device.get_local_ip', + return_value='192.168.1.10'): + assert await upnp.async_setup_entry(hass, entry) is True + + # ensure device is stored/used + assert hass.data[upnp.DOMAIN]['devices'][udn] == mock_device + + # ensure add-port-mapping-methods called + assert mock_device.added_port_mappings == [ + [8123, ip_address('192.168.1.10'), 8123] + ] + + hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + await hass.async_block_till_done() + + # ensure cleaned up + assert udn not in hass.data[upnp.DOMAIN]['devices'] + + # ensure delete-port-mapping-methods called + assert mock_device.removed_port_mappings == [8123]