diff --git a/supervisor/addons/utils.py b/supervisor/addons/utils.py index dfdea5002..fecf8062f 100644 --- a/supervisor/addons/utils.py +++ b/supervisor/addons/utils.py @@ -66,12 +66,8 @@ def rating_security(addon: AddonModel) -> int: if addon.host_pid: rating += -2 - # Full Access - if addon.with_full_access: - rating += -2 - - # Docker Access - if addon.access_docker_api: + # Docker Access & full Access + if addon.access_docker_api or addon.with_full_access: rating = 1 return max(min(6, rating), 1) diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index c86df3861..80c4a9df4 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -136,6 +136,26 @@ RE_MACHINE = re.compile( ) +def _warn_addon_config(config: Dict[str, Any]): + """Warn about miss configs.""" + name = config.get(ATTR_NAME) + if not name: + raise vol.Invalid("Invalid Add-on config!") + + if config.get(ATTR_FULL_ACCESS, False) and ( + config.get(ATTR_DEVICES) + or config.get(ATTR_UART) + or config.get(ATTR_USB) + or config.get(ATTR_GPIO) + ): + _LOGGER.warning( + "Add-on have full device access, and selective device access in the configuration. Please report this to the maintainer of %s", + name, + ) + + return config + + def _migrate_addon_config(protocol=False): """Migrate addon config.""" @@ -279,7 +299,9 @@ _SCHEMA_ADDON_CONFIG = vol.Schema( extra=vol.REMOVE_EXTRA, ) -SCHEMA_ADDON_CONFIG = vol.All(_migrate_addon_config(True), _SCHEMA_ADDON_CONFIG) +SCHEMA_ADDON_CONFIG = vol.All( + _migrate_addon_config(True), _warn_addon_config, _SCHEMA_ADDON_CONFIG +) # pylint: disable=no-value-for-parameter diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 3901dd2fd..cf74a5d00 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -94,11 +94,6 @@ class DockerAddon(DockerInterface): """Return name of Docker container.""" return f"addon_{self.addon.slug}" - @property - def full_access(self) -> bool: - """Return True if full access is enabled.""" - return not self.addon.protected and self.addon.with_full_access - @property def environment(self) -> Dict[str, Optional[str]]: """Return environment for Docker add-on.""" @@ -130,11 +125,26 @@ class DockerAddon(DockerInterface): device = self.sys_hardware.get_by_path(device_path) except HardwareNotFound: _LOGGER.debug("Ignore static device path %s", device_path) - else: - rules.add(self.sys_hardware.policy.get_cgroups_rule(device)) + + # Check access + if not self.sys_hardware.policy.allowed_for_access(device): + _LOGGER.error( + "Add-on %s try to access to blocked device %s!", + self.addon.name, + device.name, + ) + continue + rules.add(self.sys_hardware.policy.get_cgroups_rule(device)) # Attach correct cgroups for devices for device in self.addon.devices: + if not self.sys_hardware.policy.allowed_for_access(device): + _LOGGER.error( + "Add-on %s try to access to blocked device %s!", + self.addon.name, + device.name, + ) + continue rules.add(self.sys_hardware.policy.get_cgroups_rule(device)) # Video @@ -153,6 +163,10 @@ class DockerAddon(DockerInterface): if self.addon.with_usb: rules.update(self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.USB)) + # Full Access + if not self.addon.protected and self.addon.with_full_access: + return [self.sys_hardware.policy.get_full_access()] + # Return None if no rules is present if rules: return list(rules) @@ -394,7 +408,6 @@ class DockerAddon(DockerInterface): hostname=self.addon.hostname, detach=True, init=self.addon.default_init, - privileged=self.full_access, stdin_open=self.addon.with_stdin, network_mode=self.network_mode, pid_mode=self.pid_mode, diff --git a/supervisor/hardware/disk.py b/supervisor/hardware/disk.py new file mode 100644 index 000000000..71fec8e01 --- /dev/null +++ b/supervisor/hardware/disk.py @@ -0,0 +1,120 @@ +"""Read disk hardware info from system.""" +import logging +from pathlib import Path +import shutil +from typing import Union + +from ..coresys import CoreSys, CoreSysAttributes +from .const import UdevSubsystem +from .data import Device + +_LOGGER: logging.Logger = logging.getLogger(__name__) + +_MOUNTINFO: Path = Path("/proc/self/mountinfo") +_BLOCK_DEVICE_CLASS = "/sys/class/block/{}" +_BLOCK_DEVICE_EMMC_LIFE_TIME = "/sys/block/{}/device/life_time" + + +class HwDisk(CoreSysAttributes): + """Representation of an interface to disk utils.""" + + def __init__(self, coresys: CoreSys): + """Init hardware object.""" + self.coresys = coresys + + def is_system_partition(self, device: Device) -> bool: + """Return true if this is a system disk/partition.""" + if device.subsystem != UdevSubsystem.DISK: + return False + + if device.attributes.get("ID_FS_LABEL", "").startswith("hassos"): + return True + return False + + def get_disk_total_space(self, path: Union[str, Path]) -> float: + """Return total space (GiB) on disk for path.""" + total, _, _ = shutil.disk_usage(path) + return round(total / (1024.0 ** 3), 1) + + def get_disk_used_space(self, path: Union[str, Path]) -> float: + """Return used space (GiB) on disk for path.""" + _, used, _ = shutil.disk_usage(path) + return round(used / (1024.0 ** 3), 1) + + def get_disk_free_space(self, path: Union[str, Path]) -> float: + """Return free space (GiB) on disk for path.""" + _, _, free = shutil.disk_usage(path) + return round(free / (1024.0 ** 3), 1) + + def _get_mountinfo(self, path: str) -> str: + mountinfo = _MOUNTINFO.read_text() + for line in mountinfo.splitlines(): + mountinfoarr = line.split() + if mountinfoarr[4] == path: + return mountinfoarr + return None + + def _get_mount_source(self, path: str) -> str: + mountinfoarr = self._get_mountinfo(path) + + if mountinfoarr is None: + return None + + # Find optional field separator + optionsep = 6 + while mountinfoarr[optionsep] != "-": + optionsep += 1 + return mountinfoarr[optionsep + 2] + + def _try_get_emmc_life_time(self, device_name: str) -> float: + # Get eMMC life_time + life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name)) + + if not life_time_path.exists(): + return None + + # JEDEC health status DEVICE_LIFE_TIME_EST_TYP_A/B + emmc_life_time = life_time_path.read_text().split() + + if len(emmc_life_time) < 2: + return None + + # Type B life time estimate represents the user partition. + life_time_value = int(emmc_life_time[1], 16) + + # 0=Not defined, 1-10=0-100% device life time used, 11=Exceeded + if life_time_value == 0: + return None + + if life_time_value == 11: + logging.warning( + "eMMC reports that its estimated life-time has been exceeded!" + ) + return 100.0 + + # Return the pessimistic estimate (0x02 -> 10%-20%, return 20%) + return life_time_value * 10.0 + + def get_disk_life_time(self, path: Union[str, Path]) -> float: + """Return life time estimate of the underlying SSD drive.""" + mount_source = self._get_mount_source(str(path)) + if mount_source == "overlay": + return None + + mount_source_path = Path(mount_source) + if not mount_source_path.is_block_device(): + return None + + # This looks a bit funky but it is more or less what lsblk is doing to get + # the parent dev reliably + + # Get class device... + mount_source_device_part = Path( + _BLOCK_DEVICE_CLASS.format(mount_source_path.name) + ) + + # ... resolve symlink and get parent device from that path. + mount_source_device_name = mount_source_device_part.resolve().parts[-2] + + # Currently only eMMC block devices supported + return self._try_get_emmc_life_time(mount_source_device_name) diff --git a/supervisor/hardware/helper.py b/supervisor/hardware/helper.py index 5bdb3e856..a10881cdc 100644 --- a/supervisor/hardware/helper.py +++ b/supervisor/hardware/helper.py @@ -3,8 +3,7 @@ from datetime import datetime import logging from pathlib import Path import re -import shutil -from typing import Optional, Union +from typing import Optional import pyudev @@ -19,10 +18,6 @@ _RE_BOOT_TIME: re.Pattern = re.compile(r"btime (\d+)") _RE_HIDE_SYSFS: re.Pattern = re.compile(r"/sys/devices/virtual/(?:tty|block|vc)/.*") -_MOUNTINFO: Path = Path("/proc/self/mountinfo") -_BLOCK_DEVICE_CLASS = "/sys/class/block/{}" -_BLOCK_DEVICE_EMMC_LIFE_TIME = "/sys/block/{}/device/life_time" - class HwHelper(CoreSysAttributes): """Representation of an interface to procfs, sysfs and udev.""" @@ -67,91 +62,3 @@ class HwHelper(CoreSysAttributes): def hide_virtual_device(self, udev_device: pyudev.Device) -> bool: """Small helper to hide not needed Devices.""" return _RE_HIDE_SYSFS.match(udev_device.sys_path) is not None - - def get_disk_total_space(self, path: Union[str, Path]) -> float: - """Return total space (GiB) on disk for path.""" - total, _, _ = shutil.disk_usage(path) - return round(total / (1024.0 ** 3), 1) - - def get_disk_used_space(self, path: Union[str, Path]) -> float: - """Return used space (GiB) on disk for path.""" - _, used, _ = shutil.disk_usage(path) - return round(used / (1024.0 ** 3), 1) - - def get_disk_free_space(self, path: Union[str, Path]) -> float: - """Return free space (GiB) on disk for path.""" - _, _, free = shutil.disk_usage(path) - return round(free / (1024.0 ** 3), 1) - - def _get_mountinfo(self, path: str) -> str: - mountinfo = _MOUNTINFO.read_text() - for line in mountinfo.splitlines(): - mountinfoarr = line.split() - if mountinfoarr[4] == path: - return mountinfoarr - return None - - def _get_mount_source(self, path: str) -> str: - mountinfoarr = self._get_mountinfo(path) - - if mountinfoarr is None: - return None - - # Find optional field separator - optionsep = 6 - while mountinfoarr[optionsep] != "-": - optionsep += 1 - return mountinfoarr[optionsep + 2] - - def _try_get_emmc_life_time(self, device_name: str) -> float: - # Get eMMC life_time - life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name)) - - if not life_time_path.exists(): - return None - - # JEDEC health status DEVICE_LIFE_TIME_EST_TYP_A/B - emmc_life_time = life_time_path.read_text().split() - - if len(emmc_life_time) < 2: - return None - - # Type B life time estimate represents the user partition. - life_time_value = int(emmc_life_time[1], 16) - - # 0=Not defined, 1-10=0-100% device life time used, 11=Exceeded - if life_time_value == 0: - return None - - if life_time_value == 11: - logging.warning( - "eMMC reports that its estimated life-time has been exceeded!" - ) - return 100.0 - - # Return the pessimistic estimate (0x02 -> 10%-20%, return 20%) - return life_time_value * 10.0 - - def get_disk_life_time(self, path: Union[str, Path]) -> float: - """Return life time estimate of the underlying SSD drive.""" - mount_source = self._get_mount_source(str(path)) - if mount_source == "overlay": - return None - - mount_source_path = Path(mount_source) - if not mount_source_path.is_block_device(): - return None - - # This looks a bit funky but it is more or less what lsblk is doing to get - # the parent dev reliably - - # Get class device... - mount_source_device_part = Path( - _BLOCK_DEVICE_CLASS.format(mount_source_path.name) - ) - - # ... resolve symlink and get parent device from that path. - mount_source_device_name = mount_source_device_part.resolve().parts[-2] - - # Currently only eMMC block devices supported - return self._try_get_emmc_life_time(mount_source_device_name) diff --git a/supervisor/hardware/module.py b/supervisor/hardware/module.py index f9846a0c6..a8818bc68 100644 --- a/supervisor/hardware/module.py +++ b/supervisor/hardware/module.py @@ -10,6 +10,7 @@ from supervisor.hardware.const import UdevSubsystem from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import HardwareNotFound from .data import Device +from .disk import HwDisk from .helper import HwHelper from .monitor import HwMonitor from .policy import HwPolicy @@ -29,6 +30,7 @@ class HardwareManager(CoreSysAttributes): self._montior: HwMonitor = HwMonitor(coresys) self._helper: HwHelper = HwHelper(coresys) self._policy: HwPolicy = HwPolicy(coresys) + self._disk: HwDisk = HwDisk(coresys) @property def monitor(self) -> HwMonitor: @@ -45,6 +47,11 @@ class HardwareManager(CoreSysAttributes): """Return Hardware policy instance.""" return self._policy + @property + def disk(self) -> HwDisk: + """Return Hardware disk instance.""" + return self._disk + @property def devices(self) -> List[Device]: """Return List of devices.""" diff --git a/supervisor/hardware/policy.py b/supervisor/hardware/policy.py index 7835f8efb..56ce993ab 100644 --- a/supervisor/hardware/policy.py +++ b/supervisor/hardware/policy.py @@ -3,7 +3,7 @@ import logging from typing import Dict, List from ..coresys import CoreSys, CoreSysAttributes -from .const import PolicyGroup +from .const import PolicyGroup, UdevSubsystem from .data import Device _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -63,4 +63,17 @@ class HwPolicy(CoreSysAttributes): def get_cgroups_rule(self, device: Device) -> str: """Generate a cgroups rule for given device.""" - return f"c {device.cgroups_major}:{device.cgroups_minor} rwm" + cgroup_type = "c" if device.subsystem != UdevSubsystem.DISK else "b" + + return f"{cgroup_type} {device.cgroups_major}:{device.cgroups_minor} rwm" + + def get_full_access(self) -> str: + """Get full access to all devices.""" + return "a *:* rwm" + + def allowed_for_access(self, device: Device) -> bool: + """Return True if allow to access to this device.""" + if self.sys_hardware.disk.is_system_partition(device): + return False + + return True diff --git a/supervisor/host/info.py b/supervisor/host/info.py index d6cacddfc..da83a600a 100644 --- a/supervisor/host/info.py +++ b/supervisor/host/info.py @@ -54,28 +54,28 @@ class InfoCenter(CoreSysAttributes): @property def total_space(self) -> float: """Return total space (GiB) on disk for supervisor data directory.""" - return self.sys_hardware.helper.get_disk_total_space( + return self.sys_hardware.disk.get_disk_total_space( self.coresys.config.path_supervisor ) @property def used_space(self) -> float: """Return used space (GiB) on disk for supervisor data directory.""" - return self.sys_hardware.helper.get_disk_used_space( + return self.sys_hardware.disk.get_disk_used_space( self.coresys.config.path_supervisor ) @property def free_space(self) -> float: """Return available space (GiB) on disk for supervisor data directory.""" - return self.sys_hardware.helper.get_disk_free_space( + return self.sys_hardware.disk.get_disk_free_space( self.coresys.config.path_supervisor ) @property def disk_life_time(self) -> float: """Return the estimated life-time usage (in %) of the SSD storing the data directory.""" - return self.sys_hardware.helper.get_disk_life_time( + return self.sys_hardware.disk.get_disk_life_time( self.coresys.config.path_supervisor ) diff --git a/tests/hardware/test_disk.py b/tests/hardware/test_disk.py new file mode 100644 index 000000000..0de8d204d --- /dev/null +++ b/tests/hardware/test_disk.py @@ -0,0 +1,81 @@ +"""Test hardware utils.""" +# pylint: disable=protected-access +from pathlib import Path +from unittest.mock import patch + +from supervisor.hardware.data import Device + + +def test_system_partition(coresys): + """Test if it is a system partition.""" + disk = Device( + "sda0", + Path("/dev/sda0"), + Path("/sys/bus/usb/001"), + "block", + [], + {"MAJOR": "5", "MINOR": "10"}, + ) + + assert not coresys.hardware.disk.is_system_partition(disk) + + disk = Device( + "sda0", + Path("/dev/sda0"), + Path("/sys/bus/usb/001"), + "block", + [], + {"MAJOR": "5", "MINOR": "10", "ID_FS_LABEL": "hassos-overlay"}, + ) + + assert coresys.hardware.disk.is_system_partition(disk) + + +def test_free_space(coresys): + """Test free space helper.""" + with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))): + free = coresys.hardware.disk.get_disk_free_space("/data") + + assert free == 2.0 + + +def test_total_space(coresys): + """Test total space helper.""" + with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)): + total = coresys.hardware.disk.get_disk_total_space("/data") + + assert total == 10.0 + + +def test_used_space(coresys): + """Test used space helper.""" + with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)): + used = coresys.hardware.disk.get_disk_used_space("/data") + + assert used == 8.0 + + +def test_get_mountinfo(coresys): + """Test mountinfo helper.""" + mountinfo = coresys.hardware.disk._get_mountinfo("/proc") + assert mountinfo[4] == "/proc" + + +def test_get_mount_source(coresys): + """Test mount source helper.""" + # For /proc the mount source is known to be "proc"... + mount_source = coresys.hardware.disk._get_mount_source("/proc") + assert mount_source == "proc" + + +def test_try_get_emmc_life_time(coresys, tmp_path): + """Test eMMC life time helper.""" + fake_life_time = tmp_path / "fake-mmcblk0-lifetime" + fake_life_time.write_text("0x01 0x02\n") + + with patch( + "supervisor.hardware.disk._BLOCK_DEVICE_EMMC_LIFE_TIME", + str(tmp_path / "fake-{}-lifetime"), + ): + value = coresys.hardware.disk._try_get_emmc_life_time("mmcblk0") + assert value == 20.0 diff --git a/tests/hardware/test_helper.py b/tests/hardware/test_helper.py index f05a45541..ed82e42e4 100644 --- a/tests/hardware/test_helper.py +++ b/tests/hardware/test_helper.py @@ -1,7 +1,7 @@ """Test hardware utils.""" # pylint: disable=protected-access from pathlib import Path -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock from supervisor.hardware.data import Device @@ -75,53 +75,3 @@ def test_hide_virtual_device(coresys): udev_device.sys_path = "/sys/devices/virtual/vc/vcs1" assert coresys.hardware.helper.hide_virtual_device(udev_device) - - -def test_free_space(coresys): - """Test free space helper.""" - with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0 ** 3))): - free = coresys.hardware.helper.get_disk_free_space("/data") - - assert free == 2.0 - - -def test_total_space(coresys): - """Test total space helper.""" - with patch("shutil.disk_usage", return_value=(10 * (1024.0 ** 3), 42, 42)): - total = coresys.hardware.helper.get_disk_total_space("/data") - - assert total == 10.0 - - -def test_used_space(coresys): - """Test used space helper.""" - with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0 ** 3), 42)): - used = coresys.hardware.helper.get_disk_used_space("/data") - - assert used == 8.0 - - -def test_get_mountinfo(coresys): - """Test mountinfo helper.""" - mountinfo = coresys.hardware.helper._get_mountinfo("/proc") - assert mountinfo[4] == "/proc" - - -def test_get_mount_source(coresys): - """Test mount source helper.""" - # For /proc the mount source is known to be "proc"... - mount_source = coresys.hardware.helper._get_mount_source("/proc") - assert mount_source == "proc" - - -def test_try_get_emmc_life_time(coresys, tmp_path): - """Test eMMC life time helper.""" - fake_life_time = tmp_path / "fake-mmcblk0-lifetime" - fake_life_time.write_text("0x01 0x02\n") - - with patch( - "supervisor.hardware.helper._BLOCK_DEVICE_EMMC_LIFE_TIME", - str(tmp_path / "fake-{}-lifetime"), - ): - value = coresys.hardware.helper._try_get_emmc_life_time("mmcblk0") - assert value == 20.0 diff --git a/tests/hardware/test_policy.py b/tests/hardware/test_policy.py index b0e464ee7..4cd0e044d 100644 --- a/tests/hardware/test_policy.py +++ b/tests/hardware/test_policy.py @@ -20,6 +20,17 @@ def test_device_policy(coresys): assert coresys.hardware.policy.get_cgroups_rule(device) == "c 5:10 rwm" + disk = Device( + "sda0", + Path("/dev/sda0"), + Path("/sys/bus/usb/001"), + "block", + [], + {"MAJOR": "5", "MINOR": "10"}, + ) + + assert coresys.hardware.policy.get_cgroups_rule(disk) == "b 5:10 rwm" + def test_policy_group(coresys): """Test policy group generator.""" @@ -46,3 +57,29 @@ def test_device_in_policy(coresys): assert coresys.hardware.policy.is_match_cgroup(PolicyGroup.UART, device) assert not coresys.hardware.policy.is_match_cgroup(PolicyGroup.GPIO, device) + + +def test_allowed_access(coresys): + """Test if is allow to access for device.""" + + disk = Device( + "sda0", + Path("/dev/sda0"), + Path("/sys/bus/usb/001"), + "block", + [], + {"MAJOR": "5", "MINOR": "10", "ID_FS_LABEL": "hassos-overlay"}, + ) + + assert not coresys.hardware.policy.allowed_for_access(disk) + + device = Device( + "ttyACM0", + Path("/dev/ttyACM0"), + Path("/sys/bus/usb/001"), + "tty", + [], + {"MAJOR": "204", "MINOR": "10"}, + ) + + assert coresys.hardware.policy.allowed_for_access(device)