Add support for odhcpd DHCP server (#9858)

This commit adds support for the odhcp DHCP server in addition to
dnsmasq. A new configuration option 'dhcp_software' has been added and
allows the user to set the server used (defaults to dnsmasq to not break
existing installations).
This commit is contained in:
Audric Schiltknecht 2017-10-31 08:40:12 -04:00 committed by Fabian Affolter
parent 6d94c121a7
commit 6b96bc3859

View File

@ -19,16 +19,29 @@ from homeassistant.exceptions import HomeAssistantError
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
CONF_DHCP_SOFTWARE = 'dhcp_software'
DEFAULT_DHCP_SOFTWARE = 'dnsmasq'
DHCP_SOFTWARES = [
'dnsmasq',
'odhcpd'
]
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string, vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string, vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_DHCP_SOFTWARE,
default=DEFAULT_DHCP_SOFTWARE): vol.In(DHCP_SOFTWARES)
}) })
def get_scanner(hass, config): def get_scanner(hass, config):
"""Validate the configuration and return an ubus scanner.""" """Validate the configuration and return an ubus scanner."""
scanner = UbusDeviceScanner(config[DOMAIN]) dhcp_sw = config[DOMAIN][CONF_DHCP_SOFTWARE]
if dhcp_sw == 'dnsmasq':
scanner = DnsmasqUbusDeviceScanner(config[DOMAIN])
else:
scanner = OdhcpdUbusDeviceScanner(config[DOMAIN])
return scanner if scanner.success_init else None return scanner if scanner.success_init else None
@ -70,7 +83,6 @@ class UbusDeviceScanner(DeviceScanner):
self.session_id = _get_session_id(self.url, self.username, self.session_id = _get_session_id(self.url, self.username,
self.password) self.password)
self.hostapd = [] self.hostapd = []
self.leasefile = None
self.mac2name = None self.mac2name = None
self.success_init = self.session_id is not None self.success_init = self.session_id is not None
@ -79,44 +91,29 @@ class UbusDeviceScanner(DeviceScanner):
self._update_info() self._update_info()
return self.last_results return self.last_results
def _generate_mac2name(self):
"""Must be implemented depending on the software."""
raise NotImplementedError
@_refresh_on_acccess_denied @_refresh_on_acccess_denied
def get_device_name(self, mac): def get_device_name(self, mac):
"""Return the name of the given device or None if we don't know.""" """Return the name of the given device or None if we don't know."""
if self.leasefile is None:
result = _req_json_rpc(
self.url, self.session_id, 'call', 'uci', 'get',
config="dhcp", type="dnsmasq")
if result:
values = result["values"].values()
self.leasefile = next(iter(values))["leasefile"]
else:
return
if self.mac2name is None: if self.mac2name is None:
result = _req_json_rpc( self._generate_mac2name()
self.url, self.session_id, 'call', 'file', 'read', name = self.mac2name.get(mac.upper(), None)
path=self.leasefile) self.mac2name = None
if result: return name
self.mac2name = dict()
for line in result["data"].splitlines():
hosts = line.split(" ")
self.mac2name[hosts[1].upper()] = hosts[3]
else:
# Error, handled in the _req_json_rpc
return
return self.mac2name.get(mac.upper(), None)
@_refresh_on_acccess_denied @_refresh_on_acccess_denied
def _update_info(self): def _update_info(self):
"""Ensure the information from the Luci router is up to date. """Ensure the information from the router is up to date.
Returns boolean if scanning successful. Returns boolean if scanning successful.
""" """
if not self.success_init: if not self.success_init:
return False return False
_LOGGER.info("Checking ARP") _LOGGER.info("Checking hostapd")
if not self.hostapd: if not self.hostapd:
hostapd = _req_json_rpc( hostapd = _req_json_rpc(
@ -136,6 +133,57 @@ class UbusDeviceScanner(DeviceScanner):
return bool(results) return bool(results)
class DnsmasqUbusDeviceScanner(UbusDeviceScanner):
"""Implement the Ubus device scanning for the dnsmasq DHCP server."""
def __init__(self, config):
"""Initialize the scanner."""
super(DnsmasqUbusDeviceScanner, self).__init__(config)
self.leasefile = None
def _generate_mac2name(self):
if self.leasefile is None:
result = _req_json_rpc(
self.url, self.session_id, 'call', 'uci', 'get',
config="dhcp", type="dnsmasq")
if result:
values = result["values"].values()
self.leasefile = next(iter(values))["leasefile"]
else:
return
result = _req_json_rpc(
self.url, self.session_id, 'call', 'file', 'read',
path=self.leasefile)
if result:
self.mac2name = dict()
for line in result["data"].splitlines():
hosts = line.split(" ")
self.mac2name[hosts[1].upper()] = hosts[3]
else:
# Error, handled in the _req_json_rpc
return
class OdhcpdUbusDeviceScanner(UbusDeviceScanner):
"""Implement the Ubus device scanning for the odhcp DHCP server."""
def _generate_mac2name(self):
result = _req_json_rpc(
self.url, self.session_id, 'call', 'dhcp', 'ipv4leases')
if result:
self.mac2name = dict()
for device in result["device"].values():
for lease in device['leases']:
mac = lease['mac'] # mac = aabbccddeeff
# Convert it to expected format with colon
mac = ":".join(mac[i:i+2] for i in range(0, len(mac), 2))
self.mac2name[mac.upper()] = lease['hostname']
else:
# Error, handled in the _req_json_rpc
return
def _req_json_rpc(url, session_id, rpcmethod, subsystem, method, **params): def _req_json_rpc(url, session_id, rpcmethod, subsystem, method, **params):
"""Perform one JSON RPC operation.""" """Perform one JSON RPC operation."""
data = json.dumps({"jsonrpc": "2.0", data = json.dumps({"jsonrpc": "2.0",