Working on IGD

This commit is contained in:
Steven Looman 2018-08-29 21:19:04 +02:00
parent e73f31d829
commit 1eac6408f5
11 changed files with 637 additions and 389 deletions

View File

@ -0,0 +1,29 @@
{
"config": {
"title": "IGD",
"step": {
"init": {
"title": "IGD"
},
"user": {
"title": "Configuration options for the IGD",
"data":{
"sensors": "Add traffic in/out sensors",
"port_forward": "Enable port forward for Home Assistant\nOnly enable this when your Home Assistant is password protected!",
"ssdp_url": "SSDP URL",
"udn": "UDN",
"name": "Name"
}
}
},
"error": {
},
"abort": {
"no_devices_discovered": "No IGDs discovered",
"already_configured": "IGD is already configured",
"no_sensors_or_port_forward": "Enable at least sensors or Port forward",
"no_igds": "No IGDs discovered",
"todo": "TODO"
}
}
}

View File

@ -0,0 +1,29 @@
{
"config": {
"title": "IGD",
"step": {
"init": {
"title": "IGD"
},
"user": {
"title": "Extra configuratie options voor IGD",
"data":{
"sensors": "Verkeer in/out sensors",
"port_forward": "Maak port-forward voor Home Assistant\nZet dit alleen aan wanneer uw Home Assistant een wachtwoord heeft!",
"ssdp_url": "SSDP URL",
"udn": "UDN",
"name": "Naam"
}
}
},
"error": {
},
"abort": {
"no_devices_discovered": "Geen IGDs gevonden",
"already_configured": "IGD is reeds geconfigureerd",
"no_sensors_or_port_forward": "Kies ten minste sensors of Port forward",
"no_igds": "Geen IGDs gevonden",
"todo": "TODO"
}
}
}

View File

