Next generation hardware handling (#2429)

* Next generation hardware handling

* need daemon for some details

* fix tests

* fix wrong coresys lookup

* test initial import

* test device lookup

* validate if device exists

* Add cgroups rules manager

* mapping udev from host

* Modify validation/options handling

* lookup devices

* add support for host udev mapping

* next

* Add policy support to add-ons

* Depricate hardware trigger call

* next cleanup round

* detect USB linking

* optimize

* readd udev utils for backwards compatibility

* fix tests

* Add more tests

* fix tests

* Make device explicit

* Add filter

* work on tests

* Add migration step

* clean out auto_uart

* Fix all tests

* Expose all device information

* small  improvment

* Fix loop over right devices

* Use migration for new device format

* Update rootfs/etc/cont-init.d/udev.sh

Co-authored-by: Franck Nijhof <git@frenck.dev>

* Fix old helper

* Fix API

* add helper for by-id

* fix tests

* Fix serial helper

* Fix hardware API schema

* Hide some virtual devices from tracking

* Apply suggestions from code review

Co-authored-by: Stefan Agner <stefan@agner.ch>

* Update supervisor/addons/validate.py

Co-authored-by: Stefan Agner <stefan@agner.ch>

* Update supervisor/addons/validate.py

Co-authored-by: Stefan Agner <stefan@agner.ch>

* fix lint

* Apply suggestions from code review

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

* Apply suggestions from code review

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

* fix black

* fix lint

Co-authored-by: Franck Nijhof <git@frenck.dev>
Co-authored-by: Stefan Agner <stefan@agner.ch>
Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2021-01-28 15:26:56 +01:00 committed by GitHub
parent 69a8a83528
commit 6a0206c1e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 1473 additions and 968 deletions

View File

@ -2,9 +2,16 @@
# ============================================================================== # ==============================================================================
# Start udev service # Start udev service
# ============================================================================== # ==============================================================================
if bashio::fs.directory_exists /run/udev; then
bashio::log.info "Using udev information from host"
bashio::exit.ok
fi
bashio::log.info "Setup udev backend inside container"
udevd --daemon udevd --daemon
bashio::log.info "Update udev information"
if udevadm trigger; then if udevadm trigger; then
udevadm settle || true udevadm settle || true
else else

View File

@ -10,7 +10,7 @@ import secrets
import shutil import shutil
import tarfile import tarfile
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any, Awaitable, Dict, List, Optional from typing import Any, Awaitable, Dict, List, Optional, Set
import aiohttp import aiohttp
import voluptuous as vol import voluptuous as vol
@ -55,13 +55,15 @@ from ..exceptions import (
HostAppArmorError, HostAppArmorError,
JsonFileError, JsonFileError,
) )
from ..hardware.data import Device
from ..utils import check_port from ..utils import check_port
from ..utils.apparmor import adjust_profile from ..utils.apparmor import adjust_profile
from ..utils.json import read_json_file, write_json_file from ..utils.json import read_json_file, write_json_file
from ..utils.tar import atomic_contents_add, secure_path from ..utils.tar import atomic_contents_add, secure_path
from .model import AddonModel, Data from .model import AddonModel, Data
from .options import AddonOptions
from .utils import remove_data from .utils import remove_data
from .validate import SCHEMA_ADDON_SNAPSHOT, validate_options from .validate import SCHEMA_ADDON_SNAPSHOT
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -394,6 +396,20 @@ class Addon(AddonModel):
"""Return path to asound config for Docker.""" """Return path to asound config for Docker."""
return Path(self.sys_config.path_extern_tmp, f"{self.slug}_pulse") return Path(self.sys_config.path_extern_tmp, f"{self.slug}_pulse")
@property
def devices(self) -> Set[Device]:
"""Create a schema for add-on options."""
raw_schema = self.data[ATTR_SCHEMA]
if isinstance(raw_schema, bool) or not raw_schema:
return set()
# Validate devices
options_validator = AddonOptions(self.coresys, raw_schema)
with suppress(vol.Invalid):
options_validator(self.options)
return options_validator.devices
def save_persist(self) -> None: def save_persist(self) -> None:
"""Save data of add-on.""" """Save data of add-on."""
self.sys_addons.data.save_data() self.sys_addons.data.save_data()
@ -442,20 +458,17 @@ class Addon(AddonModel):
async def write_options(self) -> None: async def write_options(self) -> None:
"""Return True if add-on options is written to data.""" """Return True if add-on options is written to data."""
schema = self.schema
options = self.options
# Update secrets for validation # Update secrets for validation
await self.sys_homeassistant.secrets.reload() await self.sys_homeassistant.secrets.reload()
try: try:
options = schema(options) options = self.schema(self.options)
write_json_file(self.path_options, options) write_json_file(self.path_options, options)
except vol.Invalid as ex: except vol.Invalid as ex:
_LOGGER.error( _LOGGER.error(
"Add-on %s has invalid options: %s", "Add-on %s has invalid options: %s",
self.slug, self.slug,
humanize_error(options, ex), humanize_error(self.options, ex),
) )
except JsonFileError: except JsonFileError:
_LOGGER.error("Add-on %s can't write options", self.slug) _LOGGER.error("Add-on %s can't write options", self.slug)
@ -538,7 +551,7 @@ class Addon(AddonModel):
# create voluptuous # create voluptuous
new_schema = vol.Schema( new_schema = vol.Schema(
vol.All(dict, validate_options(self.coresys, new_raw_schema)) vol.All(dict, AddonOptions(self.coresys, new_raw_schema))
) )
# validate # validate

View File

