mirror of
https://github.com/home-assistant/core.git
synced 2025-07-14 16:57:10 +00:00
Modernize Huawei LTE (#26675)
* Modernization rework - config entry support, with override support from huawei_lte platform in YAML - device tracker entity registry support - refactor for easier addition of more features - internal code cleanups * Remove log level dependent subscription/data debug hack No longer needed, because pretty much all keys from supported categories are exposed as sensors. Closes https://github.com/home-assistant/home-assistant/issues/23819 * Upgrade huawei-lte-api to 1.4.1 https://github.com/Salamek/huawei-lte-api/releases * Add support for access without username and password * Use subclass init instead of config_entries.HANDLERS * Update huawei-lte-api to 1.4.3 (#27269) * Convert device state attributes to snake_case * Simplify scanner entity initialization * Remove not needed hass reference from Router * Return explicit None from unsupported old device tracker setup * Mark unknown connection errors during config as such * Drop some dead config flow code * Run config flow sync I/O in executor * Parametrize config flow login error tests * Forward entry unload to platforms * Async/sync fixups * Improve data subscription debug logging * Implement on the fly add of new and tracking of seen device tracker entities * Handle device tracker entry unload cleanup in component * Remove unnecessary _async_setup_lte, just have code in async_setup_entry * Remove time tracker on unload * Fix to not use same mutable default subscription set for all routers * Pylint fixes * Remove some redundant defensive device tracker code * Add back explicit get_scanner None return, hush pylint * Adjust approach to set system_options on entry create * Enable some sensors on first add instead of disabling everything * Fix SMS notification recipients default value * Add option to skip new device tracker entities * Fix SMS notification recipient option default * Work around https://github.com/PyCQA/pylint/issues/3202 * Remove unrelated type hint additions * Change async_add_new_entities to a regular function * Remove option to disable polling for new device tracker entries
This commit is contained in:
parent
969322e14a
commit
fc09702cc3
39
homeassistant/components/huawei_lte/.translations/en.json
Normal file
39
homeassistant/components/huawei_lte/.translations/en.json
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
{
|
||||||
|
"config": {
|
||||||
|
"abort": {
|
||||||
|
"already_configured": "This device is already configured"
|
||||||
|
},
|
||||||
|
"error": {
|
||||||
|
"connection_failed": "Connection failed",
|
||||||
|
"incorrect_password": "Incorrect password",
|
||||||
|
"incorrect_username": "Incorrect username",
|
||||||
|
"incorrect_username_or_password": "Incorrect username or password",
|
||||||
|
"invalid_url": "Invalid URL",
|
||||||
|
"login_attempts_exceeded": "Maximum login attempts exceeded, please try again later",
|
||||||
|
"response_error": "Unknown error from device",
|
||||||
|
"unknown_connection_error": "Unknown error connecting to device"
|
||||||
|
},
|
||||||
|
"step": {
|
||||||
|
"user": {
|
||||||
|
"data": {
|
||||||
|
"password": "Password",
|
||||||
|
"url": "URL",
|
||||||
|
"username": "User name"
|
||||||
|
},
|
||||||
|
"description": "Enter device access details. Specifying username and password is optional, but enables support for more integration features. On the other hand, use of an authorized connection may cause problems accessing the device web interface from outside Home Assistant while the integration is active, and the other way around.",
|
||||||
|
"title": "Configure Huawei LTE"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"title": "Huawei LTE"
|
||||||
|
},
|
||||||
|
"options": {
|
||||||
|
"step": {
|
||||||
|
"init": {
|
||||||
|
"data": {
|
||||||
|
"recipient": "SMS notification recipients",
|
||||||
|
"track_new_devices": "Track new devices"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,34 +1,57 @@
|
|||||||
"""Support for Huawei LTE routers."""
|
"""Support for Huawei LTE routers."""
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from functools import reduce
|
from functools import partial
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
import ipaddress
|
import ipaddress
|
||||||
import logging
|
import logging
|
||||||
import operator
|
from typing import Any, Callable, Dict, List, Set
|
||||||
from typing import Any, Callable
|
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
import attr
|
import attr
|
||||||
from getmac import get_mac_address
|
from getmac import get_mac_address
|
||||||
from huawei_lte_api.AuthorizedConnection import AuthorizedConnection
|
from huawei_lte_api.AuthorizedConnection import AuthorizedConnection
|
||||||
from huawei_lte_api.Client import Client
|
from huawei_lte_api.Client import Client
|
||||||
from huawei_lte_api.exceptions import ResponseErrorNotSupportedException
|
from huawei_lte_api.Connection import Connection
|
||||||
|
from huawei_lte_api.exceptions import (
|
||||||
|
ResponseErrorLoginRequiredException,
|
||||||
|
ResponseErrorNotSupportedException,
|
||||||
|
)
|
||||||
|
from url_normalize import url_normalize
|
||||||
|
|
||||||
|
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER_DOMAIN
|
||||||
|
from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN
|
||||||
|
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
||||||
|
from homeassistant.config_entries import ConfigEntry, SOURCE_IMPORT
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
|
CONF_PASSWORD,
|
||||||
|
CONF_RECIPIENT,
|
||||||
CONF_URL,
|
CONF_URL,
|
||||||
CONF_USERNAME,
|
CONF_USERNAME,
|
||||||
CONF_PASSWORD,
|
|
||||||
EVENT_HOMEASSISTANT_STOP,
|
EVENT_HOMEASSISTANT_STOP,
|
||||||
)
|
)
|
||||||
from homeassistant.helpers import config_validation as cv
|
from homeassistant.core import CALLBACK_TYPE
|
||||||
from homeassistant.util import Throttle
|
from homeassistant.helpers import config_validation as cv, discovery
|
||||||
|
from homeassistant.helpers.dispatcher import (
|
||||||
|
async_dispatcher_connect,
|
||||||
|
async_dispatcher_send,
|
||||||
|
dispatcher_send,
|
||||||
|
)
|
||||||
|
from homeassistant.helpers.entity import Entity
|
||||||
|
from homeassistant.helpers.event import async_track_time_interval
|
||||||
|
from homeassistant.helpers.typing import HomeAssistantType
|
||||||
from .const import (
|
from .const import (
|
||||||
|
ALL_KEYS,
|
||||||
|
DEFAULT_DEVICE_NAME,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
|
KEY_DEVICE_BASIC_INFORMATION,
|
||||||
KEY_DEVICE_INFORMATION,
|
KEY_DEVICE_INFORMATION,
|
||||||
KEY_DEVICE_SIGNAL,
|
KEY_DEVICE_SIGNAL,
|
||||||
KEY_MONITORING_TRAFFIC_STATISTICS,
|
KEY_MONITORING_TRAFFIC_STATISTICS,
|
||||||
KEY_WLAN_HOST_LIST,
|
KEY_WLAN_HOST_LIST,
|
||||||
|
UPDATE_OPTIONS_SIGNAL,
|
||||||
|
UPDATE_SIGNAL,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -38,7 +61,20 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
# https://github.com/quandyfactory/dicttoxml/issues/60
|
# https://github.com/quandyfactory/dicttoxml/issues/60
|
||||||
logging.getLogger("dicttoxml").setLevel(logging.WARNING)
|
logging.getLogger("dicttoxml").setLevel(logging.WARNING)
|
||||||
|
|
||||||
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=10)
|
DEFAULT_NAME_TEMPLATE = "Huawei {} {}"
|
||||||
|
|
||||||
|
SCAN_INTERVAL = timedelta(seconds=10)
|
||||||
|
|
||||||
|
NOTIFY_SCHEMA = vol.Any(
|
||||||
|
None,
|
||||||
|
vol.Schema(
|
||||||
|
{
|
||||||
|
vol.Optional(CONF_RECIPIENT): vol.Any(
|
||||||
|
None, vol.All(cv.ensure_list, [cv.string])
|
||||||
|
)
|
||||||
|
}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
CONFIG_SCHEMA = vol.Schema(
|
CONFIG_SCHEMA = vol.Schema(
|
||||||
{
|
{
|
||||||
@ -48,8 +84,9 @@ CONFIG_SCHEMA = vol.Schema(
|
|||||||
vol.Schema(
|
vol.Schema(
|
||||||
{
|
{
|
||||||
vol.Required(CONF_URL): cv.url,
|
vol.Required(CONF_URL): cv.url,
|
||||||
vol.Required(CONF_USERNAME): cv.string,
|
vol.Optional(CONF_USERNAME): cv.string,
|
||||||
vol.Required(CONF_PASSWORD): cv.string,
|
vol.Optional(CONF_PASSWORD): cv.string,
|
||||||
|
vol.Optional(NOTIFY_DOMAIN): NOTIFY_SCHEMA,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
@ -60,97 +97,136 @@ CONFIG_SCHEMA = vol.Schema(
|
|||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class RouterData:
|
class Router:
|
||||||
"""Class for router state."""
|
"""Class for router state."""
|
||||||
|
|
||||||
client = attr.ib()
|
connection: Connection = attr.ib()
|
||||||
mac = attr.ib()
|
url: str = attr.ib()
|
||||||
device_information = attr.ib(init=False, factory=dict)
|
mac: str = attr.ib()
|
||||||
device_signal = attr.ib(init=False, factory=dict)
|
signal_update: CALLBACK_TYPE = attr.ib()
|
||||||
monitoring_traffic_statistics = attr.ib(init=False, factory=dict)
|
|
||||||
wlan_host_list = attr.ib(init=False, factory=dict)
|
|
||||||
|
|
||||||
_subscriptions = attr.ib(init=False, factory=set)
|
data: Dict[str, Any] = attr.ib(init=False, factory=dict)
|
||||||
|
subscriptions: Dict[str, Set[str]] = attr.ib(
|
||||||
|
init=False,
|
||||||
|
factory=lambda: defaultdict(set, ((x, {"initial_scan"}) for x in ALL_KEYS)),
|
||||||
|
)
|
||||||
|
unload_handlers: List[CALLBACK_TYPE] = attr.ib(init=False, factory=list)
|
||||||
|
client: Client
|
||||||
|
|
||||||
def __getitem__(self, path: str):
|
def __attrs_post_init__(self):
|
||||||
"""
|
"""Set up internal state on init."""
|
||||||
Get value corresponding to a dotted path.
|
self.client = Client(self.connection)
|
||||||
|
|
||||||
The first path component designates a member of this class
|
@property
|
||||||
such as device_information, device_signal etc, and the remaining
|
def device_name(self) -> str:
|
||||||
path points to a value in the member's data structure.
|
"""Get router device name."""
|
||||||
"""
|
for key, item in (
|
||||||
root, *rest = path.split(".")
|
(KEY_DEVICE_BASIC_INFORMATION, "devicename"),
|
||||||
try:
|
(KEY_DEVICE_INFORMATION, "DeviceName"),
|
||||||
data = getattr(self, root)
|
):
|
||||||
except AttributeError as err:
|
try:
|
||||||
raise KeyError from err
|
return self.data[key][item]
|
||||||
return reduce(operator.getitem, rest, data)
|
except (KeyError, TypeError):
|
||||||
|
pass
|
||||||
|
return DEFAULT_DEVICE_NAME
|
||||||
|
|
||||||
def subscribe(self, path: str) -> None:
|
|
||||||
"""Subscribe to given router data entries."""
|
|
||||||
self._subscriptions.add(path.split(".")[0])
|
|
||||||
|
|
||||||
def unsubscribe(self, path: str) -> None:
|
|
||||||
"""Unsubscribe from given router data entries."""
|
|
||||||
self._subscriptions.discard(path.split(".")[0])
|
|
||||||
|
|
||||||
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
|
||||||
def update(self) -> None:
|
def update(self) -> None:
|
||||||
"""Call API to update data."""
|
"""Update router data."""
|
||||||
self._update()
|
|
||||||
|
|
||||||
def _update(self) -> None:
|
def get_data(key: str, func: Callable[[None], Any]) -> None:
|
||||||
debugging = _LOGGER.isEnabledFor(logging.DEBUG)
|
if not self.subscriptions[key]:
|
||||||
|
return
|
||||||
def get_data(path: str, func: Callable[[None], Any]) -> None:
|
_LOGGER.debug("Getting %s for subscribers %s", key, self.subscriptions[key])
|
||||||
if debugging or path in self._subscriptions:
|
try:
|
||||||
try:
|
self.data[key] = func()
|
||||||
setattr(self, path, func())
|
except ResponseErrorNotSupportedException:
|
||||||
except ResponseErrorNotSupportedException:
|
_LOGGER.info(
|
||||||
_LOGGER.warning("%s not supported by device", path)
|
"%s not supported by device, excluding from future updates", key
|
||||||
self._subscriptions.discard(path)
|
)
|
||||||
finally:
|
self.subscriptions.pop(key)
|
||||||
_LOGGER.debug("%s=%s", path, getattr(self, path))
|
except ResponseErrorLoginRequiredException:
|
||||||
|
_LOGGER.info(
|
||||||
|
"%s requires authorization, excluding from future updates", key
|
||||||
|
)
|
||||||
|
self.subscriptions.pop(key)
|
||||||
|
finally:
|
||||||
|
_LOGGER.debug("%s=%s", key, self.data.get(key))
|
||||||
|
|
||||||
get_data(KEY_DEVICE_INFORMATION, self.client.device.information)
|
get_data(KEY_DEVICE_INFORMATION, self.client.device.information)
|
||||||
|
if self.data.get(KEY_DEVICE_INFORMATION):
|
||||||
|
# Full information includes everything in basic
|
||||||
|
self.subscriptions.pop(KEY_DEVICE_BASIC_INFORMATION, None)
|
||||||
|
get_data(KEY_DEVICE_BASIC_INFORMATION, self.client.device.basic_information)
|
||||||
get_data(KEY_DEVICE_SIGNAL, self.client.device.signal)
|
get_data(KEY_DEVICE_SIGNAL, self.client.device.signal)
|
||||||
get_data(
|
get_data(
|
||||||
KEY_MONITORING_TRAFFIC_STATISTICS, self.client.monitoring.traffic_statistics
|
KEY_MONITORING_TRAFFIC_STATISTICS, self.client.monitoring.traffic_statistics
|
||||||
)
|
)
|
||||||
get_data(KEY_WLAN_HOST_LIST, self.client.wlan.host_list)
|
get_data(KEY_WLAN_HOST_LIST, self.client.wlan.host_list)
|
||||||
|
|
||||||
|
self.signal_update()
|
||||||
|
|
||||||
|
def cleanup(self, *_) -> None:
|
||||||
|
"""Clean up resources."""
|
||||||
|
|
||||||
|
self.subscriptions.clear()
|
||||||
|
|
||||||
|
for handler in self.unload_handlers:
|
||||||
|
handler()
|
||||||
|
self.unload_handlers.clear()
|
||||||
|
|
||||||
|
if not isinstance(self.connection, AuthorizedConnection):
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.client.user.logout()
|
||||||
|
except ResponseErrorNotSupportedException:
|
||||||
|
_LOGGER.debug("Logout not supported by device", exc_info=True)
|
||||||
|
except ResponseErrorLoginRequiredException:
|
||||||
|
_LOGGER.debug("Logout not supported when not logged in", exc_info=True)
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
_LOGGER.warning("Logout error", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class HuaweiLteData:
|
class HuaweiLteData:
|
||||||
"""Shared state."""
|
"""Shared state."""
|
||||||
|
|
||||||
data = attr.ib(init=False, factory=dict)
|
hass_config: dict = attr.ib()
|
||||||
|
# Our YAML config, keyed by router URL
|
||||||
def get_data(self, config):
|
config: Dict[str, Dict[str, Any]] = attr.ib()
|
||||||
"""Get the requested or the only data value."""
|
routers: Dict[str, Router] = attr.ib(init=False, factory=dict)
|
||||||
if CONF_URL in config:
|
|
||||||
return self.data.get(config[CONF_URL])
|
|
||||||
if len(self.data) == 1:
|
|
||||||
return next(iter(self.data.values()))
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def setup(hass, config) -> bool:
|
async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry) -> bool:
|
||||||
"""Set up Huawei LTE component."""
|
"""Set up Huawei LTE component from config entry."""
|
||||||
if DOMAIN not in hass.data:
|
url = config_entry.data[CONF_URL]
|
||||||
hass.data[DOMAIN] = HuaweiLteData()
|
|
||||||
for conf in config.get(DOMAIN, []):
|
|
||||||
_setup_lte(hass, conf)
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
# Override settings from YAML config, but only if they're changed in it
|
||||||
def _setup_lte(hass, lte_config) -> None:
|
# Old values are stored as *_from_yaml in the config entry
|
||||||
"""Set up Huawei LTE router."""
|
yaml_config = hass.data[DOMAIN].config.get(url)
|
||||||
url = lte_config[CONF_URL]
|
if yaml_config:
|
||||||
username = lte_config[CONF_USERNAME]
|
# Config values
|
||||||
password = lte_config[CONF_PASSWORD]
|
new_data = {}
|
||||||
|
for key in CONF_USERNAME, CONF_PASSWORD:
|
||||||
|
if key in yaml_config:
|
||||||
|
value = yaml_config[key]
|
||||||
|
if value != config_entry.data.get(f"{key}_from_yaml"):
|
||||||
|
new_data[f"{key}_from_yaml"] = value
|
||||||
|
new_data[key] = value
|
||||||
|
# Options
|
||||||
|
new_options = {}
|
||||||
|
yaml_recipient = yaml_config.get(NOTIFY_DOMAIN, {}).get(CONF_RECIPIENT)
|
||||||
|
if yaml_recipient is not None and yaml_recipient != config_entry.options.get(
|
||||||
|
f"{CONF_RECIPIENT}_from_yaml"
|
||||||
|
):
|
||||||
|
new_options[f"{CONF_RECIPIENT}_from_yaml"] = yaml_recipient
|
||||||
|
new_options[CONF_RECIPIENT] = yaml_recipient
|
||||||
|
# Update entry if overrides were found
|
||||||
|
if new_data or new_options:
|
||||||
|
hass.config_entries.async_update_entry(
|
||||||
|
config_entry,
|
||||||
|
data={**config_entry.data, **new_data},
|
||||||
|
options={**config_entry.options, **new_options},
|
||||||
|
)
|
||||||
|
|
||||||
# Get MAC address for use in unique ids. Being able to use something
|
# Get MAC address for use in unique ids. Being able to use something
|
||||||
# from the API would be nice, but all of that seems to be available only
|
# from the API would be nice, but all of that seems to be available only
|
||||||
@ -164,19 +240,194 @@ def _setup_lte(hass, lte_config) -> None:
|
|||||||
mode = "ip"
|
mode = "ip"
|
||||||
except ValueError:
|
except ValueError:
|
||||||
mode = "hostname"
|
mode = "hostname"
|
||||||
mac = get_mac_address(**{mode: host})
|
mac = await hass.async_add_executor_job(partial(get_mac_address, **{mode: host}))
|
||||||
|
|
||||||
connection = AuthorizedConnection(url, username=username, password=password)
|
def get_connection() -> Connection:
|
||||||
client = Client(connection)
|
"""
|
||||||
|
Set up a connection.
|
||||||
|
|
||||||
data = RouterData(client, mac)
|
Authorized one if username/pass specified (even if empty), unauthorized one otherwise.
|
||||||
hass.data[DOMAIN].data[url] = data
|
"""
|
||||||
|
username = config_entry.data.get(CONF_USERNAME)
|
||||||
|
password = config_entry.data.get(CONF_PASSWORD)
|
||||||
|
if username or password:
|
||||||
|
connection = AuthorizedConnection(url, username=username, password=password)
|
||||||
|
else:
|
||||||
|
connection = Connection(url)
|
||||||
|
return connection
|
||||||
|
|
||||||
def cleanup(event):
|
def signal_update() -> None:
|
||||||
"""Clean up resources."""
|
"""Signal updates to data."""
|
||||||
try:
|
dispatcher_send(hass, UPDATE_SIGNAL, url)
|
||||||
client.user.logout()
|
|
||||||
except ResponseErrorNotSupportedException as ex:
|
|
||||||
_LOGGER.debug("Logout not supported by device", exc_info=ex)
|
|
||||||
|
|
||||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup)
|
connection = await hass.async_add_executor_job(get_connection)
|
||||||
|
|
||||||
|
# Set up router and store reference to it
|
||||||
|
router = Router(connection, url, mac, signal_update)
|
||||||
|
hass.data[DOMAIN].routers[url] = router
|
||||||
|
|
||||||
|
# Do initial data update
|
||||||
|
await hass.async_add_executor_job(router.update)
|
||||||
|
|
||||||
|
# Clear all subscriptions, enabled entities will push back theirs
|
||||||
|
router.subscriptions.clear()
|
||||||
|
|
||||||
|
# Forward config entry setup to platforms
|
||||||
|
for domain in (DEVICE_TRACKER_DOMAIN, SENSOR_DOMAIN):
|
||||||
|
hass.async_create_task(
|
||||||
|
hass.config_entries.async_forward_entry_setup(config_entry, domain)
|
||||||
|
)
|
||||||
|
# Notify doesn't support config entry setup yet, load with discovery for now
|
||||||
|
await discovery.async_load_platform(
|
||||||
|
hass,
|
||||||
|
NOTIFY_DOMAIN,
|
||||||
|
DOMAIN,
|
||||||
|
{CONF_URL: url, CONF_RECIPIENT: config_entry.options.get(CONF_RECIPIENT)},
|
||||||
|
hass.data[DOMAIN].hass_config,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add config entry options update listener
|
||||||
|
router.unload_handlers.append(
|
||||||
|
config_entry.add_update_listener(async_signal_options_update)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_router(*_: Any) -> None:
|
||||||
|
"""
|
||||||
|
Update router data.
|
||||||
|
|
||||||
|
Separate passthrough function because lambdas don't work with track_time_interval.
|
||||||
|
"""
|
||||||
|
router.update()
|
||||||
|
|
||||||
|
# Set up periodic update
|
||||||
|
router.unload_handlers.append(
|
||||||
|
async_track_time_interval(hass, _update_router, SCAN_INTERVAL)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Clean up at end
|
||||||
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, router.cleanup)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def async_unload_entry(
|
||||||
|
hass: HomeAssistantType, config_entry: ConfigEntry
|
||||||
|
) -> bool:
|
||||||
|
"""Unload config entry."""
|
||||||
|
|
||||||
|
# Forward config entry unload to platforms
|
||||||
|
for domain in (DEVICE_TRACKER_DOMAIN, SENSOR_DOMAIN):
|
||||||
|
await hass.config_entries.async_forward_entry_unload(config_entry, domain)
|
||||||
|
|
||||||
|
# Forget about the router and invoke its cleanup
|
||||||
|
router = hass.data[DOMAIN].routers.pop(config_entry.data[CONF_URL])
|
||||||
|
await hass.async_add_executor_job(router.cleanup)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def async_setup(hass: HomeAssistantType, config) -> bool:
|
||||||
|
"""Set up Huawei LTE component."""
|
||||||
|
|
||||||
|
# Arrange our YAML config to dict with normalized URLs as keys
|
||||||
|
domain_config = {}
|
||||||
|
if DOMAIN not in hass.data:
|
||||||
|
hass.data[DOMAIN] = HuaweiLteData(hass_config=config, config=domain_config)
|
||||||
|
for router_config in config.get(DOMAIN, []):
|
||||||
|
domain_config[url_normalize(router_config.pop(CONF_URL))] = router_config
|
||||||
|
|
||||||
|
for url, router_config in domain_config.items():
|
||||||
|
hass.async_create_task(
|
||||||
|
hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": SOURCE_IMPORT},
|
||||||
|
data={
|
||||||
|
CONF_URL: url,
|
||||||
|
CONF_USERNAME: router_config.get(CONF_USERNAME),
|
||||||
|
CONF_PASSWORD: router_config.get(CONF_PASSWORD),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def async_signal_options_update(
|
||||||
|
hass: HomeAssistantType, config_entry: ConfigEntry
|
||||||
|
) -> None:
|
||||||
|
"""Handle config entry options update."""
|
||||||
|
async_dispatcher_send(hass, UPDATE_OPTIONS_SIGNAL, config_entry)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class HuaweiLteBaseEntity(Entity):
|
||||||
|
"""Huawei LTE entity base class."""
|
||||||
|
|
||||||
|
router: Router = attr.ib()
|
||||||
|
|
||||||
|
_available: bool = attr.ib(init=False, default=True)
|
||||||
|
_unsub_handlers: List[Callable] = attr.ib(init=False, factory=list)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _entity_name(self) -> str:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _device_unique_id(self) -> str:
|
||||||
|
"""Return unique ID for entity within a router."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unique_id(self) -> str:
|
||||||
|
"""Return unique ID for entity."""
|
||||||
|
return f"{self.router.mac}-{self._device_unique_id}"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
"""Return entity name."""
|
||||||
|
return DEFAULT_NAME_TEMPLATE.format(self.router.device_name, self._entity_name)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def available(self) -> bool:
|
||||||
|
"""Return whether the entity is available."""
|
||||||
|
return self._available
|
||||||
|
|
||||||
|
@property
|
||||||
|
def should_poll(self) -> bool:
|
||||||
|
"""Huawei LTE entities report their state without polling."""
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def async_update(self) -> None:
|
||||||
|
"""Update state."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def async_update_options(self, config_entry: ConfigEntry) -> None:
|
||||||
|
"""Update config entry options."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def async_added_to_hass(self) -> None:
|
||||||
|
"""Connect to update signals."""
|
||||||
|
self._unsub_handlers.append(
|
||||||
|
async_dispatcher_connect(self.hass, UPDATE_SIGNAL, self._async_maybe_update)
|
||||||
|
)
|
||||||
|
self._unsub_handlers.append(
|
||||||
|
async_dispatcher_connect(
|
||||||
|
self.hass, UPDATE_OPTIONS_SIGNAL, self._async_maybe_update_options
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _async_maybe_update(self, url: str) -> None:
|
||||||
|
"""Update state if the update signal comes from our router."""
|
||||||
|
if url == self.router.url:
|
||||||
|
await self.async_update()
|
||||||
|
|
||||||
|
async def _async_maybe_update_options(self, config_entry: ConfigEntry) -> None:
|
||||||
|
"""Update options if the update signal comes from our router."""
|
||||||
|
if config_entry.data[CONF_URL] == self.router.url:
|
||||||
|
await self.async_update_options(config_entry)
|
||||||
|
|
||||||
|
async def async_will_remove_from_hass(self) -> None:
|
||||||
|
"""Invoke unsubscription handlers."""
|
||||||
|
for unsub in self._unsub_handlers:
|
||||||
|
unsub()
|
||||||
|
self._unsub_handlers.clear()
|
||||||
|
208
homeassistant/components/huawei_lte/config_flow.py
Normal file
208
homeassistant/components/huawei_lte/config_flow.py
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
"""Config flow for the Huawei LTE platform."""
|
||||||
|
|
||||||
|
from collections import OrderedDict
|
||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from huawei_lte_api.AuthorizedConnection import AuthorizedConnection
|
||||||
|
from huawei_lte_api.Client import Client
|
||||||
|
from huawei_lte_api.Connection import Connection
|
||||||
|
from huawei_lte_api.exceptions import (
|
||||||
|
LoginErrorUsernameWrongException,
|
||||||
|
LoginErrorPasswordWrongException,
|
||||||
|
LoginErrorUsernamePasswordWrongException,
|
||||||
|
LoginErrorUsernamePasswordOverrunException,
|
||||||
|
ResponseErrorException,
|
||||||
|
)
|
||||||
|
from url_normalize import url_normalize
|
||||||
|
import voluptuous as vol
|
||||||
|
|
||||||
|
from homeassistant import config_entries
|
||||||
|
from homeassistant.const import CONF_PASSWORD, CONF_RECIPIENT, CONF_URL, CONF_USERNAME
|
||||||
|
from homeassistant.core import callback
|
||||||
|
from .const import DEFAULT_DEVICE_NAME
|
||||||
|
|
||||||
|
# https://github.com/PyCQA/pylint/issues/3202
|
||||||
|
from .const import DOMAIN # pylint: disable=unused-import
|
||||||
|
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
|
"""Handle Huawei LTE config flow."""
|
||||||
|
|
||||||
|
VERSION = 1
|
||||||
|
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@callback
|
||||||
|
def async_get_options_flow(config_entry):
|
||||||
|
"""Get options flow."""
|
||||||
|
return OptionsFlowHandler(config_entry)
|
||||||
|
|
||||||
|
async def _async_show_user_form(self, user_input=None, errors=None):
|
||||||
|
if user_input is None:
|
||||||
|
user_input = {}
|
||||||
|
return self.async_show_form(
|
||||||
|
step_id="user",
|
||||||
|
data_schema=vol.Schema(
|
||||||
|
OrderedDict(
|
||||||
|
(
|
||||||
|
(
|
||||||
|
vol.Required(
|
||||||
|
CONF_URL, default=user_input.get(CONF_URL, "")
|
||||||
|
),
|
||||||
|
str,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vol.Optional(
|
||||||
|
CONF_USERNAME, default=user_input.get(CONF_USERNAME, "")
|
||||||
|
),
|
||||||
|
str,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vol.Optional(
|
||||||
|
CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")
|
||||||
|
),
|
||||||
|
str,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
errors=errors or {},
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_step_import(self, user_input=None):
|
||||||
|
"""Handle import initiated config flow."""
|
||||||
|
return await self.async_step_user(user_input)
|
||||||
|
|
||||||
|
async def async_step_user(self, user_input=None):
|
||||||
|
"""Handle user initiated config flow."""
|
||||||
|
if user_input is None:
|
||||||
|
return await self._async_show_user_form()
|
||||||
|
|
||||||
|
errors = {}
|
||||||
|
|
||||||
|
# Normalize URL
|
||||||
|
user_input[CONF_URL] = url_normalize(
|
||||||
|
user_input[CONF_URL], default_scheme="http"
|
||||||
|
)
|
||||||
|
if "://" not in user_input[CONF_URL]:
|
||||||
|
errors[CONF_URL] = "invalid_url"
|
||||||
|
return await self._async_show_user_form(
|
||||||
|
user_input=user_input, errors=errors
|
||||||
|
)
|
||||||
|
|
||||||
|
# See if we already have a router configured with this URL
|
||||||
|
existing_urls = { # existing entries
|
||||||
|
url_normalize(entry.data[CONF_URL], default_scheme="http")
|
||||||
|
for entry in self._async_current_entries()
|
||||||
|
}
|
||||||
|
if user_input[CONF_URL] in existing_urls:
|
||||||
|
return self.async_abort(reason="already_configured")
|
||||||
|
|
||||||
|
conn = None
|
||||||
|
|
||||||
|
def logout():
|
||||||
|
if hasattr(conn, "user"):
|
||||||
|
try:
|
||||||
|
conn.user.logout()
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
_LOGGER.debug("Could not logout", exc_info=True)
|
||||||
|
|
||||||
|
def try_connect(username: Optional[str], password: Optional[str]) -> Connection:
|
||||||
|
"""Try connecting with given credentials."""
|
||||||
|
if username or password:
|
||||||
|
conn = AuthorizedConnection(
|
||||||
|
user_input[CONF_URL], username=username, password=password
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
conn = AuthorizedConnection(
|
||||||
|
user_input[CONF_URL], username="", password=""
|
||||||
|
)
|
||||||
|
user_input[CONF_USERNAME] = ""
|
||||||
|
user_input[CONF_PASSWORD] = ""
|
||||||
|
except ResponseErrorException:
|
||||||
|
_LOGGER.debug(
|
||||||
|
"Could not login with empty credentials, proceeding unauthenticated",
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
conn = Connection(user_input[CONF_URL])
|
||||||
|
del user_input[CONF_USERNAME]
|
||||||
|
del user_input[CONF_PASSWORD]
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def get_router_title(conn: Connection) -> str:
|
||||||
|
"""Get title for router."""
|
||||||
|
title = None
|
||||||
|
client = Client(conn)
|
||||||
|
try:
|
||||||
|
info = client.device.basic_information()
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
_LOGGER.debug("Could not get device.basic_information", exc_info=True)
|
||||||
|
else:
|
||||||
|
title = info.get("devicename")
|
||||||
|
if not title:
|
||||||
|
try:
|
||||||
|
info = client.device.information()
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
_LOGGER.debug("Could not get device.information", exc_info=True)
|
||||||
|
else:
|
||||||
|
title = info.get("DeviceName")
|
||||||
|
return title or DEFAULT_DEVICE_NAME
|
||||||
|
|
||||||
|
username = user_input.get(CONF_USERNAME)
|
||||||
|
password = user_input.get(CONF_PASSWORD)
|
||||||
|
try:
|
||||||
|
conn = await self.hass.async_add_executor_job(
|
||||||
|
try_connect, username, password
|
||||||
|
)
|
||||||
|
except LoginErrorUsernameWrongException:
|
||||||
|
errors[CONF_USERNAME] = "incorrect_username"
|
||||||
|
except LoginErrorPasswordWrongException:
|
||||||
|
errors[CONF_PASSWORD] = "incorrect_password"
|
||||||
|
except LoginErrorUsernamePasswordWrongException:
|
||||||
|
errors[CONF_USERNAME] = "incorrect_username_or_password"
|
||||||
|
except LoginErrorUsernamePasswordOverrunException:
|
||||||
|
errors["base"] = "login_attempts_exceeded"
|
||||||
|
except ResponseErrorException:
|
||||||
|
_LOGGER.warning("Response error", exc_info=True)
|
||||||
|
errors["base"] = "response_error"
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
_LOGGER.warning("Unknown error connecting to device", exc_info=True)
|
||||||
|
errors[CONF_URL] = "unknown_connection_error"
|
||||||
|
if errors:
|
||||||
|
await self.hass.async_add_executor_job(logout)
|
||||||
|
return await self._async_show_user_form(
|
||||||
|
user_input=user_input, errors=errors
|
||||||
|
)
|
||||||
|
|
||||||
|
title = await self.hass.async_add_executor_job(get_router_title, conn)
|
||||||
|
await self.hass.async_add_executor_job(logout)
|
||||||
|
|
||||||
|
return self.async_create_entry(title=title, data=user_input)
|
||||||
|
|
||||||
|
|
||||||
|
class OptionsFlowHandler(config_entries.OptionsFlow):
|
||||||
|
"""Huawei LTE options flow."""
|
||||||
|
|
||||||
|
def __init__(self, config_entry: config_entries.ConfigEntry):
|
||||||
|
"""Initialize options flow."""
|
||||||
|
self.config_entry = config_entry
|
||||||
|
|
||||||
|
async def async_step_init(self, user_input=None):
|
||||||
|
"""Handle options flow."""
|
||||||
|
if user_input is not None:
|
||||||
|
return self.async_create_entry(title="", data=user_input)
|
||||||
|
|
||||||
|
data_schema = vol.Schema(
|
||||||
|
{
|
||||||
|
vol.Optional(
|
||||||
|
CONF_RECIPIENT,
|
||||||
|
default=self.config_entry.options.get(CONF_RECIPIENT, ""),
|
||||||
|
): str
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return self.async_show_form(step_id="init", data_schema=data_schema)
|
@ -2,7 +2,23 @@
|
|||||||
|
|
||||||
DOMAIN = "huawei_lte"
|
DOMAIN = "huawei_lte"
|
||||||
|
|
||||||
|
DEFAULT_DEVICE_NAME = "LTE"
|
||||||
|
|
||||||
|
UPDATE_SIGNAL = f"{DOMAIN}_update"
|
||||||
|
UPDATE_OPTIONS_SIGNAL = f"{DOMAIN}_options_update"
|
||||||
|
|
||||||
|
KEY_DEVICE_BASIC_INFORMATION = "device_basic_information"
|
||||||
KEY_DEVICE_INFORMATION = "device_information"
|
KEY_DEVICE_INFORMATION = "device_information"
|
||||||
KEY_DEVICE_SIGNAL = "device_signal"
|
KEY_DEVICE_SIGNAL = "device_signal"
|
||||||
KEY_MONITORING_TRAFFIC_STATISTICS = "monitoring_traffic_statistics"
|
KEY_MONITORING_TRAFFIC_STATISTICS = "monitoring_traffic_statistics"
|
||||||
KEY_WLAN_HOST_LIST = "wlan_host_list"
|
KEY_WLAN_HOST_LIST = "wlan_host_list"
|
||||||
|
|
||||||
|
DEVICE_TRACKER_KEYS = {KEY_WLAN_HOST_LIST}
|
||||||
|
|
||||||
|
SENSOR_KEYS = {
|
||||||
|
KEY_DEVICE_INFORMATION,
|
||||||
|
KEY_DEVICE_SIGNAL,
|
||||||
|
KEY_MONITORING_TRAFFIC_STATISTICS,
|
||||||
|
}
|
||||||
|
|
||||||
|
ALL_KEYS = DEVICE_TRACKER_KEYS | SENSOR_KEYS
|
||||||
|
@ -1,63 +1,162 @@
|
|||||||
"""Support for device tracking of Huawei LTE routers."""
|
"""Support for device tracking of Huawei LTE routers."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Dict, List, Optional
|
import re
|
||||||
|
from typing import Any, Dict, Set
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
import voluptuous as vol
|
from stringcase import snakecase
|
||||||
|
|
||||||
import homeassistant.helpers.config_validation as cv
|
from homeassistant.components.device_tracker import (
|
||||||
from homeassistant.components.device_tracker import PLATFORM_SCHEMA, DeviceScanner
|
DOMAIN as DEVICE_TRACKER_DOMAIN,
|
||||||
|
SOURCE_TYPE_ROUTER,
|
||||||
|
)
|
||||||
|
from homeassistant.components.device_tracker.config_entry import ScannerEntity
|
||||||
from homeassistant.const import CONF_URL
|
from homeassistant.const import CONF_URL
|
||||||
from . import RouterData
|
from homeassistant.helpers import entity_registry
|
||||||
from .const import DOMAIN, KEY_WLAN_HOST_LIST
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||||
|
from . import HuaweiLteBaseEntity
|
||||||
|
from .const import DOMAIN, KEY_WLAN_HOST_LIST, UPDATE_SIGNAL
|
||||||
|
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({vol.Optional(CONF_URL): cv.url})
|
_DEVICE_SCAN = f"{DEVICE_TRACKER_DOMAIN}/device_scan"
|
||||||
|
|
||||||
HOSTS_PATH = f"{KEY_WLAN_HOST_LIST}.Hosts.Host"
|
|
||||||
|
|
||||||
|
|
||||||
def get_scanner(hass, config):
|
async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||||
"""Get a Huawei LTE router scanner."""
|
"""Set up from config entry."""
|
||||||
data = hass.data[DOMAIN].get_data(config)
|
|
||||||
data.subscribe(HOSTS_PATH)
|
# Grab hosts list once to examine whether the initial fetch has got some data for
|
||||||
return HuaweiLteScanner(data)
|
# us, i.e. if wlan host list is supported. Only set up a subscription and proceed
|
||||||
|
# with adding and tracking entities if it is.
|
||||||
|
router = hass.data[DOMAIN].routers[config_entry.data[CONF_URL]]
|
||||||
|
try:
|
||||||
|
_ = router.data[KEY_WLAN_HOST_LIST]["Hosts"]["Host"]
|
||||||
|
except KeyError:
|
||||||
|
_LOGGER.debug("%s[%s][%s] not in data", KEY_WLAN_HOST_LIST, "Hosts", "Host")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Initialize already tracked entities
|
||||||
|
tracked: Set[str] = set()
|
||||||
|
registry = await entity_registry.async_get_registry(hass)
|
||||||
|
for entity in registry.entities.values():
|
||||||
|
if (
|
||||||
|
entity.domain == DEVICE_TRACKER_DOMAIN
|
||||||
|
and entity.config_entry_id == config_entry.entry_id
|
||||||
|
):
|
||||||
|
tracked.add(entity.unique_id)
|
||||||
|
async_add_new_entities(hass, router.url, async_add_entities, tracked, True)
|
||||||
|
|
||||||
|
# Tell parent router to poll hosts list to gather new devices
|
||||||
|
router.subscriptions[KEY_WLAN_HOST_LIST].add(_DEVICE_SCAN)
|
||||||
|
|
||||||
|
async def _async_maybe_add_new_entities(url: str) -> None:
|
||||||
|
"""Add new entities if the update signal comes from our router."""
|
||||||
|
if url == router.url:
|
||||||
|
async_add_new_entities(hass, url, async_add_entities, tracked)
|
||||||
|
|
||||||
|
# Register to handle router data updates
|
||||||
|
disconnect_dispatcher = async_dispatcher_connect(
|
||||||
|
hass, UPDATE_SIGNAL, _async_maybe_add_new_entities
|
||||||
|
)
|
||||||
|
router.unload_handlers.append(disconnect_dispatcher)
|
||||||
|
|
||||||
|
# Add new entities from initial scan
|
||||||
|
async_add_new_entities(hass, router.url, async_add_entities, tracked)
|
||||||
|
|
||||||
|
|
||||||
|
def async_add_new_entities(
|
||||||
|
hass, router_url, async_add_entities, tracked, included: bool = False
|
||||||
|
):
|
||||||
|
"""Add new entities.
|
||||||
|
|
||||||
|
:param included: if True, setup only items in tracked, and vice versa
|
||||||
|
"""
|
||||||
|
router = hass.data[DOMAIN].routers[router_url]
|
||||||
|
try:
|
||||||
|
hosts = router.data[KEY_WLAN_HOST_LIST]["Hosts"]["Host"]
|
||||||
|
except KeyError:
|
||||||
|
_LOGGER.debug("%s[%s][%s] not in data", KEY_WLAN_HOST_LIST, "Hosts", "Host")
|
||||||
|
return
|
||||||
|
|
||||||
|
new_entities = []
|
||||||
|
for host in (x for x in hosts if x.get("MacAddress")):
|
||||||
|
entity = HuaweiLteScannerEntity(router, host["MacAddress"])
|
||||||
|
tracking = entity.unique_id in tracked
|
||||||
|
if tracking != included:
|
||||||
|
continue
|
||||||
|
tracked.add(entity.unique_id)
|
||||||
|
new_entities.append(entity)
|
||||||
|
async_add_entities(new_entities, True)
|
||||||
|
|
||||||
|
|
||||||
|
def _better_snakecase(text: str) -> str:
|
||||||
|
if text == text.upper():
|
||||||
|
# All uppercase to all lowercase to get http for HTTP, not h_t_t_p
|
||||||
|
text = text.lower()
|
||||||
|
else:
|
||||||
|
# Three or more consecutive uppercase with middle part lowercased
|
||||||
|
# to get http_response for HTTPResponse, not h_t_t_p_response
|
||||||
|
text = re.sub(
|
||||||
|
r"([A-Z])([A-Z]+)([A-Z](?:[^A-Z]|$))",
|
||||||
|
lambda match: f"{match.group(1)}{match.group(2).lower()}{match.group(3)}",
|
||||||
|
text,
|
||||||
|
)
|
||||||
|
return snakecase(text)
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class HuaweiLteScanner(DeviceScanner):
|
class HuaweiLteScannerEntity(HuaweiLteBaseEntity, ScannerEntity):
|
||||||
"""Huawei LTE router scanner."""
|
"""Huawei LTE router scanner entity."""
|
||||||
|
|
||||||
data = attr.ib(type=RouterData)
|
mac: str = attr.ib()
|
||||||
|
|
||||||
_hosts = attr.ib(init=False, factory=dict)
|
_is_connected: bool = attr.ib(init=False, default=False)
|
||||||
|
_name: str = attr.ib(init=False, default="device")
|
||||||
|
_device_state_attributes: Dict[str, Any] = attr.ib(init=False, factory=dict)
|
||||||
|
|
||||||
def scan_devices(self) -> List[str]:
|
@property
|
||||||
"""Scan for devices."""
|
def _entity_name(self) -> str:
|
||||||
self.data.update()
|
return self._name
|
||||||
try:
|
|
||||||
self._hosts = {
|
@property
|
||||||
x["MacAddress"]: x for x in self.data[HOSTS_PATH] if x.get("MacAddress")
|
def _device_unique_id(self) -> str:
|
||||||
|
return self.mac
|
||||||
|
|
||||||
|
@property
|
||||||
|
def source_type(self) -> str:
|
||||||
|
"""Return SOURCE_TYPE_ROUTER."""
|
||||||
|
return SOURCE_TYPE_ROUTER
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
"""Get whether the entity is connected."""
|
||||||
|
return self._is_connected
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_state_attributes(self) -> Dict[str, Any]:
|
||||||
|
"""Get additional attributes related to entity state."""
|
||||||
|
return self._device_state_attributes
|
||||||
|
|
||||||
|
async def async_update(self) -> None:
|
||||||
|
"""Update state."""
|
||||||
|
hosts = self.router.data[KEY_WLAN_HOST_LIST]["Hosts"]["Host"]
|
||||||
|
host = next((x for x in hosts if x.get("MacAddress") == self.mac), None)
|
||||||
|
self._is_connected = host is not None
|
||||||
|
if self._is_connected:
|
||||||
|
self._name = host.get("HostName", self.mac)
|
||||||
|
self._device_state_attributes = {
|
||||||
|
_better_snakecase(k): v
|
||||||
|
for k, v in host.items()
|
||||||
|
if k not in ("MacAddress", "HostName")
|
||||||
}
|
}
|
||||||
except KeyError:
|
|
||||||
_LOGGER.debug("%s not in data", HOSTS_PATH)
|
|
||||||
return list(self._hosts)
|
|
||||||
|
|
||||||
def get_device_name(self, device: str) -> Optional[str]:
|
|
||||||
"""Get name for a device."""
|
|
||||||
host = self._hosts.get(device)
|
|
||||||
return host.get("HostName") or None if host else None
|
|
||||||
|
|
||||||
def get_extra_attributes(self, device: str) -> Dict[str, Any]:
|
def get_scanner(*args, **kwargs): # pylint: disable=useless-return
|
||||||
"""
|
"""Old no longer used way to set up Huawei LTE device tracker."""
|
||||||
Get extra attributes of a device.
|
_LOGGER.warning(
|
||||||
|
"Loading and configuring as a platform is no longer supported or "
|
||||||
Some known extra attributes that may be returned in the dict
|
"required, convert to enabling/disabling available entities"
|
||||||
include MacAddress (MAC address), ID (client ID), IpAddress
|
)
|
||||||
(IP address), AssociatedSsid (associated SSID), AssociatedTime
|
return None
|
||||||
(associated time in seconds), and HostName (host name).
|
|
||||||
"""
|
|
||||||
return self._hosts.get(device) or {}
|
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
{
|
{
|
||||||
"domain": "huawei_lte",
|
"domain": "huawei_lte",
|
||||||
"name": "Huawei LTE",
|
"name": "Huawei LTE",
|
||||||
|
"config_flow": true,
|
||||||
"documentation": "https://www.home-assistant.io/integrations/huawei_lte",
|
"documentation": "https://www.home-assistant.io/integrations/huawei_lte",
|
||||||
"requirements": [
|
"requirements": [
|
||||||
"getmac==0.8.1",
|
"getmac==0.8.1",
|
||||||
"huawei-lte-api==1.3.0"
|
"huawei-lte-api==1.4.3",
|
||||||
|
"stringcase==1.2.0",
|
||||||
|
"url-normalize==1.4.1"
|
||||||
],
|
],
|
||||||
"dependencies": [],
|
"dependencies": [],
|
||||||
"codeowners": [
|
"codeowners": [
|
||||||
|
@ -1,58 +1,54 @@
|
|||||||
"""Support for Huawei LTE router notifications."""
|
"""Support for Huawei LTE router notifications."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Any, List
|
||||||
|
|
||||||
import voluptuous as vol
|
|
||||||
import attr
|
import attr
|
||||||
|
from huawei_lte_api.exceptions import ResponseErrorException
|
||||||
|
|
||||||
from homeassistant.components.notify import (
|
from homeassistant.components.notify import BaseNotificationService, ATTR_TARGET
|
||||||
BaseNotificationService,
|
|
||||||
ATTR_TARGET,
|
|
||||||
PLATFORM_SCHEMA,
|
|
||||||
)
|
|
||||||
from homeassistant.const import CONF_RECIPIENT, CONF_URL
|
from homeassistant.const import CONF_RECIPIENT, CONF_URL
|
||||||
import homeassistant.helpers.config_validation as cv
|
|
||||||
|
|
||||||
|
from . import Router
|
||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
|
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
|
||||||
{
|
|
||||||
vol.Optional(CONF_URL): cv.url,
|
|
||||||
vol.Required(CONF_RECIPIENT): vol.All(cv.ensure_list, [cv.string]),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def async_get_service(hass, config, discovery_info=None):
|
async def async_get_service(hass, config, discovery_info=None):
|
||||||
"""Get the notification service."""
|
"""Get the notification service."""
|
||||||
return HuaweiLteSmsNotificationService(hass, config)
|
if discovery_info is None:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Loading as a platform is no longer supported, convert to use "
|
||||||
|
"config entries or the huawei_lte component"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
router = hass.data[DOMAIN].routers[discovery_info[CONF_URL]]
|
||||||
|
default_targets = discovery_info[CONF_RECIPIENT] or []
|
||||||
|
|
||||||
|
return HuaweiLteSmsNotificationService(router, default_targets)
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class HuaweiLteSmsNotificationService(BaseNotificationService):
|
class HuaweiLteSmsNotificationService(BaseNotificationService):
|
||||||
"""Huawei LTE router SMS notification service."""
|
"""Huawei LTE router SMS notification service."""
|
||||||
|
|
||||||
hass = attr.ib()
|
router: Router = attr.ib()
|
||||||
config = attr.ib()
|
default_targets: List[str] = attr.ib()
|
||||||
|
|
||||||
def send_message(self, message="", **kwargs):
|
def send_message(self, message: str = "", **kwargs: Any) -> None:
|
||||||
"""Send message to target numbers."""
|
"""Send message to target numbers."""
|
||||||
from huawei_lte_api.exceptions import ResponseErrorException
|
|
||||||
|
|
||||||
targets = kwargs.get(ATTR_TARGET, self.config.get(CONF_RECIPIENT))
|
targets = kwargs.get(ATTR_TARGET, self.default_targets)
|
||||||
if not targets or not message:
|
if not targets or not message:
|
||||||
return
|
return
|
||||||
|
|
||||||
data = self.hass.data[DOMAIN].get_data(self.config)
|
|
||||||
if not data:
|
|
||||||
_LOGGER.error("Router not available")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resp = data.client.sms.send_sms(phone_numbers=targets, message=message)
|
resp = self.router.client.sms.send_sms(
|
||||||
|
phone_numbers=targets, message=message
|
||||||
|
)
|
||||||
_LOGGER.debug("Sent to %s: %s", targets, resp)
|
_LOGGER.debug("Sent to %s: %s", targets, resp)
|
||||||
except ResponseErrorException as ex:
|
except ResponseErrorException as ex:
|
||||||
_LOGGER.error("Could not send to %s: %s", targets, ex)
|
_LOGGER.error("Could not send to %s: %s", targets, ex)
|
||||||
|
@ -5,18 +5,15 @@ import re
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
import voluptuous as vol
|
|
||||||
|
|
||||||
from homeassistant.const import CONF_URL, CONF_MONITORED_CONDITIONS, STATE_UNKNOWN
|
from homeassistant.const import CONF_URL, STATE_UNKNOWN
|
||||||
from homeassistant.components.sensor import (
|
from homeassistant.components.sensor import (
|
||||||
PLATFORM_SCHEMA,
|
|
||||||
DEVICE_CLASS_SIGNAL_STRENGTH,
|
DEVICE_CLASS_SIGNAL_STRENGTH,
|
||||||
|
DOMAIN as SENSOR_DOMAIN,
|
||||||
)
|
)
|
||||||
from homeassistant.helpers import entity_registry
|
from homeassistant.helpers import entity_registry
|
||||||
from homeassistant.helpers.entity import Entity
|
|
||||||
import homeassistant.helpers.config_validation as cv
|
|
||||||
|
|
||||||
from . import RouterData
|
from . import HuaweiLteBaseEntity
|
||||||
from .const import (
|
from .const import (
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
KEY_DEVICE_INFORMATION,
|
KEY_DEVICE_INFORMATION,
|
||||||
@ -27,34 +24,27 @@ from .const import (
|
|||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
DEFAULT_NAME_TEMPLATE = "Huawei {} {}"
|
|
||||||
DEFAULT_DEVICE_NAME = "LTE"
|
|
||||||
|
|
||||||
DEFAULT_SENSORS = [
|
|
||||||
f"{KEY_DEVICE_INFORMATION}.WanIPAddress",
|
|
||||||
f"{KEY_DEVICE_SIGNAL}.rsrq",
|
|
||||||
f"{KEY_DEVICE_SIGNAL}.rsrp",
|
|
||||||
f"{KEY_DEVICE_SIGNAL}.rssi",
|
|
||||||
f"{KEY_DEVICE_SIGNAL}.sinr",
|
|
||||||
]
|
|
||||||
|
|
||||||
SENSOR_META = {
|
SENSOR_META = {
|
||||||
f"{KEY_DEVICE_INFORMATION}.SoftwareVersion": dict(name="Software version"),
|
KEY_DEVICE_INFORMATION: dict(
|
||||||
f"{KEY_DEVICE_INFORMATION}.WanIPAddress": dict(
|
include=re.compile(r"^WanIP.*Address$", re.IGNORECASE)
|
||||||
name="WAN IP address", icon="mdi:ip"
|
|
||||||
),
|
),
|
||||||
f"{KEY_DEVICE_INFORMATION}.WanIPv6Address": dict(
|
(KEY_DEVICE_INFORMATION, "SoftwareVersion"): dict(name="Software version"),
|
||||||
|
(KEY_DEVICE_INFORMATION, "WanIPAddress"): dict(
|
||||||
|
name="WAN IP address", icon="mdi:ip", enabled_default=True
|
||||||
|
),
|
||||||
|
(KEY_DEVICE_INFORMATION, "WanIPv6Address"): dict(
|
||||||
name="WAN IPv6 address", icon="mdi:ip"
|
name="WAN IPv6 address", icon="mdi:ip"
|
||||||
),
|
),
|
||||||
f"{KEY_DEVICE_SIGNAL}.band": dict(name="Band"),
|
(KEY_DEVICE_SIGNAL, "band"): dict(name="Band"),
|
||||||
f"{KEY_DEVICE_SIGNAL}.cell_id": dict(name="Cell ID"),
|
(KEY_DEVICE_SIGNAL, "cell_id"): dict(name="Cell ID"),
|
||||||
f"{KEY_DEVICE_SIGNAL}.lac": dict(name="LAC"),
|
(KEY_DEVICE_SIGNAL, "lac"): dict(name="LAC"),
|
||||||
f"{KEY_DEVICE_SIGNAL}.mode": dict(
|
(KEY_DEVICE_SIGNAL, "mode"): dict(
|
||||||
name="Mode",
|
name="Mode",
|
||||||
formatter=lambda x: ({"0": "2G", "2": "3G", "7": "4G"}.get(x, "Unknown"), None),
|
formatter=lambda x: ({"0": "2G", "2": "3G", "7": "4G"}.get(x, "Unknown"), None),
|
||||||
),
|
),
|
||||||
f"{KEY_DEVICE_SIGNAL}.pci": dict(name="PCI"),
|
(KEY_DEVICE_SIGNAL, "pci"): dict(name="PCI"),
|
||||||
f"{KEY_DEVICE_SIGNAL}.rsrq": dict(
|
(KEY_DEVICE_SIGNAL, "rsrq"): dict(
|
||||||
name="RSRQ",
|
name="RSRQ",
|
||||||
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
||||||
# http://www.lte-anbieter.info/technik/rsrq.php
|
# http://www.lte-anbieter.info/technik/rsrq.php
|
||||||
@ -65,8 +55,9 @@ SENSOR_META = {
|
|||||||
or x < -5
|
or x < -5
|
||||||
and "mdi:signal-cellular-2"
|
and "mdi:signal-cellular-2"
|
||||||
or "mdi:signal-cellular-3",
|
or "mdi:signal-cellular-3",
|
||||||
|
enabled_default=True,
|
||||||
),
|
),
|
||||||
f"{KEY_DEVICE_SIGNAL}.rsrp": dict(
|
(KEY_DEVICE_SIGNAL, "rsrp"): dict(
|
||||||
name="RSRP",
|
name="RSRP",
|
||||||
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
||||||
# http://www.lte-anbieter.info/technik/rsrp.php
|
# http://www.lte-anbieter.info/technik/rsrp.php
|
||||||
@ -77,8 +68,9 @@ SENSOR_META = {
|
|||||||
or x < -80
|
or x < -80
|
||||||
and "mdi:signal-cellular-2"
|
and "mdi:signal-cellular-2"
|
||||||
or "mdi:signal-cellular-3",
|
or "mdi:signal-cellular-3",
|
||||||
|
enabled_default=True,
|
||||||
),
|
),
|
||||||
f"{KEY_DEVICE_SIGNAL}.rssi": dict(
|
(KEY_DEVICE_SIGNAL, "rssi"): dict(
|
||||||
name="RSSI",
|
name="RSSI",
|
||||||
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
||||||
# https://eyesaas.com/wi-fi-signal-strength/
|
# https://eyesaas.com/wi-fi-signal-strength/
|
||||||
@ -89,8 +81,9 @@ SENSOR_META = {
|
|||||||
or x < -60
|
or x < -60
|
||||||
and "mdi:signal-cellular-2"
|
and "mdi:signal-cellular-2"
|
||||||
or "mdi:signal-cellular-3",
|
or "mdi:signal-cellular-3",
|
||||||
|
enabled_default=True,
|
||||||
),
|
),
|
||||||
f"{KEY_DEVICE_SIGNAL}.sinr": dict(
|
(KEY_DEVICE_SIGNAL, "sinr"): dict(
|
||||||
name="SINR",
|
name="SINR",
|
||||||
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
|
||||||
# http://www.lte-anbieter.info/technik/sinr.php
|
# http://www.lte-anbieter.info/technik/sinr.php
|
||||||
@ -101,28 +94,38 @@ SENSOR_META = {
|
|||||||
or x < 10
|
or x < 10
|
||||||
and "mdi:signal-cellular-2"
|
and "mdi:signal-cellular-2"
|
||||||
or "mdi:signal-cellular-3",
|
or "mdi:signal-cellular-3",
|
||||||
|
enabled_default=True,
|
||||||
|
),
|
||||||
|
KEY_MONITORING_TRAFFIC_STATISTICS: dict(
|
||||||
|
exclude=re.compile(r"^showtraffic$", re.IGNORECASE)
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
|
||||||
{
|
|
||||||
vol.Optional(CONF_URL): cv.url,
|
|
||||||
vol.Optional(
|
|
||||||
CONF_MONITORED_CONDITIONS, default=DEFAULT_SENSORS
|
|
||||||
): cv.ensure_list,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||||
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
|
"""Set up from config entry."""
|
||||||
"""Set up Huawei LTE sensor devices."""
|
router = hass.data[DOMAIN].routers[config_entry.data[CONF_URL]]
|
||||||
data = hass.data[DOMAIN].get_data(config)
|
|
||||||
sensors = []
|
sensors = []
|
||||||
for path in config.get(CONF_MONITORED_CONDITIONS):
|
for key in (
|
||||||
if path == "traffic_statistics": # backwards compatibility
|
KEY_DEVICE_INFORMATION,
|
||||||
path = KEY_MONITORING_TRAFFIC_STATISTICS
|
KEY_DEVICE_SIGNAL,
|
||||||
data.subscribe(path)
|
KEY_MONITORING_TRAFFIC_STATISTICS,
|
||||||
sensors.append(HuaweiLteSensor(data, path, SENSOR_META.get(path, {})))
|
):
|
||||||
|
items = router.data.get(key)
|
||||||
|
if not items:
|
||||||
|
continue
|
||||||
|
key_meta = SENSOR_META.get(key)
|
||||||
|
if key_meta:
|
||||||
|
include = key_meta.get("include")
|
||||||
|
if include:
|
||||||
|
items = filter(include.search, items)
|
||||||
|
exclude = key_meta.get("exclude")
|
||||||
|
if exclude:
|
||||||
|
items = [x for x in items if not exclude.search(x)]
|
||||||
|
for item in items:
|
||||||
|
sensors.append(
|
||||||
|
HuaweiLteSensor(router, key, item, SENSOR_META.get((key, item), {}))
|
||||||
|
)
|
||||||
|
|
||||||
# Pre-0.97 unique id migration. Old ones used the device serial number
|
# Pre-0.97 unique id migration. Old ones used the device serial number
|
||||||
# (see comments in HuaweiLteData._setup_lte for more info), as well as
|
# (see comments in HuaweiLteData._setup_lte for more info), as well as
|
||||||
@ -134,7 +137,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
|
|||||||
if ent.platform != DOMAIN:
|
if ent.platform != DOMAIN:
|
||||||
continue
|
continue
|
||||||
for sensor in sensors:
|
for sensor in sensors:
|
||||||
oldsuf = ".".join(sensor.path)
|
oldsuf = ".".join(f"{sensor.key}.{sensor.item}")
|
||||||
if ent.unique_id.endswith(f"_{oldsuf}"):
|
if ent.unique_id.endswith(f"_{oldsuf}"):
|
||||||
entreg.async_update_entity(entid, new_unique_id=sensor.unique_id)
|
entreg.async_update_entity(entid, new_unique_id=sensor.unique_id)
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
@ -162,30 +165,33 @@ def format_default(value):
|
|||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class HuaweiLteSensor(Entity):
|
class HuaweiLteSensor(HuaweiLteBaseEntity):
|
||||||
"""Huawei LTE sensor entity."""
|
"""Huawei LTE sensor entity."""
|
||||||
|
|
||||||
data = attr.ib(type=RouterData)
|
key: str = attr.ib()
|
||||||
path = attr.ib(type=str)
|
item: str = attr.ib()
|
||||||
meta = attr.ib(type=dict)
|
meta: dict = attr.ib()
|
||||||
|
|
||||||
_state = attr.ib(init=False, default=STATE_UNKNOWN)
|
_state = attr.ib(init=False, default=STATE_UNKNOWN)
|
||||||
_unit = attr.ib(init=False, type=str)
|
_unit: str = attr.ib(init=False)
|
||||||
|
|
||||||
|
async def async_added_to_hass(self):
|
||||||
|
"""Subscribe to needed data on add."""
|
||||||
|
await super().async_added_to_hass()
|
||||||
|
self.router.subscriptions[self.key].add(f"{SENSOR_DOMAIN}/{self.item}")
|
||||||
|
|
||||||
|
async def async_will_remove_from_hass(self):
|
||||||
|
"""Unsubscribe from needed data on remove."""
|
||||||
|
await super().async_will_remove_from_hass()
|
||||||
|
self.router.subscriptions[self.key].remove(f"{SENSOR_DOMAIN}/{self.item}")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def unique_id(self) -> str:
|
def _entity_name(self) -> str:
|
||||||
"""Return unique ID for sensor."""
|
return self.meta.get("name", self.item)
|
||||||
return f"{self.data.mac}-{self.path}"
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self) -> str:
|
def _device_unique_id(self) -> str:
|
||||||
"""Return sensor name."""
|
return f"{self.key}.{self.item}"
|
||||||
try:
|
|
||||||
dname = self.data[f"{KEY_DEVICE_INFORMATION}.DeviceName"]
|
|
||||||
except KeyError:
|
|
||||||
dname = None
|
|
||||||
vname = self.meta.get("name", self.path)
|
|
||||||
return DEFAULT_NAME_TEMPLATE.format(dname or DEFAULT_DEVICE_NAME, vname)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self):
|
def state(self):
|
||||||
@ -210,18 +216,31 @@ class HuaweiLteSensor(Entity):
|
|||||||
return icon(self.state)
|
return icon(self.state)
|
||||||
return icon
|
return icon
|
||||||
|
|
||||||
def update(self):
|
@property
|
||||||
"""Update state."""
|
def entity_registry_enabled_default(self) -> bool:
|
||||||
self.data.update()
|
"""Return if the entity should be enabled when first added to the entity registry."""
|
||||||
|
return bool(self.meta.get("enabled_default"))
|
||||||
|
|
||||||
|
async def async_update(self):
|
||||||
|
"""Update state."""
|
||||||
try:
|
try:
|
||||||
value = self.data[self.path]
|
value = self.router.data[self.key][self.item]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
_LOGGER.debug("%s not in data", self.path)
|
_LOGGER.debug("%s[%s] not in data", self.key, self.item)
|
||||||
value = None
|
self._available = False
|
||||||
|
return
|
||||||
|
self._available = True
|
||||||
|
|
||||||
formatter = self.meta.get("formatter")
|
formatter = self.meta.get("formatter")
|
||||||
if not callable(formatter):
|
if not callable(formatter):
|
||||||
formatter = format_default
|
formatter = format_default
|
||||||
|
|
||||||
self._state, self._unit = formatter(value)
|
self._state, self._unit = formatter(value)
|
||||||
|
|
||||||
|
|
||||||
|
async def async_setup_platform(*args, **kwargs):
|
||||||
|
"""Old no longer used way to set up Huawei LTE sensors."""
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Loading and configuring as a platform is no longer supported or "
|
||||||
|
"required, convert to enabling/disabling available entities"
|
||||||
|
)
|
||||||
|
39
homeassistant/components/huawei_lte/strings.json
Normal file
39
homeassistant/components/huawei_lte/strings.json
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
{
|
||||||
|
"config": {
|
||||||
|
"abort": {
|
||||||
|
"already_configured": "This device is already configured"
|
||||||
|
},
|
||||||
|
"error": {
|
||||||
|
"connection_failed": "Connection failed",
|
||||||
|
"incorrect_password": "Incorrect password",
|
||||||
|
"incorrect_username": "Incorrect username",
|
||||||
|
"incorrect_username_or_password": "Incorrect username or password",
|
||||||
|
"invalid_url": "Invalid URL",
|
||||||
|
"login_attempts_exceeded": "Maximum login attempts exceeded, please try again later",
|
||||||
|
"response_error": "Unknown error from device",
|
||||||
|
"unknown_connection_error": "Unknown error connecting to device"
|
||||||
|
},
|
||||||
|
"step": {
|
||||||
|
"user": {
|
||||||
|
"data": {
|
||||||
|
"password": "Password",
|
||||||
|
"url": "URL",
|
||||||
|
"username": "User name"
|
||||||
|
},
|
||||||
|
"description": "Enter device access details. Specifying username and password is optional, but enables support for more integration features. On the other hand, use of an authorized connection may cause problems accessing the device web interface from outside Home Assistant while the integration is active, and the other way around.",
|
||||||
|
"title": "Configure Huawei LTE"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"title": "Huawei LTE"
|
||||||
|
},
|
||||||
|
"options": {
|
||||||
|
"step": {
|
||||||
|
"init": {
|
||||||
|
"data": {
|
||||||
|
"recipient": "SMS notification recipients",
|
||||||
|
"track_new_devices": "Track new devices"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -29,6 +29,7 @@ FLOWS = [
|
|||||||
"heos",
|
"heos",
|
||||||
"homekit_controller",
|
"homekit_controller",
|
||||||
"homematicip_cloud",
|
"homematicip_cloud",
|
||||||
|
"huawei_lte",
|
||||||
"hue",
|
"hue",
|
||||||
"iaqualink",
|
"iaqualink",
|
||||||
"ifttt",
|
"ifttt",
|
||||||
|
@ -665,7 +665,7 @@ horimote==0.4.1
|
|||||||
httplib2==0.10.3
|
httplib2==0.10.3
|
||||||
|
|
||||||
# homeassistant.components.huawei_lte
|
# homeassistant.components.huawei_lte
|
||||||
huawei-lte-api==1.3.0
|
huawei-lte-api==1.4.3
|
||||||
|
|
||||||
# homeassistant.components.hydrawise
|
# homeassistant.components.hydrawise
|
||||||
hydrawiser==0.1.1
|
hydrawiser==0.1.1
|
||||||
@ -1837,6 +1837,7 @@ steamodd==4.21
|
|||||||
# homeassistant.components.streamlabswater
|
# homeassistant.components.streamlabswater
|
||||||
streamlabswater==1.0.1
|
streamlabswater==1.0.1
|
||||||
|
|
||||||
|
# homeassistant.components.huawei_lte
|
||||||
# homeassistant.components.solaredge
|
# homeassistant.components.solaredge
|
||||||
# homeassistant.components.thermoworks_smoke
|
# homeassistant.components.thermoworks_smoke
|
||||||
# homeassistant.components.traccar
|
# homeassistant.components.traccar
|
||||||
@ -1923,6 +1924,9 @@ unifiled==0.10
|
|||||||
# homeassistant.components.upcloud
|
# homeassistant.components.upcloud
|
||||||
upcloud-api==0.4.3
|
upcloud-api==0.4.3
|
||||||
|
|
||||||
|
# homeassistant.components.huawei_lte
|
||||||
|
url-normalize==1.4.1
|
||||||
|
|
||||||
# homeassistant.components.uscis
|
# homeassistant.components.uscis
|
||||||
uscisstatus==0.1.1
|
uscisstatus==0.1.1
|
||||||
|
|
||||||
|
@ -258,7 +258,7 @@ homematicip==0.10.13
|
|||||||
httplib2==0.10.3
|
httplib2==0.10.3
|
||||||
|
|
||||||
# homeassistant.components.huawei_lte
|
# homeassistant.components.huawei_lte
|
||||||
huawei-lte-api==1.3.0
|
huawei-lte-api==1.4.3
|
||||||
|
|
||||||
# homeassistant.components.iaqualink
|
# homeassistant.components.iaqualink
|
||||||
iaqualink==0.2.9
|
iaqualink==0.2.9
|
||||||
@ -581,6 +581,7 @@ sqlalchemy==1.3.10
|
|||||||
# homeassistant.components.statsd
|
# homeassistant.components.statsd
|
||||||
statsd==3.2.1
|
statsd==3.2.1
|
||||||
|
|
||||||
|
# homeassistant.components.huawei_lte
|
||||||
# homeassistant.components.solaredge
|
# homeassistant.components.solaredge
|
||||||
# homeassistant.components.thermoworks_smoke
|
# homeassistant.components.thermoworks_smoke
|
||||||
# homeassistant.components.traccar
|
# homeassistant.components.traccar
|
||||||
@ -604,6 +605,9 @@ twentemilieu==0.1.0
|
|||||||
# homeassistant.components.twilio
|
# homeassistant.components.twilio
|
||||||
twilio==6.32.0
|
twilio==6.32.0
|
||||||
|
|
||||||
|
# homeassistant.components.huawei_lte
|
||||||
|
url-normalize==1.4.1
|
||||||
|
|
||||||
# homeassistant.components.uvc
|
# homeassistant.components.uvc
|
||||||
uvcclient==0.11.0
|
uvcclient==0.11.0
|
||||||
|
|
||||||
|
140
tests/components/huawei_lte/test_config_flow.py
Normal file
140
tests/components/huawei_lte/test_config_flow.py
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
"""Tests for the Huawei LTE config flow."""
|
||||||
|
|
||||||
|
from huawei_lte_api.enums.client import ResponseCodeEnum
|
||||||
|
from huawei_lte_api.enums.user import LoginErrorEnum, LoginStateEnum, PasswordTypeEnum
|
||||||
|
from requests_mock import ANY
|
||||||
|
from requests.exceptions import ConnectionError
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from homeassistant import data_entry_flow
|
||||||
|
from homeassistant.const import CONF_USERNAME, CONF_PASSWORD, CONF_URL
|
||||||
|
from homeassistant.components.huawei_lte.const import DOMAIN
|
||||||
|
from homeassistant.components.huawei_lte.config_flow import ConfigFlowHandler
|
||||||
|
from tests.common import MockConfigEntry
|
||||||
|
|
||||||
|
|
||||||
|
FIXTURE_USER_INPUT = {
|
||||||
|
CONF_URL: "http://192.168.1.1/",
|
||||||
|
CONF_USERNAME: "admin",
|
||||||
|
CONF_PASSWORD: "secret",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_show_set_form(hass):
|
||||||
|
"""Test that the setup form is served."""
|
||||||
|
flow = ConfigFlowHandler()
|
||||||
|
flow.hass = hass
|
||||||
|
result = await flow.async_step_user(user_input=None)
|
||||||
|
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_urlize_plain_host(hass, requests_mock):
|
||||||
|
"""Test that plain host or IP gets converted to a URL."""
|
||||||
|
requests_mock.request(ANY, ANY, exc=ConnectionError())
|
||||||
|
flow = ConfigFlowHandler()
|
||||||
|
flow.hass = hass
|
||||||
|
host = "192.168.100.1"
|
||||||
|
user_input = {**FIXTURE_USER_INPUT, CONF_URL: host}
|
||||||
|
result = await flow.async_step_user(user_input=user_input)
|
||||||
|
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
assert user_input[CONF_URL] == f"http://{host}/"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_already_configured(hass):
|
||||||
|
"""Test we reject already configured devices."""
|
||||||
|
MockConfigEntry(
|
||||||
|
domain=DOMAIN, data=FIXTURE_USER_INPUT, title="Already configured"
|
||||||
|
).add_to_hass(hass)
|
||||||
|
|
||||||
|
flow = ConfigFlowHandler()
|
||||||
|
flow.hass = hass
|
||||||
|
# Tweak URL a bit to check that doesn't fail duplicate detection
|
||||||
|
result = await flow.async_step_user(
|
||||||
|
user_input={
|
||||||
|
**FIXTURE_USER_INPUT,
|
||||||
|
CONF_URL: FIXTURE_USER_INPUT[CONF_URL].replace("http", "HTTP"),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
|
assert result["reason"] == "already_configured"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_connection_error(hass, requests_mock):
|
||||||
|
"""Test we show user form on connection error."""
|
||||||
|
|
||||||
|
requests_mock.request(ANY, ANY, exc=ConnectionError())
|
||||||
|
flow = ConfigFlowHandler()
|
||||||
|
flow.hass = hass
|
||||||
|
result = await flow.async_step_user(user_input=FIXTURE_USER_INPUT)
|
||||||
|
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
assert result["errors"] == {CONF_URL: "unknown_connection_error"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def login_requests_mock(requests_mock):
|
||||||
|
"""Set up a requests_mock with base mocks for login tests."""
|
||||||
|
requests_mock.request(
|
||||||
|
ANY, FIXTURE_USER_INPUT[CONF_URL], text='<meta name="csrf_token" content="x"/>'
|
||||||
|
)
|
||||||
|
requests_mock.request(
|
||||||
|
ANY,
|
||||||
|
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/state-login",
|
||||||
|
text=(
|
||||||
|
f"<response><State>{LoginStateEnum.LOGGED_OUT}</State>"
|
||||||
|
f"<password_type>{PasswordTypeEnum.SHA256}</password_type></response>"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return requests_mock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("code", "errors"),
|
||||||
|
(
|
||||||
|
(LoginErrorEnum.USERNAME_WRONG, {CONF_USERNAME: "incorrect_username"}),
|
||||||
|
(LoginErrorEnum.PASSWORD_WRONG, {CONF_PASSWORD: "incorrect_password"}),
|
||||||
|
(
|
||||||
|
LoginErrorEnum.USERNAME_PWD_WRONG,
|
||||||
|
{CONF_USERNAME: "incorrect_username_or_password"},
|
||||||
|
),
|
||||||
|
(LoginErrorEnum.USERNAME_PWD_ORERRUN, {"base": "login_attempts_exceeded"}),
|
||||||
|
(ResponseCodeEnum.ERROR_SYSTEM_UNKNOWN, {"base": "response_error"}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async def test_login_error(hass, login_requests_mock, code, errors):
|
||||||
|
"""Test we show user form with appropriate error on response failure."""
|
||||||
|
login_requests_mock.request(
|
||||||
|
ANY,
|
||||||
|
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/login",
|
||||||
|
text=f"<error><code>{code}</code><message/></error>",
|
||||||
|
)
|
||||||
|
flow = ConfigFlowHandler()
|
||||||
|
flow.hass = hass
|
||||||
|
result = await flow.async_step_user(user_input=FIXTURE_USER_INPUT)
|
||||||
|
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
assert result["errors"] == errors
|
||||||
|
|
||||||
|
|
||||||
|
async def test_success(hass, login_requests_mock):
|
||||||
|
"""Test successful flow provides entry creation data."""
|
||||||
|
login_requests_mock.request(
|
||||||
|
ANY,
|
||||||
|
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/login",
|
||||||
|
text=f"<response>OK</response>",
|
||||||
|
)
|
||||||
|
flow = ConfigFlowHandler()
|
||||||
|
flow.hass = hass
|
||||||
|
result = await flow.async_step_user(user_input=FIXTURE_USER_INPUT)
|
||||||
|
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||||
|
assert result["data"][CONF_URL] == FIXTURE_USER_INPUT[CONF_URL]
|
||||||
|
assert result["data"][CONF_USERNAME] == FIXTURE_USER_INPUT[CONF_USERNAME]
|
||||||
|
assert result["data"][CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD]
|
20
tests/components/huawei_lte/test_device_tracker.py
Normal file
20
tests/components/huawei_lte/test_device_tracker.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
"""Huawei LTE device tracker tests."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from homeassistant.components.huawei_lte import device_tracker
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("value", "expected"),
|
||||||
|
(
|
||||||
|
("HTTP", "http"),
|
||||||
|
("ID", "id"),
|
||||||
|
("IPAddress", "ip_address"),
|
||||||
|
("HTTPResponse", "http_response"),
|
||||||
|
("foo_bar", "foo_bar"),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
def test_better_snakecase(value, expected):
|
||||||
|
"""Test that better snakecase works better."""
|
||||||
|
assert device_tracker._better_snakecase(value) == expected
|
@ -1,48 +0,0 @@
|
|||||||
"""Huawei LTE component tests."""
|
|
||||||
from unittest.mock import Mock
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from homeassistant.components import huawei_lte
|
|
||||||
from homeassistant.components.huawei_lte.const import KEY_DEVICE_INFORMATION
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def routerdata():
|
|
||||||
"""Set up a router data for testing."""
|
|
||||||
rd = huawei_lte.RouterData(Mock(), "de:ad:be:ef:00:00")
|
|
||||||
rd.device_information = {"SoftwareVersion": "1.0", "nested": {"foo": "bar"}}
|
|
||||||
return rd
|
|
||||||
|
|
||||||
|
|
||||||
async def test_routerdata_get_nonexistent_root(routerdata):
|
|
||||||
"""Test that accessing a nonexistent root element raises KeyError."""
|
|
||||||
with pytest.raises(KeyError): # NOT AttributeError
|
|
||||||
routerdata["nonexistent_root.foo"]
|
|
||||||
|
|
||||||
|
|
||||||
async def test_routerdata_get_nonexistent_leaf(routerdata):
|
|
||||||
"""Test that accessing a nonexistent leaf element raises KeyError."""
|
|
||||||
with pytest.raises(KeyError):
|
|
||||||
routerdata[f"{KEY_DEVICE_INFORMATION}.foo"]
|
|
||||||
|
|
||||||
|
|
||||||
async def test_routerdata_get_nonexistent_leaf_path(routerdata):
|
|
||||||
"""Test that accessing a nonexistent long path raises KeyError."""
|
|
||||||
with pytest.raises(KeyError):
|
|
||||||
routerdata[f"{KEY_DEVICE_INFORMATION}.long.path.foo"]
|
|
||||||
|
|
||||||
|
|
||||||
async def test_routerdata_get_simple(routerdata):
|
|
||||||
"""Test that accessing a short, simple path works."""
|
|
||||||
assert routerdata[f"{KEY_DEVICE_INFORMATION}.SoftwareVersion"] == "1.0"
|
|
||||||
|
|
||||||
|
|
||||||
async def test_routerdata_get_longer(routerdata):
|
|
||||||
"""Test that accessing a longer path works."""
|
|
||||||
assert routerdata[f"{KEY_DEVICE_INFORMATION}.nested.foo"] == "bar"
|
|
||||||
|
|
||||||
|
|
||||||
async def test_routerdata_get_dict(routerdata):
|
|
||||||
"""Test that returning an intermediate dict works."""
|
|
||||||
assert routerdata[f"{KEY_DEVICE_INFORMATION}.nested"] == {"foo": "bar"}
|
|
Loading…
x
Reference in New Issue
Block a user