Working on IGD

This commit is contained in:
Steven Looman 2018-08-17 21:28:29 +02:00
parent 71d00062e5
commit 839b58c600
9 changed files with 339 additions and 156 deletions

View File

@ -49,6 +49,7 @@ CONFIG_ENTRY_HANDLERS = {
'google_cast': 'cast',
SERVICE_HUE: 'hue',
'sonos': 'sonos',
'igd': 'igd',
}
SERVICE_HANDLERS = {

View File

@ -0,0 +1,187 @@
"""
Will open a port in your router for Home Assistant and provide statistics.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/upnp/
"""
from ipaddress import ip_address
import aiohttp
import asyncio
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.const import (
CONF_URL,
)
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import get_local_ip
from .config_flow import configured_hosts
from .const import DOMAIN
from .const import LOGGER as _LOGGER
_LOGGER.warning('Loading IGD')
REQUIREMENTS = ['async-upnp-client==0.12.3']
DEPENDENCIES = ['http', 'api']
CONF_LOCAL_IP = 'local_ip'
CONF_ENABLE_PORT_MAPPING = 'port_mapping'
CONF_PORTS = 'ports'
CONF_UNITS = 'unit'
CONF_HASS = 'hass'
NOTIFICATION_ID = 'igd_notification'
NOTIFICATION_TITLE = 'UPnP/IGD Setup'
IP_SERVICE = 'urn:schemas-upnp-org:service:WANIPConnection:1' # XXX TODO: remove this
UNITS = {
"Bytes": 1,
"KBytes": 1024,
"MBytes": 1024**2,
"GBytes": 1024**3,
}
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Required(CONF_URL): cv.url,
vol.Optional(CONF_ENABLE_PORT_MAPPING, default=True): cv.boolean,
vol.Optional(CONF_UNITS, default="MBytes"): vol.In(UNITS),
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
vol.Optional(CONF_PORTS):
vol.Schema({vol.Any(CONF_HASS, cv.positive_int): cv.positive_int})
}),
}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config, *args, **kwargs):
"""Register a port mapping for Home Assistant via UPnP."""
conf = config.get(DOMAIN)
if conf is None:
conf = {}
hass.data[DOMAIN] = {}
configured = configured_hosts(hass)
_LOGGER.debug('Config: %s', config)
_LOGGER.debug('configured: %s', configured)
igds = []
if not igds:
return True
for igd_conf in igds:
hass.async_add_job(hass.config_entries.flow.async_init(
DOMAIN, context={'source': config_entries.SOURCE_IMPORT},
data={
'ssdp_url': igd_conf['ssdp_url'],
}
))
return True
# if host is None:
# host = get_local_ip()
#
# if host == '127.0.0.1':
# _LOGGER.error(
# 'Unable to determine local IP. Add it to your configuration.')
# return False
#
# url = config.get(CONF_URL)
#
# # build requester
# from async_upnp_client.aiohttp import AiohttpSessionRequester
# session = async_get_clientsession(hass)
# requester = AiohttpSessionRequester(session, True)
#
# # create upnp device
# from async_upnp_client import UpnpFactory
# factory = UpnpFactory(requester, disable_state_variable_validation=True)
# try:
# upnp_device = await factory.async_create_device(url)
# except (asyncio.TimeoutError, aiohttp.ClientError):
# raise PlatformNotReady()
#
# # wrap with IgdDevice
# from async_upnp_client.igd import IgdDevice
# igd_device = IgdDevice(upnp_device, None)
# hass.data[DATA_IGD]['device'] = igd_device
#
# # sensors
# unit = config.get(CONF_UNITS)
# hass.async_create_task(discovery.async_load_platform(
# hass, 'sensor', DOMAIN, {'unit': unit}, config))
#
# # port mapping
# port_mapping = config.get(CONF_ENABLE_PORT_MAPPING)
# if not port_mapping:
# return True
#
# # determine ports
# internal_port = hass.http.server_port
# ports = config.get(CONF_PORTS)
# if ports is None:
# ports = {CONF_HASS: internal_port}
#
# registered = []
# async def register_port_mappings(event):
# """(Re-)register the port mapping."""
# from async_upnp_client import UpnpError
# for internal, external in ports.items():
# if internal == CONF_HASS:
# internal = internal_port
# try:
# await igd_device.async_add_port_mapping(remote_host=None,
# external_port=external,
# protocol='TCP',
# internal_port=internal,
# internal_client=ip_address(host),
# enabled=True,
# description='Home Assistant',
# lease_duration=None)
# registered.append(external)
# _LOGGER.debug("external %s -> %s @ %s", external, internal, host)
# except UpnpError as error:
# _LOGGER.error(error)
# hass.components.persistent_notification.create(
# '<b>ERROR: TCP port {} is already mapped in your router.'
# '</b><br />Please disable port_mapping in the <i>upnp</i> '
# 'configuration section.<br />'
# 'You will need to restart hass after fixing.'
# ''.format(external),
# title=NOTIFICATION_TITLE,
# notification_id=NOTIFICATION_ID)
#
# async def deregister_port_mappings(event):
# """De-register the port mapping."""
# tasks = [igd_device.async_delete_port_mapping(remote_host=None,
# external_port=external,
# protocol='TCP')
# for external in registered]
# if tasks:
# await asyncio.wait(tasks)
#
# await register_port_mappings(None)
# hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, deregister_port_mappings)
#
# return True
async def async_setup_entry(hass, entry):
"""Set up a bridge from a config entry."""
_LOGGER.debug('async_setup_entry, title: %s, data: %s', entry.title, entry.data)
# port mapping?
# sensors
return True
async def async_unload_entry(hass, entry):
"""Unload a config entry."""

