Files
supervisor/tests/hardware/test_disk.py
Michel Nederlof b92f5976a3 If D-Bus to UDisks2 is not connected, return None (#6265)
When fetching the host info without UDisks2 D-Bus connected it would raise an exception and disable managing addons via home assistant (and other GUI functions that need host info status).
2025-10-24 10:22:02 +02:00

390 lines
13 KiB
Python

"""Test hardware utils."""
# pylint: disable=protected-access
import errno
import os
from pathlib import Path
from unittest.mock import patch
from dbus_fast.aio import MessageBus
import pytest
from supervisor.coresys import CoreSys
from supervisor.hardware.data import Device
from supervisor.resolution.const import UnhealthyReason
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.udisks2_manager import (
UDisks2Manager as UDisks2ManagerService,
)
from tests.dbus_service_mocks.udisks2_nvme_controller import (
NVMeController as NVMeControllerService,
)
MOCK_MOUNTINFO = """790 750 259:8 /supervisor /data rw,relatime master:118 - ext4 /dev/nvme0n1p8 rw,commit=30
810 750 0:24 /systemd-journal-gatewayd.sock /run/systemd-journal-gatewayd.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
811 750 0:24 /supervisor /run/os rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
813 750 0:24 /udev /run/udev ro,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
814 750 0:24 /machine-id /etc/machine-id ro - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
815 750 0:24 /docker.sock /run/docker.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
816 750 0:24 /dbus /run/dbus ro,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
820 750 0:24 /containerd/containerd.sock /run/containerd/containerd.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
821 750 0:24 /systemd/journal/socket /run/systemd/journal/socket rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
"""
@pytest.fixture(name="nvme_data_disk")
async def fixture_nvme_data_disk(
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
coresys: CoreSys,
dbus_session_bus: MessageBus,
) -> NVMeControllerService:
"""Mock using an NVMe data disk."""
nvme_service = (
await mock_dbus_services(
{
"udisks2_block": "/org/freedesktop/UDisks2/block_devices/nvme0n1",
"udisks2_drive": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
"udisks2_nvme_controller": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
},
dbus_session_bus,
)
)["udisks2_nvme_controller"]
udisks2_manager: UDisks2ManagerService = udisks2_services["udisks2_manager"]
udisks2_manager.block_devices.append(
"/org/freedesktop/UDisks2/block_devices/nvme0n1"
)
await coresys.dbus.udisks2.update()
with (
patch(
"supervisor.hardware.disk.Path.read_text",
return_value=MOCK_MOUNTINFO,
),
patch("supervisor.hardware.disk.Path.is_block_device", return_value=True),
patch(
"supervisor.hardware.disk.Path.resolve",
return_value=Path(
"/sys/devices/platform/soc/ffe07000.nvme/nvme_host/nvme0/nvme0:0000/block/nvme0n1/nvme0n1p8"
),
),
):
yield nvme_service
def test_system_partition_disk(coresys: CoreSys):
"""Test if it is a system disk/partition."""
disk = Device(
"sda1",
Path("/dev/sda1"),
Path("/sys/bus/usb/001"),
"block",
None,
[],
{"MAJOR": "5", "MINOR": "10"},
[],
)
assert not coresys.hardware.disk.is_used_by_system(disk)
disk = Device(
"sda1",
Path("/dev/sda1"),
Path("/sys/bus/usb/001"),
"block",
None,
[],
{"MAJOR": "5", "MINOR": "10", "ID_FS_LABEL": "hassos-overlay"},
[],
)
assert coresys.hardware.disk.is_used_by_system(disk)
coresys.hardware.update_device(disk)
disk_root = Device(
"sda",
Path("/dev/sda"),
Path("/sys/bus/usb/001"),
"block",
None,
[],
{"MAJOR": "5", "MINOR": "0"},
[Path("/dev/sda1")],
)
assert coresys.hardware.disk.is_used_by_system(disk_root)
def test_free_space(coresys):
"""Test free space helper."""
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
free = coresys.hardware.disk.get_disk_free_space("/data")
assert free == 2.0
def test_total_space(coresys):
"""Test total space helper."""
with patch("shutil.disk_usage", return_value=(10 * (1024.0**3), 42, 42)):
total = coresys.hardware.disk.get_disk_total_space("/data")
assert total == 10.0
def test_used_space(coresys):
"""Test used space helper."""
with patch("shutil.disk_usage", return_value=(42, 8 * (1024.0**3), 42)):
used = coresys.hardware.disk.get_disk_used_space("/data")
assert used == 8.0
def test_get_mountinfo(coresys):
"""Test mountinfo helper."""
mountinfo = coresys.hardware.disk._get_mountinfo("/proc")
assert mountinfo[4] == "/proc"
def test_get_mount_source(coresys):
"""Test mount source helper."""
# For /proc the mount source is known to be "proc"...
mount_source = coresys.hardware.disk._get_mount_source("/proc")
assert mount_source == "proc"
def test_get_dir_structure_sizes(coresys, tmp_path):
"""Test directory structure size calculation."""
# Create a test directory structure
test_dir = tmp_path / "test_dir"
test_dir.mkdir()
# Create some files
(test_dir / "file1.txt").write_text("content1")
(test_dir / "file2.txt").write_text("content2" * 100) # Larger file
# Create subdirectories
subdir1 = test_dir / "subdir1"
subdir1.mkdir()
(subdir1 / "file3.txt").write_text("content3")
subdir2 = test_dir / "subdir2"
subdir2.mkdir()
(subdir2 / "file4.txt").write_text("content4")
# Create nested subdirectory
nested_dir = subdir1 / "nested"
nested_dir.mkdir()
(nested_dir / "file5.txt").write_text("content5")
# Create a symlink (should be skipped)
(test_dir / "symlink.txt").symlink_to(test_dir / "file1.txt")
# Test with max_depth=1 (default)
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=1)
# Verify the structure
assert result["used_bytes"] > 0
assert "children" not in result
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=2)
# Verify the structure
assert result["used_bytes"] > 0
assert "children" in result
children = result["children"]
# Should have subdir1 and subdir2, but not nested (due to max_depth=1)
child_names = [child["id"] for child in children]
assert "subdir1" in child_names
assert "subdir2" in child_names
assert "nested" not in child_names
# Verify sizes are calculated correctly
subdir1 = next(child for child in children if child["id"] == "subdir1")
subdir2 = next(child for child in children if child["id"] == "subdir2")
assert subdir1["used_bytes"] > 0
assert subdir2["used_bytes"] > 0
assert "children" not in subdir1 # No children due to max_depth=1
assert "children" not in subdir2
# Test with max_depth=2
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=3)
# Should now include nested directory
child_names = [child["id"] for child in result["children"]]
assert "subdir1" in child_names
assert "subdir2" in child_names
subdir1 = next(child for child in result["children"] if child["id"] == "subdir1")
nested_children = [child["id"] for child in subdir1["children"]]
assert "nested" in nested_children
nested = next(child for child in subdir1["children"] if child["id"] == "nested")
assert nested["used_bytes"] > 0
# Test with max_depth=0 (should only count files in root, no children)
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=0)
assert result["used_bytes"] > 0
assert "children" not in result # No children due to max_depth=0
def test_get_dir_structure_sizes_empty_dir(coresys, tmp_path):
"""Test directory structure size calculation with empty directory."""
empty_dir = tmp_path / "empty_dir"
empty_dir.mkdir()
result = coresys.hardware.disk.get_dir_structure_sizes(empty_dir)
assert result["used_bytes"] == 0
assert "children" not in result
def test_get_dir_structure_sizes_nonexistent_dir(coresys, tmp_path):
"""Test directory structure size calculation with nonexistent directory."""
nonexistent_dir = tmp_path / "nonexistent"
result = coresys.hardware.disk.get_dir_structure_sizes(nonexistent_dir)
assert result["used_bytes"] == 0
assert "children" not in result
def test_get_dir_structure_sizes_only_files(coresys, tmp_path):
"""Test directory structure size calculation with only files (no subdirectories)."""
files_dir = tmp_path / "files_dir"
files_dir.mkdir()
# Create some files
(files_dir / "file1.txt").write_text("content1")
(files_dir / "file2.txt").write_text("content2" * 50)
result = coresys.hardware.disk.get_dir_structure_sizes(files_dir)
assert result["used_bytes"] > 0
assert "children" not in result # No children since no subdirectories
def test_get_dir_structure_sizes_zero_size_children(coresys, tmp_path):
"""Test directory structure size calculation with zero-size children."""
test_dir = tmp_path / "zero_size_test"
test_dir.mkdir()
# Create a file in root
(test_dir / "file1.txt").write_text("content1")
# Create an empty subdirectory
empty_subdir = test_dir / "empty_subdir"
empty_subdir.mkdir()
# Create a subdirectory with content
content_subdir = test_dir / "content_subdir"
content_subdir.mkdir()
(content_subdir / "file2.txt").write_text("content2")
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir)
# Should include content_subdir but not empty_subdir (since size > 0)
assert result["used_bytes"] > 0
assert "children" not in result
def test_try_get_emmc_life_time(coresys, tmp_path):
"""Test eMMC life time helper."""
fake_life_time = tmp_path / "fake-mmcblk0-lifetime"
fake_life_time.write_text("0x01 0x02\n")
with patch(
"supervisor.hardware.disk._BLOCK_DEVICE_EMMC_LIFE_TIME",
str(tmp_path / "fake-{}-lifetime"),
):
value = coresys.hardware.disk._try_get_emmc_life_time("mmcblk0")
assert value == 10.0
def test_get_dir_structure_sizes_ebadmsg_error(coresys, tmp_path):
"""Test directory structure size calculation with EBADMSG error."""
# Create a test directory structure
test_dir = tmp_path / "test_dir"
test_dir.mkdir()
# Create some files
(test_dir / "file1.txt").write_text("content1")
# Create a subdirectory
subdir = test_dir / "subdir"
subdir.mkdir()
(subdir / "file2.txt").write_text("content2")
# Mock is_dir, is_symlink, and stat methods to handle the EBADMSG error correctly
def mock_is_dir(self):
# Use the real is_dir for all paths
return os.path.isdir(self)
def mock_is_symlink(self):
# Use the real is_symlink for all paths
return os.path.islink(self)
def mock_stat_ebadmsg(self, follow_symlinks=True):
# Raise EBADMSG for any child of test_dir to ensure consistent behavior
if self.parent == test_dir:
raise OSError(errno.EBADMSG, "Bad message")
# For other paths, use the real os.stat
return os.stat(self, follow_symlinks=follow_symlinks)
with (
patch.object(Path, "is_dir", mock_is_dir),
patch.object(Path, "is_symlink", mock_is_symlink),
patch.object(Path, "stat", mock_stat_ebadmsg),
):
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir)
# The EBADMSG error should cause the loop to break on the first child
# Since all children raise EBADMSG, we should get 0 used space
assert result["used_bytes"] == 0
assert "children" not in result
# Verify that the unhealthy reason was added
assert coresys.resolution.unhealthy
assert UnhealthyReason.OSERROR_BAD_MESSAGE in coresys.resolution.unhealthy
async def test_try_get_nvme_life_time(
coresys: CoreSys, nvme_data_disk: NVMeControllerService
):
"""Test getting lifetime info from an NVMe."""
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime == 1
nvme_data_disk.smart_get_attributes_response["percent_used"].value = 50
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime == 50
async def test_try_get_nvme_life_time_missing_percent_used(
coresys: CoreSys, nvme_data_disk: NVMeControllerService
):
"""Test getting lifetime info from an NVMe when percent_used is missing."""
# Simulate a drive that doesn't provide percent_used
nvme_data_disk.set_missing_attributes(["percent_used"])
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime is None
async def test_try_get_nvme_life_time_dbus_not_connected(coresys: CoreSys):
"""Test getting lifetime info from an NVMe when DBUS is not connected."""
# Set the dbus for udisks2 bus to be None, to make it forcibly disconnected.
coresys.dbus.udisks2.dbus = None
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime is None