@ -12,7 +12,6 @@ from ..const import (
ATTR_ARCH, ATTR_ARCH,
ATTR_AUDIO, ATTR_AUDIO,
ATTR_AUTH_API, ATTR_AUTH_API,
ATTR_AUTO_UART,
ATTR_BOOT, ATTR_BOOT,
ATTR_DESCRIPTON, ATTR_DESCRIPTON,
ATTR_DEVICES, ATTR_DEVICES,
@ -56,6 +55,7 @@ from ..const import (
ATTR_STDIN, ATTR_STDIN,
ATTR_TIMEOUT, ATTR_TIMEOUT,
ATTR_TMPFS, ATTR_TMPFS,
ATTR_UART,
ATTR_UDEV, ATTR_UDEV,
ATTR_URL, ATTR_URL,
ATTR_USB, ATTR_USB,
@ -71,7 +71,8 @@ from ..const import (
AddonStartup, AddonStartup,
) )
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from .validate import RE_SERVICE, RE_VOLUME, schema_ui_options, validate_options from .options import AddonOptions, UiOptions
from .validate import RE_SERVICE, RE_VOLUME
Data = Dict[str, Any] Data = Dict[str, Any]
@ -296,9 +297,9 @@ class AddonModel(CoreSysAttributes, ABC):
return self.data[ATTR_HOST_DBUS] return self.data[ATTR_HOST_DBUS]
@property @property
def devices(self) -> List[str]: def static_devices(self) -> List[Path]:
"""Return devices of add-on.""" """Return static devices of add-on."""
return self.data.get(ATTR_DEVICES, []) return [Path(node) for node in self.data.get(ATTR_DEVICES, [])]
@property @property
def tmpfs(self) -> Optional[str]: def tmpfs(self) -> Optional[str]:
@ -387,7 +388,7 @@ class AddonModel(CoreSysAttributes, ABC):
@property @property
def with_uart(self) -> bool: def with_uart(self) -> bool:
"""Return True if we should map all UART device.""" """Return True if we should map all UART device."""
return self.data[ATTR_AUTO_UART] return self.data[ATTR_UART]
@property @property
def with_udev(self) -> bool: def with_udev(self) -> bool:
@ -522,8 +523,8 @@ class AddonModel(CoreSysAttributes, ABC):
raw_schema = self.data[ATTR_SCHEMA] raw_schema = self.data[ATTR_SCHEMA]
if isinstance(raw_schema, bool): if isinstance(raw_schema, bool):
return vol.Schema(dict) raw_schema = {}
return vol.Schema(vol.All(dict, validate_options(self.coresys, raw_schema))) return vol.Schema(vol.All(dict, AddonOptions(self.coresys, raw_schema)))
@property @property
def schema_ui(self) -> Optional[List[Dict[str, Any]]]: def schema_ui(self) -> Optional[List[Dict[str, Any]]]:
@ -532,7 +533,7 @@ class AddonModel(CoreSysAttributes, ABC):
if isinstance(raw_schema, bool): if isinstance(raw_schema, bool):
return None return None
return schema_ui_options(raw_schema) return UiOptions(self.coresys)(raw_schema)
def __eq__(self, other): def __eq__(self, other):
"""Compaired add-on objects.""" """Compaired add-on objects."""

View File

@ -0,0 +1,380 @@
"""Add-on Options / UI rendering."""
import logging
from pathlib import Path
import re
from typing import Any, Dict, List, Set, Union
import voluptuous as vol
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HardwareNotFound
from ..hardware.const import UdevSubsystem
from ..hardware.data import Device
from ..validate import network_port
_LOGGER: logging.Logger = logging.getLogger(__name__)
_STR = "str"
_INT = "int"
_FLOAT = "float"
_BOOL = "bool"
_PASSWORD = "password"
_EMAIL = "email"
_URL = "url"
_PORT = "port"
_MATCH = "match"
_LIST = "list"
_DEVICE = "device"
RE_SCHEMA_ELEMENT = re.compile(
r"^(?:"
r"|bool"
r"|email"
r"|url"
r"|port"
r"|device(?:\((?P<filter>subsystem=[a-z]+)\))?"
r"|str(?:\((?P<s_min>\d+)?,(?P<s_max>\d+)?\))?"
r"|password(?:\((?P<p_min>\d+)?,(?P<p_max>\d+)?\))?"
r"|int(?:\((?P<i_min>\d+)?,(?P<i_max>\d+)?\))?"
r"|float(?:\((?P<f_min>[\d\.]+)?,(?P<f_max>[\d\.]+)?\))?"
r"|match\((?P<match>.*)\)"
r"|list\((?P<list>.+)\)"
r")\??$"
)
_SCHEMA_LENGTH_PARTS = (
"i_min",
"i_max",
"f_min",
"f_max",
"s_min",
"s_max",
"p_min",
"p_max",
)
class AddonOptions(CoreSysAttributes):
"""Validate Add-ons Options."""
def __init__(
self,
coresys: CoreSys,
raw_schema: Dict[str, Any],
):
"""Validate schema."""
self.coresys: CoreSys = coresys
self.raw_schema: Dict[str, Any] = raw_schema
self.devices: Set[Device] = set()
def __call__(self, struct):
"""Create schema validator for add-ons options."""
options = {}
# read options
for key, value in struct.items():
# Ignore unknown options / remove from list
if key not in self.raw_schema:
_LOGGER.warning("Unknown options %s", key)
continue
typ = self.raw_schema[key]
try:
if isinstance(typ, list):
# nested value list
options[key] = self._nested_validate_list(typ[0], value, key)
elif isinstance(typ, dict):
# nested value dict
options[key] = self._nested_validate_dict(typ, value, key)
else:
# normal value
options[key] = self._single_validate(typ, value, key)
except (IndexError, KeyError):
raise vol.Invalid(f"Type error for {key}") from None
self._check_missing_options(self.raw_schema, options, "root")
return options
# pylint: disable=no-value-for-parameter
def _single_validate(self, typ: str, value: Any, key: str):
"""Validate a single element."""
# if required argument
if value is None:
raise vol.Invalid(f"Missing required option '{key}'") from None
# Lookup secret
if str(value).startswith("!secret "):
secret: str = value.partition(" ")[2]
value = self.sys_homeassistant.secrets.get(secret)
if value is None:
raise vol.Invalid(f"Unknown secret {secret}") from None
# parse extend data from type
match = RE_SCHEMA_ELEMENT.match(typ)
if not match:
raise vol.Invalid(f"Unknown type {typ}") from None
# prepare range
range_args = {}
for group_name in _SCHEMA_LENGTH_PARTS:
group_value = match.group(group_name)
if group_value:
range_args[group_name[2:]] = float(group_value)
if typ.startswith(_STR) or typ.startswith(_PASSWORD):
return vol.All(str(value), vol.Range(**range_args))(value)
elif typ.startswith(_INT):
return vol.All(vol.Coerce(int), vol.Range(**range_args))(value)
elif typ.startswith(_FLOAT):
return vol.All(vol.Coerce(float), vol.Range(**range_args))(value)
elif typ.startswith(_BOOL):
return vol.Boolean()(value)
elif typ.startswith(_EMAIL):
return vol.Email()(value)
elif typ.startswith(_URL):
return vol.Url()(value)
elif typ.startswith(_PORT):
return network_port(value)
elif typ.startswith(_MATCH):
return vol.Match(match.group("match"))(str(value))
elif typ.startswith(_LIST):
return vol.In(match.group("list").split("|"))(str(value))
elif typ.startswith(_DEVICE):
try:
device = self.sys_hardware.get_by_path(Path(value))
except HardwareNotFound:
raise vol.Invalid(f"Device {value} does not exists!") from None
# Have filter
if match.group("filter"):
str_filter = match.group("filter")
device_filter = _create_device_filter(str_filter)
if device not in self.sys_hardware.filter_devices(**device_filter):
raise vol.Invalid(
f"Device {value} don't match the filter {str_filter}!"
)
# Device valid
self.devices.add(device)
return str(device.path)
raise vol.Invalid(f"Fatal error for {key} type {typ}") from None
def _nested_validate_list(self, typ: Any, data_list: List[Any], key: str):
"""Validate nested items."""
options = []
# Make sure it is a list
if not isinstance(data_list, list):
raise vol.Invalid(f"Invalid list for {key}") from None
# Process list
for element in data_list:
# Nested?
if isinstance(typ, dict):
c_options = self._nested_validate_dict(typ, element, key)
options.append(c_options)
else:
options.append(self._single_validate(typ, element, key))
return options
def _nested_validate_dict(
self, typ: Dict[Any, Any], data_dict: Dict[Any, Any], key: str
):
"""Validate nested items."""
options = {}
# Make sure it is a dict
if not isinstance(data_dict, dict):
raise vol.Invalid(f"Invalid dict for {key}") from None
# Process dict
for c_key, c_value in data_dict.items():
# Ignore unknown options / remove from list
if c_key not in typ:
_LOGGER.warning("Unknown options %s", c_key)
continue
# Nested?
if isinstance(typ[c_key], list):
options[c_key] = self._nested_validate_list(
typ[c_key][0], c_value, c_key
)
else:
options[c_key] = self._single_validate(typ[c_key], c_value, c_key)
self._check_missing_options(typ, options, key)
return options
def _check_missing_options(
self, origin: Dict[Any, Any], exists: Dict[Any, Any], root: str
) -> None:
"""Check if all options are exists."""
missing = set(origin) - set(exists)
for miss_opt in missing:
if isinstance(origin[miss_opt], str) and origin[miss_opt].endswith("?"):
continue
raise vol.Invalid(f"Missing option {miss_opt} in {root}") from None
class UiOptions(CoreSysAttributes):
"""Render UI Add-ons Options."""
def __init__(self, coresys: CoreSys) -> None:
"""Initialize UI option render."""
self.coresys = coresys
def __call__(self, raw_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate UI schema."""
ui_schema: List[Dict[str, Any]] = []
# read options
for key, value in raw_schema.items():
if isinstance(value, list):
# nested value list
self._nested_ui_list(ui_schema, value, key)
elif isinstance(value, dict):
# nested value dict
self._nested_ui_dict(ui_schema, value, key)
else:
# normal value
self._single_ui_option(ui_schema, value, key)
return ui_schema
def _single_ui_option(
self,
ui_schema: List[Dict[str, Any]],
value: str,
key: str,
multiple: bool = False,
) -> None:
"""Validate a single element."""
ui_node: Dict[str, Union[str, bool, float, List[str]]] = {"name": key}
# If multiple
if multiple:
ui_node["multiple"] = True
# Parse extend data from type
match = RE_SCHEMA_ELEMENT.match(value)
if not match:
return
# Prepare range
for group_name in _SCHEMA_LENGTH_PARTS:
group_value = match.group(group_name)
if not group_value:
continue
if group_name[2:] == "min":
ui_node["lengthMin"] = float(group_value)
elif group_name[2:] == "max":
ui_node["lengthMax"] = float(group_value)
# If required
if value.endswith("?"):
ui_node["optional"] = True
else:
ui_node["required"] = True
# Data types
if value.startswith(_STR):
ui_node["type"] = "string"
elif value.startswith(_PASSWORD):
ui_node["type"] = "string"
ui_node["format"] = "password"
elif value.startswith(_INT):
ui_node["type"] = "integer"
elif value.startswith(_FLOAT):
ui_node["type"] = "float"
elif value.startswith(_BOOL):
ui_node["type"] = "boolean"
elif value.startswith(_EMAIL):
ui_node["type"] = "string"
ui_node["format"] = "email"
elif value.startswith(_URL):
ui_node["type"] = "string"
ui_node["format"] = "url"
elif value.startswith(_PORT):
ui_node["type"] = "integer"
elif value.startswith(_MATCH):
ui_node["type"] = "string"
elif value.startswith(_LIST):
ui_node["type"] = "select"
ui_node["options"] = match.group("list").split("|")
elif value.startswith(_DEVICE):
ui_node["type"] = "select"
# Have filter
if match.group("filter"):
device_filter = _create_device_filter(match.group("filter"))
ui_node["options"] = [
device.path.as_posix()
for device in self.sys_hardware.filter_devices(**device_filter)
]
else:
ui_node["options"] = [
device.path.as_posix() for device in self.sys_hardware.devices()
]
ui_schema.append(ui_node)
def _nested_ui_list(
self,
ui_schema: List[Dict[str, Any]],
option_list: List[Any],
key: str,
) -> None:
"""UI nested list items."""
try:
element = option_list[0]
except IndexError:
_LOGGER.error("Invalid schema %s", key)
return
if isinstance(element, dict):
self._nested_ui_dict(ui_schema, element, key, multiple=True)
else:
self._single_ui_option(ui_schema, element, key, multiple=True)
def _nested_ui_dict(
self,
ui_schema: List[Dict[str, Any]],
option_dict: Dict[str, Any],
key: str,
multiple: bool = False,
) -> None:
"""UI nested dict items."""
ui_node = {
"name": key,
"type": "schema",
"optional": True,
"multiple": multiple,
}
nested_schema = []
for c_key, c_value in option_dict.items():
# Nested?
if isinstance(c_value, list):
self._nested_ui_list(nested_schema, c_value, c_key)
else:
self._single_ui_option(nested_schema, c_value, c_key)
ui_node["schema"] = nested_schema
ui_schema.append(ui_node)
def _create_device_filter(str_filter: str) -> Dict[str, Any]:
"""Generate device Filter."""
raw_filter = dict(value.split("=") for value in str_filter.split(";"))
clean_filter = {}
for key, value in raw_filter.items():
if key == "subsystem":
clean_filter[key] = UdevSubsystem(value)
else:
clean_filter[key] = value
return clean_filter

View File

@ -2,7 +2,7 @@
import logging import logging
import re import re
import secrets import secrets
from typing import Any, Dict, List, Union from typing import Any, Dict
import uuid import uuid
import voluptuous as vol import voluptuous as vol
@ -18,7 +18,6 @@ from ..const import (
ATTR_AUDIO_INPUT, ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT, ATTR_AUDIO_OUTPUT,
ATTR_AUTH_API, ATTR_AUTH_API,
ATTR_AUTO_UART,
ATTR_AUTO_UPDATE, ATTR_AUTO_UPDATE,
ATTR_BOOT, ATTR_BOOT,
ATTR_BUILD_FROM, ATTR_BUILD_FROM,
@ -73,6 +72,7 @@ from ..const import (
ATTR_SYSTEM, ATTR_SYSTEM,
ATTR_TIMEOUT, ATTR_TIMEOUT,
ATTR_TMPFS, ATTR_TMPFS,
ATTR_UART,
ATTR_UDEV, ATTR_UDEV,
ATTR_URL, ATTR_URL,
ATTR_USB, ATTR_USB,
@ -90,7 +90,6 @@ from ..const import (
AddonStartup, AddonStartup,
AddonState, AddonState,
) )
from ..coresys import CoreSys
from ..discovery.validate import valid_discovery_service from ..discovery.validate import valid_discovery_service
from ..validate import ( from ..validate import (
docker_image, docker_image,
@ -101,49 +100,13 @@ from ..validate import (
uuid_match, uuid_match,
version_tag, version_tag,
) )
from .options import RE_SCHEMA_ELEMENT
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
RE_VOLUME = re.compile(r"^(config|ssl|addons|backup|share|media)(?::(rw|ro))?$") RE_VOLUME = re.compile(r"^(config|ssl|addons|backup|share|media)(?::(rw|ro))?$")
RE_SERVICE = re.compile(r"^(?P<service>mqtt|mysql):(?P<rights>provide|want|need)$") RE_SERVICE = re.compile(r"^(?P<service>mqtt|mysql):(?P<rights>provide|want|need)$")
V_STR = "str"
V_INT = "int"
V_FLOAT = "float"
V_BOOL = "bool"
V_PASSWORD = "password"
V_EMAIL = "email"
V_URL = "url"
V_PORT = "port"
V_MATCH = "match"
V_LIST = "list"
RE_SCHEMA_ELEMENT = re.compile(
r"^(?:"
r"|bool"
r"|email"
r"|url"
r"|port"
r"|str(?:\((?P<s_min>\d+)?,(?P<s_max>\d+)?\))?"
r"|password(?:\((?P<p_min>\d+)?,(?P<p_max>\d+)?\))?"
r"|int(?:\((?P<i_min>\d+)?,(?P<i_max>\d+)?\))?"
r"|float(?:\((?P<f_min>[\d\.]+)?,(?P<f_max>[\d\.]+)?\))?"
r"|match\((?P<match>.*)\)"
r"|list\((?P<list>.+)\)"
r")\??$"
)
_SCHEMA_LENGTH_PARTS = (
"i_min",
"i_max",
"f_min",
"f_max",
"s_min",
"s_max",
"p_min",
"p_max",
)
RE_DOCKER_IMAGE_BUILD = re.compile( RE_DOCKER_IMAGE_BUILD = re.compile(
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$" r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$"
@ -173,27 +136,63 @@ RE_MACHINE = re.compile(
) )
def _simple_startup(value) -> str: def _migrate_addon_config(protocol=False):
"""Define startup schema.""" """Migrate addon config."""
if value == "before":
return AddonStartup.SERVICES.value def _migrate(config: Dict[str, Any]):
if value == "after": name = config.get(ATTR_NAME)
return AddonStartup.APPLICATION.value if not name:
return value raise vol.Invalid("Invalid Add-on config!")
# Startup 2018-03-30
if config.get(ATTR_STARTUP) in ("before", "after"):
value = config[ATTR_STARTUP]
if protocol:
_LOGGER.warning(
"Add-on config 'startup' with '%s' is depircated. Please report this to the maintainer of %s",
value,
name,
)
if value == "before":
config[ATTR_STARTUP] = AddonStartup.SERVICES.value
elif value == "after":
config[ATTR_STARTUP] = AddonStartup.APPLICATION.value
# UART 2021-01-20
if "auto_uart" in config:
if protocol:
_LOGGER.warning(
"Add-on config 'auto_uart' is deprecated, use 'uart'. Please report this to the maintainer of %s",
name,
)
config[ATTR_UART] = config.pop("auto_uart")
# Device 2021-01-20
if ATTR_DEVICES in config and any(":" in line for line in config[ATTR_DEVICES]):
if protocol:
_LOGGER.warning(
"Add-on config 'devices' use a deprecated format, the new format uses a list of paths only. Please report this to the maintainer of %s",
name,
)
config[ATTR_DEVICES] = [line.split(":")[0] for line in config[ATTR_DEVICES]]
return config
return _migrate
# pylint: disable=no-value-for-parameter # pylint: disable=no-value-for-parameter
SCHEMA_ADDON_CONFIG = vol.Schema( _SCHEMA_ADDON_CONFIG = vol.Schema(
{ {
vol.Required(ATTR_NAME): vol.Coerce(str), vol.Required(ATTR_NAME): str,
vol.Required(ATTR_VERSION): version_tag, vol.Required(ATTR_VERSION): version_tag,
vol.Required(ATTR_SLUG): vol.Coerce(str), vol.Required(ATTR_SLUG): str,
vol.Required(ATTR_DESCRIPTON): vol.Coerce(str), vol.Required(ATTR_DESCRIPTON): str,
vol.Required(ATTR_ARCH): [vol.In(ARCH_ALL)], vol.Required(ATTR_ARCH): [vol.In(ARCH_ALL)],
vol.Optional(ATTR_MACHINE): vol.All([vol.Match(RE_MACHINE)], vol.Unique()), vol.Optional(ATTR_MACHINE): vol.All([vol.Match(RE_MACHINE)], vol.Unique()),
vol.Optional(ATTR_URL): vol.Url(), vol.Optional(ATTR_URL): vol.Url(),
vol.Optional(ATTR_STARTUP, default=AddonStartup.APPLICATION): vol.All( vol.Optional(ATTR_STARTUP, default=AddonStartup.APPLICATION): vol.Coerce(
_simple_startup, vol.Coerce(AddonStartup) AddonStartup
), ),
vol.Optional(ATTR_BOOT, default=AddonBoot.AUTO): vol.Coerce(AddonBoot), vol.Optional(ATTR_BOOT, default=AddonBoot.AUTO): vol.Coerce(AddonBoot),
vol.Optional(ATTR_INIT, default=True): vol.Boolean(), vol.Optional(ATTR_INIT, default=True): vol.Boolean(),
@ -211,21 +210,20 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_INGRESS_PORT, default=8099): vol.Any( vol.Optional(ATTR_INGRESS_PORT, default=8099): vol.Any(
network_port, vol.Equal(0) network_port, vol.Equal(0)
), ),
vol.Optional(ATTR_INGRESS_ENTRY): vol.Coerce(str), vol.Optional(ATTR_INGRESS_ENTRY): str,
vol.Optional(ATTR_PANEL_ICON, default="mdi:puzzle"): vol.Coerce(str), vol.Optional(ATTR_PANEL_ICON, default="mdi:puzzle"): str,
vol.Optional(ATTR_PANEL_TITLE): vol.Coerce(str), vol.Optional(ATTR_PANEL_TITLE): str,
vol.Optional(ATTR_PANEL_ADMIN, default=True): vol.Boolean(), vol.Optional(ATTR_PANEL_ADMIN, default=True): vol.Boolean(),
vol.Optional(ATTR_HOMEASSISTANT): vol.Maybe(version_tag), vol.Optional(ATTR_HOMEASSISTANT): vol.Maybe(version_tag),
vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_PID, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_PID, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_IPC, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_IPC, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")], vol.Optional(ATTR_DEVICES): [str],
vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(),
vol.Optional(ATTR_UDEV, default=False): vol.Boolean(), vol.Optional(ATTR_UDEV, default=False): vol.Boolean(),
vol.Optional(ATTR_TMPFS): vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"), vol.Optional(ATTR_TMPFS): vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"),
vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)], vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)],
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)}, vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): str},
vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)], vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)],
vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(), vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(),
vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(), vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(),
@ -233,6 +231,7 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_VIDEO, default=False): vol.Boolean(), vol.Optional(ATTR_VIDEO, default=False): vol.Boolean(),
vol.Optional(ATTR_GPIO, default=False): vol.Boolean(), vol.Optional(ATTR_GPIO, default=False): vol.Boolean(),
vol.Optional(ATTR_USB, default=False): vol.Boolean(), vol.Optional(ATTR_USB, default=False): vol.Boolean(),
vol.Optional(ATTR_UART, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(), vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(),
vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(), vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(), vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),
@ -244,26 +243,20 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(), vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(),
vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)], vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)],
vol.Optional(ATTR_DISCOVERY): [valid_discovery_service], vol.Optional(ATTR_DISCOVERY): [valid_discovery_service],
vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [vol.Coerce(str)], vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [str],
vol.Optional(ATTR_OPTIONS, default={}): dict, vol.Optional(ATTR_OPTIONS, default={}): dict,
vol.Optional(ATTR_SCHEMA, default={}): vol.Any( vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
vol.Schema( vol.Schema(
{ {
vol.Coerce(str): vol.Any( str: vol.Any(
SCHEMA_ELEMENT, SCHEMA_ELEMENT,
[ [
vol.Any( vol.Any(
SCHEMA_ELEMENT, SCHEMA_ELEMENT,
{ {str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])},
vol.Coerce(str): vol.Any(
SCHEMA_ELEMENT, [SCHEMA_ELEMENT]
)
},
) )
], ],
vol.Schema( vol.Schema({str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}),
{vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}
),
) )
} }
), ),
@ -277,6 +270,8 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
extra=vol.REMOVE_EXTRA, extra=vol.REMOVE_EXTRA,
) )
SCHEMA_ADDON_CONFIG = vol.All(_migrate_addon_config(True), _SCHEMA_ADDON_CONFIG)
# pylint: disable=no-value-for-parameter # pylint: disable=no-value-for-parameter
SCHEMA_BUILD_CONFIG = vol.Schema( SCHEMA_BUILD_CONFIG = vol.Schema(
@ -300,15 +295,13 @@ SCHEMA_ADDON_USER = vol.Schema(
vol.Optional(ATTR_IMAGE): docker_image, vol.Optional(ATTR_IMAGE): docker_image,
vol.Optional(ATTR_UUID, default=lambda: uuid.uuid4().hex): uuid_match, vol.Optional(ATTR_UUID, default=lambda: uuid.uuid4().hex): uuid_match,
vol.Optional(ATTR_ACCESS_TOKEN): token, vol.Optional(ATTR_ACCESS_TOKEN): token,
vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): vol.Coerce( vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): str,
str
),
vol.Optional(ATTR_OPTIONS, default=dict): dict, vol.Optional(ATTR_OPTIONS, default=dict): dict,
vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(), vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(),
vol.Optional(ATTR_BOOT): vol.Coerce(AddonBoot), vol.Optional(ATTR_BOOT): vol.Coerce(AddonBoot),
vol.Optional(ATTR_NETWORK): docker_ports, vol.Optional(ATTR_NETWORK): docker_ports,
vol.Optional(ATTR_AUDIO_OUTPUT): vol.Maybe(vol.Coerce(str)), vol.Optional(ATTR_AUDIO_OUTPUT): vol.Maybe(str),
vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(vol.Coerce(str)), vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(str),
vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(), vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(), vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(),
vol.Optional(ATTR_WATCHDOG, default=False): vol.Boolean(), vol.Optional(ATTR_WATCHDOG, default=False): vol.Boolean(),
@ -317,18 +310,21 @@ SCHEMA_ADDON_USER = vol.Schema(
) )
SCHEMA_ADDON_SYSTEM = SCHEMA_ADDON_CONFIG.extend( SCHEMA_ADDON_SYSTEM = vol.All(
{ _migrate_addon_config(),
vol.Required(ATTR_LOCATON): vol.Coerce(str), _SCHEMA_ADDON_CONFIG.extend(
vol.Required(ATTR_REPOSITORY): vol.Coerce(str), {
} vol.Required(ATTR_LOCATON): str,
vol.Required(ATTR_REPOSITORY): str,
}
),
) )
SCHEMA_ADDONS_FILE = vol.Schema( SCHEMA_ADDONS_FILE = vol.Schema(
{ {
vol.Optional(ATTR_USER, default=dict): {vol.Coerce(str): SCHEMA_ADDON_USER}, vol.Optional(ATTR_USER, default=dict): {str: SCHEMA_ADDON_USER},
vol.Optional(ATTR_SYSTEM, default=dict): {vol.Coerce(str): SCHEMA_ADDON_SYSTEM}, vol.Optional(ATTR_SYSTEM, default=dict): {str: SCHEMA_ADDON_SYSTEM},
} }
) )
@ -338,263 +334,7 @@ SCHEMA_ADDON_SNAPSHOT = vol.Schema(
vol.Required(ATTR_USER): SCHEMA_ADDON_USER, vol.Required(ATTR_USER): SCHEMA_ADDON_USER,
vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM, vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM,
vol.Required(ATTR_STATE): vol.Coerce(AddonState), vol.Required(ATTR_STATE): vol.Coerce(AddonState),
vol.Required(ATTR_VERSION): vol.Coerce(str), vol.Required(ATTR_VERSION): version_tag,
}, },
extra=vol.REMOVE_EXTRA, extra=vol.REMOVE_EXTRA,
) )
def validate_options(coresys: CoreSys, raw_schema: Dict[str, Any]):
"""Validate schema."""
def validate(struct):
"""Create schema validator for add-ons options."""
options = {}
# read options
for key, value in struct.items():
# Ignore unknown options / remove from list
if key not in raw_schema:
_LOGGER.warning("Unknown options %s", key)
continue
typ = raw_schema[key]
try:
if isinstance(typ, list):
# nested value list
options[key] = _nested_validate_list(coresys, typ[0], value, key)
elif isinstance(typ, dict):
# nested value dict
options[key] = _nested_validate_dict(coresys, typ, value, key)
else:
# normal value
options[key] = _single_validate(coresys, typ, value, key)
except (IndexError, KeyError):
raise vol.Invalid(f"Type error for {key}") from None
_check_missing_options(raw_schema, options, "root")
return options
return validate
# pylint: disable=no-value-for-parameter
# pylint: disable=inconsistent-return-statements
def _single_validate(coresys: CoreSys, typ: str, value: Any, key: str):
"""Validate a single element."""
# if required argument
if value is None:
raise vol.Invalid(f"Missing required option '{key}'") from None
# Lookup secret
if str(value).startswith("!secret "):
secret: str = value.partition(" ")[2]
value = coresys.homeassistant.secrets.get(secret)
if value is None:
raise vol.Invalid(f"Unknown secret {secret}") from None
# parse extend data from type
match = RE_SCHEMA_ELEMENT.match(typ)
if not match:
raise vol.Invalid(f"Unknown type {typ}") from None
# prepare range
range_args = {}
for group_name in _SCHEMA_LENGTH_PARTS:
group_value = match.group(group_name)
if group_value:
range_args[group_name[2:]] = float(group_value)
if typ.startswith(V_STR) or typ.startswith(V_PASSWORD):
return vol.All(str(value), vol.Range(**range_args))(value)
elif typ.startswith(V_INT):
return vol.All(vol.Coerce(int), vol.Range(**range_args))(value)
elif typ.startswith(V_FLOAT):
return vol.All(vol.Coerce(float), vol.Range(**range_args))(value)
elif typ.startswith(V_BOOL):
return vol.Boolean()(value)
elif typ.startswith(V_EMAIL):
return vol.Email()(value)
elif typ.startswith(V_URL):
return vol.Url()(value)
elif typ.startswith(V_PORT):
return network_port(value)
elif typ.startswith(V_MATCH):
return vol.Match(match.group("match"))(str(value))
elif typ.startswith(V_LIST):
return vol.In(match.group("list").split("|"))(str(value))
raise vol.Invalid(f"Fatal error for {key} type {typ}") from None
def _nested_validate_list(coresys, typ, data_list, key):
"""Validate nested items."""
options = []
# Make sure it is a list
if not isinstance(data_list, list):
raise vol.Invalid(f"Invalid list for {key}") from None
# Process list
for element in data_list:
# Nested?
if isinstance(typ, dict):
c_options = _nested_validate_dict(coresys, typ, element, key)
options.append(c_options)
else:
options.append(_single_validate(coresys, typ, element, key))
return options
def _nested_validate_dict(coresys, typ, data_dict, key):
"""Validate nested items."""
options = {}
# Make sure it is a dict
if not isinstance(data_dict, dict):
raise vol.Invalid(f"Invalid dict for {key}") from None
# Process dict
for c_key, c_value in data_dict.items():
# Ignore unknown options / remove from list
if c_key not in typ:
_LOGGER.warning("Unknown options %s", c_key)
continue
# Nested?
if isinstance(typ[c_key], list):
options[c_key] = _nested_validate_list(
coresys, typ[c_key][0], c_value, c_key
)
else:
options[c_key] = _single_validate(coresys, typ[c_key], c_value, c_key)
_check_missing_options(typ, options, key)
return options
def _check_missing_options(origin, exists, root):
"""Check if all options are exists."""
missing = set(origin) - set(exists)
for miss_opt in missing:
if isinstance(origin[miss_opt], str) and origin[miss_opt].endswith("?"):
continue
raise vol.Invalid(f"Missing option {miss_opt} in {root}") from None
def schema_ui_options(raw_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate UI schema."""
ui_schema: List[Dict[str, Any]] = []
# read options
for key, value in raw_schema.items():
if isinstance(value, list):
# nested value list
_nested_ui_list(ui_schema, value, key)
elif isinstance(value, dict):
# nested value dict
_nested_ui_dict(ui_schema, value, key)
else:
# normal value
_single_ui_option(ui_schema, value, key)
return ui_schema
def _single_ui_option(
ui_schema: List[Dict[str, Any]], value: str, key: str, multiple: bool = False
) -> None:
"""Validate a single element."""
ui_node: Dict[str, Union[str, bool, float, List[str]]] = {"name": key}
# If multiple
if multiple:
ui_node["multiple"] = True
# Parse extend data from type
match = RE_SCHEMA_ELEMENT.match(value)
if not match:
return
# Prepare range
for group_name in _SCHEMA_LENGTH_PARTS:
group_value = match.group(group_name)
if not group_value:
continue
if group_name[2:] == "min":
ui_node["lengthMin"] = float(group_value)
elif group_name[2:] == "max":
ui_node["lengthMax"] = float(group_value)
# If required
if value.endswith("?"):
ui_node["optional"] = True
else:
ui_node["required"] = True
# Data types
if value.startswith(V_STR):
ui_node["type"] = "string"
elif value.startswith(V_PASSWORD):
ui_node["type"] = "string"
ui_node["format"] = "password"
elif value.startswith(V_INT):
ui_node["type"] = "integer"
elif value.startswith(V_FLOAT):
ui_node["type"] = "float"
elif value.startswith(V_BOOL):
ui_node["type"] = "boolean"
elif value.startswith(V_EMAIL):
ui_node["type"] = "string"
ui_node["format"] = "email"
elif value.startswith(V_URL):
ui_node["type"] = "string"
ui_node["format"] = "url"
elif value.startswith(V_PORT):
ui_node["type"] = "integer"
elif value.startswith(V_MATCH):
ui_node["type"] = "string"
elif value.startswith(V_LIST):
ui_node["type"] = "select"
ui_node["options"] = match.group("list").split("|")
ui_schema.append(ui_node)
def _nested_ui_list(
ui_schema: List[Dict[str, Any]], option_list: List[Any], key: str
) -> None:
"""UI nested list items."""
try:
element = option_list[0]
except IndexError:
_LOGGER.error("Invalid schema %s", key)
return
if isinstance(element, dict):
_nested_ui_dict(ui_schema, element, key, multiple=True)
else:
_single_ui_option(ui_schema, element, key, multiple=True)
def _nested_ui_dict(
ui_schema: List[Dict[str, Any]],
option_dict: Dict[str, Any],
key: str,
multiple: bool = False,
) -> None:
"""UI nested dict items."""
ui_node = {"name": key, "type": "schema", "optional": True, "multiple": multiple}
nested_schema = []
for c_key, c_value in option_dict.items():
# Nested?
if isinstance(c_value, list):
_nested_ui_list(nested_schema, c_value, c_key)
else:
_single_ui_option(nested_schema, c_value, c_key)
ui_node["schema"] = nested_schema
ui_schema.append(ui_node)

View File

@ -82,6 +82,7 @@ from ..const import (
ATTR_STARTUP, ATTR_STARTUP,
ATTR_STATE, ATTR_STATE,
ATTR_STDIN, ATTR_STDIN,
ATTR_UART,
ATTR_UDEV, ATTR_UDEV,
ATTR_UPDATE_AVAILABLE, ATTR_UPDATE_AVAILABLE,
ATTR_URL, ATTR_URL,
@ -237,7 +238,7 @@ class APIAddons(CoreSysAttributes):
ATTR_PRIVILEGED: addon.privileged, ATTR_PRIVILEGED: addon.privileged,
ATTR_FULL_ACCESS: addon.with_full_access, ATTR_FULL_ACCESS: addon.with_full_access,
ATTR_APPARMOR: addon.apparmor, ATTR_APPARMOR: addon.apparmor,
ATTR_DEVICES: _pretty_devices(addon), ATTR_DEVICES: addon.static_devices,
ATTR_ICON: addon.with_icon, ATTR_ICON: addon.with_icon,
ATTR_LOGO: addon.with_logo, ATTR_LOGO: addon.with_logo,
ATTR_CHANGELOG: addon.with_changelog, ATTR_CHANGELOG: addon.with_changelog,
@ -250,6 +251,7 @@ class APIAddons(CoreSysAttributes):
ATTR_HOMEASSISTANT_API: addon.access_homeassistant_api, ATTR_HOMEASSISTANT_API: addon.access_homeassistant_api,
ATTR_GPIO: addon.with_gpio, ATTR_GPIO: addon.with_gpio,
ATTR_USB: addon.with_usb, ATTR_USB: addon.with_usb,
ATTR_UART: addon.with_uart,
ATTR_KERNEL_MODULES: addon.with_kernel_modules, ATTR_KERNEL_MODULES: addon.with_kernel_modules,
ATTR_DEVICETREE: addon.with_devicetree, ATTR_DEVICETREE: addon.with_devicetree,
ATTR_UDEV: addon.with_udev, ATTR_UDEV: addon.with_udev,
@ -286,6 +288,8 @@ class APIAddons(CoreSysAttributes):
ATTR_VERSION: addon.version, ATTR_VERSION: addon.version,
ATTR_UPDATE_AVAILABLE: addon.need_update, ATTR_UPDATE_AVAILABLE: addon.need_update,
ATTR_WATCHDOG: addon.watchdog, ATTR_WATCHDOG: addon.watchdog,
ATTR_DEVICES: addon.static_devices
+ [device.path for device in addon.devices],
} }
) )

