Extend hardware/udev support & usb mapping (#1940)

* Extend hardware/udev support & usb mapping

* Cleanup list

* new style

* Fix tests

* Fix

* use frozen

* add test for usb

* Fix block disks

* Update API.md

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2020-08-18 14:25:07 +02:00 committed by GitHub
parent 97c38b8534
commit d315e81ab2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 159 additions and 53 deletions

1
API.md
View File

@ -535,6 +535,7 @@ Get all available add-ons.
"stdin": "bool",
"webui": "null|http(s)://[HOST]:port/xy/zx",
"gpio": "bool",
"usb": "[physical_path_to_usb_device]",
"kernel_modules": "bool",
"devicetree": "bool",
"docker_api": "bool",

View File

@ -58,6 +58,7 @@ from ..const import (
ATTR_TMPFS,
ATTR_UDEV,
ATTR_URL,
ATTR_USB,
ATTR_VERSION,
ATTR_VIDEO,
ATTR_WEBUI,
@ -292,11 +293,6 @@ class AddonModel(CoreSysAttributes, ABC):
"""Return devices of add-on."""
return self.data.get(ATTR_DEVICES, [])
@property
def auto_uart(self) -> bool:
"""Return True if we should map all UART device."""
return self.data[ATTR_AUTO_UART]
@property
def tmpfs(self) -> Optional[str]:
"""Return tmpfs of add-on."""
@ -376,6 +372,16 @@ class AddonModel(CoreSysAttributes, ABC):
"""Return True if the add-on access to GPIO interface."""
return self.data[ATTR_GPIO]
@property
def with_usb(self) -> bool:
"""Return True if the add-on need USB access."""
return self.data[ATTR_USB]
@property
def with_uart(self) -> bool:
"""Return True if we should map all UART device."""
return self.data[ATTR_AUTO_UART]
@property
def with_udev(self) -> bool:
"""Return True if the add-on have his own udev."""

View File

@ -75,6 +75,7 @@ from ..const import (
ATTR_TMPFS,
ATTR_UDEV,
ATTR_URL,
ATTR_USB,
ATTR_USER,
ATTR_UUID,
ATTR_VERSION,
@ -226,6 +227,7 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_AUDIO, default=False): vol.Boolean(),
vol.Optional(ATTR_VIDEO, default=False): vol.Boolean(),
vol.Optional(ATTR_GPIO, default=False): vol.Boolean(),
vol.Optional(ATTR_USB, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(),
vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),

View File

@ -81,6 +81,7 @@ from ..const import (
ATTR_STDIN,
ATTR_UDEV,
ATTR_URL,
ATTR_USB,
ATTR_VERSION,
ATTR_VERSION_LATEST,
ATTR_VIDEO,
@ -237,6 +238,7 @@ class APIAddons(CoreSysAttributes):
ATTR_AUTH_API: addon.access_auth_api,
ATTR_HOMEASSISTANT_API: addon.access_homeassistant_api,
ATTR_GPIO: addon.with_gpio,
ATTR_USB: addon.with_usb,
ATTR_KERNEL_MODULES: addon.with_kernel_modules,
ATTR_DEVICETREE: addon.with_devicetree,
ATTR_UDEV: addon.with_udev,

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor hardware RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable, Dict, List
from aiohttp import web
@ -12,6 +12,7 @@ from ..const import (
ATTR_INPUT,
ATTR_OUTPUT,
ATTR_SERIAL,
ATTR_USB,
)
from ..coresys import CoreSysAttributes
from .utils import api_process
@ -25,13 +26,24 @@ class APIHardware(CoreSysAttributes):
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
"""Show hardware info."""
serial: List[str] = []
# Create Serial list with device links
for device in self.sys_hardware.serial_devices:
serial.append(device.path.as_posix())
for link in device.links:
serial.append(link.as_posix())
return {
ATTR_SERIAL: list(
self.sys_hardware.serial_devices | self.sys_hardware.serial_by_id
),
ATTR_SERIAL: serial,
ATTR_INPUT: list(self.sys_hardware.input_devices),
ATTR_DISK: list(self.sys_hardware.disk_devices),
ATTR_DISK: [
device.path.as_posix() for device in self.sys_hardware.disk_devices
],
ATTR_GPIO: list(self.sys_hardware.gpio_devices),
ATTR_USB: [
device.path.as_posix() for device in self.sys_hardware.usb_devices
],
ATTR_AUDIO: self.sys_hardware.audio_devices,
}

View File

@ -108,6 +108,7 @@ ATTR_PROVIDERS = "providers"
ATTR_VERSION = "version"
ATTR_VERSION_LATEST = "version_latest"
ATTR_AUTO_UART = "auto_uart"
ATTR_USB = "usb"
ATTR_LAST_BOOT = "last_boot"
ATTR_CHANNEL = "channel"
ATTR_NAME = "name"

View File

@ -127,20 +127,21 @@ class DockerAddon(DockerInterface):
devices = []
# Extend add-on config
if self.addon.devices:
devices.extend(self.addon.devices)
for device in self.addon.devices:
if not Path(device.split(":")[0]).exists():
continue
devices.append(device)
# Auto mapping UART devices
if self.addon.auto_uart:
if self.addon.with_udev:
serial_devs = self.sys_hardware.serial_devices
else:
serial_devs = (
self.sys_hardware.serial_devices | self.sys_hardware.serial_by_id
)
for device in serial_devs:
devices.append(f"{device}:{device}:rwm")
if self.addon.with_uart:
for device in self.sys_hardware.serial_devices:
devices.append(f"{device.path.as_posix()}:{device.path.as_posix()}:rwm")
if self.addon.with_udev:
continue
for device_link in device.links:
devices.append(
f"{device_link.as_posix()}:{device_link.as_posix()}:rwm"
)
# Use video devices
if self.addon.with_video:
@ -286,6 +287,10 @@ class DockerAddon(DockerInterface):
}
)
# USB support
if self.addon.with_usb and self.sys_hardware.usb_devices:
volumes.update({"/dev/bus/usb": {"bind": "/dev/bus/usb", "mode": "rw"}})
# Kernel Modules support
if self.addon.with_kernel_modules:
volumes.update({"/lib/modules": {"bind": "/lib/modules", "mode": "ro"}})

View File

@ -31,13 +31,15 @@ RE_TTY: re.Pattern = re.compile(r"tty[A-Z]+")
RE_VIDEO_DEVICES = re.compile(r"^(?:vchiq|cec\d+|video\d+)")
@attr.s(frozen=True)
@attr.s(slots=True, frozen=True)
class Device:
"""Represent a device."""
name: str = attr.ib()
path: Path = attr.ib()
subsystem: str = attr.ib()
links: List[Path] = attr.ib()
attributes: Dict[str, str] = attr.ib()
class Hardware:
@ -62,7 +64,9 @@ class Hardware:
Device(
device.sys_name,
Path(device.device_node),
device.subsystem,
[Path(node) for node in device.device_links],
{attr: device.properties[attr] for attr in device.properties},
)
)
@ -81,28 +85,31 @@ class Hardware:
return dev_list
@property
def serial_devices(self) -> Set[str]:
def serial_devices(self) -> List[Device]:
"""Return all serial and connected devices."""
dev_list: Set[str] = set()
for device in self.context.list_devices(subsystem="tty"):
if "ID_VENDOR" in device.properties or RE_TTY.search(device.device_node):
dev_list.add(device.device_node)
dev_list: List[Device] = []
for device in self.devices:
if (
device.subsystem != "tty"
or "ID_VENDOR" not in device.attributes
or not RE_TTY.search(str(device.path))
):
continue
# Cleanup not usable device links
for link in device.links.copy():
if link.match("/dev/serial/by-id/*"):
continue
device.links.remove(link)
dev_list.append(device)
return dev_list
@property
def serial_by_id(self) -> Set[str]:
"""Return all /dev/serial/by-id for serial devices."""
dev_list: Set[str] = set()
for device in self.context.list_devices(subsystem="tty"):
if "ID_VENDOR" in device.properties or RE_TTY.search(device.device_node):
# Add /dev/serial/by-id devlink for current device
for dev_link in device.device_links:
if not dev_link.startswith("/dev/serial/by-id"):
continue
dev_list.add(dev_link)
return dev_list
def usb_devices(self) -> List[Device]:
"""Return all usb and connected devices."""
return [device for device in self.devices if device.subsystem == "usb"]
@property
def input_devices(self) -> Set[str]:
@ -115,12 +122,13 @@ class Hardware:
return dev_list
@property
def disk_devices(self) -> Set[str]:
def disk_devices(self) -> List[Device]:
"""Return all disk devices."""
dev_list: Set[str] = set()
for device in self.context.list_devices(subsystem="block"):
if "ID_NAME" in device.properties:
dev_list.add(device.device_node)
dev_list: List[Device] = []
for device in self.devices:
if device.subsystem != "block" or "ID_NAME" not in device.attributes:
continue
dev_list.append(device)
return dev_list

