From 9af3d2b914cdd2a80806d71bf5bd8d968b6e7bbc Mon Sep 17 00:00:00 2001 From: Markus Stenberg Date: Thu, 24 Apr 2014 16:58:15 +0300 Subject: [PATCH] pep8 + pylint run fixes. Also refactored the json-rpc to happen only in one place (external library would be nice but oh well). --- homeassistant/components/device_tracker.py | 138 ++++++++------------- 1 file changed, 52 insertions(+), 86 deletions(-) diff --git a/homeassistant/components/device_tracker.py b/homeassistant/components/device_tracker.py index bf805962a02..3b26f09fefc 100644 --- a/homeassistant/components/device_tracker.py +++ b/homeassistant/components/device_tracker.py @@ -456,6 +456,7 @@ class NetgearDeviceScanner(object): else: return + class LuciDeviceScanner(object): """ This class queries a wireless router running OpenWrt firmware for connected devices. Adapted from Tomato scanner. @@ -484,28 +485,39 @@ class LuciDeviceScanner(object): self.mac2name = None self.success_init = self.token - def get_token(self, host, username, password): - data = json.dumps({'method': 'login', - 'params': [username, password]}) + def _req_json_rpc(self, url, method, *args, **kwargs): + """ Perform one JSON RPC operation. """ + data = json.dumps({'method': method, 'params': args}) try: - r = requests.post('http://{}/cgi-bin/luci/rpc/auth'.format(host), data=data, timeout=3) - if r.status_code == 200: - token = r.json()['result'] - self.logger.info('Authenticated') - return token - elif r.status_code == 401: - # Authentication error - self.logger.exception( - "Failed to authenticate, " - "please check your username and password") - return - else: - self.logger.error("Invalid response: %s" % r) + res = requests.post(url, data=data, **kwargs) except requests.exceptions.Timeout: self.logger.exception("Connection to the router timed out") - except ValueError: - # If json decoder could not parse the response - self.logger.exception("Failed to parse response from router") + return + if res.status_code == 200: + try: + result = res.json() + except ValueError: + # If json decoder could not parse the response + self.logger.exception("Failed to parse response from luci") + return + try: + return result['result'] + except KeyError: + self.logger.exception("No result in response from luci") + return + elif res.status_code == 401: + # Authentication error + self.logger.exception( + "Failed to authenticate, " + "please check your username and password") + return + else: + self.logger.error("Invalid response from luci: {}".format(res)) + + def get_token(self, host, username, password): + """ Get authentication token for the given host+username+password """ + url = 'http://{}/cgi-bin/luci/rpc/auth'.format(host) + return self._req_json_rpc(url, 'login', username, password) def scan_devices(self): """ Scans for new devices and return a @@ -520,37 +532,17 @@ class LuciDeviceScanner(object): with self.lock: if self.mac2name is None: - try: - data = json.dumps({'method': 'get_all', - 'params': ['dhcp']}) - - r = requests.post('http://{}/cgi-bin/luci/rpc/uci'.format(self.host), params={'auth': self.token}, data=data, timeout=3) - - # Calling and parsing the Luci api here. We only need the - # wldev and dhcpd_lease values. For API description see: - # http://paulusschoutsen.nl/ - # blog/2013/10/tomato-api-documentation/ - if r.status_code == 200: - self.mac2name = dict(map(lambda x:(x['mac'], x['name']), - filter(lambda x:x['.type'] == 'host' and 'mac' in x and 'name' in x, - r.json()['result'].values()))) - # Passthrough - else: - self.logger.error("Invalid response: %s" % r) - return - - except requests.exceptions.Timeout: - # We get this if we could not connect to the router or - # an invalid http_id was supplied - self.logger.exception( - "Connection to the router timed out") - return - - except ValueError: - # If json decoder could not parse the response - self.logger.exception( - "Failed to parse response from router") - + url = 'http://{}/cgi-bin/luci/rpc/uci'.format(self.host) + result = self._req_json_rpc(url, 'get_all', 'dhcp', + params={'auth': self.token}) + if result: + hosts = [x for x in result.values() + if x['.type'] == 'host' and + 'mac' in x and 'name' in x] + mac2name_list = [(x['mac'], x['name']) for x in hosts] + self.mac2name = dict(mac2name_list) + else: + # Error, handled in the _req_json_rpc return return self.mac2name.get(device, None) @@ -560,46 +552,20 @@ class LuciDeviceScanner(object): if not self.success_init: return False with self.lock: - - # if date_updated is None or the date is too old we scan for new data + # if date_updated is None or the date is too old we scan + # for new data if (not self.date_updated or datetime.now() - self.date_updated > MIN_TIME_BETWEEN_SCANS): self.logger.info("Checking ARP") - try: - data = json.dumps({'method': 'net.arptable', - 'params': []}) - - r = requests.post('http://{}/cgi-bin/luci/rpc/sys'.format(self.host), params={'auth': self.token}, data=data, timeout=3) - - # Calling and parsing the Luci api here. We only need the - # wldev and dhcpd_lease values. For API description see: - # http://paulusschoutsen.nl/ - # blog/2013/10/tomato-api-documentation/ - if r.status_code == 200: - self.last_results = list(map(lambda x:x['HW address'], r.json()['result'])) - self.date_updated = datetime.now() - return True - - else: - self.logger.error("Invalid response: %s" % r) - return False - - except requests.exceptions.Timeout: - # We get this if we could not connect to the router or - # an invalid http_id was supplied - self.logger.exception( - "Connection to the router timed out") - - return False - - except ValueError: - # If json decoder could not parse the response - self.logger.exception( - "Failed to parse response from router") - - return False + url = 'http://{}/cgi-bin/luci/rpc/sys'.format(self.host) + result = self._req_json_rpc(url, 'net.arptable', + params={'auth': self.token}) + if result: + self.last_results = [x['HW address'] for x in result] + self.date_updated = datetime.now() + return True + return False return True -