Rework Mikrotik device scanning following Unifi (#27484)

* rework device scanning, add tests

* update requirements and coverage

* fix description comments

* update tests, fix disabled entity updates

* rework device scanning, add tests

* update requirements and coverage

* fix description comments

* update tests, fix disabled entity updates

* update librouteros to 3.0.0

* fix sorting

* fix sorting 2

* revert to 2.3.0 as 3.0.0 requires code update

* fix requirements

* apply fixes

* fix tests

* update hub.py and fix tests

* fix test_hub_setup_failed

* rebased on dev and update librouteros to 3.0.0

* fixed test_config_flow

* fixed tests

* fix test_config_flow
This commit is contained in:
Rami Mosleh 2020-01-30 20:21:51 +02:00 committed by GitHub
parent 33361f8580
commit 9432054066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1546 additions and 341 deletions

View File

@ -420,7 +420,8 @@ omit =
homeassistant/components/metoffice/weather.py
homeassistant/components/microsoft/tts.py
homeassistant/components/miflora/sensor.py
homeassistant/components/mikrotik/*
homeassistant/components/mikrotik/hub.py
homeassistant/components/mikrotik/device_tracker.py
homeassistant/components/mill/climate.py
homeassistant/components/mill/const.py
homeassistant/components/minio/*

View File

@ -209,6 +209,7 @@ homeassistant/components/met/* @danielhiversen
homeassistant/components/meteo_france/* @victorcerutti @oncleben31
homeassistant/components/meteoalarm/* @rolfberkenbosch
homeassistant/components/miflora/* @danielhiversen @ChristianKuehnel
homeassistant/components/mikrotik/* @engrbm87
homeassistant/components/mill/* @danielhiversen
homeassistant/components/min_max/* @fabaff
homeassistant/components/minio/* @tkislan

View File

@ -0,0 +1,37 @@
{
"config": {
"title": "Mikrotik",
"step": {
"user": {
"title": "Set up Mikrotik Router",
"data": {
"name": "Name",
"host": "Host",
"username": "Username",
"password": "Password",
"port": "Port",
"verify_ssl": "Use ssl"
}
}
},
"error": {
"name_exists": "Name exists",
"cannot_connect": "Connection Unsuccessful",
"wrong_credentials": "Wrong Credentials"
},
"abort": {
"already_configured": "Mikrotik is already configured"
}
},
"options": {
"step": {
"device_tracker": {
"data": {
"arp_ping": "Enable ARP ping",
"force_dhcp": "Force scanning using DHCP",
"detection_time": "Consider home interval"
}
}
}
}
}

View File

@ -1,43 +1,28 @@
"""The mikrotik component."""
import logging
import ssl
from librouteros import connect
from librouteros.exceptions import LibRouterosError
from librouteros.login import plain as login_plain, token as login_token
"""The Mikrotik component."""
import voluptuous as vol
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import (
CONF_HOST,
CONF_METHOD,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import load_platform
from .const import (
ATTR_MANUFACTURER,
CONF_ARP_PING,
CONF_ENCODING,
CONF_LOGIN_METHOD,
CONF_TRACK_DEVICES,
DEFAULT_ENCODING,
CONF_DETECTION_TIME,
CONF_FORCE_DHCP,
DEFAULT_API_PORT,
DEFAULT_DETECTION_TIME,
DEFAULT_NAME,
DOMAIN,
HOSTS,
IDENTITY,
MIKROTIK_SERVICES,
MTK_LOGIN_PLAIN,
MTK_LOGIN_TOKEN,
NAME,
)
_LOGGER = logging.getLogger(__name__)
MTK_DEFAULT_API_PORT = "8728"
MTK_DEFAULT_API_SSL_PORT = "8729"
from .hub import MikrotikHub
MIKROTIK_SCHEMA = vol.All(
vol.Schema(
@ -45,13 +30,14 @@ MIKROTIK_SCHEMA = vol.All(
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_METHOD): cv.string,
vol.Optional(CONF_LOGIN_METHOD): vol.Any(MTK_LOGIN_PLAIN, MTK_LOGIN_TOKEN),
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
vol.Optional(CONF_TRACK_DEVICES, default=True): cv.boolean,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_API_PORT): cv.port,
vol.Optional(CONF_VERIFY_SSL, default=False): cv.boolean,
vol.Optional(CONF_ARP_PING, default=False): cv.boolean,
vol.Optional(CONF_FORCE_DHCP, default=False): cv.boolean,
vol.Optional(
CONF_DETECTION_TIME, default=DEFAULT_DETECTION_TIME
): cv.time_period,
}
)
)
@ -61,124 +47,45 @@ CONFIG_SCHEMA = vol.Schema(
)
def setup(hass, config):
"""Set up the Mikrotik component."""
hass.data[DOMAIN] = {HOSTS: {}}
async def async_setup(hass, config):
"""Import the Mikrotik component from config."""
for device in config[DOMAIN]:
host = device[CONF_HOST]
use_ssl = device.get(CONF_SSL)
user = device.get(CONF_USERNAME)
password = device.get(CONF_PASSWORD, "")
login = device.get(CONF_LOGIN_METHOD)
encoding = device.get(CONF_ENCODING)
track_devices = device.get(CONF_TRACK_DEVICES)
if CONF_PORT in device:
port = device.get(CONF_PORT)
else:
if use_ssl:
port = MTK_DEFAULT_API_SSL_PORT
else:
port = MTK_DEFAULT_API_PORT
if login == MTK_LOGIN_PLAIN:
login_method = login_plain
else:
login_method = login_token
try:
api = MikrotikClient(
host, use_ssl, port, user, password, login_method, encoding
if DOMAIN in config:
for entry in config[DOMAIN]:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=entry
)
)
api.connect_to_device()
hass.data[DOMAIN][HOSTS][host] = {"config": device, "api": api}
except LibRouterosError as api_error:
_LOGGER.error("Mikrotik %s error %s", host, api_error)
continue
if track_devices:
hass.data[DOMAIN][HOSTS][host][DEVICE_TRACKER] = True
load_platform(hass, DEVICE_TRACKER, DOMAIN, None, config)
if not hass.data[DOMAIN][HOSTS]:
return False
return True
class MikrotikClient:
"""Handle all communication with the Mikrotik API."""
async def async_setup_entry(hass, config_entry):
"""Set up the Mikrotik component."""
def __init__(self, host, use_ssl, port, user, password, login_method, encoding):
"""Initialize the Mikrotik Client."""
self._host = host
self._use_ssl = use_ssl
self._port = port
self._user = user
self._password = password
self._login_method = login_method
self._encoding = encoding
self._ssl_wrapper = None
self.hostname = None
self._client = None
self._connected = False
hub = MikrotikHub(hass, config_entry)
if not await hub.async_setup():
return False
def connect_to_device(self):
"""Connect to Mikrotik device."""
self._connected = False
_LOGGER.debug("[%s] Connecting to Mikrotik device", self._host)
hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = hub
device_registry = await hass.helpers.device_registry.async_get_registry()
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(DOMAIN, hub.serial_num)},
manufacturer=ATTR_MANUFACTURER,
model=hub.model,
name=hub.hostname,
sw_version=hub.firmware,
)
kwargs = {
"encoding": self._encoding,
"login_methods": self._login_method,
"port": self._port,
}
return True
if self._use_ssl:
if self._ssl_wrapper is None:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self._ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = self._ssl_wrapper
try:
self._client = connect(self._host, self._user, self._password, **kwargs)
self._connected = True
except LibRouterosError as api_error:
_LOGGER.error("Mikrotik %s: %s", self._host, api_error)
self._client = None
return False
async def async_unload_entry(hass, config_entry):
"""Unload a config entry."""
await hass.config_entries.async_forward_entry_unload(config_entry, "device_tracker")
self.hostname = self.get_hostname()
_LOGGER.info("Mikrotik Connected to %s (%s)", self.hostname, self._host)
return self._connected
hass.data[DOMAIN].pop(config_entry.entry_id)
def get_hostname(self):
"""Return device host name."""
data = list(self.command(MIKROTIK_SERVICES[IDENTITY]))
return data[0][NAME] if data else None
def connected(self):
"""Return connected boolean."""
return self._connected
def command(self, cmd, params=None):
"""Retrieve data from Mikrotik API."""
if not self._connected or not self._client:
if not self.connect_to_device():
return None
try:
if params:
response = self._client(cmd=cmd, **params)
else:
response = self._client(cmd=cmd)
except LibRouterosError as api_error:
_LOGGER.error(
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
self._host,
cmd,
api_error,
)
return None
return response if response else None
return True

View File

@ -0,0 +1,120 @@
"""Config flow for Mikrotik."""
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.core import callback
from .const import (
CONF_ARP_PING,
CONF_DETECTION_TIME,
CONF_FORCE_DHCP,
DEFAULT_API_PORT,
DEFAULT_DETECTION_TIME,
DEFAULT_NAME,
DOMAIN,
)
from .errors import CannotConnect, LoginError
from .hub import get_api
class MikrotikFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a Mikrotik config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return MikrotikOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
errors = {}
if user_input is not None:
for entry in self.hass.config_entries.async_entries(DOMAIN):
if entry.data[CONF_HOST] == user_input[CONF_HOST]:
return self.async_abort(reason="already_configured")
if entry.data[CONF_NAME] == user_input[CONF_NAME]:
errors[CONF_NAME] = "name_exists"
break
try:
await self.hass.async_add_executor_job(get_api, self.hass, user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except LoginError:
errors[CONF_USERNAME] = "wrong_credentials"
errors[CONF_PASSWORD] = "wrong_credentials"
if not errors:
return self.async_create_entry(
title=user_input[CONF_NAME], data=user_input
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_NAME, default=DEFAULT_NAME): str,
vol.Required(CONF_HOST): str,
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Optional(CONF_PORT, default=DEFAULT_API_PORT): int,
vol.Optional(CONF_VERIFY_SSL, default=False): bool,
}
),
errors=errors,
)
async def async_step_import(self, import_config):
"""Import Miktortik from config."""
import_config[CONF_DETECTION_TIME] = import_config[CONF_DETECTION_TIME].seconds
return await self.async_step_user(user_input=import_config)
class MikrotikOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle Mikrotik options."""
def __init__(self, config_entry):
"""Initialize Mikrotik options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
"""Manage the Mikrotik options."""
return await self.async_step_device_tracker()
async def async_step_device_tracker(self, user_input=None):
"""Manage the device tracker options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
options = {
vol.Optional(
CONF_FORCE_DHCP,
default=self.config_entry.options.get(CONF_FORCE_DHCP, False),
): bool,
vol.Optional(
CONF_ARP_PING,
default=self.config_entry.options.get(CONF_ARP_PING, False),
): bool,
vol.Optional(
CONF_DETECTION_TIME,
default=self.config_entry.options.get(
CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
),
): int,
}
return self.async_show_form(
step_id="device_tracker", data_schema=vol.Schema(options)
)

View File

@ -1,32 +1,38 @@
"""Constants used in the Mikrotik components."""
DOMAIN = "mikrotik"
MIKROTIK = DOMAIN
HOSTS = "hosts"
MTK_LOGIN_PLAIN = "plain"
MTK_LOGIN_TOKEN = "token"
DEFAULT_NAME = "Mikrotik"
DEFAULT_API_PORT = 8728
DEFAULT_DETECTION_TIME = 300
ATTR_MANUFACTURER = "Mikrotik"
ATTR_SERIAL_NUMBER = "serial-number"
ATTR_FIRMWARE = "current-firmware"
ATTR_MODEL = "model"
CONF_ARP_PING = "arp_ping"
CONF_TRACK_DEVICES = "track_devices"
CONF_LOGIN_METHOD = "login_method"
CONF_ENCODING = "encoding"
DEFAULT_ENCODING = "utf-8"
CONF_FORCE_DHCP = "force_dhcp"
CONF_DETECTION_TIME = "detection_time"
NAME = "name"
INFO = "info"
IDENTITY = "identity"
ARP = "arp"
CAPSMAN = "capsman"
DHCP = "dhcp"
WIRELESS = "wireless"
CAPSMAN = "capsman"
IS_WIRELESS = "is_wireless"
MIKROTIK_SERVICES = {
INFO: "/system/routerboard/getall",
IDENTITY: "/system/identity/getall",
ARP: "/ip/arp/getall",
DHCP: "/ip/dhcp-server/lease/getall",
WIRELESS: "/interface/wireless/registration-table/getall",
CAPSMAN: "/caps-man/registration-table/getall",
DHCP: "/ip/dhcp-server/lease/getall",
IDENTITY: "/system/identity/getall",
INFO: "/system/routerboard/getall",
WIRELESS: "/interface/wireless/registration-table/getall",
IS_WIRELESS: "/interface/wireless/print",
}
ATTR_DEVICE_TRACKER = [
@ -34,16 +40,8 @@ ATTR_DEVICE_TRACKER = [
"mac-address",
"ssid",
"interface",
"host-name",
"last-seen",
"rx-signal",
"signal-strength",
"tx-ccq",
"signal-to-noise",
"wmm-enabled",
"authentication-type",
"encryption",
"tx-rate-set",
"rx-rate",
"tx-rate",
"uptime",

View File

@ -1,191 +1,142 @@
"""Support for Mikrotik routers as device tracker."""
import logging
from homeassistant.components.device_tracker import (
from homeassistant.components.device_tracker.config_entry import ScannerEntity
from homeassistant.components.device_tracker.const import (
DOMAIN as DEVICE_TRACKER,
DeviceScanner,
SOURCE_TYPE_ROUTER,
)
from homeassistant.const import CONF_METHOD
from homeassistant.util import slugify
from homeassistant.core import callback
from homeassistant.helpers import entity_registry
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
import homeassistant.util.dt as dt_util
from .const import (
ARP,
ATTR_DEVICE_TRACKER,
CAPSMAN,
CONF_ARP_PING,
DHCP,
HOSTS,
MIKROTIK,
MIKROTIK_SERVICES,
WIRELESS,
)
from .const import ATTR_MANUFACTURER, DOMAIN
_LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
"""Validate the configuration and return MikrotikScanner."""
for host in hass.data[MIKROTIK][HOSTS]:
if DEVICE_TRACKER not in hass.data[MIKROTIK][HOSTS][host]:
continue
hass.data[MIKROTIK][HOSTS][host].pop(DEVICE_TRACKER, None)
api = hass.data[MIKROTIK][HOSTS][host]["api"]
config = hass.data[MIKROTIK][HOSTS][host]["config"]
hostname = api.get_hostname()
scanner = MikrotikScanner(api, host, hostname, config)
return scanner if scanner.success_init else None
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up device tracker for Mikrotik component."""
hub = hass.data[DOMAIN][config_entry.entry_id]
tracked = {}
registry = await entity_registry.async_get_registry(hass)
# Restore clients that is not a part of active clients list.
for entity in registry.entities.values():
if (
entity.config_entry_id == config_entry.entry_id
and entity.domain == DEVICE_TRACKER
):
if (
entity.unique_id in hub.api.devices
or entity.unique_id not in hub.api.all_devices
):
continue
hub.api.restore_device(entity.unique_id)
@callback
def update_hub():
"""Update the status of the device."""
update_items(hub, async_add_entities, tracked)
async_dispatcher_connect(hass, hub.signal_update, update_hub)
update_hub()
class MikrotikScanner(DeviceScanner):
"""This class queries a Mikrotik device."""
@callback
def update_items(hub, async_add_entities, tracked):
"""Update tracked device state from the hub."""
new_tracked = []
for mac, device in hub.api.devices.items():
if mac not in tracked:
tracked[mac] = MikrotikHubTracker(device, hub)
new_tracked.append(tracked[mac])
def __init__(self, api, host, hostname, config):
"""Initialize the scanner."""
self.api = api
self.config = config
self.host = host
self.hostname = hostname
self.method = config.get(CONF_METHOD)
self.arp_ping = config.get(CONF_ARP_PING)
self.dhcp = None
self.devices_arp = {}
self.devices_dhcp = {}
self.device_tracker = None
self.success_init = self.api.connected()
def get_extra_attributes(self, device):
"""
Get extra attributes of a device.
Some known extra attributes that may be returned in the device tuple
include MAC address (mac), network device (dev), IP address
(ip), reachable status (reachable), associated router
(host), hostname if known (hostname) among others.
"""
return self.device_tracker.get(device) or {}
def get_device_name(self, device):
"""Get name for a device."""
host = self.device_tracker.get(device, {})
return host.get("host_name")
def scan_devices(self):
"""Scan for new devices and return a list with found device MACs."""
self.update_device_tracker()
return list(self.device_tracker)
def get_method(self):
"""Determine the device tracker polling method."""
if self.method:
_LOGGER.debug(
"Mikrotik %s: Manually selected polling method %s",
self.host,
self.method,
)
return self.method
capsman = self.api.command(MIKROTIK_SERVICES[CAPSMAN])
if not capsman:
_LOGGER.debug(
"Mikrotik %s: Not a CAPsMAN controller. "
"Trying local wireless interfaces",
(self.host),
)
else:
return CAPSMAN
wireless = self.api.command(MIKROTIK_SERVICES[WIRELESS])
if not wireless:
_LOGGER.info(
"Mikrotik %s: Wireless adapters not found. Try to "
"use DHCP lease table as presence tracker source. "
"Please decrease lease time as much as possible",
self.host,
)
return DHCP
return WIRELESS
def update_device_tracker(self):
"""Update device_tracker from Mikrotik API."""
self.device_tracker = {}
if not self.method:
self.method = self.get_method()
data = self.api.command(MIKROTIK_SERVICES[self.method])
if data is None:
return
if self.method != DHCP:
dhcp = self.api.command(MIKROTIK_SERVICES[DHCP])
if dhcp is not None:
self.devices_dhcp = load_mac(dhcp)
arp = self.api.command(MIKROTIK_SERVICES[ARP])
self.devices_arp = load_mac(arp)
for device in data:
mac = device.get("mac-address")
if self.method == DHCP:
if "active-address" not in device:
continue
if self.arp_ping and self.devices_arp:
if mac not in self.devices_arp:
continue
ip_address = self.devices_arp[mac]["address"]
interface = self.devices_arp[mac]["interface"]
if not self.do_arp_ping(ip_address, interface):
continue
attrs = {}
if mac in self.devices_dhcp and "host-name" in self.devices_dhcp[mac]:
hostname = self.devices_dhcp[mac].get("host-name")
if hostname:
attrs["host_name"] = hostname
if self.devices_arp and mac in self.devices_arp:
attrs["ip_address"] = self.devices_arp[mac].get("address")
for attr in ATTR_DEVICE_TRACKER:
if attr in device and device[attr] is not None:
attrs[slugify(attr)] = device[attr]
attrs["scanner_type"] = self.method
attrs["scanner_host"] = self.host
attrs["scanner_hostname"] = self.hostname
self.device_tracker[mac] = attrs
def do_arp_ping(self, ip_address, interface):
"""Attempt to arp ping MAC address via interface."""
params = {
"arp-ping": "yes",
"interval": "100ms",
"count": 3,
"interface": interface,
"address": ip_address,
}
cmd = "/ping"
data = self.api.command(cmd, params)
if data is not None:
status = 0
for result in data:
if "status" in result:
_LOGGER.debug(
"Mikrotik %s arp_ping error: %s", self.host, result["status"]
)
status += 1
if status == len(data):
return None
return data
if new_tracked:
async_add_entities(new_tracked)
def load_mac(devices=None):
"""Load dictionary using MAC address as key."""
if not devices:
class MikrotikHubTracker(ScannerEntity):
"""Representation of network device."""
def __init__(self, device, hub):
"""Initialize the tracked device."""
self.device = device
self.hub = hub
self.unsub_dispatcher = None
@property
def is_connected(self):
"""Return true if the client is connected to the network."""
if (
self.device.last_seen
and (dt_util.utcnow() - self.device.last_seen)
< self.hub.option_detection_time
):
return True
return False
@property
def source_type(self):
"""Return the source type of the client."""
return SOURCE_TYPE_ROUTER
@property
def name(self) -> str:
"""Return the name of the client."""
return self.device.name
@property
def unique_id(self) -> str:
"""Return a unique identifier for this device."""
return self.device.mac
@property
def available(self) -> bool:
"""Return if controller is available."""
return self.hub.available
@property
def device_state_attributes(self):
"""Return the device state attributes."""
if self.is_connected:
return self.device.attrs
return None
mac_devices = {}
for device in devices:
if "mac-address" in device:
mac = device.pop("mac-address")
mac_devices[mac] = device
return mac_devices
@property
def device_info(self):
"""Return a client description for device registry."""
info = {
"connections": {(CONNECTION_NETWORK_MAC, self.device.mac)},
"manufacturer": ATTR_MANUFACTURER,
"identifiers": {(DOMAIN, self.device.mac)},
"name": self.name,
"via_device": (DOMAIN, self.hub.serial_num),
}
return info
async def async_added_to_hass(self):
"""Client entity created."""
_LOGGER.debug("New network device tracker %s (%s)", self.name, self.unique_id)
self.unsub_dispatcher = async_dispatcher_connect(
self.hass, self.hub.signal_update, self.async_write_ha_state
)
async def async_update(self):
"""Synchronize state with hub."""
_LOGGER.debug(
"Updating Mikrotik tracked client %s (%s)", self.entity_id, self.unique_id
)
await self.hub.request_update()
async def will_remove_from_hass(self):
"""Disconnect from dispatcher."""
if self.unsub_dispatcher:
self.unsub_dispatcher()

View File

@ -0,0 +1,10 @@
"""Errors for the Mikrotik component."""
from homeassistant.exceptions import HomeAssistantError
class CannotConnect(HomeAssistantError):
"""Unable to connect to the hub."""
class LoginError(HomeAssistantError):
"""Component got logged out."""

View File

@ -0,0 +1,413 @@
"""The Mikrotik router class."""
from datetime import timedelta
import logging
import socket
import ssl
import librouteros
from librouteros.login import plain as login_plain, token as login_token
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_VERIFY_SSL
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.util import slugify
import homeassistant.util.dt as dt_util
from .const import (
ARP,
ATTR_DEVICE_TRACKER,
ATTR_FIRMWARE,
ATTR_MODEL,
ATTR_SERIAL_NUMBER,
CAPSMAN,
CONF_ARP_PING,
CONF_DETECTION_TIME,
CONF_FORCE_DHCP,
DEFAULT_DETECTION_TIME,
DHCP,
IDENTITY,
INFO,
IS_WIRELESS,
MIKROTIK_SERVICES,
NAME,
WIRELESS,
)
from .errors import CannotConnect, LoginError
_LOGGER = logging.getLogger(__name__)
class Device:
"""Represents a network device."""
def __init__(self, mac, params):
"""Initialize the network device."""
self._mac = mac
self._params = params
self._last_seen = None
self._attrs = {}
self._wireless_params = None
@property
def name(self):
"""Return device name."""
return self._params.get("host-name", self.mac)
@property
def mac(self):
"""Return device mac."""
return self._mac
@property
def last_seen(self):
"""Return device last seen."""
return self._last_seen
@property
def attrs(self):
"""Return device attributes."""
attr_data = self._wireless_params if self._wireless_params else self._params
for attr in ATTR_DEVICE_TRACKER:
if attr in attr_data:
self._attrs[slugify(attr)] = attr_data[attr]
self._attrs["ip_address"] = self._params.get("active-address")
return self._attrs
def update(self, wireless_params=None, params=None, active=False):
"""Update Device params."""
if wireless_params:
self._wireless_params = wireless_params
if params:
self._params = params
if active:
self._last_seen = dt_util.utcnow()
class MikrotikData:
"""Handle all communication with the Mikrotik API."""
def __init__(self, hass, config_entry, api):
"""Initialize the Mikrotik Client."""
self.hass = hass
self.config_entry = config_entry
self.api = api
self._host = self.config_entry.data[CONF_HOST]
self.all_devices = {}
self.devices = {}
self.available = True
self.support_wireless = bool(self.command(MIKROTIK_SERVICES[IS_WIRELESS]))
self.hostname = None
self.model = None
self.firmware = None
self.serial_number = None
@staticmethod
def load_mac(devices=None):
"""Load dictionary using MAC address as key."""
if not devices:
return None
mac_devices = {}
for device in devices:
if "mac-address" in device:
mac = device["mac-address"]
mac_devices[mac] = device
return mac_devices
@property
def arp_enabled(self):
"""Return arp_ping option setting."""
return self.config_entry.options[CONF_ARP_PING]
@property
def force_dhcp(self):
"""Return force_dhcp option setting."""
return self.config_entry.options[CONF_FORCE_DHCP]
def get_info(self, param):
"""Return device model name."""
cmd = IDENTITY if param == NAME else INFO
data = list(self.command(MIKROTIK_SERVICES[cmd]))
return data[0].get(param) if data else None
def get_hub_details(self):
"""Get Hub info."""
self.hostname = self.get_info(NAME)
self.model = self.get_info(ATTR_MODEL)
self.firmware = self.get_info(ATTR_FIRMWARE)
self.serial_number = self.get_info(ATTR_SERIAL_NUMBER)
def connect_to_hub(self):
"""Connect to hub."""
try:
self.api = get_api(self.hass, self.config_entry.data)
self.available = True
return True
except (LoginError, CannotConnect):
self.available = False
return False
def get_list_from_interface(self, interface):
"""Get devices from interface."""
result = list(self.command(MIKROTIK_SERVICES[interface]))
return self.load_mac(result) if result else {}
def restore_device(self, mac):
"""Restore a missing device after restart."""
self.devices[mac] = Device(mac, self.all_devices[mac])
def update_devices(self):
"""Get list of devices with latest status."""
arp_devices = {}
wireless_devices = {}
device_list = {}
try:
self.all_devices = self.get_list_from_interface(DHCP)
if self.support_wireless:
_LOGGER.debug("wireless is supported")
for interface in [CAPSMAN, WIRELESS]:
wireless_devices = self.get_list_from_interface(interface)
if wireless_devices:
_LOGGER.debug("Scanning wireless devices using %s", interface)
break
if self.support_wireless and not self.force_dhcp:
device_list = wireless_devices
else:
device_list = self.all_devices
_LOGGER.debug("Falling back to DHCP for scanning devices")
if self.arp_enabled:
arp_devices = self.get_list_from_interface(ARP)
# get new hub firmware version if updated
self.firmware = self.get_info(ATTR_FIRMWARE)
except (CannotConnect, socket.timeout, socket.error):
self.available = False
return
if not device_list:
return
for mac, params in device_list.items():
if mac not in self.devices:
self.devices[mac] = Device(mac, self.all_devices.get(mac, {}))
else:
self.devices[mac].update(params=self.all_devices.get(mac, {}))
if mac in wireless_devices:
# if wireless is supported then wireless_params are params
self.devices[mac].update(
wireless_params=wireless_devices[mac], active=True
)
continue
# for wired devices or when forcing dhcp check for active-address
if not params.get("active-address"):
self.devices[mac].update(active=False)
continue
# ping check the rest of active devices if arp ping is enabled
active = True
if self.arp_enabled and mac in arp_devices:
active = self.do_arp_ping(
params.get("active-address"), arp_devices[mac].get("interface")
)
self.devices[mac].update(active=active)
def do_arp_ping(self, ip_address, interface):
"""Attempt to arp ping MAC address via interface."""
_LOGGER.debug("pinging - %s", ip_address)
params = {
"arp-ping": "yes",
"interval": "100ms",
"count": 3,
"interface": interface,
"address": ip_address,
}
cmd = "/ping"
data = list(self.command(cmd, params))
if data is not None:
status = 0
for result in data:
if "status" in result:
status += 1
if status == len(data):
_LOGGER.debug(
"Mikrotik %s - %s arp_ping timed out", ip_address, interface
)
return False
return True
def command(self, cmd, params=None):
"""Retrieve data from Mikrotik API."""
try:
_LOGGER.info("Running command %s", cmd)
if params:
response = self.api(cmd=cmd, **params)
else:
response = self.api(cmd=cmd)
except (
librouteros.exceptions.ConnectionClosed,
socket.error,
socket.timeout,
) as api_error:
_LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
raise CannotConnect
except librouteros.exceptions.ProtocolError as api_error:
_LOGGER.warning(
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
self._host,
cmd,
api_error,
)
return None
return response if response else None
def update(self):
"""Update device_tracker from Mikrotik API."""
if not self.available or not self.api:
if not self.connect_to_hub():
return
_LOGGER.debug("updating network devices for host: %s", self._host)
self.update_devices()
class MikrotikHub:
"""Mikrotik Hub Object."""
def __init__(self, hass, config_entry):
"""Initialize the Mikrotik Client."""
self.hass = hass
self.config_entry = config_entry
self._mk_data = None
self.progress = None
@property
def host(self):
"""Return the host of this hub."""
return self.config_entry.data[CONF_HOST]
@property
def hostname(self):
"""Return the hostname of the hub."""
return self._mk_data.hostname
@property
def model(self):
"""Return the model of the hub."""
return self._mk_data.model
@property
def firmware(self):
"""Return the firware of the hub."""
return self._mk_data.firmware
@property
def serial_num(self):
"""Return the serial number of the hub."""
return self._mk_data.serial_number
@property
def available(self):
"""Return if the hub is connected."""
return self._mk_data.available
@property
def option_detection_time(self):
"""Config entry option defining number of seconds from last seen to away."""
return timedelta(seconds=self.config_entry.options[CONF_DETECTION_TIME])
@property
def signal_update(self):
"""Event specific per Mikrotik entry to signal updates."""
return f"mikrotik-update-{self.host}"
@property
def api(self):
"""Represent Mikrotik data object."""
return self._mk_data
async def async_add_options(self):
"""Populate default options for Mikrotik."""
if not self.config_entry.options:
options = {
CONF_ARP_PING: self.config_entry.data.pop(CONF_ARP_PING, False),
CONF_FORCE_DHCP: self.config_entry.data.pop(CONF_FORCE_DHCP, False),
CONF_DETECTION_TIME: self.config_entry.data.pop(
CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
),
}
self.hass.config_entries.async_update_entry(
self.config_entry, options=options
)
async def request_update(self):
"""Request an update."""
if self.progress is not None:
await self.progress
return
self.progress = self.hass.async_create_task(self.async_update())
await self.progress
self.progress = None
async def async_update(self):
"""Update Mikrotik devices information."""
await self.hass.async_add_executor_job(self._mk_data.update)
async_dispatcher_send(self.hass, self.signal_update)
async def async_setup(self):
"""Set up the Mikrotik hub."""
try:
api = await self.hass.async_add_executor_job(
get_api, self.hass, self.config_entry.data
)
except CannotConnect:
raise ConfigEntryNotReady
except LoginError:
return False
self._mk_data = MikrotikData(self.hass, self.config_entry, api)
await self.async_add_options()
await self.hass.async_add_executor_job(self._mk_data.get_hub_details)
await self.hass.async_add_executor_job(self._mk_data.update)
self.hass.async_create_task(
self.hass.config_entries.async_forward_entry_setup(
self.config_entry, "device_tracker"
)
)
return True
def get_api(hass, entry):
"""Connect to Mikrotik hub."""
_LOGGER.debug("Connecting to Mikrotik hub [%s]", entry[CONF_HOST])
_login_method = (login_plain, login_token)
kwargs = {"login_methods": _login_method, "port": entry["port"]}
if entry[CONF_VERIFY_SSL]:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
_ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = _ssl_wrapper
try:
api = librouteros.connect(
entry[CONF_HOST], entry[CONF_USERNAME], entry[CONF_PASSWORD], **kwargs,
)
_LOGGER.debug("Connected to %s successfully", entry[CONF_HOST])
return api
except (
librouteros.exceptions.LibRouterosError,
socket.error,
socket.timeout,
) as api_error:
_LOGGER.error("Mikrotik %s error: %s", entry[CONF_HOST], api_error)
if "invalid user name or password" in str(api_error):
raise LoginError
raise CannotConnect

View File

@ -1,8 +1,13 @@
{
"domain": "mikrotik",
"name": "MikroTik",
"name": "Mikrotik",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/mikrotik",
"requirements": ["librouteros==3.0.0"],
"requirements": [
"librouteros==3.0.0"
],
"dependencies": [],
"codeowners": []
}
"codeowners": [
"@engrbm87"
]
}

View File

@ -0,0 +1,37 @@
{
"config": {
"title": "Mikrotik",
"step": {
"user": {
"title": "Set up Mikrotik Router",
"data": {
"name": "Name",
"host": "Host",
"username": "Username",
"password": "Password",
"port": "Port",
"verify_ssl": "Use ssl"
}
}
},
"error": {
"name_exists": "Name exists",
"cannot_connect": "Connection Unsuccessful",
"wrong_credentials": "Wrong Credentials"
},
"abort": {
"already_configured": "Mikrotik is already configured"
}
},
"options": {
"step": {
"device_tracker": {
"data": {
"arp_ping": "Enable ARP ping",
"force_dhcp": "Force scanning using DHCP",
"detection_time": "Consider home interval"
}
}
}
}
}

View File

@ -54,6 +54,7 @@ FLOWS = [
"luftdaten",
"mailgun",
"met",
"mikrotik",
"mobile_app",
"mqtt",
"neato",

View File

@ -283,6 +283,9 @@ keyrings.alt==3.4.0
# homeassistant.components.dyson
libpurecool==0.6.0
# homeassistant.components.mikrotik
librouteros==3.0.0
# homeassistant.components.soundtouch
libsoundtouch==0.7.2

View File

@ -0,0 +1,133 @@
"""Tests for the Mikrotik component."""
from homeassistant.components import mikrotik
MOCK_DATA = {
mikrotik.CONF_NAME: "Mikrotik",
mikrotik.CONF_HOST: "0.0.0.0",
mikrotik.CONF_USERNAME: "user",
mikrotik.CONF_PASSWORD: "pass",
mikrotik.CONF_PORT: 8278,
mikrotik.CONF_VERIFY_SSL: False,
}
MOCK_OPTIONS = {
mikrotik.CONF_ARP_PING: False,
mikrotik.const.CONF_FORCE_DHCP: False,
mikrotik.CONF_DETECTION_TIME: mikrotik.DEFAULT_DETECTION_TIME,
}
DEVICE_1_DHCP = {
".id": "*1A",
"address": "0.0.0.1",
"mac-address": "00:00:00:00:00:01",
"active-address": "0.0.0.1",
"host-name": "Device_1",
"comment": "Mobile",
}
DEVICE_2_DHCP = {
".id": "*1B",
"address": "0.0.0.2",
"mac-address": "00:00:00:00:00:02",
"active-address": "0.0.0.2",
"host-name": "Device_2",
"comment": "PC",
}
DEVICE_1_WIRELESS = {
".id": "*264",
"interface": "wlan1",
"mac-address": "00:00:00:00:00:01",
"ap": False,
"wds": False,
"bridge": False,
"rx-rate": "72.2Mbps-20MHz/1S/SGI",
"tx-rate": "72.2Mbps-20MHz/1S/SGI",
"packets": "59542,17464",
"bytes": "17536671,2966351",
"frames": "59542,17472",
"frame-bytes": "17655785,2862445",
"hw-frames": "78935,38395",
"hw-frame-bytes": "25636019,4063445",
"tx-frames-timed-out": 0,
"uptime": "5h49m36s",
"last-activity": "170ms",
"signal-strength": "-62@1Mbps",
"signal-to-noise": 52,
"signal-strength-ch0": -63,
"signal-strength-ch1": -69,
"strength-at-rates": "-62@1Mbps 16s330ms,-64@6Mbps 13s560ms,-65@HT20-3 52m6s30ms,-66@HT20-4 52m4s350ms,-66@HT20-5 51m58s580ms,-65@HT20-6 51m24s780ms,-65@HT20-7 5s680ms",
"tx-ccq": 93,
"p-throughput": 54928,
"last-ip": "0.0.0.1",
"802.1x-port-enabled": True,
"authentication-type": "wpa2-psk",
"encryption": "aes-ccm",
"group-encryption": "aes-ccm",
"management-protection": False,
"wmm-enabled": True,
"tx-rate-set": "OFDM:6-54 BW:1x SGI:1x HT:0-7",
}
DEVICE_2_WIRELESS = {
".id": "*265",
"interface": "wlan1",
"mac-address": "00:00:00:00:00:02",
"ap": False,
"wds": False,
"bridge": False,
"rx-rate": "72.2Mbps-20MHz/1S/SGI",
"tx-rate": "72.2Mbps-20MHz/1S/SGI",
"packets": "59542,17464",
"bytes": "17536671,2966351",
"frames": "59542,17472",
"frame-bytes": "17655785,2862445",
"hw-frames": "78935,38395",
"hw-frame-bytes": "25636019,4063445",
"tx-frames-timed-out": 0,
"uptime": "5h49m36s",
"last-activity": "170ms",
"signal-strength": "-62@1Mbps",
"signal-to-noise": 52,
"signal-strength-ch0": -63,
"signal-strength-ch1": -69,
"strength-at-rates": "-62@1Mbps 16s330ms,-64@6Mbps 13s560ms,-65@HT20-3 52m6s30ms,-66@HT20-4 52m4s350ms,-66@HT20-5 51m58s580ms,-65@HT20-6 51m24s780ms,-65@HT20-7 5s680ms",
"tx-ccq": 93,
"p-throughput": 54928,
"last-ip": "0.0.0.2",
"802.1x-port-enabled": True,
"authentication-type": "wpa2-psk",
"encryption": "aes-ccm",
"group-encryption": "aes-ccm",
"management-protection": False,
"wmm-enabled": True,
"tx-rate-set": "OFDM:6-54 BW:1x SGI:1x HT:0-7",
}
DHCP_DATA = [DEVICE_1_DHCP, DEVICE_2_DHCP]
WIRELESS_DATA = [DEVICE_1_WIRELESS]
ARP_DATA = [
{
".id": "*1",
"address": "0.0.0.1",
"mac-address": "00:00:00:00:00:01",
"interface": "bridge",
"published": False,
"invalid": False,
"DHCP": True,
"dynamic": True,
"complete": True,
"disabled": False,
},
{
".id": "*2",
"address": "0.0.0.2",
"mac-address": "00:00:00:00:00:02",
"interface": "bridge",
"published": False,
"invalid": False,
"DHCP": True,
"dynamic": True,
"complete": True,
"disabled": False,
},
]

View File

@ -0,0 +1,208 @@
"""Test Mikrotik setup process."""
from datetime import timedelta
from unittest.mock import patch
import librouteros
import pytest
from homeassistant import data_entry_flow
from homeassistant.components import mikrotik
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from tests.common import MockConfigEntry
DEMO_USER_INPUT = {
CONF_NAME: "Home router",
CONF_HOST: "0.0.0.0",
CONF_USERNAME: "username",
CONF_PASSWORD: "password",
CONF_PORT: 8278,
CONF_VERIFY_SSL: False,
}
DEMO_CONFIG = {
CONF_NAME: "Home router",
CONF_HOST: "0.0.0.0",
CONF_USERNAME: "username",
CONF_PASSWORD: "password",
CONF_PORT: 8278,
CONF_VERIFY_SSL: False,
mikrotik.const.CONF_FORCE_DHCP: False,
mikrotik.CONF_ARP_PING: False,
mikrotik.CONF_DETECTION_TIME: timedelta(seconds=30),
}
DEMO_CONFIG_ENTRY = {
CONF_NAME: "Home router",
CONF_HOST: "0.0.0.0",
CONF_USERNAME: "username",
CONF_PASSWORD: "password",
CONF_PORT: 8278,
CONF_VERIFY_SSL: False,
mikrotik.const.CONF_FORCE_DHCP: False,
mikrotik.CONF_ARP_PING: False,
mikrotik.CONF_DETECTION_TIME: 30,
}
@pytest.fixture(name="api")
def mock_mikrotik_api():
"""Mock an api."""
with patch("librouteros.connect"):
yield
@pytest.fixture(name="auth_error")
def mock_api_authentication_error():
"""Mock an api."""
with patch(
"librouteros.connect",
side_effect=librouteros.exceptions.TrapError("invalid user name or password"),
):
yield
@pytest.fixture(name="conn_error")
def mock_api_connection_error():
"""Mock an api."""
with patch(
"librouteros.connect", side_effect=librouteros.exceptions.ConnectionClosed
):
yield
async def test_import(hass, api):
"""Test import step."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "import"}, data=DEMO_CONFIG
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "Home router"
assert result["data"][CONF_NAME] == "Home router"
assert result["data"][CONF_HOST] == "0.0.0.0"
assert result["data"][CONF_USERNAME] == "username"
assert result["data"][CONF_PASSWORD] == "password"
assert result["data"][CONF_PORT] == 8278
assert result["data"][CONF_VERIFY_SSL] is False
async def test_flow_works(hass, api):
"""Test config flow."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "Home router"
assert result["data"][CONF_NAME] == "Home router"
assert result["data"][CONF_HOST] == "0.0.0.0"
assert result["data"][CONF_USERNAME] == "username"
assert result["data"][CONF_PASSWORD] == "password"
assert result["data"][CONF_PORT] == 8278
async def test_options(hass):
"""Test updating options."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=DEMO_CONFIG_ENTRY)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "device_tracker"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
mikrotik.CONF_DETECTION_TIME: 30,
mikrotik.CONF_ARP_PING: True,
mikrotik.const.CONF_FORCE_DHCP: False,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
mikrotik.CONF_DETECTION_TIME: 30,
mikrotik.CONF_ARP_PING: True,
mikrotik.const.CONF_FORCE_DHCP: False,
}
async def test_host_already_configured(hass, auth_error):
"""Test host already configured."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=DEMO_CONFIG_ENTRY)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_name_exists(hass, api):
"""Test name already configured."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=DEMO_CONFIG_ENTRY)
entry.add_to_hass(hass)
user_input = DEMO_USER_INPUT.copy()
user_input[CONF_HOST] = "0.0.0.1"
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=user_input
)
assert result["type"] == "form"
assert result["errors"] == {CONF_NAME: "name_exists"}
async def test_connection_error(hass, conn_error):
"""Test error when connection is unsuccesful."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "cannot_connect"}
async def test_wrong_credentials(hass, auth_error):
"""Test error when credentials are wrong."""
result = await hass.config_entries.flow.async_init(
mikrotik.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {
CONF_USERNAME: "wrong_credentials",
CONF_PASSWORD: "wrong_credentials",
}

View File

@ -0,0 +1,118 @@
"""The tests for the Mikrotik device tracker platform."""
from datetime import timedelta
from homeassistant.components import mikrotik
import homeassistant.components.device_tracker as device_tracker
from homeassistant.helpers import entity_registry
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from . import DEVICE_2_WIRELESS, DHCP_DATA, MOCK_DATA, MOCK_OPTIONS, WIRELESS_DATA
from .test_hub import setup_mikrotik_entry
from tests.common import MockConfigEntry, patch
DEFAULT_DETECTION_TIME = timedelta(seconds=300)
def mock_command(self, cmd, params=None):
"""Mock the Mikrotik command method."""
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.IS_WIRELESS]:
return True
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.DHCP]:
return DHCP_DATA
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.WIRELESS]:
return WIRELESS_DATA
return {}
async def test_platform_manually_configured(hass):
"""Test that nothing happens when configuring mikrotik through device tracker platform."""
assert (
await async_setup_component(
hass,
device_tracker.DOMAIN,
{device_tracker.DOMAIN: {"platform": "mikrotik"}},
)
is False
)
assert mikrotik.DOMAIN not in hass.data
async def test_device_trackers(hass):
"""Test device_trackers created by mikrotik."""
# test devices are added from wireless list only
hub = await setup_mikrotik_entry(hass)
device_1 = hass.states.get("device_tracker.device_1")
assert device_1 is not None
assert device_1.state == "home"
device_2 = hass.states.get("device_tracker.device_2")
assert device_2 is None
with patch.object(mikrotik.hub.MikrotikData, "command", new=mock_command):
# test device_2 is added after connecting to wireless network
WIRELESS_DATA.append(DEVICE_2_WIRELESS)
await hub.async_update()
await hass.async_block_till_done()
device_2 = hass.states.get("device_tracker.device_2")
assert device_2 is not None
assert device_2.state == "home"
# test state remains home if last_seen consider_home_interval
del WIRELESS_DATA[1] # device 2 is removed from wireless list
hub.api.devices["00:00:00:00:00:02"]._last_seen = dt_util.utcnow() - timedelta(
minutes=4
)
await hub.async_update()
await hass.async_block_till_done()
device_2 = hass.states.get("device_tracker.device_2")
assert device_2.state != "not_home"
# test state changes to away if last_seen > consider_home_interval
hub.api.devices["00:00:00:00:00:02"]._last_seen = dt_util.utcnow() - timedelta(
minutes=5
)
await hub.async_update()
await hass.async_block_till_done()
device_2 = hass.states.get("device_tracker.device_2")
assert device_2.state == "not_home"
async def test_restoring_devices(hass):
"""Test restoring existing device_tracker entities if not detected on startup."""
config_entry = MockConfigEntry(
domain=mikrotik.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
registry = await entity_registry.async_get_registry(hass)
registry.async_get_or_create(
device_tracker.DOMAIN,
mikrotik.DOMAIN,
"00:00:00:00:00:01",
suggested_object_id="device_1",
config_entry=config_entry,
)
registry.async_get_or_create(
device_tracker.DOMAIN,
mikrotik.DOMAIN,
"00:00:00:00:00:02",
suggested_object_id="device_2",
config_entry=config_entry,
)
await setup_mikrotik_entry(hass)
# test device_2 which is not in wireless list is restored
device_1 = hass.states.get("device_tracker.device_1")
assert device_1 is not None
assert device_1.state == "home"
device_2 = hass.states.get("device_tracker.device_2")
assert device_2 is not None
assert device_2.state == "not_home"

View File

@ -0,0 +1,179 @@
"""Test Mikrotik hub."""
from asynctest import patch
import librouteros
from homeassistant import config_entries
from homeassistant.components import mikrotik
from . import ARP_DATA, DHCP_DATA, MOCK_DATA, MOCK_OPTIONS, WIRELESS_DATA
from tests.common import MockConfigEntry
async def setup_mikrotik_entry(hass, **kwargs):
"""Set up Mikrotik intergation successfully."""
support_wireless = kwargs.get("support_wireless", True)
dhcp_data = kwargs.get("dhcp_data", DHCP_DATA)
wireless_data = kwargs.get("wireless_data", WIRELESS_DATA)
def mock_command(self, cmd, params=None):
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.IS_WIRELESS]:
return support_wireless
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.DHCP]:
return dhcp_data
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.WIRELESS]:
return wireless_data
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.ARP]:
return ARP_DATA
return {}
config_entry = MockConfigEntry(
domain=mikrotik.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
if "force_dhcp" in kwargs:
config_entry.options["force_dhcp"] = True
if "arp_ping" in kwargs:
config_entry.options["arp_ping"] = True
with patch("librouteros.connect"), patch.object(
mikrotik.hub.MikrotikData, "command", new=mock_command
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return hass.data[mikrotik.DOMAIN][config_entry.entry_id]
async def test_hub_setup_successful(hass):
"""Successful setup of Mikrotik hub."""
with patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup",
return_value=True,
) as forward_entry_setup:
hub = await setup_mikrotik_entry(hass)
assert hub.config_entry.data == {
mikrotik.CONF_NAME: "Mikrotik",
mikrotik.CONF_HOST: "0.0.0.0",
mikrotik.CONF_USERNAME: "user",
mikrotik.CONF_PASSWORD: "pass",
mikrotik.CONF_PORT: 8278,
mikrotik.CONF_VERIFY_SSL: False,
}
assert hub.config_entry.options == {
mikrotik.hub.CONF_FORCE_DHCP: False,
mikrotik.CONF_ARP_PING: False,
mikrotik.CONF_DETECTION_TIME: 300,
}
assert hub.api.available is True
assert hub.signal_update == "mikrotik-update-0.0.0.0"
assert forward_entry_setup.mock_calls[0][1] == (hub.config_entry, "device_tracker")
async def test_hub_setup_failed(hass):
"""Failed setup of Mikrotik hub."""
config_entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA)
config_entry.add_to_hass(hass)
# error when connection fails
with patch(
"librouteros.connect", side_effect=librouteros.exceptions.ConnectionClosed
):
await hass.config_entries.async_setup(config_entry.entry_id)
assert config_entry.state == config_entries.ENTRY_STATE_SETUP_RETRY
# error when username or password is invalid
config_entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup"
) as forward_entry_setup, patch(
"librouteros.connect",
side_effect=librouteros.exceptions.TrapError("invalid user name or password"),
):
result = await hass.config_entries.async_setup(config_entry.entry_id)
assert result is False
assert len(forward_entry_setup.mock_calls) == 0
async def test_update_failed(hass):
"""Test failing to connect during update."""
hub = await setup_mikrotik_entry(hass)
with patch.object(
mikrotik.hub.MikrotikData, "command", side_effect=mikrotik.errors.CannotConnect
):
await hub.async_update()
assert hub.api.available is False
async def test_hub_not_support_wireless(hass):
"""Test updating hub devices when hub doesn't support wireless interfaces."""
# test that the devices are constructed from dhcp data
hub = await setup_mikrotik_entry(hass, support_wireless=False)
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params is None
assert hub.api.devices["00:00:00:00:00:02"]._params == DHCP_DATA[1]
assert hub.api.devices["00:00:00:00:00:02"]._wireless_params is None
async def test_hub_support_wireless(hass):
"""Test updating hub devices when hub support wireless interfaces."""
# test that the device list is from wireless data list
hub = await setup_mikrotik_entry(hass)
assert hub.api.support_wireless is True
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params == WIRELESS_DATA[0]
# devices not in wireless list will not be added
assert "00:00:00:00:00:02" not in hub.api.devices
async def test_force_dhcp(hass):
"""Test updating hub devices with forced dhcp method."""
# test that the devices are constructed from dhcp data
hub = await setup_mikrotik_entry(hass, force_dhcp=True)
assert hub.api.support_wireless is True
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params == WIRELESS_DATA[0]
# devices not in wireless list are added from dhcp
assert hub.api.devices["00:00:00:00:00:02"]._params == DHCP_DATA[1]
assert hub.api.devices["00:00:00:00:00:02"]._wireless_params is None
async def test_arp_ping(hass):
"""Test arp ping devices to confirm they are connected."""
# test device show as home if arp ping returns value
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=True):
hub = await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
assert hub.api.devices["00:00:00:00:00:01"].last_seen is not None
assert hub.api.devices["00:00:00:00:00:02"].last_seen is not None
# test device show as away if arp ping times out
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=False):
hub = await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
assert hub.api.devices["00:00:00:00:00:01"].last_seen is not None
# this device is not wireless so it will show as away
assert hub.api.devices["00:00:00:00:00:02"].last_seen is None

View File

@ -0,0 +1,83 @@
"""Test Mikrotik setup process."""
from unittest.mock import Mock, patch
from homeassistant.components import mikrotik
from homeassistant.setup import async_setup_component
from . import MOCK_DATA
from tests.common import MockConfigEntry, mock_coro
async def test_setup_with_no_config(hass):
"""Test that we do not discover anything or try to set up a hub."""
assert await async_setup_component(hass, mikrotik.DOMAIN, {}) is True
assert mikrotik.DOMAIN not in hass.data
async def test_successful_config_entry(hass):
"""Test config entry successfull setup."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA,)
entry.add_to_hass(hass)
mock_registry = Mock()
with patch.object(mikrotik, "MikrotikHub") as mock_hub, patch(
"homeassistant.helpers.device_registry.async_get_registry",
return_value=mock_coro(mock_registry),
):
mock_hub.return_value.async_setup.return_value = mock_coro(True)
mock_hub.return_value.serial_num = "12345678"
mock_hub.return_value.model = "RB750"
mock_hub.return_value.hostname = "mikrotik"
mock_hub.return_value.firmware = "3.65"
assert await mikrotik.async_setup_entry(hass, entry) is True
assert len(mock_hub.mock_calls) == 2
p_hass, p_entry = mock_hub.mock_calls[0][1]
assert p_hass is hass
assert p_entry is entry
assert len(mock_registry.mock_calls) == 1
assert mock_registry.mock_calls[0][2] == {
"config_entry_id": entry.entry_id,
"connections": {("mikrotik", "12345678")},
"manufacturer": mikrotik.ATTR_MANUFACTURER,
"model": "RB750",
"name": "mikrotik",
"sw_version": "3.65",
}
async def test_hub_fail_setup(hass):
"""Test that a failed setup will not store the hub."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA,)
entry.add_to_hass(hass)
with patch.object(mikrotik, "MikrotikHub") as mock_hub:
mock_hub.return_value.async_setup.return_value = mock_coro(False)
assert await mikrotik.async_setup_entry(hass, entry) is False
assert mikrotik.DOMAIN not in hass.data
async def test_unload_entry(hass):
"""Test being able to unload an entry."""
entry = MockConfigEntry(domain=mikrotik.DOMAIN, data=MOCK_DATA,)
entry.add_to_hass(hass)
with patch.object(mikrotik, "MikrotikHub") as mock_hub, patch(
"homeassistant.helpers.device_registry.async_get_registry",
return_value=mock_coro(Mock()),
):
mock_hub.return_value.async_setup.return_value = mock_coro(True)
mock_hub.return_value.serial_num = "12345678"
mock_hub.return_value.model = "RB750"
mock_hub.return_value.hostname = "mikrotik"
mock_hub.return_value.firmware = "3.65"
assert await mikrotik.async_setup_entry(hass, entry) is True
assert len(mock_hub.return_value.mock_calls) == 1
assert await mikrotik.async_unload_entry(hass, entry)
assert entry.entry_id not in hass.data[mikrotik.DOMAIN]