Move get_serial_by_id and human_readable_device_name to usb (#54968)

This commit is contained in:
J. Nick Koston 2021-08-21 07:24:21 -05:00 committed by GitHub
parent 2cfd78bc49
commit c609236a63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 121 additions and 86 deletions

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import dataclasses import dataclasses
import datetime import datetime
import logging import logging
import os
import sys import sys
from serial.tools.list_ports import comports from serial.tools.list_ports import comports
@ -26,6 +27,37 @@ _LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = datetime.timedelta(minutes=60) SCAN_INTERVAL = datetime.timedelta(minutes=60)
def human_readable_device_name(
device: str,
serial_number: str | None,
manufacturer: str | None,
description: str | None,
vid: str | None,
pid: str | None,
) -> str:
"""Return a human readable name from USBDevice attributes."""
device_details = f"{device}, s/n: {serial_number or 'n/a'}"
manufacturer_details = f" - {manufacturer}" if manufacturer else ""
vendor_details = f" - {vid}:{pid}" if vid else ""
full_details = f"{device_details}{manufacturer_details}{vendor_details}"
if not description:
return full_details
return f"{description[:26]} - {full_details}"
def get_serial_by_id(dev_path: str) -> str:
"""Return a /dev/serial/by-id match for given device if available."""
by_id = "/dev/serial/by-id"
if not os.path.isdir(by_id):
return dev_path
for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
if os.path.realpath(path) == dev_path:
return path
return dev_path
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the USB Discovery integration.""" """Set up the USB Discovery integration."""
usb = await async_get_usb(hass) usb = await async_get_usb(hass)

View File

@ -1,7 +1,6 @@
"""Config flow for ZHA.""" """Config flow for ZHA."""
from __future__ import annotations from __future__ import annotations
import os
from typing import Any from typing import Any
import serial.tools.list_ports import serial.tools.list_ports
@ -9,6 +8,7 @@ import voluptuous as vol
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.components import usb
from homeassistant.const import CONF_HOST, CONF_NAME from homeassistant.const import CONF_HOST, CONF_NAME
from homeassistant.helpers.typing import DiscoveryInfoType from homeassistant.helpers.typing import DiscoveryInfoType
@ -27,24 +27,6 @@ SUPPORTED_PORT_SETTINGS = (
) )
def _format_port_human_readable(
device: str,
serial_number: str | None,
manufacturer: str | None,
description: str | None,
vid: str | None,
pid: str | None,
) -> str:
device_details = f"{device}, s/n: {serial_number or 'n/a'}"
manufacturer_details = f" - {manufacturer}" if manufacturer else ""
vendor_details = f" - {vid}:{pid}" if vid else ""
full_details = f"{device_details}{manufacturer_details}{vendor_details}"
if not description:
return full_details
return f"{description[:26]} - {full_details}"
class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow.""" """Handle a config flow."""
@ -81,7 +63,7 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
port = ports[list_of_ports.index(user_selection)] port = ports[list_of_ports.index(user_selection)]
dev_path = await self.hass.async_add_executor_job( dev_path = await self.hass.async_add_executor_job(
get_serial_by_id, port.device usb.get_serial_by_id, port.device
) )
auto_detected_data = await detect_radios(dev_path) auto_detected_data = await detect_radios(dev_path)
if auto_detected_data is not None: if auto_detected_data is not None:
@ -145,12 +127,12 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if vid == "10C4" and pid == "8A2A" and "ZigBee" not in description: if vid == "10C4" and pid == "8A2A" and "ZigBee" not in description:
return self.async_abort(reason="not_zha_device") return self.async_abort(reason="not_zha_device")
dev_path = await self.hass.async_add_executor_job(get_serial_by_id, device) dev_path = await self.hass.async_add_executor_job(usb.get_serial_by_id, device)
self._auto_detected_data = await detect_radios(dev_path) self._auto_detected_data = await detect_radios(dev_path)
if self._auto_detected_data is None: if self._auto_detected_data is None:
return self.async_abort(reason="not_zha_device") return self.async_abort(reason="not_zha_device")
self._device_path = dev_path self._device_path = dev_path
self._title = _format_port_human_readable( self._title = usb.human_readable_device_name(
dev_path, dev_path,
serial_number, serial_number,
manufacturer, manufacturer,
@ -215,7 +197,7 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._device_path = user_input.get(CONF_DEVICE_PATH) self._device_path = user_input.get(CONF_DEVICE_PATH)
if await app_cls.probe(user_input): if await app_cls.probe(user_input):
serial_by_id = await self.hass.async_add_executor_job( serial_by_id = await self.hass.async_add_executor_job(
get_serial_by_id, user_input[CONF_DEVICE_PATH] usb.get_serial_by_id, user_input[CONF_DEVICE_PATH]
) )
user_input[CONF_DEVICE_PATH] = serial_by_id user_input[CONF_DEVICE_PATH] = serial_by_id
return self.async_create_entry( return self.async_create_entry(
@ -255,15 +237,3 @@ async def detect_radios(dev_path: str) -> dict[str, Any] | None:
return {CONF_RADIO_TYPE: radio.name, CONF_DEVICE: dev_config} return {CONF_RADIO_TYPE: radio.name, CONF_DEVICE: dev_config}
return None return None
def get_serial_by_id(dev_path: str) -> str:
"""Return a /dev/serial/by-id match for given device if available."""
by_id = "/dev/serial/by-id"
if not os.path.isdir(by_id):
return dev_path
for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
if os.path.realpath(path) == dev_path:
return path
return dev_path

View File

@ -28,6 +28,6 @@
"name": "tube*" "name": "tube*"
} }
], ],
"after_dependencies": ["zeroconf"], "after_dependencies": ["usb", "zeroconf"],
"iot_class": "local_polling" "iot_class": "local_polling"
} }

