diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py index 5a8a4327f9e..2a8ff2bc6f5 100644 --- a/homeassistant/bootstrap.py +++ b/homeassistant/bootstrap.py @@ -96,6 +96,16 @@ def from_config_file(config_path, enable_logging=True): get_opt('device_tracker.netgear', 'username'), get_opt('device_tracker.netgear', 'password')) + elif has_section('device_tracker.luci'): + device_tracker = load_module('device_tracker') + + dev_scan_name = "Luci" + + dev_scan = device_tracker.LuciDeviceScanner( + get_opt('device_tracker.luci', 'host'), + get_opt('device_tracker.luci', 'username'), + get_opt('device_tracker.luci', 'password')) + except configparser.NoOptionError: # If one of the options didn't exist logger.exception(("Error initializing {}DeviceScanner, " diff --git a/homeassistant/components/device_tracker.py b/homeassistant/components/device_tracker.py index 7f00277be39..3b26f09fefc 100644 --- a/homeassistant/components/device_tracker.py +++ b/homeassistant/components/device_tracker.py @@ -455,3 +455,117 @@ class NetgearDeviceScanner(object): else: return + + +class LuciDeviceScanner(object): + """ This class queries a wireless router running OpenWrt firmware + for connected devices. Adapted from Tomato scanner. + + # opkg install luci-mod-rpc + for this to work on the router. + + The API is described here: + http://luci.subsignal.org/trac/wiki/Documentation/JsonRpcHowTo + + (Currently, we do only wifi iwscan, and no DHCP lease access.) + """ + + def __init__(self, host, username, password): + self.parse_api_pattern = re.compile(r"(?P\w*) = (?P.*);") + + self.logger = logging.getLogger(__name__) + self.lock = threading.Lock() + + self.date_updated = None + self.last_results = {} + + self.token = self.get_token(host, username, password) + self.host = host + + self.mac2name = None + self.success_init = self.token + + def _req_json_rpc(self, url, method, *args, **kwargs): + """ Perform one JSON RPC operation. """ + data = json.dumps({'method': method, 'params': args}) + try: + res = requests.post(url, data=data, **kwargs) + except requests.exceptions.Timeout: + self.logger.exception("Connection to the router timed out") + return + if res.status_code == 200: + try: + result = res.json() + except ValueError: + # If json decoder could not parse the response + self.logger.exception("Failed to parse response from luci") + return + try: + return result['result'] + except KeyError: + self.logger.exception("No result in response from luci") + return + elif res.status_code == 401: + # Authentication error + self.logger.exception( + "Failed to authenticate, " + "please check your username and password") + return + else: + self.logger.error("Invalid response from luci: {}".format(res)) + + def get_token(self, host, username, password): + """ Get authentication token for the given host+username+password """ + url = 'http://{}/cgi-bin/luci/rpc/auth'.format(host) + return self._req_json_rpc(url, 'login', username, password) + + def scan_devices(self): + """ Scans for new devices and return a + list containing found device ids. """ + + self._update_info() + + return self.last_results + + def get_device_name(self, device): + """ Returns the name of the given device or None if we don't know. """ + + with self.lock: + if self.mac2name is None: + url = 'http://{}/cgi-bin/luci/rpc/uci'.format(self.host) + result = self._req_json_rpc(url, 'get_all', 'dhcp', + params={'auth': self.token}) + if result: + hosts = [x for x in result.values() + if x['.type'] == 'host' and + 'mac' in x and 'name' in x] + mac2name_list = [(x['mac'], x['name']) for x in hosts] + self.mac2name = dict(mac2name_list) + else: + # Error, handled in the _req_json_rpc + return + return self.mac2name.get(device, None) + + def _update_info(self): + """ Ensures the information from the Luci router is up to date. + Returns boolean if scanning successful. """ + if not self.success_init: + return False + with self.lock: + # if date_updated is None or the date is too old we scan + # for new data + if (not self.date_updated or datetime.now() - self.date_updated > + MIN_TIME_BETWEEN_SCANS): + + self.logger.info("Checking ARP") + + url = 'http://{}/cgi-bin/luci/rpc/sys'.format(self.host) + result = self._req_json_rpc(url, 'net.arptable', + params={'auth': self.token}) + if result: + self.last_results = [x['HW address'] for x in result] + self.date_updated = datetime.now() + return True + return False + + return True