mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-10-22 10:09:36 +00:00
Storage space usage API (#6046)
* Storage space usage API * Move to host API * add tests * fix test url * more tests * fix tests * fix test * PR comments * update test * tweak format and url * add .DS_Store to .gitignore * update tests * test coverage * update to new struct * update test
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -100,3 +100,6 @@ ENV/
|
||||
# mypy
|
||||
/.mypy_cache/*
|
||||
/.dmypy.json
|
||||
|
||||
# Mac
|
||||
.DS_Store
|
||||
@@ -198,6 +198,7 @@ class RestAPI(CoreSysAttributes):
|
||||
web.post("/host/reload", api_host.reload),
|
||||
web.post("/host/options", api_host.options),
|
||||
web.get("/host/services", api_host.services),
|
||||
web.get("/host/disks/default/usage", api_host.disk_usage),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ ATTR_LLMNR_HOSTNAME = "llmnr_hostname"
|
||||
ATTR_LOCAL_ONLY = "local_only"
|
||||
ATTR_LOCATION_ATTRIBUTES = "location_attributes"
|
||||
ATTR_LOCATIONS = "locations"
|
||||
ATTR_MAX_DEPTH = "max_depth"
|
||||
ATTR_MDNS = "mdns"
|
||||
ATTR_MODEL = "model"
|
||||
ATTR_MOUNTS = "mounts"
|
||||
|
||||
@@ -51,6 +51,7 @@ from .const import (
|
||||
ATTR_FORCE,
|
||||
ATTR_IDENTIFIERS,
|
||||
ATTR_LLMNR_HOSTNAME,
|
||||
ATTR_MAX_DEPTH,
|
||||
ATTR_STARTUP_TIME,
|
||||
ATTR_USE_NTP,
|
||||
ATTR_VIRTUALIZATION,
|
||||
@@ -289,3 +290,49 @@ class APIHost(CoreSysAttributes):
|
||||
) -> web.StreamResponse:
|
||||
"""Return systemd-journald logs. Wrapped as standard API handler."""
|
||||
return await self.advanced_logs_handler(request, identifier, follow)
|
||||
|
||||
@api_process
|
||||
async def disk_usage(self, request: web.Request) -> dict:
|
||||
"""Return a breakdown of storage usage for the system."""
|
||||
|
||||
max_depth = request.query.get(ATTR_MAX_DEPTH, 1)
|
||||
try:
|
||||
max_depth = int(max_depth)
|
||||
except ValueError:
|
||||
max_depth = 1
|
||||
|
||||
disk = self.sys_hardware.disk
|
||||
|
||||
total, used, _ = await self.sys_run_in_executor(
|
||||
disk.disk_usage, self.sys_config.path_supervisor
|
||||
)
|
||||
|
||||
known_paths = await self.sys_run_in_executor(
|
||||
disk.get_dir_sizes,
|
||||
{
|
||||
"addons_data": self.sys_config.path_addons_data,
|
||||
"addons_config": self.sys_config.path_addon_configs,
|
||||
"media": self.sys_config.path_media,
|
||||
"share": self.sys_config.path_share,
|
||||
"backup": self.sys_config.path_backup,
|
||||
"ssl": self.sys_config.path_ssl,
|
||||
"homeassistant": self.sys_config.path_homeassistant,
|
||||
},
|
||||
max_depth,
|
||||
)
|
||||
return {
|
||||
# this can be the disk/partition ID in the future
|
||||
"id": "root",
|
||||
"label": "Root",
|
||||
"total_bytes": total,
|
||||
"used_bytes": used,
|
||||
"children": [
|
||||
{
|
||||
"id": "system",
|
||||
"label": "System",
|
||||
"used_bytes": used
|
||||
- sum(path["used_bytes"] for path in known_paths),
|
||||
},
|
||||
*known_paths,
|
||||
],
|
||||
}
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
"""Read disk hardware info from system."""
|
||||
|
||||
import errno
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from typing import Any
|
||||
|
||||
from supervisor.resolution.const import UnhealthyReason
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import DBusError, DBusObjectError, HardwareNotFound
|
||||
@@ -54,7 +58,7 @@ class HwDisk(CoreSysAttributes):
|
||||
|
||||
Must be run in executor.
|
||||
"""
|
||||
total, _, _ = shutil.disk_usage(path)
|
||||
total, _, _ = self.disk_usage(path)
|
||||
return round(total / (1024.0**3), 1)
|
||||
|
||||
def get_disk_used_space(self, path: str | Path) -> float:
|
||||
@@ -62,7 +66,7 @@ class HwDisk(CoreSysAttributes):
|
||||
|
||||
Must be run in executor.
|
||||
"""
|
||||
_, used, _ = shutil.disk_usage(path)
|
||||
_, used, _ = self.disk_usage(path)
|
||||
return round(used / (1024.0**3), 1)
|
||||
|
||||
def get_disk_free_space(self, path: str | Path) -> float:
|
||||
@@ -70,9 +74,84 @@ class HwDisk(CoreSysAttributes):
|
||||
|
||||
Must be run in executor.
|
||||
"""
|
||||
_, _, free = shutil.disk_usage(path)
|
||||
_, _, free = self.disk_usage(path)
|
||||
return round(free / (1024.0**3), 1)
|
||||
|
||||
def disk_usage(self, path: str | Path) -> tuple[int, int, int]:
|
||||
"""Return (total, used, free) in bytes for path.
|
||||
|
||||
Must be run in executor.
|
||||
"""
|
||||
return shutil.disk_usage(path)
|
||||
|
||||
def get_dir_structure_sizes(self, path: Path, max_depth: int = 1) -> dict[str, Any]:
|
||||
"""Return a recursive dict of subdirectories and their sizes, only if size > 0.
|
||||
|
||||
Excludes external mounts and symlinks to avoid counting files on other filesystems
|
||||
or following symlinks that could lead to infinite loops or incorrect sizes.
|
||||
"""
|
||||
|
||||
size = 0
|
||||
if not path.exists():
|
||||
return {"used_bytes": size}
|
||||
|
||||
children: list[dict[str, Any]] = []
|
||||
root_device = path.stat().st_dev
|
||||
|
||||
for child in path.iterdir():
|
||||
if not child.is_dir():
|
||||
size += child.stat(follow_symlinks=False).st_size
|
||||
continue
|
||||
|
||||
# Skip symlinks to avoid infinite loops
|
||||
if child.is_symlink():
|
||||
continue
|
||||
|
||||
try:
|
||||
# Skip if not on same device (external mount)
|
||||
if child.stat().st_dev != root_device:
|
||||
continue
|
||||
except OSError as err:
|
||||
if err.errno == errno.EBADMSG:
|
||||
self.sys_resolution.add_unhealthy_reason(
|
||||
UnhealthyReason.OSERROR_BAD_MESSAGE
|
||||
)
|
||||
break
|
||||
continue
|
||||
|
||||
child_result = self.get_dir_structure_sizes(child, max_depth - 1)
|
||||
if child_result["used_bytes"] > 0:
|
||||
size += child_result["used_bytes"]
|
||||
if max_depth > 1:
|
||||
children.append(
|
||||
{
|
||||
"id": child.name,
|
||||
"label": child.name,
|
||||
**child_result,
|
||||
}
|
||||
)
|
||||
|
||||
if children:
|
||||
return {"used_bytes": size, "children": children}
|
||||
|
||||
return {"used_bytes": size}
|
||||
|
||||
def get_dir_sizes(
|
||||
self, request: dict[str, Path], max_depth: int = 1
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Accept a dictionary of `name: Path` and return a dictionary with `name: <size>`.
|
||||
|
||||
Must be run in executor.
|
||||
"""
|
||||
return [
|
||||
{
|
||||
"id": name,
|
||||
"label": name,
|
||||
**self.get_dir_structure_sizes(path, max_depth),
|
||||
}
|
||||
for name, path in request.items()
|
||||
]
|
||||
|
||||
def _get_mountinfo(self, path: str) -> list[str] | None:
|
||||
mountinfo = _MOUNTINFO.read_text(encoding="utf-8")
|
||||
for line in mountinfo.splitlines():
|
||||
|
||||
@@ -381,6 +381,433 @@ async def test_advanced_logs_errors(coresys: CoreSys, api_client: TestClient):
|
||||
)
|
||||
|
||||
|
||||
async def test_disk_usage_api(api_client: TestClient, coresys: CoreSys):
|
||||
"""Test disk usage API endpoint."""
|
||||
# Mock the disk usage methods
|
||||
with (
|
||||
patch.object(coresys.hardware.disk, "disk_usage") as mock_disk_usage,
|
||||
patch.object(coresys.hardware.disk, "get_dir_sizes") as mock_dir_sizes,
|
||||
):
|
||||
# Mock the main disk usage call
|
||||
mock_disk_usage.return_value = (
|
||||
1000000000,
|
||||
500000000,
|
||||
500000000,
|
||||
) # 1GB total, 500MB used, 500MB free
|
||||
|
||||
# Mock the directory structure sizes for each path
|
||||
mock_dir_sizes.return_value = [
|
||||
{
|
||||
"id": "addons_data",
|
||||
"label": "Addons Data",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{"id": "addon1", "label": "addon1", "used_bytes": 50000000}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "addons_config",
|
||||
"label": "Addons Config",
|
||||
"used_bytes": 200000000,
|
||||
"children": [
|
||||
{"id": "media1", "label": "media1", "used_bytes": 100000000}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "media",
|
||||
"label": "Media",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{"id": "share1", "label": "share1", "used_bytes": 25000000}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "share",
|
||||
"label": "Share",
|
||||
"used_bytes": 300000000,
|
||||
"children": [
|
||||
{"id": "backup1", "label": "backup1", "used_bytes": 150000000}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "backup",
|
||||
"label": "Backup",
|
||||
"used_bytes": 10000000,
|
||||
"children": [{"id": "ssl1", "label": "ssl1", "used_bytes": 5000000}],
|
||||
},
|
||||
{
|
||||
"id": "ssl",
|
||||
"label": "SSL",
|
||||
"used_bytes": 40000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "homeassistant1",
|
||||
"label": "homeassistant1",
|
||||
"used_bytes": 20000000,
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "homeassistant",
|
||||
"label": "Home Assistant",
|
||||
"used_bytes": 40000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "homeassistant1",
|
||||
"label": "homeassistant1",
|
||||
"used_bytes": 20000000,
|
||||
}
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
# Test default max_depth=1
|
||||
resp = await api_client.get("/host/disks/default/usage")
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
|
||||
assert result["data"]["id"] == "root"
|
||||
assert result["data"]["label"] == "Root"
|
||||
assert result["data"]["total_bytes"] == 1000000000
|
||||
assert result["data"]["used_bytes"] == 500000000
|
||||
assert "children" in result["data"]
|
||||
children = result["data"]["children"]
|
||||
|
||||
# First child should be system
|
||||
assert children[0]["id"] == "system"
|
||||
assert children[0]["label"] == "System"
|
||||
|
||||
# Verify all expected directories are present in the remaining children
|
||||
assert children[1]["id"] == "addons_data"
|
||||
assert children[2]["id"] == "addons_config"
|
||||
assert children[3]["id"] == "media"
|
||||
assert children[4]["id"] == "share"
|
||||
assert children[5]["id"] == "backup"
|
||||
assert children[6]["id"] == "ssl"
|
||||
assert children[7]["id"] == "homeassistant"
|
||||
|
||||
# Verify the sizes are correct
|
||||
assert children[1]["used_bytes"] == 100000000
|
||||
assert children[2]["used_bytes"] == 200000000
|
||||
assert children[3]["used_bytes"] == 50000000
|
||||
assert children[4]["used_bytes"] == 300000000
|
||||
assert children[5]["used_bytes"] == 10000000
|
||||
assert children[6]["used_bytes"] == 40000000
|
||||
assert children[7]["used_bytes"] == 40000000
|
||||
|
||||
# Verify system space calculation (total used - sum of known paths)
|
||||
total_known_space = (
|
||||
100000000
|
||||
+ 200000000
|
||||
+ 50000000
|
||||
+ 300000000
|
||||
+ 10000000
|
||||
+ 40000000
|
||||
+ 40000000
|
||||
)
|
||||
expected_system_space = 500000000 - total_known_space
|
||||
assert children[0]["used_bytes"] == expected_system_space
|
||||
|
||||
# Verify disk_usage was called with supervisor path
|
||||
mock_disk_usage.assert_called_once_with(coresys.config.path_supervisor)
|
||||
|
||||
# Verify get_dir_sizes was called once with all paths
|
||||
assert mock_dir_sizes.call_count == 1
|
||||
call_args = mock_dir_sizes.call_args
|
||||
assert call_args[0][1] == 1 # max_depth parameter
|
||||
paths_dict = call_args[0][0] # paths dictionary
|
||||
assert paths_dict["addons_data"] == coresys.config.path_addons_data
|
||||
assert paths_dict["addons_config"] == coresys.config.path_addon_configs
|
||||
assert paths_dict["media"] == coresys.config.path_media
|
||||
assert paths_dict["share"] == coresys.config.path_share
|
||||
assert paths_dict["backup"] == coresys.config.path_backup
|
||||
assert paths_dict["ssl"] == coresys.config.path_ssl
|
||||
assert paths_dict["homeassistant"] == coresys.config.path_homeassistant
|
||||
|
||||
|
||||
async def test_disk_usage_api_with_custom_depth(
|
||||
api_client: TestClient, coresys: CoreSys
|
||||
):
|
||||
"""Test disk usage API endpoint with custom max_depth parameter."""
|
||||
with (
|
||||
patch.object(coresys.hardware.disk, "disk_usage") as mock_disk_usage,
|
||||
patch.object(coresys.hardware.disk, "get_dir_sizes") as mock_dir_sizes,
|
||||
):
|
||||
mock_disk_usage.return_value = (1000000000, 500000000, 500000000)
|
||||
|
||||
# Mock deeper directory structure
|
||||
mock_dir_sizes.return_value = [
|
||||
{
|
||||
"id": "addons_data",
|
||||
"label": "Addons Data",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "addons_config",
|
||||
"label": "Addons Config",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "media",
|
||||
"label": "Media",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "share",
|
||||
"label": "Share",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "backup",
|
||||
"label": "Backup",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "ssl",
|
||||
"label": "SSL",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "homeassistant",
|
||||
"label": "Home Assistant",
|
||||
"used_bytes": 100000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "addon1",
|
||||
"label": "addon1",
|
||||
"used_bytes": 50000000,
|
||||
"children": [
|
||||
{
|
||||
"id": "subdir1",
|
||||
"label": "subdir1",
|
||||
"used_bytes": 25000000,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
# Test with custom max_depth=2
|
||||
resp = await api_client.get("/host/disks/default/usage?max_depth=2")
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert result["data"]["used_bytes"] == 500000000
|
||||
assert result["data"]["children"]
|
||||
|
||||
# Verify max_depth=2 was passed to get_dir_sizes
|
||||
assert mock_dir_sizes.call_count == 1
|
||||
call_args = mock_dir_sizes.call_args
|
||||
assert call_args[0][1] == 2 # max_depth parameter
|
||||
|
||||
|
||||
async def test_disk_usage_api_invalid_depth(api_client: TestClient, coresys: CoreSys):
|
||||
"""Test disk usage API endpoint with invalid max_depth parameter."""
|
||||
with (
|
||||
patch.object(coresys.hardware.disk, "disk_usage") as mock_disk_usage,
|
||||
patch.object(coresys.hardware.disk, "get_dir_sizes") as mock_dir_sizes,
|
||||
):
|
||||
mock_disk_usage.return_value = (1000000000, 500000000, 500000000)
|
||||
mock_dir_sizes.return_value = [
|
||||
{
|
||||
"id": "addons_data",
|
||||
"label": "Addons Data",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
{
|
||||
"id": "addons_config",
|
||||
"label": "Addons Config",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
{
|
||||
"id": "media",
|
||||
"label": "Media",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
{
|
||||
"id": "share",
|
||||
"label": "Share",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
{
|
||||
"id": "backup",
|
||||
"label": "Backup",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
{
|
||||
"id": "ssl",
|
||||
"label": "SSL",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
{
|
||||
"id": "homeassistant",
|
||||
"label": "Home Assistant",
|
||||
"used_bytes": 100000000,
|
||||
},
|
||||
]
|
||||
|
||||
# Test with invalid max_depth (non-integer)
|
||||
resp = await api_client.get("/host/disks/default/usage?max_depth=invalid")
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert result["data"]["used_bytes"] == 500000000
|
||||
assert result["data"]["children"]
|
||||
|
||||
# Should default to max_depth=1 when invalid value is provided
|
||||
assert mock_dir_sizes.call_count == 1
|
||||
call_args = mock_dir_sizes.call_args
|
||||
assert call_args[0][1] == 1 # Should default to 1
|
||||
|
||||
|
||||
async def test_disk_usage_api_empty_directories(
|
||||
api_client: TestClient, coresys: CoreSys
|
||||
):
|
||||
"""Test disk usage API endpoint with empty directories."""
|
||||
with (
|
||||
patch.object(coresys.hardware.disk, "disk_usage") as mock_disk_usage,
|
||||
patch.object(coresys.hardware.disk, "get_dir_sizes") as mock_dir_sizes,
|
||||
):
|
||||
mock_disk_usage.return_value = (1000000000, 500000000, 500000000)
|
||||
|
||||
# Mock empty directory structures (no children)
|
||||
mock_dir_sizes.return_value = [
|
||||
{
|
||||
"id": "addons_data",
|
||||
"label": "Addons Data",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
{
|
||||
"id": "addons_config",
|
||||
"label": "Addons Config",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
{
|
||||
"id": "media",
|
||||
"label": "Media",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
{
|
||||
"id": "share",
|
||||
"label": "Share",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
{
|
||||
"id": "backup",
|
||||
"label": "Backup",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
{
|
||||
"id": "ssl",
|
||||
"label": "SSL",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
{
|
||||
"id": "homeassistant",
|
||||
"label": "Home Assistant",
|
||||
"used_bytes": 0,
|
||||
},
|
||||
]
|
||||
|
||||
resp = await api_client.get("/host/disks/default/usage")
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
|
||||
assert result["data"]["used_bytes"] == 500000000
|
||||
children = result["data"]["children"]
|
||||
|
||||
# First child should be system with all the space
|
||||
assert children[0]["id"] == "system"
|
||||
assert children[0]["used_bytes"] == 500000000
|
||||
|
||||
# All other directories should have size 0
|
||||
for i in range(1, len(children)):
|
||||
assert children[i]["used_bytes"] == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("action", ["reboot", "shutdown"])
|
||||
async def test_migration_blocks_shutdown(
|
||||
api_client: TestClient,
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
"""Test hardware utils."""
|
||||
|
||||
# pylint: disable=protected-access
|
||||
import errno
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -9,6 +11,7 @@ import pytest
|
||||
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.hardware.data import Device
|
||||
from supervisor.resolution.const import UnhealthyReason
|
||||
|
||||
from tests.common import mock_dbus_services
|
||||
from tests.dbus_service_mocks.base import DBusServiceMock
|
||||
@@ -150,6 +153,141 @@ def test_get_mount_source(coresys):
|
||||
assert mount_source == "proc"
|
||||
|
||||
|
||||
def test_get_dir_structure_sizes(coresys, tmp_path):
|
||||
"""Test directory structure size calculation."""
|
||||
# Create a test directory structure
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create some files
|
||||
(test_dir / "file1.txt").write_text("content1")
|
||||
(test_dir / "file2.txt").write_text("content2" * 100) # Larger file
|
||||
|
||||
# Create subdirectories
|
||||
subdir1 = test_dir / "subdir1"
|
||||
subdir1.mkdir()
|
||||
(subdir1 / "file3.txt").write_text("content3")
|
||||
|
||||
subdir2 = test_dir / "subdir2"
|
||||
subdir2.mkdir()
|
||||
(subdir2 / "file4.txt").write_text("content4")
|
||||
|
||||
# Create nested subdirectory
|
||||
nested_dir = subdir1 / "nested"
|
||||
nested_dir.mkdir()
|
||||
(nested_dir / "file5.txt").write_text("content5")
|
||||
|
||||
# Create a symlink (should be skipped)
|
||||
(test_dir / "symlink.txt").symlink_to(test_dir / "file1.txt")
|
||||
|
||||
# Test with max_depth=1 (default)
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=1)
|
||||
|
||||
# Verify the structure
|
||||
assert result["used_bytes"] > 0
|
||||
assert "children" not in result
|
||||
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=2)
|
||||
|
||||
# Verify the structure
|
||||
assert result["used_bytes"] > 0
|
||||
assert "children" in result
|
||||
children = result["children"]
|
||||
|
||||
# Should have subdir1 and subdir2, but not nested (due to max_depth=1)
|
||||
child_names = [child["id"] for child in children]
|
||||
assert "subdir1" in child_names
|
||||
assert "subdir2" in child_names
|
||||
assert "nested" not in child_names
|
||||
|
||||
# Verify sizes are calculated correctly
|
||||
subdir1 = next(child for child in children if child["id"] == "subdir1")
|
||||
subdir2 = next(child for child in children if child["id"] == "subdir2")
|
||||
assert subdir1["used_bytes"] > 0
|
||||
assert subdir2["used_bytes"] > 0
|
||||
assert "children" not in subdir1 # No children due to max_depth=1
|
||||
assert "children" not in subdir2
|
||||
|
||||
# Test with max_depth=2
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=3)
|
||||
|
||||
# Should now include nested directory
|
||||
child_names = [child["id"] for child in result["children"]]
|
||||
assert "subdir1" in child_names
|
||||
assert "subdir2" in child_names
|
||||
|
||||
subdir1 = next(child for child in result["children"] if child["id"] == "subdir1")
|
||||
nested_children = [child["id"] for child in subdir1["children"]]
|
||||
assert "nested" in nested_children
|
||||
nested = next(child for child in subdir1["children"] if child["id"] == "nested")
|
||||
assert nested["used_bytes"] > 0
|
||||
|
||||
# Test with max_depth=0 (should only count files in root, no children)
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir, max_depth=0)
|
||||
assert result["used_bytes"] > 0
|
||||
assert "children" not in result # No children due to max_depth=0
|
||||
|
||||
|
||||
def test_get_dir_structure_sizes_empty_dir(coresys, tmp_path):
|
||||
"""Test directory structure size calculation with empty directory."""
|
||||
empty_dir = tmp_path / "empty_dir"
|
||||
empty_dir.mkdir()
|
||||
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(empty_dir)
|
||||
|
||||
assert result["used_bytes"] == 0
|
||||
assert "children" not in result
|
||||
|
||||
|
||||
def test_get_dir_structure_sizes_nonexistent_dir(coresys, tmp_path):
|
||||
"""Test directory structure size calculation with nonexistent directory."""
|
||||
nonexistent_dir = tmp_path / "nonexistent"
|
||||
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(nonexistent_dir)
|
||||
|
||||
assert result["used_bytes"] == 0
|
||||
assert "children" not in result
|
||||
|
||||
|
||||
def test_get_dir_structure_sizes_only_files(coresys, tmp_path):
|
||||
"""Test directory structure size calculation with only files (no subdirectories)."""
|
||||
files_dir = tmp_path / "files_dir"
|
||||
files_dir.mkdir()
|
||||
|
||||
# Create some files
|
||||
(files_dir / "file1.txt").write_text("content1")
|
||||
(files_dir / "file2.txt").write_text("content2" * 50)
|
||||
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(files_dir)
|
||||
|
||||
assert result["used_bytes"] > 0
|
||||
assert "children" not in result # No children since no subdirectories
|
||||
|
||||
|
||||
def test_get_dir_structure_sizes_zero_size_children(coresys, tmp_path):
|
||||
"""Test directory structure size calculation with zero-size children."""
|
||||
test_dir = tmp_path / "zero_size_test"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create a file in root
|
||||
(test_dir / "file1.txt").write_text("content1")
|
||||
|
||||
# Create an empty subdirectory
|
||||
empty_subdir = test_dir / "empty_subdir"
|
||||
empty_subdir.mkdir()
|
||||
|
||||
# Create a subdirectory with content
|
||||
content_subdir = test_dir / "content_subdir"
|
||||
content_subdir.mkdir()
|
||||
(content_subdir / "file2.txt").write_text("content2")
|
||||
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir)
|
||||
|
||||
# Should include content_subdir but not empty_subdir (since size > 0)
|
||||
assert result["used_bytes"] > 0
|
||||
assert "children" not in result
|
||||
|
||||
|
||||
def test_try_get_emmc_life_time(coresys, tmp_path):
|
||||
"""Test eMMC life time helper."""
|
||||
fake_life_time = tmp_path / "fake-mmcblk0-lifetime"
|
||||
@@ -163,6 +301,53 @@ def test_try_get_emmc_life_time(coresys, tmp_path):
|
||||
assert value == 10.0
|
||||
|
||||
|
||||
def test_get_dir_structure_sizes_ebadmsg_error(coresys, tmp_path):
|
||||
"""Test directory structure size calculation with EBADMSG error."""
|
||||
# Create a test directory structure
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create some files
|
||||
(test_dir / "file1.txt").write_text("content1")
|
||||
|
||||
# Create a subdirectory
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "file2.txt").write_text("content2")
|
||||
|
||||
# Mock is_dir, is_symlink, and stat methods to handle the EBADMSG error correctly
|
||||
|
||||
def mock_is_dir(self):
|
||||
# Use the real is_dir for all paths
|
||||
return os.path.isdir(self)
|
||||
|
||||
def mock_is_symlink(self):
|
||||
# Use the real is_symlink for all paths
|
||||
return os.path.islink(self)
|
||||
|
||||
def mock_stat_ebadmsg(self, follow_symlinks=True):
|
||||
if self == subdir:
|
||||
raise OSError(errno.EBADMSG, "Bad message")
|
||||
# For other paths, use the real os.stat
|
||||
return os.stat(self, follow_symlinks=follow_symlinks)
|
||||
|
||||
with (
|
||||
patch.object(Path, "is_dir", mock_is_dir),
|
||||
patch.object(Path, "is_symlink", mock_is_symlink),
|
||||
patch.object(Path, "stat", mock_stat_ebadmsg),
|
||||
):
|
||||
result = coresys.hardware.disk.get_dir_structure_sizes(test_dir)
|
||||
|
||||
# The EBADMSG error should cause the loop to break, so we get 0 used space
|
||||
# because the error happens before processing the file in the root directory
|
||||
assert result["used_bytes"] == 0
|
||||
assert "children" not in result
|
||||
|
||||
# Verify that the unhealthy reason was added
|
||||
assert coresys.resolution.unhealthy
|
||||
assert UnhealthyReason.OSERROR_BAD_MESSAGE in coresys.resolution.unhealthy
|
||||
|
||||
|
||||
async def test_try_get_nvme_life_time(
|
||||
coresys: CoreSys, nvme_data_disk: NVMeControllerService
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user