View File

@ -1,50 +1,46 @@
"""Init file for Supervisor hardware RESTful API.""" """Init file for Supervisor hardware RESTful API."""
import asyncio
import logging import logging
from typing import Any, Awaitable, Dict, List from typing import Any, Awaitable, Dict
from aiohttp import web from aiohttp import web
from ..const import ( from ..const import ATTR_AUDIO, ATTR_DEVICES, ATTR_INPUT, ATTR_NAME, ATTR_OUTPUT
ATTR_AUDIO,
ATTR_DISK,
ATTR_GPIO,
ATTR_INPUT,
ATTR_OUTPUT,
ATTR_SERIAL,
ATTR_USB,
)
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..hardware.const import (
ATTR_ATTRIBUTES,
ATTR_BY_ID,
ATTR_DEV_PATH,
ATTR_SUBSYSTEM,
ATTR_SYSFS,
)
from ..hardware.data import Device
from .utils import api_process from .utils import api_process
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
def device_struct(device: Device) -> Dict[str, Any]:
"""Return a dict with information of a interface to be used in th API."""
return {
ATTR_NAME: device.name,
ATTR_SYSFS: device.sysfs,
ATTR_DEV_PATH: device.path,
ATTR_SUBSYSTEM: device.subsystem,
ATTR_BY_ID: device.by_id,
ATTR_ATTRIBUTES: device.attributes,
}
class APIHardware(CoreSysAttributes): class APIHardware(CoreSysAttributes):
"""Handle RESTful API for hardware functions.""" """Handle RESTful API for hardware functions."""
@api_process @api_process
async def info(self, request: web.Request) -> Dict[str, Any]: async def info(self, request: web.Request) -> Dict[str, Any]:
"""Show hardware info.""" """Show hardware info."""
serial: List[str] = []
# Create Serial list with device links
for device in self.sys_hardware.serial_devices:
serial.append(device.path.as_posix())
for link in device.links:
serial.append(link.as_posix())
return { return {
ATTR_SERIAL: serial, ATTR_DEVICES: [
ATTR_INPUT: list(self.sys_hardware.input_devices), device_struct(device) for device in self.sys_hardware.devices
ATTR_DISK: [ ]
device.path.as_posix() for device in self.sys_hardware.disk_devices
],
ATTR_GPIO: list(self.sys_hardware.gpio_devices),
ATTR_USB: [
device.path.as_posix() for device in self.sys_hardware.usb_devices
],
ATTR_AUDIO: self.sys_hardware.audio_devices,
} }
@api_process @api_process
@ -64,6 +60,6 @@ class APIHardware(CoreSysAttributes):
} }
@api_process @api_process
def trigger(self, request: web.Request) -> Awaitable[None]: async def trigger(self, request: web.Request) -> Awaitable[None]:
"""Trigger a udev device reload.""" """Trigger a udev device reload."""
return asyncio.shield(self.sys_hardware.udev_trigger()) _LOGGER.warning("Ignoring DEPRECATED hardware trigger function call.")

