mirror of
https://github.com/home-assistant/core.git
synced 2025-05-19 05:19:14 +00:00

Added a login_method config option to fix the login issue with RouterOS versions > 6.43. Made minor changes so that users don't have to change their config. Removed the default config value so that the fallback works as expected without a config change.
225 lines
8.0 KiB
Python
225 lines
8.0 KiB
Python
"""Support for Mikrotik routers as device tracker."""
|
|
import logging
|
|
|
|
import ssl
|
|
|
|
import voluptuous as vol
|
|
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.components.device_tracker import (
|
|
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
|
|
from homeassistant.const import (
|
|
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT, CONF_SSL, CONF_METHOD)
|
|
|
|
_LOGGER = logging.getLogger(__name__)

# Fallback API ports, used by MikrotikScanner when no port is configured:
# the first one for plain connections, the second one when SSL is enabled.
MTK_DEFAULT_API_PORT = '8728'
MTK_DEFAULT_API_SSL_PORT = '8729'

# Optional selection of the librouteros login routine.
CONF_LOGIN_METHOD = 'login_method'
MTK_LOGIN_PLAIN = 'plain'
MTK_LOGIN_TOKEN = 'token'

# Character encoding handed to librouteros for the API connection.
CONF_ENCODING = 'encoding'
DEFAULT_ENCODING = 'utf-8'

# Platform configuration. CONF_METHOD, CONF_LOGIN_METHOD and CONF_PORT
# deliberately carry no default: the scanner decides on a fallback itself
# when they are missing from the validated configuration.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Required(CONF_USERNAME): cv.string,
        vol.Required(CONF_PASSWORD): cv.string,
        vol.Optional(CONF_METHOD): cv.string,
        vol.Optional(CONF_LOGIN_METHOD): vol.Any(
            MTK_LOGIN_PLAIN, MTK_LOGIN_TOKEN
        ),
        vol.Optional(CONF_PORT): cv.port,
        vol.Optional(CONF_SSL, default=False): cv.boolean,
        vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
    }
)
|
|
|
|
|
|
def get_scanner(hass, config):
    """Build a MikrotikScanner from the platform configuration.

    Returns the scanner when its initial connection succeeded, otherwise
    None. The ``hass`` argument is accepted but not used here.
    """
    mikrotik = MikrotikScanner(config[DOMAIN])
    if not mikrotik.success_init:
        return None
    return mikrotik
