[Device Tracker] Xiaomi Mi Router token refresh (#5437)

Device token is refreshed if not anymore valid (for example after router reboot).
Token refresh will only be tried once per update.
This commit is contained in:
Touliloup 2017-01-19 15:05:37 +01:00 committed by Pascal Vizeli
parent 216ac14b3d
commit 909978b0d1
2 changed files with 118 additions and 36 deletions

View File

@ -31,12 +31,12 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
def get_scanner(hass, config):
"""Validate the configuration and return a Xiaomi Device Scanner."""
scanner = XioamiDeviceScanner(config[DOMAIN])
scanner = XiaomiDeviceScanner(config[DOMAIN])
return scanner if scanner.success_init else None
class XioamiDeviceScanner(DeviceScanner):
class XiaomiDeviceScanner(DeviceScanner):
"""This class queries a Xiaomi Mi router.
Adapted from Luci scanner.
@ -44,15 +44,14 @@ class XioamiDeviceScanner(DeviceScanner):
def __init__(self, config):
"""Initialize the scanner."""
host = config[CONF_HOST]
username, password = config[CONF_USERNAME], config[CONF_PASSWORD]
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.last_results = {}
self.token = _get_token(host, username, password)
self.host = host
self.token = _get_token(self.host, self.username, self.password)
self.mac2name = None
self.success_init = self.token is not None
@ -66,9 +65,7 @@ class XioamiDeviceScanner(DeviceScanner):
"""Return the name of the given device or None if we don't know."""
with self.lock:
if self.mac2name is None:
url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist"
url = url.format(self.host, self.token)
result = _get_device_list(url)
result = self._retrieve_list_with_retry()
if result:
hosts = [x for x in result
if 'mac' in x and 'name' in x]
@ -76,7 +73,7 @@ class XioamiDeviceScanner(DeviceScanner):
(x['mac'].upper(), x['name']) for x in hosts]
self.mac2name = dict(mac2name_list)
else:
# Error, handled in the _req_json_rpc
# Error, handled in the _retrieve_list_with_retry
return
return self.mac2name.get(device.upper(), None)
@ -90,29 +87,72 @@ class XioamiDeviceScanner(DeviceScanner):
return False
with self.lock:
_LOGGER.info('Refreshing device list')
url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist"
url = url.format(self.host, self.token)
result = _get_device_list(url)
result = self._retrieve_list_with_retry()
if result:
self._store_result(result)
return True
return False
def _retrieve_list_with_retry(self):
"""Retrieve the device list with a retry if token is invalid.
Return the list if successful.
"""
_LOGGER.info('Refreshing device list')
result = _retrieve_list(self.host, self.token)
if result:
return result
else:
_LOGGER.info('Refreshing token and retrying device list refresh')
self.token = _get_token(self.host, self.username, self.password)
return _retrieve_list(self.host, self.token)
def _store_result(self, result):
"""Extract and store the device list in self.last_results."""
self.last_results = []
for device_entry in result:
# Check if the device is marked as connected
if int(device_entry['online']) == 1:
self.last_results.append(device_entry['mac'])
return True
return False
def _get_device_list(url, **kwargs):
def _retrieve_list(host, token, **kwargs):
""""Get device list for the given host."""
url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist"
url = url.format(host, token)
try:
res = requests.get(url, timeout=5, **kwargs)
except requests.exceptions.Timeout:
_LOGGER.exception('Connection to the router timed out')
_LOGGER.exception('Connection to the router timed out at URL [%s]',
url)
return
if res.status_code != 200:
_LOGGER.exception('Connection failed with http code [%s]',
res.status_code)
return
try:
result = res.json()
except ValueError:
# If json decoder could not parse the response
_LOGGER.exception('Failed to parse response from mi router')
return
try:
xiaomi_code = result['code']
except KeyError:
_LOGGER.exception('No field code in response from mi router. %s',
result)
return
if xiaomi_code == 0:
try:
return result['list']
except KeyError:
_LOGGER.exception('No list in response from mi router. %s', result)
return
else:
_LOGGER.info(
'Receive wrong Xiaomi code [%s], expected [0] in response [%s]',
xiaomi_code, result)
return
return _extract_result(res, 'list')
def _get_token(host, username, password):
@ -124,10 +164,6 @@ def _get_token(host, username, password):
except requests.exceptions.Timeout:
_LOGGER.exception('Connection to the router timed out')
return
return _extract_result(res, 'token')
def _extract_result(res, key_name):
if res.status_code == 200:
try:
result = res.json()
@ -136,10 +172,12 @@ def _extract_result(res, key_name):
_LOGGER.exception('Failed to parse response from mi router')
return
try:
return result[key_name]
return result['token']
except KeyError:
_LOGGER.exception('No %s in response from mi router. %s',
key_name, result)
error_message = "Xiaomi token cannot be refreshed, response from "\
+ "url: [%s] \nwith parameter: [%s] \nwas: [%s]"
_LOGGER.exception(error_message, url, data, result)
return
else:
_LOGGER.error('Invalid response from mi router: %s', res)
_LOGGER.error('Invalid response: [%s] at url: [%s] with data [%s]',
res, url, data)

