Fix the "302" error in the UPC Connect component and remove the need to specify the router password (#8335)

* Remove the need to login on the UPC Connect component

* Remove unnecessary imports

* Update the unit tests for the UPC Connect component

* Fix the "302" error with the UPC Connect component

* Fix a flake8 error

* Update the unit tests for the UPC Connect component
This commit is contained in:
Flavien Charlon 2017-07-05 05:06:24 +01:00 committed by Paulus Schoutsen
parent 061a38cc3b
commit 8185587100
2 changed files with 35 additions and 116 deletions

View File

@ -12,11 +12,10 @@ import aiohttp
import async_timeout
import voluptuous as vol
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD
from homeassistant.const import CONF_HOST
from homeassistant.helpers.aiohttp_client import async_get_clientsession
@ -25,12 +24,9 @@ _LOGGER = logging.getLogger(__name__)
DEFAULT_IP = '192.168.0.1'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_HOST, default=DEFAULT_IP): cv.string,
})
CMD_LOGIN = 15
CMD_LOGOUT = 16
CMD_DEVICES = 123
@ -38,7 +34,7 @@ CMD_DEVICES = 123
def async_get_scanner(hass, config):
"""Return the UPC device scanner."""
scanner = UPCDeviceScanner(hass, config[DOMAIN])
success_init = yield from scanner.async_login()
success_init = yield from scanner.async_initialize_token()
return scanner if success_init else None
@ -50,7 +46,6 @@ class UPCDeviceScanner(DeviceScanner):
"""Initialize the scanner."""
self.hass = hass
self.host = config[CONF_HOST]
self.password = config[CONF_PASSWORD]
self.data = {}
self.token = None
@ -65,21 +60,12 @@ class UPCDeviceScanner(DeviceScanner):
self.websession = async_get_clientsession(hass)
@asyncio.coroutine
def async_logout(event):
"""Logout from upc connect box."""
yield from self._async_ws_function(CMD_LOGOUT)
self.token = None
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, async_logout)
@asyncio.coroutine
def async_scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
if self.token is None:
reconnect = yield from self.async_login()
if not reconnect:
token_initialized = yield from self.async_initialize_token()
if not token_initialized:
_LOGGER.error("Not connected to %s", self.host)
return []
@ -95,55 +81,42 @@ class UPCDeviceScanner(DeviceScanner):
@asyncio.coroutine
def async_get_device_name(self, device):
"""Ge the firmware doesn't save the name of the wireless device."""
"""The firmware doesn't save the name of the wireless device."""
return None
@asyncio.coroutine
def async_login(self):
"""Login into firmware and get first token."""
def async_initialize_token(self):
"""Get first token."""
try:
# get first token
with async_timeout.timeout(10, loop=self.hass.loop):
response = yield from self.websession.get(
"http://{}/common_page/login.html".format(self.host)
"http://{}/common_page/login.html".format(self.host),
headers=self.headers
)
yield from response.text()
self.token = response.cookies['sessionToken'].value
# login
data = yield from self._async_ws_function(CMD_LOGIN, {
'Username': 'NULL',
'Password': self.password,
})
# Successful?
return data is not None
return True
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.error("Can not load login page from %s", self.host)
return False
@asyncio.coroutine
def _async_ws_function(self, function, additional_form=None):
def _async_ws_function(self, function):
"""Execute a command on UPC firmware webservice."""
form_data = {
'token': self.token,
'fun': function
}
if additional_form:
form_data.update(additional_form)
redirects = function != CMD_DEVICES
try:
with async_timeout.timeout(10, loop=self.hass.loop):
# The 'token' parameter has to be first, and 'fun' second
# or the UPC firmware will return an error
response = yield from self.websession.post(
"http://{}/xml/getter.xml".format(self.host),
data=form_data,
data="token={}&fun={}".format(self.token, function),
headers=self.headers,
allow_redirects=redirects
allow_redirects=False
)
# error?

View File