View File

@ -0,0 +1,103 @@
"""Config flow for IGD."""
from homeassistant import config_entries, data_entry_flow
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
import voluptuous as vol
from .const import DOMAIN
from .const import LOGGER as _LOGGER
@callback
def configured_hosts(hass):
"""Return a set of the configured hosts."""
return set(entry.data['ssdp_url']
for entry in hass.config_entries.async_entries(DOMAIN))
async def _get_igd_device(hass, ssdp_url):
"""."""
# build requester
from async_upnp_client.aiohttp import AiohttpSessionRequester
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True)
# create upnp device
from async_upnp_client import UpnpFactory
factory = UpnpFactory(requester, disable_state_variable_validation=True)
try:
upnp_device = await factory.async_create_device(ssdp_url)
except (asyncio.TimeoutError, aiohttp.ClientError):
raise PlatformNotReady()
# wrap with IgdDevice
from async_upnp_client.igd import IgdDevice
igd_device = IgdDevice(upnp_device, None)
return igd_device
@config_entries.HANDLERS.register(DOMAIN)
class IgdFlowHandler(data_entry_flow.FlowHandler):
"""Handle a Hue config flow."""
VERSION = 1
# def __init__(self):
# """Initializer."""
# self.host = None
# flow: 1. detection/user adding
# 2. question: port forward? sensors?
# 3. add it!
async def async_step_user(self, user_input=None):
_LOGGER.debug('async_step_user: %s', user_input)
return await self.async_abort(reason='todo')
async def async_step_discovery(self, discovery_info):
"""Handle a discovered IGD.
This flow is triggered by the discovery component. It will check if the
host is already configured and delegate to the import step if not.
"""
_LOGGER.debug('async_step_discovery: %s', discovery_info)
ssdp_url = discovery_info['ssdp_description']
return await self.async_step_options({
'ssdp_url': ssdp_url,
})
async def async_step_options(self, user_options):
"""."""
_LOGGER.debug('async_step_options: %s', user_options)
if user_options and \
'sensors' in user_options and \
'port_forward' in user_options:
return await self.async_step_import(user_options)
return self.async_show_form(
step_id='options',
data_schema=vol.Schema({
vol.Required('sensors'): cv.boolean,
vol.Required('port_forward'): cv.boolean,
# vol.Optional('ssdp_url', default=user_options['ssdp_url']): cv.url,
})
)
async def async_step_import(self, import_info):
"""Import a IGD as new entry."""
_LOGGER.debug('async_step_import: %s', import_info)
ssdp_url = import_info['ssdp_url']
try:
igd_device = await _get_igd_device(self.hass, ssdp_url) # try it to see if it works
except:
pass
return self.async_create_entry(
title=igd_device.name,
data={
'ssdp_url': ssdp_url,
'udn': igd_device.udn,
}
)

View File

@ -0,0 +1,5 @@
"""Constants for the IGD component."""
import logging
DOMAIN = 'igd'
LOGGER = logging.getLogger('homeassistant.components.igd')

View File