|
|
|
|
|
|
class MikrotikScanner(DeviceScanner):
    """Device scanner that queries a Mikrotik router through its API.

    Devices are read from one of three sources: the CAPsMAN registration
    table, the local wireless registration table, or the DHCP lease table.
    The source is either forced through the ``method`` option or picked
    automatically from what the router answered during connection.
    """

    def __init__(self, config):
        """Initialize the scanner and try to connect to the router.

        ``config`` is the validated platform configuration. After
        construction ``success_init`` tells whether the first connection
        attempt succeeded.
        """
        self.last_results = {}

        self.host = config[CONF_HOST]
        self.ssl = config[CONF_SSL]

        # The schema has no default port, so pick one matching the
        # transport when the user did not configure it.
        port = config.get(CONF_PORT)
        if port is None:
            port = (MTK_DEFAULT_API_SSL_PORT if self.ssl
                    else MTK_DEFAULT_API_PORT)
        self.port = port

        self.username = config[CONF_USERNAME]
        self.password = config[CONF_PASSWORD]
        self.login_method = config.get(CONF_LOGIN_METHOD)
        self.method = config.get(CONF_METHOD)
        self.encoding = config[CONF_ENCODING]

        self.connected = False
        self.success_init = False
        self.client = None
        # Both probe results are defined up front so _update_info can
        # always read them, whatever happened while connecting.
        self.capsman_exist = None
        self.wireless_exist = None
        self.success_init = self.connect_to_device()

        if self.success_init:
            _LOGGER.info("Start polling Mikrotik (%s) router...", self.host)
            self._update_info()
        else:
            _LOGGER.error("Connection to Mikrotik (%s) failed", self.host)

    def _probe(self, cmd):
        """Run ``cmd`` on the router; return its result or False on error.

        Used for optional features: an API error simply means the feature
        is treated as not available.
        """
        import librouteros

        try:
            return self.client(cmd=cmd)
        except (librouteros.exceptions.TrapError,
                librouteros.exceptions.MultiTrapError,
                librouteros.exceptions.ConnectionError):
            return False

    def connect_to_device(self):
        """Connect to the router and detect the available device sources.

        Returns True when the router answered the routerboard query with
        data, False otherwise. API errors are logged, not raised.
        """
        import librouteros
        from librouteros.login import login_plain, login_token

        # Forget the state of any previous session so that a failed
        # reconnect is not reported as success.
        self.connected = False

        if self.login_method == MTK_LOGIN_PLAIN:
            login_method = (login_plain,)
        elif self.login_method == MTK_LOGIN_TOKEN:
            login_method = (login_token,)
        else:
            # Nothing configured: offer both methods to librouteros.
            login_method = (login_plain, login_token)

        try:
            kwargs = {
                'port': self.port,
                'encoding': self.encoding,
                'login_methods': login_method
            }

            if self.ssl:
                # NOTE(review): certificate and hostname verification are
                # switched off here, so the router's identity is not
                # checked. Kept as is to preserve existing behaviour.
                ssl_context = ssl.create_default_context()
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
                kwargs['ssl_wrapper'] = ssl_context.wrap_socket

            self.client = librouteros.connect(
                self.host,
                self.username,
                self.password,
                **kwargs
            )

            # An API error raised here is handled by the outer handler.
            routerboard_info = self.client(
                cmd='/system/routerboard/getall')

            if routerboard_info:
                _LOGGER.info(
                    "Connected to Mikrotik %s with IP %s",
                    routerboard_info[0].get('model', 'Router'), self.host)

                self.connected = True

                self.capsman_exist = self._probe(
                    '/caps-man/interface/getall')

                if not self.capsman_exist:
                    _LOGGER.info(
                        "Mikrotik %s: Not a CAPSman controller. Trying "
                        "local interfaces", self.host)

                self.wireless_exist = self._probe(
                    '/interface/wireless/getall')

                if (not self.wireless_exist and not self.capsman_exist) \
                        or self.method == 'ip':
                    _LOGGER.info(
                        "Mikrotik %s: Wireless adapters not found. Try to "
                        "use DHCP lease table as presence tracker source. "
                        "Please decrease lease time as much as possible",
                        self.host)
                if self.method:
                    _LOGGER.info(
                        "Mikrotik %s: Manually selected polling method %s",
                        self.host, self.method)

        except (librouteros.exceptions.TrapError,
                librouteros.exceptions.MultiTrapError,
                librouteros.exceptions.ConnectionError) as api_error:
            _LOGGER.error("Connection error: %s", api_error)
        return self.connected

    def scan_devices(self):
        """Scan for new devices and return a list with found device MACs.

        When the router cannot be queried, the error is logged, a
        reconnect is attempted and the previous results are returned.
        """
        import librouteros

        try:
            self._update_info()
        except (librouteros.exceptions.TrapError,
                librouteros.exceptions.MultiTrapError,
                librouteros.exceptions.ConnectionError) as api_error:
            _LOGGER.error("Connection error: %s", api_error)
            self.connect_to_device()
        return list(self.last_results)

    def get_device_name(self, device):
        """Return the name of the given device or None if we don't know."""
        return self.last_results.get(device)

    def _update_info(self):
        """Retrieve latest information from the Mikrotik box.

        Rebuilds ``last_results`` as a mapping of MAC address to the host
        name found in the DHCP lease table (None when there is no lease
        name). Returns True on success, False when the router returned no
        data. API errors propagate to the caller.
        """
        if self.method:
            devices_tracker = self.method
        elif self.capsman_exist:
            devices_tracker = 'capsman'
        elif self.wireless_exist:
            devices_tracker = 'wireless'
        else:
            devices_tracker = 'ip'

        _LOGGER.debug(
            "Loading %s devices from Mikrotik (%s) ...",
            devices_tracker, self.host)

        # The lease table is always fetched: it is the only source of
        # host names, and the device source itself in 'ip' mode.
        device_names = self.client(cmd='/ip/dhcp-server/lease/getall')
        if devices_tracker == 'capsman':
            devices = self.client(
                cmd='/caps-man/registration-table/getall')
        elif devices_tracker == 'wireless':
            devices = self.client(
                cmd='/interface/wireless/registration-table/getall')
        else:
            devices = device_names

        # Both collections are iterated below, so either one missing
        # means there is nothing to update; keep the previous results.
        if device_names is None or devices is None:
            return False

        mac_names = {device.get('mac-address'): device.get('host-name')
                     for device in device_names if device.get('mac-address')}

        if devices_tracker in ('wireless', 'capsman'):
            present = devices
        else:
            # In DHCP mode only leases with an active address count as
            # present.
            present = [device for device in device_names
                       if device.get('active-address')]

        # Entries without a MAC address cannot be tracked; skip them
        # instead of reporting a None key.
        self.last_results = {
            device.get('mac-address'):
                mac_names.get(device.get('mac-address'))
            for device in present if device.get('mac-address')}

        return True
|