mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-11-06 17:40:16 +00:00
379 lines
13 KiB
Python
379 lines
13 KiB
Python
"""Test hardware utils."""
|
|
|
|
# pylint: disable=protected-access
|
|
import errno
|
|
import os
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from dbus_fast.aio import MessageBus
|
|
import pytest
|
|
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.hardware.data import Device
|
|
from supervisor.resolution.const import UnhealthyReason
|
|
|
|
from tests.common import mock_dbus_services
|
|
from tests.dbus_service_mocks.base import DBusServiceMock
|
|
from tests.dbus_service_mocks.udisks2_manager import (
|
|
UDisks2Manager as UDisks2ManagerService,
|
|
)
|
|
from tests.dbus_service_mocks.udisks2_nvme_controller import (
|
|
NVMeController as NVMeControllerService,
|
|
)
|
|
|
|
MOCK_MOUNTINFO = """790 750 259:8 /supervisor /data rw,relatime master:118 - ext4 /dev/nvme0n1p8 rw,commit=30
|
|
810 750 0:24 /systemd-journal-gatewayd.sock /run/systemd-journal-gatewayd.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
811 750 0:24 /supervisor /run/os rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
813 750 0:24 /udev /run/udev ro,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
814 750 0:24 /machine-id /etc/machine-id ro - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
815 750 0:24 /docker.sock /run/docker.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
816 750 0:24 /dbus /run/dbus ro,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
820 750 0:24 /containerd/containerd.sock /run/containerd/containerd.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
821 750 0:24 /systemd/journal/socket /run/systemd/journal/socket rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
|
|
"""
|
|
|
|
|
|
@pytest.fixture(name="nvme_data_disk")
|
|
async def fixture_nvme_data_disk(
|
|
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
|
|
coresys: CoreSys,
|
|
dbus_session_bus: MessageBus,
|
|
) -> NVMeControllerService:
|
|
"""Mock using an NVMe data disk."""
|
|
nvme_service = (
|
|
await mock_dbus_services(
|
|
{
|
|
"udisks2_block": "/org/freedesktop/UDisks2/block_devices/nvme0n1",
|
|
"udisks2_drive": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
|
|
"udisks2_nvme_controller": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
|
|
},
|
|
dbus_session_bus,
|
|
)
|
|
)["udisks2_nvme_controller"]
|
|
udisks2_manager: UDisks2ManagerService = udisks2_services["udisks2_manager"]
|
|
udisks2_manager.block_devices.append(
|
|
"/org/freedesktop/UDisks2/block_devices/nvme0n1"
|
|
)
|
|
await coresys.dbus.udisks2.update()
|
|
|
|
with (
|
|
patch(
|
|
"supervisor.hardware.disk.Path.read_text",
|
|
return_value=MOCK_MOUNTINFO,
|
|
),
|
|
patch("supervisor.hardware.disk.Path.is_block_device", return_value=True),
|
|
patch(
|
|
"supervisor.hardware.disk.Path.resolve",
|
|
return_value=Path(
|
|
"/sys/devices/platform/soc/ffe07000.nvme/nvme_host/nvme0/nvme0:0000/block/nvme0n1/nvme0n1p8"
|
|
),
|
|
),
|
|
):
|
|
yield nvme_service
|
|
|
|
|
|
def test_system_partition_disk(coresys: CoreSys):
|
|
"""Test if it is a system disk/partition."""
|
|
disk = Device(
|
|
"sda1",
|
|
Path("/dev/sda1"),
|
|
Path("/sys/bus/usb/001"),
|
|
"block",
|
|
None,
|
|
[],
|
|
{"MAJOR": "5", "MINOR": "10"},
|
|
[],
|
|
)
|
|
|
|
assert not coresys.hardware.disk.is_used_by_system(disk)
|
|
|
|
disk = Device(
|
|
"sda1",
|
|
Path("/dev/sda1"),
|
|
Path("/sys/bus/usb/001"),
|
|
"block",
|
|
None,
|
|
[],
|
|
{"MAJOR": "5", "MINOR": "10", "ID_FS_LABEL": "hassos-overlay"},
|
|
[],
|
|
)
|
|
|
|
assert coresys.hardware.disk.is_used_by_system(disk)
|
|
|
|
coresys.hardware.update_device(disk)
|
|
disk_root = Device(
|
|
"sda",
|
|
Path("/dev/sda"),
|
|
Path("/sys/bus/usb/001"),
|
|
"block",
|
|
None,
|
|
[],
|
|
{"MAJOR": "5", "MINOR": "0"},
|
|
[Path("/dev/sda1")],
|
|
)
|
|
|
|
assert coresys.hardware.disk.is_used_by_system(disk_root)
|
|
|
|
|
|
def test_free_space(coresys):
|
|
"""Test free space helper."""
|
|
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
|
|
free = coresys.hardware.disk.get_disk_free_space("/data")
|
|
|
|
assert free == 2.0
|
|
|
|
|
|
def test_total_space(coresys):
|
|
"""Test total space helper."""
|
|
with patch("shutil.disk_usage", return_value=(10 * (1024.0**3), 42, 42)):
|
|
total = coresys.hardware.disk.get_disk_total_space("/data")
|
|
|
|
assert total == 10.0
|
|
|
|
|
|
def test_used_space(coresys):
|
|
"""Test used space helper."""
|
|
with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0**3), 42)):
|
|
used = coresys.hardware.disk.get_disk_used_space("/data")
|
|
|
|
assert used == 8.0
|
|
|
|
|
|
def test_get_mountinfo(coresys):
|
|
"""Test mountinfo helper."""
|
|
mountinfo = coresys.hardware.disk._get_mountinfo("/proc")
|
|
assert mountinfo[4] == "/proc"
|
|
|
|
|
|
def test_get_mount_source(coresys):
|
|
"""Test mount source helper."""
|
|
# For /proc the mount source is known to be "proc"...
|
|
mount_source = coresys.hardware.disk._get_mount_source("/proc")
|
|
assert mount_source == "proc"
|
|
|
|
|
|
def test_get_dir_structure_sizes(coresys, tmp_path):
|
|
"""Test directory structure size calculation."""
|
|
# Create a test directory structure
|
|
test_dir = tmp_path / "test_dir"
|
|
test_dir.mkdir()
|
|
|
|
# Create some files
|
|
(test_dir / "file1.txt").write_text("content1")
|
|
(test_dir / "file2.txt").write_text("content2" * 100) # Larger file
|
|
|
|
# Create subdirectories
|
|
subdir1 = test_dir / "subdir1"
|
|
subdir1.mkdir()
|
|
(subdir1 / "file3.txt").write_text("content3")
|
|
|
|
subdir2 = test_dir / "subdir2"
|
|
subdir2.mkdir()
|
|
(subdir2 / "file4.txt").write_text("content4")
|
|
|
|
# Create nested subdirectory
|
|
nested_dir = subdir1 / "nested"
|
|
nested_dir.mkdir()
|
|
(nested_dir / "file5.txt").write_text("content5")
|
|
|
|
# Create a symlink (should be skipped)
|
|
(test_dir / "symlink.txt").symlink_to(test_dir / "file1.txt")
|
|
|
|
# Test with max_depth=1 (default)
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=1)
|
|
|
|
# Verify the structure
|
|
assert result["used_bytes"] > 0
|
|
assert "children" not in result
|
|
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=2)
|
|
|
|
# Verify the structure
|
|
assert result["used_bytes"] > 0
|
|
assert "children" in result
|
|
children = result["children"]
|
|
|
|
# Should have subdir1 and subdir2, but not nested (due to max_depth=1)
|
|
child_names = [child["id"] for child in children]
|
|
assert "subdir1" in child_names
|
|
assert "subdir2" in child_names
|
|
assert "nested" not in child_names
|
|
|
|
# Verify sizes are calculated correctly
|
|
subdir1 = next(child for child in children if child["id"] == "subdir1")
|
|
subdir2 = next(child for child in children if child["id"] == "subdir2")
|
|
assert subdir1["used_bytes"] > 0
|
|
assert subdir2["used_bytes"] > 0
|
|
assert "children" not in subdir1 # No children due to max_depth=1
|
|
assert "children" not in subdir2
|
|
|
|
# Test with max_depth=2
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=3)
|
|
|
|
# Should now include nested directory
|
|
child_names = [child["id"] for child in result["children"]]
|
|
assert "subdir1" in child_names
|
|
assert "subdir2" in child_names
|
|
|
|
subdir1 = next(child for child in result["children"] if child["id"] == "subdir1")
|
|
nested_children = [child["id"] for child in subdir1["children"]]
|
|
assert "nested" in nested_children
|
|
nested = next(child for child in subdir1["children"] if child["id"] == "nested")
|
|
assert nested["used_bytes"] > 0
|
|
|
|
# Test with max_depth=0 (should only count files in root, no children)
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=0)
|
|
assert result["used_bytes"] > 0
|
|
assert "children" not in result # No children due to max_depth=0
|
|
|
|
|
|
def test_get_dir_structure_sizes_empty_dir(coresys, tmp_path):
|
|
"""Test directory structure size calculation with empty directory."""
|
|
empty_dir = tmp_path / "empty_dir"
|
|
empty_dir.mkdir()
|
|
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(empty_dir)
|
|
|
|
assert result["used_bytes"] == 0
|
|
assert "children" not in result
|
|
|
|
|
|
def test_get_dir_structure_sizes_nonexistent_dir(coresys, tmp_path):
|
|
"""Test directory structure size calculation with nonexistent directory."""
|
|
nonexistent_dir = tmp_path / "nonexistent"
|
|
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(nonexistent_dir)
|
|
|
|
assert result["used_bytes"] == 0
|
|
assert "children" not in result
|
|
|
|
|
|
def test_get_dir_structure_sizes_only_files(coresys, tmp_path):
|
|
"""Test directory structure size calculation with only files (no subdirectories)."""
|
|
files_dir = tmp_path / "files_dir"
|
|
files_dir.mkdir()
|
|
|
|
# Create some files
|
|
(files_dir / "file1.txt").write_text("content1")
|
|
(files_dir / "file2.txt").write_text("content2" * 50)
|
|
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(files_dir)
|
|
|
|
assert result["used_bytes"] > 0
|
|
assert "children" not in result # No children since no subdirectories
|
|
|
|
|
|
def test_get_dir_structure_sizes_zero_size_children(coresys, tmp_path):
|
|
"""Test directory structure size calculation with zero-size children."""
|
|
test_dir = tmp_path / "zero_size_test"
|
|
test_dir.mkdir()
|
|
|
|
# Create a file in root
|
|
(test_dir / "file1.txt").write_text("content1")
|
|
|
|
# Create an empty subdirectory
|
|
empty_subdir = test_dir / "empty_subdir"
|
|
empty_subdir.mkdir()
|
|
|
|
# Create a subdirectory with content
|
|
content_subdir = test_dir / "content_subdir"
|
|
content_subdir.mkdir()
|
|
(content_subdir / "file2.txt").write_text("content2")
|
|
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir)
|
|
|
|
# Should include content_subdir but not empty_subdir (since size > 0)
|
|
assert result["used_bytes"] > 0
|
|
assert "children" not in result
|
|
|
|
|
|
def test_try_get_emmc_life_time(coresys, tmp_path):
|
|
"""Test eMMC life time helper."""
|
|
fake_life_time = tmp_path / "fake-mmcblk0-lifetime"
|
|
fake_life_time.write_text("0x01 0x02\n")
|
|
|
|
with patch(
|
|
"supervisor.hardware.disk._BLOCK_DEVICE_EMMC_LIFE_TIME",
|
|
str(tmp_path / "fake-{}-lifetime"),
|
|
):
|
|
value = coresys.hardware.disk._try_get_emmc_life_time("mmcblk0")
|
|
assert value == 10.0
|
|
|
|
|
|
def test_get_dir_structure_sizes_ebadmsg_error(coresys, tmp_path):
|
|
"""Test directory structure size calculation with EBADMSG error."""
|
|
# Create a test directory structure
|
|
test_dir = tmp_path / "test_dir"
|
|
test_dir.mkdir()
|
|
|
|
# Create some files
|
|
(test_dir / "file1.txt").write_text("content1")
|
|
|
|
# Create a subdirectory
|
|
subdir = test_dir / "subdir"
|
|
subdir.mkdir()
|
|
(subdir / "file2.txt").write_text("content2")
|
|
|
|
# Mock is_dir, is_symlink, and stat methods to handle the EBADMSG error correctly
|
|
|
|
def mock_is_dir(self):
|
|
# Use the real is_dir for all paths
|
|
return os.path.isdir(self)
|
|
|
|
def mock_is_symlink(self):
|
|
# Use the real is_symlink for all paths
|
|
return os.path.islink(self)
|
|
|
|
def mock_stat_ebadmsg(self, follow_symlinks=True):
|
|
# Raise EBADMSG for any child of test_dir to ensure consistent behavior
|
|
if self.parent == test_dir:
|
|
raise OSError(errno.EBADMSG, "Bad message")
|
|
# For other paths, use the real os.stat
|
|
return os.stat(self, follow_symlinks=follow_symlinks)
|
|
|
|
with (
|
|
patch.object(Path, "is_dir", mock_is_dir),
|
|
patch.object(Path, "is_symlink", mock_is_symlink),
|
|
patch.object(Path, "stat", mock_stat_ebadmsg),
|
|
):
|
|
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir)
|
|
|
|
# The EBADMSG error should cause the loop to break on the first child
|
|
# Since all children raise EBADMSG, we should get 0 used space
|
|
assert result["used_bytes"] == 0
|
|
assert "children" not in result
|
|
|
|
# Verify that the unhealthy reason was added
|
|
assert coresys.resolution.unhealthy
|
|
assert UnhealthyReason.OSERROR_BAD_MESSAGE in coresys.resolution.unhealthy
|
|
|
|
|
|
async def test_try_get_nvme_life_time(
|
|
coresys: CoreSys, nvme_data_disk: NVMeControllerService
|
|
):
|
|
"""Test getting lifetime info from an NVMe."""
|
|
lifetime = await coresys.hardware.disk.get_disk_life_time(
|
|
coresys.config.path_supervisor
|
|
)
|
|
assert lifetime == 1
|
|
|
|
nvme_data_disk.smart_get_attributes_response["percent_used"].value = 50
|
|
lifetime = await coresys.hardware.disk.get_disk_life_time(
|
|
coresys.config.path_supervisor
|
|
)
|
|
assert lifetime == 50
|
|
|
|
|
|
async def test_try_get_nvme_life_time_missing_percent_used(
|
|
coresys: CoreSys, nvme_data_disk: NVMeControllerService
|
|
):
|
|
"""Test getting lifetime info from an NVMe when percent_used is missing."""
|
|
# Simulate a drive that doesn't provide percent_used
|
|
nvme_data_disk.set_missing_attributes(["percent_used"])
|
|
|
|
lifetime = await coresys.hardware.disk.get_disk_life_time(
|
|
coresys.config.path_supervisor
|
|
)
|
|
assert lifetime is None
|