View File

@ -35,12 +35,12 @@ from .core import Core
from .coresys import CoreSys from .coresys import CoreSys
from .dbus import DBusManager from .dbus import DBusManager
from .discovery import Discovery from .discovery import Discovery
from .hardware.module import HardwareManager
from .hassos import HassOS from .hassos import HassOS
from .homeassistant import HomeAssistant from .homeassistant import HomeAssistant
from .host import HostManager from .host import HostManager
from .ingress import Ingress from .ingress import Ingress
from .misc.filter import filter_data from .misc.filter import filter_data
from .misc.hwmon import HwMonitor
from .misc.scheduler import Scheduler from .misc.scheduler import Scheduler
from .misc.tasks import Tasks from .misc.tasks import Tasks
from .plugins import PluginManager from .plugins import PluginManager
@ -73,7 +73,7 @@ async def initialize_coresys() -> CoreSys:
coresys.addons = AddonManager(coresys) coresys.addons = AddonManager(coresys)
coresys.snapshots = SnapshotManager(coresys) coresys.snapshots = SnapshotManager(coresys)
coresys.host = HostManager(coresys) coresys.host = HostManager(coresys)
coresys.hwmonitor = HwMonitor(coresys) coresys.hardware = HardwareManager(coresys)
coresys.ingress = Ingress(coresys) coresys.ingress = Ingress(coresys)
coresys.tasks = Tasks(coresys) coresys.tasks = Tasks(coresys)
coresys.services = ServiceManager(coresys) coresys.services = ServiceManager(coresys)

View File

@ -89,7 +89,7 @@ ATTR_AUDIO = "audio"
ATTR_AUDIO_INPUT = "audio_input" ATTR_AUDIO_INPUT = "audio_input"
ATTR_AUDIO_OUTPUT = "audio_output" ATTR_AUDIO_OUTPUT = "audio_output"
ATTR_AUTH_API = "auth_api" ATTR_AUTH_API = "auth_api"
ATTR_AUTO_UART = "auto_uart" ATTR_UART = "uart"
ATTR_AUTO_UPDATE = "auto_update" ATTR_AUTO_UPDATE = "auto_update"
ATTR_AVAILABLE = "available" ATTR_AVAILABLE = "available"
ATTR_BLK_READ = "blk_read" ATTR_BLK_READ = "blk_read"
@ -359,9 +359,6 @@ ROLE_ADMIN = "admin"
ROLE_ALL = [ROLE_DEFAULT, ROLE_HOMEASSISTANT, ROLE_BACKUP, ROLE_MANAGER, ROLE_ADMIN] ROLE_ALL = [ROLE_DEFAULT, ROLE_HOMEASSISTANT, ROLE_BACKUP, ROLE_MANAGER, ROLE_ADMIN]
CHAN_ID = "chan_id"
CHAN_TYPE = "chan_type"
class AddonBoot(str, Enum): class AddonBoot(str, Enum):
"""Boot mode for the add-on.""" """Boot mode for the add-on."""

View File

@ -95,6 +95,8 @@ class Core(CoreSysAttributes):
setup_loads: List[Awaitable[None]] = [ setup_loads: List[Awaitable[None]] = [
# rest api views # rest api views
self.sys_api.load(), self.sys_api.load(),
# Load Host Hardware
self.sys_hardware.load(),
# Load DBus # Load DBus
self.sys_dbus.load(), self.sys_dbus.load(),
# Load Host # Load Host
@ -179,7 +181,7 @@ class Core(CoreSysAttributes):
try: try:
# HomeAssistant is already running / supervisor have only reboot # HomeAssistant is already running / supervisor have only reboot
if self.sys_hardware.last_boot == self.sys_config.last_boot: if self.sys_hardware.helper.last_boot == self.sys_config.last_boot:
_LOGGER.info("Supervisor reboot detected") _LOGGER.info("Supervisor reboot detected")
return return
@ -225,9 +227,6 @@ class Core(CoreSysAttributes):
if self.sys_homeassistant.version == LANDINGPAGE: if self.sys_homeassistant.version == LANDINGPAGE:
self.sys_create_task(self.sys_homeassistant.core.install()) self.sys_create_task(self.sys_homeassistant.core.install())
# Start observe the host Hardware
await self.sys_hwmonitor.load()
# Upate Host/Deivce information # Upate Host/Deivce information
self.sys_create_task(self.sys_host.reload()) self.sys_create_task(self.sys_host.reload())
self.sys_create_task(self.sys_updater.reload()) self.sys_create_task(self.sys_updater.reload())
@ -262,7 +261,7 @@ class Core(CoreSysAttributes):
self.sys_websession.close(), self.sys_websession.close(),
self.sys_websession_ssl.close(), self.sys_websession_ssl.close(),
self.sys_ingress.unload(), self.sys_ingress.unload(),
self.sys_hwmonitor.unload(), self.sys_hardware.unload(),
] ]
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
@ -296,7 +295,7 @@ class Core(CoreSysAttributes):
def _update_last_boot(self): def _update_last_boot(self):
"""Update last boot time.""" """Update last boot time."""
self.sys_config.last_boot = self.sys_hardware.last_boot self.sys_config.last_boot = self.sys_hardware.helper.last_boot
self.sys_config.save_data() self.sys_config.save_data()
async def repair(self): async def repair(self):

View File

