Added option to connect via SSL for OpenWRT(luci) device tracker (#14627)

* Added option to connect via HTTPS for OpenWRT(luci) device tracker

* Use string formatting

* Update quotes

* Remove whitespace
This commit is contained in:
Lorenz Schmid 2018-05-26 09:51:21 +02:00 committed by Fabian Affolter
parent 6ceafabd78
commit edfc54b2eb

View File

@ -15,14 +15,18 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.exceptions import HomeAssistantError
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.const import (
CONF_HOST, CONF_USERNAME, CONF_PASSWORD, CONF_SSL)
_LOGGER = logging.getLogger(__name__)
DEFAULT_SSL = False
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean
})
@ -44,7 +48,9 @@ class LuciDeviceScanner(DeviceScanner):
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
host = config[CONF_HOST]
protocol = 'http' if not config[CONF_SSL] else 'https'
self.origin = '{}://{}'.format(protocol, host)
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
@ -57,7 +63,7 @@ class LuciDeviceScanner(DeviceScanner):
def refresh_token(self):
"""Get a new token."""
self.token = _get_token(self.host, self.username, self.password)
self.token = _get_token(self.origin, self.username, self.password)
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
@ -67,9 +73,9 @@ class LuciDeviceScanner(DeviceScanner):
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
if self.mac2name is None:
url = 'http://{}/cgi-bin/luci/rpc/uci'.format(self.host)
result = _req_json_rpc(url, 'get_all', 'dhcp',
params={'auth': self.token})
url = '{}/cgi-bin/luci/rpc/uci'.format(self.origin)
result = _req_json_rpc(
url, 'get_all', 'dhcp', params={'auth': self.token})
if result:
hosts = [x for x in result.values()
if x['.type'] == 'host' and
@ -92,11 +98,11 @@ class LuciDeviceScanner(DeviceScanner):
_LOGGER.info("Checking ARP")
url = 'http://{}/cgi-bin/luci/rpc/sys'.format(self.host)
url = '{}/cgi-bin/luci/rpc/sys'.format(self.origin)
try:
result = _req_json_rpc(url, 'net.arptable',
params={'auth': self.token})
result = _req_json_rpc(
url, 'net.arptable', params={'auth': self.token})
except InvalidLuciTokenError:
_LOGGER.info("Refreshing token")
self.refresh_token()
@ -146,10 +152,10 @@ def _req_json_rpc(url, method, *args, **kwargs):
raise InvalidLuciTokenError
else:
_LOGGER.error('Invalid response from luci: %s', res)
_LOGGER.error("Invalid response from luci: %s", res)
def _get_token(host, username, password):
"""Get authentication token for the given host+username+password."""
url = 'http://{}/cgi-bin/luci/rpc/auth'.format(host)
def _get_token(origin, username, password):
"""Get authentication token for the given configuration."""
url = '{}/cgi-bin/luci/rpc/auth'.format(origin)
return _req_json_rpc(url, 'login', username, password)