@ -0,0 +1,26 @@
{
"config": {
"title": "IGD",
"step": {
"options": {
"title": "Extra configuration options for the IGD",
"data":{
"sensors": "Add traffic in/out sensors",
"port_forward": "Enable port forward for Home Assistant\nOnly enable this when your Home Assistant is password protected!",
"ssdp_ur": "SSDP URL",
}
},
"import": {
"title": "Link with IGD",
"description": "Setup the IGD"
}
},
"error": {
},
"abort": {
"already_configured": "IGD is already configured",
"no_igds": "No IGDs discovered",
"todo": "TODO"
}
}
}

View File

@ -6,12 +6,12 @@ https://home-assistant.io/components/sensor.upnp/
"""
import logging
from homeassistant.components.upnp import DATA_UPNP, UNITS, CIC_SERVICE
from homeassistant.components.igd import DATA_IGD, UNITS
from homeassistant.helpers.entity import Entity
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['upnp']
DEPENDENCIES = ['igd']
BYTES_RECEIVED = 1
BYTES_SENT = 2
@ -33,20 +33,19 @@ async def async_setup_platform(hass, config, async_add_devices,
if discovery_info is None:
return
device = hass.data[DATA_UPNP]
service = device.find_first_service(CIC_SERVICE)
device = hass.data[DATA_IGD]['device']
unit = discovery_info['unit']
async_add_devices([
IGDSensor(service, t, unit if SENSOR_TYPES[t][1] else '#')
IGDSensor(device, t, unit if SENSOR_TYPES[t][1] else '#')
for t in SENSOR_TYPES], True)
class IGDSensor(Entity):
"""Representation of a UPnP IGD sensor."""
def __init__(self, service, sensor_type, unit=None):
def __init__(self, device, sensor_type, unit=None):
"""Initialize the IGD sensor."""
self._service = service
self._device = device
self.type = sensor_type
self.unit = unit
self.unit_factor = UNITS[unit] if unit in UNITS else 1
@ -78,10 +77,10 @@ class IGDSensor(Entity):
async def async_update(self):
"""Get the latest information from the IGD."""
if self.type == BYTES_RECEIVED:
self._state = await self._service.get_total_bytes_received()
self._state = await self._device.async_get_total_bytes_received()
elif self.type == BYTES_SENT:
self._state = await self._service.get_total_bytes_sent()
self._state = await self._device.async_get_total_bytes_sent()
elif self.type == PACKETS_RECEIVED:
self._state = await self._service.get_total_packets_received()
self._state = await self._device.async_get_total_packets_received()
elif self.type == PACKETS_SENT:
self._state = await self._service.get_total_packets_sent()
self._state = await self._device.async_get_total_packets_sent()

View File

@ -1,140 +0,0 @@
"""
Will open a port in your router for Home Assistant and provide statistics.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/upnp/
"""
from ipaddress import ip_address
import logging
import asyncio
import voluptuous as vol
from homeassistant.const import (EVENT_HOMEASSISTANT_STOP)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import discovery
from homeassistant.util import get_local_ip
REQUIREMENTS = ['pyupnp-async==0.1.0.2']
DEPENDENCIES = ['http']
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['api']
DOMAIN = 'upnp'
DATA_UPNP = 'upnp_device'
CONF_LOCAL_IP = 'local_ip'
CONF_ENABLE_PORT_MAPPING = 'port_mapping'
CONF_PORTS = 'ports'
CONF_UNITS = 'unit'
CONF_HASS = 'hass'
NOTIFICATION_ID = 'upnp_notification'
NOTIFICATION_TITLE = 'UPnP Setup'
IGD_DEVICE = 'urn:schemas-upnp-org:device:InternetGatewayDevice:1'
PPP_SERVICE = 'urn:schemas-upnp-org:service:WANPPPConnection:1'
IP_SERVICE = 'urn:schemas-upnp-org:service:WANIPConnection:1'
CIC_SERVICE = 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1'
UNITS = {
"Bytes": 1,
"KBytes": 1024,
"MBytes": 1024**2,
"GBytes": 1024**3,
}
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_ENABLE_PORT_MAPPING, default=True): cv.boolean,
vol.Optional(CONF_UNITS, default="MBytes"): vol.In(UNITS),
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
vol.Optional(CONF_PORTS):
vol.Schema({vol.Any(CONF_HASS, cv.positive_int): cv.positive_int})
}),
}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config):
"""Register a port mapping for Home Assistant via UPnP."""
config = config[DOMAIN]
host = config.get(CONF_LOCAL_IP)
if host is None:
host = get_local_ip()
if host == '127.0.0.1':
_LOGGER.error(
'Unable to determine local IP. Add it to your configuration.')
return False
import pyupnp_async
from pyupnp_async.error import UpnpSoapError
service = None
resp = await pyupnp_async.msearch_first(search_target=IGD_DEVICE)
if not resp:
return False
try:
device = await resp.get_device()
hass.data[DATA_UPNP] = device
for _service in device.services:
if _service['serviceType'] == PPP_SERVICE:
service = device.find_first_service(PPP_SERVICE)
if _service['serviceType'] == IP_SERVICE:
service = device.find_first_service(IP_SERVICE)
if _service['serviceType'] == CIC_SERVICE:
unit = config.get(CONF_UNITS)
hass.async_create_task(discovery.async_load_platform(
hass, 'sensor', DOMAIN, {'unit': unit}, config))
except UpnpSoapError as error:
_LOGGER.error(error)
return False
if not service:
_LOGGER.warning("Could not find any UPnP IGD")
return False
port_mapping = config.get(CONF_ENABLE_PORT_MAPPING)
if not port_mapping:
return True
internal_port = hass.http.server_port
ports = config.get(CONF_PORTS)
if ports is None:
ports = {CONF_HASS: internal_port}
registered = []
for internal, external in ports.items():
if internal == CONF_HASS:
internal = internal_port
try:
await service.add_port_mapping(internal, external, host, 'TCP',
desc='Home Assistant')
registered.append(external)
_LOGGER.debug("external %s -> %s @ %s", external, internal, host)
except UpnpSoapError as error:
_LOGGER.error(error)
hass.components.persistent_notification.create(
'<b>ERROR: tcp port {} is already mapped in your router.'
'</b><br />Please disable port_mapping in the <i>upnp</i> '
'configuration section.<br />'
'You will need to restart hass after fixing.'
''.format(external),
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID)
async def deregister_port(event):
"""De-register the UPnP port mapping."""
tasks = [service.delete_port_mapping(external, 'TCP')
for external in registered]
if tasks:
await asyncio.wait(tasks)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, deregister_port)
return True

View File

@ -142,6 +142,7 @@ FLOWS = [
'nest',
'sonos',
'zone',
'igd',
]
@ -414,6 +415,7 @@ class ConfigEntries:
Handler key is the domain of the component that we want to setup.
"""
component = getattr(self.hass.components, handler_key)
_LOGGER.debug('Handler key: %s', handler_key)
handler = HANDLERS.get(handler_key)
if handler is None:

