mirror of
https://github.com/home-assistant/core.git
synced 2025-11-14 21:40:16 +00:00
Compare commits
11 Commits
claude/tri
...
otbr_handl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
84fb87935d | ||
|
|
876ff17230 | ||
|
|
8eb0e77a7a | ||
|
|
69e5bda0bd | ||
|
|
1cf49096ce | ||
|
|
712a5d6e21 | ||
|
|
303c7fc934 | ||
|
|
56b031e858 | ||
|
|
7402e91b23 | ||
|
|
cf76305f81 | ||
|
|
69e9f83f05 |
@@ -2,9 +2,10 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from contextlib import suppress
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, cast
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import aiohttp
|
||||
import python_otbr_api
|
||||
@@ -13,9 +14,16 @@ from python_otbr_api.tlv_parser import MeshcopTLVType
|
||||
import voluptuous as vol
|
||||
import yarl
|
||||
|
||||
from homeassistant.components.hassio import AddonError, AddonManager
|
||||
from homeassistant.components.hassio import (
|
||||
AddonError,
|
||||
AddonInfo,
|
||||
AddonManager,
|
||||
AddonState,
|
||||
)
|
||||
from homeassistant.components.homeassistant_hardware.util import get_otbr_addon_manager
|
||||
from homeassistant.components.homeassistant_yellow import hardware as yellow_hardware
|
||||
from homeassistant.components.thread import async_get_preferred_dataset
|
||||
from homeassistant.components.usb import async_get_usb_ports
|
||||
from homeassistant.config_entries import (
|
||||
SOURCE_HASSIO,
|
||||
ConfigEntryState,
|
||||
@@ -24,8 +32,10 @@ from homeassistant.config_entries import (
|
||||
)
|
||||
from homeassistant.const import CONF_URL
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.data_entry_flow import AbortFlow
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.hassio import is_hassio
|
||||
from homeassistant.helpers.service_info.hassio import HassioServiceInfo
|
||||
|
||||
from .const import DEFAULT_CHANNEL, DOMAIN
|
||||
@@ -41,6 +51,13 @@ if TYPE_CHECKING:
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
CONF_DEVICE = "device"
|
||||
|
||||
# Timeout and retry constants for OTBR connection
|
||||
OTBR_CONNECTION_TIMEOUT = 10.0
|
||||
OTBR_RETRY_BACKOFF = 0.5
|
||||
|
||||
|
||||
class AlreadyConfigured(HomeAssistantError):
|
||||
"""Raised when the router is already configured."""
|
||||
|
||||
@@ -83,6 +100,17 @@ class OTBRConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
|
||||
VERSION = 1
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Instantiate otbr config flow."""
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self._device: str | None = None
|
||||
self._url: str | None = None # used only in recommended setup
|
||||
|
||||
self._addon_install_task: asyncio.Task | None = None
|
||||
self._addon_start_task: asyncio.Task | None = None
|
||||
self._addon_connect_task: asyncio.Task[bytes] | None = None
|
||||
|
||||
async def _set_dataset(self, api: python_otbr_api.OTBR, otbr_url: str) -> None:
|
||||
"""Connect to the OTBR and create or apply a dataset if it doesn't have one."""
|
||||
if await api.get_active_dataset_tlvs() is None:
|
||||
@@ -143,7 +171,7 @@ class OTBRConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
Returns the router's border agent id.
|
||||
"""
|
||||
api = python_otbr_api.OTBR(otbr_url, async_get_clientsession(self.hass), 10)
|
||||
border_agent_id = await api.get_border_agent_id()
|
||||
border_agent_id: bytes = await api.get_border_agent_id()
|
||||
_LOGGER.debug("border agent id for url %s: %s", otbr_url, border_agent_id.hex())
|
||||
|
||||
if await self._is_border_agent_id_configured(border_agent_id):
|
||||
@@ -153,10 +181,257 @@ class OTBRConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
|
||||
return border_agent_id
|
||||
|
||||
async def _connect_with_retry(self, url: str) -> bytes:
|
||||
"""Connect to OTBR with retry logic for up to 10 seconds."""
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
elapsed_time = asyncio.get_event_loop().time() - start_time
|
||||
|
||||
if elapsed_time >= OTBR_CONNECTION_TIMEOUT:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to connect to OTBR after {OTBR_CONNECTION_TIMEOUT} seconds"
|
||||
)
|
||||
|
||||
try:
|
||||
return await self._connect_and_configure_router(url)
|
||||
except aiohttp.ClientConnectionError as exc:
|
||||
_LOGGER.debug(
|
||||
"ClientConnectorError after %.2f seconds, retrying in %.1fs: %s",
|
||||
elapsed_time,
|
||||
OTBR_RETRY_BACKOFF,
|
||||
exc,
|
||||
)
|
||||
await asyncio.sleep(OTBR_RETRY_BACKOFF)
|
||||
|
||||
async def _async_get_addon_info(self, addon_manager: AddonManager) -> AddonInfo:
|
||||
"""Return add-on info."""
|
||||
try:
|
||||
addon_info = await addon_manager.async_get_addon_info()
|
||||
except AddonError as err:
|
||||
_LOGGER.error(err)
|
||||
raise AbortFlow(
|
||||
"addon_info_failed",
|
||||
description_placeholders={
|
||||
"addon_name": addon_manager.addon_name,
|
||||
},
|
||||
) from err
|
||||
|
||||
return addon_info
|
||||
|
||||
async def _async_configure_start_addon(self) -> None:
|
||||
"""Configure the start addon."""
|
||||
otbr_manager = get_otbr_addon_manager(self.hass)
|
||||
addon_info = await self._async_get_addon_info(otbr_manager)
|
||||
|
||||
assert self._device is not None
|
||||
new_addon_config = {
|
||||
**addon_info.options,
|
||||
"device": self._device,
|
||||
"autoflash_firmware": False,
|
||||
}
|
||||
|
||||
_LOGGER.debug("Reconfiguring OTBR addon with %s", new_addon_config)
|
||||
|
||||
try:
|
||||
await otbr_manager.async_set_addon_options(new_addon_config)
|
||||
except AddonError as err:
|
||||
_LOGGER.error(err)
|
||||
raise AbortFlow(
|
||||
"addon_set_config_failed",
|
||||
description_placeholders={
|
||||
"addon_name": otbr_manager.addon_name,
|
||||
},
|
||||
) from err
|
||||
|
||||
await otbr_manager.async_start_addon_waiting()
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Set up by user."""
|
||||
if not is_hassio(self.hass):
|
||||
# skip to url step if not hassio
|
||||
return await self.async_step_url()
|
||||
|
||||
return self.async_show_menu(
|
||||
step_id="user",
|
||||
menu_options=["recommended", "url"],
|
||||
)
|
||||
|
||||
async def async_step_recommended(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Select usb device."""
|
||||
if self._device:
|
||||
# device already set, skip to addon step
|
||||
return await self.async_step_addon()
|
||||
|
||||
if user_input is not None and (
|
||||
usb_path := user_input.get(CONF_DEVICE, "").strip()
|
||||
):
|
||||
self._device = usb_path
|
||||
return await self.async_step_addon()
|
||||
|
||||
try:
|
||||
ports = await async_get_usb_ports(self.hass)
|
||||
except OSError as err:
|
||||
_LOGGER.error("Failed to get USB ports: %s", err)
|
||||
return self.async_abort(reason="usb_ports_failed")
|
||||
|
||||
data_schema = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_DEVICE): vol.In(ports),
|
||||
}
|
||||
)
|
||||
return self.async_show_form(step_id="recommended", data_schema=data_schema)
|
||||
|
||||
async def async_step_addon(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Set up the addon."""
|
||||
otbr_manager = get_otbr_addon_manager(self.hass)
|
||||
addon_info = await self._async_get_addon_info(otbr_manager)
|
||||
|
||||
if addon_info.state == AddonState.NOT_INSTALLED:
|
||||
return await self.async_step_install_addon()
|
||||
|
||||
if addon_info.state == AddonState.RUNNING:
|
||||
await otbr_manager.async_stop_addon()
|
||||
|
||||
return await self.async_step_start_addon()
|
||||
|
||||
async def async_step_install_addon(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Show progress dialog for installing the OTBR addon."""
|
||||
addon_manager = get_otbr_addon_manager(self.hass)
|
||||
addon_info = await self._async_get_addon_info(addon_manager)
|
||||
|
||||
_LOGGER.debug("OTBR addon info: %s", addon_info)
|
||||
|
||||
if not self._addon_install_task:
|
||||
self._addon_install_task = self.hass.async_create_task(
|
||||
addon_manager.async_install_addon_waiting(),
|
||||
"OTBR addon install",
|
||||
)
|
||||
|
||||
if not self._addon_install_task.done():
|
||||
return self.async_show_progress(
|
||||
step_id="install_addon",
|
||||
progress_action="install_addon",
|
||||
description_placeholders={
|
||||
"addon_name": addon_manager.addon_name,
|
||||
},
|
||||
progress_task=self._addon_install_task,
|
||||
)
|
||||
|
||||
try:
|
||||
await self._addon_install_task
|
||||
except AddonError as err:
|
||||
_LOGGER.error(err)
|
||||
return self.async_abort(
|
||||
reason="addon_install_failed",
|
||||
description_placeholders={
|
||||
"addon_name": addon_manager.addon_name,
|
||||
},
|
||||
)
|
||||
finally:
|
||||
self._addon_install_task = None
|
||||
|
||||
return self.async_show_progress_done(next_step_id="addon")
|
||||
|
||||
async def async_step_start_addon(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Configure OTBR to point to the SkyConnect and run the addon."""
|
||||
otbr_manager = get_otbr_addon_manager(self.hass)
|
||||
|
||||
if not self._addon_start_task:
|
||||
self._addon_start_task = self.hass.async_create_task(
|
||||
self._async_configure_start_addon()
|
||||
)
|
||||
|
||||
if not self._addon_start_task.done():
|
||||
return self.async_show_progress(
|
||||
step_id="start_addon",
|
||||
progress_action="start_addon",
|
||||
description_placeholders={
|
||||
"addon_name": otbr_manager.addon_name,
|
||||
},
|
||||
progress_task=self._addon_start_task,
|
||||
)
|
||||
|
||||
try:
|
||||
await self._addon_start_task
|
||||
except (AddonError, AbortFlow) as err:
|
||||
_LOGGER.error(err)
|
||||
return self.async_abort(
|
||||
reason="addon_start_failed",
|
||||
description_placeholders={
|
||||
"addon_name": otbr_manager.addon_name,
|
||||
},
|
||||
)
|
||||
finally:
|
||||
self._addon_start_task = None
|
||||
|
||||
return self.async_show_progress_done(next_step_id="connect_otbr")
|
||||
|
||||
async def async_step_connect_otbr(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Connect to OTBR with retry logic."""
|
||||
otbr_manager = get_otbr_addon_manager(self.hass)
|
||||
addon_info = await self._async_get_addon_info(otbr_manager)
|
||||
self._url = f"http://{addon_info.hostname}:8081"
|
||||
|
||||
if not self._addon_connect_task:
|
||||
self._addon_connect_task = self.hass.async_create_task(
|
||||
self._connect_with_retry(self._url)
|
||||
)
|
||||
|
||||
if not self._addon_connect_task.done():
|
||||
return self.async_show_progress(
|
||||
step_id="connect_otbr",
|
||||
progress_action="connect_otbr",
|
||||
description_placeholders={
|
||||
"addon_name": otbr_manager.addon_name,
|
||||
},
|
||||
progress_task=self._addon_connect_task,
|
||||
)
|
||||
|
||||
try:
|
||||
border_agent_id = await self._addon_connect_task
|
||||
except AlreadyConfigured:
|
||||
return self.async_abort(reason="already_configured")
|
||||
except (
|
||||
python_otbr_api.OTBRError,
|
||||
aiohttp.ClientError,
|
||||
TimeoutError,
|
||||
HomeAssistantError,
|
||||
) as exc:
|
||||
_LOGGER.warning("Failed to communicate with OTBR@%s: %s", self._url, exc)
|
||||
return self.async_abort(reason="unknown")
|
||||
finally:
|
||||
self._addon_connect_task = None
|
||||
|
||||
await self.async_set_unique_id(border_agent_id.hex())
|
||||
return self.async_show_progress_done(next_step_id="addon_done")
|
||||
|
||||
async def async_step_addon_done(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Add-on done."""
|
||||
config_entry_data = {"url": self._url, "device": self._device}
|
||||
return self.async_create_entry(
|
||||
title="Open Thread Border Router",
|
||||
data=config_entry_data,
|
||||
)
|
||||
|
||||
async def async_step_url(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Custom step to set up by URL."""
|
||||
errors = {}
|
||||
|
||||
if user_input is not None:
|
||||
@@ -181,7 +456,7 @@ class OTBRConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
|
||||
data_schema = vol.Schema({CONF_URL: str})
|
||||
return self.async_show_form(
|
||||
step_id="user", data_schema=data_schema, errors=errors
|
||||
step_id="url", data_schema=data_schema, errors=errors
|
||||
)
|
||||
|
||||
async def async_step_hassio(
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
"after_dependencies": ["hassio", "homeassistant_yellow", "zha"],
|
||||
"codeowners": ["@home-assistant/core"],
|
||||
"config_flow": true,
|
||||
"dependencies": ["homeassistant_hardware", "thread"],
|
||||
"dependencies": ["homeassistant_hardware", "thread", "usb"],
|
||||
"documentation": "https://www.home-assistant.io/integrations/otbr",
|
||||
"integration_type": "service",
|
||||
"iot_class": "local_polling",
|
||||
"requirements": ["python-otbr-api==2.7.0"]
|
||||
"requirements": ["pyserial==3.5", "python-otbr-api==2.7.0"]
|
||||
}
|
||||
|
||||
@@ -2,20 +2,40 @@
|
||||
"config": {
|
||||
"step": {
|
||||
"user": {
|
||||
"menu_options": {
|
||||
"recommended": "Recommended installation",
|
||||
"url": "Custom"
|
||||
},
|
||||
"menu_option_descriptions": {
|
||||
"recommended": "Set up the OTBR add-on",
|
||||
"url": "Manually set the OTBR URL"
|
||||
},
|
||||
"description": "Please choose a setup process"
|
||||
},
|
||||
"url": {
|
||||
"data": {
|
||||
"url": "[%key:common::config_flow::data::url%]"
|
||||
},
|
||||
"description": "Provide URL for the OpenThread Border Router's REST API"
|
||||
"description": "Provide the URL for the OpenThread Border Router's REST API"
|
||||
}
|
||||
},
|
||||
"progress": {
|
||||
"start_addon": "Starting OTBR add-on",
|
||||
"install_addon": "Installing OTBR add-on",
|
||||
"connect_otbr": "Connecting to OTBR add-on"
|
||||
},
|
||||
"error": {
|
||||
"already_configured": "The Thread border router is already configured",
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "The Thread border router is already configured",
|
||||
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
|
||||
"unknown": "[%key:common::config_flow::error::unknown%]"
|
||||
"addon_info_failed": "[%key:component::homeassistant_hardware::silabs_multiprotocol_hardware::options::abort::addon_info_failed%]",
|
||||
"addon_install_failed": "[%key:component::homeassistant_hardware::silabs_multiprotocol_hardware::options::abort::addon_install_failed%]",
|
||||
"addon_start_failed": "[%key:component::homeassistant_hardware::silabs_multiprotocol_hardware::options::abort::addon_start_failed%]",
|
||||
"addon_set_config_failed": "[%key:component::homeassistant_hardware::silabs_multiprotocol_hardware::options::abort::addon_set_config_failed%]",
|
||||
"unknown": "[%key:common::config_flow::error::unknown%]",
|
||||
"usb_ports_failed": "Failed to get USB devices."
|
||||
}
|
||||
},
|
||||
"issues": {
|
||||
|
||||
@@ -14,6 +14,7 @@ import sys
|
||||
from typing import Any, overload
|
||||
|
||||
from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError
|
||||
from serial.tools import list_ports
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
@@ -41,10 +42,7 @@ from homeassistant.loader import USBMatcher, async_get_usb
|
||||
|
||||
from .const import DOMAIN
|
||||
from .models import USBDevice
|
||||
from .utils import (
|
||||
scan_serial_ports,
|
||||
usb_device_from_port, # noqa: F401
|
||||
)
|
||||
from .utils import scan_serial_ports, usb_device_from_port
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -529,6 +527,50 @@ async def websocket_usb_scan(
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
def get_usb_ports() -> dict[str, str]:
|
||||
"""Return a dict of USB ports and their friendly names."""
|
||||
|
||||
ports = list_ports.comports()
|
||||
port_descriptions = {}
|
||||
for port in ports:
|
||||
vid: str | None = None
|
||||
pid: str | None = None
|
||||
if port.vid is not None and port.pid is not None:
|
||||
usb_device = usb_device_from_port(port)
|
||||
vid = usb_device.vid
|
||||
pid = usb_device.pid
|
||||
|
||||
dev_path = get_serial_by_id(port.device)
|
||||
human_name = human_readable_device_name(
|
||||
dev_path,
|
||||
port.serial_number,
|
||||
port.manufacturer,
|
||||
port.description,
|
||||
vid,
|
||||
pid,
|
||||
)
|
||||
port_descriptions[dev_path] = human_name
|
||||
|
||||
# Filter out "n/a" descriptions only if there are other ports available
|
||||
non_na_ports = {
|
||||
path: desc
|
||||
for path, desc in port_descriptions.items()
|
||||
if not desc.lower().startswith("n/a")
|
||||
}
|
||||
|
||||
# If we have non-"n/a" ports, return only those; otherwise return all ports as-is
|
||||
return non_na_ports if non_na_ports else port_descriptions
|
||||
|
||||
|
||||
async def async_get_usb_ports(hass: HomeAssistant) -> dict[str, str]:
|
||||
"""Return a dict of USB ports and their friendly names."""
|
||||
try:
|
||||
return await hass.async_add_executor_job(get_usb_ports)
|
||||
except OSError:
|
||||
_LOGGER.warning("Failed to scan USB ports", exc_info=True)
|
||||
return {}
|
||||
|
||||
|
||||
# These can be removed if no deprecated constant are in this module anymore
|
||||
__getattr__ = partial(check_if_deprecated_constant, module_globals=globals())
|
||||
__dir__ = partial(
|
||||
|
||||
@@ -11,7 +11,6 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from awesomeversion import AwesomeVersion
|
||||
from serial.tools import list_ports
|
||||
import voluptuous as vol
|
||||
from zwave_js_server.client import Client
|
||||
from zwave_js_server.exceptions import FailedCommand
|
||||
@@ -25,6 +24,7 @@ from homeassistant.components.hassio import (
|
||||
AddonManager,
|
||||
AddonState,
|
||||
)
|
||||
from homeassistant.components.usb import async_get_usb_ports
|
||||
from homeassistant.config_entries import (
|
||||
SOURCE_USB,
|
||||
ConfigEntryState,
|
||||
@@ -145,44 +145,6 @@ async def validate_input(hass: HomeAssistant, user_input: dict) -> VersionInfo:
|
||||
raise InvalidInput("cannot_connect") from err
|
||||
|
||||
|
||||
def get_usb_ports() -> dict[str, str]:
|
||||
"""Return a dict of USB ports and their friendly names."""
|
||||
ports = list_ports.comports()
|
||||
port_descriptions = {}
|
||||
for port in ports:
|
||||
vid: str | None = None
|
||||
pid: str | None = None
|
||||
if port.vid is not None and port.pid is not None:
|
||||
usb_device = usb.usb_device_from_port(port)
|
||||
vid = usb_device.vid
|
||||
pid = usb_device.pid
|
||||
dev_path = usb.get_serial_by_id(port.device)
|
||||
human_name = usb.human_readable_device_name(
|
||||
dev_path,
|
||||
port.serial_number,
|
||||
port.manufacturer,
|
||||
port.description,
|
||||
vid,
|
||||
pid,
|
||||
)
|
||||
port_descriptions[dev_path] = human_name
|
||||
|
||||
# Filter out "n/a" descriptions only if there are other ports available
|
||||
non_na_ports = {
|
||||
path: desc
|
||||
for path, desc in port_descriptions.items()
|
||||
if not desc.lower().startswith("n/a")
|
||||
}
|
||||
|
||||
# If we have non-"n/a" ports, return only those; otherwise return all ports as-is
|
||||
return non_na_ports if non_na_ports else port_descriptions
|
||||
|
||||
|
||||
async def async_get_usb_ports(hass: HomeAssistant) -> dict[str, str]:
|
||||
"""Return a dict of USB ports and their friendly names."""
|
||||
return await hass.async_add_executor_job(get_usb_ports)
|
||||
|
||||
|
||||
class ZWaveJSConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for Z-Wave JS."""
|
||||
|
||||
|
||||
1
requirements_all.txt
generated
1
requirements_all.txt
generated
@@ -2334,6 +2334,7 @@ pyserial-asyncio-fast==0.16
|
||||
|
||||
# homeassistant.components.acer_projector
|
||||
# homeassistant.components.crownstone
|
||||
# homeassistant.components.otbr
|
||||
# homeassistant.components.usb
|
||||
# homeassistant.components.zwave_js
|
||||
pyserial==3.5
|
||||
|
||||
1
requirements_test_all.txt
generated
1
requirements_test_all.txt
generated
@@ -1943,6 +1943,7 @@ pysensibo==1.2.1
|
||||
|
||||
# homeassistant.components.acer_projector
|
||||
# homeassistant.components.crownstone
|
||||
# homeassistant.components.otbr
|
||||
# homeassistant.components.usb
|
||||
# homeassistant.components.zwave_js
|
||||
pyserial==3.5
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
204
tests/components/usb/test_ports.py
Normal file
204
tests/components/usb/test_ports.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""Test USB utils."""
|
||||
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from serial.tools.list_ports_common import ListPortInfo
|
||||
|
||||
from homeassistant.components.usb import async_get_usb_ports, get_usb_ports
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
|
||||
async def test_get_usb_ports_with_vid_pid() -> None:
|
||||
"""Test get_usb_ports with VID/PID information."""
|
||||
mock_port = Mock()
|
||||
mock_port.device = "/dev/ttyUSB0"
|
||||
mock_port.serial_number = "12345"
|
||||
mock_port.manufacturer = "Test"
|
||||
mock_port.description = "Valid Device"
|
||||
mock_port.vid = 0x1234
|
||||
mock_port.pid = 0x5678
|
||||
|
||||
mock_usb_device = Mock()
|
||||
mock_usb_device.vid = "1234"
|
||||
mock_usb_device.pid = "5678"
|
||||
|
||||
with (
|
||||
patch("serial.tools.list_ports.comports", return_value=[mock_port]),
|
||||
patch(
|
||||
"homeassistant.components.usb.get_serial_by_id",
|
||||
return_value="/dev/ttyUSB0",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.usb.utils.usb_device_from_port",
|
||||
return_value=mock_usb_device,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.usb.human_readable_device_name",
|
||||
return_value="Valid Device",
|
||||
),
|
||||
):
|
||||
result = get_usb_ports()
|
||||
assert result == {"/dev/ttyUSB0": "Valid Device"}
|
||||
|
||||
|
||||
async def test_get_usb_ports_filtering_mixed_ports() -> None:
|
||||
"""Test get_usb_ports filtering with mixed valid and 'n/a' ports."""
|
||||
mock_port1 = Mock()
|
||||
mock_port1.device = "/dev/ttyUSB0"
|
||||
mock_port1.serial_number = "12345"
|
||||
mock_port1.manufacturer = "Test"
|
||||
mock_port1.description = "Valid Device"
|
||||
mock_port1.vid = None
|
||||
mock_port1.pid = None
|
||||
|
||||
mock_port2 = Mock()
|
||||
mock_port2.device = "/dev/ttyUSB1"
|
||||
mock_port2.serial_number = "67890"
|
||||
mock_port2.manufacturer = "Test"
|
||||
mock_port2.description = "n/a"
|
||||
mock_port2.vid = None
|
||||
mock_port2.pid = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"serial.tools.list_ports.comports", return_value=[mock_port1, mock_port2]
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.usb.get_serial_by_id",
|
||||
side_effect=["/dev/ttyUSB0", "/dev/ttyUSB1"],
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.usb.human_readable_device_name",
|
||||
side_effect=["Valid Device", "n/a"],
|
||||
),
|
||||
):
|
||||
result = get_usb_ports()
|
||||
# Should filter out the "n/a" port and only return the valid one
|
||||
assert result == {"/dev/ttyUSB0": "Valid Device"}
|
||||
|
||||
|
||||
async def test_get_usb_ports_filtering() -> None:
|
||||
"""Test that get_usb_ports filters out 'n/a' descriptions when other ports are available."""
|
||||
|
||||
mock_ports = [
|
||||
ListPortInfo("/dev/ttyUSB0"),
|
||||
ListPortInfo("/dev/ttyUSB1"),
|
||||
ListPortInfo("/dev/ttyUSB2"),
|
||||
ListPortInfo("/dev/ttyUSB3"),
|
||||
]
|
||||
mock_ports[0].description = "n/a"
|
||||
mock_ports[1].description = "Device A"
|
||||
mock_ports[2].description = "N/A"
|
||||
mock_ports[3].description = "Device B"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that only non-"n/a" descriptions are returned
|
||||
assert descriptions == [
|
||||
"Device A - /dev/ttyUSB1, s/n: n/a",
|
||||
"Device B - /dev/ttyUSB3, s/n: n/a",
|
||||
]
|
||||
|
||||
|
||||
async def test_get_usb_ports_all_na() -> None:
|
||||
"""Test that get_usb_ports returns all ports as-is when only 'n/a' descriptions exist."""
|
||||
|
||||
mock_ports = [
|
||||
ListPortInfo("/dev/ttyUSB0"),
|
||||
ListPortInfo("/dev/ttyUSB1"),
|
||||
ListPortInfo("/dev/ttyUSB2"),
|
||||
]
|
||||
mock_ports[0].description = "n/a"
|
||||
mock_ports[1].description = "N/A"
|
||||
mock_ports[2].description = "n/a"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that all ports are returned since they all have "n/a" descriptions
|
||||
assert len(descriptions) == 3
|
||||
# Verify that all descriptions contain "n/a" (case-insensitive)
|
||||
assert all("n/a" in desc.lower() for desc in descriptions)
|
||||
# Verify that all expected device paths are present
|
||||
device_paths = [desc.split(" - ")[1].split(",")[0] for desc in descriptions]
|
||||
assert "/dev/ttyUSB0" in device_paths
|
||||
assert "/dev/ttyUSB1" in device_paths
|
||||
assert "/dev/ttyUSB2" in device_paths
|
||||
|
||||
|
||||
async def test_get_usb_ports_mixed_case_filtering() -> None:
|
||||
"""Test that get_usb_ports filters out 'n/a' descriptions with different case variations."""
|
||||
|
||||
mock_ports = [
|
||||
ListPortInfo("/dev/ttyUSB0"),
|
||||
ListPortInfo("/dev/ttyUSB1"),
|
||||
ListPortInfo("/dev/ttyUSB2"),
|
||||
ListPortInfo("/dev/ttyUSB3"),
|
||||
]
|
||||
mock_ports[0].description = "n/a"
|
||||
mock_ports[1].description = "Not Available"
|
||||
mock_ports[2].description = "N/A"
|
||||
mock_ports[3].description = "Device B"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that only non-"n/a" descriptions are returned
|
||||
assert descriptions == [
|
||||
"Not Available - /dev/ttyUSB1, s/n: n/a",
|
||||
"Device B - /dev/ttyUSB3, s/n: n/a",
|
||||
]
|
||||
|
||||
|
||||
async def test_get_usb_ports_empty_list() -> None:
|
||||
"""Test that get_usb_ports handles empty port list."""
|
||||
with patch("serial.tools.list_ports.comports", return_value=[]):
|
||||
result = get_usb_ports()
|
||||
assert result == {}
|
||||
|
||||
|
||||
async def test_get_usb_ports_single_na_port() -> None:
|
||||
"""Test that get_usb_ports returns single 'n/a' port when it's the only one available."""
|
||||
|
||||
mock_port = ListPortInfo("/dev/ttyUSB0")
|
||||
mock_port.description = "n/a"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=[mock_port]):
|
||||
result = get_usb_ports()
|
||||
assert len(result) == 1
|
||||
assert "/dev/ttyUSB0" in result
|
||||
assert "n/a" in result["/dev/ttyUSB0"].lower()
|
||||
|
||||
|
||||
async def test_get_usb_ports_single_valid_port() -> None:
|
||||
"""Test that get_usb_ports returns single valid port."""
|
||||
|
||||
mock_port = ListPortInfo("/dev/ttyUSB0")
|
||||
mock_port.description = "Valid Device"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=[mock_port]):
|
||||
result = get_usb_ports()
|
||||
assert len(result) == 1
|
||||
assert "/dev/ttyUSB0" in result
|
||||
assert "Valid Device" in result["/dev/ttyUSB0"]
|
||||
|
||||
|
||||
async def test_async_get_usb_ports_exception_handling(hass: HomeAssistant) -> None:
|
||||
"""Test async_get_usb_ports exception handling."""
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.usb.get_usb_ports",
|
||||
side_effect=OSError("USB scan failed"),
|
||||
),
|
||||
patch("homeassistant.components.usb._LOGGER.warning") as mock_logger,
|
||||
):
|
||||
result = await async_get_usb_ports(hass)
|
||||
assert result == {}
|
||||
mock_logger.assert_called_once()
|
||||
@@ -19,7 +19,7 @@ from zwave_js_server.model.node import Node
|
||||
from zwave_js_server.version import VersionInfo
|
||||
|
||||
from homeassistant import config_entries, data_entry_flow
|
||||
from homeassistant.components.zwave_js.config_flow import TITLE, get_usb_ports
|
||||
from homeassistant.components.zwave_js.config_flow import TITLE
|
||||
from homeassistant.components.zwave_js.const import (
|
||||
ADDON_SLUG,
|
||||
CONF_ADDON_DEVICE,
|
||||
@@ -161,9 +161,7 @@ def serial_port_fixture() -> ListPortInfo:
|
||||
@pytest.fixture(name="mock_list_ports", autouse=True)
|
||||
def mock_list_ports_fixture(serial_port) -> Generator[MagicMock]:
|
||||
"""Mock list ports."""
|
||||
with patch(
|
||||
"homeassistant.components.zwave_js.config_flow.list_ports.comports"
|
||||
) as mock_list_ports:
|
||||
with patch("serial.tools.list_ports.comports") as mock_list_ports:
|
||||
another_port = copy(serial_port)
|
||||
another_port.device = "/new"
|
||||
another_port.description = "New serial port"
|
||||
@@ -4290,126 +4288,6 @@ async def test_configure_addon_usb_ports_failure(
|
||||
assert result["reason"] == "usb_ports_failed"
|
||||
|
||||
|
||||
async def test_get_usb_ports_filtering() -> None:
|
||||
"""Test that get_usb_ports filters out 'n/a' descriptions when other ports are available."""
|
||||
mock_ports = [
|
||||
ListPortInfo("/dev/ttyUSB0"),
|
||||
ListPortInfo("/dev/ttyUSB1"),
|
||||
ListPortInfo("/dev/ttyUSB2"),
|
||||
ListPortInfo("/dev/ttyUSB3"),
|
||||
]
|
||||
mock_ports[0].description = "n/a"
|
||||
mock_ports[1].description = "Device A"
|
||||
mock_ports[2].description = "N/A"
|
||||
mock_ports[3].description = "Device B"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that only non-"n/a" descriptions are returned
|
||||
assert descriptions == [
|
||||
"Device A - /dev/ttyUSB1, s/n: n/a",
|
||||
"Device B - /dev/ttyUSB3, s/n: n/a",
|
||||
]
|
||||
|
||||
|
||||
async def test_get_usb_ports_all_na() -> None:
|
||||
"""Test that get_usb_ports returns all ports as-is when only 'n/a' descriptions exist."""
|
||||
mock_ports = [
|
||||
ListPortInfo("/dev/ttyUSB0"),
|
||||
ListPortInfo("/dev/ttyUSB1"),
|
||||
ListPortInfo("/dev/ttyUSB2"),
|
||||
]
|
||||
mock_ports[0].description = "n/a"
|
||||
mock_ports[1].description = "N/A"
|
||||
mock_ports[2].description = "n/a"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that all ports are returned since they all have "n/a" descriptions
|
||||
assert len(descriptions) == 3
|
||||
# Verify that all descriptions contain "n/a" (case-insensitive)
|
||||
assert all("n/a" in desc.lower() for desc in descriptions)
|
||||
# Verify that all expected device paths are present
|
||||
device_paths = [desc.split(" - ")[1].split(",")[0] for desc in descriptions]
|
||||
assert "/dev/ttyUSB0" in device_paths
|
||||
assert "/dev/ttyUSB1" in device_paths
|
||||
assert "/dev/ttyUSB2" in device_paths
|
||||
|
||||
|
||||
async def test_get_usb_ports_mixed_case_filtering() -> None:
|
||||
"""Test that get_usb_ports filters out 'n/a' descriptions with different case variations."""
|
||||
mock_ports = [
|
||||
ListPortInfo("/dev/ttyUSB0"),
|
||||
ListPortInfo("/dev/ttyUSB1"),
|
||||
ListPortInfo("/dev/ttyUSB2"),
|
||||
ListPortInfo("/dev/ttyUSB3"),
|
||||
ListPortInfo("/dev/ttyUSB4"),
|
||||
]
|
||||
mock_ports[0].description = "n/a"
|
||||
mock_ports[1].description = "Device A"
|
||||
mock_ports[2].description = "N/A"
|
||||
mock_ports[3].description = "n/A"
|
||||
mock_ports[4].description = "Device B"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that only non-"n/a" descriptions are returned (case-insensitive filtering)
|
||||
assert descriptions == [
|
||||
"Device A - /dev/ttyUSB1, s/n: n/a",
|
||||
"Device B - /dev/ttyUSB4, s/n: n/a",
|
||||
]
|
||||
|
||||
|
||||
async def test_get_usb_ports_empty_list() -> None:
|
||||
"""Test that get_usb_ports handles empty port list."""
|
||||
with patch("serial.tools.list_ports.comports", return_value=[]):
|
||||
result = get_usb_ports()
|
||||
|
||||
# Verify that empty dict is returned
|
||||
assert result == {}
|
||||
|
||||
|
||||
async def test_get_usb_ports_single_na_port() -> None:
|
||||
"""Test that get_usb_ports returns single 'n/a' port when it's the only one available."""
|
||||
mock_ports = [ListPortInfo("/dev/ttyUSB0")]
|
||||
mock_ports[0].description = "n/a"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that the single "n/a" port is returned
|
||||
assert descriptions == [
|
||||
"n/a - /dev/ttyUSB0, s/n: n/a",
|
||||
]
|
||||
|
||||
|
||||
async def test_get_usb_ports_single_valid_port() -> None:
|
||||
"""Test that get_usb_ports returns single valid port."""
|
||||
mock_ports = [ListPortInfo("/dev/ttyUSB0")]
|
||||
mock_ports[0].description = "Device A"
|
||||
|
||||
with patch("serial.tools.list_ports.comports", return_value=mock_ports):
|
||||
result = get_usb_ports()
|
||||
|
||||
descriptions = list(result.values())
|
||||
|
||||
# Verify that the single valid port is returned
|
||||
assert descriptions == [
|
||||
"Device A - /dev/ttyUSB0, s/n: n/a",
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("supervisor", "addon_not_installed", "addon_info")
|
||||
async def test_intent_recommended_user(
|
||||
hass: HomeAssistant,
|
||||
|
||||
Reference in New Issue
Block a user