@ -11,7 +11,6 @@ import sentry_sdk
from .config import CoreConfig from .config import CoreConfig
from .const import ENV_SUPERVISOR_DEV from .const import ENV_SUPERVISOR_DEV
from .docker import DockerAPI from .docker import DockerAPI
from .misc.hardware import Hardware
if TYPE_CHECKING: if TYPE_CHECKING:
from .addons import AddonManager from .addons import AddonManager
@ -26,7 +25,7 @@ if TYPE_CHECKING:
from .host import HostManager from .host import HostManager
from .ingress import Ingress from .ingress import Ingress
from .jobs import JobManager from .jobs import JobManager
from .misc.hwmon import HwMonitor from .hardware.module import HardwareManager
from .misc.scheduler import Scheduler from .misc.scheduler import Scheduler
from .misc.tasks import Tasks from .misc.tasks import Tasks
from .plugins import PluginManager from .plugins import PluginManager
@ -59,7 +58,6 @@ class CoreSys:
# Global objects # Global objects
self._config: CoreConfig = CoreConfig() self._config: CoreConfig = CoreConfig()
self._hardware: Hardware = Hardware()
self._docker: DockerAPI = DockerAPI() self._docker: DockerAPI = DockerAPI()
# Internal objects pointers # Internal objects pointers
@ -81,7 +79,7 @@ class CoreSys:
self._scheduler: Optional[Scheduler] = None self._scheduler: Optional[Scheduler] = None
self._store: Optional[StoreManager] = None self._store: Optional[StoreManager] = None
self._discovery: Optional[Discovery] = None self._discovery: Optional[Discovery] = None
self._hwmonitor: Optional[HwMonitor] = None self._hardware: Optional[HardwareManager] = None
self._plugins: Optional[PluginManager] = None self._plugins: Optional[PluginManager] = None
self._resolution: Optional[ResolutionManager] = None self._resolution: Optional[ResolutionManager] = None
self._jobs: Optional[JobManager] = None self._jobs: Optional[JobManager] = None
@ -111,11 +109,6 @@ class CoreSys:
"""Return CoreConfig object.""" """Return CoreConfig object."""
return self._config return self._config
@property
def hardware(self) -> Hardware:
"""Return Hardware object."""
return self._hardware
@property @property
def docker(self) -> DockerAPI: def docker(self) -> DockerAPI:
"""Return DockerAPI object.""" """Return DockerAPI object."""
@ -360,18 +353,18 @@ class CoreSys:
self._host = value self._host = value
@property @property
def hwmonitor(self) -> HwMonitor: def hardware(self) -> HardwareManager:
"""Return HwMonitor object.""" """Return HardwareManager object."""
if self._hwmonitor is None: if self._hardware is None:
raise RuntimeError("HwMonitor not set!") raise RuntimeError("HardwareManager not set!")
return self._hwmonitor return self._hardware
@hwmonitor.setter @hardware.setter
def hwmonitor(self, value: HwMonitor) -> None: def hardware(self, value: HardwareManager) -> None:
"""Set a HwMonitor object.""" """Set a HardwareManager object."""
if self._hwmonitor: if self._hardware:
raise RuntimeError("HwMonitor already set!") raise RuntimeError("HardwareManager already set!")
self._hwmonitor = value self._hardware = value
@property @property
def ingress(self) -> Ingress: def ingress(self) -> Ingress:
@ -489,11 +482,6 @@ class CoreSysAttributes:
"""Return CoreConfig object.""" """Return CoreConfig object."""
return self.coresys.config return self.coresys.config
@property
def sys_hardware(self) -> Hardware:
"""Return Hardware object."""
return self.coresys.hardware
@property @property
def sys_docker(self) -> DockerAPI: def sys_docker(self) -> DockerAPI:
"""Return DockerAPI object.""" """Return DockerAPI object."""
@ -585,9 +573,9 @@ class CoreSysAttributes:
return self.coresys.host return self.coresys.host
@property @property
def sys_hwmonitor(self) -> HwMonitor: def sys_hardware(self) -> HardwareManager:
"""Return HwMonitor object.""" """Return HwMonitor object."""
return self.coresys.hwmonitor return self.coresys.hardware
@property @property
def sys_ingress(self) -> Ingress: def sys_ingress(self) -> Ingress:

View File

@ -28,6 +28,7 @@ from ..const import (
) )
from ..coresys import CoreSys from ..coresys import CoreSys
from ..exceptions import CoreDNSError, DockerError from ..exceptions import CoreDNSError, DockerError
from ..hardware.const import PolicyGroup, UdevSubsystem
from ..utils import process_lock from ..utils import process_lock
from .interface import DockerInterface from .interface import DockerInterface
@ -124,34 +125,57 @@ class DockerAddon(DockerInterface):
} }
@property @property
def devices(self) -> List[str]: def devices(self) -> Optional[List[str]]:
"""Return needed devices.""" """Return needed devices."""
devices = [] devices = set()
# Extend add-on config # Extend add-on config
for device in self.addon.devices: for device_path in self.addon.static_devices:
if not Path(device.split(":")[0]).exists(): if not self.sys_hardware.exists_device_node(device_path):
_LOGGER.debug("Ignore static device path %s", device_path)
continue continue
devices.append(device) devices.add(f"{device_path.as_posix()}:{device_path.as_posix()}:rwm")
# Auto mapping UART devices # Auto mapping UART devices / LINKS
if self.addon.with_uart: # Deprecated: In the future the add-on needs to create device links based on API data by itself
for device in self.sys_hardware.serial_devices: if self.addon.with_uart and not self.addon.devices and not self.addon.with_udev:
devices.append(f"{device.path.as_posix()}:{device.path.as_posix()}:rwm") for device in self.sys_hardware.filter_devices(
if self.addon.with_udev: subsystem=UdevSubsystem.SERIAL
):
if not device.by_id:
continue continue
for device_link in device.links: devices.add(f"{device.by_id.as_posix()}:{device.by_id.as_posix()}:rwm")
devices.append(
f"{device_link.as_posix()}:{device_link.as_posix()}:rwm"
)
# Use video devices
if self.addon.with_video:
for device in self.sys_hardware.video_devices:
devices.append(f"{device.path!s}:{device.path!s}:rwm")
# Return None if no devices is present # Return None if no devices is present
return devices or None if devices:
return list(devices)
return None
@property
def cgroups_rules(self) -> Optional[List[str]]:
"""Return a list of needed cgroups permission."""
rules = set()
# Attach correct cgroups
for device in self.addon.devices:
rules.add(self.sys_hardware.policy.get_cgroups_rule(device))
# Video
if self.addon.with_video:
rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.VIDEO))
# GPIO
if self.addon.with_gpio:
rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.GPIO))
# UART
if self.addon.with_uart:
rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.UART))
# Return None if no rules is present
if rules:
return list(rules)
return None
@property @property
def ports(self) -> Optional[Dict[str, Union[str, int, None]]]: def ports(self) -> Optional[Dict[str, Union[str, int, None]]]:
@ -284,7 +308,7 @@ class DockerAddon(DockerInterface):
# Init other hardware mappings # Init other hardware mappings
# GPIO support # GPIO support
if self.addon.with_gpio and self.sys_hardware.support_gpio: if self.addon.with_gpio and self.sys_hardware.helper.support_gpio:
for gpio_path in ("/sys/class/gpio", "/sys/devices/platform/soc"): for gpio_path in ("/sys/class/gpio", "/sys/devices/platform/soc"):
volumes.update({gpio_path: {"bind": gpio_path, "mode": "rw"}}) volumes.update({gpio_path: {"bind": gpio_path, "mode": "rw"}})
@ -299,8 +323,15 @@ class DockerAddon(DockerInterface):
} }
) )
# Host udev support
if self.addon.with_udev:
volumes.update({"/run/dbus": {"bind": "/run/dbus", "mode": "ro"}})
# USB support # USB support
if self.addon.with_usb and self.sys_hardware.usb_devices: if (self.addon.with_usb and self.sys_hardware.helper.usb_devices) or any(
self.sys_hardware.check_subsystem_parents(device, UdevSubsystem.USB)
for device in self.addon.devices
):
volumes.update({"/dev/bus/usb": {"bind": "/dev/bus/usb", "mode": "rw"}}) volumes.update({"/dev/bus/usb": {"bind": "/dev/bus/usb", "mode": "rw"}})
# Kernel Modules support # Kernel Modules support
@ -369,6 +400,7 @@ class DockerAddon(DockerInterface):
ports=self.ports, ports=self.ports,
extra_hosts=self.network_mapping, extra_hosts=self.network_mapping,
devices=self.devices, devices=self.devices,
device_cgroup_rules=self.cgroups_rules,
cap_add=self.addon.privileged, cap_add=self.addon.privileged,
security_opt=self.security_opt, security_opt=self.security_opt,
environment=self.environment, environment=self.environment,

View File

@ -5,6 +5,7 @@ from typing import Dict
from ..const import ENV_TIME, MACHINE_ID from ..const import ENV_TIME, MACHINE_ID
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..hardware.const import PolicyGroup
from .interface import DockerInterface from .interface import DockerInterface
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -66,6 +67,9 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
hostname=self.name.replace("_", "-"), hostname=self.name.replace("_", "-"),
detach=True, detach=True,
privileged=True, privileged=True,
device_cgroup_rules=self.sys_hardware.policy.get_cgroups_rules(
PolicyGroup.AUDIO
),
environment={ENV_TIME: self.sys_config.timezone}, environment={ENV_TIME: self.sys_config.timezone},
volumes=self.volumes, volumes=self.volumes,
) )

View File

