Rework Mikrotik device scanning following Unifi (#27484)

* rework device scanning, add tests

* update requirements and coverage

* fix description comments

* update tests, fix disabled entity updates

* rework device scanning, add tests

* update requirements and coverage

* fix description comments

* update tests, fix disabled entity updates

* update librouteros to 3.0.0

* fix sorting

* fix sorting 2

* revert to 2.3.0 as 3.0.0 requires code update

* fix requirements

* apply fixes

* fix tests

* update hub.py and fix tests

* fix test_hub_setup_failed

* rebased on dev and update librouteros to 3.0.0

* fixed test_config_flow

* fixed tests

* fix test_config_flow
This commit is contained in:
Rami Mosleh 2020-01-30 20:21:51 +02:00 committed by Paulus Schoutsen
parent 4f79ec0c78
commit 04cb2e9fd5
18 changed files with 1546 additions and 341 deletions

View File

@ -420,7 +420,8 @@ omit =
homeassistant/components/metoffice/weather.py homeassistant/components/metoffice/weather.py
homeassistant/components/microsoft/tts.py homeassistant/components/microsoft/tts.py
homeassistant/components/miflora/sensor.py homeassistant/components/miflora/sensor.py
homeassistant/components/mikrotik/* homeassistant/components/mikrotik/hub.py
homeassistant/components/mikrotik/device_tracker.py
homeassistant/components/mill/climate.py homeassistant/components/mill/climate.py
homeassistant/components/mill/const.py homeassistant/components/mill/const.py
homeassistant/components/minio/* homeassistant/components/minio/*

View File

@ -209,6 +209,7 @@ homeassistant/components/met/* @danielhiversen
homeassistant/components/meteo_france/* @victorcerutti @oncleben31 homeassistant/components/meteo_france/* @victorcerutti @oncleben31
homeassistant/components/meteoalarm/* @rolfberkenbosch homeassistant/components/meteoalarm/* @rolfberkenbosch
homeassistant/components/miflora/* @danielhiversen @ChristianKuehnel homeassistant/components/miflora/* @danielhiversen @ChristianKuehnel
homeassistant/components/mikrotik/* @engrbm87
homeassistant/components/mill/* @danielhiversen homeassistant/components/mill/* @danielhiversen
homeassistant/components/min_max/* @fabaff homeassistant/components/min_max/* @fabaff
homeassistant/components/minio/* @tkislan homeassistant/components/minio/* @tkislan

View File

@ -0,0 +1,37 @@
{
"config": {
"title": "Mikrotik",
"step": {
"user": {
"title": "Set up Mikrotik Router",
"data": {
"name": "Name",
"host": "Host",
"username": "Username",
"password": "Password",
"port": "Port",
"verify_ssl": "Use ssl"
}
}
},
"error": {
"name_exists": "Name exists",
"cannot_connect": "Connection Unsuccessful",
"wrong_credentials": "Wrong Credentials"
},
"abort": {
"already_configured": "Mikrotik is already configured"
}
},
"options": {
"step": {
"device_tracker": {
"data": {
"arp_ping": "Enable ARP ping",
"force_dhcp": "Force scanning using DHCP",
"detection_time": "Consider home interval"
}
}
}
}
}

View File

@ -1,43 +1,28 @@
"""The mikrotik component.""" """The Mikrotik component."""
import logging
import ssl
from librouteros import connect
from librouteros.exceptions import LibRouterosError
from librouteros.login import plain as login_plain, token as login_token
import voluptuous as vol import voluptuous as vol
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import ( from homeassistant.const import (
CONF_HOST, CONF_HOST,
CONF_METHOD, CONF_NAME,
CONF_PASSWORD, CONF_PASSWORD,
CONF_PORT, CONF_PORT,
CONF_SSL,
CONF_USERNAME, CONF_USERNAME,
CONF_VERIFY_SSL,
) )
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import load_platform
from .const import ( from .const import (
ATTR_MANUFACTURER,
CONF_ARP_PING, CONF_ARP_PING,
CONF_ENCODING, CONF_DETECTION_TIME,
CONF_LOGIN_METHOD, CONF_FORCE_DHCP,
CONF_TRACK_DEVICES, DEFAULT_API_PORT,
DEFAULT_ENCODING, DEFAULT_DETECTION_TIME,
DEFAULT_NAME,
DOMAIN, DOMAIN,
HOSTS,
IDENTITY,
MIKROTIK_SERVICES,
MTK_LOGIN_PLAIN,
MTK_LOGIN_TOKEN,
NAME,
) )
from .hub import MikrotikHub
_LOGGER = logging.getLogger(__name__)
MTK_DEFAULT_API_PORT = "8728"
MTK_DEFAULT_API_SSL_PORT = "8729"
MIKROTIK_SCHEMA = vol.All( MIKROTIK_SCHEMA = vol.All(
vol.Schema( vol.Schema(
@ -45,13 +30,14 @@ MIKROTIK_SCHEMA = vol.All(
vol.Required(CONF_HOST): cv.string, vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string, vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_METHOD): cv.string, vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_LOGIN_METHOD): vol.Any(MTK_LOGIN_PLAIN, MTK_LOGIN_TOKEN), vol.Optional(CONF_PORT, default=DEFAULT_API_PORT): cv.port,
vol.Optional(CONF_PORT): cv.port, vol.Optional(CONF_VERIFY_SSL, default=False): cv.boolean,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
vol.Optional(CONF_TRACK_DEVICES, default=True): cv.boolean,
vol.Optional(CONF_ARP_PING, default=False): cv.boolean, vol.Optional(CONF_ARP_PING, default=False): cv.boolean,
vol.Optional(CONF_FORCE_DHCP, default=False): cv.boolean,
vol.Optional(
CONF_DETECTION_TIME, default=DEFAULT_DETECTION_TIME
): cv.time_period,
} }
) )
) )
@ -61,124 +47,45 @@ CONFIG_SCHEMA = vol.Schema(
) )
def setup(hass, config): async def async_setup(hass, config):
"""Set up the Mikrotik component.""" """Import the Mikrotik component from config."""
hass.data[DOMAIN] = {HOSTS: {}}
for device in config[DOMAIN]: if DOMAIN in config:
host = device[CONF_HOST] for entry in config[DOMAIN]:
use_ssl = device.get(CONF_SSL) hass.async_create_task(
user = device.get(CONF_USERNAME) hass.config_entries.flow.async_init(
password = device.get(CONF_PASSWORD, "") DOMAIN, context={"source": SOURCE_IMPORT}, data=entry
login = device.get(CONF_LOGIN_METHOD) )
encoding = device.get(CONF_ENCODING)
track_devices = device.get(CONF_TRACK_DEVICES)
if CONF_PORT in device:
port = device.get(CONF_PORT)
else:
if use_ssl:
port = MTK_DEFAULT_API_SSL_PORT
else:
port = MTK_DEFAULT_API_PORT
if login == MTK_LOGIN_PLAIN:
login_method = login_plain
else:
login_method = login_token
try:
api = MikrotikClient(
host, use_ssl, port, user, password, login_method, encoding
) )
api.connect_to_device()
hass.data[DOMAIN][HOSTS][host] = {"config": device, "api": api}
except LibRouterosError as api_error:
_LOGGER.error("Mikrotik %s error %s", host, api_error)
continue
if track_devices:
hass.data[DOMAIN][HOSTS][host][DEVICE_TRACKER] = True
load_platform(hass, DEVICE_TRACKER, DOMAIN, None, config)
if not hass.data[DOMAIN][HOSTS]:
return False
return True return True
class MikrotikClient: async def async_setup_entry(hass, config_entry):
"""Handle all communication with the Mikrotik API.""" """Set up the Mikrotik component."""
def __init__(self, host, use_ssl, port, user, password, login_method, encoding): hub = MikrotikHub(hass, config_entry)
"""Initialize the Mikrotik Client.""" if not await hub.async_setup():
self._host = host
self._use_ssl = use_ssl
self._port = port
self._user = user
self._password = password
self._login_method = login_method
self._encoding = encoding
self._ssl_wrapper = None
self.hostname = None
self._client = None
self._connected = False
def connect_to_device(self):
"""Connect to Mikrotik device."""
self._connected = False
_LOGGER.debug("[%s] Connecting to Mikrotik device", self._host)
kwargs = {
"encoding": self._encoding,
"login_methods": self._login_method,
"port": self._port,
}
if self._use_ssl:
if self._ssl_wrapper is None:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self._ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = self._ssl_wrapper
try:
self._client = connect(self._host, self._user, self._password, **kwargs)
self._connected = True
except LibRouterosError as api_error:
_LOGGER.error("Mikrotik %s: %s", self._host, api_error)
self._client = None
return False return False
self.hostname = self.get_hostname() hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = hub
_LOGGER.info("Mikrotik Connected to %s (%s)", self.hostname, self._host) device_registry = await hass.helpers.device_registry.async_get_registry()
return self._connected device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
def get_hostname(self): connections={(DOMAIN, hub.serial_num)},
"""Return device host name.""" manufacturer=ATTR_MANUFACTURER,
data = list(self.command(MIKROTIK_SERVICES[IDENTITY])) model=hub.model,
return data[0][NAME] if data else None name=hub.hostname,
sw_version=hub.firmware,
def connected(self):
"""Return connected boolean."""
return self._connected
def command(self, cmd, params=None):
"""Retrieve data from Mikrotik API."""
if not self._connected or not self._client:
if not self.connect_to_device():
return None
try:
if params:
response = self._client(cmd=cmd, **params)
else:
response = self._client(cmd=cmd)
except LibRouterosError as api_error:
_LOGGER.error(
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
self._host,
cmd,
api_error,
) )
return None
return response if response else None return True
async def async_unload_entry(hass, config_entry):
"""Unload a config entry."""
await hass.config_entries.async_forward_entry_unload(config_entry, "device_tracker")
hass.data[DOMAIN].pop(config_entry.entry_id)
return True

View File

@ -0,0 +1,120 @@
"""Config flow for Mikrotik."""
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.core import callback
from .const import (
CONF_ARP_PING,
CONF_DETECTION_TIME,
CONF_FORCE_DHCP,
DEFAULT_API_PORT,
DEFAULT_DETECTION_TIME,
DEFAULT_NAME,
DOMAIN,
)
from .errors import CannotConnect, LoginError
from .hub import get_api
class MikrotikFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a Mikrotik config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return MikrotikOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
errors = {}
if user_input is not None:
for entry in self.hass.config_entries.async_entries(DOMAIN):
if entry.data[CONF_HOST] == user_input[CONF_HOST]:
return self.async_abort(reason="already_configured")
if entry.data[CONF_NAME] == user_input[CONF_NAME]:
errors[CONF_NAME] = "name_exists"
break
try:
await self.hass.async_add_executor_job(get_api, self.hass, user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except LoginError:
errors[CONF_USERNAME] = "wrong_credentials"
errors[CONF_PASSWORD] = "wrong_credentials"
if not errors:
return self.async_create_entry(
title=user_input[CONF_NAME], data=user_input
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_NAME, default=DEFAULT_NAME): str,
vol.Required(CONF_HOST): str,
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Optional(CONF_PORT, default=DEFAULT_API_PORT): int,
vol.Optional(CONF_VERIFY_SSL, default=False): bool,
}
),
errors=errors,
)
async def async_step_import(self, import_config):
"""Import Miktortik from config."""
import_config[CONF_DETECTION_TIME] = import_config[CONF_DETECTION_TIME].seconds
return await self.async_step_user(user_input=import_config)
class MikrotikOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle Mikrotik options."""
def __init__(self, config_entry):
"""Initialize Mikrotik options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
"""Manage the Mikrotik options."""
return await self.async_step_device_tracker()
async def async_step_device_tracker(self, user_input=None):
"""Manage the device tracker options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
options = {
vol.Optional(
CONF_FORCE_DHCP,
default=self.config_entry.options.get(CONF_FORCE_DHCP, False),
): bool,
vol.Optional(
CONF_ARP_PING,
default=self.config_entry.options.get(CONF_ARP_PING, False),
): bool,
vol.Optional(
CONF_DETECTION_TIME,
default=self.config_entry.options.get(
CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
),
): int,
}
return self.async_show_form(
step_id="device_tracker", data_schema=vol.Schema(options)
)

View File

@ -1,32 +1,38 @@
"""Constants used in the Mikrotik components.""" """Constants used in the Mikrotik components."""
DOMAIN = "mikrotik" DOMAIN = "mikrotik"
MIKROTIK = DOMAIN DEFAULT_NAME = "Mikrotik"
HOSTS = "hosts" DEFAULT_API_PORT = 8728
MTK_LOGIN_PLAIN = "plain" DEFAULT_DETECTION_TIME = 300
MTK_LOGIN_TOKEN = "token"
ATTR_MANUFACTURER = "Mikrotik"
ATTR_SERIAL_NUMBER = "serial-number"
ATTR_FIRMWARE = "current-firmware"
ATTR_MODEL = "model"
CONF_ARP_PING = "arp_ping" CONF_ARP_PING = "arp_ping"
CONF_TRACK_DEVICES = "track_devices" CONF_FORCE_DHCP = "force_dhcp"
CONF_LOGIN_METHOD = "login_method" CONF_DETECTION_TIME = "detection_time"
CONF_ENCODING = "encoding"
DEFAULT_ENCODING = "utf-8"
NAME = "name" NAME = "name"
INFO = "info" INFO = "info"
IDENTITY = "identity" IDENTITY = "identity"
ARP = "arp" ARP = "arp"
CAPSMAN = "capsman"
DHCP = "dhcp" DHCP = "dhcp"
WIRELESS = "wireless" WIRELESS = "wireless"
CAPSMAN = "capsman" IS_WIRELESS = "is_wireless"
MIKROTIK_SERVICES = { MIKROTIK_SERVICES = {
INFO: "/system/routerboard/getall",
IDENTITY: "/system/identity/getall",
ARP: "/ip/arp/getall", ARP: "/ip/arp/getall",
DHCP: "/ip/dhcp-server/lease/getall",
WIRELESS: "/interface/wireless/registration-table/getall",
CAPSMAN: "/caps-man/registration-table/getall", CAPSMAN: "/caps-man/registration-table/getall",
DHCP: "/ip/dhcp-server/lease/getall",
IDENTITY: "/system/identity/getall",
INFO: "/system/routerboard/getall",
WIRELESS: "/interface/wireless/registration-table/getall",
IS_WIRELESS: "/interface/wireless/print",
} }
ATTR_DEVICE_TRACKER = [ ATTR_DEVICE_TRACKER = [
@ -34,16 +40,8 @@ ATTR_DEVICE_TRACKER = [
"mac-address", "mac-address",
"ssid", "ssid",
"interface", "interface",
"host-name",
"last-seen",
"rx-signal",
"signal-strength", "signal-strength",
"tx-ccq",
"signal-to-noise", "signal-to-noise",
"wmm-enabled",
"authentication-type",
"encryption",
"tx-rate-set",
"rx-rate", "rx-rate",
"tx-rate", "tx-rate",
"uptime", "uptime",

View File

@ -1,191 +1,142 @@
"""Support for Mikrotik routers as device tracker.""" """Support for Mikrotik routers as device tracker."""
import logging import logging
from homeassistant.components.device_tracker import ( from homeassistant.components.device_tracker.config_entry import ScannerEntity
from homeassistant.components.device_tracker.const import (
DOMAIN as DEVICE_TRACKER, DOMAIN as DEVICE_TRACKER,
DeviceScanner, SOURCE_TYPE_ROUTER,
) )
from homeassistant.const import CONF_METHOD from homeassistant.core import callback
from homeassistant.util import slugify from homeassistant.helpers import entity_registry
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
import homeassistant.util.dt as dt_util
from .const import ( from .const import ATTR_MANUFACTURER, DOMAIN
ARP,
ATTR_DEVICE_TRACKER,
CAPSMAN,
CONF_ARP_PING,
DHCP,
HOSTS,
MIKROTIK,
MIKROTIK_SERVICES,
WIRELESS,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config): async def async_setup_entry(hass, config_entry, async_add_entities):
"""Validate the configuration and return MikrotikScanner.""" """Set up device tracker for Mikrotik component."""
for host in hass.data[MIKROTIK][HOSTS]: hub = hass.data[DOMAIN][config_entry.entry_id]
if DEVICE_TRACKER not in hass.data[MIKROTIK][HOSTS][host]:
tracked = {}
registry = await entity_registry.async_get_registry(hass)
# Restore clients that is not a part of active clients list.
for entity in registry.entities.values():
if (
entity.config_entry_id == config_entry.entry_id
and entity.domain == DEVICE_TRACKER
):
if (
entity.unique_id in hub.api.devices
or entity.unique_id not in hub.api.all_devices
):
continue continue
hass.data[MIKROTIK][HOSTS][host].pop(DEVICE_TRACKER, None) hub.api.restore_device(entity.unique_id)
api = hass.data[MIKROTIK][HOSTS][host]["api"]
config = hass.data[MIKROTIK][HOSTS][host]["config"] @callback
hostname = api.get_hostname() def update_hub():
scanner = MikrotikScanner(api, host, hostname, config) """Update the status of the device."""
return scanner if scanner.success_init else None update_items(hub, async_add_entities, tracked)
async_dispatcher_connect(hass, hub.signal_update, update_hub)
update_hub()
class MikrotikScanner(DeviceScanner): @callback
"""This class queries a Mikrotik device.""" def update_items(hub, async_add_entities, tracked):
"""Update tracked device state from the hub."""
new_tracked = []
for mac, device in hub.api.devices.items():
if mac not in tracked:
tracked[mac] = MikrotikHubTracker(device, hub)
new_tracked.append(tracked[mac])
def __init__(self, api, host, hostname, config): if new_tracked:
"""Initialize the scanner.""" async_add_entities(new_tracked)
self.api = api
self.config = config
self.host = host
self.hostname = hostname
self.method = config.get(CONF_METHOD)
self.arp_ping = config.get(CONF_ARP_PING)
self.dhcp = None
self.devices_arp = {}
self.devices_dhcp = {}
self.device_tracker = None
self.success_init = self.api.connected()
def get_extra_attributes(self, device):
"""
Get extra attributes of a device.
Some known extra attributes that may be returned in the device tuple class MikrotikHubTracker(ScannerEntity):
include MAC address (mac), network device (dev), IP address """Representation of network device."""
(ip), reachable status (reachable), associated router
(host), hostname if known (hostname) among others.
"""
return self.device_tracker.get(device) or {}
def get_device_name(self, device): def __init__(self, device, hub):
"""Get name for a device.""" """Initialize the tracked device."""
host = self.device_tracker.get(device, {}) self.device = device
return host.get("host_name") self.hub = hub
self.unsub_dispatcher = None
def scan_devices(self): @property
"""Scan for new devices and return a list with found device MACs.""" def is_connected(self):
self.update_device_tracker() """Return true if the client is connected to the network."""
return list(self.device_tracker) if (
self.device.last_seen
and (dt_util.utcnow() - self.device.last_seen)
< self.hub.option_detection_time
):
return True
return False
def get_method(self): @property
"""Determine the device tracker polling method.""" def source_type(self):
if self.method: """Return the source type of the client."""
_LOGGER.debug( return SOURCE_TYPE_ROUTER
"Mikrotik %s: Manually selected polling method %s",
self.host,
self.method,
)
return self.method
capsman = self.api.command(MIKROTIK_SERVICES[CAPSMAN]) @property
if not capsman: def name(self) -> str:
_LOGGER.debug( """Return the name of the client."""
"Mikrotik %s: Not a CAPsMAN controller. " return self.device.name
"Trying local wireless interfaces",
(self.host),
)
else:
return CAPSMAN
wireless = self.api.command(MIKROTIK_SERVICES[WIRELESS]) @property
if not wireless: def unique_id(self) -> str:
_LOGGER.info( """Return a unique identifier for this device."""
"Mikrotik %s: Wireless adapters not found. Try to " return self.device.mac
"use DHCP lease table as presence tracker source. "
"Please decrease lease time as much as possible",
self.host,
)
return DHCP
return WIRELESS @property
def available(self) -> bool:
"""Return if controller is available."""
return self.hub.available
def update_device_tracker(self): @property
"""Update device_tracker from Mikrotik API.""" def device_state_attributes(self):
self.device_tracker = {} """Return the device state attributes."""
if not self.method: if self.is_connected:
self.method = self.get_method() return self.device.attrs
return None
data = self.api.command(MIKROTIK_SERVICES[self.method]) @property
if data is None: def device_info(self):
return """Return a client description for device registry."""
info = {
if self.method != DHCP: "connections": {(CONNECTION_NETWORK_MAC, self.device.mac)},
dhcp = self.api.command(MIKROTIK_SERVICES[DHCP]) "manufacturer": ATTR_MANUFACTURER,
if dhcp is not None: "identifiers": {(DOMAIN, self.device.mac)},
self.devices_dhcp = load_mac(dhcp) "name": self.name,
"via_device": (DOMAIN, self.hub.serial_num),
arp = self.api.command(MIKROTIK_SERVICES[ARP])
self.devices_arp = load_mac(arp)
for device in data:
mac = device.get("mac-address")
if self.method == DHCP:
if "active-address" not in device:
continue
if self.arp_ping and self.devices_arp:
if mac not in self.devices_arp:
continue
ip_address = self.devices_arp[mac]["address"]
interface = self.devices_arp[mac]["interface"]
if not self.do_arp_ping(ip_address, interface):
continue
attrs = {}
if mac in self.devices_dhcp and "host-name" in self.devices_dhcp[mac]:
hostname = self.devices_dhcp[mac].get("host-name")
if hostname:
attrs["host_name"] = hostname
if self.devices_arp and mac in self.devices_arp:
attrs["ip_address"] = self.devices_arp[mac].get("address")
for attr in ATTR_DEVICE_TRACKER:
if attr in device and device[attr] is not None:
attrs[slugify(attr)] = device[attr]
attrs["scanner_type"] = self.method
attrs["scanner_host"] = self.host
attrs["scanner_hostname"] = self.hostname
self.device_tracker[mac] = attrs
def do_arp_ping(self, ip_address, interface):
"""Attempt to arp ping MAC address via interface."""
params = {
"arp-ping": "yes",
"interval": "100ms",
"count": 3,
"interface": interface,
"address": ip_address,
} }
cmd = "/ping" return info
data = self.api.command(cmd, params)
if data is not None: async def async_added_to_hass(self):
status = 0 """Client entity created."""
for result in data: _LOGGER.debug("New network device tracker %s (%s)", self.name, self.unique_id)
if "status" in result: self.unsub_dispatcher = async_dispatcher_connect(
_LOGGER.debug( self.hass, self.hub.signal_update, self.async_write_ha_state
"Mikrotik %s arp_ping error: %s", self.host, result["status"]
) )
status += 1
if status == len(data):
return None
return data
async def async_update(self):
"""Synchronize state with hub."""
_LOGGER.debug(
"Updating Mikrotik tracked client %s (%s)", self.entity_id, self.unique_id
)
await self.hub.request_update()
def load_mac(devices=None): async def will_remove_from_hass(self):
"""Load dictionary using MAC address as key.""" """Disconnect from dispatcher."""
if not devices: if self.unsub_dispatcher:
return None self.unsub_dispatcher()
mac_devices = {}
for device in devices:
if "mac-address" in device:
mac = device.pop("mac-address")
mac_devices[mac] = device
return mac_devices

View File

@ -0,0 +1,10 @@
"""Errors for the Mikrotik component."""
from homeassistant.exceptions import HomeAssistantError
class CannotConnect(HomeAssistantError):
"""Unable to connect to the hub."""
class LoginError(HomeAssistantError):
"""Component got logged out."""

View File

@ -0,0 +1,413 @@
"""The Mikrotik router class."""
from datetime import timedelta
import logging
import socket
import ssl
import librouteros
from librouteros.login import plain as login_plain, token as login_token
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_VERIFY_SSL
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.util import slugify
import homeassistant.util.dt as dt_util
from .const import (
ARP,
ATTR_DEVICE_TRACKER,
ATTR_FIRMWARE,
ATTR_MODEL,
ATTR_SERIAL_NUMBER,
CAPSMAN,
CONF_ARP_PING,
CONF_DETECTION_TIME,
CONF_FORCE_DHCP,
DEFAULT_DETECTION_TIME,
DHCP,
IDENTITY,
INFO,
IS_WIRELESS,
MIKROTIK_SERVICES,
NAME,
WIRELESS,
)
from .errors import CannotConnect, LoginError
_LOGGER = logging.getLogger(__name__)
class Device:
"""Represents a network device."""
def __init__(self, mac, params):
"""Initialize the network device."""
self._mac = mac
self._params = params
self._last_seen = None
self._attrs = {}
self._wireless_params = None
@property
def name(self):
"""Return device name."""
return self._params.get("host-name", self.mac)
@property
def mac(self):
"""Return device mac."""
return self._mac
@property
def last_seen(self):
"""Return device last seen."""
return self._last_seen
@property
def attrs(self):
"""Return device attributes."""
attr_data = self._wireless_params if self._wireless_params else self._params
for attr in ATTR_DEVICE_TRACKER:
if attr in attr_data:
self._attrs[slugify(attr)] = attr_data[attr]
self._attrs["ip_address"] = self._params.get("active-address")
return self._attrs
def update(self, wireless_params=None, params=None, active=False):
"""Update Device params."""
if wireless_params:
self._wireless_params = wireless_params
if params:
self._params = params
if active:
self._last_seen = dt_util.utcnow()
class MikrotikData:
"""Handle all communication with the Mikrotik API."""
def __init__(self, hass, config_entry, api):
"""Initialize the Mikrotik Client."""
self.hass = hass
self.config_entry = config_entry
self.api = api
self._host = self.config_entry.data[CONF_HOST]
self.all_devices = {}
self.devices = {}
self.available = True
self.support_wireless = bool(self.command(MIKROTIK_SERVICES[IS_WIRELESS]))
self.hostname = None
self.model = None
self.firmware = None
self.serial_number = None
@staticmethod
def load_mac(devices=None):
"""Load dictionary using MAC address as key."""
if not devices:
return None
mac_devices = {}
for device in devices:
if "mac-address" in device:
mac = device["mac-address"]
mac_devices[mac] = device
return mac_devices
@property
def arp_enabled(self):
"""Return arp_ping option setting."""
return self.config_entry.options[CONF_ARP_PING]
@property
def force_dhcp(self):
"""Return force_dhcp option setting."""
return self.config_entry.options[CONF_FORCE_DHCP]
def get_info(self, param):
"""Return device model name."""
cmd = IDENTITY if param == NAME else INFO
data = list(self.command(MIKROTIK_SERVICES[cmd]))
return data[0].get(param) if data else None
def get_hub_details(self):
"""Get Hub info."""
self.hostname = self.get_info(NAME)
self.model = self.get_info(ATTR_MODEL)
self.firmware = self.get_info(ATTR_FIRMWARE)
self.serial_number = self.get_info(ATTR_SERIAL_NUMBER)
def connect_to_hub(self):
"""Connect to hub."""
try:
self.api = get_api(self.hass, self.config_entry.data)
self.available = True
return True
except (LoginError, CannotConnect):
self.available = False
return False
def get_list_from_interface(self, interface):
"""Get devices from interface."""
result = list(self.command(MIKROTIK_SERVICES[interface]))
return self.load_mac(result) if result else {}
def restore_device(self, mac):
"""Restore a missing device after restart."""
self.devices[mac] = Device(mac, self.all_devices[mac])
def update_devices(self):
"""Get list of devices with latest status."""
arp_devices = {}
wireless_devices = {}
device_list = {}
try:
self.all_devices = self.get_list_from_interface(DHCP)
if self.support_wireless:
_LOGGER.debug("wireless is supported")
for interface in [CAPSMAN, WIRELESS]:
wireless_devices = self.get_list_from_interface(interface)
if wireless_devices:
_LOGGER.debug("Scanning wireless devices using %s", interface)
break
if self.support_wireless and not self.force_dhcp:
device_list = wireless_devices
else:
device_list = self.all_devices
_LOGGER.debug("Falling back to DHCP for scanning devices")
if self.arp_enabled:
arp_devices = self.get_list_from_interface(ARP)
# get new hub firmware version if updated
self.firmware = self.get_info(ATTR_FIRMWARE)
except (CannotConnect, socket.timeout, socket.error):
self.available = False
return
if not device_list:
return
for mac, params in device_list.items():
if mac not in self.devices:
self.devices[mac] = Device(mac, self.all_devices.get(mac, {}))
else:
self.devices[mac].update(params=self.all_devices.get(mac, {}))
if mac in wireless_devices:
# if wireless is supported then wireless_params are params
self.devices[mac].update(
wireless_params=wireless_devices[mac], active=True
)
continue
# for wired devices or when forcing dhcp check for active-address
if not params.get("active-address"):
self.devices[mac].update(active=False)
continue
# ping check the rest of active devices if arp ping is enabled
active = True
if self.arp_enabled and mac in arp_devices:
active = self.do_arp_ping(
params.get("active-address"), arp_devices[mac].get("interface")
)
self.devices[mac].update(active=active)
def do_arp_ping(self, ip_address, interface):
"""Attempt to arp ping MAC address via interface."""
_LOGGER.debug("pinging - %s", ip_address)
params = {
"arp-ping": "yes",
"interval": "100ms",
"count": 3,
"interface": interface,
"address": ip_address,
}
cmd = "/ping"
data = list(self.command(cmd, params))
if data is not None:
status = 0
for result in data:
if "status" in result:
status += 1
if status == len(data):
_LOGGER.debug(
"Mikrotik %s - %s arp_ping timed out", ip_address, interface
)
return False
return True
def command(self, cmd, params=None):
"""Retrieve data from Mikrotik API."""
try:
_LOGGER.info("Running command %s", cmd)
if params:
response = self.api(cmd=cmd, **params)
else:
response = self.api(cmd=cmd)
except (
librouteros.exceptions.ConnectionClosed,
socket.error,
socket.timeout,
) as api_error:
_LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
raise CannotConnect
except librouteros.exceptions.ProtocolError as api_error:
_LOGGER.warning(
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
self._host,
cmd,
api_error,
)
return None
return response if response else None
def update(self):
"""Update device_tracker from Mikrotik API."""
if not self.available or not self.api:
if not self.connect_to_hub():
return
_LOGGER.debug("updating network devices for host: %s", self._host)
self.update_devices()
class MikrotikHub:
"""Mikrotik Hub Object."""
def __init__(self, hass, config_entry):
"""Initialize the Mikrotik Client."""
self.hass = hass
self.config_entry = config_entry
self._mk_data = None
self.progress = None
@property
def host(self):
"""Return the host of this hub."""
return self.config_entry.data[CONF_HOST]
@property
def hostname(self):
"""Return the hostname of the hub."""
return self._mk_data.hostname
@property
def model(self):
"""Return the model of the hub."""
return self._mk_data.model
@property
def firmware(self):
"""Return the firware of the hub."""
return self._mk_data.firmware
@property
def serial_num(self):
"""Return the serial number of the hub."""
return self._mk_data.serial_number
@property
def available(self):
"""Return if the hub is connected."""
return self._mk_data.available
@property
def option_detection_time(self):
"""Config entry option defining number of seconds from last seen to away."""
return timedelta(seconds=self.config_entry.options[CONF_DETECTION_TIME])
@property
def signal_update(self):
"""Event specific per Mikrotik entry to signal updates."""
return f"mikrotik-update-{self.host}"
@property
def api(self):
"""Represent Mikrotik data object."""
return self._mk_data
async def async_add_options(self):
"""Populate default options for Mikrotik."""
if not self.config_entry.options:
options = {
CONF_ARP_PING: self.config_entry.data.pop(CONF_ARP_PING, False),
CONF_FORCE_DHCP: self.config_entry.data.pop(CONF_FORCE_DHCP, False),
CONF_DETECTION_TIME: self.config_entry.data.pop(
CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
),
}
self.hass.config_entries.async_update_entry(
self.config_entry, options=options
)
async def request_update(self):
"""Request an update."""
if self.progress is not None:
await self.progress
return
self.progress = self.hass.async_create_task(self.async_update())
await self.progress
self.progress = None
async def async_update(self):
"""Update Mikrotik devices information."""
await self.hass.async_add_executor_job(self._mk_data.update)
async_dispatcher_send(self.hass, self.signal_update)
async def async_setup(self):
"""Set up the Mikrotik hub."""
try:
api = await self.hass.async_add_executor_job(
get_api, self.hass, self.config_entry.data
)
except CannotConnect:
raise ConfigEntryNotReady
except LoginError:
return False
self._mk_data = MikrotikData(self.hass, self.config_entry, api)
await self.async_add_options()
await self.hass.async_add_executor_job(self._mk_data.get_hub_details)
await self.hass.async_add_executor_job(self._mk_data.update)
self.hass.async_create_task(
self.hass.config_entries.async_forward_entry_setup(
self.config_entry, "device_tracker"
)
)
return True
def get_api(hass, entry):
"""Connect to Mikrotik hub."""
_LOGGER.debug("Connecting to Mikrotik hub [%s]", entry[CONF_HOST])
_login_method = (login_plain, login_token)
kwargs = {"login_methods": _login_method, "port": entry["port"]}
if entry[CONF_VERIFY_SSL]:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
_ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = _ssl_wrapper
try:
api = librouteros.connect(
entry[CONF_HOST], entry[CONF_USERNAME], entry[CONF_PASSWORD], **kwargs,
)
_LOGGER.debug("Connected to %s successfully", entry[CONF_HOST])
return api
except (
librouteros.exceptions.LibRouterosError,
socket.error,
socket.timeout,
) as api_error:
_LOGGER.error("Mikrotik %s error: %s", entry[CONF_HOST], api_error)
if "invalid user name or password" in str(api_error):
raise LoginError
raise CannotConnect

View File

@ -1,8 +1,13 @@
{ {
"domain": "mikrotik", "domain": "mikrotik",
"name": "MikroTik", "name": "Mikrotik",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/mikrotik", "documentation": "https://www.home-assistant.io/integrations/mikrotik",
"requirements": ["librouteros==3.0.0"], "requirements": [
"librouteros==3.0.0"
],
"dependencies": [], "dependencies": [],
"codeowners": [] "codeowners": [
"@engrbm87"
]
} }

View File

@ -0,0 +1,37 @@
{
"config": {
"title": "Mikrotik",
"step": {
"user": {
"title": "Set up Mikrotik Router",
"data": {
"name": "Name",
"host": "Host",
"username": "Username",
"password": "Password",
"port": "Port",
"verify_ssl": "Use ssl"
}
}
},
"error": {
"name_exists": "Name exists",
"cannot_connect": "Connection Unsuccessful",
"wrong_credentials": "Wrong Credentials"
},
"abort": {
"already_configured": "Mikrotik is already configured"
}
},
"options": {
"step": {
"device_tracker": {
"data": {
"arp_ping": "Enable ARP ping",
"force_dhcp": "Force scanning using DHCP",
"detection_time": "Consider home interval"
}
}
}
}
}

View File

@ -54,6 +54,7 @@ FLOWS = [
"luftdaten", "luftdaten",
"mailgun", "mailgun",
"met", "met",
"mikrotik",
"mobile_app", "mobile_app",
"mqtt", "mqtt",
"neato", "neato",

View File

@ -283,6 +283,9 @@ keyrings.alt==3.4.0
# homeassistant.components.dyson # homeassistant.components.dyson
libpurecool==0.6.0 libpurecool==0.6.0
# homeassistant.components.mikrotik
librouteros==3.0.0
# homeassistant.components.soundtouch # homeassistant.components.soundtouch
libsoundtouch==0.7.2 libsoundtouch==0.7.2

View File

@ -0,0 +1,133 @@
"""Tests for the Mikrotik component."""
from homeassistant.components import mikrotik
MOCK_DATA = {
mikrotik.CONF_NAME: "Mikrotik",
mikrotik.CONF_HOST: "0.0.0.0",
mikrotik.CONF_USERNAME: "user",
mikrotik.CONF_PASSWORD: "pass",
mikrotik.CONF_PORT: 8278,
mikrotik.CONF_VERIFY_SSL: False,
}
MOCK_OPTIONS = {
mikrotik.CONF_ARP_PING: False,
mikrotik.const.CONF_FORCE_DHCP: False,
mikrotik.CONF_DETECTION_TIME: mikrotik.DEFAULT_DETECTION_TIME,
}
DEVICE_1_DHCP = {
".id": "*1A",
"address": "0.0.0.1",
"mac-address": "00:00:00:00:00:01",
"active-address": "0.0.0.1",
"host-name": "Device_1",
"comment": "Mobile",
}
DEVICE_2_DHCP = {
".id": "*1B",
"address": "0.0.0.2",
"mac-address": "00:00:00:00:00:02",
"active-address": "0.0.0.2",
"host-name": "Device_2",
"comment": "PC",
}
DEVICE_1_WIRELESS = {
".id": "*264",
"interface": "wlan1",
"mac-address": "00:00:00:00:00:01",
"ap": False,
"wds": False,
"bridge": False,
"rx-rate": "72.2Mbps-20MHz/1S/SGI",
"tx-rate": "72.2Mbps-20MHz/1S/SGI",
"packets": "59542,17464",
"bytes": "17536671,2966351",
"frames": "59542,17472",
"frame-bytes": "17655785,2862445",
"hw-frames": "78935,38395",
"hw-frame-bytes": "25636019,4063445",
"tx-frames-timed-out": 0,
"uptime": "5h49m36s",
"last-activity": "170ms",
"signal-strength": "-62@1Mbps",
"signal-to-noise": 52,
"signal-strength-ch0": -63,
"signal-strength-ch1": -69,
"strength-at-rates": "-62@1Mbps 16s330ms,-64@6Mbps 13s560ms,-65@HT20-3 52m6s30ms,-66@HT20-4 52m4s350ms,-66@HT20-5 51m58s580ms,-65@HT20-6 51m24s780ms,-65@HT20-7 5s680ms",
"tx-ccq": 93,
"p-throughput": 54928,
"last-ip": "0.0.0.1",
"802.1x-port-enabled": True,
"authentication-type": "wpa2-psk",
"encryption": "aes-ccm",
"group-encryption": "aes-ccm",
"management-protection": False,
"wmm-enabled": True,
"tx-rate-set": "OFDM:6-54 BW:1x SGI:1x HT:0-7",
}
DEVICE_2_WIRELESS = {
".id": "*265",
"interface": "wlan1",
"mac-address": "00:00:00:00:00:02",
"ap": False,
"wds": False,
"bridge": False,
"rx-rate": "72.2Mbps-20MHz/1S/SGI",
"tx-rate": "72.2Mbps-20MHz/1S/SGI",
"packets": "59542,17464",
"bytes": "17536671,2966351",
"frames": "59542,17472",
"frame-bytes": "17655785,2862445",
"hw-frames": "78935,38395",
"hw-frame-bytes": "25636019,4063445",
"tx-frames-timed-out": 0,
"uptime": "5h49m36s",
"last-activity": "170ms",
"signal-strength": "-62@1Mbps",
"signal-to-noise": 52,
"signal-strength-ch0": -63,
"signal-strength-ch1": -69,
"strength-at-rates": "-62@1Mbps 16s330ms,-64@6Mbps 13s560ms,-65@HT20-3 52m6s30ms,-66@HT20-4 52m4s350ms,-66@HT20-5 51m58s580ms,-65@HT20-6 51m24s780ms,-65@HT20-7 5s680ms",
"tx-ccq": 93,
"p-throughput": 54928,
"last-ip": "0.0.0.2",
"802.1x-port-enabled": True,
"authentication-type": "wpa2-psk",
"encryption": "aes-ccm",
"group-encryption": "aes-ccm",
"management-protection": False,
"wmm-enabled": True,
"tx-rate-set": "OFDM:6-54 BW:1x SGI:1x HT:0-7",
}
DHCP_DATA = [DEVICE_1_DHCP, DEVICE_2_DHCP]
WIRELESS_DATA = [DEVICE_1_WIRELESS]
ARP_DATA = [
{
".id": "*1",
"address": "0.0.0.1",
"mac-address": "00:00:00:00:00:01",
"interface": "bridge",
"published": False,
"invalid": False,
"DHCP": True,
"dynamic": True,
"complete": True,
"disabled": False,
},
{
".id": "*2",
"address": "0.0.0.2",
"mac-address": "00:00:00:00:00:02",
"interface": "bridge",
"published": False,
"invalid": False,
"DHCP": True,
"dynamic": True,
"complete": True,
"disabled": False,
},
]

View File

@ -0,0 +1,208 @@
"""Test Mikrotik setup process."""
from datetime import timedelta
from unittest.mock import patch
import librouteros
import pytest
from homeassistant import data_entry_flow
from homeassistant.components import mikrotik
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from tests.common import MockConfigEntry
DEMO_USER_INPUT = {
CONF_NAME: "Home router",
CONF_HOST: "0.0.0.0",
CONF_USERNAME: "username",
CONF_PASSWORD: "password",
CONF_PORT: 8278,
CONF_VERIFY_SSL: False,
}
DEMO_CONFIG = {
CONF_NAME: "Home router",
CONF_HOST: "0.0.0.0",
CONF_USERNAME: "username",
CONF_PASSWORD: "password",
CONF_PORT: 8278,
CONF_VERIFY_SSL: False,
mikrotik.const.CONF_FORCE_DHCP: False,
mikrotik.CONF_ARP_PING: False,
mikrotik.CONF_DETECTION_TIME: timedelta(seconds=30),
}
DEMO_CONFIG_ENTRY = {
CONF_NAME: "Home router",
CONF_HOST: "0.0.0.0",
CONF_USERNAME: "username",
CONF_PASSWORD: "password",
CONF_PORT: 8278,
CONF_VERIFY_SSL: False,
mikrotik.const.CONF_FORCE_DHCP: False,
mikrotik.CONF_ARP_PING: False,
mikrotik.CONF_DETECTION_TIME: 30,
}
@pytest.fixture(name="api")
def mock_mikrotik_api():
"""Mock an api."""
with patch("librouteros.connect"):
yield
@pytest.fixture(name="auth_error")
def mock_api_authentication_error():
"""Mock an api."""
with patch(
"librouteros.connect",
side_effect=librouteros.exceptions.TrapError("invalid user name or password"),
):
yield
@pytest.fixture(name="conn_error")
def mock_api_connection_error():
"""Mock an api."""
with patch(
"librouteros.connect", side_effect=librouteros.exceptions.ConnectionClosed
):
yield
async def test_import(hass, api):
"""Test import step."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "import"}, data=DEMO_CONFIG
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "Home router"
assert result["data"][CONF_NAME] == "Home router"
assert result["data"][CONF_HOST] == "0.0.0.0"
assert result["data"][CONF_USERNAME] == "username"
assert result["data"][CONF_PASSWORD] == "password"
assert result["data"][CONF_PORT] == 8278
assert result["data"][CONF_VERIFY_SSL] is False
async def test_flow_works(hass, api):
"""Test config flow."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "Home router"
assert result["data"][CONF_NAME] == "Home router"
assert result["data"][CONF_HOST] == "0.0.0.0"
assert result["data"][CONF_USERNAME] == "username"
assert result["data"][CONF_PASSWORD] == "password"
assert result["data"][CONF_PORT] == 8278
async def test_options(hass):
"""Test updating options."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=DEMO_CONFIG_ENTRY)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "device_tracker"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
mikrotik.CONF_DETECTION_TIME: 30,
mikrotik.CONF_ARP_PING: True,
mikrotik.const.CONF_FORCE_DHCP: False,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
mikrotik.CONF_DETECTION_TIME: 30,
mikrotik.CONF_ARP_PING: True,
mikrotik.const.CONF_FORCE_DHCP: False,
}
async def test_host_already_configured(hass, auth_error):
"""Test host already configured."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=DEMO_CONFIG_ENTRY)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_name_exists(hass, api):
"""Test name already configured."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=DEMO_CONFIG_ENTRY)
entry.add_to_hass(hass)
user_input = DEMO_USER_INPUT.copy()
user_input[CONF_HOST] = "0.0.0.1"
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=user_input
)
assert result["type"] == "form"
assert result["errors"] == {CONF_NAME: "name_exists"}
async def test_connection_error(hass, conn_error):
"""Test error when connection is unsuccesful."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "cannot_connect"}
async def test_wrong_credentials(hass, auth_error):
"""Test error when credentials are wrong."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {
CONF_USERNAME: "wrong_credentials",
CONF_PASSWORD: "wrong_credentials",
}

View File

@ -0,0 +1,118 @@
"""The tests for the Mikrotik device tracker platform."""
from datetime import timedelta
from homeassistant.components import mikrotik
import homeassistant.components.device_tracker as device_tracker
from homeassistant.helpers import entity_registry
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from . import DEVICE_2_WIRELESS, DHCP_DATA, MOCK_DATA, MOCK_OPTIONS, WIRELESS_DATA
from .test_hub import setup_mikrotik_entry
from tests.common import MockConfigEntry, patch
DEFAULT_DETECTION_TIME = timedelta(seconds=300)
def mock_command(self, cmd, params=None):
"""Mock the Mikrotik command method."""
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.IS_WIRELESS]:
return True
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.DHCP]:
return DHCP_DATA
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.WIRELESS]:
return WIRELESS_DATA
return {}
async def test_platform_manually_configured(hass):
"""Test that nothing happens when configuring mikrotik through device tracker platform."""
assert (
await async_setup_component(
hass,
device_tracker.DOMAIN,
{device_tracker.DOMAIN: {"platform": "mikrotik"}},
)
is False
)
assert mikrotik.DOMAIN not in hass.data
async def test_device_trackers(hass):
"""Test device_trackers created by mikrotik."""
# test devices are added from wireless list only
hub = await setup_mikrotik_entry(hass)
device_1 = hass.states.get("device_tracker.device_1")
assert device_1 is not None
assert device_1.state == "home"
device_2 = hass.states.get("device_tracker.device_2")
assert device_2 is None
with patch.object(mikrotik.hub.MikrotikData, "command", new=mock_command):
# test device_2 is added after connecting to wireless network
WIRELESS_DATA.append(DEVICE_2_WIRELESS)
await hub.async_update()
await hass.async_block_till_done()
device_2 = hass.states.get("device_tracker.device_2")
assert device_2 is not None
assert device_2.state == "home"
# test state remains home if last_seen consider_home_interval
del WIRELESS_DATA[1] # device 2 is removed from wireless list
hub.api.devices["00:00:00:00:00:02"]._last_seen = dt_util.utcnow() - timedelta(
minutes=4
)
await hub.async_update()
await hass.async_block_till_done()
device_2 = hass.states.get("device_tracker.device_2")
assert device_2.state != "not_home"
# test state changes to away if last_seen > consider_home_interval
hub.api.devices["00:00:00:00:00:02"]._last_seen = dt_util.utcnow() - timedelta(
minutes=5
)
await hub.async_update()
await hass.async_block_till_done()
device_2 = hass.states.get("device_tracker.device_2")
assert device_2.state == "not_home"
async def test_restoring_devices(hass):
"""Test restoring existing device_tracker entities if not detected on startup."""
config_entry = MockConfigEntry(
domain=mikrotik.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
registry = await entity_registry.async_get_registry(hass)
registry.async_get_or_create(
device_tracker.DOMAIN,
mikrotik.DOMAIN,
"00:00:00:00:00:01",
suggested_object_id="device_1",
config_entry=config_entry,
)
registry.async_get_or_create(
device_tracker.DOMAIN,
mikrotik.DOMAIN,
"00:00:00:00:00:02",
suggested_object_id="device_2",
config_entry=config_entry,
)
await setup_mikrotik_entry(hass)
# test device_2 which is not in wireless list is restored
device_1 = hass.states.get("device_tracker.device_1")
assert device_1 is not None
assert device_1.state == "home"
device_2 = hass.states.get("device_tracker.device_2")
assert device_2 is not None
assert device_2.state == "not_home"

View File

@ -0,0 +1,179 @@
"""Test Mikrotik hub."""
from asynctest import patch
import librouteros
from homeassistant import config_entries
from homeassistant.components import mikrotik
from . import ARP_DATA, DHCP_DATA, MOCK_DATA, MOCK_OPTIONS, WIRELESS_DATA
from tests.common import MockConfigEntry
async def setup_mikrotik_entry(hass, **kwargs):
"""Set up Mikrotik intergation successfully."""
support_wireless = kwargs.get("support_wireless", True)
dhcp_data = kwargs.get("dhcp_data", DHCP_DATA)
wireless_data = kwargs.get("wireless_data", WIRELESS_DATA)
def mock_command(self, cmd, params=None):
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.IS_WIRELESS]:
return support_wireless
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.DHCP]:
return dhcp_data
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.WIRELESS]:
return wireless_data
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.ARP]:
return ARP_DATA
return {}
config_entry = MockConfigEntry(
domain=mikrotik.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
if "force_dhcp" in kwargs:
config_entry.options["force_dhcp"] = True
if "arp_ping" in kwargs:
config_entry.options["arp_ping"] = True
with patch("librouteros.connect"), patch.object(
mikrotik.hub.MikrotikData, "command", new=mock_command
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return hass.data[mikrotik.DOMAIN][config_entry.entry_id]
async def test_hub_setup_successful(hass):
"""Successful setup of Mikrotik hub."""
with patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup",
return_value=True,
) as forward_entry_setup:
hub = await setup_mikrotik_entry(hass)
assert hub.config_entry.data == {
mikrotik.CONF_NAME: "Mikrotik",
mikrotik.CONF_HOST: "0.0.0.0",
mikrotik.CONF_USERNAME: "user",
mikrotik.CONF_PASSWORD: "pass",
mikrotik.CONF_PORT: 8278,
mikrotik.CONF_VERIFY_SSL: False,
}
assert hub.config_entry.options == {
mikrotik.hub.CONF_FORCE_DHCP: False,
mikrotik.CONF_ARP_PING: False,
mikrotik.CONF_DETECTION_TIME: 300,
}
assert hub.api.available is True
assert hub.signal_update == "mikrotik-update-0.0.0.0"
assert forward_entry_setup.mock_calls[0][1] == (hub.config_entry, "device_tracker")
async def test_hub_setup_failed(hass):
"""Failed setup of Mikrotik hub."""
config_entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA)
config_entry.add_to_hass(hass)
# error when connection fails
with patch(
"librouteros.connect", side_effect=librouteros.exceptions.ConnectionClosed
):
await hass.config_entries.async_setup(config_entry.entry_id)
assert config_entry.state == config_entries.ENTRY_STATE_SETUP_RETRY
# error when username or password is invalid
config_entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup"
) as forward_entry_setup, patch(
"librouteros.connect",
side_effect=librouteros.exceptions.TrapError("invalid user name or password"),
):
result = await hass.config_entries.async_setup(config_entry.entry_id)
assert result is False
assert len(forward_entry_setup.mock_calls) == 0
async def test_update_failed(hass):
"""Test failing to connect during update."""
hub = await setup_mikrotik_entry(hass)
with patch.object(
mikrotik.hub.MikrotikData, "command", side_effect=mikrotik.errors.CannotConnect
):
await hub.async_update()
assert hub.api.available is False
async def test_hub_not_support_wireless(hass):
"""Test updating hub devices when hub doesn't support wireless interfaces."""
# test that the devices are constructed from dhcp data
hub = await setup_mikrotik_entry(hass, support_wireless=False)
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params is None
assert hub.api.devices["00:00:00:00:00:02"]._params == DHCP_DATA[1]
assert hub.api.devices["00:00:00:00:00:02"]._wireless_params is None
async def test_hub_support_wireless(hass):
"""Test updating hub devices when hub support wireless interfaces."""
# test that the device list is from wireless data list
hub = await setup_mikrotik_entry(hass)
assert hub.api.support_wireless is True
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params == WIRELESS_DATA[0]
# devices not in wireless list will not be added
assert "00:00:00:00:00:02" not in hub.api.devices
async def test_force_dhcp(hass):
"""Test updating hub devices with forced dhcp method."""
# test that the devices are constructed from dhcp data
hub = await setup_mikrotik_entry(hass, force_dhcp=True)
assert hub.api.support_wireless is True
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params == WIRELESS_DATA[0]
# devices not in wireless list are added from dhcp
assert hub.api.devices["00:00:00:00:00:02"]._params == DHCP_DATA[1]
assert hub.api.devices["00:00:00:00:00:02"]._wireless_params is None
async def test_arp_ping(hass):
"""Test arp ping devices to confirm they are connected."""
# test device show as home if arp ping returns value
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=True):
hub = await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
assert hub.api.devices["00:00:00:00:00:01"].last_seen is not None
assert hub.api.devices["00:00:00:00:00:02"].last_seen is not None
# test device show as away if arp ping times out
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=False):
hub = await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
assert hub.api.devices["00:00:00:00:00:01"].last_seen is not None
# this device is not wireless so it will show as away
assert hub.api.devices["00:00:00:00:00:02"].last_seen is None

View File

@ -0,0 +1,83 @@
"""Test Mikrotik setup process."""
from unittest.mock import Mock, patch
from homeassistant.components import mikrotik
from homeassistant.setup import async_setup_component
from . import MOCK_DATA
from tests.common import MockConfigEntry, mock_coro
async def test_setup_with_no_config(hass):
"""Test that we do not discover anything or try to set up a hub."""
assert await async_setup_component(hass, mikrotik.DOMAIN, {}) is True
assert mikrotik.DOMAIN not in hass.data
async def test_successful_config_entry(hass):
"""Test config entry successfull setup."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA,)
entry.add_to_hass(hass)
mock_registry = Mock()
with patch.object(mikrotik, "MikrotikHub") as mock_hub, patch(
"homeassistant.helpers.device_registry.async_get_registry",
return_value=mock_coro(mock_registry),
):
mock_hub.return_value.async_setup.return_value = mock_coro(True)
mock_hub.return_value.serial_num = "12345678"
mock_hub.return_value.model = "RB750"
mock_hub.return_value.hostname = "mikrotik"
mock_hub.return_value.firmware = "3.65"
assert await mikrotik.async_setup_entry(hass, entry) is True
assert len(mock_hub.mock_calls) == 2
p_hass, p_entry = mock_hub.mock_calls[0][1]
assert p_hass is hass
assert p_entry is entry
assert len(mock_registry.mock_calls) == 1
assert mock_registry.mock_calls[0][2] == {
"config_entry_id": entry.entry_id,
"connections": {("mikrotik", "12345678")},
"manufacturer": mikrotik.ATTR_MANUFACTURER,
"model": "RB750",
"name": "mikrotik",
"sw_version": "3.65",
}
async def test_hub_fail_setup(hass):
"""Test that a failed setup will not store the hub."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA,)
entry.add_to_hass(hass)
with patch.object(mikrotik, "MikrotikHub") as mock_hub:
mock_hub.return_value.async_setup.return_value = mock_coro(False)
assert await mikrotik.async_setup_entry(hass, entry) is False
assert mikrotik.DOMAIN not in hass.data
async def test_unload_entry(hass):
"""Test being able to unload an entry."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA,)
entry.add_to_hass(hass)
with patch.object(mikrotik, "MikrotikHub") as mock_hub, patch(
"homeassistant.helpers.device_registry.async_get_registry",
return_value=mock_coro(Mock()),
):
mock_hub.return_value.async_setup.return_value = mock_coro(True)
mock_hub.return_value.serial_num = "12345678"
mock_hub.return_value.model = "RB750"
mock_hub.return_value.hostname = "mikrotik"
mock_hub.return_value.firmware = "3.65"
assert await mikrotik.async_setup_entry(hass, entry) is True
assert len(mock_hub.return_value.mock_calls) == 1
assert await mikrotik.async_unload_entry(hass, entry)
assert entry.entry_id not in hass.data[mikrotik.DOMAIN]