@ -4,30 +4,49 @@ Will open a port in your router for Home Assistant and provide statistics.
For more details about this component, please refer to the documentation at For more details about this component, please refer to the documentation at
https://home-assistant.io/components/upnp/ https://home-assistant.io/components/upnp/
""" """
# XXX TODO:
# + flow:
# + discovery
# + adding device
# + removing device
# - configured:
# - adding
# - sensors:
# + adding
# + handle overflow
# - removing
# - port forward:
# - adding
# - removing
# - shutdown
from ipaddress import IPv4Address
from ipaddress import ip_address from ipaddress import ip_address
import aiohttp import aiohttp
import asyncio import asyncio
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries, data_entry_flow from homeassistant import config_entries
from homeassistant.const import ( from homeassistant.config_entries import ConfigEntry
CONF_URL, from homeassistant.const import EVENT_HOMEASSISTANT_STOP
) from homeassistant.const import CONF_URL
from homeassistant.exceptions import PlatformNotReady from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import discovery
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import get_local_ip from homeassistant.util import get_local_ip
from .config_flow import configured_hosts from .config_flow import configured_udns
from .const import CONF_PORT_FORWARD, CONF_SENSORS
from .const import DOMAIN from .const import DOMAIN
from .const import LOGGER as _LOGGER from .const import LOGGER as _LOGGER
_LOGGER.warning('Loading IGD')
REQUIREMENTS = ['async-upnp-client==0.12.4']
REQUIREMENTS = ['async-upnp-client==0.12.3'] DEPENDENCIES = ['http']
DEPENDENCIES = ['http', 'api']
CONF_LOCAL_IP = 'local_ip' CONF_LOCAL_IP = 'local_ip'
CONF_ENABLE_PORT_MAPPING = 'port_mapping' CONF_ENABLE_PORT_MAPPING = 'port_mapping'
@ -38,8 +57,6 @@ CONF_HASS = 'hass'
NOTIFICATION_ID = 'igd_notification' NOTIFICATION_ID = 'igd_notification'
NOTIFICATION_TITLE = 'UPnP/IGD Setup' NOTIFICATION_TITLE = 'UPnP/IGD Setup'
IP_SERVICE = 'urn:schemas-upnp-org:service:WANIPConnection:1' # XXX TODO: remove this
UNITS = { UNITS = {
"Bytes": 1, "Bytes": 1,
"KBytes": 1024, "KBytes": 1024,
@ -59,129 +76,157 @@ CONFIG_SCHEMA = vol.Schema({
}, extra=vol.ALLOW_EXTRA) }, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config, *args, **kwargs): async def _async_create_igd_device(hass: HomeAssistantType, ssdp_description: str):
"""Register a port mapping for Home Assistant via UPnP.""" """."""
conf = config.get(DOMAIN) # build requester
if conf is None: from async_upnp_client.aiohttp import AiohttpSessionRequester
conf = {} session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True)
hass.data[DOMAIN] = {} # create upnp device
configured = configured_hosts(hass) from async_upnp_client import UpnpFactory
_LOGGER.debug('Config: %s', config) factory = UpnpFactory(requester, disable_state_variable_validation=True)
_LOGGER.debug('configured: %s', configured) try:
upnp_device = await factory.async_create_device(ssdp_description)
except (asyncio.TimeoutError, aiohttp.ClientError):
raise PlatformNotReady()
# wrap with IgdDevice
from async_upnp_client.igd import IgdDevice
igd_device = IgdDevice(upnp_device, None)
return igd_device
def _store_device(hass: HomeAssistantType, udn, igd_device):
"""Store an igd_device by udn."""
hass.data[DOMAIN] = hass.data.get(DOMAIN, {})
hass.data[DOMAIN]['devices'] = hass.data[DOMAIN].get('devices', {})
hass.data[DOMAIN]['devices'][udn] = igd_device
def _get_device(hass: HomeAssistantType, udn):
"""Get an igd_device by udn."""
hass.data[DOMAIN] = hass.data.get(DOMAIN, {})
hass.data[DOMAIN]['devices'] = hass.data[DOMAIN].get('devices', {})
return hass.data[DOMAIN]['devices'][udn]
async def _async_create_port_forward(hass: HomeAssistantType, igd_device):
"""Create a port forward."""
_LOGGER.debug('Creating port forward: %s', igd_device)
# determine local ip, ensure sane IP
local_ip = get_local_ip()
if local_ip == '127.0.0.1':
_LOGGER.warning('Could not create port forward, our IP is 127.0.0.1')
return False
local_ip = IPv4Address(local_ip)
# create port mapping
port = hass.http.server_port
await igd_device.async_add_port_mapping(remote_host=None,
external_port=port,
protocol='TCP',
internal_port=port,
internal_client=local_ip,
enabled=True,
description="Home Assistant",
lease_duration=None)
igds = []
if not igds:
return True return True
async def _async_remove_port_forward(hass: HomeAssistantType, igd_device):
"""Remove a port forward."""
_LOGGER.debug('Removing port forward: %s', igd_device)
# remove port mapping
port = hass.http.server_port
await igd_device.async_remove_port_mapping(remote_host=None,
external_port=port,
protocol='TCP')
# config
async def async_setup(hass: HomeAssistantType, config):
"""Register a port mapping for Home Assistant via UPnP."""
_LOGGER.debug('async_setup: config: %s', config)
conf = config.get(DOMAIN, {})
hass.data[DOMAIN] = hass.data.get(DOMAIN, {})
configured = configured_udns(hass)
_LOGGER.debug('configured: %s', configured)
# if no ssdp given: take any discovered - by flow - IGD entry
# if none discovered, raise PlatformNotReady
# if ssdp given: use the SSDP
igds = [] # XXX TODO
for igd_conf in igds: for igd_conf in igds:
hass.async_add_job(hass.config_entries.flow.async_init( hass.async_add_job(hass.config_entries.flow.async_init(
DOMAIN, context={'source': config_entries.SOURCE_IMPORT}, DOMAIN, context={'source': config_entries.SOURCE_IMPORT},
data={ data={
'ssdp_url': igd_conf['ssdp_url'], 'ssdp_description': igd_conf['ssdp_description'],
} }
)) ))
return True return True
# if host is None:
# host = get_local_ip()
#
# if host == '127.0.0.1':
# _LOGGER.error(
# 'Unable to determine local IP. Add it to your configuration.')
# return False
#
# url = config.get(CONF_URL)
#
# # build requester
# from async_upnp_client.aiohttp import AiohttpSessionRequester
# session = async_get_clientsession(hass)
# requester = AiohttpSessionRequester(session, True)
#
# # create upnp device
# from async_upnp_client import UpnpFactory
# factory = UpnpFactory(requester, disable_state_variable_validation=True)
# try:
# upnp_device = await factory.async_create_device(url)
# except (asyncio.TimeoutError, aiohttp.ClientError):
# raise PlatformNotReady()
#
# # wrap with IgdDevice
# from async_upnp_client.igd import IgdDevice
# igd_device = IgdDevice(upnp_device, None)
# hass.data[DATA_IGD]['device'] = igd_device
#
# # sensors
# unit = config.get(CONF_UNITS)
# hass.async_create_task(discovery.async_load_platform(
# hass, 'sensor', DOMAIN, {'unit': unit}, config))
#
# # port mapping
# port_mapping = config.get(CONF_ENABLE_PORT_MAPPING)
# if not port_mapping:
# return True
#
# # determine ports
# internal_port = hass.http.server_port
# ports = config.get(CONF_PORTS)
# if ports is None:
# ports = {CONF_HASS: internal_port}
#
# registered = []
# async def register_port_mappings(event):
# """(Re-)register the port mapping."""
# from async_upnp_client import UpnpError
# for internal, external in ports.items():
# if internal == CONF_HASS:
# internal = internal_port
# try:
# await igd_device.async_add_port_mapping(remote_host=None,
# external_port=external,
# protocol='TCP',
# internal_port=internal,
# internal_client=ip_address(host),
# enabled=True,
# description='Home Assistant',
# lease_duration=None)
# registered.append(external)
# _LOGGER.debug("external %s -> %s @ %s", external, internal, host)
# except UpnpError as error:
# _LOGGER.error(error)
# hass.components.persistent_notification.create(
# '<b>ERROR: TCP port {} is already mapped in your router.'
# '</b><br />Please disable port_mapping in the <i>upnp</i> '
# 'configuration section.<br />'
# 'You will need to restart hass after fixing.'
# ''.format(external),
# title=NOTIFICATION_TITLE,
# notification_id=NOTIFICATION_ID)
#
# async def deregister_port_mappings(event):
# """De-register the port mapping."""
# tasks = [igd_device.async_delete_port_mapping(remote_host=None,
# external_port=external,
# protocol='TCP')
# for external in registered]
# if tasks:
# await asyncio.wait(tasks)
#
# await register_port_mappings(None)
# hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, deregister_port_mappings)
#
# return True
# config flow
async def async_setup_entry(hass, entry): async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry):
"""Set up a bridge from a config entry.""" """Set up a bridge from a config entry."""
_LOGGER.debug('async_setup_entry, title: %s, data: %s', entry.title, entry.data) _LOGGER.debug('async_setup_entry: title: %s, data: %s', config_entry.title, config_entry.data)
data = config_entry.data
ssdp_description = data['ssdp_description']
# build IGD device
try:
igd_device = await _async_create_igd_device(hass, ssdp_description)
except (asyncio.TimeoutError, aiohttp.ClientError):
raise PlatformNotReady()
_store_device(hass, igd_device.udn, igd_device)
# port forward
if data.get(CONF_PORT_FORWARD):
await _async_create_port_forward(hass, igd_device)
# port mapping?
# sensors # sensors
if data.get(CONF_SENSORS):
discovery_info = {
'unit': 'MBytes',
'udn': data['udn'],
}
hass_config = config_entry.data
hass.async_create_task(discovery.async_load_platform(
hass, 'sensor', DOMAIN, discovery_info, hass_config))
async def unload_entry(event):
"""Unload entry on quit."""
await async_unload_entry(hass, config_entry)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, unload_entry)
return True return True
async def async_unload_entry(hass, entry):
async def async_unload_entry(hass: HomeAssistantType, config_entry: ConfigEntry):
"""Unload a config entry.""" """Unload a config entry."""
_LOGGER.debug('async_unload_entry: title: %s, data: %s', config_entry.title, config_entry.data)
data = config_entry.data
udn = data['udn']
igd_device = _get_device(hass, udn)
# port forward
if data.get(CONF_PORT_FORWARD):
_LOGGER.debug('Removing port forward for: %s', igd_device)
_async_remove_port_forward(hass, igd_device)
# sensors
if data.get(CONF_SENSORS):
# XXX TODO: remove sensors
pass
return True