@ -1,13 +1,14 @@
"""Init file for Supervisor Docker object.""" """Init file for Supervisor Docker object."""
from ipaddress import IPv4Address from ipaddress import IPv4Address
import logging import logging
from typing import Awaitable, Dict, Optional from typing import Awaitable, Dict, List, Optional
import docker import docker
import requests import requests
from ..const import ENV_TIME, ENV_TOKEN, ENV_TOKEN_HASSIO, LABEL_MACHINE, MACHINE_ID from ..const import ENV_TIME, ENV_TOKEN, ENV_TOKEN_HASSIO, LABEL_MACHINE, MACHINE_ID
from ..exceptions import DockerError from ..exceptions import DockerError
from ..hardware.const import PolicyGroup
from .interface import CommandReturn, DockerInterface from .interface import CommandReturn, DockerInterface
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -47,6 +48,15 @@ class DockerHomeAssistant(DockerInterface):
"""Return IP address of this container.""" """Return IP address of this container."""
return self.sys_docker.network.gateway return self.sys_docker.network.gateway
@property
def cgroups_rules(self) -> List[str]:
"""Return a list of needed cgroups permission."""
return (
self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.UART)
+ self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.VIDEO)
+ self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.GPIO)
)
@property @property
def volumes(self) -> Dict[str, Dict[str, str]]: def volumes(self) -> Dict[str, Dict[str, str]]:
"""Return Volumes for the mount.""" """Return Volumes for the mount."""
@ -117,6 +127,7 @@ class DockerHomeAssistant(DockerInterface):
init=False, init=False,
network_mode="host", network_mode="host",
volumes=self.volumes, volumes=self.volumes,
device_cgroup_rules=self.cgroups_rules,
extra_hosts={ extra_hosts={
"supervisor": self.sys_docker.network.supervisor, "supervisor": self.sys_docker.network.supervisor,
"observer": self.sys_docker.network.observer, "observer": self.sys_docker.network.observer,

View File

@ -296,6 +296,14 @@ class DockerNotFound(DockerError):
# Hardware # Hardware
class HardwareError(HassioError):
"""General Hardware Error on Supervisor."""
class HardwareNotFound(HardwareError):
"""Hardware path or device doesn't exist on the Host."""
class HardwareNotSupportedError(HassioNotSupportedError): class HardwareNotSupportedError(HassioNotSupportedError):
"""Raise if hardware function is not supported.""" """Raise if hardware function is not supported."""

View File

@ -0,0 +1 @@
"""Hardware handler of Supervisor."""

View File

@ -0,0 +1,29 @@
"""Constants for hardware."""
from enum import Enum
ATTR_BY_ID = "by_id"
ATTR_SUBSYSTEM = "subsystem"
ATTR_SYSFS = "sysfs"
ATTR_DEV_PATH = "dev_path"
ATTR_ATTRIBUTES = "attributes"
class UdevSubsystem(str, Enum):
"""Udev subsystem class."""
SERIAL = "tty"
USB = "usb"
INPUT = "input"
DISK = "block"
PCI = "pci"
AUDIO = "sound"
class PolicyGroup(str, Enum):
"""Policy groups backend."""
UART = "uart"
GPIO = "gpio"
USB = "usb"
VIDEO = "video"
AUDIO = "audio"

View File

@ -0,0 +1,36 @@
"""Data representation of Hardware."""
from pathlib import Path
from typing import Dict, List, Optional
import attr
@attr.s(slots=True, frozen=True)
class Device:
"""Represent a device."""
name: str = attr.ib(eq=False)
path: Path = attr.ib(eq=False)
sysfs: Path = attr.ib(eq=True)
subsystem: str = attr.ib(eq=False)
links: List[Path] = attr.ib(eq=False)
attributes: Dict[str, str] = attr.ib(eq=False)
@property
def cgroups_major(self) -> int:
"""Return Major cgroups."""
return int(self.attributes.get("MAJOR", 0))
@property
def cgroups_minor(self) -> int:
"""Return Major cgroups."""
return int(self.attributes.get("MINOR", 0))
@property
def by_id(self) -> Optional[Path]:
"""Return path by-id."""
for link in self.links:
if not link.match("/dev/*/by-id/*"):
continue
return link
return None

View File

@ -0,0 +1,78 @@
"""Read hardware info from system."""
from datetime import datetime
import logging
from pathlib import Path
import re
import shutil
from typing import Optional, Union
import pyudev
from ..coresys import CoreSys, CoreSysAttributes
from .const import UdevSubsystem
_LOGGER: logging.Logger = logging.getLogger(__name__)
_PROC_STAT: Path = Path("/proc/stat")
_RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)")
_GPIO_DEVICES: Path = Path("/sys/class/gpio")
_SOC_DEVICES: Path = Path("/sys/devices/platform/soc")
_RE_HIDE_SYSFS: re.Pattern = re.compile(r"/sys/devices/virtual/(?:tty|block)/.*")
class HwHelper(CoreSysAttributes):
"""Representation of an interface to procfs, sysfs and udev."""
def __init__(self, coresys: CoreSys):
"""Init hardware object."""
self.coresys = coresys
@property
def support_audio(self) -> bool:
"""Return True if the system have audio support."""
return len(self.sys_hardware.filter_devices(subsystem=UdevSubsystem.AUDIO))
@property
def support_gpio(self) -> bool:
"""Return True if device support GPIOs."""
return _SOC_DEVICES.exists() and _GPIO_DEVICES.exists()
@property
def last_boot(self) -> Optional[str]:
"""Return last boot time."""
try:
with _PROC_STAT.open("r") as stat_file:
stats: str = stat_file.read()
except OSError as err:
_LOGGER.error("Can't read stat data: %s", err)
return None
# parse stat file
found: Optional[re.Match] = _RE_BOOT_TIME.search(stats)
if not found:
_LOGGER.error("Can't found last boot time!")
return None
return datetime.utcfromtimestamp(int(found.group(1)))
def hide_virtual_device(self, udev_device: pyudev.Device) -> bool:
"""Small helper to hide not needed Devices."""
return _RE_HIDE_SYSFS.match(udev_device.sys_path) is not None
def get_disk_total_space(self, path: Union[str, Path]) -> float:
"""Return total space (GiB) on disk for path."""
total, _, _ = shutil.disk_usage(path)
return round(total / (1024.0 ** 3), 1)
def get_disk_used_space(self, path: Union[str, Path]) -> float:
"""Return used space (GiB) on disk for path."""
_, used, _ = shutil.disk_usage(path)
return round(used / (1024.0 ** 3), 1)
def get_disk_free_space(self, path: Union[str, Path]) -> float:
"""Return free space (GiB) on disk for path."""
_, _, free = shutil.disk_usage(path)
return round(free / (1024.0 ** 3), 1)

View File

@ -0,0 +1,120 @@
"""Hardware Manager of Supervisor."""
import logging
from pathlib import Path
from typing import Dict, List, Optional
import pyudev
from supervisor.hardware.const import UdevSubsystem
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HardwareNotFound
from .data import Device
from .helper import HwHelper
from .monitor import HwMonitor
from .policy import HwPolicy
_LOGGER: logging.Logger = logging.getLogger(__name__)
class HardwareManager(CoreSysAttributes):
"""Hardware manager for supervisor."""
def __init__(self, coresys: CoreSys):
"""Initialize Hardware Monitor object."""
self.coresys: CoreSys = coresys
self._devices: Dict[str, Device] = {}
self._udev = pyudev.Context()
self._montior: HwMonitor = HwMonitor(coresys)
self._helper: HwHelper = HwHelper(coresys)
self._policy: HwPolicy = HwPolicy(coresys)
@property
def monitor(self) -> HwMonitor:
"""Return Hardware Monitor instance."""
return self._montior
@property
def helper(self) -> HwHelper:
"""Return Hardware Helper instance."""
return self._helper
@property
def policy(self) -> HwPolicy:
"""Return Hardware policy instance."""
return self._policy
@property
def devices(self) -> List[Device]:
"""Return List of devices."""
return list(self._devices.values())
def get_by_path(self, device_node: Path) -> Device:
"""Get Device by path."""
for device in self.devices:
if device_node in (device.path, device.sysfs):
return device
if device_node in device.links:
return device
raise HardwareNotFound()
def filter_devices(self, subsystem: Optional[UdevSubsystem] = None) -> List[Device]:
"""Return a filtered list."""
devices = set()
for device in self.devices:
if subsystem and device.subsystem != subsystem:
continue
devices.add(device)
return list(devices)
def update_device(self, device: Device) -> None:
"""Update or add a (new) Device."""
self._devices[device.name] = device
def delete_device(self, device: Device) -> None:
"""Remove a device from the list."""
self._devices.pop(device.name, None)
def exists_device_node(self, device_node: Path) -> bool:
"""Check if device exists on Host."""
try:
self.get_by_path(device_node)
except HardwareNotFound:
return False
return True
def check_subsystem_parents(self, device: Device, subsystem: UdevSubsystem) -> bool:
"""Return True if the device is part of the given subsystem parent."""
udev_device: pyudev.Device = pyudev.Devices.from_sys_path(
self._udev, str(device.sysfs)
)
return udev_device.find_parent(subsystem.value) is not None
def _import_devices(self) -> None:
"""Import fresh from udev database."""
self._devices.clear()
# Exctract all devices
for device in self._udev.list_devices():
# Skip devices without mapping
if not device.device_node or self.helper.hide_virtual_device(device):
continue
self._devices[device.sys_name] = Device(
device.sys_name,
Path(device.device_node),
Path(device.sys_path),
device.subsystem,
[Path(node) for node in device.device_links],
{attr: device.properties[attr] for attr in device.properties},
)
async def load(self) -> None:
"""Load hardware backend."""
self._import_devices()
await self.monitor.load()
async def unload(self) -> None:
"""Shutdown sessions."""
await self.monitor.unload()

View File

@ -1,14 +1,18 @@
"""Supervisor Hardware monitor based on udev.""" """Supervisor Hardware monitor based on udev."""
from datetime import timedelta from datetime import timedelta
import logging import logging
from pathlib import Path
from pprint import pformat from pprint import pformat
from typing import Optional from typing import Optional
import pyudev import pyudev
from ..const import CoreState
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..resolution.const import UnhealthyReason from ..resolution.const import UnhealthyReason
from ..utils import AsyncCallFilter from ..utils import AsyncCallFilter
from .const import UdevSubsystem
from .data import Device
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -26,7 +30,7 @@ class HwMonitor(CoreSysAttributes):
async def load(self) -> None: async def load(self) -> None:
"""Start hardware monitor.""" """Start hardware monitor."""
try: try:
self.monitor = pyudev.Monitor.from_netlink(self.context) self.monitor = pyudev.Monitor.from_netlink(self.context, "kernel")
self.observer = pyudev.MonitorObserver(self.monitor, self._udev_events) self.observer = pyudev.MonitorObserver(self.monitor, self._udev_events)
except OSError: except OSError:
self.sys_resolution.unhealthy = UnhealthyReason.PRIVILEGED self.sys_resolution.unhealthy = UnhealthyReason.PRIVILEGED
@ -53,9 +57,31 @@ class HwMonitor(CoreSysAttributes):
def _async_udev_events(self, action: str, device: pyudev.Device): def _async_udev_events(self, action: str, device: pyudev.Device):
"""Incomming events from udev into loop.""" """Incomming events from udev into loop."""
# Sound changes if self.sys_core.state in (CoreState.RUNNING, CoreState.FREEZE):
if device.subsystem == "sound": # Sound changes
self._action_sound(device) if device.subsystem == UdevSubsystem.AUDIO:
self._action_sound(device)
# Update device List
if not device.device_node or self.sys_hardware.helper.hide_virtual_device(
device
):
return
device = Device(
device.sys_name,
Path(device.device_node),
Path(device.sys_path),
device.subsystem,
[Path(node) for node in device.device_links],
{attr: device.properties[attr] for attr in device.properties},
)
# Process the action
if action == "add":
self.sys_hardware.update_device(device)
if action == "remove":
self.sys_hardware.delete_device(device)
@AsyncCallFilter(timedelta(seconds=5)) @AsyncCallFilter(timedelta(seconds=5))
def _action_sound(self, device: pyudev.Device): def _action_sound(self, device: pyudev.Device):

View File

@ -0,0 +1,33 @@
"""Policy / cgroups management of local host."""
import logging
from typing import Dict, List
from ..coresys import CoreSys, CoreSysAttributes
from .const import PolicyGroup
from .data import Device
_LOGGER: logging.Logger = logging.getLogger(__name__)
GROUP_CGROUPS: Dict[PolicyGroup, List[int]] = {
PolicyGroup.UART: [204, 188, 166, 244],
PolicyGroup.GPIO: [254],
PolicyGroup.VIDEO: [239, 29],
PolicyGroup.AUDIO: [116],
}
class HwPolicy(CoreSysAttributes):
"""Handle Hardware policy / cgroups."""
def __init__(self, coresys: CoreSys):
"""Init hardware policy object."""
self.coresys = coresys
def get_cgroups_rules(self, group: PolicyGroup) -> List[str]:
"""Generate cgroups rules for a policy group."""
return [f"c {dev}:* rwm" for dev in GROUP_CGROUPS.get(group, [])]
def get_cgroups_rule(self, device: Device) -> str:
"""Generate a cgroups rule for given device."""
return f"c {device.cgroups_major}:{device.cgroups_minor} rwm"

View File

@ -54,21 +54,21 @@ class InfoCenter(CoreSysAttributes):
@property @property
def total_space(self) -> float: def total_space(self) -> float:
"""Return total space (GiB) on disk for supervisor data directory.""" """Return total space (GiB) on disk for supervisor data directory."""
return self.coresys.hardware.get_disk_total_space( return self.sys_hardware.helper.get_disk_total_space(
self.coresys.config.path_supervisor self.coresys.config.path_supervisor
) )
@property @property
def used_space(self) -> float: def used_space(self) -> float:
"""Return used space (GiB) on disk for supervisor data directory.""" """Return used space (GiB) on disk for supervisor data directory."""
return self.coresys.hardware.get_disk_used_space( return self.sys_hardware.helper.get_disk_used_space(
self.coresys.config.path_supervisor self.coresys.config.path_supervisor
) )
@property @property
def free_space(self) -> float: def free_space(self) -> float:
"""Return available space (GiB) on disk for supervisor data directory.""" """Return available space (GiB) on disk for supervisor data directory."""
return self.coresys.hardware.get_disk_free_space( return self.sys_hardware.helper.get_disk_free_space(
self.coresys.config.path_supervisor self.coresys.config.path_supervisor
) )

View File

@ -1,232 +0,0 @@
"""Read hardware info from system."""
import asyncio
from datetime import datetime
import logging
from pathlib import Path
import re
import shutil
from typing import Any, Dict, List, Optional, Set, Union
import attr
import pyudev
from ..const import ATTR_DEVICES, ATTR_NAME, ATTR_TYPE, CHAN_ID, CHAN_TYPE
from ..exceptions import HardwareNotSupportedError
_LOGGER: logging.Logger = logging.getLogger(__name__)
ASOUND_CARDS: Path = Path("/proc/asound/cards")
RE_CARDS: re.Pattern = re.compile(r"(\d+) \[(\w*) *\]: (.*\w)")
ASOUND_DEVICES: Path = Path("/proc/asound/devices")
RE_DEVICES: re.Pattern = re.compile(r"\[.*(\d+)- (\d+).*\]: ([\w ]*)")
PROC_STAT: Path = Path("/proc/stat")
RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)")
GPIO_DEVICES: Path = Path("/sys/class/gpio")
SOC_DEVICES: Path = Path("/sys/devices/platform/soc")
RE_TTY: re.Pattern = re.compile(r"tty[A-Z]+")
RE_VIDEO_DEVICES = re.compile(r"^(?:vchiq|cec\d+|video\d+)")
@attr.s(slots=True, frozen=True)
class Device:
"""Represent a device."""
name: str = attr.ib()
path: Path = attr.ib()
subsystem: str = attr.ib()
links: List[Path] = attr.ib()
attributes: Dict[str, str] = attr.ib()
class Hardware:
"""Representation of an interface to procfs, sysfs and udev."""
def __init__(self):
"""Init hardware object."""
self.context = pyudev.Context()
@property
def devices(self) -> List[Device]:
"""Return a list of all available devices."""
dev_list: List[Device] = []
# Exctract all devices
for device in self.context.list_devices():
# Skip devices without mapping
if not device.device_node:
continue
dev_list.append(
Device(
device.sys_name,
Path(device.device_node),
device.subsystem,
[Path(node) for node in device.device_links],
{attr: device.properties[attr] for attr in device.properties},
)
)
return dev_list
@property
def video_devices(self) -> List[Device]:
"""Return all available video devices."""
dev_list: List[Device] = []
for device in self.devices:
if not RE_VIDEO_DEVICES.match(device.name):
continue
dev_list.append(device)
return dev_list
@property
def serial_devices(self) -> List[Device]:
"""Return all serial and connected devices."""
dev_list: List[Device] = []
for device in self.devices:
if device.subsystem != "tty" or (
"ID_VENDOR" not in device.attributes
and not RE_TTY.search(str(device.path))
):
continue
# Cleanup not usable device links
for link in device.links.copy():
if link.match("/dev/serial/by-id/*"):
continue
device.links.remove(link)
dev_list.append(device)
return dev_list
@property
def usb_devices(self) -> List[Device]:
"""Return all usb and connected devices."""
return [device for device in self.devices if device.subsystem == "usb"]
@property
def input_devices(self) -> Set[str]:
"""Return all input devices."""
dev_list: Set[str] = set()
for device in self.context.list_devices(subsystem="input"):
if "NAME" in device.properties:
dev_list.add(device.properties["NAME"].replace('"', "").strip())
return dev_list
@property
def disk_devices(self) -> List[Device]:
"""Return all disk devices."""
dev_list: List[Device] = []
for device in self.devices:
if device.subsystem != "block" or "ID_NAME" not in device.attributes:
continue
dev_list.append(device)
return dev_list
@property
def support_audio(self) -> bool:
"""Return True if the system have audio support."""
return bool(self.audio_devices)
@property
def audio_devices(self) -> Dict[str, Any]:
"""Return all available audio interfaces."""
if not ASOUND_CARDS.exists():
return {}
try:
cards = ASOUND_CARDS.read_text()
devices = ASOUND_DEVICES.read_text()
except OSError as err:
_LOGGER.error("Can't read asound data: %s", err)
return {}
audio_list: Dict[str, Any] = {}
# parse cards
for match in RE_CARDS.finditer(cards):
audio_list[match.group(1)] = {
ATTR_NAME: match.group(3),
ATTR_TYPE: match.group(2),
ATTR_DEVICES: [],
}
# parse devices
for match in RE_DEVICES.finditer(devices):
try:
audio_list[match.group(1)][ATTR_DEVICES].append(
{CHAN_ID: match.group(2), CHAN_TYPE: match.group(3)}
)
except KeyError:
_LOGGER.warning("Wrong audio device found %s", match.group(0))
continue
return audio_list
@property
def support_gpio(self) -> bool:
"""Return True if device support GPIOs."""
return SOC_DEVICES.exists() and GPIO_DEVICES.exists()
@property
def gpio_devices(self) -> Set[str]:
"""Return list of GPIO interface on device."""
dev_list: Set[str] = set()
for interface in GPIO_DEVICES.glob("gpio*"):
dev_list.add(interface.name)
return dev_list
@property
def last_boot(self) -> Optional[str]:
"""Return last boot time."""
try:
with PROC_STAT.open("r") as stat_file:
stats: str = stat_file.read()
except OSError as err:
_LOGGER.error("Can't read stat data: %s", err)
return None
# parse stat file
found: Optional[re.Match] = RE_BOOT_TIME.search(stats)
if not found:
_LOGGER.error("Can't found last boot time!")
return None
return datetime.utcfromtimestamp(int(found.group(1)))
def get_disk_total_space(self, path: Union[str, Path]) -> float:
"""Return total space (GiB) on disk for path."""
total, _, _ = shutil.disk_usage(path)
return round(total / (1024.0 ** 3), 1)
def get_disk_used_space(self, path: Union[str, Path]) -> float:
"""Return used space (GiB) on disk for path."""
_, used, _ = shutil.disk_usage(path)
return round(used / (1024.0 ** 3), 1)
def get_disk_free_space(self, path: Union[str, Path]) -> float:
"""Return free space (GiB) on disk for path."""
_, _, free = shutil.disk_usage(path)
return round(free / (1024.0 ** 3), 1)
async def udev_trigger(self) -> None:
"""Trigger a udev reload."""
proc = await asyncio.create_subprocess_shell(
"udevadm trigger && udevadm settle"
)
await proc.wait()
if proc.returncode == 0:
return
_LOGGER.warning("udevadm device triggering failed!")
raise HardwareNotSupportedError()

