mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-13 20:26:29 +00:00
Fix handling with full access / blocked devices (#2581)
* Fix handling with full access / blocked devices * address comment * Use name * Add validation warning * add GPIO check too * remove warning * return directly * fix tests
This commit is contained in:
parent
2145bbea81
commit
a0ac8ced31
@ -66,12 +66,8 @@ def rating_security(addon: AddonModel) -> int:
|
||||
if addon.host_pid:
|
||||
rating += -2
|
||||
|
||||
# Full Access
|
||||
if addon.with_full_access:
|
||||
rating += -2
|
||||
|
||||
# Docker Access
|
||||
if addon.access_docker_api:
|
||||
# Docker Access & full Access
|
||||
if addon.access_docker_api or addon.with_full_access:
|
||||
rating = 1
|
||||
|
||||
return max(min(6, rating), 1)
|
||||
|
@ -136,6 +136,26 @@ RE_MACHINE = re.compile(
|
||||
)
|
||||
|
||||
|
||||
def _warn_addon_config(config: Dict[str, Any]):
|
||||
"""Warn about miss configs."""
|
||||
name = config.get(ATTR_NAME)
|
||||
if not name:
|
||||
raise vol.Invalid("Invalid Add-on config!")
|
||||
|
||||
if config.get(ATTR_FULL_ACCESS, False) and (
|
||||
config.get(ATTR_DEVICES)
|
||||
or config.get(ATTR_UART)
|
||||
or config.get(ATTR_USB)
|
||||
or config.get(ATTR_GPIO)
|
||||
):
|
||||
_LOGGER.warning(
|
||||
"Add-on have full device access, and selective device access in the configuration. Please report this to the maintainer of %s",
|
||||
name,
|
||||
)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def _migrate_addon_config(protocol=False):
|
||||
"""Migrate addon config."""
|
||||
|
||||
@ -279,7 +299,9 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
||||
|
||||
SCHEMA_ADDON_CONFIG = vol.All(_migrate_addon_config(True), _SCHEMA_ADDON_CONFIG)
|
||||
SCHEMA_ADDON_CONFIG = vol.All(
|
||||
_migrate_addon_config(True), _warn_addon_config, _SCHEMA_ADDON_CONFIG
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=no-value-for-parameter
|
||||
|
@ -94,11 +94,6 @@ class DockerAddon(DockerInterface):
|
||||
"""Return name of Docker container."""
|
||||
return f"addon_{self.addon.slug}"
|
||||
|
||||
@property
|
||||
def full_access(self) -> bool:
|
||||
"""Return True if full access is enabled."""
|
||||
return not self.addon.protected and self.addon.with_full_access
|
||||
|
||||
@property
|
||||
def environment(self) -> Dict[str, Optional[str]]:
|
||||
"""Return environment for Docker add-on."""
|
||||
@ -130,11 +125,26 @@ class DockerAddon(DockerInterface):
|
||||
device = self.sys_hardware.get_by_path(device_path)
|
||||
except HardwareNotFound:
|
||||
_LOGGER.debug("Ignore static device path %s", device_path)
|
||||
else:
|
||||
rules.add(self.sys_hardware.policy.get_cgroups_rule(device))
|
||||
|
||||
# Check access
|
||||
if not self.sys_hardware.policy.allowed_for_access(device):
|
||||
_LOGGER.error(
|
||||
"Add-on %s try to access to blocked device %s!",
|
||||
self.addon.name,
|
||||
device.name,
|
||||
)
|
||||
continue
|
||||
rules.add(self.sys_hardware.policy.get_cgroups_rule(device))
|
||||
|
||||
# Attach correct cgroups for devices
|
||||
for device in self.addon.devices:
|
||||
if not self.sys_hardware.policy.allowed_for_access(device):
|
||||
_LOGGER.error(
|
||||
"Add-on %s try to access to blocked device %s!",
|
||||
self.addon.name,
|
||||
device.name,
|
||||
)
|
||||
continue
|
||||
rules.add(self.sys_hardware.policy.get_cgroups_rule(device))
|
||||
|
||||
# Video
|
||||
@ -153,6 +163,10 @@ class DockerAddon(DockerInterface):
|
||||
if self.addon.with_usb:
|
||||
rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.USB))
|
||||
|
||||
# Full Access
|
||||
if not self.addon.protected and self.addon.with_full_access:
|
||||
return [self.sys_hardware.policy.get_full_access()]
|
||||
|
||||
# Return None if no rules is present
|
||||
if rules:
|
||||
return list(rules)
|
||||
@ -394,7 +408,6 @@ class DockerAddon(DockerInterface):
|
||||
hostname=self.addon.hostname,
|
||||
detach=True,
|
||||
init=self.addon.default_init,
|
||||
privileged=self.full_access,
|
||||
stdin_open=self.addon.with_stdin,
|
||||
network_mode=self.network_mode,
|
||||
pid_mode=self.pid_mode,
|
||||
|
120
supervisor/hardware/disk.py
Normal file
120
supervisor/hardware/disk.py
Normal file
@ -0,0 +1,120 @@
|
||||
"""Read disk hardware info from system."""
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from typing import Union
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from .const import UdevSubsystem
|
||||
from .data import Device
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
_MOUNTINFO: Path = Path("/proc/self/mountinfo")
|
||||
_BLOCK_DEVICE_CLASS = "/sys/class/block/{}"
|
||||
_BLOCK_DEVICE_EMMC_LIFE_TIME = "/sys/block/{}/device/life_time"
|
||||
|
||||
|
||||
class HwDisk(CoreSysAttributes):
|
||||
"""Representation of an interface to disk utils."""
|
||||
|
||||
def __init__(self, coresys: CoreSys):
|
||||
"""Init hardware object."""
|
||||
self.coresys = coresys
|
||||
|
||||
def is_system_partition(self, device: Device) -> bool:
|
||||
"""Return true if this is a system disk/partition."""
|
||||
if device.subsystem != UdevSubsystem.DISK:
|
||||
return False
|
||||
|
||||
if device.attributes.get("ID_FS_LABEL", "").startswith("hassos"):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_disk_total_space(self, path: Union[str, Path]) -> float:
|
||||
"""Return total space (GiB) on disk for path."""
|
||||
total, _, _ = shutil.disk_usage(path)
|
||||
return round(total / (1024.0 ** 3), 1)
|
||||
|
||||
def get_disk_used_space(self, path: Union[str, Path]) -> float:
|
||||
"""Return used space (GiB) on disk for path."""
|
||||
_, used, _ = shutil.disk_usage(path)
|
||||
return round(used / (1024.0 ** 3), 1)
|
||||
|
||||
def get_disk_free_space(self, path: Union[str, Path]) -> float:
|
||||
"""Return free space (GiB) on disk for path."""
|
||||
_, _, free = shutil.disk_usage(path)
|
||||
return round(free / (1024.0 ** 3), 1)
|
||||
|
||||
def _get_mountinfo(self, path: str) -> str:
|
||||
mountinfo = _MOUNTINFO.read_text()
|
||||
for line in mountinfo.splitlines():
|
||||
mountinfoarr = line.split()
|
||||
if mountinfoarr[4] == path:
|
||||
return mountinfoarr
|
||||
return None
|
||||
|
||||
def _get_mount_source(self, path: str) -> str:
|
||||
mountinfoarr = self._get_mountinfo(path)
|
||||
|
||||
if mountinfoarr is None:
|
||||
return None
|
||||
|
||||
# Find optional field separator
|
||||
optionsep = 6
|
||||
while mountinfoarr[optionsep] != "-":
|
||||
optionsep += 1
|
||||
return mountinfoarr[optionsep + 2]
|
||||
|
||||
def _try_get_emmc_life_time(self, device_name: str) -> float:
|
||||
# Get eMMC life_time
|
||||
life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name))
|
||||
|
||||
if not life_time_path.exists():
|
||||
return None
|
||||
|
||||
# JEDEC health status DEVICE_LIFE_TIME_EST_TYP_A/B
|
||||
emmc_life_time = life_time_path.read_text().split()
|
||||
|
||||
if len(emmc_life_time) < 2:
|
||||
return None
|
||||
|
||||
# Type B life time estimate represents the user partition.
|
||||
life_time_value = int(emmc_life_time[1], 16)
|
||||
|
||||
# 0=Not defined, 1-10=0-100% device life time used, 11=Exceeded
|
||||
if life_time_value == 0:
|
||||
return None
|
||||
|
||||
if life_time_value == 11:
|
||||
logging.warning(
|
||||
"eMMC reports that its estimated life-time has been exceeded!"
|
||||
)
|
||||
return 100.0
|
||||
|
||||
# Return the pessimistic estimate (0x02 -> 10%-20%, return 20%)
|
||||
return life_time_value * 10.0
|
||||
|
||||
def get_disk_life_time(self, path: Union[str, Path]) -> float:
|
||||
"""Return life time estimate of the underlying SSD drive."""
|
||||
mount_source = self._get_mount_source(str(path))
|
||||
if mount_source == "overlay":
|
||||
return None
|
||||
|
||||
mount_source_path = Path(mount_source)
|
||||
if not mount_source_path.is_block_device():
|
||||
return None
|
||||
|
||||
# This looks a bit funky but it is more or less what lsblk is doing to get
|
||||
# the parent dev reliably
|
||||
|
||||
# Get class device...
|
||||
mount_source_device_part = Path(
|
||||
_BLOCK_DEVICE_CLASS.format(mount_source_path.name)
|
||||
)
|
||||
|
||||
# ... resolve symlink and get parent device from that path.
|
||||
mount_source_device_name = mount_source_device_part.resolve().parts[-2]
|
||||
|
||||
# Currently only eMMC block devices supported
|
||||
return self._try_get_emmc_life_time(mount_source_device_name)
|
@ -3,8 +3,7 @@ from datetime import datetime
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import re
|
||||
import shutil
|
||||
from typing import Optional, Union
|
||||
from typing import Optional
|
||||
|
||||
import pyudev
|
||||
|
||||
@ -19,10 +18,6 @@ _RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)")
|
||||
|
||||
_RE_HIDE_SYSFS: re.Pattern = re.compile(r"/sys/devices/virtual/(?:tty|block|vc)/.*")
|
||||
|
||||
_MOUNTINFO: Path = Path("/proc/self/mountinfo")
|
||||
_BLOCK_DEVICE_CLASS = "/sys/class/block/{}"
|
||||
_BLOCK_DEVICE_EMMC_LIFE_TIME = "/sys/block/{}/device/life_time"
|
||||
|
||||
|
||||
class HwHelper(CoreSysAttributes):
|
||||
"""Representation of an interface to procfs, sysfs and udev."""
|
||||
@ -67,91 +62,3 @@ class HwHelper(CoreSysAttributes):
|
||||
def hide_virtual_device(self, udev_device: pyudev.Device) -> bool:
|
||||
"""Small helper to hide not needed Devices."""
|
||||
return _RE_HIDE_SYSFS.match(udev_device.sys_path) is not None
|
||||
|
||||
def get_disk_total_space(self, path: Union[str, Path]) -> float:
|
||||
"""Return total space (GiB) on disk for path."""
|
||||
total, _, _ = shutil.disk_usage(path)
|
||||
return round(total / (1024.0 ** 3), 1)
|
||||
|
||||
def get_disk_used_space(self, path: Union[str, Path]) -> float:
|
||||
"""Return used space (GiB) on disk for path."""
|
||||
_, used, _ = shutil.disk_usage(path)
|
||||
return round(used / (1024.0 ** 3), 1)
|
||||
|
||||
def get_disk_free_space(self, path: Union[str, Path]) -> float:
|
||||
"""Return free space (GiB) on disk for path."""
|
||||
_, _, free = shutil.disk_usage(path)
|
||||
return round(free / (1024.0 ** 3), 1)
|
||||
|
||||
def _get_mountinfo(self, path: str) -> str:
|
||||
mountinfo = _MOUNTINFO.read_text()
|
||||
for line in mountinfo.splitlines():
|
||||
mountinfoarr = line.split()
|
||||
if mountinfoarr[4] == path:
|
||||
return mountinfoarr
|
||||
return None
|
||||
|
||||
def _get_mount_source(self, path: str) -> str:
|
||||
mountinfoarr = self._get_mountinfo(path)
|
||||
|
||||
if mountinfoarr is None:
|
||||
return None
|
||||
|
||||
# Find optional field separator
|
||||
optionsep = 6
|
||||
while mountinfoarr[optionsep] != "-":
|
||||
optionsep += 1
|
||||
return mountinfoarr[optionsep + 2]
|
||||
|
||||
def _try_get_emmc_life_time(self, device_name: str) -> float:
|
||||
# Get eMMC life_time
|
||||
life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name))
|
||||
|
||||
if not life_time_path.exists():
|
||||
return None
|
||||
|
||||
# JEDEC health status DEVICE_LIFE_TIME_EST_TYP_A/B
|
||||
emmc_life_time = life_time_path.read_text().split()
|
||||
|
||||
if len(emmc_life_time) < 2:
|
||||
return None
|
||||
|
||||
# Type B life time estimate represents the user partition.
|
||||
life_time_value = int(emmc_life_time[1], 16)
|
||||
|
||||
# 0=Not defined, 1-10=0-100% device life time used, 11=Exceeded
|
||||
if life_time_value == 0:
|
||||
return None
|
||||
|
||||
if life_time_value == 11:
|
||||
logging.warning(
|
||||
"eMMC reports that its estimated life-time has been exceeded!"
|
||||
)
|
||||
return 100.0
|
||||
|
||||
# Return the pessimistic estimate (0x02 -> 10%-20%, return 20%)
|
||||
return life_time_value * 10.0
|
||||
|
||||
def get_disk_life_time(self, path: Union[str, Path]) -> float:
|
||||
"""Return life time estimate of the underlying SSD drive."""
|
||||
mount_source = self._get_mount_source(str(path))
|
||||
if mount_source == "overlay":
|
||||
return None
|
||||
|
||||
mount_source_path = Path(mount_source)
|
||||
if not mount_source_path.is_block_device():
|
||||
return None
|
||||
|
||||
# This looks a bit funky but it is more or less what lsblk is doing to get
|
||||
# the parent dev reliably
|
||||
|
||||
# Get class device...
|
||||
mount_source_device_part = Path(
|
||||
_BLOCK_DEVICE_CLASS.format(mount_source_path.name)
|
||||
)
|
||||
|
||||
# ... resolve symlink and get parent device from that path.
|
||||
mount_source_device_name = mount_source_device_part.resolve().parts[-2]
|
||||
|
||||
# Currently only eMMC block devices supported
|
||||
return self._try_get_emmc_life_time(mount_source_device_name)
|
||||
|
@ -10,6 +10,7 @@ from supervisor.hardware.const import UdevSubsystem
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import HardwareNotFound
|
||||
from .data import Device
|
||||
from .disk import HwDisk
|
||||
from .helper import HwHelper
|
||||
from .monitor import HwMonitor
|
||||
from .policy import HwPolicy
|
||||
@ -29,6 +30,7 @@ class HardwareManager(CoreSysAttributes):
|
||||
self._montior: HwMonitor = HwMonitor(coresys)
|
||||
self._helper: HwHelper = HwHelper(coresys)
|
||||
self._policy: HwPolicy = HwPolicy(coresys)
|
||||
self._disk: HwDisk = HwDisk(coresys)
|
||||
|
||||
@property
|
||||
def monitor(self) -> HwMonitor:
|
||||
@ -45,6 +47,11 @@ class HardwareManager(CoreSysAttributes):
|
||||
"""Return Hardware policy instance."""
|
||||
return self._policy
|
||||
|
||||
@property
|
||||
def disk(self) -> HwDisk:
|
||||
"""Return Hardware disk instance."""
|
||||
return self._disk
|
||||
|
||||
@property
|
||||
def devices(self) -> List[Device]:
|
||||
"""Return List of devices."""
|
||||
|
@ -3,7 +3,7 @@ import logging
|
||||
from typing import Dict, List
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from .const import PolicyGroup
|
||||
from .const import PolicyGroup, UdevSubsystem
|
||||
from .data import Device
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
@ -63,4 +63,17 @@ class HwPolicy(CoreSysAttributes):
|
||||
|
||||
def get_cgroups_rule(self, device: Device) -> str:
|
||||
"""Generate a cgroups rule for given device."""
|
||||
return f"c {device.cgroups_major}:{device.cgroups_minor} rwm"
|
||||
cgroup_type = "c" if device.subsystem != UdevSubsystem.DISK else "b"
|
||||
|
||||
return f"{cgroup_type} {device.cgroups_major}:{device.cgroups_minor} rwm"
|
||||
|
||||
def get_full_access(self) -> str:
|
||||
"""Get full access to all devices."""
|
||||
return "a *:* rwm"
|
||||
|
||||
def allowed_for_access(self, device: Device) -> bool:
|
||||
"""Return True if allow to access to this device."""
|
||||
if self.sys_hardware.disk.is_system_partition(device):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
@ -54,28 +54,28 @@ class InfoCenter(CoreSysAttributes):
|
||||
@property
|
||||
def total_space(self) -> float:
|
||||
"""Return total space (GiB) on disk for supervisor data directory."""
|
||||
return self.sys_hardware.helper.get_disk_total_space(
|
||||
return self.sys_hardware.disk.get_disk_total_space(
|
||||
self.coresys.config.path_supervisor
|
||||
)
|
||||
|
||||
@property
|
||||
def used_space(self) -> float:
|
||||
"""Return used space (GiB) on disk for supervisor data directory."""
|
||||
return self.sys_hardware.helper.get_disk_used_space(
|
||||
return self.sys_hardware.disk.get_disk_used_space(
|
||||
self.coresys.config.path_supervisor
|
||||
)
|
||||
|
||||
@property
|
||||
def free_space(self) -> float:
|
||||
"""Return available space (GiB) on disk for supervisor data directory."""
|
||||
return self.sys_hardware.helper.get_disk_free_space(
|
||||
return self.sys_hardware.disk.get_disk_free_space(
|
||||
self.coresys.config.path_supervisor
|
||||
)
|
||||
|
||||
@property
|
||||
def disk_life_time(self) -> float:
|
||||
"""Return the estimated life-time usage (in %) of the SSD storing the data directory."""
|
||||
return self.sys_hardware.helper.get_disk_life_time(
|
||||
return self.sys_hardware.disk.get_disk_life_time(
|
||||
self.coresys.config.path_supervisor
|
||||
)
|
||||
|
||||
|
81
tests/hardware/test_disk.py
Normal file
81
tests/hardware/test_disk.py
Normal file
@ -0,0 +1,81 @@
|
||||
"""Test hardware utils."""
|
||||
# pylint: disable=protected-access
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from supervisor.hardware.data import Device
|
||||
|
||||
|
||||
def test_system_partition(coresys):
|
||||
"""Test if it is a system partition."""
|
||||
disk = Device(
|
||||
"sda0",
|
||||
Path("/dev/sda0"),
|
||||
Path("/sys/bus/usb/001"),
|
||||
"block",
|
||||
[],
|
||||
{"MAJOR": "5", "MINOR": "10"},
|
||||
)
|
||||
|
||||
assert not coresys.hardware.disk.is_system_partition(disk)
|
||||
|
||||
disk = Device(
|
||||
"sda0",
|
||||
Path("/dev/sda0"),
|
||||
Path("/sys/bus/usb/001"),
|
||||
"block",
|
||||
[],
|
||||
{"MAJOR": "5", "MINOR": "10", "ID_FS_LABEL": "hassos-overlay"},
|
||||
)
|
||||
|
||||
assert coresys.hardware.disk.is_system_partition(disk)
|
||||
|
||||
|
||||
def test_free_space(coresys):
|
||||
"""Test free space helper."""
|
||||
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))):
|
||||
free = coresys.hardware.disk.get_disk_free_space("/data")
|
||||
|
||||
assert free == 2.0
|
||||
|
||||
|
||||
def test_total_space(coresys):
|
||||
"""Test total space helper."""
|
||||
with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)):
|
||||
total = coresys.hardware.disk.get_disk_total_space("/data")
|
||||
|
||||
assert total == 10.0
|
||||
|
||||
|
||||
def test_used_space(coresys):
|
||||
"""Test used space helper."""
|
||||
with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)):
|
||||
used = coresys.hardware.disk.get_disk_used_space("/data")
|
||||
|
||||
assert used == 8.0
|
||||
|
||||
|
||||
def test_get_mountinfo(coresys):
|
||||
"""Test mountinfo helper."""
|
||||
mountinfo = coresys.hardware.disk._get_mountinfo("/proc")
|
||||
assert mountinfo[4] == "/proc"
|
||||
|
||||
|
||||
def test_get_mount_source(coresys):
|
||||
"""Test mount source helper."""
|
||||
# For /proc the mount source is known to be "proc"...
|
||||
mount_source = coresys.hardware.disk._get_mount_source("/proc")
|
||||
assert mount_source == "proc"
|
||||
|
||||
|
||||
def test_try_get_emmc_life_time(coresys, tmp_path):
|
||||
"""Test eMMC life time helper."""
|
||||
fake_life_time = tmp_path / "fake-mmcblk0-lifetime"
|
||||
fake_life_time.write_text("0x01 0x02\n")
|
||||
|
||||
with patch(
|
||||
"supervisor.hardware.disk._BLOCK_DEVICE_EMMC_LIFE_TIME",
|
||||
str(tmp_path / "fake-{}-lifetime"),
|
||||
):
|
||||
value = coresys.hardware.disk._try_get_emmc_life_time("mmcblk0")
|
||||
assert value == 20.0
|
@ -1,7 +1,7 @@
|
||||
"""Test hardware utils."""
|
||||
# pylint: disable=protected-access
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from supervisor.hardware.data import Device
|
||||
|
||||
@ -75,53 +75,3 @@ def test_hide_virtual_device(coresys):
|
||||
|
||||
udev_device.sys_path = "/sys/devices/virtual/vc/vcs1"
|
||||
assert coresys.hardware.helper.hide_virtual_device(udev_device)
|
||||
|
||||
|
||||
def test_free_space(coresys):
|
||||
"""Test free space helper."""
|
||||
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))):
|
||||
free = coresys.hardware.helper.get_disk_free_space("/data")
|
||||
|
||||
assert free == 2.0
|
||||
|
||||
|
||||
def test_total_space(coresys):
|
||||
"""Test total space helper."""
|
||||
with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)):
|
||||
total = coresys.hardware.helper.get_disk_total_space("/data")
|
||||
|
||||
assert total == 10.0
|
||||
|
||||
|
||||
def test_used_space(coresys):
|
||||
"""Test used space helper."""
|
||||
with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)):
|
||||
used = coresys.hardware.helper.get_disk_used_space("/data")
|
||||
|
||||
assert used == 8.0
|
||||
|
||||
|
||||
def test_get_mountinfo(coresys):
|
||||
"""Test mountinfo helper."""
|
||||
mountinfo = coresys.hardware.helper._get_mountinfo("/proc")
|
||||
assert mountinfo[4] == "/proc"
|
||||
|
||||
|
||||
def test_get_mount_source(coresys):
|
||||
"""Test mount source helper."""
|
||||
# For /proc the mount source is known to be "proc"...
|
||||
mount_source = coresys.hardware.helper._get_mount_source("/proc")
|
||||
assert mount_source == "proc"
|
||||
|
||||
|
||||
def test_try_get_emmc_life_time(coresys, tmp_path):
|
||||
"""Test eMMC life time helper."""
|
||||
fake_life_time = tmp_path / "fake-mmcblk0-lifetime"
|
||||
fake_life_time.write_text("0x01 0x02\n")
|
||||
|
||||
with patch(
|
||||
"supervisor.hardware.helper._BLOCK_DEVICE_EMMC_LIFE_TIME",
|
||||
str(tmp_path / "fake-{}-lifetime"),
|
||||
):
|
||||
value = coresys.hardware.helper._try_get_emmc_life_time("mmcblk0")
|
||||
assert value == 20.0
|
||||
|
@ -20,6 +20,17 @@ def test_device_policy(coresys):
|
||||
|
||||
assert coresys.hardware.policy.get_cgroups_rule(device) == "c 5:10 rwm"
|
||||
|
||||
disk = Device(
|
||||
"sda0",
|
||||
Path("/dev/sda0"),
|
||||
Path("/sys/bus/usb/001"),
|
||||
"block",
|
||||
[],
|
||||
{"MAJOR": "5", "MINOR": "10"},
|
||||
)
|
||||
|
||||
assert coresys.hardware.policy.get_cgroups_rule(disk) == "b 5:10 rwm"
|
||||
|
||||
|
||||
def test_policy_group(coresys):
|
||||
"""Test policy group generator."""
|
||||
@ -46,3 +57,29 @@ def test_device_in_policy(coresys):
|
||||
|
||||
assert coresys.hardware.policy.is_match_cgroup(PolicyGroup.UART, device)
|
||||
assert not coresys.hardware.policy.is_match_cgroup(PolicyGroup.GPIO, device)
|
||||
|
||||
|
||||
def test_allowed_access(coresys):
|
||||
"""Test if is allow to access for device."""
|
||||
|
||||
disk = Device(
|
||||
"sda0",
|
||||
Path("/dev/sda0"),
|
||||
Path("/sys/bus/usb/001"),
|
||||
"block",
|
||||
[],
|
||||
{"MAJOR": "5", "MINOR": "10", "ID_FS_LABEL": "hassos-overlay"},
|
||||
)
|
||||
|
||||
assert not coresys.hardware.policy.allowed_for_access(disk)
|
||||
|
||||
device = Device(
|
||||
"ttyACM0",
|
||||
Path("/dev/ttyACM0"),
|
||||
Path("/sys/bus/usb/001"),
|
||||
"tty",
|
||||
[],
|
||||
{"MAJOR": "204", "MINOR": "10"},
|
||||
)
|
||||
|
||||
assert coresys.hardware.policy.allowed_for_access(device)
|
||||
|
Loading…
x
Reference in New Issue
Block a user