View File

@ -1,40 +1,19 @@
"""Config flow for IGD.""" """Config flow for IGD."""
from homeassistant import config_entries, data_entry_flow from homeassistant import config_entries, data_entry_flow
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
import voluptuous as vol import voluptuous as vol
from .const import DOMAIN from .const import DOMAIN
from .const import LOGGER as _LOGGER from .const import LOGGER as _LOGGER
@callback @callback
def configured_hosts(hass): def configured_udns(hass):
"""Return a set of the configured hosts.""" """Get all configured UDNs."""
return set(entry.data['ssdp_url'] return [
for entry in hass.config_entries.async_entries(DOMAIN)) entry.data['udn']
for entry in hass.config_entries.async_entries(DOMAIN)
]
async def _get_igd_device(hass, ssdp_url):
"""."""
# build requester
from async_upnp_client.aiohttp import AiohttpSessionRequester
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True)
# create upnp device
from async_upnp_client import UpnpFactory
factory = UpnpFactory(requester, disable_state_variable_validation=True)
try:
upnp_device = await factory.async_create_device(ssdp_url)
except (asyncio.TimeoutError, aiohttp.ClientError):
raise PlatformNotReady()
# wrap with IgdDevice
from async_upnp_client.igd import IgdDevice
igd_device = IgdDevice(upnp_device, None)
return igd_device
@config_entries.HANDLERS.register(DOMAIN) @config_entries.HANDLERS.register(DOMAIN)
@ -43,61 +22,114 @@ class IgdFlowHandler(data_entry_flow.FlowHandler):
VERSION = 1 VERSION = 1
# def __init__(self): def __init__(self):
# """Initializer.""" """Initializer."""
# self.host = None pass
# flow: 1. detection/user adding @property
# 2. question: port forward? sensors? def _discovereds(self):
# 3. add it! """Get all discovered entries."""
if DOMAIN not in self.hass.data:
_LOGGER.debug('DOMAIN not in hass.data')
if 'discovered' not in self.hass.data.get(DOMAIN, {}):
_LOGGER.debug('discovered not in hass.data[DOMAIN]')
async def async_step_user(self, user_input=None): return self.hass.data.get(DOMAIN, {}).get('discovered', {})
_LOGGER.debug('async_step_user: %s', user_input)
return await self.async_abort(reason='todo') def _store_discovery_info(self, discovery_info):
"""Add discovery info."""
udn = discovery_info['udn']
if DOMAIN not in self.hass.data:
_LOGGER.debug('DOMAIN not in hass.data')
self.hass.data[DOMAIN] = self.hass.data.get(DOMAIN, {})
if 'discovered' not in self.hass.data[DOMAIN]:
_LOGGER.debug('Creating new discovered: %s', self.hass.data[DOMAIN])
self.hass.data[DOMAIN]['discovered'] = self.hass.data[DOMAIN].get('discovered', {})
self.hass.data[DOMAIN]['discovered'][udn] = discovery_info
async def async_step_discovery(self, discovery_info): async def async_step_discovery(self, discovery_info):
"""Handle a discovered IGD. """
Handle a discovered IGD.
This flow is triggered by the discovery component. It will check if the This flow is triggered by the discovery component. It will check if the
host is already configured and delegate to the import step if not. host is already configured and delegate to the import step if not.
""" """
_LOGGER.debug('async_step_discovery: %s', discovery_info) _LOGGER.debug('async_step_discovery %s: %s', id(self), discovery_info)
ssdp_url = discovery_info['ssdp_description'] # ensure not already discovered/configured
return await self.async_step_options({ udn = discovery_info['udn']
'ssdp_url': ssdp_url, if udn in configured_udns(self.hass):
}) _LOGGER.debug('Already configured: %s', discovery_info)
return self.async_abort(reason='already_configured')
async def async_step_options(self, user_options): # store discovered device
""".""" self._store_discovery_info(discovery_info)
_LOGGER.debug('async_step_options: %s', user_options)
if user_options and \ # abort --> not showing up in discovered things
'sensors' in user_options and \ # return self.async_abort(reason='user_input_required')
'port_forward' in user_options:
return await self.async_step_import(user_options) # user -> showing up in discovered things
return await self.async_step_user()
async def async_step_user(self, user_input=None):
"""Manual set up."""
_LOGGER.debug('async_step_user %s: %s', id(self), user_input)
# if user input given, handle it
user_input = user_input or {}
if 'igd_host' in user_input:
if not user_input['sensors'] and not user_input['port_forward']:
_LOGGER.debug('Aborting, no sensors and no portforward')
return self.async_abort(reason='no_sensors_or_port_forward')
configured_hosts = [
entry['host']
for entry in self._discovereds.values()
if entry['udn'] in configured_udns(self.hass)
]
if user_input['igd_host'] in configured_hosts:
return self.async_abort(reason='already_configured')
return await self._async_save(user_input)
# let user choose from all discovered IGDs
_LOGGER.debug('Discovered devices: %s', self._discovereds)
igd_hosts = [
entry['host']
for entry in self._discovereds.values()
if entry['udn'] not in configured_udns(self.hass)
]
if not igd_hosts:
return self.async_abort(reason='no_devices_discovered')
return self.async_show_form( return self.async_show_form(
step_id='options', step_id='user',
data_schema=vol.Schema({ data_schema=vol.Schema({
vol.Required('sensors'): cv.boolean, vol.Required('igd_host'): vol.In(igd_hosts),
vol.Required('port_forward'): cv.boolean, vol.Required('sensors'): bool,
# vol.Optional('ssdp_url', default=user_options['ssdp_url']): cv.url, vol.Required('port_forward'): bool,
}) })
) )
async def async_step_import(self, import_info): async def _async_save(self, import_info):
"""Import a IGD as new entry.""" """Store IGD as new entry."""
_LOGGER.debug('async_step_import: %s', import_info) _LOGGER.debug('async_step_import %s: %s', id(self), import_info)
# ensure we know the host
igd_host = import_info['igd_host']
discovery_infos = [info
for info in self._discovereds.values()
if info['host'] == igd_host]
if not discovery_infos:
return self.async_abort(reason='host_not_found')
discovery_info = discovery_infos[0]
ssdp_url = import_info['ssdp_url']
try:
igd_device = await _get_igd_device(self.hass, ssdp_url) # try it to see if it works
except:
pass
return self.async_create_entry( return self.async_create_entry(
title=igd_device.name, title=discovery_info['name'],
data={ data={
'ssdp_url': ssdp_url, 'ssdp_description': discovery_info['ssdp_description'],
'udn': igd_device.udn, 'udn': discovery_info['udn'],
} 'sensors': import_info['sensors'],
'port_forward': import_info['port_forward'],
},
) )

