diff --git a/rootfs/etc/cont-init.d/udev.sh b/rootfs/etc/cont-init.d/udev.sh index 4aa88b3ed..5a8c69063 100644 --- a/rootfs/etc/cont-init.d/udev.sh +++ b/rootfs/etc/cont-init.d/udev.sh @@ -2,9 +2,16 @@ # ============================================================================== # Start udev service # ============================================================================== + +if bashio::fs.directory_exists /run/udev; then + bashio::log.info "Using udev information from host" + bashio::exit.ok +fi + + +bashio::log.info "Setup udev backend inside container" udevd --daemon -bashio::log.info "Update udev information" if udevadm trigger; then udevadm settle || true else diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index b7487d280..766d3cd31 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -10,7 +10,7 @@ import secrets import shutil import tarfile from tempfile import TemporaryDirectory -from typing import Any, Awaitable, Dict, List, Optional +from typing import Any, Awaitable, Dict, List, Optional, Set import aiohttp import voluptuous as vol @@ -55,13 +55,15 @@ from ..exceptions import ( HostAppArmorError, JsonFileError, ) +from ..hardware.data import Device from ..utils import check_port from ..utils.apparmor import adjust_profile from ..utils.json import read_json_file, write_json_file from ..utils.tar import atomic_contents_add, secure_path from .model import AddonModel, Data +from .options import AddonOptions from .utils import remove_data -from .validate import SCHEMA_ADDON_SNAPSHOT, validate_options +from .validate import SCHEMA_ADDON_SNAPSHOT _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -394,6 +396,20 @@ class Addon(AddonModel): """Return path to asound config for Docker.""" return Path(self.sys_config.path_extern_tmp, f"{self.slug}_pulse") + @property + def devices(self) -> Set[Device]: + """Create a schema for add-on options.""" + raw_schema = self.data[ATTR_SCHEMA] + if isinstance(raw_schema, bool) or not raw_schema: + return set() + + # Validate devices + options_validator = AddonOptions(self.coresys, raw_schema) + with suppress(vol.Invalid): + options_validator(self.options) + + return options_validator.devices + def save_persist(self) -> None: """Save data of add-on.""" self.sys_addons.data.save_data() @@ -442,20 +458,17 @@ class Addon(AddonModel): async def write_options(self) -> None: """Return True if add-on options is written to data.""" - schema = self.schema - options = self.options - # Update secrets for validation await self.sys_homeassistant.secrets.reload() try: - options = schema(options) + options = self.schema(self.options) write_json_file(self.path_options, options) except vol.Invalid as ex: _LOGGER.error( "Add-on %s has invalid options: %s", self.slug, - humanize_error(options, ex), + humanize_error(self.options, ex), ) except JsonFileError: _LOGGER.error("Add-on %s can't write options", self.slug) @@ -538,7 +551,7 @@ class Addon(AddonModel): # create voluptuous new_schema = vol.Schema( - vol.All(dict, validate_options(self.coresys, new_raw_schema)) + vol.All(dict, AddonOptions(self.coresys, new_raw_schema)) ) # validate diff --git a/supervisor/addons/model.py b/supervisor/addons/model.py index d37e7e112..0d5065272 100644 --- a/supervisor/addons/model.py +++ b/supervisor/addons/model.py @@ -12,7 +12,6 @@ from ..const import ( ATTR_ARCH, ATTR_AUDIO, ATTR_AUTH_API, - ATTR_AUTO_UART, ATTR_BOOT, ATTR_DESCRIPTON, ATTR_DEVICES, @@ -56,6 +55,7 @@ from ..const import ( ATTR_STDIN, ATTR_TIMEOUT, ATTR_TMPFS, + ATTR_UART, ATTR_UDEV, ATTR_URL, ATTR_USB, @@ -71,7 +71,8 @@ from ..const import ( AddonStartup, ) from ..coresys import CoreSys, CoreSysAttributes -from .validate import RE_SERVICE, RE_VOLUME, schema_ui_options, validate_options +from .options import AddonOptions, UiOptions +from .validate import RE_SERVICE, RE_VOLUME Data = Dict[str, Any] @@ -296,9 +297,9 @@ class AddonModel(CoreSysAttributes, ABC): return self.data[ATTR_HOST_DBUS] @property - def devices(self) -> List[str]: - """Return devices of add-on.""" - return self.data.get(ATTR_DEVICES, []) + def static_devices(self) -> List[Path]: + """Return static devices of add-on.""" + return [Path(node) for node in self.data.get(ATTR_DEVICES, [])] @property def tmpfs(self) -> Optional[str]: @@ -387,7 +388,7 @@ class AddonModel(CoreSysAttributes, ABC): @property def with_uart(self) -> bool: """Return True if we should map all UART device.""" - return self.data[ATTR_AUTO_UART] + return self.data[ATTR_UART] @property def with_udev(self) -> bool: @@ -522,8 +523,8 @@ class AddonModel(CoreSysAttributes, ABC): raw_schema = self.data[ATTR_SCHEMA] if isinstance(raw_schema, bool): - return vol.Schema(dict) - return vol.Schema(vol.All(dict, validate_options(self.coresys, raw_schema))) + raw_schema = {} + return vol.Schema(vol.All(dict, AddonOptions(self.coresys, raw_schema))) @property def schema_ui(self) -> Optional[List[Dict[str, Any]]]: @@ -532,7 +533,7 @@ class AddonModel(CoreSysAttributes, ABC): if isinstance(raw_schema, bool): return None - return schema_ui_options(raw_schema) + return UiOptions(self.coresys)(raw_schema) def __eq__(self, other): """Compaired add-on objects.""" diff --git a/supervisor/addons/options.py b/supervisor/addons/options.py new file mode 100644 index 000000000..ca73120f8 --- /dev/null +++ b/supervisor/addons/options.py @@ -0,0 +1,380 @@ +"""Add-on Options / UI rendering.""" +import logging +from pathlib import Path +import re +from typing import Any, Dict, List, Set, Union + +import voluptuous as vol + +from ..coresys import CoreSys, CoreSysAttributes +from ..exceptions import HardwareNotFound +from ..hardware.const import UdevSubsystem +from ..hardware.data import Device +from ..validate import network_port + +_LOGGER: logging.Logger = logging.getLogger(__name__) + +_STR = "str" +_INT = "int" +_FLOAT = "float" +_BOOL = "bool" +_PASSWORD = "password" +_EMAIL = "email" +_URL = "url" +_PORT = "port" +_MATCH = "match" +_LIST = "list" +_DEVICE = "device" + +RE_SCHEMA_ELEMENT = re.compile( + r"^(?:" + r"|bool" + r"|email" + r"|url" + r"|port" + r"|device(?:\((?Psubsystem=[a-z]+)\))?" + r"|str(?:\((?P\d+)?,(?P\d+)?\))?" + r"|password(?:\((?P\d+)?,(?P\d+)?\))?" + r"|int(?:\((?P\d+)?,(?P\d+)?\))?" + r"|float(?:\((?P[\d\.]+)?,(?P[\d\.]+)?\))?" + r"|match\((?P.*)\)" + r"|list\((?P.+)\)" + r")\??$" +) + +_SCHEMA_LENGTH_PARTS = ( + "i_min", + "i_max", + "f_min", + "f_max", + "s_min", + "s_max", + "p_min", + "p_max", +) + + +class AddonOptions(CoreSysAttributes): + """Validate Add-ons Options.""" + + def __init__( + self, + coresys: CoreSys, + raw_schema: Dict[str, Any], + ): + """Validate schema.""" + self.coresys: CoreSys = coresys + self.raw_schema: Dict[str, Any] = raw_schema + self.devices: Set[Device] = set() + + def __call__(self, struct): + """Create schema validator for add-ons options.""" + options = {} + + # read options + for key, value in struct.items(): + # Ignore unknown options / remove from list + if key not in self.raw_schema: + _LOGGER.warning("Unknown options %s", key) + continue + + typ = self.raw_schema[key] + try: + if isinstance(typ, list): + # nested value list + options[key] = self._nested_validate_list(typ[0], value, key) + elif isinstance(typ, dict): + # nested value dict + options[key] = self._nested_validate_dict(typ, value, key) + else: + # normal value + options[key] = self._single_validate(typ, value, key) + except (IndexError, KeyError): + raise vol.Invalid(f"Type error for {key}") from None + + self._check_missing_options(self.raw_schema, options, "root") + return options + + # pylint: disable=no-value-for-parameter + def _single_validate(self, typ: str, value: Any, key: str): + """Validate a single element.""" + # if required argument + if value is None: + raise vol.Invalid(f"Missing required option '{key}'") from None + + # Lookup secret + if str(value).startswith("!secret "): + secret: str = value.partition(" ")[2] + value = self.sys_homeassistant.secrets.get(secret) + if value is None: + raise vol.Invalid(f"Unknown secret {secret}") from None + + # parse extend data from type + match = RE_SCHEMA_ELEMENT.match(typ) + + if not match: + raise vol.Invalid(f"Unknown type {typ}") from None + + # prepare range + range_args = {} + for group_name in _SCHEMA_LENGTH_PARTS: + group_value = match.group(group_name) + if group_value: + range_args[group_name[2:]] = float(group_value) + + if typ.startswith(_STR) or typ.startswith(_PASSWORD): + return vol.All(str(value), vol.Range(**range_args))(value) + elif typ.startswith(_INT): + return vol.All(vol.Coerce(int), vol.Range(**range_args))(value) + elif typ.startswith(_FLOAT): + return vol.All(vol.Coerce(float), vol.Range(**range_args))(value) + elif typ.startswith(_BOOL): + return vol.Boolean()(value) + elif typ.startswith(_EMAIL): + return vol.Email()(value) + elif typ.startswith(_URL): + return vol.Url()(value) + elif typ.startswith(_PORT): + return network_port(value) + elif typ.startswith(_MATCH): + return vol.Match(match.group("match"))(str(value)) + elif typ.startswith(_LIST): + return vol.In(match.group("list").split("|"))(str(value)) + elif typ.startswith(_DEVICE): + try: + device = self.sys_hardware.get_by_path(Path(value)) + except HardwareNotFound: + raise vol.Invalid(f"Device {value} does not exists!") from None + + # Have filter + if match.group("filter"): + str_filter = match.group("filter") + device_filter = _create_device_filter(str_filter) + if device not in self.sys_hardware.filter_devices(**device_filter): + raise vol.Invalid( + f"Device {value} don't match the filter {str_filter}!" + ) + + # Device valid + self.devices.add(device) + return str(device.path) + + raise vol.Invalid(f"Fatal error for {key} type {typ}") from None + + def _nested_validate_list(self, typ: Any, data_list: List[Any], key: str): + """Validate nested items.""" + options = [] + + # Make sure it is a list + if not isinstance(data_list, list): + raise vol.Invalid(f"Invalid list for {key}") from None + + # Process list + for element in data_list: + # Nested? + if isinstance(typ, dict): + c_options = self._nested_validate_dict(typ, element, key) + options.append(c_options) + else: + options.append(self._single_validate(typ, element, key)) + + return options + + def _nested_validate_dict( + self, typ: Dict[Any, Any], data_dict: Dict[Any, Any], key: str + ): + """Validate nested items.""" + options = {} + + # Make sure it is a dict + if not isinstance(data_dict, dict): + raise vol.Invalid(f"Invalid dict for {key}") from None + + # Process dict + for c_key, c_value in data_dict.items(): + # Ignore unknown options / remove from list + if c_key not in typ: + _LOGGER.warning("Unknown options %s", c_key) + continue + + # Nested? + if isinstance(typ[c_key], list): + options[c_key] = self._nested_validate_list( + typ[c_key][0], c_value, c_key + ) + else: + options[c_key] = self._single_validate(typ[c_key], c_value, c_key) + + self._check_missing_options(typ, options, key) + return options + + def _check_missing_options( + self, origin: Dict[Any, Any], exists: Dict[Any, Any], root: str + ) -> None: + """Check if all options are exists.""" + missing = set(origin) - set(exists) + for miss_opt in missing: + if isinstance(origin[miss_opt], str) and origin[miss_opt].endswith("?"): + continue + raise vol.Invalid(f"Missing option {miss_opt} in {root}") from None + + +class UiOptions(CoreSysAttributes): + """Render UI Add-ons Options.""" + + def __init__(self, coresys: CoreSys) -> None: + """Initialize UI option render.""" + self.coresys = coresys + + def __call__(self, raw_schema: Dict[str, Any]) -> List[Dict[str, Any]]: + """Generate UI schema.""" + ui_schema: List[Dict[str, Any]] = [] + + # read options + for key, value in raw_schema.items(): + if isinstance(value, list): + # nested value list + self._nested_ui_list(ui_schema, value, key) + elif isinstance(value, dict): + # nested value dict + self._nested_ui_dict(ui_schema, value, key) + else: + # normal value + self._single_ui_option(ui_schema, value, key) + + return ui_schema + + def _single_ui_option( + self, + ui_schema: List[Dict[str, Any]], + value: str, + key: str, + multiple: bool = False, + ) -> None: + """Validate a single element.""" + ui_node: Dict[str, Union[str, bool, float, List[str]]] = {"name": key} + + # If multiple + if multiple: + ui_node["multiple"] = True + + # Parse extend data from type + match = RE_SCHEMA_ELEMENT.match(value) + if not match: + return + + # Prepare range + for group_name in _SCHEMA_LENGTH_PARTS: + group_value = match.group(group_name) + if not group_value: + continue + if group_name[2:] == "min": + ui_node["lengthMin"] = float(group_value) + elif group_name[2:] == "max": + ui_node["lengthMax"] = float(group_value) + + # If required + if value.endswith("?"): + ui_node["optional"] = True + else: + ui_node["required"] = True + + # Data types + if value.startswith(_STR): + ui_node["type"] = "string" + elif value.startswith(_PASSWORD): + ui_node["type"] = "string" + ui_node["format"] = "password" + elif value.startswith(_INT): + ui_node["type"] = "integer" + elif value.startswith(_FLOAT): + ui_node["type"] = "float" + elif value.startswith(_BOOL): + ui_node["type"] = "boolean" + elif value.startswith(_EMAIL): + ui_node["type"] = "string" + ui_node["format"] = "email" + elif value.startswith(_URL): + ui_node["type"] = "string" + ui_node["format"] = "url" + elif value.startswith(_PORT): + ui_node["type"] = "integer" + elif value.startswith(_MATCH): + ui_node["type"] = "string" + elif value.startswith(_LIST): + ui_node["type"] = "select" + ui_node["options"] = match.group("list").split("|") + elif value.startswith(_DEVICE): + ui_node["type"] = "select" + + # Have filter + if match.group("filter"): + device_filter = _create_device_filter(match.group("filter")) + ui_node["options"] = [ + device.path.as_posix() + for device in self.sys_hardware.filter_devices(**device_filter) + ] + else: + ui_node["options"] = [ + device.path.as_posix() for device in self.sys_hardware.devices() + ] + + ui_schema.append(ui_node) + + def _nested_ui_list( + self, + ui_schema: List[Dict[str, Any]], + option_list: List[Any], + key: str, + ) -> None: + """UI nested list items.""" + try: + element = option_list[0] + except IndexError: + _LOGGER.error("Invalid schema %s", key) + return + + if isinstance(element, dict): + self._nested_ui_dict(ui_schema, element, key, multiple=True) + else: + self._single_ui_option(ui_schema, element, key, multiple=True) + + def _nested_ui_dict( + self, + ui_schema: List[Dict[str, Any]], + option_dict: Dict[str, Any], + key: str, + multiple: bool = False, + ) -> None: + """UI nested dict items.""" + ui_node = { + "name": key, + "type": "schema", + "optional": True, + "multiple": multiple, + } + + nested_schema = [] + for c_key, c_value in option_dict.items(): + # Nested? + if isinstance(c_value, list): + self._nested_ui_list(nested_schema, c_value, c_key) + else: + self._single_ui_option(nested_schema, c_value, c_key) + + ui_node["schema"] = nested_schema + ui_schema.append(ui_node) + + +def _create_device_filter(str_filter: str) -> Dict[str, Any]: + """Generate device Filter.""" + raw_filter = dict(value.split("=") for value in str_filter.split(";")) + + clean_filter = {} + for key, value in raw_filter.items(): + if key == "subsystem": + clean_filter[key] = UdevSubsystem(value) + else: + clean_filter[key] = value + + return clean_filter diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index cc19d9ee3..b5f7975d6 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -2,7 +2,7 @@ import logging import re import secrets -from typing import Any, Dict, List, Union +from typing import Any, Dict import uuid import voluptuous as vol @@ -18,7 +18,6 @@ from ..const import ( ATTR_AUDIO_INPUT, ATTR_AUDIO_OUTPUT, ATTR_AUTH_API, - ATTR_AUTO_UART, ATTR_AUTO_UPDATE, ATTR_BOOT, ATTR_BUILD_FROM, @@ -73,6 +72,7 @@ from ..const import ( ATTR_SYSTEM, ATTR_TIMEOUT, ATTR_TMPFS, + ATTR_UART, ATTR_UDEV, ATTR_URL, ATTR_USB, @@ -90,7 +90,6 @@ from ..const import ( AddonStartup, AddonState, ) -from ..coresys import CoreSys from ..discovery.validate import valid_discovery_service from ..validate import ( docker_image, @@ -101,49 +100,13 @@ from ..validate import ( uuid_match, version_tag, ) +from .options import RE_SCHEMA_ELEMENT _LOGGER: logging.Logger = logging.getLogger(__name__) - RE_VOLUME = re.compile(r"^(config|ssl|addons|backup|share|media)(?::(rw|ro))?$") RE_SERVICE = re.compile(r"^(?Pmqtt|mysql):(?Pprovide|want|need)$") -V_STR = "str" -V_INT = "int" -V_FLOAT = "float" -V_BOOL = "bool" -V_PASSWORD = "password" -V_EMAIL = "email" -V_URL = "url" -V_PORT = "port" -V_MATCH = "match" -V_LIST = "list" - -RE_SCHEMA_ELEMENT = re.compile( - r"^(?:" - r"|bool" - r"|email" - r"|url" - r"|port" - r"|str(?:\((?P\d+)?,(?P\d+)?\))?" - r"|password(?:\((?P\d+)?,(?P\d+)?\))?" - r"|int(?:\((?P\d+)?,(?P\d+)?\))?" - r"|float(?:\((?P[\d\.]+)?,(?P[\d\.]+)?\))?" - r"|match\((?P.*)\)" - r"|list\((?P.+)\)" - r")\??$" -) - -_SCHEMA_LENGTH_PARTS = ( - "i_min", - "i_max", - "f_min", - "f_max", - "s_min", - "s_max", - "p_min", - "p_max", -) RE_DOCKER_IMAGE_BUILD = re.compile( r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$" @@ -173,27 +136,63 @@ RE_MACHINE = re.compile( ) -def _simple_startup(value) -> str: - """Define startup schema.""" - if value == "before": - return AddonStartup.SERVICES.value - if value == "after": - return AddonStartup.APPLICATION.value - return value +def _migrate_addon_config(protocol=False): + """Migrate addon config.""" + + def _migrate(config: Dict[str, Any]): + name = config.get(ATTR_NAME) + if not name: + raise vol.Invalid("Invalid Add-on config!") + + # Startup 2018-03-30 + if config.get(ATTR_STARTUP) in ("before", "after"): + value = config[ATTR_STARTUP] + if protocol: + _LOGGER.warning( + "Add-on config 'startup' with '%s' is depircated. Please report this to the maintainer of %s", + value, + name, + ) + if value == "before": + config[ATTR_STARTUP] = AddonStartup.SERVICES.value + elif value == "after": + config[ATTR_STARTUP] = AddonStartup.APPLICATION.value + + # UART 2021-01-20 + if "auto_uart" in config: + if protocol: + _LOGGER.warning( + "Add-on config 'auto_uart' is deprecated, use 'uart'. Please report this to the maintainer of %s", + name, + ) + config[ATTR_UART] = config.pop("auto_uart") + + # Device 2021-01-20 + if ATTR_DEVICES in config and any(":" in line for line in config[ATTR_DEVICES]): + if protocol: + _LOGGER.warning( + "Add-on config 'devices' use a deprecated format, the new format uses a list of paths only. Please report this to the maintainer of %s", + name, + ) + config[ATTR_DEVICES] = [line.split(":")[0] for line in config[ATTR_DEVICES]] + + return config + + return _migrate # pylint: disable=no-value-for-parameter -SCHEMA_ADDON_CONFIG = vol.Schema( +_SCHEMA_ADDON_CONFIG = vol.Schema( { - vol.Required(ATTR_NAME): vol.Coerce(str), + vol.Required(ATTR_NAME): str, vol.Required(ATTR_VERSION): version_tag, - vol.Required(ATTR_SLUG): vol.Coerce(str), - vol.Required(ATTR_DESCRIPTON): vol.Coerce(str), + vol.Required(ATTR_SLUG): str, + vol.Required(ATTR_DESCRIPTON): str, vol.Required(ATTR_ARCH): [vol.In(ARCH_ALL)], vol.Optional(ATTR_MACHINE): vol.All([vol.Match(RE_MACHINE)], vol.Unique()), vol.Optional(ATTR_URL): vol.Url(), - vol.Optional(ATTR_STARTUP, default=AddonStartup.APPLICATION): vol.All( - _simple_startup, vol.Coerce(AddonStartup) + vol.Optional(ATTR_STARTUP, default=AddonStartup.APPLICATION): vol.Coerce( + AddonStartup ), vol.Optional(ATTR_BOOT, default=AddonBoot.AUTO): vol.Coerce(AddonBoot), vol.Optional(ATTR_INIT, default=True): vol.Boolean(), @@ -211,21 +210,20 @@ SCHEMA_ADDON_CONFIG = vol.Schema( vol.Optional(ATTR_INGRESS_PORT, default=8099): vol.Any( network_port, vol.Equal(0) ), - vol.Optional(ATTR_INGRESS_ENTRY): vol.Coerce(str), - vol.Optional(ATTR_PANEL_ICON, default="mdi:puzzle"): vol.Coerce(str), - vol.Optional(ATTR_PANEL_TITLE): vol.Coerce(str), + vol.Optional(ATTR_INGRESS_ENTRY): str, + vol.Optional(ATTR_PANEL_ICON, default="mdi:puzzle"): str, + vol.Optional(ATTR_PANEL_TITLE): str, vol.Optional(ATTR_PANEL_ADMIN, default=True): vol.Boolean(), vol.Optional(ATTR_HOMEASSISTANT): vol.Maybe(version_tag), vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_PID, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_IPC, default=False): vol.Boolean(), vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(), - vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")], - vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(), + vol.Optional(ATTR_DEVICES): [str], vol.Optional(ATTR_UDEV, default=False): vol.Boolean(), vol.Optional(ATTR_TMPFS): vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"), vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)], - vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)}, + vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): str}, vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)], vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(), vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(), @@ -233,6 +231,7 @@ SCHEMA_ADDON_CONFIG = vol.Schema( vol.Optional(ATTR_VIDEO, default=False): vol.Boolean(), vol.Optional(ATTR_GPIO, default=False): vol.Boolean(), vol.Optional(ATTR_USB, default=False): vol.Boolean(), + vol.Optional(ATTR_UART, default=False): vol.Boolean(), vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(), vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(), vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(), @@ -244,26 +243,20 @@ SCHEMA_ADDON_CONFIG = vol.Schema( vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(), vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)], vol.Optional(ATTR_DISCOVERY): [valid_discovery_service], - vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [vol.Coerce(str)], + vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [str], vol.Optional(ATTR_OPTIONS, default={}): dict, vol.Optional(ATTR_SCHEMA, default={}): vol.Any( vol.Schema( { - vol.Coerce(str): vol.Any( + str: vol.Any( SCHEMA_ELEMENT, [ vol.Any( SCHEMA_ELEMENT, - { - vol.Coerce(str): vol.Any( - SCHEMA_ELEMENT, [SCHEMA_ELEMENT] - ) - }, + {str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}, ) ], - vol.Schema( - {vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])} - ), + vol.Schema({str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}), ) } ), @@ -277,6 +270,8 @@ SCHEMA_ADDON_CONFIG = vol.Schema( extra=vol.REMOVE_EXTRA, ) +SCHEMA_ADDON_CONFIG = vol.All(_migrate_addon_config(True), _SCHEMA_ADDON_CONFIG) + # pylint: disable=no-value-for-parameter SCHEMA_BUILD_CONFIG = vol.Schema( @@ -300,15 +295,13 @@ SCHEMA_ADDON_USER = vol.Schema( vol.Optional(ATTR_IMAGE): docker_image, vol.Optional(ATTR_UUID, default=lambda: uuid.uuid4().hex): uuid_match, vol.Optional(ATTR_ACCESS_TOKEN): token, - vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): vol.Coerce( - str - ), + vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): str, vol.Optional(ATTR_OPTIONS, default=dict): dict, vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(), vol.Optional(ATTR_BOOT): vol.Coerce(AddonBoot), vol.Optional(ATTR_NETWORK): docker_ports, - vol.Optional(ATTR_AUDIO_OUTPUT): vol.Maybe(vol.Coerce(str)), - vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(vol.Coerce(str)), + vol.Optional(ATTR_AUDIO_OUTPUT): vol.Maybe(str), + vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(str), vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(), vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(), vol.Optional(ATTR_WATCHDOG, default=False): vol.Boolean(), @@ -317,18 +310,21 @@ SCHEMA_ADDON_USER = vol.Schema( ) -SCHEMA_ADDON_SYSTEM = SCHEMA_ADDON_CONFIG.extend( - { - vol.Required(ATTR_LOCATON): vol.Coerce(str), - vol.Required(ATTR_REPOSITORY): vol.Coerce(str), - } +SCHEMA_ADDON_SYSTEM = vol.All( + _migrate_addon_config(), + _SCHEMA_ADDON_CONFIG.extend( + { + vol.Required(ATTR_LOCATON): str, + vol.Required(ATTR_REPOSITORY): str, + } + ), ) SCHEMA_ADDONS_FILE = vol.Schema( { - vol.Optional(ATTR_USER, default=dict): {vol.Coerce(str): SCHEMA_ADDON_USER}, - vol.Optional(ATTR_SYSTEM, default=dict): {vol.Coerce(str): SCHEMA_ADDON_SYSTEM}, + vol.Optional(ATTR_USER, default=dict): {str: SCHEMA_ADDON_USER}, + vol.Optional(ATTR_SYSTEM, default=dict): {str: SCHEMA_ADDON_SYSTEM}, } ) @@ -338,263 +334,7 @@ SCHEMA_ADDON_SNAPSHOT = vol.Schema( vol.Required(ATTR_USER): SCHEMA_ADDON_USER, vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM, vol.Required(ATTR_STATE): vol.Coerce(AddonState), - vol.Required(ATTR_VERSION): vol.Coerce(str), + vol.Required(ATTR_VERSION): version_tag, }, extra=vol.REMOVE_EXTRA, ) - - -def validate_options(coresys: CoreSys, raw_schema: Dict[str, Any]): - """Validate schema.""" - - def validate(struct): - """Create schema validator for add-ons options.""" - options = {} - - # read options - for key, value in struct.items(): - # Ignore unknown options / remove from list - if key not in raw_schema: - _LOGGER.warning("Unknown options %s", key) - continue - - typ = raw_schema[key] - try: - if isinstance(typ, list): - # nested value list - options[key] = _nested_validate_list(coresys, typ[0], value, key) - elif isinstance(typ, dict): - # nested value dict - options[key] = _nested_validate_dict(coresys, typ, value, key) - else: - # normal value - options[key] = _single_validate(coresys, typ, value, key) - except (IndexError, KeyError): - raise vol.Invalid(f"Type error for {key}") from None - - _check_missing_options(raw_schema, options, "root") - return options - - return validate - - -# pylint: disable=no-value-for-parameter -# pylint: disable=inconsistent-return-statements -def _single_validate(coresys: CoreSys, typ: str, value: Any, key: str): - """Validate a single element.""" - # if required argument - if value is None: - raise vol.Invalid(f"Missing required option '{key}'") from None - - # Lookup secret - if str(value).startswith("!secret "): - secret: str = value.partition(" ")[2] - value = coresys.homeassistant.secrets.get(secret) - if value is None: - raise vol.Invalid(f"Unknown secret {secret}") from None - - # parse extend data from type - match = RE_SCHEMA_ELEMENT.match(typ) - - if not match: - raise vol.Invalid(f"Unknown type {typ}") from None - - # prepare range - range_args = {} - for group_name in _SCHEMA_LENGTH_PARTS: - group_value = match.group(group_name) - if group_value: - range_args[group_name[2:]] = float(group_value) - - if typ.startswith(V_STR) or typ.startswith(V_PASSWORD): - return vol.All(str(value), vol.Range(**range_args))(value) - elif typ.startswith(V_INT): - return vol.All(vol.Coerce(int), vol.Range(**range_args))(value) - elif typ.startswith(V_FLOAT): - return vol.All(vol.Coerce(float), vol.Range(**range_args))(value) - elif typ.startswith(V_BOOL): - return vol.Boolean()(value) - elif typ.startswith(V_EMAIL): - return vol.Email()(value) - elif typ.startswith(V_URL): - return vol.Url()(value) - elif typ.startswith(V_PORT): - return network_port(value) - elif typ.startswith(V_MATCH): - return vol.Match(match.group("match"))(str(value)) - elif typ.startswith(V_LIST): - return vol.In(match.group("list").split("|"))(str(value)) - - raise vol.Invalid(f"Fatal error for {key} type {typ}") from None - - -def _nested_validate_list(coresys, typ, data_list, key): - """Validate nested items.""" - options = [] - - # Make sure it is a list - if not isinstance(data_list, list): - raise vol.Invalid(f"Invalid list for {key}") from None - - # Process list - for element in data_list: - # Nested? - if isinstance(typ, dict): - c_options = _nested_validate_dict(coresys, typ, element, key) - options.append(c_options) - else: - options.append(_single_validate(coresys, typ, element, key)) - - return options - - -def _nested_validate_dict(coresys, typ, data_dict, key): - """Validate nested items.""" - options = {} - - # Make sure it is a dict - if not isinstance(data_dict, dict): - raise vol.Invalid(f"Invalid dict for {key}") from None - - # Process dict - for c_key, c_value in data_dict.items(): - # Ignore unknown options / remove from list - if c_key not in typ: - _LOGGER.warning("Unknown options %s", c_key) - continue - - # Nested? - if isinstance(typ[c_key], list): - options[c_key] = _nested_validate_list( - coresys, typ[c_key][0], c_value, c_key - ) - else: - options[c_key] = _single_validate(coresys, typ[c_key], c_value, c_key) - - _check_missing_options(typ, options, key) - return options - - -def _check_missing_options(origin, exists, root): - """Check if all options are exists.""" - missing = set(origin) - set(exists) - for miss_opt in missing: - if isinstance(origin[miss_opt], str) and origin[miss_opt].endswith("?"): - continue - raise vol.Invalid(f"Missing option {miss_opt} in {root}") from None - - -def schema_ui_options(raw_schema: Dict[str, Any]) -> List[Dict[str, Any]]: - """Generate UI schema.""" - ui_schema: List[Dict[str, Any]] = [] - - # read options - for key, value in raw_schema.items(): - if isinstance(value, list): - # nested value list - _nested_ui_list(ui_schema, value, key) - elif isinstance(value, dict): - # nested value dict - _nested_ui_dict(ui_schema, value, key) - else: - # normal value - _single_ui_option(ui_schema, value, key) - - return ui_schema - - -def _single_ui_option( - ui_schema: List[Dict[str, Any]], value: str, key: str, multiple: bool = False -) -> None: - """Validate a single element.""" - ui_node: Dict[str, Union[str, bool, float, List[str]]] = {"name": key} - - # If multiple - if multiple: - ui_node["multiple"] = True - - # Parse extend data from type - match = RE_SCHEMA_ELEMENT.match(value) - if not match: - return - - # Prepare range - for group_name in _SCHEMA_LENGTH_PARTS: - group_value = match.group(group_name) - if not group_value: - continue - if group_name[2:] == "min": - ui_node["lengthMin"] = float(group_value) - elif group_name[2:] == "max": - ui_node["lengthMax"] = float(group_value) - - # If required - if value.endswith("?"): - ui_node["optional"] = True - else: - ui_node["required"] = True - - # Data types - if value.startswith(V_STR): - ui_node["type"] = "string" - elif value.startswith(V_PASSWORD): - ui_node["type"] = "string" - ui_node["format"] = "password" - elif value.startswith(V_INT): - ui_node["type"] = "integer" - elif value.startswith(V_FLOAT): - ui_node["type"] = "float" - elif value.startswith(V_BOOL): - ui_node["type"] = "boolean" - elif value.startswith(V_EMAIL): - ui_node["type"] = "string" - ui_node["format"] = "email" - elif value.startswith(V_URL): - ui_node["type"] = "string" - ui_node["format"] = "url" - elif value.startswith(V_PORT): - ui_node["type"] = "integer" - elif value.startswith(V_MATCH): - ui_node["type"] = "string" - elif value.startswith(V_LIST): - ui_node["type"] = "select" - ui_node["options"] = match.group("list").split("|") - - ui_schema.append(ui_node) - - -def _nested_ui_list( - ui_schema: List[Dict[str, Any]], option_list: List[Any], key: str -) -> None: - """UI nested list items.""" - try: - element = option_list[0] - except IndexError: - _LOGGER.error("Invalid schema %s", key) - return - - if isinstance(element, dict): - _nested_ui_dict(ui_schema, element, key, multiple=True) - else: - _single_ui_option(ui_schema, element, key, multiple=True) - - -def _nested_ui_dict( - ui_schema: List[Dict[str, Any]], - option_dict: Dict[str, Any], - key: str, - multiple: bool = False, -) -> None: - """UI nested dict items.""" - ui_node = {"name": key, "type": "schema", "optional": True, "multiple": multiple} - - nested_schema = [] - for c_key, c_value in option_dict.items(): - # Nested? - if isinstance(c_value, list): - _nested_ui_list(nested_schema, c_value, c_key) - else: - _single_ui_option(nested_schema, c_value, c_key) - - ui_node["schema"] = nested_schema - ui_schema.append(ui_node) diff --git a/supervisor/api/addons.py b/supervisor/api/addons.py index 96550d8bc..360aa0af9 100644 --- a/supervisor/api/addons.py +++ b/supervisor/api/addons.py @@ -82,6 +82,7 @@ from ..const import ( ATTR_STARTUP, ATTR_STATE, ATTR_STDIN, + ATTR_UART, ATTR_UDEV, ATTR_UPDATE_AVAILABLE, ATTR_URL, @@ -237,7 +238,7 @@ class APIAddons(CoreSysAttributes): ATTR_PRIVILEGED: addon.privileged, ATTR_FULL_ACCESS: addon.with_full_access, ATTR_APPARMOR: addon.apparmor, - ATTR_DEVICES: _pretty_devices(addon), + ATTR_DEVICES: addon.static_devices, ATTR_ICON: addon.with_icon, ATTR_LOGO: addon.with_logo, ATTR_CHANGELOG: addon.with_changelog, @@ -250,6 +251,7 @@ class APIAddons(CoreSysAttributes): ATTR_HOMEASSISTANT_API: addon.access_homeassistant_api, ATTR_GPIO: addon.with_gpio, ATTR_USB: addon.with_usb, + ATTR_UART: addon.with_uart, ATTR_KERNEL_MODULES: addon.with_kernel_modules, ATTR_DEVICETREE: addon.with_devicetree, ATTR_UDEV: addon.with_udev, @@ -286,6 +288,8 @@ class APIAddons(CoreSysAttributes): ATTR_VERSION: addon.version, ATTR_UPDATE_AVAILABLE: addon.need_update, ATTR_WATCHDOG: addon.watchdog, + ATTR_DEVICES: addon.static_devices + + [device.path for device in addon.devices], } ) diff --git a/supervisor/api/hardware.py b/supervisor/api/hardware.py index 1db46dd5b..5128941ba 100644 --- a/supervisor/api/hardware.py +++ b/supervisor/api/hardware.py @@ -1,50 +1,46 @@ """Init file for Supervisor hardware RESTful API.""" -import asyncio import logging -from typing import Any, Awaitable, Dict, List +from typing import Any, Awaitable, Dict from aiohttp import web -from ..const import ( - ATTR_AUDIO, - ATTR_DISK, - ATTR_GPIO, - ATTR_INPUT, - ATTR_OUTPUT, - ATTR_SERIAL, - ATTR_USB, -) +from ..const import ATTR_AUDIO, ATTR_DEVICES, ATTR_INPUT, ATTR_NAME, ATTR_OUTPUT from ..coresys import CoreSysAttributes +from ..hardware.const import ( + ATTR_ATTRIBUTES, + ATTR_BY_ID, + ATTR_DEV_PATH, + ATTR_SUBSYSTEM, + ATTR_SYSFS, +) +from ..hardware.data import Device from .utils import api_process _LOGGER: logging.Logger = logging.getLogger(__name__) +def device_struct(device: Device) -> Dict[str, Any]: + """Return a dict with information of a interface to be used in th API.""" + return { + ATTR_NAME: device.name, + ATTR_SYSFS: device.sysfs, + ATTR_DEV_PATH: device.path, + ATTR_SUBSYSTEM: device.subsystem, + ATTR_BY_ID: device.by_id, + ATTR_ATTRIBUTES: device.attributes, + } + + class APIHardware(CoreSysAttributes): """Handle RESTful API for hardware functions.""" @api_process async def info(self, request: web.Request) -> Dict[str, Any]: """Show hardware info.""" - serial: List[str] = [] - - # Create Serial list with device links - for device in self.sys_hardware.serial_devices: - serial.append(device.path.as_posix()) - for link in device.links: - serial.append(link.as_posix()) - return { - ATTR_SERIAL: serial, - ATTR_INPUT: list(self.sys_hardware.input_devices), - ATTR_DISK: [ - device.path.as_posix() for device in self.sys_hardware.disk_devices - ], - ATTR_GPIO: list(self.sys_hardware.gpio_devices), - ATTR_USB: [ - device.path.as_posix() for device in self.sys_hardware.usb_devices - ], - ATTR_AUDIO: self.sys_hardware.audio_devices, + ATTR_DEVICES: [ + device_struct(device) for device in self.sys_hardware.devices + ] } @api_process @@ -64,6 +60,6 @@ class APIHardware(CoreSysAttributes): } @api_process - def trigger(self, request: web.Request) -> Awaitable[None]: + async def trigger(self, request: web.Request) -> Awaitable[None]: """Trigger a udev device reload.""" - return asyncio.shield(self.sys_hardware.udev_trigger()) + _LOGGER.warning("Ignoring DEPRECATED hardware trigger function call.") diff --git a/supervisor/bootstrap.py b/supervisor/bootstrap.py index 5b1ade822..3cedb8aeb 100644 --- a/supervisor/bootstrap.py +++ b/supervisor/bootstrap.py @@ -35,12 +35,12 @@ from .core import Core from .coresys import CoreSys from .dbus import DBusManager from .discovery import Discovery +from .hardware.module import HardwareManager from .hassos import HassOS from .homeassistant import HomeAssistant from .host import HostManager from .ingress import Ingress from .misc.filter import filter_data -from .misc.hwmon import HwMonitor from .misc.scheduler import Scheduler from .misc.tasks import Tasks from .plugins import PluginManager @@ -73,7 +73,7 @@ async def initialize_coresys() -> CoreSys: coresys.addons = AddonManager(coresys) coresys.snapshots = SnapshotManager(coresys) coresys.host = HostManager(coresys) - coresys.hwmonitor = HwMonitor(coresys) + coresys.hardware = HardwareManager(coresys) coresys.ingress = Ingress(coresys) coresys.tasks = Tasks(coresys) coresys.services = ServiceManager(coresys) diff --git a/supervisor/const.py b/supervisor/const.py index e81d22159..dda5ffb8b 100644 --- a/supervisor/const.py +++ b/supervisor/const.py @@ -89,7 +89,7 @@ ATTR_AUDIO = "audio" ATTR_AUDIO_INPUT = "audio_input" ATTR_AUDIO_OUTPUT = "audio_output" ATTR_AUTH_API = "auth_api" -ATTR_AUTO_UART = "auto_uart" +ATTR_UART = "uart" ATTR_AUTO_UPDATE = "auto_update" ATTR_AVAILABLE = "available" ATTR_BLK_READ = "blk_read" @@ -359,9 +359,6 @@ ROLE_ADMIN = "admin" ROLE_ALL = [ROLE_DEFAULT, ROLE_HOMEASSISTANT, ROLE_BACKUP, ROLE_MANAGER, ROLE_ADMIN] -CHAN_ID = "chan_id" -CHAN_TYPE = "chan_type" - class AddonBoot(str, Enum): """Boot mode for the add-on.""" diff --git a/supervisor/core.py b/supervisor/core.py index bcd3c2a12..4bd087bc8 100644 --- a/supervisor/core.py +++ b/supervisor/core.py @@ -95,6 +95,8 @@ class Core(CoreSysAttributes): setup_loads: List[Awaitable[None]] = [ # rest api views self.sys_api.load(), + # Load Host Hardware + self.sys_hardware.load(), # Load DBus self.sys_dbus.load(), # Load Host @@ -179,7 +181,7 @@ class Core(CoreSysAttributes): try: # HomeAssistant is already running / supervisor have only reboot - if self.sys_hardware.last_boot == self.sys_config.last_boot: + if self.sys_hardware.helper.last_boot == self.sys_config.last_boot: _LOGGER.info("Supervisor reboot detected") return @@ -225,9 +227,6 @@ class Core(CoreSysAttributes): if self.sys_homeassistant.version == LANDINGPAGE: self.sys_create_task(self.sys_homeassistant.core.install()) - # Start observe the host Hardware - await self.sys_hwmonitor.load() - # Upate Host/Deivce information self.sys_create_task(self.sys_host.reload()) self.sys_create_task(self.sys_updater.reload()) @@ -262,7 +261,7 @@ class Core(CoreSysAttributes): self.sys_websession.close(), self.sys_websession_ssl.close(), self.sys_ingress.unload(), - self.sys_hwmonitor.unload(), + self.sys_hardware.unload(), ] ) except asyncio.TimeoutError: @@ -296,7 +295,7 @@ class Core(CoreSysAttributes): def _update_last_boot(self): """Update last boot time.""" - self.sys_config.last_boot = self.sys_hardware.last_boot + self.sys_config.last_boot = self.sys_hardware.helper.last_boot self.sys_config.save_data() async def repair(self): diff --git a/supervisor/coresys.py b/supervisor/coresys.py index 954fb46c7..835c9f855 100644 --- a/supervisor/coresys.py +++ b/supervisor/coresys.py @@ -11,7 +11,6 @@ import sentry_sdk from .config import CoreConfig from .const import ENV_SUPERVISOR_DEV from .docker import DockerAPI -from .misc.hardware import Hardware if TYPE_CHECKING: from .addons import AddonManager @@ -26,7 +25,7 @@ if TYPE_CHECKING: from .host import HostManager from .ingress import Ingress from .jobs import JobManager - from .misc.hwmon import HwMonitor + from .hardware.module import HardwareManager from .misc.scheduler import Scheduler from .misc.tasks import Tasks from .plugins import PluginManager @@ -59,7 +58,6 @@ class CoreSys: # Global objects self._config: CoreConfig = CoreConfig() - self._hardware: Hardware = Hardware() self._docker: DockerAPI = DockerAPI() # Internal objects pointers @@ -81,7 +79,7 @@ class CoreSys: self._scheduler: Optional[Scheduler] = None self._store: Optional[StoreManager] = None self._discovery: Optional[Discovery] = None - self._hwmonitor: Optional[HwMonitor] = None + self._hardware: Optional[HardwareManager] = None self._plugins: Optional[PluginManager] = None self._resolution: Optional[ResolutionManager] = None self._jobs: Optional[JobManager] = None @@ -111,11 +109,6 @@ class CoreSys: """Return CoreConfig object.""" return self._config - @property - def hardware(self) -> Hardware: - """Return Hardware object.""" - return self._hardware - @property def docker(self) -> DockerAPI: """Return DockerAPI object.""" @@ -360,18 +353,18 @@ class CoreSys: self._host = value @property - def hwmonitor(self) -> HwMonitor: - """Return HwMonitor object.""" - if self._hwmonitor is None: - raise RuntimeError("HwMonitor not set!") - return self._hwmonitor + def hardware(self) -> HardwareManager: + """Return HardwareManager object.""" + if self._hardware is None: + raise RuntimeError("HardwareManager not set!") + return self._hardware - @hwmonitor.setter - def hwmonitor(self, value: HwMonitor) -> None: - """Set a HwMonitor object.""" - if self._hwmonitor: - raise RuntimeError("HwMonitor already set!") - self._hwmonitor = value + @hardware.setter + def hardware(self, value: HardwareManager) -> None: + """Set a HardwareManager object.""" + if self._hardware: + raise RuntimeError("HardwareManager already set!") + self._hardware = value @property def ingress(self) -> Ingress: @@ -489,11 +482,6 @@ class CoreSysAttributes: """Return CoreConfig object.""" return self.coresys.config - @property - def sys_hardware(self) -> Hardware: - """Return Hardware object.""" - return self.coresys.hardware - @property def sys_docker(self) -> DockerAPI: """Return DockerAPI object.""" @@ -585,9 +573,9 @@ class CoreSysAttributes: return self.coresys.host @property - def sys_hwmonitor(self) -> HwMonitor: + def sys_hardware(self) -> HardwareManager: """Return HwMonitor object.""" - return self.coresys.hwmonitor + return self.coresys.hardware @property def sys_ingress(self) -> Ingress: diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index f013fef64..ae32b1488 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -28,6 +28,7 @@ from ..const import ( ) from ..coresys import CoreSys from ..exceptions import CoreDNSError, DockerError +from ..hardware.const import PolicyGroup, UdevSubsystem from ..utils import process_lock from .interface import DockerInterface @@ -124,34 +125,57 @@ class DockerAddon(DockerInterface): } @property - def devices(self) -> List[str]: + def devices(self) -> Optional[List[str]]: """Return needed devices.""" - devices = [] + devices = set() # Extend add-on config - for device in self.addon.devices: - if not Path(device.split(":")[0]).exists(): + for device_path in self.addon.static_devices: + if not self.sys_hardware.exists_device_node(device_path): + _LOGGER.debug("Ignore static device path %s", device_path) continue - devices.append(device) + devices.add(f"{device_path.as_posix()}:{device_path.as_posix()}:rwm") - # Auto mapping UART devices - if self.addon.with_uart: - for device in self.sys_hardware.serial_devices: - devices.append(f"{device.path.as_posix()}:{device.path.as_posix()}:rwm") - if self.addon.with_udev: + # Auto mapping UART devices / LINKS + # Deprecated: In the future the add-on needs to create device links based on API data by itself + if self.addon.with_uart and not self.addon.devices and not self.addon.with_udev: + for device in self.sys_hardware.filter_devices( + subsystem=UdevSubsystem.SERIAL + ): + if not device.by_id: continue - for device_link in device.links: - devices.append( - f"{device_link.as_posix()}:{device_link.as_posix()}:rwm" - ) - - # Use video devices - if self.addon.with_video: - for device in self.sys_hardware.video_devices: - devices.append(f"{device.path!s}:{device.path!s}:rwm") + devices.add(f"{device.by_id.as_posix()}:{device.by_id.as_posix()}:rwm") # Return None if no devices is present - return devices or None + if devices: + return list(devices) + return None + + @property + def cgroups_rules(self) -> Optional[List[str]]: + """Return a list of needed cgroups permission.""" + rules = set() + + # Attach correct cgroups + for device in self.addon.devices: + rules.add(self.sys_hardware.policy.get_cgroups_rule(device)) + + # Video + if self.addon.with_video: + rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.VIDEO)) + + # GPIO + if self.addon.with_gpio: + rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.GPIO)) + + # UART + if self.addon.with_uart: + rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.UART)) + + # Return None if no rules is present + if rules: + return list(rules) + return None @property def ports(self) -> Optional[Dict[str, Union[str, int, None]]]: @@ -284,7 +308,7 @@ class DockerAddon(DockerInterface): # Init other hardware mappings # GPIO support - if self.addon.with_gpio and self.sys_hardware.support_gpio: + if self.addon.with_gpio and self.sys_hardware.helper.support_gpio: for gpio_path in ("/sys/class/gpio", "/sys/devices/platform/soc"): volumes.update({gpio_path: {"bind": gpio_path, "mode": "rw"}}) @@ -299,8 +323,15 @@ class DockerAddon(DockerInterface): } ) + # Host udev support + if self.addon.with_udev: + volumes.update({"/run/dbus": {"bind": "/run/dbus", "mode": "ro"}}) + # USB support - if self.addon.with_usb and self.sys_hardware.usb_devices: + if (self.addon.with_usb and self.sys_hardware.helper.usb_devices) or any( + self.sys_hardware.check_subsystem_parents(device, UdevSubsystem.USB) + for device in self.addon.devices + ): volumes.update({"/dev/bus/usb": {"bind": "/dev/bus/usb", "mode": "rw"}}) # Kernel Modules support @@ -369,6 +400,7 @@ class DockerAddon(DockerInterface): ports=self.ports, extra_hosts=self.network_mapping, devices=self.devices, + device_cgroup_rules=self.cgroups_rules, cap_add=self.addon.privileged, security_opt=self.security_opt, environment=self.environment, diff --git a/supervisor/docker/audio.py b/supervisor/docker/audio.py index d963b62ce..c7f5c2492 100644 --- a/supervisor/docker/audio.py +++ b/supervisor/docker/audio.py @@ -5,6 +5,7 @@ from typing import Dict from ..const import ENV_TIME, MACHINE_ID from ..coresys import CoreSysAttributes +from ..hardware.const import PolicyGroup from .interface import DockerInterface _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -66,6 +67,9 @@ class DockerAudio(DockerInterface, CoreSysAttributes): hostname=self.name.replace("_", "-"), detach=True, privileged=True, + device_cgroup_rules=self.sys_hardware.policy.get_cgroups_rules( + PolicyGroup.AUDIO + ), environment={ENV_TIME: self.sys_config.timezone}, volumes=self.volumes, ) diff --git a/supervisor/docker/homeassistant.py b/supervisor/docker/homeassistant.py index 17385d907..ff5ade52a 100644 --- a/supervisor/docker/homeassistant.py +++ b/supervisor/docker/homeassistant.py @@ -1,13 +1,14 @@ """Init file for Supervisor Docker object.""" from ipaddress import IPv4Address import logging -from typing import Awaitable, Dict, Optional +from typing import Awaitable, Dict, List, Optional import docker import requests from ..const import ENV_TIME, ENV_TOKEN, ENV_TOKEN_HASSIO, LABEL_MACHINE, MACHINE_ID from ..exceptions import DockerError +from ..hardware.const import PolicyGroup from .interface import CommandReturn, DockerInterface _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -47,6 +48,15 @@ class DockerHomeAssistant(DockerInterface): """Return IP address of this container.""" return self.sys_docker.network.gateway + @property + def cgroups_rules(self) -> List[str]: + """Return a list of needed cgroups permission.""" + return ( + self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.UART) + + self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.VIDEO) + + self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.GPIO) + ) + @property def volumes(self) -> Dict[str, Dict[str, str]]: """Return Volumes for the mount.""" @@ -117,6 +127,7 @@ class DockerHomeAssistant(DockerInterface): init=False, network_mode="host", volumes=self.volumes, + device_cgroup_rules=self.cgroups_rules, extra_hosts={ "supervisor": self.sys_docker.network.supervisor, "observer": self.sys_docker.network.observer, diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 848e6a397..3e273211b 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -296,6 +296,14 @@ class DockerNotFound(DockerError): # Hardware +class HardwareError(HassioError): + """General Hardware Error on Supervisor.""" + + +class HardwareNotFound(HardwareError): + """Hardware path or device doesn't exist on the Host.""" + + class HardwareNotSupportedError(HassioNotSupportedError): """Raise if hardware function is not supported.""" diff --git a/supervisor/hardware/__init__.py b/supervisor/hardware/__init__.py new file mode 100644 index 000000000..70058a506 --- /dev/null +++ b/supervisor/hardware/__init__.py @@ -0,0 +1 @@ +"""Hardware handler of Supervisor.""" diff --git a/supervisor/hardware/const.py b/supervisor/hardware/const.py new file mode 100644 index 000000000..736c479f3 --- /dev/null +++ b/supervisor/hardware/const.py @@ -0,0 +1,29 @@ +"""Constants for hardware.""" +from enum import Enum + +ATTR_BY_ID = "by_id" +ATTR_SUBSYSTEM = "subsystem" +ATTR_SYSFS = "sysfs" +ATTR_DEV_PATH = "dev_path" +ATTR_ATTRIBUTES = "attributes" + + +class UdevSubsystem(str, Enum): + """Udev subsystem class.""" + + SERIAL = "tty" + USB = "usb" + INPUT = "input" + DISK = "block" + PCI = "pci" + AUDIO = "sound" + + +class PolicyGroup(str, Enum): + """Policy groups backend.""" + + UART = "uart" + GPIO = "gpio" + USB = "usb" + VIDEO = "video" + AUDIO = "audio" diff --git a/supervisor/hardware/data.py b/supervisor/hardware/data.py new file mode 100644 index 000000000..957b5b2f1 --- /dev/null +++ b/supervisor/hardware/data.py @@ -0,0 +1,36 @@ +"""Data representation of Hardware.""" +from pathlib import Path +from typing import Dict, List, Optional + +import attr + + +@attr.s(slots=True, frozen=True) +class Device: + """Represent a device.""" + + name: str = attr.ib(eq=False) + path: Path = attr.ib(eq=False) + sysfs: Path = attr.ib(eq=True) + subsystem: str = attr.ib(eq=False) + links: List[Path] = attr.ib(eq=False) + attributes: Dict[str, str] = attr.ib(eq=False) + + @property + def cgroups_major(self) -> int: + """Return Major cgroups.""" + return int(self.attributes.get("MAJOR", 0)) + + @property + def cgroups_minor(self) -> int: + """Return Major cgroups.""" + return int(self.attributes.get("MINOR", 0)) + + @property + def by_id(self) -> Optional[Path]: + """Return path by-id.""" + for link in self.links: + if not link.match("/dev/*/by-id/*"): + continue + return link + return None diff --git a/supervisor/hardware/helper.py b/supervisor/hardware/helper.py new file mode 100644 index 000000000..9efc0bef5 --- /dev/null +++ b/supervisor/hardware/helper.py @@ -0,0 +1,78 @@ +"""Read hardware info from system.""" +from datetime import datetime +import logging +from pathlib import Path +import re +import shutil +from typing import Optional, Union + +import pyudev + +from ..coresys import CoreSys, CoreSysAttributes +from .const import UdevSubsystem + +_LOGGER: logging.Logger = logging.getLogger(__name__) + + +_PROC_STAT: Path = Path("/proc/stat") +_RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)") + +_GPIO_DEVICES: Path = Path("/sys/class/gpio") +_SOC_DEVICES: Path = Path("/sys/devices/platform/soc") + +_RE_HIDE_SYSFS: re.Pattern = re.compile(r"/sys/devices/virtual/(?:tty|block)/.*") + + +class HwHelper(CoreSysAttributes): + """Representation of an interface to procfs, sysfs and udev.""" + + def __init__(self, coresys: CoreSys): + """Init hardware object.""" + self.coresys = coresys + + @property + def support_audio(self) -> bool: + """Return True if the system have audio support.""" + return len(self.sys_hardware.filter_devices(subsystem=UdevSubsystem.AUDIO)) + + @property + def support_gpio(self) -> bool: + """Return True if device support GPIOs.""" + return _SOC_DEVICES.exists() and _GPIO_DEVICES.exists() + + @property + def last_boot(self) -> Optional[str]: + """Return last boot time.""" + try: + with _PROC_STAT.open("r") as stat_file: + stats: str = stat_file.read() + except OSError as err: + _LOGGER.error("Can't read stat data: %s", err) + return None + + # parse stat file + found: Optional[re.Match] = _RE_BOOT_TIME.search(stats) + if not found: + _LOGGER.error("Can't found last boot time!") + return None + + return datetime.utcfromtimestamp(int(found.group(1))) + + def hide_virtual_device(self, udev_device: pyudev.Device) -> bool: + """Small helper to hide not needed Devices.""" + return _RE_HIDE_SYSFS.match(udev_device.sys_path) is not None + + def get_disk_total_space(self, path: Union[str, Path]) -> float: + """Return total space (GiB) on disk for path.""" + total, _, _ = shutil.disk_usage(path) + return round(total / (1024.0 ** 3), 1) + + def get_disk_used_space(self, path: Union[str, Path]) -> float: + """Return used space (GiB) on disk for path.""" + _, used, _ = shutil.disk_usage(path) + return round(used / (1024.0 ** 3), 1) + + def get_disk_free_space(self, path: Union[str, Path]) -> float: + """Return free space (GiB) on disk for path.""" + _, _, free = shutil.disk_usage(path) + return round(free / (1024.0 ** 3), 1) diff --git a/supervisor/hardware/module.py b/supervisor/hardware/module.py new file mode 100644 index 000000000..f9846a0c6 --- /dev/null +++ b/supervisor/hardware/module.py @@ -0,0 +1,120 @@ +"""Hardware Manager of Supervisor.""" +import logging +from pathlib import Path +from typing import Dict, List, Optional + +import pyudev + +from supervisor.hardware.const import UdevSubsystem + +from ..coresys import CoreSys, CoreSysAttributes +from ..exceptions import HardwareNotFound +from .data import Device +from .helper import HwHelper +from .monitor import HwMonitor +from .policy import HwPolicy + +_LOGGER: logging.Logger = logging.getLogger(__name__) + + +class HardwareManager(CoreSysAttributes): + """Hardware manager for supervisor.""" + + def __init__(self, coresys: CoreSys): + """Initialize Hardware Monitor object.""" + self.coresys: CoreSys = coresys + self._devices: Dict[str, Device] = {} + self._udev = pyudev.Context() + + self._montior: HwMonitor = HwMonitor(coresys) + self._helper: HwHelper = HwHelper(coresys) + self._policy: HwPolicy = HwPolicy(coresys) + + @property + def monitor(self) -> HwMonitor: + """Return Hardware Monitor instance.""" + return self._montior + + @property + def helper(self) -> HwHelper: + """Return Hardware Helper instance.""" + return self._helper + + @property + def policy(self) -> HwPolicy: + """Return Hardware policy instance.""" + return self._policy + + @property + def devices(self) -> List[Device]: + """Return List of devices.""" + return list(self._devices.values()) + + def get_by_path(self, device_node: Path) -> Device: + """Get Device by path.""" + for device in self.devices: + if device_node in (device.path, device.sysfs): + return device + if device_node in device.links: + return device + raise HardwareNotFound() + + def filter_devices(self, subsystem: Optional[UdevSubsystem] = None) -> List[Device]: + """Return a filtered list.""" + devices = set() + for device in self.devices: + if subsystem and device.subsystem != subsystem: + continue + devices.add(device) + return list(devices) + + def update_device(self, device: Device) -> None: + """Update or add a (new) Device.""" + self._devices[device.name] = device + + def delete_device(self, device: Device) -> None: + """Remove a device from the list.""" + self._devices.pop(device.name, None) + + def exists_device_node(self, device_node: Path) -> bool: + """Check if device exists on Host.""" + try: + self.get_by_path(device_node) + except HardwareNotFound: + return False + return True + + def check_subsystem_parents(self, device: Device, subsystem: UdevSubsystem) -> bool: + """Return True if the device is part of the given subsystem parent.""" + udev_device: pyudev.Device = pyudev.Devices.from_sys_path( + self._udev, str(device.sysfs) + ) + return udev_device.find_parent(subsystem.value) is not None + + def _import_devices(self) -> None: + """Import fresh from udev database.""" + self._devices.clear() + + # Exctract all devices + for device in self._udev.list_devices(): + # Skip devices without mapping + if not device.device_node or self.helper.hide_virtual_device(device): + continue + + self._devices[device.sys_name] = Device( + device.sys_name, + Path(device.device_node), + Path(device.sys_path), + device.subsystem, + [Path(node) for node in device.device_links], + {attr: device.properties[attr] for attr in device.properties}, + ) + + async def load(self) -> None: + """Load hardware backend.""" + self._import_devices() + await self.monitor.load() + + async def unload(self) -> None: + """Shutdown sessions.""" + await self.monitor.unload() diff --git a/supervisor/misc/hwmon.py b/supervisor/hardware/monitor.py similarity index 69% rename from supervisor/misc/hwmon.py rename to supervisor/hardware/monitor.py index ebb133962..1ece408b1 100644 --- a/supervisor/misc/hwmon.py +++ b/supervisor/hardware/monitor.py @@ -1,14 +1,18 @@ """Supervisor Hardware monitor based on udev.""" from datetime import timedelta import logging +from pathlib import Path from pprint import pformat from typing import Optional import pyudev +from ..const import CoreState from ..coresys import CoreSys, CoreSysAttributes from ..resolution.const import UnhealthyReason from ..utils import AsyncCallFilter +from .const import UdevSubsystem +from .data import Device _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -26,7 +30,7 @@ class HwMonitor(CoreSysAttributes): async def load(self) -> None: """Start hardware monitor.""" try: - self.monitor = pyudev.Monitor.from_netlink(self.context) + self.monitor = pyudev.Monitor.from_netlink(self.context, "kernel") self.observer = pyudev.MonitorObserver(self.monitor, self._udev_events) except OSError: self.sys_resolution.unhealthy = UnhealthyReason.PRIVILEGED @@ -53,9 +57,31 @@ class HwMonitor(CoreSysAttributes): def _async_udev_events(self, action: str, device: pyudev.Device): """Incomming events from udev into loop.""" - # Sound changes - if device.subsystem == "sound": - self._action_sound(device) + if self.sys_core.state in (CoreState.RUNNING, CoreState.FREEZE): + # Sound changes + if device.subsystem == UdevSubsystem.AUDIO: + self._action_sound(device) + + # Update device List + if not device.device_node or self.sys_hardware.helper.hide_virtual_device( + device + ): + return + + device = Device( + device.sys_name, + Path(device.device_node), + Path(device.sys_path), + device.subsystem, + [Path(node) for node in device.device_links], + {attr: device.properties[attr] for attr in device.properties}, + ) + + # Process the action + if action == "add": + self.sys_hardware.update_device(device) + if action == "remove": + self.sys_hardware.delete_device(device) @AsyncCallFilter(timedelta(seconds=5)) def _action_sound(self, device: pyudev.Device): diff --git a/supervisor/hardware/policy.py b/supervisor/hardware/policy.py new file mode 100644 index 000000000..c77a5b9e2 --- /dev/null +++ b/supervisor/hardware/policy.py @@ -0,0 +1,33 @@ +"""Policy / cgroups management of local host.""" +import logging +from typing import Dict, List + +from ..coresys import CoreSys, CoreSysAttributes +from .const import PolicyGroup +from .data import Device + +_LOGGER: logging.Logger = logging.getLogger(__name__) + + +GROUP_CGROUPS: Dict[PolicyGroup, List[int]] = { + PolicyGroup.UART: [204, 188, 166, 244], + PolicyGroup.GPIO: [254], + PolicyGroup.VIDEO: [239, 29], + PolicyGroup.AUDIO: [116], +} + + +class HwPolicy(CoreSysAttributes): + """Handle Hardware policy / cgroups.""" + + def __init__(self, coresys: CoreSys): + """Init hardware policy object.""" + self.coresys = coresys + + def get_cgroups_rules(self, group: PolicyGroup) -> List[str]: + """Generate cgroups rules for a policy group.""" + return [f"c {dev}:* rwm" for dev in GROUP_CGROUPS.get(group, [])] + + def get_cgroups_rule(self, device: Device) -> str: + """Generate a cgroups rule for given device.""" + return f"c {device.cgroups_major}:{device.cgroups_minor} rwm" diff --git a/supervisor/host/info.py b/supervisor/host/info.py index f3f88205c..b0eaf6445 100644 --- a/supervisor/host/info.py +++ b/supervisor/host/info.py @@ -54,21 +54,21 @@ class InfoCenter(CoreSysAttributes): @property def total_space(self) -> float: """Return total space (GiB) on disk for supervisor data directory.""" - return self.coresys.hardware.get_disk_total_space( + return self.sys_hardware.helper.get_disk_total_space( self.coresys.config.path_supervisor ) @property def used_space(self) -> float: """Return used space (GiB) on disk for supervisor data directory.""" - return self.coresys.hardware.get_disk_used_space( + return self.sys_hardware.helper.get_disk_used_space( self.coresys.config.path_supervisor ) @property def free_space(self) -> float: """Return available space (GiB) on disk for supervisor data directory.""" - return self.coresys.hardware.get_disk_free_space( + return self.sys_hardware.helper.get_disk_free_space( self.coresys.config.path_supervisor ) diff --git a/supervisor/misc/hardware.py b/supervisor/misc/hardware.py deleted file mode 100644 index c2d73437d..000000000 --- a/supervisor/misc/hardware.py +++ /dev/null @@ -1,232 +0,0 @@ -"""Read hardware info from system.""" -import asyncio -from datetime import datetime -import logging -from pathlib import Path -import re -import shutil -from typing import Any, Dict, List, Optional, Set, Union - -import attr -import pyudev - -from ..const import ATTR_DEVICES, ATTR_NAME, ATTR_TYPE, CHAN_ID, CHAN_TYPE -from ..exceptions import HardwareNotSupportedError - -_LOGGER: logging.Logger = logging.getLogger(__name__) - -ASOUND_CARDS: Path = Path("/proc/asound/cards") -RE_CARDS: re.Pattern = re.compile(r"(\d+) \[(\w*) *\]: (.*\w)") - -ASOUND_DEVICES: Path = Path("/proc/asound/devices") -RE_DEVICES: re.Pattern = re.compile(r"\[.*(\d+)- (\d+).*\]: ([\w ]*)") - -PROC_STAT: Path = Path("/proc/stat") -RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)") - -GPIO_DEVICES: Path = Path("/sys/class/gpio") -SOC_DEVICES: Path = Path("/sys/devices/platform/soc") -RE_TTY: re.Pattern = re.compile(r"tty[A-Z]+") - -RE_VIDEO_DEVICES = re.compile(r"^(?:vchiq|cec\d+|video\d+)") - - -@attr.s(slots=True, frozen=True) -class Device: - """Represent a device.""" - - name: str = attr.ib() - path: Path = attr.ib() - subsystem: str = attr.ib() - links: List[Path] = attr.ib() - attributes: Dict[str, str] = attr.ib() - - -class Hardware: - """Representation of an interface to procfs, sysfs and udev.""" - - def __init__(self): - """Init hardware object.""" - self.context = pyudev.Context() - - @property - def devices(self) -> List[Device]: - """Return a list of all available devices.""" - dev_list: List[Device] = [] - - # Exctract all devices - for device in self.context.list_devices(): - # Skip devices without mapping - if not device.device_node: - continue - - dev_list.append( - Device( - device.sys_name, - Path(device.device_node), - device.subsystem, - [Path(node) for node in device.device_links], - {attr: device.properties[attr] for attr in device.properties}, - ) - ) - - return dev_list - - @property - def video_devices(self) -> List[Device]: - """Return all available video devices.""" - dev_list: List[Device] = [] - - for device in self.devices: - if not RE_VIDEO_DEVICES.match(device.name): - continue - dev_list.append(device) - - return dev_list - - @property - def serial_devices(self) -> List[Device]: - """Return all serial and connected devices.""" - dev_list: List[Device] = [] - for device in self.devices: - if device.subsystem != "tty" or ( - "ID_VENDOR" not in device.attributes - and not RE_TTY.search(str(device.path)) - ): - continue - - # Cleanup not usable device links - for link in device.links.copy(): - if link.match("/dev/serial/by-id/*"): - continue - device.links.remove(link) - - dev_list.append(device) - - return dev_list - - @property - def usb_devices(self) -> List[Device]: - """Return all usb and connected devices.""" - return [device for device in self.devices if device.subsystem == "usb"] - - @property - def input_devices(self) -> Set[str]: - """Return all input devices.""" - dev_list: Set[str] = set() - for device in self.context.list_devices(subsystem="input"): - if "NAME" in device.properties: - dev_list.add(device.properties["NAME"].replace('"', "").strip()) - - return dev_list - - @property - def disk_devices(self) -> List[Device]: - """Return all disk devices.""" - dev_list: List[Device] = [] - for device in self.devices: - if device.subsystem != "block" or "ID_NAME" not in device.attributes: - continue - dev_list.append(device) - - return dev_list - - @property - def support_audio(self) -> bool: - """Return True if the system have audio support.""" - return bool(self.audio_devices) - - @property - def audio_devices(self) -> Dict[str, Any]: - """Return all available audio interfaces.""" - if not ASOUND_CARDS.exists(): - return {} - - try: - cards = ASOUND_CARDS.read_text() - devices = ASOUND_DEVICES.read_text() - except OSError as err: - _LOGGER.error("Can't read asound data: %s", err) - return {} - - audio_list: Dict[str, Any] = {} - - # parse cards - for match in RE_CARDS.finditer(cards): - audio_list[match.group(1)] = { - ATTR_NAME: match.group(3), - ATTR_TYPE: match.group(2), - ATTR_DEVICES: [], - } - - # parse devices - for match in RE_DEVICES.finditer(devices): - try: - audio_list[match.group(1)][ATTR_DEVICES].append( - {CHAN_ID: match.group(2), CHAN_TYPE: match.group(3)} - ) - except KeyError: - _LOGGER.warning("Wrong audio device found %s", match.group(0)) - continue - - return audio_list - - @property - def support_gpio(self) -> bool: - """Return True if device support GPIOs.""" - return SOC_DEVICES.exists() and GPIO_DEVICES.exists() - - @property - def gpio_devices(self) -> Set[str]: - """Return list of GPIO interface on device.""" - dev_list: Set[str] = set() - for interface in GPIO_DEVICES.glob("gpio*"): - dev_list.add(interface.name) - - return dev_list - - @property - def last_boot(self) -> Optional[str]: - """Return last boot time.""" - try: - with PROC_STAT.open("r") as stat_file: - stats: str = stat_file.read() - except OSError as err: - _LOGGER.error("Can't read stat data: %s", err) - return None - - # parse stat file - found: Optional[re.Match] = RE_BOOT_TIME.search(stats) - if not found: - _LOGGER.error("Can't found last boot time!") - return None - - return datetime.utcfromtimestamp(int(found.group(1))) - - def get_disk_total_space(self, path: Union[str, Path]) -> float: - """Return total space (GiB) on disk for path.""" - total, _, _ = shutil.disk_usage(path) - return round(total / (1024.0 ** 3), 1) - - def get_disk_used_space(self, path: Union[str, Path]) -> float: - """Return used space (GiB) on disk for path.""" - _, used, _ = shutil.disk_usage(path) - return round(used / (1024.0 ** 3), 1) - - def get_disk_free_space(self, path: Union[str, Path]) -> float: - """Return free space (GiB) on disk for path.""" - _, _, free = shutil.disk_usage(path) - return round(free / (1024.0 ** 3), 1) - - async def udev_trigger(self) -> None: - """Trigger a udev reload.""" - proc = await asyncio.create_subprocess_shell( - "udevadm trigger && udevadm settle" - ) - - await proc.wait() - if proc.returncode == 0: - return - - _LOGGER.warning("udevadm device triggering failed!") - raise HardwareNotSupportedError() diff --git a/supervisor/utils/json.py b/supervisor/utils/json.py index 2d4ab1223..804ece97a 100644 --- a/supervisor/utils/json.py +++ b/supervisor/utils/json.py @@ -28,6 +28,8 @@ class JSONEncoder(json.JSONEncoder): return o.isoformat() if isinstance(o, set): return list(o) + if isinstance(o, Path): + return str(o) if hasattr(o, "as_dict"): return o.as_dict() diff --git a/tests/addons/test_config.py b/tests/addons/test_config.py index 84b1680ea..8ea531134 100644 --- a/tests/addons/test_config.py +++ b/tests/addons/test_config.py @@ -28,6 +28,46 @@ def test_basic_config(): assert not valid_config["docker_api"] +def test_migration_startup(): + """Migrate Startup Type.""" + config = load_json_fixture("basic-addon-config.json") + + config["startup"] = "before" + + valid_config = vd.SCHEMA_ADDON_CONFIG(config) + + assert valid_config["startup"].value == "services" + + config["startup"] = "after" + + valid_config = vd.SCHEMA_ADDON_CONFIG(config) + + assert valid_config["startup"].value == "application" + + +def test_migration_auto_uart(): + """Migrate auto uart Type.""" + config = load_json_fixture("basic-addon-config.json") + + config["auto_uart"] = True + + valid_config = vd.SCHEMA_ADDON_CONFIG(config) + + assert valid_config["uart"] + assert "auto_uart" not in valid_config + + +def test_migration_devices(): + """Migrate devices Type.""" + config = load_json_fixture("basic-addon-config.json") + + config["devices"] = ["test:test:rw", "bla"] + + valid_config = vd.SCHEMA_ADDON_CONFIG(config) + + assert valid_config["devices"] == ["test", "bla"] + + def test_invalid_repository(): """Validate basic config with invalid repositories.""" config = load_json_fixture("basic-addon-config.json") diff --git a/tests/addons/test_options.py b/tests/addons/test_options.py new file mode 100644 index 000000000..ef9f0a5ea --- /dev/null +++ b/tests/addons/test_options.py @@ -0,0 +1,203 @@ +"""Test add-ons schema to UI schema convertion.""" +from pathlib import Path + +import pytest +import voluptuous as vol + +from supervisor.addons.options import AddonOptions, UiOptions +from supervisor.hardware.data import Device + + +def test_simple_schema(coresys): + """Test with simple schema.""" + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, + )({"name": "Pascal", "password": "1234", "fires": True, "alias": "test"}) + + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, + )({"name": "Pascal", "password": "1234", "fires": True}) + + with pytest.raises(vol.error.Invalid): + AddonOptions( + coresys, + {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, + )({"name": "Pascal", "password": "1234", "fires": "hah"}) + + with pytest.raises(vol.error.Invalid): + AddonOptions( + coresys, + {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, + )({"name": "Pascal", "fires": True}) + + +def test_complex_schema_list(coresys): + """Test with complex list schema.""" + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "extend": ["str"]}, + )({"name": "Pascal", "password": "1234", "extend": ["test", "blu"]}) + + with pytest.raises(vol.error.Invalid): + AddonOptions( + coresys, + {"name": "str", "password": "password", "extend": ["str"]}, + )({"name": "Pascal", "password": "1234", "extend": ["test", 1]}) + + with pytest.raises(vol.error.Invalid): + AddonOptions( + coresys, + {"name": "str", "password": "password", "extend": ["str"]}, + )({"name": "Pascal", "password": "1234", "extend": "test"}) + + +def test_complex_schema_dict(coresys): + """Test with complex dict schema.""" + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "extend": {"test": "int"}}, + )({"name": "Pascal", "password": "1234", "extend": {"test": 1}}) + + with pytest.raises(vol.error.Invalid): + AddonOptions( + coresys, + {"name": "str", "password": "password", "extend": {"test": "int"}}, + )({"name": "Pascal", "password": "1234", "extend": {"wrong": 1}}) + + with pytest.raises(vol.error.Invalid): + AddonOptions( + coresys, + {"name": "str", "password": "password", "extend": ["str"]}, + )({"name": "Pascal", "password": "1234", "extend": "test"}) + + +def test_simple_device_schema(coresys): + """Test with simple schema.""" + for device in ( + Device( + "ttyACM0", + Path("/dev/ttyACM0"), + Path("/sys/bus/usb/002"), + "tty", + [], + {"ID_VENDOR": "xy"}, + ), + Device( + "ttyUSB0", + Path("/dev/ttyUSB0"), + Path("/sys/bus/usb/001"), + "tty", + [Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")], + {"ID_VENDOR": "xy"}, + ), + Device("ttyS0", Path("/dev/ttyS0"), Path("/sys/bus/usb/003"), "tty", [], {}), + Device( + "video1", + Path("/dev/video1"), + Path("/sys/bus/usb/004"), + "misc", + [], + {"ID_VENDOR": "xy"}, + ), + ): + coresys.hardware.update_device(device) + + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "input": "device"}, + )({"name": "Pascal", "password": "1234", "input": "/dev/ttyUSB0"}) + + data = AddonOptions( + coresys, + {"name": "str", "password": "password", "input": "device"}, + )({"name": "Pascal", "password": "1234", "input": "/dev/serial/by-id/xyx"}) + assert data["input"] == "/dev/ttyUSB0" + + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "input": "device(subsystem=tty)"}, + )({"name": "Pascal", "password": "1234", "input": "/dev/ttyACM0"}) + + with pytest.raises(vol.error.Invalid): + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "input": "device"}, + )({"name": "Pascal", "password": "1234", "input": "/dev/not_exists"}) + + with pytest.raises(vol.error.Invalid): + assert AddonOptions( + coresys, + {"name": "str", "password": "password", "input": "device(subsystem=tty)"}, + )({"name": "Pascal", "password": "1234", "input": "/dev/video1"}) + + +def test_ui_simple_schema(coresys): + """Test with simple schema.""" + assert UiOptions(coresys)( + {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, + ) == [ + {"name": "name", "required": True, "type": "string"}, + {"format": "password", "name": "password", "required": True, "type": "string"}, + {"name": "fires", "required": True, "type": "boolean"}, + {"name": "alias", "optional": True, "type": "string"}, + ] + + +def test_ui_group_schema(coresys): + """Test with group schema.""" + assert UiOptions(coresys)( + { + "name": "str", + "password": "password", + "fires": "bool", + "alias": "str?", + "extended": {"name": "str", "data": ["str"], "path": "str?"}, + }, + ) == [ + {"name": "name", "required": True, "type": "string"}, + {"format": "password", "name": "password", "required": True, "type": "string"}, + {"name": "fires", "required": True, "type": "boolean"}, + {"name": "alias", "optional": True, "type": "string"}, + { + "multiple": False, + "name": "extended", + "optional": True, + "schema": [ + {"name": "name", "required": True, "type": "string"}, + {"multiple": True, "name": "data", "required": True, "type": "string"}, + {"name": "path", "optional": True, "type": "string"}, + ], + "type": "schema", + }, + ] + + +def test_ui_group_list(coresys): + """Test with group schema.""" + assert UiOptions(coresys)( + { + "name": "str", + "password": "password", + "fires": "bool", + "alias": "str?", + "extended": [{"name": "str", "data": ["str?"], "path": "str?"}], + }, + ) == [ + {"name": "name", "required": True, "type": "string"}, + {"format": "password", "name": "password", "required": True, "type": "string"}, + {"name": "fires", "required": True, "type": "boolean"}, + {"name": "alias", "optional": True, "type": "string"}, + { + "multiple": True, + "name": "extended", + "optional": True, + "schema": [ + {"name": "name", "required": True, "type": "string"}, + {"multiple": True, "name": "data", "optional": True, "type": "string"}, + {"name": "path", "optional": True, "type": "string"}, + ], + "type": "schema", + }, + ] diff --git a/tests/addons/test_options_schema.py b/tests/addons/test_options_schema.py deleted file mode 100644 index aaeba2e7e..000000000 --- a/tests/addons/test_options_schema.py +++ /dev/null @@ -1,71 +0,0 @@ -"""Test add-ons schema to UI schema convertion.""" - -import pytest -import voluptuous as vol - -from supervisor.addons.validate import validate_options - - -def test_simple_schema(coresys): - """Test with simple schema.""" - assert validate_options( - coresys, - {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, - )({"name": "Pascal", "password": "1234", "fires": True, "alias": "test"}) - - assert validate_options( - coresys, - {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, - )({"name": "Pascal", "password": "1234", "fires": True}) - - with pytest.raises(vol.error.Invalid): - validate_options( - coresys, - {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, - )({"name": "Pascal", "password": "1234", "fires": "hah"}) - - with pytest.raises(vol.error.Invalid): - validate_options( - coresys, - {"name": "str", "password": "password", "fires": "bool", "alias": "str?"}, - )({"name": "Pascal", "fires": True}) - - -def test_complex_schema_list(coresys): - """Test with complex list schema.""" - assert validate_options( - coresys, - {"name": "str", "password": "password", "extend": ["str"]}, - )({"name": "Pascal", "password": "1234", "extend": ["test", "blu"]}) - - with pytest.raises(vol.error.Invalid): - validate_options( - coresys, - {"name": "str", "password": "password", "extend": ["str"]}, - )({"name": "Pascal", "password": "1234", "extend": ["test", 1]}) - - with pytest.raises(vol.error.Invalid): - validate_options( - coresys, - {"name": "str", "password": "password", "extend": ["str"]}, - )({"name": "Pascal", "password": "1234", "extend": "test"}) - - -def test_complex_schema_dict(coresys): - """Test with complex dict schema.""" - assert validate_options( - coresys, - {"name": "str", "password": "password", "extend": {"test": "int"}}, - )({"name": "Pascal", "password": "1234", "extend": {"test": 1}}) - - with pytest.raises(vol.error.Invalid): - validate_options( - coresys, - {"name": "str", "password": "password", "extend": {"test": "int"}}, - )({"name": "Pascal", "password": "1234", "extend": {"wrong": 1}}) - - with pytest.raises(vol.error.Invalid): - validate_options( - coresys, - {"name": "str", "password": "password", "extend": ["str"]}, - )({"name": "Pascal", "password": "1234", "extend": "test"}) diff --git a/tests/addons/test_ui_schema.py b/tests/addons/test_ui_schema.py deleted file mode 100644 index c61e429bc..000000000 --- a/tests/addons/test_ui_schema.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Test add-ons schema to UI schema convertion.""" - -from supervisor.addons.validate import schema_ui_options - - -def test_simple_schema(): - """Test with simple schema.""" - assert schema_ui_options( - {"name": "str", "password": "password", "fires": "bool", "alias": "str?"} - ) == [ - {"name": "name", "required": True, "type": "string"}, - {"format": "password", "name": "password", "required": True, "type": "string"}, - {"name": "fires", "required": True, "type": "boolean"}, - {"name": "alias", "optional": True, "type": "string"}, - ] - - -def test_group_schema(): - """Test with group schema.""" - assert schema_ui_options( - { - "name": "str", - "password": "password", - "fires": "bool", - "alias": "str?", - "extended": {"name": "str", "data": ["str"], "path": "str?"}, - } - ) == [ - {"name": "name", "required": True, "type": "string"}, - {"format": "password", "name": "password", "required": True, "type": "string"}, - {"name": "fires", "required": True, "type": "boolean"}, - {"name": "alias", "optional": True, "type": "string"}, - { - "multiple": False, - "name": "extended", - "optional": True, - "schema": [ - {"name": "name", "required": True, "type": "string"}, - {"multiple": True, "name": "data", "required": True, "type": "string"}, - {"name": "path", "optional": True, "type": "string"}, - ], - "type": "schema", - }, - ] - - -def test_group_list(): - """Test with group schema.""" - assert schema_ui_options( - { - "name": "str", - "password": "password", - "fires": "bool", - "alias": "str?", - "extended": [{"name": "str", "data": ["str?"], "path": "str?"}], - } - ) == [ - {"name": "name", "required": True, "type": "string"}, - {"format": "password", "name": "password", "required": True, "type": "string"}, - {"name": "fires", "required": True, "type": "boolean"}, - {"name": "alias", "optional": True, "type": "string"}, - { - "multiple": True, - "name": "extended", - "optional": True, - "schema": [ - {"name": "name", "required": True, "type": "string"}, - {"multiple": True, "name": "data", "optional": True, "type": "string"}, - {"name": "path", "optional": True, "type": "string"}, - ], - "type": "schema", - }, - ] diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 000000000..0b5314750 --- /dev/null +++ b/tests/api/__init__.py @@ -0,0 +1 @@ +"""Test for API calls.""" diff --git a/tests/api/test_hardware.py b/tests/api/test_hardware.py new file mode 100644 index 000000000..f2deb4978 --- /dev/null +++ b/tests/api/test_hardware.py @@ -0,0 +1,37 @@ +"""Test Docker API.""" +from pathlib import Path + +import pytest + +from supervisor.hardware.data import Device + + +@pytest.mark.asyncio +async def test_api_hardware_info(api_client): + """Test docker info api.""" + resp = await api_client.get("/hardware/info") + result = await resp.json() + + assert result["result"] == "ok" + + +@pytest.mark.asyncio +async def test_api_hardware_info_device(api_client, coresys): + """Test docker info api.""" + coresys.hardware.update_device( + Device( + "sda", + Path("/dev/sda"), + Path("/sys/bus/usb/000"), + "sound", + [Path("/dev/serial/by-id/test")], + {"ID_NAME": "xy"}, + ) + ) + + resp = await api_client.get("/hardware/info") + result = await resp.json() + + assert result["result"] == "ok" + assert result["data"]["devices"][-1]["name"] == "sda" + assert result["data"]["devices"][-1]["by_id"] == "/dev/serial/by-id/test" diff --git a/tests/api/test_ingress.py b/tests/api/test_ingress.py index 4da5aea07..fbde36acd 100644 --- a/tests/api/test_ingress.py +++ b/tests/api/test_ingress.py @@ -3,6 +3,8 @@ from unittest.mock import patch import pytest +# pylint: disable=redefined-outer-name + @pytest.fixture def stub_auth(): diff --git a/tests/hardware/__init__.py b/tests/hardware/__init__.py new file mode 100644 index 000000000..9250d0211 --- /dev/null +++ b/tests/hardware/__init__.py @@ -0,0 +1 @@ +"""Tests for hardware.""" diff --git a/tests/hardware/test_data.py b/tests/hardware/test_data.py new file mode 100644 index 000000000..15b104a8c --- /dev/null +++ b/tests/hardware/test_data.py @@ -0,0 +1,22 @@ +"""Test HardwareManager Module.""" +from pathlib import Path + +from supervisor.hardware.data import Device + +# pylint: disable=protected-access + + +def test_device_property(coresys): + """Test device cgroup policy.""" + device = Device( + "ttyACM0", + Path("/dev/ttyACM0"), + Path("/sys/bus/usb/001"), + "tty", + [Path("/dev/serial/by-id/fixed-device")], + {"MAJOR": "5", "MINOR": "10"}, + ) + + assert device.by_id == device.links[0] + assert device.cgroups_major == 5 + assert device.cgroups_minor == 10 diff --git a/tests/hardware/test_helper.py b/tests/hardware/test_helper.py new file mode 100644 index 000000000..e97d03747 --- /dev/null +++ b/tests/hardware/test_helper.py @@ -0,0 +1,61 @@ +"""Test hardware utils.""" +from pathlib import Path +from unittest.mock import MagicMock, patch + +from supervisor.hardware.data import Device + + +def test_have_audio(coresys): + """Test usb device filter.""" + assert not coresys.hardware.helper.support_audio + + coresys.hardware.update_device( + Device( + "sda", + Path("/dev/sda"), + Path("/sys/bus/usb/000"), + "sound", + [], + {"ID_NAME": "xy"}, + ) + ) + + assert coresys.hardware.helper.support_audio + + +def test_hide_virtual_device(coresys): + """Test hidding virtual devices.""" + udev_device = MagicMock() + + udev_device.sys_path = "/sys/devices/platform/test" + assert not coresys.hardware.helper.hide_virtual_device(udev_device) + + udev_device.sys_path = "/sys/devices/virtual/block/test" + assert coresys.hardware.helper.hide_virtual_device(udev_device) + + udev_device.sys_path = "/sys/devices/virtual/tty/test" + assert coresys.hardware.helper.hide_virtual_device(udev_device) + + +def test_free_space(coresys): + """Test free space helper.""" + with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))): + free = coresys.hardware.helper.get_disk_free_space("/data") + + assert free == 2.0 + + +def test_total_space(coresys): + """Test total space helper.""" + with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)): + total = coresys.hardware.helper.get_disk_total_space("/data") + + assert total == 10.0 + + +def test_used_space(coresys): + """Test used space helper.""" + with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)): + used = coresys.hardware.helper.get_disk_used_space("/data") + + assert used == 8.0 diff --git a/tests/hardware/test_module.py b/tests/hardware/test_module.py new file mode 100644 index 000000000..b93ee5906 --- /dev/null +++ b/tests/hardware/test_module.py @@ -0,0 +1,108 @@ +"""Test HardwareManager Module.""" +from pathlib import Path + +from supervisor.hardware.const import UdevSubsystem +from supervisor.hardware.data import Device + +# pylint: disable=protected-access + + +def test_initial_device_initialize(coresys): + """Initialize the local hardware.""" + + assert not coresys.hardware.devices + + coresys.hardware._import_devices() + + assert coresys.hardware.devices + + +def test_device_path_lookup(coresys): + """Test device lookup.""" + for device in ( + Device( + "ttyACM0", + Path("/dev/ttyACM0"), + Path("/sys/bus/usb/001"), + "tty", + [], + {"ID_VENDOR": "xy"}, + ), + Device( + "ttyUSB0", + Path("/dev/ttyUSB0"), + Path("/sys/bus/usb/000"), + "tty", + [Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")], + {"ID_VENDOR": "xy"}, + ), + Device("ttyS0", Path("/dev/ttyS0"), Path("/sys/bus/usb/002"), "tty", [], {}), + Device( + "video1", + Path("/dev/video1"), + Path("/sys/bus/usb/003"), + "misc", + [], + {"ID_VENDOR": "xy"}, + ), + ): + coresys.hardware.update_device(device) + + assert coresys.hardware.exists_device_node(Path("/dev/ttyACM0")) + assert coresys.hardware.exists_device_node(Path("/dev/ttyS1")) + assert coresys.hardware.exists_device_node(Path("/dev/ttyS0")) + assert coresys.hardware.exists_device_node(Path("/dev/serial/by-id/xyx")) + assert coresys.hardware.exists_device_node(Path("/sys/bus/usb/001")) + + assert not coresys.hardware.exists_device_node(Path("/dev/ttyS2")) + assert not coresys.hardware.exists_device_node(Path("/dev/ttyUSB1")) + + +def test_device_filter(coresys): + """Test device filter.""" + for device in ( + Device( + "ttyACM0", + Path("/dev/ttyACM0"), + Path("/sys/bus/usb/000"), + "tty", + [], + {"ID_VENDOR": "xy"}, + ), + Device( + "ttyUSB0", + Path("/dev/ttyUSB0"), + Path("/sys/bus/usb/001"), + "tty", + [Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")], + {"ID_VENDOR": "xy"}, + ), + Device("ttyS0", Path("/dev/ttyS0"), Path("/sys/bus/usb/002"), "tty", [], {}), + Device( + "video1", + Path("/dev/video1"), + Path("/sys/bus/usb/003"), + "misc", + [], + {"ID_VENDOR": "xy"}, + ), + ): + coresys.hardware.update_device(device) + + assert sorted( + [device.path for device in coresys.hardware.filter_devices()] + ) == sorted([device.path for device in coresys.hardware.devices]) + assert sorted( + [ + device.path + for device in coresys.hardware.filter_devices( + subsystem=UdevSubsystem.SERIAL + ) + ] + ) == sorted( + [ + device.path + for device in coresys.hardware.devices + if device.subsystem == UdevSubsystem.SERIAL + ] + ) diff --git a/tests/hardware/test_policy.py b/tests/hardware/test_policy.py new file mode 100644 index 000000000..8183212b2 --- /dev/null +++ b/tests/hardware/test_policy.py @@ -0,0 +1,29 @@ +"""Test HardwareManager Module.""" +from pathlib import Path + +from supervisor.hardware.const import PolicyGroup +from supervisor.hardware.data import Device + +# pylint: disable=protected-access + + +def test_device_policy(coresys): + """Test device cgroup policy.""" + device = Device( + "ttyACM0", + Path("/dev/ttyACM0"), + Path("/sys/bus/usb/001"), + "tty", + [], + {"MAJOR": "5", "MINOR": "10"}, + ) + + assert coresys.hardware.policy.get_cgroups_rule(device) == "c 5:10 rwm" + + +def test_policy_group(coresys): + """Test policy group generator.""" + assert coresys.hardware.policy.get_cgroups_rules(PolicyGroup.VIDEO) == [ + "c 239:* rwm", + "c 29:* rwm", + ] diff --git a/tests/misc/test_hardware.py b/tests/misc/test_hardware.py deleted file mode 100644 index 246374991..000000000 --- a/tests/misc/test_hardware.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Test hardware utils.""" -from pathlib import Path -from unittest.mock import PropertyMock, patch - -from supervisor.misc.hardware import Device, Hardware - - -def test_read_all_devices(): - """Test to read all devices.""" - system = Hardware() - - assert system.devices - - -def test_video_devices(): - """Test video device filter.""" - system = Hardware() - device_list = [ - Device("test-dev", Path("/dev/test-dev"), "xy", [], {}), - Device("vchiq", Path("/dev/vchiq"), "xy", [], {}), - Device("cec0", Path("/dev/cec0"), "xy", [], {}), - Device("video1", Path("/dev/video1"), "xy", [], {}), - ] - - with patch( - "supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock - ) as mock_device: - mock_device.return_value = device_list - - assert [device.name for device in system.video_devices] == [ - "vchiq", - "cec0", - "video1", - ] - - -def test_serial_devices(): - """Test serial device filter.""" - system = Hardware() - device_list = [ - Device("ttyACM0", Path("/dev/ttyACM0"), "tty", [], {"ID_VENDOR": "xy"}), - Device( - "ttyUSB0", - Path("/dev/ttyUSB0"), - "tty", - [Path("/dev/ttyS1"), Path("/dev/serial/by-id/xyx")], - {"ID_VENDOR": "xy"}, - ), - Device("ttyS0", Path("/dev/ttyS0"), "tty", [], {}), - Device("video1", Path("/dev/video1"), "misc", [], {"ID_VENDOR": "xy"}), - ] - - with patch( - "supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock - ) as mock_device: - mock_device.return_value = device_list - - assert [(device.name, device.links) for device in system.serial_devices] == [ - ("ttyACM0", []), - ("ttyUSB0", [Path("/dev/serial/by-id/xyx")]), - ("ttyS0", []), - ] - - -def test_usb_devices(): - """Test usb device filter.""" - system = Hardware() - device_list = [ - Device("usb1", Path("/dev/bus/usb/1/1"), "usb", [], {}), - Device("usb2", Path("/dev/bus/usb/2/1"), "usb", [], {}), - Device("cec0", Path("/dev/cec0"), "xy", [], {}), - Device("video1", Path("/dev/video1"), "xy", [], {}), - ] - - with patch( - "supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock - ) as mock_device: - mock_device.return_value = device_list - - assert [device.name for device in system.usb_devices] == [ - "usb1", - "usb2", - ] - - -def test_block_devices(): - """Test usb device filter.""" - system = Hardware() - device_list = [ - Device("sda", Path("/dev/sda"), "block", [], {"ID_NAME": "xy"}), - Device("sdb", Path("/dev/sdb"), "block", [], {"ID_NAME": "xy"}), - Device("cec0", Path("/dev/cec0"), "xy", [], {}), - Device("video1", Path("/dev/video1"), "xy", [], {"ID_NAME": "xy"}), - ] - - with patch( - "supervisor.misc.hardware.Hardware.devices", new_callable=PropertyMock - ) as mock_device: - mock_device.return_value = device_list - - assert [device.name for device in system.disk_devices] == [ - "sda", - "sdb", - ] - - -def test_free_space(): - """Test free space helper.""" - system = Hardware() - with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))): - free = system.get_disk_free_space("/data") - - assert free == 2.0 - - -def test_total_space(): - """Test total space helper.""" - system = Hardware() - with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)): - total = system.get_disk_total_space("/data") - - assert total == 10.0 - - -def test_used_space(): - """Test used space helper.""" - system = Hardware() - with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)): - used = system.get_disk_used_space("/data") - - assert used == 8.0 diff --git a/tests/resolution/__init__.py b/tests/resolution/__init__.py new file mode 100644 index 000000000..13cbc1579 --- /dev/null +++ b/tests/resolution/__init__.py @@ -0,0 +1 @@ +"""Test for Resolution.""" diff --git a/tests/resolution/fixup/test_store_execute_reload.py b/tests/resolution/fixup/test_store_execute_reload.py index 177bf5192..3048cffcc 100644 --- a/tests/resolution/fixup/test_store_execute_reload.py +++ b/tests/resolution/fixup/test_store_execute_reload.py @@ -1,6 +1,6 @@ """Test evaluation base.""" # pylint: disable=import-error,protected-access -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch from supervisor.coresys import CoreSys from supervisor.resolution.const import ContextType, IssueType, SuggestionType @@ -24,7 +24,8 @@ async def test_fixup(coresys: CoreSys): mock_repositorie = AsyncMock() coresys.store.repositories["test"] = mock_repositorie - await store_execute_reload() + with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))): + await store_execute_reload() assert mock_repositorie.load.called assert mock_repositorie.update.called diff --git a/tests/resolution/fixup/test_store_execute_reset.py b/tests/resolution/fixup/test_store_execute_reset.py index 7c2ff331b..c307581d0 100644 --- a/tests/resolution/fixup/test_store_execute_reset.py +++ b/tests/resolution/fixup/test_store_execute_reset.py @@ -1,7 +1,7 @@ """Test evaluation base.""" # pylint: disable=import-error,protected-access from pathlib import Path -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch from supervisor.coresys import CoreSys from supervisor.resolution.const import ContextType, IssueType, SuggestionType @@ -30,7 +30,8 @@ async def test_fixup(coresys: CoreSys, tmp_path): mock_repositorie.git.path = test_repo coresys.store.repositories["test"] = mock_repositorie - await store_execute_reset() + with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))): + await store_execute_reset() assert not test_repo.exists() assert mock_repositorie.load.called