View File

@ -15,9 +15,12 @@ from tests.common import get_test_home_assistant
_LOGGER = logging.getLogger(__name__)
INVALID_USERNAME = 'bob'
TOKEN_TIMEOUT_USERNAME = 'tok'
URL_AUTHORIZE = 'http://192.168.0.1/cgi-bin/luci/api/xqsystem/login'
URL_LIST_END = 'api/misystem/devicelist'
FIRST_CALL = True
def mocked_requests(*args, **kwargs):
"""Mock requests.get invocations."""
@ -44,20 +47,38 @@ def mocked_requests(*args, **kwargs):
raise requests.HTTPError(self.status_code)
data = kwargs.get('data')
global FIRST_CALL
if data and data.get('username', None) == INVALID_USERNAME:
# deliver an invalid token
return MockResponse({
"code": "401",
"msg": "Invalid token"
}, 200)
elif data and data.get('username', None) == TOKEN_TIMEOUT_USERNAME:
# deliver an expired token
return MockResponse({
"url": "/cgi-bin/luci/;stok=ef5860/web/home",
"token": "timedOut",
"code": "0"
}, 200)
elif str(args[0]).startswith(URL_AUTHORIZE):
print("deliver authorized")
# deliver an authorized token
return MockResponse({
"url": "/cgi-bin/luci/;stok=ef5860/web/home",
"token": "ef5860",
"code": "0"
}, 200)
elif str(args[0]).endswith("timedOut/" + URL_LIST_END) \
and FIRST_CALL is True:
FIRST_CALL = False
# deliver an error when called with expired token
return MockResponse({
"code": "401",
"msg": "Invalid token"
}, 200)
elif str(args[0]).endswith(URL_LIST_END):
# deliver the device list
return MockResponse({
"mac": "1C:98:EC:0E:D5:A4",
"list": [
@ -144,7 +165,7 @@ class TestXiaomiDeviceScanner(unittest.TestCase):
self.hass.stop()
@mock.patch(
'homeassistant.components.device_tracker.xiaomi.XioamiDeviceScanner',
'homeassistant.components.device_tracker.xiaomi.XiaomiDeviceScanner',
return_value=mock.MagicMock())
def test_config(self, xiaomi_mock):
"""Testing minimal configuration."""
@ -165,7 +186,7 @@ class TestXiaomiDeviceScanner(unittest.TestCase):
self.assertEqual(call_arg['platform'], 'device_tracker')
@mock.patch(
'homeassistant.components.device_tracker.xiaomi.XioamiDeviceScanner',
'homeassistant.components.device_tracker.xiaomi.XiaomiDeviceScanner',
return_value=mock.MagicMock())
def test_config_full(self, xiaomi_mock):
"""Testing full configuration."""
@ -219,3 +240,26 @@ class TestXiaomiDeviceScanner(unittest.TestCase):
scanner.get_device_name("23:83:BF:F6:38:A0"))
self.assertEqual("Device2",
scanner.get_device_name("1D:98:EC:5E:D5:A6"))
@patch('requests.get', side_effect=mocked_requests)
@patch('requests.post', side_effect=mocked_requests)
def test_token_timed_out(self, mock_get, mock_post):
""""Testing refresh with a timed out token.
New token is requested and list is downloaded a second time.
"""
config = {
DOMAIN: xiaomi.PLATFORM_SCHEMA({
CONF_PLATFORM: xiaomi.DOMAIN,
CONF_HOST: '192.168.0.1',
CONF_USERNAME: TOKEN_TIMEOUT_USERNAME,
CONF_PASSWORD: 'passwordTest'
})
}
scanner = get_scanner(self.hass, config)
self.assertIsNotNone(scanner)
self.assertEqual(2, len(scanner.scan_devices()))
self.assertEqual("Device1",
scanner.get_device_name("23:83:BF:F6:38:A0"))
self.assertEqual("Device2",
scanner.get_device_name("1D:98:EC:5E:D5:A6"))