[device_traker/upc] New UPC connect box platform (#5100)

This commit is contained in:
Pascal Vizeli 2017-01-09 17:08:37 +01:00 committed by Johann Kellerman
parent 3ed7c1c6ad
commit bb02fc707c
4 changed files with 467 additions and 7 deletions

View File

@ -0,0 +1,164 @@
"""
Support for UPC ConnectBox router.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.upc_connect/
"""
import asyncio
import logging
import xml.etree.ElementTree as ET
import aiohttp
import async_timeout
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD
from homeassistant.helpers.aiohttp_client import async_create_clientsession
_LOGGER = logging.getLogger(__name__)
DEFAULT_IP = '192.168.0.1'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_HOST, default=DEFAULT_IP): cv.string,
})
CMD_LOGIN = 15
CMD_DEVICES = 123
@asyncio.coroutine
def async_get_scanner(hass, config):
"""Return the UPC device scanner."""
scanner = UPCDeviceScanner(hass, config[DOMAIN])
success_init = yield from scanner.async_login()
return scanner if success_init else None
class UPCDeviceScanner(DeviceScanner):
"""This class queries a router running UPC ConnectBox firmware."""
def __init__(self, hass, config):
"""Initialize the scanner."""
self.hass = hass
self.host = config[CONF_HOST]
self.password = config[CONF_PASSWORD]
self.data = {}
self.token = None
self.headers = {
'X-Requested-With': 'XMLHttpRequest',
'Referer': "http://{}/index.html".format(self.host),
'User-Agent': ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/47.0.2526.106 Safari/537.36")
}
self.websession = async_create_clientsession(
hass, cookie_jar=aiohttp.CookieJar(unsafe=True, loop=hass.loop))
@asyncio.coroutine
def async_scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
if self.token is None:
reconnect = yield from self.async_login()
if not reconnect:
_LOGGER.error("Not connected to %s", self.host)
return []
raw = yield from self._async_ws_function(CMD_DEVICES)
xml_root = ET.fromstring(raw)
return [mac.text for mac in xml_root.iter('MACAddr')]
@asyncio.coroutine
def async_get_device_name(self, device):
"""The firmware doesn't save the name of the wireless device."""
return None
@asyncio.coroutine
def async_login(self):
"""Login into firmware and get first token."""
response = None
try:
# get first token
with async_timeout.timeout(10, loop=self.hass.loop):
response = yield from self.websession.get(
"http://{}/common_page/login.html".format(self.host)
)
self.token = self._async_get_token()
# login
data = yield from self._async_ws_function(CMD_LOGIN, {
'Username': 'NULL',
'Password': self.password,
})
# successfull?
if data.find("successful") != -1:
return True
return False
except (asyncio.TimeoutError, aiohttp.errors.ClientError):
_LOGGER.error("Can not load login page from %s", self.host)
return False
finally:
if response is not None:
yield from response.release()
@asyncio.coroutine
def _async_ws_function(self, function, additional_form=None):
"""Execute a command on UPC firmware webservice."""
form_data = {
'token': self.token,
'fun': function
}
if additional_form:
form_data.update(additional_form)
response = None
try:
with async_timeout.timeout(10, loop=self.hass.loop):
response = yield from self.websession.post(
"http://{}/xml/getter.xml".format(self.host),
data=form_data,
headers=self.headers
)
# error on UPC webservice
if response.status != 200:
_LOGGER.warning(
"Error %d on %s.", response.status, function)
self.token = None
return
# load data, store token for next request
raw = yield from response.text()
self.token = self._async_get_token()
return raw
except (asyncio.TimeoutError, aiohttp.errors.ClientError):
_LOGGER.error("Error on %s", function)
self.token = None
finally:
if response is not None:
yield from response.release()
def _async_get_token(self):
"""Extract token from cookies."""
cookie_manager = self.websession.cookie_jar.filter_cookies(
"http://{}".format(self.host))
return cookie_manager.get('sessionToken')