@ -7,7 +7,7 @@ import logging
from homeassistant.setup import setup_component
from homeassistant.components import device_tracker
from homeassistant.const import (
CONF_PLATFORM, CONF_HOST, CONF_PASSWORD)
CONF_PLATFORM, CONF_HOST)
from homeassistant.components.device_tracker import DOMAIN
import homeassistant.components.device_tracker.upc_connect as platform
from homeassistant.util.async import run_coroutine_threadsafe
@ -62,43 +62,10 @@ class TestUPCConnect(object):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}})
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
@patch('homeassistant.components.device_tracker._LOGGER.error')
def test_setup_platform_error_webservice(self, mock_error, aioclient_mock):
"""Setup a platform with api error."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
content=b'successful',
status=404
)
with assert_setup_component(1, DOMAIN):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
}})
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert 'Error setting up platform' in \
str(mock_error.call_args_list[-1])
assert len(aioclient_mock.mock_calls) == 1
@patch('homeassistant.components.device_tracker._LOGGER.error')
def test_setup_platform_timeout_webservice(self, mock_error,
@ -106,10 +73,7 @@ class TestUPCConnect(object):
"""Setup a platform with api timeout."""
aioclient_mock.get(
"http://{}/common_page/login.html".format(self.host),
cookies={'sessionToken': '654321'}
)
aioclient_mock.post(
"http://{}/xml/getter.xml".format(self.host),
cookies={'sessionToken': '654321'},
content=b'successful',
exc=asyncio.TimeoutError()
)
@ -118,14 +82,10 @@ class TestUPCConnect(object):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}})
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert len(aioclient_mock.mock_calls) == 1
assert 'Error setting up platform' in \
str(mock_error.call_args_list[-1])
@ -147,8 +107,7 @@ class TestUPCConnect(object):
assert setup_component(
self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}})
assert len(aioclient_mock.mock_calls) == 1
@ -171,14 +130,11 @@ class TestUPCConnect(object):
scanner = run_coroutine_threadsafe(platform.async_get_scanner(
self.hass, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}}
), self.hass.loop).result()
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert len(aioclient_mock.mock_calls) == 1
aioclient_mock.clear_requests()
aioclient_mock.post(
@ -191,8 +147,7 @@ class TestUPCConnect(object):
scanner.async_scan_devices(), self.hass.loop).result()
assert len(aioclient_mock.mock_calls) == 1
assert aioclient_mock.mock_calls[0][2]['fun'] == 123
assert scanner.token == '1235678'
assert aioclient_mock.mock_calls[0][2] == 'token=654321&fun=123'
assert mac_list == ['30:D3:2D:0:69:21', '5C:AA:FD:25:32:02',
'70:EE:50:27:A1:38']
@ -211,14 +166,11 @@ class TestUPCConnect(object):
scanner = run_coroutine_threadsafe(platform.async_get_scanner(
self.hass, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}}
), self.hass.loop).result()
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert len(aioclient_mock.mock_calls) == 1
aioclient_mock.clear_requests()
aioclient_mock.get(
@ -235,8 +187,8 @@ class TestUPCConnect(object):
mac_list = run_coroutine_threadsafe(
scanner.async_scan_devices(), self.hass.loop).result()
assert len(aioclient_mock.mock_calls) == 3
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2] == 'token=654321&fun=123'
assert mac_list == ['30:D3:2D:0:69:21', '5C:AA:FD:25:32:02',
'70:EE:50:27:A1:38']
@ -255,14 +207,11 @@ class TestUPCConnect(object):
scanner = run_coroutine_threadsafe(platform.async_get_scanner(
self.hass, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}}
), self.hass.loop).result()
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert len(aioclient_mock.mock_calls) == 1
aioclient_mock.clear_requests()
aioclient_mock.get(
@ -280,7 +229,7 @@ class TestUPCConnect(object):
scanner.async_scan_devices(), self.hass.loop).result()
assert len(aioclient_mock.mock_calls) == 2
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2] == 'token=654321&fun=123'
assert mac_list == []
def test_scan_devices_parse_error(self, aioclient_mock):
@ -298,14 +247,11 @@ class TestUPCConnect(object):
scanner = run_coroutine_threadsafe(platform.async_get_scanner(
self.hass, {DOMAIN: {
CONF_PLATFORM: 'upc_connect',
CONF_HOST: self.host,
CONF_PASSWORD: '123456'
CONF_HOST: self.host
}}
), self.hass.loop).result()
assert aioclient_mock.mock_calls[1][2]['Password'] == '123456'
assert aioclient_mock.mock_calls[1][2]['fun'] == 15
assert aioclient_mock.mock_calls[1][2]['token'] == '654321'
assert len(aioclient_mock.mock_calls) == 1
aioclient_mock.clear_requests()
aioclient_mock.post(
@ -318,6 +264,6 @@ class TestUPCConnect(object):
scanner.async_scan_devices(), self.hass.loop).result()
assert len(aioclient_mock.mock_calls) == 1
assert aioclient_mock.mock_calls[0][2]['fun'] == 123
assert aioclient_mock.mock_calls[0][2] == 'token=654321&fun=123'
assert scanner.token is None
assert mac_list == []