mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-10 02:36:29 +00:00
Addon provide his own udev support (#1206)
* Addon provide his own udev support * upgrade logger
This commit is contained in:
parent
41ce9913d2
commit
0a0a62f238
1
API.md
1
API.md
@ -502,6 +502,7 @@ Get all available addons.
|
|||||||
"privileged": ["NET_ADMIN", "SYS_ADMIN"],
|
"privileged": ["NET_ADMIN", "SYS_ADMIN"],
|
||||||
"apparmor": "disable|default|profile",
|
"apparmor": "disable|default|profile",
|
||||||
"devices": ["/dev/xy"],
|
"devices": ["/dev/xy"],
|
||||||
|
"udev": "bool",
|
||||||
"auto_uart": "bool",
|
"auto_uart": "bool",
|
||||||
"icon": "bool",
|
"icon": "bool",
|
||||||
"logo": "bool",
|
"logo": "bool",
|
||||||
|
@ -51,6 +51,7 @@ from ..const import (
|
|||||||
ATTR_STDIN,
|
ATTR_STDIN,
|
||||||
ATTR_TIMEOUT,
|
ATTR_TIMEOUT,
|
||||||
ATTR_TMPFS,
|
ATTR_TMPFS,
|
||||||
|
ATTR_UDEV,
|
||||||
ATTR_URL,
|
ATTR_URL,
|
||||||
ATTR_VERSION,
|
ATTR_VERSION,
|
||||||
ATTR_WEBUI,
|
ATTR_WEBUI,
|
||||||
@ -343,6 +344,11 @@ class AddonModel(CoreSysAttributes):
|
|||||||
"""Return True if the add-on access to GPIO interface."""
|
"""Return True if the add-on access to GPIO interface."""
|
||||||
return self.data[ATTR_GPIO]
|
return self.data[ATTR_GPIO]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def with_udev(self) -> bool:
|
||||||
|
"""Return True if the add-on have his own udev."""
|
||||||
|
return self.data[ATTR_UDEV]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def with_kernel_modules(self) -> bool:
|
def with_kernel_modules(self) -> bool:
|
||||||
"""Return True if the add-on access to kernel modules."""
|
"""Return True if the add-on access to kernel modules."""
|
||||||
|
@ -68,6 +68,7 @@ from ..const import (
|
|||||||
ATTR_SYSTEM,
|
ATTR_SYSTEM,
|
||||||
ATTR_TIMEOUT,
|
ATTR_TIMEOUT,
|
||||||
ATTR_TMPFS,
|
ATTR_TMPFS,
|
||||||
|
ATTR_UDEV,
|
||||||
ATTR_URL,
|
ATTR_URL,
|
||||||
ATTR_USER,
|
ATTR_USER,
|
||||||
ATTR_UUID,
|
ATTR_UUID,
|
||||||
@ -186,6 +187,7 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
|
|||||||
vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(),
|
vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(),
|
||||||
vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")],
|
vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")],
|
||||||
vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(),
|
vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(),
|
||||||
|
vol.Optional(ATTR_UDEV, default=False): vol.Boolean(),
|
||||||
vol.Optional(ATTR_TMPFS): vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"),
|
vol.Optional(ATTR_TMPFS): vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"),
|
||||||
vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)],
|
vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)],
|
||||||
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
|
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
|
||||||
|
@ -8,6 +8,7 @@ import voluptuous as vol
|
|||||||
from voluptuous.humanize import humanize_error
|
from voluptuous.humanize import humanize_error
|
||||||
|
|
||||||
from ..addons import AnyAddon
|
from ..addons import AnyAddon
|
||||||
|
from ..docker.stats import DockerStats
|
||||||
from ..addons.utils import rating_security
|
from ..addons.utils import rating_security
|
||||||
from ..const import (
|
from ..const import (
|
||||||
ATTR_ADDONS,
|
ATTR_ADDONS,
|
||||||
@ -58,8 +59,8 @@ from ..const import (
|
|||||||
ATTR_MACHINE,
|
ATTR_MACHINE,
|
||||||
ATTR_MAINTAINER,
|
ATTR_MAINTAINER,
|
||||||
ATTR_MEMORY_LIMIT,
|
ATTR_MEMORY_LIMIT,
|
||||||
ATTR_MEMORY_USAGE,
|
|
||||||
ATTR_MEMORY_PERCENT,
|
ATTR_MEMORY_PERCENT,
|
||||||
|
ATTR_MEMORY_USAGE,
|
||||||
ATTR_NAME,
|
ATTR_NAME,
|
||||||
ATTR_NETWORK,
|
ATTR_NETWORK,
|
||||||
ATTR_NETWORK_DESCRIPTION,
|
ATTR_NETWORK_DESCRIPTION,
|
||||||
@ -76,6 +77,7 @@ from ..const import (
|
|||||||
ATTR_SOURCE,
|
ATTR_SOURCE,
|
||||||
ATTR_STATE,
|
ATTR_STATE,
|
||||||
ATTR_STDIN,
|
ATTR_STDIN,
|
||||||
|
ATTR_UDEV,
|
||||||
ATTR_URL,
|
ATTR_URL,
|
||||||
ATTR_VERSION,
|
ATTR_VERSION,
|
||||||
ATTR_WEBUI,
|
ATTR_WEBUI,
|
||||||
@ -119,7 +121,7 @@ class APIAddons(CoreSysAttributes):
|
|||||||
self, request: web.Request, check_installed: bool = True
|
self, request: web.Request, check_installed: bool = True
|
||||||
) -> AnyAddon:
|
) -> AnyAddon:
|
||||||
"""Return addon, throw an exception it it doesn't exist."""
|
"""Return addon, throw an exception it it doesn't exist."""
|
||||||
addon_slug = request.match_info.get("addon")
|
addon_slug: str = request.match_info.get("addon")
|
||||||
|
|
||||||
# Lookup itself
|
# Lookup itself
|
||||||
if addon_slug == "self":
|
if addon_slug == "self":
|
||||||
@ -178,7 +180,7 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
async def info(self, request: web.Request) -> Dict[str, Any]:
|
async def info(self, request: web.Request) -> Dict[str, Any]:
|
||||||
"""Return add-on information."""
|
"""Return add-on information."""
|
||||||
addon = self._extract_addon(request, check_installed=False)
|
addon: AnyAddon = self._extract_addon(request, check_installed=False)
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
ATTR_NAME: addon.name,
|
ATTR_NAME: addon.name,
|
||||||
@ -225,6 +227,7 @@ class APIAddons(CoreSysAttributes):
|
|||||||
ATTR_GPIO: addon.with_gpio,
|
ATTR_GPIO: addon.with_gpio,
|
||||||
ATTR_KERNEL_MODULES: addon.with_kernel_modules,
|
ATTR_KERNEL_MODULES: addon.with_kernel_modules,
|
||||||
ATTR_DEVICETREE: addon.with_devicetree,
|
ATTR_DEVICETREE: addon.with_devicetree,
|
||||||
|
ATTR_UDEV: addon.with_udev,
|
||||||
ATTR_DOCKER_API: addon.access_docker_api,
|
ATTR_DOCKER_API: addon.access_docker_api,
|
||||||
ATTR_AUDIO: addon.with_audio,
|
ATTR_AUDIO: addon.with_audio,
|
||||||
ATTR_AUDIO_INPUT: None,
|
ATTR_AUDIO_INPUT: None,
|
||||||
@ -261,12 +264,12 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
async def options(self, request: web.Request) -> None:
|
async def options(self, request: web.Request) -> None:
|
||||||
"""Store user options for add-on."""
|
"""Store user options for add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
|
|
||||||
addon_schema = SCHEMA_OPTIONS.extend(
|
addon_schema = SCHEMA_OPTIONS.extend(
|
||||||
{vol.Optional(ATTR_OPTIONS): vol.Any(None, addon.schema)}
|
{vol.Optional(ATTR_OPTIONS): vol.Any(None, addon.schema)}
|
||||||
)
|
)
|
||||||
body = await api_validate(addon_schema, request)
|
body: Dict[str, Any] = await api_validate(addon_schema, request)
|
||||||
|
|
||||||
if ATTR_OPTIONS in body:
|
if ATTR_OPTIONS in body:
|
||||||
addon.options = body[ATTR_OPTIONS]
|
addon.options = body[ATTR_OPTIONS]
|
||||||
@ -289,8 +292,8 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
async def security(self, request: web.Request) -> None:
|
async def security(self, request: web.Request) -> None:
|
||||||
"""Store security options for add-on."""
|
"""Store security options for add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
body = await api_validate(SCHEMA_SECURITY, request)
|
body: Dict[str, Any] = await api_validate(SCHEMA_SECURITY, request)
|
||||||
|
|
||||||
if ATTR_PROTECTED in body:
|
if ATTR_PROTECTED in body:
|
||||||
_LOGGER.warning("Protected flag changing for %s!", addon.slug)
|
_LOGGER.warning("Protected flag changing for %s!", addon.slug)
|
||||||
@ -301,8 +304,8 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
async def stats(self, request: web.Request) -> Dict[str, Any]:
|
async def stats(self, request: web.Request) -> Dict[str, Any]:
|
||||||
"""Return resource information."""
|
"""Return resource information."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
stats = await addon.stats()
|
stats: DockerStats = await addon.stats()
|
||||||
|
|
||||||
return {
|
return {
|
||||||
ATTR_CPU_PERCENT: stats.cpu_percent,
|
ATTR_CPU_PERCENT: stats.cpu_percent,
|
||||||
@ -318,19 +321,19 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
def install(self, request: web.Request) -> Awaitable[None]:
|
def install(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Install add-on."""
|
"""Install add-on."""
|
||||||
addon = self._extract_addon(request, check_installed=False)
|
addon: AnyAddon = self._extract_addon(request, check_installed=False)
|
||||||
return asyncio.shield(addon.install())
|
return asyncio.shield(addon.install())
|
||||||
|
|
||||||
@api_process
|
@api_process
|
||||||
def uninstall(self, request: web.Request) -> Awaitable[None]:
|
def uninstall(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Uninstall add-on."""
|
"""Uninstall add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
return asyncio.shield(addon.uninstall())
|
return asyncio.shield(addon.uninstall())
|
||||||
|
|
||||||
@api_process
|
@api_process
|
||||||
def start(self, request: web.Request) -> Awaitable[None]:
|
def start(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Start add-on."""
|
"""Start add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
|
|
||||||
# check options
|
# check options
|
||||||
options = addon.options
|
options = addon.options
|
||||||
@ -344,13 +347,13 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
def stop(self, request: web.Request) -> Awaitable[None]:
|
def stop(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Stop add-on."""
|
"""Stop add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
return asyncio.shield(addon.stop())
|
return asyncio.shield(addon.stop())
|
||||||
|
|
||||||
@api_process
|
@api_process
|
||||||
def update(self, request: web.Request) -> Awaitable[None]:
|
def update(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Update add-on."""
|
"""Update add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
|
|
||||||
if addon.latest_version == addon.version:
|
if addon.latest_version == addon.version:
|
||||||
raise APIError("No update available!")
|
raise APIError("No update available!")
|
||||||
@ -360,13 +363,13 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
def restart(self, request: web.Request) -> Awaitable[None]:
|
def restart(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Restart add-on."""
|
"""Restart add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
return asyncio.shield(addon.restart())
|
return asyncio.shield(addon.restart())
|
||||||
|
|
||||||
@api_process
|
@api_process
|
||||||
def rebuild(self, request: web.Request) -> Awaitable[None]:
|
def rebuild(self, request: web.Request) -> Awaitable[None]:
|
||||||
"""Rebuild local build add-on."""
|
"""Rebuild local build add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
if not addon.need_build:
|
if not addon.need_build:
|
||||||
raise APIError("Only local build addons are supported")
|
raise APIError("Only local build addons are supported")
|
||||||
|
|
||||||
@ -375,13 +378,13 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process_raw(CONTENT_TYPE_BINARY)
|
@api_process_raw(CONTENT_TYPE_BINARY)
|
||||||
def logs(self, request: web.Request) -> Awaitable[bytes]:
|
def logs(self, request: web.Request) -> Awaitable[bytes]:
|
||||||
"""Return logs from add-on."""
|
"""Return logs from add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
return addon.logs()
|
return addon.logs()
|
||||||
|
|
||||||
@api_process_raw(CONTENT_TYPE_PNG)
|
@api_process_raw(CONTENT_TYPE_PNG)
|
||||||
async def icon(self, request: web.Request) -> bytes:
|
async def icon(self, request: web.Request) -> bytes:
|
||||||
"""Return icon from add-on."""
|
"""Return icon from add-on."""
|
||||||
addon = self._extract_addon(request, check_installed=False)
|
addon: AnyAddon = self._extract_addon(request, check_installed=False)
|
||||||
if not addon.with_icon:
|
if not addon.with_icon:
|
||||||
raise APIError("No icon found!")
|
raise APIError("No icon found!")
|
||||||
|
|
||||||
@ -391,7 +394,7 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process_raw(CONTENT_TYPE_PNG)
|
@api_process_raw(CONTENT_TYPE_PNG)
|
||||||
async def logo(self, request: web.Request) -> bytes:
|
async def logo(self, request: web.Request) -> bytes:
|
||||||
"""Return logo from add-on."""
|
"""Return logo from add-on."""
|
||||||
addon = self._extract_addon(request, check_installed=False)
|
addon: AnyAddon = self._extract_addon(request, check_installed=False)
|
||||||
if not addon.with_logo:
|
if not addon.with_logo:
|
||||||
raise APIError("No logo found!")
|
raise APIError("No logo found!")
|
||||||
|
|
||||||
@ -401,7 +404,7 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process_raw(CONTENT_TYPE_TEXT)
|
@api_process_raw(CONTENT_TYPE_TEXT)
|
||||||
async def changelog(self, request: web.Request) -> str:
|
async def changelog(self, request: web.Request) -> str:
|
||||||
"""Return changelog from add-on."""
|
"""Return changelog from add-on."""
|
||||||
addon = self._extract_addon(request, check_installed=False)
|
addon: AnyAddon = self._extract_addon(request, check_installed=False)
|
||||||
if not addon.with_changelog:
|
if not addon.with_changelog:
|
||||||
raise APIError("No changelog found!")
|
raise APIError("No changelog found!")
|
||||||
|
|
||||||
@ -411,7 +414,7 @@ class APIAddons(CoreSysAttributes):
|
|||||||
@api_process
|
@api_process
|
||||||
async def stdin(self, request: web.Request) -> None:
|
async def stdin(self, request: web.Request) -> None:
|
||||||
"""Write to stdin of add-on."""
|
"""Write to stdin of add-on."""
|
||||||
addon = self._extract_addon(request)
|
addon: AnyAddon = self._extract_addon(request)
|
||||||
if not addon.with_stdin:
|
if not addon.with_stdin:
|
||||||
raise APIError("STDIN not supported by add-on")
|
raise APIError("STDIN not supported by add-on")
|
||||||
|
|
||||||
|
@ -22,7 +22,9 @@ class APIHardware(CoreSysAttributes):
|
|||||||
async def info(self, request):
|
async def info(self, request):
|
||||||
"""Show hardware info."""
|
"""Show hardware info."""
|
||||||
return {
|
return {
|
||||||
ATTR_SERIAL: list(self.sys_hardware.serial_devices),
|
ATTR_SERIAL: list(
|
||||||
|
self.sys_hardware.serial_devices | self.sys_hardware.serial_by_id
|
||||||
|
),
|
||||||
ATTR_INPUT: list(self.sys_hardware.input_devices),
|
ATTR_INPUT: list(self.sys_hardware.input_devices),
|
||||||
ATTR_DISK: list(self.sys_hardware.disk_devices),
|
ATTR_DISK: list(self.sys_hardware.disk_devices),
|
||||||
ATTR_GPIO: list(self.sys_hardware.gpio_devices),
|
ATTR_GPIO: list(self.sys_hardware.gpio_devices),
|
||||||
|
@ -218,6 +218,7 @@ ATTR_DEBUG = "debug"
|
|||||||
ATTR_DEBUG_BLOCK = "debug_block"
|
ATTR_DEBUG_BLOCK = "debug_block"
|
||||||
ATTR_DNS = "dns"
|
ATTR_DNS = "dns"
|
||||||
ATTR_SERVERS = "servers"
|
ATTR_SERVERS = "servers"
|
||||||
|
ATTR_UDEV = "udev"
|
||||||
|
|
||||||
PROVIDE_SERVICE = "provide"
|
PROVIDE_SERVICE = "provide"
|
||||||
NEED_SERVICE = "need"
|
NEED_SERVICE = "need"
|
||||||
|
@ -135,7 +135,14 @@ class DockerAddon(DockerInterface):
|
|||||||
|
|
||||||
# Auto mapping UART devices
|
# Auto mapping UART devices
|
||||||
if self.addon.auto_uart:
|
if self.addon.auto_uart:
|
||||||
for device in self.sys_hardware.serial_devices:
|
if self.addon.with_udev:
|
||||||
|
serial_devs = self.sys_hardware.serial_devices
|
||||||
|
else:
|
||||||
|
serial_devs = (
|
||||||
|
self.sys_hardware.serial_devices | self.sys_hardware.serial_by_id
|
||||||
|
)
|
||||||
|
|
||||||
|
for device in serial_devs:
|
||||||
devices.append(f"{device}:{device}:rwm")
|
devices.append(f"{device}:{device}:rwm")
|
||||||
|
|
||||||
# Return None if no devices is present
|
# Return None if no devices is present
|
||||||
|
@ -3,25 +3,26 @@ from datetime import datetime
|
|||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import re
|
import re
|
||||||
|
from typing import Any, Dict, Optional, Set
|
||||||
|
|
||||||
import pyudev
|
import pyudev
|
||||||
|
|
||||||
from ..const import ATTR_NAME, ATTR_TYPE, ATTR_DEVICES, CHAN_ID, CHAN_TYPE
|
from ..const import ATTR_DEVICES, ATTR_NAME, ATTR_TYPE, CHAN_ID, CHAN_TYPE
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
ASOUND_CARDS = Path("/proc/asound/cards")
|
ASOUND_CARDS: Path = Path("/proc/asound/cards")
|
||||||
RE_CARDS = re.compile(r"(\d+) \[(\w*) *\]: (.*\w)")
|
RE_CARDS: re.Pattern = re.compile(r"(\d+) \[(\w*) *\]: (.*\w)")
|
||||||
|
|
||||||
ASOUND_DEVICES = Path("/proc/asound/devices")
|
ASOUND_DEVICES: Path = Path("/proc/asound/devices")
|
||||||
RE_DEVICES = re.compile(r"\[.*(\d+)- (\d+).*\]: ([\w ]*)")
|
RE_DEVICES: re.Pattern = re.compile(r"\[.*(\d+)- (\d+).*\]: ([\w ]*)")
|
||||||
|
|
||||||
PROC_STAT = Path("/proc/stat")
|
PROC_STAT: Path = Path("/proc/stat")
|
||||||
RE_BOOT_TIME = re.compile(r"btime (\d+)")
|
RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)")
|
||||||
|
|
||||||
GPIO_DEVICES = Path("/sys/class/gpio")
|
GPIO_DEVICES: Path = Path("/sys/class/gpio")
|
||||||
SOC_DEVICES = Path("/sys/devices/platform/soc")
|
SOC_DEVICES: Path = Path("/sys/devices/platform/soc")
|
||||||
RE_TTY = re.compile(r"tty[A-Z]+")
|
RE_TTY: re.Pattern = re.compile(r"tty[A-Z]+")
|
||||||
|
|
||||||
|
|
||||||
class Hardware:
|
class Hardware:
|
||||||
@ -32,13 +33,21 @@ class Hardware:
|
|||||||
self.context = pyudev.Context()
|
self.context = pyudev.Context()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def serial_devices(self):
|
def serial_devices(self) -> Set[str]:
|
||||||
"""Return all serial and connected devices."""
|
"""Return all serial and connected devices."""
|
||||||
dev_list = set()
|
dev_list: Set[str] = set()
|
||||||
for device in self.context.list_devices(subsystem="tty"):
|
for device in self.context.list_devices(subsystem="tty"):
|
||||||
if "ID_VENDOR" in device.properties or RE_TTY.search(device.device_node):
|
if "ID_VENDOR" in device.properties or RE_TTY.search(device.device_node):
|
||||||
dev_list.add(device.device_node)
|
dev_list.add(device.device_node)
|
||||||
|
|
||||||
|
return dev_list
|
||||||
|
|
||||||
|
@property
|
||||||
|
def serial_by_id(self) -> Set[str]:
|
||||||
|
"""Return all /dev/serial/by-id for serial devices."""
|
||||||
|
dev_list: Set[str] = set()
|
||||||
|
for device in self.context.list_devices(subsystem="tty"):
|
||||||
|
if "ID_VENDOR" in device.properties or RE_TTY.search(device.device_node):
|
||||||
# Add /dev/serial/by-id devlink for current device
|
# Add /dev/serial/by-id devlink for current device
|
||||||
for dev_link in device.device_links:
|
for dev_link in device.device_links:
|
||||||
if not dev_link.startswith("/dev/serial/by-id"):
|
if not dev_link.startswith("/dev/serial/by-id"):
|
||||||
@ -48,9 +57,9 @@ class Hardware:
|
|||||||
return dev_list
|
return dev_list
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def input_devices(self):
|
def input_devices(self) -> Set[str]:
|
||||||
"""Return all input devices."""
|
"""Return all input devices."""
|
||||||
dev_list = set()
|
dev_list: Set[str] = set()
|
||||||
for device in self.context.list_devices(subsystem="input"):
|
for device in self.context.list_devices(subsystem="input"):
|
||||||
if "NAME" in device.properties:
|
if "NAME" in device.properties:
|
||||||
dev_list.add(device.properties["NAME"].replace('"', ""))
|
dev_list.add(device.properties["NAME"].replace('"', ""))
|
||||||
@ -58,9 +67,9 @@ class Hardware:
|
|||||||
return dev_list
|
return dev_list
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def disk_devices(self):
|
def disk_devices(self) -> Set[str]:
|
||||||
"""Return all disk devices."""
|
"""Return all disk devices."""
|
||||||
dev_list = set()
|
dev_list: Set[str] = set()
|
||||||
for device in self.context.list_devices(subsystem="block"):
|
for device in self.context.list_devices(subsystem="block"):
|
||||||
if "ID_NAME" in device.properties:
|
if "ID_NAME" in device.properties:
|
||||||
dev_list.add(device.device_node)
|
dev_list.add(device.device_node)
|
||||||
@ -68,15 +77,15 @@ class Hardware:
|
|||||||
return dev_list
|
return dev_list
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def support_audio(self):
|
def support_audio(self) -> bool:
|
||||||
"""Return True if the system have audio support."""
|
"""Return True if the system have audio support."""
|
||||||
return bool(self.audio_devices)
|
return bool(self.audio_devices)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def audio_devices(self):
|
def audio_devices(self) -> Dict[str, Any]:
|
||||||
"""Return all available audio interfaces."""
|
"""Return all available audio interfaces."""
|
||||||
if not ASOUND_CARDS.exists():
|
if not ASOUND_CARDS.exists():
|
||||||
_LOGGER.debug("No audio devices found")
|
_LOGGER.info("No audio devices found")
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -86,7 +95,7 @@ class Hardware:
|
|||||||
_LOGGER.error("Can't read asound data: %s", err)
|
_LOGGER.error("Can't read asound data: %s", err)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
audio_list = {}
|
audio_list: Dict[str, Any] = {}
|
||||||
|
|
||||||
# parse cards
|
# parse cards
|
||||||
for match in RE_CARDS.finditer(cards):
|
for match in RE_CARDS.finditer(cards):
|
||||||
@ -109,31 +118,31 @@ class Hardware:
|
|||||||
return audio_list
|
return audio_list
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def support_gpio(self):
|
def support_gpio(self) -> bool:
|
||||||
"""Return True if device support GPIOs."""
|
"""Return True if device support GPIOs."""
|
||||||
return SOC_DEVICES.exists() and GPIO_DEVICES.exists()
|
return SOC_DEVICES.exists() and GPIO_DEVICES.exists()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def gpio_devices(self):
|
def gpio_devices(self) -> Set[str]:
|
||||||
"""Return list of GPIO interface on device."""
|
"""Return list of GPIO interface on device."""
|
||||||
dev_list = set()
|
dev_list: Set[str] = set()
|
||||||
for interface in GPIO_DEVICES.glob("gpio*"):
|
for interface in GPIO_DEVICES.glob("gpio*"):
|
||||||
dev_list.add(interface.name)
|
dev_list.add(interface.name)
|
||||||
|
|
||||||
return dev_list
|
return dev_list
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_boot(self):
|
def last_boot(self) -> Optional[str]:
|
||||||
"""Return last boot time."""
|
"""Return last boot time."""
|
||||||
try:
|
try:
|
||||||
with PROC_STAT.open("r") as stat_file:
|
with PROC_STAT.open("r") as stat_file:
|
||||||
stats = stat_file.read()
|
stats: str = stat_file.read()
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
_LOGGER.error("Can't read stat data: %s", err)
|
_LOGGER.error("Can't read stat data: %s", err)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# parse stat file
|
# parse stat file
|
||||||
found = RE_BOOT_TIME.search(stats)
|
found: Optional[re.Match] = RE_BOOT_TIME.search(stats)
|
||||||
if not found:
|
if not found:
|
||||||
_LOGGER.error("Can't found last boot time!")
|
_LOGGER.error("Can't found last boot time!")
|
||||||
return None
|
return None
|
||||||
|
Loading…
x
Reference in New Issue
Block a user