View File

@ -0,0 +1,239 @@
"""The tests for the UPC ConnextBox device tracker platform."""
import asyncio
import os
from unittest.mock import patch
import logging
from homeassistant.bootstrap import setup_component
from homeassistant.components import device_tracker
from homeassistant.const import (
CONF_PLATFORM, CONF_HOST, CONF_PASSWORD)
from homeassistant.components.device_tracker import DOMAIN
import homeassistant.components.device_tracker.upc_connect as platform
from homeassistant.util.async import run_coroutine_threadsafe
from tests.common import (
get_test_home_assistant, assert_setup_component, load_fixture)
_LOGGER = logging.getLogger(__name__)
@asyncio.coroutine
def async_scan_devices_mock(scanner):
"""Mock async_scan_devices."""
return []
class TestUPCConnect(object):
"""Tests for the Ddwrt device tracker platform."""
def setup_method(self):
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.hass.config.components = ['zone']
self.host = "127.0.0.1"
def teardown_method(self):
"""Stop everything that was started."""
try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError:
pass
self.hass.stop()
@patch('homeassistant.components.device_tracker.upc_connect.'
'UPCDeviceScanner.async_scan_devices',
return_value=async_scan_devices_mock)
def test_setup_platform(self, scan_mock, aioclient_mock):
"""Setup a platform."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful'
)
with assert_setup_component(1):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}})
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
@patch('homeassistant.components.device_tracker._LOGGER.error')
def test_setup_platform_error_webservice(self, mock_error, aioclient_mock):
"""Setup a platform with api error."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful',
status=404
)
with assert_setup_component(1):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}})
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert 'Error setting up platform' in \
str(mock_error.call_args_list[-1])
@patch('homeassistant.components.device_tracker._LOGGER.error')
def test_setup_platform_timeout_webservice(self, mock_error,
aioclient_mock):
"""Setup a platform with api timeout."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful',
exc=asyncio.TimeoutError()
)
with assert_setup_component(1):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}})
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert 'Error setting up platform' in \
str(mock_error.call_args_list[-1])
@patch('homeassistant.components.device_tracker._LOGGER.error')
def test_setup_platform_timeout_loginpage(self, mock_error,
aioclient_mock):
"""Setup a platform with timeout on loginpage."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
exc=asyncio.TimeoutError()
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful',
)
with assert_setup_component(1):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}})
assert len(aioclient_mock.mock_calls) == 1
assert 'Error setting up platform' in \
str(mock_error.call_args_list[-1])
def test_scan_devices(self, aioclient_mock):
"""Setup a upc platform and scan device."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful',
cookies={'sessionToken': '654321'}
)
scanner = run_coroutine_threadsafe(platform.async_get_scanner(
self.hass, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}}
), self.hass.loop).result()
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
aioclient_mock.clear_requests()
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
text=load_fixture('upc_connect.xml'),
cookies={'sessionToken': '1235678'}
)
mac_list = run_coroutine_threadsafe(
scanner.async_scan_devices(), self.hass.loop).result()
assert len(aioclient_mock.mock_calls) == 1
assert aioclient_mock.mock_calls[0][2]['fun'] == 123
assert scanner.token == '1235678'
assert mac_list == ['30:D3:2D:0:69:21', '5C:AA:FD:25:32:02',
'70:EE:50:27:A1:38']
def test_scan_devices_without_session(self, aioclient_mock):
"""Setup a upc platform and scan device with no token."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful',
cookies={'sessionToken': '654321'}
)
scanner = run_coroutine_threadsafe(platform.async_get_scanner(
self.hass, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}}
), self.hass.loop).result()
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
aioclient_mock.clear_requests()
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
text=load_fixture('upc_connect.xml'),
cookies={'sessionToken': '1235678'}
)
scanner.token = None
mac_list = run_coroutine_threadsafe(
scanner.async_scan_devices(), self.hass.loop).result()
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert mac_list == []