View File

@ -28,6 +28,8 @@ class JSONEncoder(json.JSONEncoder):
return o.isoformat() return o.isoformat()
if isinstance(o, set): if isinstance(o, set):
return list(o) return list(o)
if isinstance(o, Path):
return str(o)
if hasattr(o, "as_dict"): if hasattr(o, "as_dict"):
return o.as_dict() return o.as_dict()

View File

@ -28,6 +28,46 @@ def test_basic_config():
assert not valid_config["docker_api"] assert not valid_config["docker_api"]
def test_migration_startup():
"""Migrate Startup Type."""
config = load_json_fixture("basic-addon-config.json")
config["startup"] = "before"
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["startup"].value == "services"
config["startup"] = "after"
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["startup"].value == "application"
def test_migration_auto_uart():
"""Migrate auto uart Type."""
config = load_json_fixture("basic-addon-config.json")
config["auto_uart"] = True
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["uart"]
assert "auto_uart" not in valid_config
def test_migration_devices():
"""Migrate devices Type."""
config = load_json_fixture("basic-addon-config.json")
config["devices"] = ["test:test:rw", "bla"]
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["devices"] == ["test", "bla"]
def test_invalid_repository(): def test_invalid_repository():
"""Validate basic config with invalid repositories.""" """Validate basic config with invalid repositories."""
config = load_json_fixture("basic-addon-config.json") config = load_json_fixture("basic-addon-config.json")

View File

@ -0,0 +1,203 @@
"""Test add-ons schema to UI schema convertion."""
from pathlib import Path
import pytest
import voluptuous as vol
from supervisor.addons.options import AddonOptions, UiOptions
from supervisor.hardware.data import Device
def test_simple_schema(coresys):
"""Test with simple schema."""
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "password": "1234", "fires": True, "alias": "test"})
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "password": "1234", "fires": True})
with pytest.raises(vol.error.Invalid):
AddonOptions(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "password": "1234", "fires": "hah"})
with pytest.raises(vol.error.Invalid):
AddonOptions(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "fires": True})
def test_complex_schema_list(coresys):
"""Test with complex list schema."""
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": ["test", "blu"]})
with pytest.raises(vol.error.Invalid):
AddonOptions(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": ["test", 1]})
with pytest.raises(vol.error.Invalid):
AddonOptions(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": "test"})
def test_complex_schema_dict(coresys):
"""Test with complex dict schema."""
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "extend": {"test": "int"}},
)({"name": "Pascal", "password": "1234", "extend": {"test": 1}})
with pytest.raises(vol.error.Invalid):
AddonOptions(
coresys,
{"name": "str", "password": "password", "extend": {"test": "int"}},
)({"name": "Pascal", "password": "1234", "extend": {"wrong": 1}})
with pytest.raises(vol.error.Invalid):
AddonOptions(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": "test"})
def test_simple_device_schema(coresys):
"""Test with simple schema."""
for device in (
Device(
"ttyACM0",
Path("/dev/ttyACM0"),
Path("/sys/bus/usb/002"),
"tty",
[],
{"ID_VENDOR": "xy"},
),
Device(
"ttyUSB0",
Path("/dev/ttyUSB0"),
Path("/sys/bus/usb/001"),
"tty",
[Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")],
{"ID_VENDOR": "xy"},
),
Device("ttyS0", Path("/dev/ttyS0"), Path("/sys/bus/usb/003"), "tty", [], {}),
Device(
"video1",
Path("/dev/video1"),
Path("/sys/bus/usb/004"),
"misc",
[],
{"ID_VENDOR": "xy"},
),
):
coresys.hardware.update_device(device)
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "input": "device"},
)({"name": "Pascal", "password": "1234", "input": "/dev/ttyUSB0"})
data = AddonOptions(
coresys,
{"name": "str", "password": "password", "input": "device"},
)({"name": "Pascal", "password": "1234", "input": "/dev/serial/by-id/xyx"})
assert data["input"] == "/dev/ttyUSB0"
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "input": "device(subsystem=tty)"},
)({"name": "Pascal", "password": "1234", "input": "/dev/ttyACM0"})
with pytest.raises(vol.error.Invalid):
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "input": "device"},
)({"name": "Pascal", "password": "1234", "input": "/dev/not_exists"})
with pytest.raises(vol.error.Invalid):
assert AddonOptions(
coresys,
{"name": "str", "password": "password", "input": "device(subsystem=tty)"},
)({"name": "Pascal", "password": "1234", "input": "/dev/video1"})
def test_ui_simple_schema(coresys):
"""Test with simple schema."""
assert UiOptions(coresys)(
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
) == [
{"name": "name", "required": True, "type": "string"},
{"format": "password", "name": "password", "required": True, "type": "string"},
{"name": "fires", "required": True, "type": "boolean"},
{"name": "alias", "optional": True, "type": "string"},
]
def test_ui_group_schema(coresys):
"""Test with group schema."""
assert UiOptions(coresys)(
{
"name": "str",
"password": "password",
"fires": "bool",
"alias": "str?",
"extended": {"name": "str", "data": ["str"], "path": "str?"},
},
) == [
{"name": "name", "required": True, "type": "string"},
{"format": "password", "name": "password", "required": True, "type": "string"},
{"name": "fires", "required": True, "type": "boolean"},
{"name": "alias", "optional": True, "type": "string"},
{
"multiple": False,
"name": "extended",
"optional": True,
"schema": [
{"name": "name", "required": True, "type": "string"},
{"multiple": True, "name": "data", "required": True, "type": "string"},
{"name": "path", "optional": True, "type": "string"},
],
"type": "schema",
},
]
def test_ui_group_list(coresys):
"""Test with group schema."""
assert UiOptions(coresys)(
{
"name": "str",
"password": "password",
"fires": "bool",
"alias": "str?",
"extended": [{"name": "str", "data": ["str?"], "path": "str?"}],
},
) == [
{"name": "name", "required": True, "type": "string"},
{"format": "password", "name": "password", "required": True, "type": "string"},
{"name": "fires", "required": True, "type": "boolean"},
{"name": "alias", "optional": True, "type": "string"},
{
"multiple": True,
"name": "extended",
"optional": True,
"schema": [
{"name": "name", "required": True, "type": "string"},
{"multiple": True, "name": "data", "optional": True, "type": "string"},
{"name": "path", "optional": True, "type": "string"},
],
"type": "schema",
},
]

View File