View File

@ -16,10 +16,10 @@ def test_video_devices():
"""Test video device filter."""
system = Hardware()
device_list = [
Device("test-dev", Path("/dev/test-dev"), []),
Device("vchiq", Path("/dev/vchiq"), []),
Device("cec0", Path("/dev/cec0"), []),
Device("video1", Path("/dev/video1"), []),
Device("test-dev", Path("/dev/test-dev"), "xy", [], {}),
Device("vchiq", Path("/dev/vchiq"), "xy", [], {}),
Device("cec0", Path("/dev/cec0"), "xy", [], {}),
Device("video1", Path("/dev/video1"), "xy", [], {}),
]
with patch(
@ -27,10 +27,79 @@ def test_video_devices():
) as mock_device:
mock_device.return_value = device_list
assert system.video_devices == [
Device("vchiq", Path("/dev/vchiq"), []),
Device("cec0", Path("/dev/cec0"), []),
Device("video1", Path("/dev/video1"), []),
assert [device.name for device in system.video_devices] == [
"vchiq",
"cec0",
"video1",
]
def test_serial_devices():
"""Test serial device filter."""
system = Hardware()
device_list = [
Device("ttyACM0", Path("/dev/ttyACM0"), "tty", [], {"ID_VENDOR": "xy"}),
Device(
"ttyUSB0",
Path("/dev/ttyUSB0"),
"tty",
[Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")],
{"ID_VENDOR": "xy"},
),
Device("ttyS0", Path("/dev/ttyS0"), "tty", [], {}),
Device("video1", Path("/dev/video1"), "misc", [], {"ID_VENDOR": "xy"}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [(device.name, device.links) for device in system.serial_devices] == [
("ttyACM0", []),
("ttyUSB0", [Path("/dev/serial/by-id/xyx")]),
]
def test_usb_devices():
"""Test usb device filter."""
system = Hardware()
device_list = [
Device("usb1", Path("/dev/bus/usb/1/1"), "usb", [], {}),
Device("usb2", Path("/dev/bus/usb/2/1"), "usb", [], {}),
Device("cec0", Path("/dev/cec0"), "xy", [], {}),
Device("video1", Path("/dev/video1"), "xy", [], {}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [device.name for device in system.usb_devices] == [
"usb1",
"usb2",
]
def test_block_devices():
"""Test usb device filter."""
system = Hardware()
device_list = [
Device("sda", Path("/dev/sda"), "block", [], {"ID_NAME": "xy"}),
Device("sdb", Path("/dev/sdb"), "block", [], {"ID_NAME": "xy"}),
Device("cec0", Path("/dev/cec0"), "xy", [], {}),
Device("video1", Path("/dev/video1"), "xy", [], {"ID_NAME": "xy"}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [device.name for device in system.disk_devices] == [
"sda",
"sdb",
]