From 54949cff5ad3cc7aa43897055df7446ce3b69e9c Mon Sep 17 00:00:00 2001 From: Finbarr Brady Date: Wed, 20 Feb 2019 15:55:00 +0000 Subject: [PATCH] Support OpenWRT 18.06 in luci device tracker (#21236) * Got it right this time i hope * updates on comments --- .../components/device_tracker/luci.py | 150 ++++-------------- requirements_all.txt | 3 + 2 files changed, 30 insertions(+), 123 deletions(-) diff --git a/homeassistant/components/device_tracker/luci.py b/homeassistant/components/device_tracker/luci.py index 30b09834b68..27fbf7c9f44 100644 --- a/homeassistant/components/device_tracker/luci.py +++ b/homeassistant/components/device_tracker/luci.py @@ -4,21 +4,18 @@ Support for OpenWRT (luci) routers. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/device_tracker.luci/ """ -import json -import logging -import re from collections import namedtuple - -import requests +import logging import voluptuous as vol import homeassistant.helpers.config_validation as cv -from homeassistant.exceptions import HomeAssistantError from homeassistant.components.device_tracker import ( DOMAIN, PLATFORM_SCHEMA, DeviceScanner) from homeassistant.const import ( CONF_HOST, CONF_USERNAME, CONF_PASSWORD, CONF_SSL) +REQUIREMENTS = ['openwrt-luci-rpc==0.3.0'] + _LOGGER = logging.getLogger(__name__) DEFAULT_SSL = False @@ -31,12 +28,6 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ }) -class InvalidLuciTokenError(HomeAssistantError): - """When an invalid token is detected.""" - - pass - - def get_scanner(hass, config): """Validate the configuration and return a Luci scanner.""" scanner = LuciDeviceScanner(config[DOMAIN]) @@ -44,138 +35,51 @@ def get_scanner(hass, config): return scanner if scanner.success_init else None -Device = namedtuple('Device', ['mac', 'ip', 'flags', 'device', 'host']) +Device = namedtuple('Device', ['mac', 'name']) class LuciDeviceScanner(DeviceScanner): - """This class queries a wireless router running OpenWrt firmware.""" + """This class scans for devices connected to an OpenWrt router.""" def __init__(self, config): """Initialize the scanner.""" - self.host = config[CONF_HOST] + host = config[CONF_HOST] protocol = 'http' if not config[CONF_SSL] else 'https' - self.origin = '{}://{}'.format(protocol, self.host) - self.username = config[CONF_USERNAME] - self.password = config[CONF_PASSWORD] + host_url = '{}://{}'.format(protocol, host) - self.parse_api_pattern = re.compile(r"(?P\w*) = (?P.*);") + from openwrt_luci_rpc import OpenWrtRpc + + self.router = OpenWrtRpc(host_url, + config[CONF_USERNAME], + config[CONF_PASSWORD]) self.last_results = {} - self.refresh_token() - self.mac2name = None - self.success_init = self.token is not None - - def refresh_token(self): - """Get a new token.""" - self.token = _get_token(self.origin, self.username, self.password) + self.success_init = self.router.is_logged_in() def scan_devices(self): """Scan for new devices and return a list with found device IDs.""" self._update_info() + return [device.mac for device in self.last_results] def get_device_name(self, device): """Return the name of the given device or None if we don't know.""" - if self.mac2name is None: - url = '{}/cgi-bin/luci/rpc/uci'.format(self.origin) - result = _req_json_rpc( - url, 'get_all', 'dhcp', params={'auth': self.token}) - if result: - hosts = [x for x in result.values() - if x['.type'] == 'host' and - 'mac' in x and 'name' in x] - mac2name_list = [ - (x['mac'].upper(), x['name']) for x in hosts] - self.mac2name = dict(mac2name_list) - else: - # Error, handled in the _req_json_rpc - return - return self.mac2name.get(device.upper(), None) - - def get_extra_attributes(self, device): - """Return the IP of the given device.""" - filter_att = next(( - { - 'ip': result.ip, - 'flags': result.flags, - 'device': result.device, - 'host': result.host - } for result in self.last_results + name = next(( + result.name for result in self.last_results if result.mac == device), None) - return filter_att + return name def _update_info(self): - """Ensure the information from the Luci router is up to date. + """Check the Luci router for devices.""" + result = self.router.get_all_connected_devices( + only_reachable=True) - Returns boolean if scanning successful. - """ - if not self.success_init: - return False + _LOGGER.debug("Luci get_all_connected_devices returned:" + " %s", result) - _LOGGER.info("Checking ARP") + last_results = [] + for device in result: + last_results.append( + Device(device['macaddress'], device['hostname'])) - url = '{}/cgi-bin/luci/rpc/sys'.format(self.origin) - - try: - result = _req_json_rpc( - url, 'net.arptable', params={'auth': self.token}) - except InvalidLuciTokenError: - _LOGGER.info("Refreshing token") - self.refresh_token() - return False - - if result: - self.last_results = [] - for device_entry in result: - # Check if the Flags for each device contain - # NUD_REACHABLE and if so, add it to last_results - if int(device_entry['Flags'], 16) & 0x2: - self.last_results.append(Device(device_entry['HW address'], - device_entry['IP address'], - device_entry['Flags'], - device_entry['Device'], - self.host)) - - return True - - return False - - -def _req_json_rpc(url, method, *args, **kwargs): - """Perform one JSON RPC operation.""" - data = json.dumps({'method': method, 'params': args}) - - try: - res = requests.post(url, data=data, timeout=5, **kwargs) - except requests.exceptions.Timeout: - _LOGGER.exception("Connection to the router timed out") - return - if res.status_code == 200: - try: - result = res.json() - except ValueError: - # If json decoder could not parse the response - _LOGGER.exception("Failed to parse response from luci") - return - try: - return result['result'] - except KeyError: - _LOGGER.exception("No result in response from luci") - return - elif res.status_code == 401: - # Authentication error - _LOGGER.exception( - "Failed to authenticate, check your username and password") - return - elif res.status_code == 403: - _LOGGER.error("Luci responded with a 403 Invalid token") - raise InvalidLuciTokenError - - else: - _LOGGER.error("Invalid response from luci: %s", res) - - -def _get_token(origin, username, password): - """Get authentication token for the given configuration.""" - url = '{}/cgi-bin/luci/rpc/auth'.format(origin) - return _req_json_rpc(url, 'login', username, password) + self.last_results = last_results diff --git a/requirements_all.txt b/requirements_all.txt index 63cb4b87072..9e915f79417 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -775,6 +775,9 @@ openhomedevice==0.4.2 # homeassistant.components.air_quality.opensensemap opensensemap-api==0.1.3 +# homeassistant.components.device_tracker.luci +openwrt-luci-rpc==0.3.0 + # homeassistant.components.switch.orvibo orvibo==1.1.1