mirror of
https://github.com/home-assistant/core.git
synced 2025-04-22 16:27:56 +00:00
Add type hints and code cleanup for mikrotik (#74296)
* Add type hints and code cleanup for mikrotik * update test and increase coverage * move setup_mikrotik_entry to __init__.py
This commit is contained in:
parent
110d9232cd
commit
b09aaba421
@ -717,7 +717,6 @@ omit =
|
||||
homeassistant/components/microsoft/tts.py
|
||||
homeassistant/components/miflora/sensor.py
|
||||
homeassistant/components/mikrotik/hub.py
|
||||
homeassistant/components/mikrotik/device_tracker.py
|
||||
homeassistant/components/mill/climate.py
|
||||
homeassistant/components/mill/const.py
|
||||
homeassistant/components/mill/sensor.py
|
||||
|
@ -1,35 +1,41 @@
|
||||
"""The Mikrotik component."""
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.helpers import config_validation as cv, device_registry as dr
|
||||
|
||||
from .const import ATTR_MANUFACTURER, DOMAIN, PLATFORMS
|
||||
from .hub import MikrotikDataUpdateCoordinator
|
||||
from .errors import CannotConnect, LoginError
|
||||
from .hub import MikrotikDataUpdateCoordinator, get_api
|
||||
|
||||
CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Set up the Mikrotik component."""
|
||||
|
||||
hub = MikrotikDataUpdateCoordinator(hass, config_entry)
|
||||
if not await hub.async_setup():
|
||||
try:
|
||||
api = await hass.async_add_executor_job(get_api, dict(config_entry.data))
|
||||
except CannotConnect as api_error:
|
||||
raise ConfigEntryNotReady from api_error
|
||||
except LoginError:
|
||||
return False
|
||||
|
||||
await hub.async_config_entry_first_refresh()
|
||||
coordinator = MikrotikDataUpdateCoordinator(hass, config_entry, api)
|
||||
await hass.async_add_executor_job(coordinator.api.get_hub_details)
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = hub
|
||||
hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = coordinator
|
||||
|
||||
hass.config_entries.async_setup_platforms(config_entry, PLATFORMS)
|
||||
|
||||
device_registry = dr.async_get(hass)
|
||||
device_registry.async_get_or_create(
|
||||
config_entry_id=config_entry.entry_id,
|
||||
connections={(DOMAIN, hub.serial_num)},
|
||||
connections={(DOMAIN, coordinator.serial_num)},
|
||||
manufacturer=ATTR_MANUFACTURER,
|
||||
model=hub.model,
|
||||
name=hub.hostname,
|
||||
sw_version=hub.firmware,
|
||||
model=coordinator.model,
|
||||
name=coordinator.hostname,
|
||||
sw_version=coordinator.firmware,
|
||||
)
|
||||
|
||||
return True
|
||||
@ -37,10 +43,9 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Unload a config entry."""
|
||||
unload_ok = await hass.config_entries.async_unload_platforms(
|
||||
if unload_ok := await hass.config_entries.async_unload_platforms(
|
||||
config_entry, PLATFORMS
|
||||
)
|
||||
|
||||
hass.data[DOMAIN].pop(config_entry.entry_id)
|
||||
):
|
||||
hass.data[DOMAIN].pop(config_entry.entry_id)
|
||||
|
||||
return unload_ok
|
||||
|
@ -1,6 +1,8 @@
|
||||
"""Config flow for Mikrotik."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
@ -13,6 +15,7 @@ from homeassistant.const import (
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
|
||||
from .const import (
|
||||
CONF_ARP_PING,
|
||||
@ -40,7 +43,9 @@ class MikrotikFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Get the options flow for this handler."""
|
||||
return MikrotikOptionsFlowHandler(config_entry)
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle a flow initialized by the user."""
|
||||
errors = {}
|
||||
if user_input is not None:
|
||||
@ -52,7 +57,7 @@ class MikrotikFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
break
|
||||
|
||||
try:
|
||||
await self.hass.async_add_executor_job(get_api, self.hass, user_input)
|
||||
await self.hass.async_add_executor_job(get_api, user_input)
|
||||
except CannotConnect:
|
||||
errors["base"] = "cannot_connect"
|
||||
except LoginError:
|
||||
@ -86,11 +91,15 @@ class MikrotikOptionsFlowHandler(config_entries.OptionsFlow):
|
||||
"""Initialize Mikrotik options flow."""
|
||||
self.config_entry = config_entry
|
||||
|
||||
async def async_step_init(self, user_input=None):
|
||||
async def async_step_init(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Manage the Mikrotik options."""
|
||||
return await self.async_step_device_tracker()
|
||||
|
||||
async def async_step_device_tracker(self, user_input=None):
|
||||
async def async_step_device_tracker(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Manage the device tracker options."""
|
||||
if user_input is not None:
|
||||
return self.async_create_entry(title="", data=user_input)
|
||||
|
@ -1,33 +1,35 @@
|
||||
"""Constants used in the Mikrotik components."""
|
||||
from typing import Final
|
||||
|
||||
from homeassistant.const import Platform
|
||||
|
||||
DOMAIN = "mikrotik"
|
||||
DEFAULT_NAME = "Mikrotik"
|
||||
DEFAULT_API_PORT = 8728
|
||||
DEFAULT_DETECTION_TIME = 300
|
||||
DOMAIN: Final = "mikrotik"
|
||||
DEFAULT_NAME: Final = "Mikrotik"
|
||||
DEFAULT_API_PORT: Final = 8728
|
||||
DEFAULT_DETECTION_TIME: Final = 300
|
||||
|
||||
ATTR_MANUFACTURER = "Mikrotik"
|
||||
ATTR_SERIAL_NUMBER = "serial-number"
|
||||
ATTR_FIRMWARE = "current-firmware"
|
||||
ATTR_MODEL = "model"
|
||||
ATTR_MANUFACTURER: Final = "Mikrotik"
|
||||
ATTR_SERIAL_NUMBER: Final = "serial-number"
|
||||
ATTR_FIRMWARE: Final = "current-firmware"
|
||||
ATTR_MODEL: Final = "model"
|
||||
|
||||
CONF_ARP_PING = "arp_ping"
|
||||
CONF_FORCE_DHCP = "force_dhcp"
|
||||
CONF_DETECTION_TIME = "detection_time"
|
||||
CONF_ARP_PING: Final = "arp_ping"
|
||||
CONF_FORCE_DHCP: Final = "force_dhcp"
|
||||
CONF_DETECTION_TIME: Final = "detection_time"
|
||||
|
||||
|
||||
NAME = "name"
|
||||
INFO = "info"
|
||||
IDENTITY = "identity"
|
||||
ARP = "arp"
|
||||
NAME: Final = "name"
|
||||
INFO: Final = "info"
|
||||
IDENTITY: Final = "identity"
|
||||
ARP: Final = "arp"
|
||||
|
||||
CAPSMAN = "capsman"
|
||||
DHCP = "dhcp"
|
||||
WIRELESS = "wireless"
|
||||
IS_WIRELESS = "is_wireless"
|
||||
IS_CAPSMAN = "is_capsman"
|
||||
CAPSMAN: Final = "capsman"
|
||||
DHCP: Final = "dhcp"
|
||||
WIRELESS: Final = "wireless"
|
||||
IS_WIRELESS: Final = "is_wireless"
|
||||
IS_CAPSMAN: Final = "is_capsman"
|
||||
|
||||
MIKROTIK_SERVICES = {
|
||||
MIKROTIK_SERVICES: Final = {
|
||||
ARP: "/ip/arp/getall",
|
||||
CAPSMAN: "/caps-man/registration-table/getall",
|
||||
DHCP: "/ip/dhcp-server/lease/getall",
|
||||
@ -38,9 +40,9 @@ MIKROTIK_SERVICES = {
|
||||
IS_CAPSMAN: "/caps-man/interface/print",
|
||||
}
|
||||
|
||||
PLATFORMS = [Platform.DEVICE_TRACKER]
|
||||
PLATFORMS: Final = [Platform.DEVICE_TRACKER]
|
||||
|
||||
ATTR_DEVICE_TRACKER = [
|
||||
ATTR_DEVICE_TRACKER: Final = [
|
||||
"comment",
|
||||
"mac-address",
|
||||
"ssid",
|
||||
|
@ -1,6 +1,8 @@
|
||||
"""Support for Mikrotik routers as device tracker."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.components.device_tracker.config_entry import ScannerEntity
|
||||
from homeassistant.components.device_tracker.const import (
|
||||
DOMAIN as DEVICE_TRACKER,
|
||||
@ -14,7 +16,7 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import DOMAIN
|
||||
from .hub import MikrotikDataUpdateCoordinator
|
||||
from .hub import Device, MikrotikDataUpdateCoordinator
|
||||
|
||||
# These are normalized to ATTR_IP and ATTR_MAC to conform
|
||||
# to device_tracker
|
||||
@ -27,7 +29,9 @@ async def async_setup_entry(
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up device tracker for Mikrotik component."""
|
||||
hub = hass.data[DOMAIN][config_entry.entry_id]
|
||||
coordinator: MikrotikDataUpdateCoordinator = hass.data[DOMAIN][
|
||||
config_entry.entry_id
|
||||
]
|
||||
|
||||
tracked: dict[str, MikrotikDataUpdateCoordinatorTracker] = {}
|
||||
|
||||
@ -42,47 +46,53 @@ async def async_setup_entry(
|
||||
):
|
||||
|
||||
if (
|
||||
entity.unique_id in hub.api.devices
|
||||
or entity.unique_id not in hub.api.all_devices
|
||||
entity.unique_id in coordinator.api.devices
|
||||
or entity.unique_id not in coordinator.api.all_devices
|
||||
):
|
||||
continue
|
||||
hub.api.restore_device(entity.unique_id)
|
||||
coordinator.api.restore_device(entity.unique_id)
|
||||
|
||||
@callback
|
||||
def update_hub():
|
||||
def update_hub() -> None:
|
||||
"""Update the status of the device."""
|
||||
update_items(hub, async_add_entities, tracked)
|
||||
update_items(coordinator, async_add_entities, tracked)
|
||||
|
||||
config_entry.async_on_unload(hub.async_add_listener(update_hub))
|
||||
config_entry.async_on_unload(coordinator.async_add_listener(update_hub))
|
||||
|
||||
update_hub()
|
||||
|
||||
|
||||
@callback
|
||||
def update_items(hub, async_add_entities, tracked):
|
||||
def update_items(
|
||||
coordinator: MikrotikDataUpdateCoordinator,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
tracked: dict[str, MikrotikDataUpdateCoordinatorTracker],
|
||||
):
|
||||
"""Update tracked device state from the hub."""
|
||||
new_tracked = []
|
||||
for mac, device in hub.api.devices.items():
|
||||
new_tracked: list[MikrotikDataUpdateCoordinatorTracker] = []
|
||||
for mac, device in coordinator.api.devices.items():
|
||||
if mac not in tracked:
|
||||
tracked[mac] = MikrotikDataUpdateCoordinatorTracker(device, hub)
|
||||
tracked[mac] = MikrotikDataUpdateCoordinatorTracker(device, coordinator)
|
||||
new_tracked.append(tracked[mac])
|
||||
|
||||
if new_tracked:
|
||||
async_add_entities(new_tracked)
|
||||
|
||||
|
||||
class MikrotikDataUpdateCoordinatorTracker(CoordinatorEntity, ScannerEntity):
|
||||
class MikrotikDataUpdateCoordinatorTracker(
|
||||
CoordinatorEntity[MikrotikDataUpdateCoordinator], ScannerEntity
|
||||
):
|
||||
"""Representation of network device."""
|
||||
|
||||
coordinator: MikrotikDataUpdateCoordinator
|
||||
|
||||
def __init__(self, device, hub):
|
||||
def __init__(
|
||||
self, device: Device, coordinator: MikrotikDataUpdateCoordinator
|
||||
) -> None:
|
||||
"""Initialize the tracked device."""
|
||||
super().__init__(hub)
|
||||
super().__init__(coordinator)
|
||||
self.device = device
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
def is_connected(self) -> bool:
|
||||
"""Return true if the client is connected to the network."""
|
||||
if (
|
||||
self.device.last_seen
|
||||
@ -93,7 +103,7 @@ class MikrotikDataUpdateCoordinatorTracker(CoordinatorEntity, ScannerEntity):
|
||||
return False
|
||||
|
||||
@property
|
||||
def source_type(self):
|
||||
def source_type(self) -> str:
|
||||
"""Return the source type of the client."""
|
||||
return SOURCE_TYPE_ROUTER
|
||||
|
||||
@ -124,7 +134,7 @@ class MikrotikDataUpdateCoordinatorTracker(CoordinatorEntity, ScannerEntity):
|
||||
return self.device.mac
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self):
|
||||
def extra_state_attributes(self) -> dict[str, Any] | None:
|
||||
"""Return the device state attributes."""
|
||||
if self.is_connected:
|
||||
return {k: v for k, v in self.device.attrs.items() if k not in FILTER_ATTRS}
|
||||
|
@ -1,14 +1,18 @@
|
||||
"""The Mikrotik router class."""
|
||||
from datetime import timedelta
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
import socket
|
||||
import ssl
|
||||
from typing import Any
|
||||
|
||||
import librouteros
|
||||
from librouteros.login import plain as login_plain, token as login_token
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_VERIFY_SSL
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
from homeassistant.util import slugify
|
||||
import homeassistant.util.dt as dt_util
|
||||
@ -42,36 +46,36 @@ _LOGGER = logging.getLogger(__name__)
|
||||
class Device:
|
||||
"""Represents a network device."""
|
||||
|
||||
def __init__(self, mac, params):
|
||||
def __init__(self, mac: str, params: dict[str, Any]) -> None:
|
||||
"""Initialize the network device."""
|
||||
self._mac = mac
|
||||
self._params = params
|
||||
self._last_seen = None
|
||||
self._attrs = {}
|
||||
self._wireless_params = None
|
||||
self._last_seen: datetime | None = None
|
||||
self._attrs: dict[str, Any] = {}
|
||||
self._wireless_params: dict[str, Any] = {}
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""Return device name."""
|
||||
return self._params.get("host-name", self.mac)
|
||||
|
||||
@property
|
||||
def ip_address(self):
|
||||
def ip_address(self) -> str:
|
||||
"""Return device primary ip address."""
|
||||
return self._params.get("address")
|
||||
return self._params["address"]
|
||||
|
||||
@property
|
||||
def mac(self):
|
||||
def mac(self) -> str:
|
||||
"""Return device mac."""
|
||||
return self._mac
|
||||
|
||||
@property
|
||||
def last_seen(self):
|
||||
def last_seen(self) -> datetime | None:
|
||||
"""Return device last seen."""
|
||||
return self._last_seen
|
||||
|
||||
@property
|
||||
def attrs(self):
|
||||
def attrs(self) -> dict[str, Any]:
|
||||
"""Return device attributes."""
|
||||
attr_data = self._wireless_params if self._wireless_params else self._params
|
||||
for attr in ATTR_DEVICE_TRACKER:
|
||||
@ -80,7 +84,12 @@ class Device:
|
||||
self._attrs["ip_address"] = self._params.get("active-address")
|
||||
return self._attrs
|
||||
|
||||
def update(self, wireless_params=None, params=None, active=False):
|
||||
def update(
|
||||
self,
|
||||
wireless_params: dict[str, Any] | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
active: bool = False,
|
||||
) -> None:
|
||||
"""Update Device params."""
|
||||
if wireless_params:
|
||||
self._wireless_params = wireless_params
|
||||
@ -93,27 +102,26 @@ class Device:
|
||||
class MikrotikData:
|
||||
"""Handle all communication with the Mikrotik API."""
|
||||
|
||||
def __init__(self, hass, config_entry, api):
|
||||
def __init__(
|
||||
self, hass: HomeAssistant, config_entry: ConfigEntry, api: librouteros.Api
|
||||
) -> None:
|
||||
"""Initialize the Mikrotik Client."""
|
||||
self.hass = hass
|
||||
self.config_entry = config_entry
|
||||
self.api = api
|
||||
self._host = self.config_entry.data[CONF_HOST]
|
||||
self.all_devices = {}
|
||||
self.devices = {}
|
||||
self.available = True
|
||||
self.support_capsman = False
|
||||
self.support_wireless = False
|
||||
self.hostname = None
|
||||
self.model = None
|
||||
self.firmware = None
|
||||
self.serial_number = None
|
||||
self._host: str = self.config_entry.data[CONF_HOST]
|
||||
self.all_devices: dict[str, dict[str, Any]] = {}
|
||||
self.devices: dict[str, Device] = {}
|
||||
self.support_capsman: bool = False
|
||||
self.support_wireless: bool = False
|
||||
self.hostname: str = ""
|
||||
self.model: str = ""
|
||||
self.firmware: str = ""
|
||||
self.serial_number: str = ""
|
||||
|
||||
@staticmethod
|
||||
def load_mac(devices=None):
|
||||
def load_mac(devices: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||||
"""Load dictionary using MAC address as key."""
|
||||
if not devices:
|
||||
return None
|
||||
mac_devices = {}
|
||||
for device in devices:
|
||||
if "mac-address" in device:
|
||||
@ -122,26 +130,23 @@ class MikrotikData:
|
||||
return mac_devices
|
||||
|
||||
@property
|
||||
def arp_enabled(self):
|
||||
def arp_enabled(self) -> bool:
|
||||
"""Return arp_ping option setting."""
|
||||
return self.config_entry.options[CONF_ARP_PING]
|
||||
return self.config_entry.options.get(CONF_ARP_PING, False)
|
||||
|
||||
@property
|
||||
def force_dhcp(self):
|
||||
def force_dhcp(self) -> bool:
|
||||
"""Return force_dhcp option setting."""
|
||||
return self.config_entry.options[CONF_FORCE_DHCP]
|
||||
return self.config_entry.options.get(CONF_FORCE_DHCP, False)
|
||||
|
||||
def get_info(self, param):
|
||||
def get_info(self, param: str) -> str:
|
||||
"""Return device model name."""
|
||||
cmd = IDENTITY if param == NAME else INFO
|
||||
data = self.command(MIKROTIK_SERVICES[cmd])
|
||||
return (
|
||||
data[0].get(param) # pylint: disable=unsubscriptable-object
|
||||
if data
|
||||
else None
|
||||
)
|
||||
if data := self.command(MIKROTIK_SERVICES[cmd]):
|
||||
return str(data[0].get(param))
|
||||
return ""
|
||||
|
||||
def get_hub_details(self):
|
||||
def get_hub_details(self) -> None:
|
||||
"""Get Hub info."""
|
||||
self.hostname = self.get_info(NAME)
|
||||
self.model = self.get_info(ATTR_MODEL)
|
||||
@ -150,24 +155,17 @@ class MikrotikData:
|
||||
self.support_capsman = bool(self.command(MIKROTIK_SERVICES[IS_CAPSMAN]))
|
||||
self.support_wireless = bool(self.command(MIKROTIK_SERVICES[IS_WIRELESS]))
|
||||
|
||||
def connect_to_hub(self):
|
||||
"""Connect to hub."""
|
||||
try:
|
||||
self.api = get_api(self.hass, self.config_entry.data)
|
||||
return True
|
||||
except (LoginError, CannotConnect):
|
||||
return False
|
||||
|
||||
def get_list_from_interface(self, interface):
|
||||
def get_list_from_interface(self, interface: str) -> dict[str, dict[str, Any]]:
|
||||
"""Get devices from interface."""
|
||||
result = self.command(MIKROTIK_SERVICES[interface])
|
||||
return self.load_mac(result) if result else {}
|
||||
if result := self.command(MIKROTIK_SERVICES[interface]):
|
||||
return self.load_mac(result)
|
||||
return {}
|
||||
|
||||
def restore_device(self, mac):
|
||||
def restore_device(self, mac: str) -> None:
|
||||
"""Restore a missing device after restart."""
|
||||
self.devices[mac] = Device(mac, self.all_devices[mac])
|
||||
|
||||
def update_devices(self):
|
||||
def update_devices(self) -> None:
|
||||
"""Get list of devices with latest status."""
|
||||
arp_devices = {}
|
||||
device_list = {}
|
||||
@ -192,7 +190,7 @@ class MikrotikData:
|
||||
# get new hub firmware version if updated
|
||||
self.firmware = self.get_info(ATTR_FIRMWARE)
|
||||
|
||||
except (CannotConnect, socket.timeout, OSError) as err:
|
||||
except (CannotConnect, LoginError) as err:
|
||||
raise UpdateFailed from err
|
||||
|
||||
if not device_list:
|
||||
@ -218,11 +216,12 @@ class MikrotikData:
|
||||
active = True
|
||||
if self.arp_enabled and mac in arp_devices:
|
||||
active = self.do_arp_ping(
|
||||
params.get("active-address"), arp_devices[mac].get("interface")
|
||||
str(params.get("active-address")),
|
||||
str(arp_devices[mac].get("interface")),
|
||||
)
|
||||
self.devices[mac].update(active=active)
|
||||
|
||||
def do_arp_ping(self, ip_address, interface):
|
||||
def do_arp_ping(self, ip_address: str, interface: str) -> bool:
|
||||
"""Attempt to arp ping MAC address via interface."""
|
||||
_LOGGER.debug("pinging - %s", ip_address)
|
||||
params = {
|
||||
@ -234,9 +233,9 @@ class MikrotikData:
|
||||
}
|
||||
cmd = "/ping"
|
||||
data = self.command(cmd, params)
|
||||
if data is not None:
|
||||
if data:
|
||||
status = 0
|
||||
for result in data: # pylint: disable=not-an-iterable
|
||||
for result in data:
|
||||
if "status" in result:
|
||||
status += 1
|
||||
if status == len(data):
|
||||
@ -246,22 +245,25 @@ class MikrotikData:
|
||||
return False
|
||||
return True
|
||||
|
||||
def command(self, cmd, params=None):
|
||||
def command(
|
||||
self, cmd: str, params: dict[str, Any] | None = None
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Retrieve data from Mikrotik API."""
|
||||
try:
|
||||
_LOGGER.info("Running command %s", cmd)
|
||||
if params:
|
||||
response = list(self.api(cmd=cmd, **params))
|
||||
else:
|
||||
response = list(self.api(cmd=cmd))
|
||||
return list(self.api(cmd=cmd, **params))
|
||||
return list(self.api(cmd=cmd))
|
||||
except (
|
||||
librouteros.exceptions.ConnectionClosed,
|
||||
OSError,
|
||||
socket.timeout,
|
||||
) as api_error:
|
||||
_LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
|
||||
if not self.connect_to_hub():
|
||||
raise CannotConnect from api_error
|
||||
# try to reconnect
|
||||
self.api = get_api(dict(self.config_entry.data))
|
||||
# we still have to raise CannotConnect to fail the update.
|
||||
raise CannotConnect from api_error
|
||||
except librouteros.exceptions.ProtocolError as api_error:
|
||||
_LOGGER.warning(
|
||||
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
|
||||
@ -269,106 +271,71 @@ class MikrotikData:
|
||||
cmd,
|
||||
api_error,
|
||||
)
|
||||
return None
|
||||
|
||||
return response if response else None
|
||||
return []
|
||||
|
||||
|
||||
class MikrotikDataUpdateCoordinator(DataUpdateCoordinator):
|
||||
"""Mikrotik Hub Object."""
|
||||
|
||||
def __init__(self, hass, config_entry):
|
||||
def __init__(
|
||||
self, hass: HomeAssistant, config_entry: ConfigEntry, api: librouteros.Api
|
||||
) -> None:
|
||||
"""Initialize the Mikrotik Client."""
|
||||
self.hass = hass
|
||||
self.config_entry = config_entry
|
||||
self._mk_data = None
|
||||
self.config_entry: ConfigEntry = config_entry
|
||||
self._mk_data = MikrotikData(self.hass, self.config_entry, api)
|
||||
super().__init__(
|
||||
self.hass,
|
||||
_LOGGER,
|
||||
name=f"{DOMAIN} - {self.host}",
|
||||
update_method=self.async_update,
|
||||
update_interval=timedelta(seconds=10),
|
||||
)
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
def host(self) -> str:
|
||||
"""Return the host of this hub."""
|
||||
return self.config_entry.data[CONF_HOST]
|
||||
|
||||
@property
|
||||
def hostname(self):
|
||||
def hostname(self) -> str:
|
||||
"""Return the hostname of the hub."""
|
||||
return self._mk_data.hostname
|
||||
|
||||
@property
|
||||
def model(self):
|
||||
def model(self) -> str:
|
||||
"""Return the model of the hub."""
|
||||
return self._mk_data.model
|
||||
|
||||
@property
|
||||
def firmware(self):
|
||||
def firmware(self) -> str:
|
||||
"""Return the firmware of the hub."""
|
||||
return self._mk_data.firmware
|
||||
|
||||
@property
|
||||
def serial_num(self):
|
||||
def serial_num(self) -> str:
|
||||
"""Return the serial number of the hub."""
|
||||
return self._mk_data.serial_number
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
"""Return if the hub is connected."""
|
||||
return self._mk_data.available
|
||||
|
||||
@property
|
||||
def option_detection_time(self):
|
||||
def option_detection_time(self) -> timedelta:
|
||||
"""Config entry option defining number of seconds from last seen to away."""
|
||||
return timedelta(seconds=self.config_entry.options[CONF_DETECTION_TIME])
|
||||
return timedelta(
|
||||
seconds=self.config_entry.options.get(
|
||||
CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def api(self):
|
||||
def api(self) -> MikrotikData:
|
||||
"""Represent Mikrotik data object."""
|
||||
return self._mk_data
|
||||
|
||||
async def async_add_options(self):
|
||||
"""Populate default options for Mikrotik."""
|
||||
if not self.config_entry.options:
|
||||
data = dict(self.config_entry.data)
|
||||
options = {
|
||||
CONF_ARP_PING: data.pop(CONF_ARP_PING, False),
|
||||
CONF_FORCE_DHCP: data.pop(CONF_FORCE_DHCP, False),
|
||||
CONF_DETECTION_TIME: data.pop(
|
||||
CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
|
||||
),
|
||||
}
|
||||
|
||||
self.hass.config_entries.async_update_entry(
|
||||
self.config_entry, data=data, options=options
|
||||
)
|
||||
|
||||
async def async_update(self):
|
||||
async def _async_update_data(self) -> None:
|
||||
"""Update Mikrotik devices information."""
|
||||
await self.hass.async_add_executor_job(self._mk_data.update_devices)
|
||||
|
||||
async def async_setup(self):
|
||||
"""Set up the Mikrotik hub."""
|
||||
try:
|
||||
api = await self.hass.async_add_executor_job(
|
||||
get_api, self.hass, self.config_entry.data
|
||||
)
|
||||
except CannotConnect as api_error:
|
||||
raise ConfigEntryNotReady from api_error
|
||||
except LoginError:
|
||||
return False
|
||||
|
||||
self._mk_data = MikrotikData(self.hass, self.config_entry, api)
|
||||
await self.async_add_options()
|
||||
await self.hass.async_add_executor_job(self._mk_data.get_hub_details)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_api(hass, entry):
|
||||
def get_api(entry: dict[str, Any]) -> librouteros.Api:
|
||||
"""Connect to Mikrotik hub."""
|
||||
_LOGGER.debug("Connecting to Mikrotik hub [%s]", entry[CONF_HOST])
|
||||
|
||||
|
@ -1,4 +1,7 @@
|
||||
"""Tests for the Mikrotik component."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components import mikrotik
|
||||
from homeassistant.components.mikrotik.const import (
|
||||
CONF_ARP_PING,
|
||||
CONF_DETECTION_TIME,
|
||||
@ -14,6 +17,8 @@ from homeassistant.const import (
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
MOCK_DATA = {
|
||||
CONF_NAME: "Mikrotik",
|
||||
CONF_HOST: "0.0.0.0",
|
||||
@ -130,3 +135,38 @@ ARP_DATA = [
|
||||
"disabled": False,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
async def setup_mikrotik_entry(hass, **kwargs):
|
||||
"""Set up Mikrotik integration successfully."""
|
||||
support_wireless = kwargs.get("support_wireless", True)
|
||||
dhcp_data = kwargs.get("dhcp_data", DHCP_DATA)
|
||||
wireless_data = kwargs.get("wireless_data", WIRELESS_DATA)
|
||||
|
||||
def mock_command(self, cmd, params=None):
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.IS_WIRELESS]:
|
||||
return support_wireless
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.DHCP]:
|
||||
return dhcp_data
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.WIRELESS]:
|
||||
return wireless_data
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.ARP]:
|
||||
return ARP_DATA
|
||||
return {}
|
||||
|
||||
config_entry = MockConfigEntry(
|
||||
domain=mikrotik.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
if "force_dhcp" in kwargs:
|
||||
config_entry.options = {**config_entry.options, "force_dhcp": True}
|
||||
|
||||
if "arp_ping" in kwargs:
|
||||
config_entry.options = {**config_entry.options, "arp_ping": True}
|
||||
|
||||
with patch("librouteros.connect"), patch.object(
|
||||
mikrotik.hub.MikrotikData, "command", new=mock_command
|
||||
):
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
@ -1,13 +1,14 @@
|
||||
"""The tests for the Mikrotik device tracker platform."""
|
||||
from datetime import timedelta
|
||||
|
||||
from freezegun import freeze_time
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import mikrotik
|
||||
import homeassistant.components.device_tracker as device_tracker
|
||||
from homeassistant.const import STATE_UNAVAILABLE
|
||||
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
||||
from homeassistant.setup import async_setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.util.dt import utcnow
|
||||
|
||||
from . import (
|
||||
DEVICE_2_WIRELESS,
|
||||
@ -17,12 +18,10 @@ from . import (
|
||||
MOCK_DATA,
|
||||
MOCK_OPTIONS,
|
||||
WIRELESS_DATA,
|
||||
setup_mikrotik_entry,
|
||||
)
|
||||
from .test_hub import setup_mikrotik_entry
|
||||
|
||||
from tests.common import MockConfigEntry, patch
|
||||
|
||||
DEFAULT_DETECTION_TIME = timedelta(seconds=300)
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed, patch
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -56,24 +55,11 @@ def mock_command(self, cmd, params=None):
|
||||
return {}
|
||||
|
||||
|
||||
async def test_platform_manually_configured(hass):
|
||||
"""Test that nothing happens when configuring mikrotik through device tracker platform."""
|
||||
assert (
|
||||
await async_setup_component(
|
||||
hass,
|
||||
device_tracker.DOMAIN,
|
||||
{device_tracker.DOMAIN: {"platform": "mikrotik"}},
|
||||
)
|
||||
is False
|
||||
)
|
||||
assert mikrotik.DOMAIN not in hass.data
|
||||
|
||||
|
||||
async def test_device_trackers(hass, mock_device_registry_devices):
|
||||
"""Test device_trackers created by mikrotik."""
|
||||
|
||||
# test devices are added from wireless list only
|
||||
hub = await setup_mikrotik_entry(hass)
|
||||
await setup_mikrotik_entry(hass)
|
||||
|
||||
device_1 = hass.states.get("device_tracker.device_1")
|
||||
assert device_1 is not None
|
||||
@ -90,7 +76,7 @@ async def test_device_trackers(hass, mock_device_registry_devices):
|
||||
# test device_2 is added after connecting to wireless network
|
||||
WIRELESS_DATA.append(DEVICE_2_WIRELESS)
|
||||
|
||||
await hub.async_refresh()
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=10))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
@ -104,26 +90,72 @@ async def test_device_trackers(hass, mock_device_registry_devices):
|
||||
|
||||
# test state remains home if last_seen consider_home_interval
|
||||
del WIRELESS_DATA[1] # device 2 is removed from wireless list
|
||||
hub.api.devices["00:00:00:00:00:02"]._last_seen = dt_util.utcnow() - timedelta(
|
||||
minutes=4
|
||||
)
|
||||
await hub.async_update()
|
||||
await hass.async_block_till_done()
|
||||
with freeze_time(utcnow() + timedelta(minutes=4)):
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(minutes=4))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2.state != "not_home"
|
||||
assert device_2.state == "home"
|
||||
|
||||
# test state changes to away if last_seen > consider_home_interval
|
||||
hub.api.devices["00:00:00:00:00:02"]._last_seen = dt_util.utcnow() - timedelta(
|
||||
minutes=5
|
||||
)
|
||||
await hub.async_refresh()
|
||||
await hass.async_block_till_done()
|
||||
with freeze_time(utcnow() + timedelta(minutes=6)):
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(minutes=6))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2.state == "not_home"
|
||||
|
||||
|
||||
async def test_force_dhcp(hass, mock_device_registry_devices):
|
||||
"""Test updating hub that supports wireless with forced dhcp method."""
|
||||
|
||||
# hub supports wireless by default, force_dhcp is enabled to override
|
||||
await setup_mikrotik_entry(hass, force_dhcp=False)
|
||||
device_1 = hass.states.get("device_tracker.device_1")
|
||||
assert device_1
|
||||
assert device_1.state == "home"
|
||||
# device_2 is not on the wireless list but it is still added from DHCP
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2
|
||||
assert device_2.state == "home"
|
||||
|
||||
|
||||
async def test_hub_not_support_wireless(hass, mock_device_registry_devices):
|
||||
"""Test device_trackers created when hub doesn't support wireless."""
|
||||
|
||||
await setup_mikrotik_entry(hass, support_wireless=False)
|
||||
device_1 = hass.states.get("device_tracker.device_1")
|
||||
assert device_1
|
||||
assert device_1.state == "home"
|
||||
# device_2 is added from DHCP
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2
|
||||
assert device_2.state == "home"
|
||||
|
||||
|
||||
async def test_arp_ping_success(hass, mock_device_registry_devices):
|
||||
"""Test arp ping devices to confirm they are connected."""
|
||||
|
||||
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=True):
|
||||
await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
|
||||
|
||||
# test wired device_2 show as home if arp ping returns True
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2
|
||||
assert device_2.state == "home"
|
||||
|
||||
|
||||
async def test_arp_ping_timeout(hass, mock_device_registry_devices):
|
||||
"""Test arp ping timeout so devices are shown away."""
|
||||
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=False):
|
||||
await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
|
||||
|
||||
# test wired device_2 show as not_home if arp ping times out
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2
|
||||
assert device_2.state == "not_home"
|
||||
|
||||
|
||||
async def test_device_trackers_numerical_name(hass, mock_device_registry_devices):
|
||||
"""Test device_trackers created by mikrotik with numerical device name."""
|
||||
|
||||
@ -164,6 +196,13 @@ async def test_restoring_devices(hass):
|
||||
suggested_object_id="device_2",
|
||||
config_entry=config_entry,
|
||||
)
|
||||
registry.async_get_or_create(
|
||||
device_tracker.DOMAIN,
|
||||
mikrotik.DOMAIN,
|
||||
"00:00:00:00:00:03",
|
||||
suggested_object_id="device_3",
|
||||
config_entry=config_entry,
|
||||
)
|
||||
|
||||
await setup_mikrotik_entry(hass)
|
||||
|
||||
@ -174,3 +213,22 @@ async def test_restoring_devices(hass):
|
||||
device_2 = hass.states.get("device_tracker.device_2")
|
||||
assert device_2 is not None
|
||||
assert device_2.state == "not_home"
|
||||
# device_3 is not on the DHCP list or wireless list
|
||||
# so it won't be restored.
|
||||
device_3 = hass.states.get("device_tracker.device_3")
|
||||
assert device_3 is None
|
||||
|
||||
|
||||
async def test_update_failed(hass, mock_device_registry_devices):
|
||||
"""Test failing to connect during update."""
|
||||
|
||||
await setup_mikrotik_entry(hass)
|
||||
|
||||
with patch.object(
|
||||
mikrotik.hub.MikrotikData, "command", side_effect=mikrotik.errors.CannotConnect
|
||||
):
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=10))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
device_1 = hass.states.get("device_tracker.device_1")
|
||||
assert device_1.state == STATE_UNAVAILABLE
|
||||
|
@ -1,120 +0,0 @@
|
||||
"""Test Mikrotik hub."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components import mikrotik
|
||||
|
||||
from . import ARP_DATA, DHCP_DATA, MOCK_DATA, MOCK_OPTIONS, WIRELESS_DATA
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def setup_mikrotik_entry(hass, **kwargs):
|
||||
"""Set up Mikrotik integration successfully."""
|
||||
support_wireless = kwargs.get("support_wireless", True)
|
||||
dhcp_data = kwargs.get("dhcp_data", DHCP_DATA)
|
||||
wireless_data = kwargs.get("wireless_data", WIRELESS_DATA)
|
||||
|
||||
def mock_command(self, cmd, params=None):
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.IS_WIRELESS]:
|
||||
return support_wireless
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.DHCP]:
|
||||
return dhcp_data
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.WIRELESS]:
|
||||
return wireless_data
|
||||
if cmd == mikrotik.const.MIKROTIK_SERVICES[mikrotik.const.ARP]:
|
||||
return ARP_DATA
|
||||
return {}
|
||||
|
||||
config_entry = MockConfigEntry(
|
||||
domain=mikrotik.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
if "force_dhcp" in kwargs:
|
||||
config_entry.options = {**config_entry.options, "force_dhcp": True}
|
||||
|
||||
if "arp_ping" in kwargs:
|
||||
config_entry.options = {**config_entry.options, "arp_ping": True}
|
||||
|
||||
with patch("librouteros.connect"), patch.object(
|
||||
mikrotik.hub.MikrotikData, "command", new=mock_command
|
||||
):
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
return hass.data[mikrotik.DOMAIN][config_entry.entry_id]
|
||||
|
||||
|
||||
async def test_update_failed(hass):
|
||||
"""Test failing to connect during update."""
|
||||
|
||||
hub = await setup_mikrotik_entry(hass)
|
||||
|
||||
with patch.object(
|
||||
mikrotik.hub.MikrotikData, "command", side_effect=mikrotik.errors.CannotConnect
|
||||
):
|
||||
await hub.async_refresh()
|
||||
|
||||
assert not hub.last_update_success
|
||||
|
||||
|
||||
async def test_hub_not_support_wireless(hass):
|
||||
"""Test updating hub devices when hub doesn't support wireless interfaces."""
|
||||
|
||||
# test that the devices are constructed from dhcp data
|
||||
|
||||
hub = await setup_mikrotik_entry(hass, support_wireless=False)
|
||||
|
||||
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
|
||||
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params is None
|
||||
assert hub.api.devices["00:00:00:00:00:02"]._params == DHCP_DATA[1]
|
||||
assert hub.api.devices["00:00:00:00:00:02"]._wireless_params is None
|
||||
|
||||
|
||||
async def test_hub_support_wireless(hass):
|
||||
"""Test updating hub devices when hub support wireless interfaces."""
|
||||
|
||||
# test that the device list is from wireless data list
|
||||
|
||||
hub = await setup_mikrotik_entry(hass)
|
||||
|
||||
assert hub.api.support_wireless is True
|
||||
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
|
||||
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params == WIRELESS_DATA[0]
|
||||
|
||||
# devices not in wireless list will not be added
|
||||
assert "00:00:00:00:00:02" not in hub.api.devices
|
||||
|
||||
|
||||
async def test_force_dhcp(hass):
|
||||
"""Test updating hub devices with forced dhcp method."""
|
||||
|
||||
# test that the devices are constructed from dhcp data
|
||||
|
||||
hub = await setup_mikrotik_entry(hass, force_dhcp=True)
|
||||
|
||||
assert hub.api.support_wireless is True
|
||||
assert hub.api.devices["00:00:00:00:00:01"]._params == DHCP_DATA[0]
|
||||
assert hub.api.devices["00:00:00:00:00:01"]._wireless_params == WIRELESS_DATA[0]
|
||||
|
||||
# devices not in wireless list are added from dhcp
|
||||
assert hub.api.devices["00:00:00:00:00:02"]._params == DHCP_DATA[1]
|
||||
assert hub.api.devices["00:00:00:00:00:02"]._wireless_params is None
|
||||
|
||||
|
||||
async def test_arp_ping(hass):
|
||||
"""Test arp ping devices to confirm they are connected."""
|
||||
|
||||
# test device show as home if arp ping returns value
|
||||
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=True):
|
||||
hub = await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
|
||||
|
||||
assert hub.api.devices["00:00:00:00:00:01"].last_seen is not None
|
||||
assert hub.api.devices["00:00:00:00:00:02"].last_seen is not None
|
||||
|
||||
# test device show as away if arp ping times out
|
||||
with patch.object(mikrotik.hub.MikrotikData, "do_arp_ping", return_value=False):
|
||||
hub = await setup_mikrotik_entry(hass, arp_ping=True, force_dhcp=True)
|
||||
|
||||
assert hub.api.devices["00:00:00:00:00:01"].last_seen is not None
|
||||
# this device is not wireless so it will show as away
|
||||
assert hub.api.devices["00:00:00:00:00:02"].last_seen is None
|
@ -39,7 +39,6 @@ async def test_successful_config_entry(hass):
|
||||
|
||||
await hass.config_entries.async_setup(entry.entry_id)
|
||||
assert entry.state == ConfigEntryState.LOADED
|
||||
assert hass.data[DOMAIN][entry.entry_id]
|
||||
|
||||
|
||||
async def test_hub_conn_error(hass, mock_api):
|
||||
|
Loading…
x
Reference in New Issue
Block a user