View File

@ -3,3 +3,5 @@ import logging
DOMAIN = 'igd' DOMAIN = 'igd'
LOGGER = logging.getLogger('homeassistant.components.igd') LOGGER = logging.getLogger('homeassistant.components.igd')
CONF_PORT_FORWARD = 'port_forward'
CONF_SENSORS = 'sensors'

View File

@ -2,23 +2,26 @@
"config": { "config": {
"title": "IGD", "title": "IGD",
"step": { "step": {
"options": { "init": {
"title": "Extra configuration options for the IGD", "title": "IGD"
},
"user": {
"title": "Configuration options for the IGD",
"data":{ "data":{
"sensors": "Add traffic in/out sensors", "sensors": "Add traffic in/out sensors",
"port_forward": "Enable port forward for Home Assistant\nOnly enable this when your Home Assistant is password protected!", "port_forward": "Enable port forward for Home Assistant\nOnly enable this when your Home Assistant is password protected!",
"ssdp_ur": "SSDP URL", "ssdp_url": "SSDP URL",
"udn": "UDN",
"name": "Name"
} }
},
"import": {
"title": "Link with IGD",
"description": "Setup the IGD"
} }
}, },
"error": { "error": {
}, },
"abort": { "abort": {
"no_devices_discovered": "No IGDs discovered",
"already_configured": "IGD is already configured", "already_configured": "IGD is already configured",
"no_sensors_or_port_forward": "Enable at least sensors or Port forward",
"no_igds": "No IGDs discovered", "no_igds": "No IGDs discovered",
"todo": "TODO" "todo": "TODO"
} }

View File

@ -6,26 +6,29 @@ https://home-assistant.io/components/sensor.upnp/
""" """
import logging import logging
from homeassistant.components.igd import DATA_IGD, UNITS from homeassistant.components import history
from homeassistant.components.igd import DOMAIN, UNITS
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['igd'] DEPENDENCIES = ['igd', 'history']
BYTES_RECEIVED = 1 BYTES_RECEIVED = 'bytes_received'
BYTES_SENT = 2 BYTES_SENT = 'bytes_sent'
PACKETS_RECEIVED = 3 PACKETS_RECEIVED = 'packets_received'
PACKETS_SENT = 4 PACKETS_SENT = 'packets_sent'
# sensor_type: [friendly_name, convert_unit, icon] # sensor_type: [friendly_name, convert_unit, icon]
SENSOR_TYPES = { SENSOR_TYPES = {
BYTES_RECEIVED: ['received bytes', True, 'mdi:server-network'], BYTES_RECEIVED: ['bytes received', True, 'mdi:server-network', float],
BYTES_SENT: ['sent bytes', True, 'mdi:server-network'], BYTES_SENT: ['bytes sent', True, 'mdi:server-network', float],
PACKETS_RECEIVED: ['packets received', False, 'mdi:server-network'], PACKETS_RECEIVED: ['packets received', False, 'mdi:server-network', int],
PACKETS_SENT: ['packets sent', False, 'mdi:server-network'], PACKETS_SENT: ['packets sent', False, 'mdi:server-network', int],
} }
OVERFLOW_AT = 2**32
async def async_setup_platform(hass, config, async_add_devices, async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None): discovery_info=None):
@ -33,11 +36,12 @@ async def async_setup_platform(hass, config, async_add_devices,
if discovery_info is None: if discovery_info is None:
return return
device = hass.data[DATA_IGD]['device'] udn = discovery_info['udn']
device = hass.data[DOMAIN]['devices'][udn]
unit = discovery_info['unit'] unit = discovery_info['unit']
async_add_devices([ async_add_devices([
IGDSensor(device, t, unit if SENSOR_TYPES[t][1] else '#') IGDSensor(device, t, unit if SENSOR_TYPES[t][1] else '#')
for t in SENSOR_TYPES], True) for t in SENSOR_TYPES])
class IGDSensor(Entity): class IGDSensor(Entity):
@ -51,6 +55,7 @@ class IGDSensor(Entity):
self.unit_factor = UNITS[unit] if unit in UNITS else 1 self.unit_factor = UNITS[unit] if unit in UNITS else 1
self._name = 'IGD {}'.format(SENSOR_TYPES[sensor_type][0]) self._name = 'IGD {}'.format(SENSOR_TYPES[sensor_type][0])
self._state = None self._state = None
self._last_value = None
@property @property
def name(self): def name(self):
@ -60,9 +65,14 @@ class IGDSensor(Entity):
@property @property
def state(self): def state(self):
"""Return the state of the device.""" """Return the state of the device."""
if self._state: if self._state is None:
return format(float(self._state) / self.unit_factor, '.1f') return None
return self._state
coercer = SENSOR_TYPES[self.type][3]
if coercer == int:
return format(self._state)
return format(self._state / self.unit_factor, '.1f')
@property @property
def icon(self): def icon(self):
@ -76,11 +86,85 @@ class IGDSensor(Entity):
async def async_update(self): async def async_update(self):
"""Get the latest information from the IGD.""" """Get the latest information from the IGD."""
new_value = 0
if self.type == BYTES_RECEIVED: if self.type == BYTES_RECEIVED:
self._state = await self._device.async_get_total_bytes_received() new_value = await self._device.async_get_total_bytes_received()
elif self.type == BYTES_SENT: elif self.type == BYTES_SENT:
self._state = await self._device.async_get_total_bytes_sent() new_value = await self._device.async_get_total_bytes_sent()
elif self.type == PACKETS_RECEIVED: elif self.type == PACKETS_RECEIVED:
self._state = await self._device.async_get_total_packets_received() new_value = await self._device.async_get_total_packets_received()
elif self.type == PACKETS_SENT: elif self.type == PACKETS_SENT:
self._state = await self._device.async_get_total_packets_sent() new_value = await self._device.async_get_total_packets_sent()
self._handle_new_value(new_value)
# _LOGGER.debug('Removing self: %s', self)
# await self.async_remove() # XXX TODO: does not remove from the UI
@property
def _last_state(self):
"""Get the last state reported to hass."""
states = history.get_last_state_changes(self.hass, 2, self.entity_id)
entity_states = [
state for state in states[self.entity_id]
if state.state != 'unknown']
_LOGGER.debug('%s: entity_states: %s', self.entity_id, entity_states)
if not entity_states:
return None
return entity_states[0]
@property
def _last_value_from_state(self):
"""Get the last value reported to hass."""
last_state = self._last_state
if not last_state:
_LOGGER.debug('%s: No last state', self.entity_id)
return None
coercer = SENSOR_TYPES[self.type][3]
try:
state = coercer(float(last_state.state)) * self.unit_factor
except ValueError:
_LOGGER.debug('%s: value error, coercer: %s, state: %s', self.entity_id, coercer, last_state.state)
raise
state = coercer(0.0)
return state
def _handle_new_value(self, new_value):
_LOGGER.debug('%s: handle_new_value: state: %s, new_value: %s, last_value: %s',
self.entity_id, self._state, new_value, self._last_value)
# upnp-client --debug --pprint --device http://192.168.178.1/RootDevice.xml call-action WANCIFC/GetTotalBytesReceived
if self.entity_id is None:
# don't know our entity ID yet, do nothing but store value
self._last_value = new_value
return
if self._last_value is None:
self._last_value = new_value
if self._state is None:
# try to get the state from history
self._state = self._last_value_from_state or 0
_LOGGER.debug('%s: state: %s, last_value: %s',
self.entity_id, self._state, self._last_value)
# calculate new state
if self._last_value <= new_value:
diff = new_value - self._last_value
else:
# handle overflow
diff = OVERFLOW_AT - self._last_value
if new_value >= 0:
diff += new_value
else:
# some devices don't overflow and start at 0, but somewhere to -2**32
diff += new_value - -OVERFLOW_AT
self._state += diff
self._last_value = new_value
_LOGGER.debug('%s: diff: %s, state: %s, last_value: %s',
self.entity_id, diff, self._state, self._last_value)

View File

@ -0,0 +1 @@
"""Tests for the IGD component."""

View File

@ -0,0 +1,165 @@
"""Tests for IGD config flow."""
from homeassistant.components import igd
from tests.common import MockConfigEntry
async def test_flow_none_discovered(hass):
"""Test no device discovered flow."""
flow = igd.config_flow.IgdFlowHandler()
flow.hass = hass
result = await flow.async_step_user()
assert result['type'] == 'abort'
assert result['reason'] == 'no_devices_discovered'
async def test_flow_already_configured(hass):
"""Test device already configured flow."""
flow = igd.config_flow.IgdFlowHandler()
flow.hass = hass
# discovered device
udn = 'uuid:device_1'
hass.data[igd.DOMAIN] = {
'discovered': {
udn: {
'host': '192.168.1.1',
'udn': udn,
},
},
}
# configured entry
MockConfigEntry(domain=igd.DOMAIN, data={
'udn': udn,
'host': '192.168.1.1',
}).add_to_hass(hass)
result = await flow.async_step_user({
'igd_host': '192.168.1.1',
'sensors': True,
'port_forward': False,
})
assert result['type'] == 'abort'
assert result['reason'] == 'already_configured'
async def test_flow_no_sensors_no_port_forward(hass):
"""Test single device, no sensors, no port_forward."""
flow = igd.config_flow.IgdFlowHandler()
flow.hass = hass
# discovered device
udn = 'uuid:device_1'
hass.data[igd.DOMAIN] = {
'discovered': {
udn: {
'host': '192.168.1.1',
'udn': udn,
},
},
}
# configured entry
MockConfigEntry(domain=igd.DOMAIN, data={
'udn': udn,
'host': '192.168.1.1',
}).add_to_hass(hass)
result = await flow.async_step_user({
'igd_host': '192.168.1.1',
'sensors': False,
'port_forward': False,
})
assert result['type'] == 'abort'
assert result['reason'] == 'no_sensors_or_port_forward'
async def test_flow_discovered_form(hass):
"""Test single device discovered, show form flow."""
flow = igd.config_flow.IgdFlowHandler()
flow.hass = hass
# discovered device
udn = 'uuid:device_1'
hass.data[igd.DOMAIN] = {
'discovered': {
udn: {
'host': '192.168.1.1',
'udn': udn,
},
},
}
result = await flow.async_step_user()
assert result['type'] == 'form'
assert result['step_id'] == 'user'
async def test_flow_two_discovered_form(hass):
"""Test single device discovered, show form flow."""
flow = igd.config_flow.IgdFlowHandler()
flow.hass = hass
# discovered device
udn_1 = 'uuid:device_1'
udn_2 = 'uuid:device_2'
hass.data[igd.DOMAIN] = {
'discovered': {
udn_1: {
'host': '192.168.1.1',
'udn': udn_1,
},
udn_2: {
'host': '192.168.2.1',
'udn': udn_2,
},
},
}
result = await flow.async_step_user()
assert result['type'] == 'form'
assert result['step_id'] == 'user'
assert result['data_schema']({
'igd_host': '192.168.1.1',
'sensors': True,
'port_forward': False,
})
assert result['data_schema']({
'igd_host': '192.168.2.1',
'sensors': True,
'port_forward': False,
})
async def test_config_entry_created(hass):
flow = igd.config_flow.IgdFlowHandler()
flow.hass = hass
# discovered device
udn = 'uuid:device_1'
hass.data[igd.DOMAIN] = {
'discovered': {
udn: {
'name': 'Test device 1',
'host': '192.168.1.1',
'ssdp_description': 'http://192.168.1.1/desc.xml',
'udn': udn,
},
},
}
result = await flow.async_step_user({
'igd_host': '192.168.1.1',
'sensors': True,
'port_forward': False,
})
assert result['data'] == {
'port_forward': False,
'sensors': True,
'ssdp_description': 'http://192.168.1.1/desc.xml',
'udn': 'uuid:device_1',
}
assert result['title'] == 'Test device 1'

View File

@ -0,0 +1,41 @@
"""Test IGD setup process."""
from unittest.mock import patch, MagicMock
from homeassistant.setup import async_setup_component
from homeassistant.components import igd
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from tests.common import MockConfigEntry
from tests.common import mock_coro
async def test_async_setup_entry_port_forward_created(hass):
"""Test async_setup_entry."""
udn = 'uuid:device_1'
entry = MockConfigEntry(domain=igd.DOMAIN, data={
'ssdp_description': 'http://192.168.1.1/desc.xml',
'udn': udn,
'sensors': False,
'port_forward': True,
})
# ensure hass.http is available
await async_setup_component(hass, 'igd')
mock_igd_device = MagicMock()
mock_igd_device.udn = udn
mock_igd_device.async_add_port_mapping.return_value = mock_coro()
mock_igd_device.async_remove_port_mapping.return_value = mock_coro()
with patch.object(igd, '_async_create_igd_device') as mock_create_device:
mock_create_device.return_value = mock_coro(return_value=mock_igd_device)
with patch('homeassistant.components.igd.get_local_ip', return_value='192.168.1.10'):
assert await igd.async_setup_entry(hass, entry) is True
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
assert hass.data[igd.DOMAIN]['devices'][udn] == mock_igd_device
assert len(mock_igd_device.async_add_port_mapping.mock_calls) > 0
assert len(mock_igd_device.async_delete_port_mapping.mock_calls) > 0

View File

@ -1,183 +0,0 @@
"""Test the UPNP component."""
from collections import OrderedDict
from unittest.mock import patch, MagicMock
import pytest
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.setup import async_setup_component
from homeassistant.components.igd import IP_SERVICE, DATA_IGD
class MockService(MagicMock):
"""Mock upnp IP service."""
async def add_port_mapping(self, *args, **kwargs):
"""Original function."""
self.mock_add_port_mapping(*args, **kwargs)
async def delete_port_mapping(self, *args, **kwargs):
"""Original function."""
self.mock_delete_port_mapping(*args, **kwargs)
class MockDevice(MagicMock):
"""Mock upnp device."""
def find_first_service(self, *args, **kwargs):
"""Original function."""
self._service = MockService()
return self._service
def peep_first_service(self):
"""Access Mock first service."""
return self._service
class MockResp(MagicMock):
"""Mock upnp msearch response."""
async def get_device(self, *args, **kwargs):
"""Original function."""
device = MockDevice()
service = {'serviceType': IP_SERVICE}
device.services = [service]
return device
@pytest.fixture
def mock_msearch_first(*args, **kwargs):
"""Wrap async mock msearch_first."""
async def async_mock_msearch_first(*args, **kwargs):
"""Mock msearch_first."""
return MockResp(*args, **kwargs)
with patch('pyupnp_async.msearch_first', new=async_mock_msearch_first):
yield
@pytest.fixture
def mock_async_exception(*args, **kwargs):
"""Wrap async mock exception."""
async def async_mock_exception(*args, **kwargs):
return Exception
with patch('pyupnp_async.msearch_first', new=async_mock_exception):
yield
@pytest.fixture
def mock_local_ip():
"""Mock get_local_ip."""
with patch('homeassistant.components.upnp.get_local_ip',
return_value='192.168.0.10'):
yield
async def test_setup_fail_if_no_ip(hass):
"""Test setup fails if we can't find a local IP."""
with patch('homeassistant.components.upnp.get_local_ip',
return_value='127.0.0.1'):
result = await async_setup_component(hass, 'upnp', {
'upnp': {}
})
assert not result
async def test_setup_fail_if_cannot_select_igd(hass,
mock_local_ip,
mock_async_exception):
"""Test setup fails if we can't find an UPnP IGD."""
result = await async_setup_component(hass, 'upnp', {
'upnp': {}
})
assert not result
async def test_setup_succeeds_if_specify_ip(hass, mock_msearch_first):
"""Test setup succeeds if we specify IP and can't find a local IP."""
with patch('homeassistant.components.upnp.get_local_ip',
return_value='127.0.0.1'):
result = await async_setup_component(hass, 'upnp', {
'upnp': {
'local_ip': '192.168.0.10',
'port_mapping': 'True'
}
})
assert result
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 1
mock_service.mock_add_port_mapping.assert_called_once_with(
8123, 8123, '192.168.0.10', 'TCP', desc='Home Assistant')
async def test_no_config_maps_hass_local_to_remote_port(hass,
mock_local_ip,
mock_msearch_first):
"""Test by default we map local to remote port."""
result = await async_setup_component(hass, 'upnp', {
'upnp': {
'port_mapping': 'True'
}
})
assert result
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 1
mock_service.mock_add_port_mapping.assert_called_once_with(
8123, 8123, '192.168.0.10', 'TCP', desc='Home Assistant')
async def test_map_hass_to_remote_port(hass,
mock_local_ip,
mock_msearch_first):
"""Test mapping hass to remote port."""
result = await async_setup_component(hass, 'upnp', {
'upnp': {
'port_mapping': 'True',
'ports': {
'hass': 1000
}
}
})
assert result
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 1
mock_service.mock_add_port_mapping.assert_called_once_with(
8123, 1000, '192.168.0.10', 'TCP', desc='Home Assistant')
async def test_map_internal_to_remote_ports(hass,
mock_local_ip,
mock_msearch_first):
"""Test mapping local to remote ports."""
ports = OrderedDict()
ports['hass'] = 1000
ports[1883] = 3883
result = await async_setup_component(hass, 'upnp', {
'upnp': {
'port_mapping': 'True',
'ports': ports
}
})
assert result
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 2
mock_service.mock_add_port_mapping.assert_any_call(
8123, 1000, '192.168.0.10', 'TCP', desc='Home Assistant')
mock_service.mock_add_port_mapping.assert_any_call(
1883, 3883, '192.168.0.10', 'TCP', desc='Home Assistant')
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
assert len(mock_service.mock_delete_port_mapping.mock_calls) == 2
mock_service.mock_delete_port_mapping.assert_any_call(1000, 'TCP')
mock_service.mock_delete_port_mapping.assert_any_call(3883, 'TCP')