42
tests/fixtures/upc_connect.xml vendored Normal file
View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<LanUserTable>
<Ethernet>
<clientinfo>
<interface>Ethernet 1</interface>
<IPv4Addr>192.168.0.139/24</IPv4Addr>
<index>0</index>
<interfaceid>2</interfaceid>
<hostname>Unknown</hostname>
<MACAddr>30:D3:2D:0:69:21</MACAddr>
<method>2</method>
<leaseTime>00:00:00:00</leaseTime>
<speed>1000</speed>
</clientinfo>
<clientinfo>
<interface>Ethernet 2</interface>
<IPv4Addr>192.168.0.134/24</IPv4Addr>
<index>1</index>
<interfaceid>2</interfaceid>
<hostname>Unknown</hostname>
<MACAddr>5C:AA:FD:25:32:02</MACAddr>
<method>2</method>
<leaseTime>00:00:00:00</leaseTime>
<speed>10</speed>
</clientinfo>
</Ethernet>
<WIFI>
<clientinfo>
<interface>HASS</interface>
<IPv4Addr>192.168.0.194/24</IPv4Addr>
<index>3</index>
<interfaceid>3</interfaceid>
<hostname>Unknown</hostname>
<MACAddr>70:EE:50:27:A1:38</MACAddr>
<method>2</method>
<leaseTime>00:00:00:00</leaseTime>
<speed>39</speed>
</clientinfo>
</WIFI>
<totalClient>3</totalClient>
<Customer>upc</Customer>
</LanUserTable>

View File

@ -14,6 +14,7 @@ class AiohttpClientMocker:
def __init__(self):
"""Initialize the request mocker."""
self._mocks = []
self._cookies = {}
self.mock_calls = []
def request(self, method, url, *,
@ -25,7 +26,8 @@ class AiohttpClientMocker:
json=None,
params=None,
headers=None,
exc=None):
exc=None,
cookies=None):
"""Mock a request."""
if json:
text = _json.dumps(json)
@ -35,11 +37,11 @@ class AiohttpClientMocker:
content = b''
if params:
url = str(yarl.URL(url).with_query(params))
self.exc = exc
if cookies:
self._cookies.update(cookies)
self._mocks.append(AiohttpClientMockResponse(
method, url, status, content))
method, url, status, content, exc))
def get(self, *args, **kwargs):
"""Register a mock get request."""
@ -66,6 +68,16 @@ class AiohttpClientMocker:
"""Number of requests made."""
return len(self.mock_calls)
def filter_cookies(self, host):
"""Return hosts cookies."""
return self._cookies
def clear_requests(self):
"""Reset mock calls."""
self._mocks.clear()
self._cookies.clear()
self.mock_calls.clear()
@asyncio.coroutine
def match_request(self, method, url, *, data=None, auth=None, params=None,
headers=None): # pylint: disable=unused-variable
@ -74,8 +86,8 @@ class AiohttpClientMocker:
if response.match_request(method, url, params):
self.mock_calls.append((method, url, data))
if self.exc:
raise self.exc
if response.exc:
raise response.exc
return response
assert False, "No mock registered for {} {} {}".format(method.upper(),
@ -85,7 +97,7 @@ class AiohttpClientMocker:
class AiohttpClientMockResponse:
"""Mock Aiohttp client response."""
def __init__(self, method, url, status, response):
def __init__(self, method, url, status, response, exc=None):
"""Initialize a fake response."""
self.method = method
self._url = url
@ -93,6 +105,7 @@ class AiohttpClientMockResponse:
else urlparse(url.lower()))
self.status = status
self.response = response
self.exc = exc
def match_request(self, method, url, params=None):
"""Test if response answers request."""
@ -155,4 +168,6 @@ def mock_aiohttp_client():
setattr(instance, method,
functools.partial(mocker.match_request, method))
instance.cookie_jar.filter_cookies = mocker.filter_cookies
yield mocker