View File

@ -6,7 +6,7 @@ import pytest
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.setup import async_setup_component
from homeassistant.components.upnp import IP_SERVICE, DATA_UPNP
from homeassistant.components.igd import IP_SERVICE, DATA_IGD
class MockService(MagicMock):
@ -107,7 +107,7 @@ async def test_setup_succeeds_if_specify_ip(hass, mock_msearch_first):
})
assert result
mock_service = hass.data[DATA_UPNP].peep_first_service()
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 1
mock_service.mock_add_port_mapping.assert_called_once_with(
8123, 8123, '192.168.0.10', 'TCP', desc='Home Assistant')
@ -122,7 +122,7 @@ async def test_no_config_maps_hass_local_to_remote_port(hass,
})
assert result
mock_service = hass.data[DATA_UPNP].peep_first_service()
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 1
mock_service.mock_add_port_mapping.assert_called_once_with(
8123, 8123, '192.168.0.10', 'TCP', desc='Home Assistant')
@ -141,7 +141,7 @@ async def test_map_hass_to_remote_port(hass,
})
assert result
mock_service = hass.data[DATA_UPNP].peep_first_service()
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 1
mock_service.mock_add_port_mapping.assert_called_once_with(
8123, 1000, '192.168.0.10', 'TCP', desc='Home Assistant')
@ -162,7 +162,7 @@ async def test_map_internal_to_remote_ports(hass,
})
assert result
mock_service = hass.data[DATA_UPNP].peep_first_service()
mock_service = hass.data[DATA_IGD].peep_first_service()
assert len(mock_service.mock_add_port_mapping.mock_calls) == 2
mock_service.mock_add_port_mapping.assert_any_call(