View File

@ -1,10 +1,12 @@
"""Tests for the USB Discovery integration.""" """Tests for the USB Discovery integration."""
import datetime import datetime
import os
import sys import sys
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch, sentinel
import pytest import pytest
from homeassistant.components import usb
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
@ -271,3 +273,82 @@ async def test_non_matching_discovered_by_scanner_after_started(hass):
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0 assert len(mock_config_flow.mock_calls) == 0
def test_get_serial_by_id_no_dir():
"""Test serial by id conversion if there's no /dev/serial/by-id."""
p1 = patch("os.path.isdir", MagicMock(return_value=False))
p2 = patch("os.scandir")
with p1 as is_dir_mock, p2 as scan_mock:
res = usb.get_serial_by_id(sentinel.path)
assert res is sentinel.path
assert is_dir_mock.call_count == 1
assert scan_mock.call_count == 0
def test_get_serial_by_id():
"""Test serial by id conversion."""
p1 = patch("os.path.isdir", MagicMock(return_value=True))
p2 = patch("os.scandir")
def _realpath(path):
if path is sentinel.matched_link:
return sentinel.path
return sentinel.serial_link_path
p3 = patch("os.path.realpath", side_effect=_realpath)
with p1 as is_dir_mock, p2 as scan_mock, p3:
res = usb.get_serial_by_id(sentinel.path)
assert res is sentinel.path
assert is_dir_mock.call_count == 1
assert scan_mock.call_count == 1
entry1 = MagicMock(spec_set=os.DirEntry)
entry1.is_symlink.return_value = True
entry1.path = sentinel.some_path
entry2 = MagicMock(spec_set=os.DirEntry)
entry2.is_symlink.return_value = False
entry2.path = sentinel.other_path
entry3 = MagicMock(spec_set=os.DirEntry)
entry3.is_symlink.return_value = True
entry3.path = sentinel.matched_link
scan_mock.return_value = [entry1, entry2, entry3]
res = usb.get_serial_by_id(sentinel.path)
assert res is sentinel.matched_link
assert is_dir_mock.call_count == 2
assert scan_mock.call_count == 2
def test_human_readable_device_name():
"""Test human readable device name includes the passed data."""
name = usb.human_readable_device_name(
"/dev/null",
"612020FD",
"Silicon Labs",
"HubZ Smart Home Controller - HubZ Z-Wave Com Port",
"10C4",
"8A2A",
)
assert "/dev/null" in name
assert "612020FD" in name
assert "Silicon Labs" in name
assert "HubZ Smart Home Controller - HubZ Z-Wave Com Port"[:26] in name
assert "10C4" in name
assert "8A2A" in name
name = usb.human_readable_device_name(
"/dev/null",
"612020FD",
"Silicon Labs",
None,
"10C4",
"8A2A",
)
assert "/dev/null" in name
assert "612020FD" in name
assert "Silicon Labs" in name
assert "10C4" in name
assert "8A2A" in name

View File

@ -1,7 +1,6 @@
"""Tests for ZHA config flow.""" """Tests for ZHA config flow."""
import os from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch, sentinel
import pytest import pytest
import serial.tools.list_ports import serial.tools.list_ports
@ -400,50 +399,3 @@ async def test_user_port_config(probe_mock, hass):
) )
assert result["data"][CONF_RADIO_TYPE] == "ezsp" assert result["data"][CONF_RADIO_TYPE] == "ezsp"
assert probe_mock.await_count == 1 assert probe_mock.await_count == 1
def test_get_serial_by_id_no_dir():
"""Test serial by id conversion if there's no /dev/serial/by-id."""
p1 = patch("os.path.isdir", MagicMock(return_value=False))
p2 = patch("os.scandir")
with p1 as is_dir_mock, p2 as scan_mock:
res = config_flow.get_serial_by_id(sentinel.path)
assert res is sentinel.path
assert is_dir_mock.call_count == 1
assert scan_mock.call_count == 0
def test_get_serial_by_id():
"""Test serial by id conversion."""
p1 = patch("os.path.isdir", MagicMock(return_value=True))
p2 = patch("os.scandir")
def _realpath(path):
if path is sentinel.matched_link:
return sentinel.path
return sentinel.serial_link_path
p3 = patch("os.path.realpath", side_effect=_realpath)
with p1 as is_dir_mock, p2 as scan_mock, p3:
res = config_flow.get_serial_by_id(sentinel.path)
assert res is sentinel.path
assert is_dir_mock.call_count == 1
assert scan_mock.call_count == 1
entry1 = MagicMock(spec_set=os.DirEntry)
entry1.is_symlink.return_value = True
entry1.path = sentinel.some_path
entry2 = MagicMock(spec_set=os.DirEntry)
entry2.is_symlink.return_value = False
entry2.path = sentinel.other_path
entry3 = MagicMock(spec_set=os.DirEntry)
entry3.is_symlink.return_value = True
entry3.path = sentinel.matched_link
scan_mock.return_value = [entry1, entry2, entry3]
res = config_flow.get_serial_by_id(sentinel.path)
assert res is sentinel.matched_link
assert is_dir_mock.call_count == 2
assert scan_mock.call_count == 2