diff --git a/homeassistant/components/device_tracker/xiaomi.py b/homeassistant/components/device_tracker/xiaomi.py index ff53d1fe99f..7c5c415f054 100644 --- a/homeassistant/components/device_tracker/xiaomi.py +++ b/homeassistant/components/device_tracker/xiaomi.py @@ -31,12 +31,12 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ def get_scanner(hass, config): """Validate the configuration and return a Xiaomi Device Scanner.""" - scanner = XioamiDeviceScanner(config[DOMAIN]) + scanner = XiaomiDeviceScanner(config[DOMAIN]) return scanner if scanner.success_init else None -class XioamiDeviceScanner(DeviceScanner): +class XiaomiDeviceScanner(DeviceScanner): """This class queries a Xiaomi Mi router. Adapted from Luci scanner. @@ -44,15 +44,14 @@ class XioamiDeviceScanner(DeviceScanner): def __init__(self, config): """Initialize the scanner.""" - host = config[CONF_HOST] - username, password = config[CONF_USERNAME], config[CONF_PASSWORD] + self.host = config[CONF_HOST] + self.username = config[CONF_USERNAME] + self.password = config[CONF_PASSWORD] self.lock = threading.Lock() self.last_results = {} - self.token = _get_token(host, username, password) - - self.host = host + self.token = _get_token(self.host, self.username, self.password) self.mac2name = None self.success_init = self.token is not None @@ -66,9 +65,7 @@ class XioamiDeviceScanner(DeviceScanner): """Return the name of the given device or None if we don't know.""" with self.lock: if self.mac2name is None: - url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist" - url = url.format(self.host, self.token) - result = _get_device_list(url) + result = self._retrieve_list_with_retry() if result: hosts = [x for x in result if 'mac' in x and 'name' in x] @@ -76,7 +73,7 @@ class XioamiDeviceScanner(DeviceScanner): (x['mac'].upper(), x['name']) for x in hosts] self.mac2name = dict(mac2name_list) else: - # Error, handled in the _req_json_rpc + # Error, handled in the _retrieve_list_with_retry return return self.mac2name.get(device.upper(), None) @@ -90,29 +87,72 @@ class XioamiDeviceScanner(DeviceScanner): return False with self.lock: - _LOGGER.info('Refreshing device list') - url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist" - url = url.format(self.host, self.token) - result = _get_device_list(url) + result = self._retrieve_list_with_retry() if result: - self.last_results = [] - for device_entry in result: - # Check if the device is marked as connected - if int(device_entry['online']) == 1: - self.last_results.append(device_entry['mac']) - + self._store_result(result) return True - return False + def _retrieve_list_with_retry(self): + """Retrieve the device list with a retry if token is invalid. -def _get_device_list(url, **kwargs): + Return the list if successful. + """ + _LOGGER.info('Refreshing device list') + result = _retrieve_list(self.host, self.token) + if result: + return result + else: + _LOGGER.info('Refreshing token and retrying device list refresh') + self.token = _get_token(self.host, self.username, self.password) + return _retrieve_list(self.host, self.token) + + def _store_result(self, result): + """Extract and store the device list in self.last_results.""" + self.last_results = [] + for device_entry in result: + # Check if the device is marked as connected + if int(device_entry['online']) == 1: + self.last_results.append(device_entry['mac']) + + +def _retrieve_list(host, token, **kwargs): + """"Get device list for the given host.""" + url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist" + url = url.format(host, token) try: res = requests.get(url, timeout=5, **kwargs) except requests.exceptions.Timeout: - _LOGGER.exception('Connection to the router timed out') + _LOGGER.exception('Connection to the router timed out at URL [%s]', + url) + return + if res.status_code != 200: + _LOGGER.exception('Connection failed with http code [%s]', + res.status_code) + return + try: + result = res.json() + except ValueError: + # If json decoder could not parse the response + _LOGGER.exception('Failed to parse response from mi router') + return + try: + xiaomi_code = result['code'] + except KeyError: + _LOGGER.exception('No field code in response from mi router. %s', + result) + return + if xiaomi_code == 0: + try: + return result['list'] + except KeyError: + _LOGGER.exception('No list in response from mi router. %s', result) + return + else: + _LOGGER.info( + 'Receive wrong Xiaomi code [%s], expected [0] in response [%s]', + xiaomi_code, result) return - return _extract_result(res, 'list') def _get_token(host, username, password): @@ -124,10 +164,6 @@ def _get_token(host, username, password): except requests.exceptions.Timeout: _LOGGER.exception('Connection to the router timed out') return - return _extract_result(res, 'token') - - -def _extract_result(res, key_name): if res.status_code == 200: try: result = res.json() @@ -136,10 +172,12 @@ def _extract_result(res, key_name): _LOGGER.exception('Failed to parse response from mi router') return try: - return result[key_name] + return result['token'] except KeyError: - _LOGGER.exception('No %s in response from mi router. %s', - key_name, result) + error_message = "Xiaomi token cannot be refreshed, response from "\ + + "url: [%s] \nwith parameter: [%s] \nwas: [%s]" + _LOGGER.exception(error_message, url, data, result) return else: - _LOGGER.error('Invalid response from mi router: %s', res) + _LOGGER.error('Invalid response: [%s] at url: [%s] with data [%s]', + res, url, data) diff --git a/tests/components/device_tracker/test_xiaomi.py b/tests/components/device_tracker/test_xiaomi.py index 482ed7c0c0d..94a4566a17b 100644 --- a/tests/components/device_tracker/test_xiaomi.py +++ b/tests/components/device_tracker/test_xiaomi.py @@ -15,9 +15,12 @@ from tests.common import get_test_home_assistant _LOGGER = logging.getLogger(__name__) INVALID_USERNAME = 'bob' +TOKEN_TIMEOUT_USERNAME = 'tok' URL_AUTHORIZE = 'http://192.168.0.1/cgi-bin/luci/api/xqsystem/login' URL_LIST_END = 'api/misystem/devicelist' +FIRST_CALL = True + def mocked_requests(*args, **kwargs): """Mock requests.get invocations.""" @@ -44,20 +47,38 @@ def mocked_requests(*args, **kwargs): raise requests.HTTPError(self.status_code) data = kwargs.get('data') + global FIRST_CALL if data and data.get('username', None) == INVALID_USERNAME: + # deliver an invalid token return MockResponse({ "code": "401", "msg": "Invalid token" }, 200) + elif data and data.get('username', None) == TOKEN_TIMEOUT_USERNAME: + # deliver an expired token + return MockResponse({ + "url": "/cgi-bin/luci/;stok=ef5860/web/home", + "token": "timedOut", + "code": "0" + }, 200) elif str(args[0]).startswith(URL_AUTHORIZE): - print("deliver authorized") + # deliver an authorized token return MockResponse({ "url": "/cgi-bin/luci/;stok=ef5860/web/home", "token": "ef5860", "code": "0" }, 200) + elif str(args[0]).endswith("timedOut/" + URL_LIST_END) \ + and FIRST_CALL is True: + FIRST_CALL = False + # deliver an error when called with expired token + return MockResponse({ + "code": "401", + "msg": "Invalid token" + }, 200) elif str(args[0]).endswith(URL_LIST_END): + # deliver the device list return MockResponse({ "mac": "1C:98:EC:0E:D5:A4", "list": [ @@ -144,7 +165,7 @@ class TestXiaomiDeviceScanner(unittest.TestCase): self.hass.stop() @mock.patch( - 'homeassistant.components.device_tracker.xiaomi.XioamiDeviceScanner', + 'homeassistant.components.device_tracker.xiaomi.XiaomiDeviceScanner', return_value=mock.MagicMock()) def test_config(self, xiaomi_mock): """Testing minimal configuration.""" @@ -165,7 +186,7 @@ class TestXiaomiDeviceScanner(unittest.TestCase): self.assertEqual(call_arg['platform'], 'device_tracker') @mock.patch( - 'homeassistant.components.device_tracker.xiaomi.XioamiDeviceScanner', + 'homeassistant.components.device_tracker.xiaomi.XiaomiDeviceScanner', return_value=mock.MagicMock()) def test_config_full(self, xiaomi_mock): """Testing full configuration.""" @@ -219,3 +240,26 @@ class TestXiaomiDeviceScanner(unittest.TestCase): scanner.get_device_name("23:83:BF:F6:38:A0")) self.assertEqual("Device2", scanner.get_device_name("1D:98:EC:5E:D5:A6")) + + @patch('requests.get', side_effect=mocked_requests) + @patch('requests.post', side_effect=mocked_requests) + def test_token_timed_out(self, mock_get, mock_post): + """"Testing refresh with a timed out token. + + New token is requested and list is downloaded a second time. + """ + config = { + DOMAIN: xiaomi.PLATFORM_SCHEMA({ + CONF_PLATFORM: xiaomi.DOMAIN, + CONF_HOST: '192.168.0.1', + CONF_USERNAME: TOKEN_TIMEOUT_USERNAME, + CONF_PASSWORD: 'passwordTest' + }) + } + scanner = get_scanner(self.hass, config) + self.assertIsNotNone(scanner) + self.assertEqual(2, len(scanner.scan_devices())) + self.assertEqual("Device1", + scanner.get_device_name("23:83:BF:F6:38:A0")) + self.assertEqual("Device2", + scanner.get_device_name("1D:98:EC:5E:D5:A6"))