@ -1,71 +0,0 @@
"""Test add-ons schema to UI schema convertion."""
import pytest
import voluptuous as vol
from supervisor.addons.validate import validate_options
def test_simple_schema(coresys):
"""Test with simple schema."""
assert validate_options(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "password": "1234", "fires": True, "alias": "test"})
assert validate_options(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "password": "1234", "fires": True})
with pytest.raises(vol.error.Invalid):
validate_options(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "password": "1234", "fires": "hah"})
with pytest.raises(vol.error.Invalid):
validate_options(
coresys,
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"},
)({"name": "Pascal", "fires": True})
def test_complex_schema_list(coresys):
"""Test with complex list schema."""
assert validate_options(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": ["test", "blu"]})
with pytest.raises(vol.error.Invalid):
validate_options(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": ["test", 1]})
with pytest.raises(vol.error.Invalid):
validate_options(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": "test"})
def test_complex_schema_dict(coresys):
"""Test with complex dict schema."""
assert validate_options(
coresys,
{"name": "str", "password": "password", "extend": {"test": "int"}},
)({"name": "Pascal", "password": "1234", "extend": {"test": 1}})
with pytest.raises(vol.error.Invalid):
validate_options(
coresys,
{"name": "str", "password": "password", "extend": {"test": "int"}},
)({"name": "Pascal", "password": "1234", "extend": {"wrong": 1}})
with pytest.raises(vol.error.Invalid):
validate_options(
coresys,
{"name": "str", "password": "password", "extend": ["str"]},
)({"name": "Pascal", "password": "1234", "extend": "test"})

View File

@ -1,73 +0,0 @@
"""Test add-ons schema to UI schema convertion."""
from supervisor.addons.validate import schema_ui_options
def test_simple_schema():
"""Test with simple schema."""
assert schema_ui_options(
{"name": "str", "password": "password", "fires": "bool", "alias": "str?"}
) == [
{"name": "name", "required": True, "type": "string"},
{"format": "password", "name": "password", "required": True, "type": "string"},
{"name": "fires", "required": True, "type": "boolean"},
{"name": "alias", "optional": True, "type": "string"},
]
def test_group_schema():
"""Test with group schema."""
assert schema_ui_options(
{
"name": "str",
"password": "password",
"fires": "bool",
"alias": "str?",
"extended": {"name": "str", "data": ["str"], "path": "str?"},
}
) == [
{"name": "name", "required": True, "type": "string"},
{"format": "password", "name": "password", "required": True, "type": "string"},
{"name": "fires", "required": True, "type": "boolean"},
{"name": "alias", "optional": True, "type": "string"},
{
"multiple": False,
"name": "extended",
"optional": True,
"schema": [
{"name": "name", "required": True, "type": "string"},
{"multiple": True, "name": "data", "required": True, "type": "string"},
{"name": "path", "optional": True, "type": "string"},
],
"type": "schema",
},
]
def test_group_list():
"""Test with group schema."""
assert schema_ui_options(
{
"name": "str",
"password": "password",
"fires": "bool",
"alias": "str?",
"extended": [{"name": "str", "data": ["str?"], "path": "str?"}],
}
) == [
{"name": "name", "required": True, "type": "string"},
{"format": "password", "name": "password", "required": True, "type": "string"},
{"name": "fires", "required": True, "type": "boolean"},
{"name": "alias", "optional": True, "type": "string"},
{
"multiple": True,
"name": "extended",
"optional": True,
"schema": [
{"name": "name", "required": True, "type": "string"},
{"multiple": True, "name": "data", "optional": True, "type": "string"},
{"name": "path", "optional": True, "type": "string"},
],
"type": "schema",
},
]

1
tests/api/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Test for API calls."""

View File

@ -0,0 +1,37 @@
"""Test Docker API."""
from pathlib import Path
import pytest
from supervisor.hardware.data import Device
@pytest.mark.asyncio
async def test_api_hardware_info(api_client):
"""Test docker info api."""
resp = await api_client.get("/hardware/info")
result = await resp.json()
assert result["result"] == "ok"
@pytest.mark.asyncio
async def test_api_hardware_info_device(api_client, coresys):
"""Test docker info api."""
coresys.hardware.update_device(
Device(
"sda",
Path("/dev/sda"),
Path("/sys/bus/usb/000"),
"sound",
[Path("/dev/serial/by-id/test")],
{"ID_NAME": "xy"},
)
)
resp = await api_client.get("/hardware/info")
result = await resp.json()
assert result["result"] == "ok"
assert result["data"]["devices"][-1]["name"] == "sda"
assert result["data"]["devices"][-1]["by_id"] == "/dev/serial/by-id/test"

View File

@ -3,6 +3,8 @@ from unittest.mock import patch
import pytest import pytest
# pylint: disable=redefined-outer-name
@pytest.fixture @pytest.fixture
def stub_auth(): def stub_auth():

View File

@ -0,0 +1 @@
"""Tests for hardware."""

View File

@ -0,0 +1,22 @@
"""Test HardwareManager Module."""
from pathlib import Path
from supervisor.hardware.data import Device
# pylint: disable=protected-access
def test_device_property(coresys):
"""Test device cgroup policy."""
device = Device(
"ttyACM0",
Path("/dev/ttyACM0"),
Path("/sys/bus/usb/001"),
"tty",
[Path("/dev/serial/by-id/fixed-device")],
{"MAJOR": "5", "MINOR": "10"},
)
assert device.by_id == device.links[0]
assert device.cgroups_major == 5
assert device.cgroups_minor == 10

View File

@ -0,0 +1,61 @@
"""Test hardware utils."""
from pathlib import Path
from unittest.mock import MagicMock, patch
from supervisor.hardware.data import Device
def test_have_audio(coresys):
"""Test usb device filter."""
assert not coresys.hardware.helper.support_audio
coresys.hardware.update_device(
Device(
"sda",
Path("/dev/sda"),
Path("/sys/bus/usb/000"),
"sound",
[],
{"ID_NAME": "xy"},
)
)
assert coresys.hardware.helper.support_audio
def test_hide_virtual_device(coresys):
"""Test hidding virtual devices."""
udev_device = MagicMock()
udev_device.sys_path = "/sys/devices/platform/test"
assert not coresys.hardware.helper.hide_virtual_device(udev_device)
udev_device.sys_path = "/sys/devices/virtual/block/test"
assert coresys.hardware.helper.hide_virtual_device(udev_device)
udev_device.sys_path = "/sys/devices/virtual/tty/test"
assert coresys.hardware.helper.hide_virtual_device(udev_device)
def test_free_space(coresys):
"""Test free space helper."""
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))):
free = coresys.hardware.helper.get_disk_free_space("/data")
assert free == 2.0
def test_total_space(coresys):
"""Test total space helper."""
with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)):
total = coresys.hardware.helper.get_disk_total_space("/data")
assert total == 10.0
def test_used_space(coresys):
"""Test used space helper."""
with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)):
used = coresys.hardware.helper.get_disk_used_space("/data")
assert used == 8.0

View File

@ -0,0 +1,108 @@
"""Test HardwareManager Module."""
from pathlib import Path
from supervisor.hardware.const import UdevSubsystem
from supervisor.hardware.data import Device
# pylint: disable=protected-access
def test_initial_device_initialize(coresys):
"""Initialize the local hardware."""
assert not coresys.hardware.devices
coresys.hardware._import_devices()
assert coresys.hardware.devices
def test_device_path_lookup(coresys):
"""Test device lookup."""
for device in (
Device(
"ttyACM0",
Path("/dev/ttyACM0"),
Path("/sys/bus/usb/001"),
"tty",
[],
{"ID_VENDOR": "xy"},
),
Device(
"ttyUSB0",
Path("/dev/ttyUSB0"),
Path("/sys/bus/usb/000"),
"tty",
[Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")],
{"ID_VENDOR": "xy"},
),
Device("ttyS0", Path("/dev/ttyS0"), Path("/sys/bus/usb/002"), "tty", [], {}),
Device(
"video1",
Path("/dev/video1"),
Path("/sys/bus/usb/003"),
"misc",
[],
{"ID_VENDOR": "xy"},
),
):
coresys.hardware.update_device(device)
assert coresys.hardware.exists_device_node(Path("/dev/ttyACM0"))
assert coresys.hardware.exists_device_node(Path("/dev/ttyS1"))
assert coresys.hardware.exists_device_node(Path("/dev/ttyS0"))
assert coresys.hardware.exists_device_node(Path("/dev/serial/by-id/xyx"))
assert coresys.hardware.exists_device_node(Path("/sys/bus/usb/001"))
assert not coresys.hardware.exists_device_node(Path("/dev/ttyS2"))
assert not coresys.hardware.exists_device_node(Path("/dev/ttyUSB1"))
def test_device_filter(coresys):
"""Test device filter."""
for device in (
Device(
"ttyACM0",
Path("/dev/ttyACM0"),
Path("/sys/bus/usb/000"),
"tty",
[],
{"ID_VENDOR": "xy"},
),
Device(
"ttyUSB0",
Path("/dev/ttyUSB0"),
Path("/sys/bus/usb/001"),
"tty",
[Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")],
{"ID_VENDOR": "xy"},
),
Device("ttyS0", Path("/dev/ttyS0"), Path("/sys/bus/usb/002"), "tty", [], {}),
Device(
"video1",
Path("/dev/video1"),
Path("/sys/bus/usb/003"),
"misc",
[],
{"ID_VENDOR": "xy"},
),
):
coresys.hardware.update_device(device)
assert sorted(
[device.path for device in coresys.hardware.filter_devices()]
) == sorted([device.path for device in coresys.hardware.devices])
assert sorted(
[
device.path
for device in coresys.hardware.filter_devices(
subsystem=UdevSubsystem.SERIAL
)
]
) == sorted(
[
device.path
for device in coresys.hardware.devices
if device.subsystem == UdevSubsystem.SERIAL
]
)

View File

@ -0,0 +1,29 @@
"""Test HardwareManager Module."""
from pathlib import Path
from supervisor.hardware.const import PolicyGroup
from supervisor.hardware.data import Device
# pylint: disable=protected-access
def test_device_policy(coresys):
"""Test device cgroup policy."""
device = Device(
"ttyACM0",
Path("/dev/ttyACM0"),
Path("/sys/bus/usb/001"),
"tty",
[],
{"MAJOR": "5", "MINOR": "10"},
)
assert coresys.hardware.policy.get_cgroups_rule(device) == "c 5:10 rwm"
def test_policy_group(coresys):
"""Test policy group generator."""
assert coresys.hardware.policy.get_cgroups_rules(PolicyGroup.VIDEO) == [
"c 239:* rwm",
"c 29:* rwm",
]

View File

@ -1,131 +0,0 @@
"""Test hardware utils."""
from pathlib import Path
from unittest.mock import PropertyMock, patch
from supervisor.misc.hardware import Device, Hardware
def test_read_all_devices():
"""Test to read all devices."""
system = Hardware()
assert system.devices
def test_video_devices():
"""Test video device filter."""
system = Hardware()
device_list = [
Device("test-dev", Path("/dev/test-dev"), "xy", [], {}),
Device("vchiq", Path("/dev/vchiq"), "xy", [], {}),
Device("cec0", Path("/dev/cec0"), "xy", [], {}),
Device("video1", Path("/dev/video1"), "xy", [], {}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [device.name for device in system.video_devices] == [
"vchiq",
"cec0",
"video1",
]
def test_serial_devices():
"""Test serial device filter."""
system = Hardware()
device_list = [
Device("ttyACM0", Path("/dev/ttyACM0"), "tty", [], {"ID_VENDOR": "xy"}),
Device(
"ttyUSB0",
Path("/dev/ttyUSB0"),
"tty",
[Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")],
{"ID_VENDOR": "xy"},
),
Device("ttyS0", Path("/dev/ttyS0"), "tty", [], {}),
Device("video1", Path("/dev/video1"), "misc", [], {"ID_VENDOR": "xy"}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [(device.name, device.links) for device in system.serial_devices] == [
("ttyACM0", []),
("ttyUSB0", [Path("/dev/serial/by-id/xyx")]),
("ttyS0", []),
]
def test_usb_devices():
"""Test usb device filter."""
system = Hardware()
device_list = [
Device("usb1", Path("/dev/bus/usb/1/1"), "usb", [], {}),
Device("usb2", Path("/dev/bus/usb/2/1"), "usb", [], {}),
Device("cec0", Path("/dev/cec0"), "xy", [], {}),
Device("video1", Path("/dev/video1"), "xy", [], {}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [device.name for device in system.usb_devices] == [
"usb1",
"usb2",
]
def test_block_devices():
"""Test usb device filter."""
system = Hardware()
device_list = [
Device("sda", Path("/dev/sda"), "block", [], {"ID_NAME": "xy"}),
Device("sdb", Path("/dev/sdb"), "block", [], {"ID_NAME": "xy"}),
Device("cec0", Path("/dev/cec0"), "xy", [], {}),
Device("video1", Path("/dev/video1"), "xy", [], {"ID_NAME": "xy"}),
]
with patch(
"supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock
) as mock_device:
mock_device.return_value = device_list
assert [device.name for device in system.disk_devices] == [
"sda",
"sdb",
]
def test_free_space():
"""Test free space helper."""
system = Hardware()
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))):
free = system.get_disk_free_space("/data")
assert free == 2.0
def test_total_space():
"""Test total space helper."""
system = Hardware()
with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)):
total = system.get_disk_total_space("/data")
assert total == 10.0
def test_used_space():
"""Test used space helper."""
system = Hardware()
with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)):
used = system.get_disk_used_space("/data")
assert used == 8.0

View File

@ -0,0 +1 @@
"""Test for Resolution."""

View File

@ -1,6 +1,6 @@
"""Test evaluation base.""" """Test evaluation base."""
# pylint: disable=import-error,protected-access # pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock from unittest.mock import AsyncMock, patch
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType from supervisor.resolution.const import ContextType, IssueType, SuggestionType
@ -24,7 +24,8 @@ async def test_fixup(coresys: CoreSys):
mock_repositorie = AsyncMock() mock_repositorie = AsyncMock()
coresys.store.repositories["test"] = mock_repositorie coresys.store.repositories["test"] = mock_repositorie
await store_execute_reload() with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))):
await store_execute_reload()
assert mock_repositorie.load.called assert mock_repositorie.load.called
assert mock_repositorie.update.called assert mock_repositorie.update.called

View File

@ -1,7 +1,7 @@
"""Test evaluation base.""" """Test evaluation base."""
# pylint: disable=import-error,protected-access # pylint: disable=import-error,protected-access
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock from unittest.mock import AsyncMock, patch
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType from supervisor.resolution.const import ContextType, IssueType, SuggestionType
@ -30,7 +30,8 @@ async def test_fixup(coresys: CoreSys, tmp_path):
mock_repositorie.git.path = test_repo mock_repositorie.git.path = test_repo
coresys.store.repositories["test"] = mock_repositorie coresys.store.repositories["test"] = mock_repositorie
await store_execute_reset() with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))):
await store_execute_reset()
assert not test_repo.exists() assert not test_repo.exists()
assert mock_repositorie.load.called assert mock_repositorie.load.called