Get lifetime info for NVMe devices (#6056)

* Get lifetime info for NVMe devices

* Fix lint and test issues

* Update tests/dbus_service_mocks/udisks2_manager.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Stefan Agner <stefan@agner.ch>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Mike Degatano 2025-08-04 07:53:56 -04:00 committed by GitHub
parent 9f2fda5dc7
commit 22afa60f55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 740 additions and 53 deletions

View File

@ -32,6 +32,7 @@ DBUS_IFACE_HOSTNAME = "org.freedesktop.hostname1"
DBUS_IFACE_IP4CONFIG = "org.freedesktop.NetworkManager.IP4Config" DBUS_IFACE_IP4CONFIG = "org.freedesktop.NetworkManager.IP4Config"
DBUS_IFACE_IP6CONFIG = "org.freedesktop.NetworkManager.IP6Config" DBUS_IFACE_IP6CONFIG = "org.freedesktop.NetworkManager.IP6Config"
DBUS_IFACE_NM = "org.freedesktop.NetworkManager" DBUS_IFACE_NM = "org.freedesktop.NetworkManager"
DBUS_IFACE_NVME_CONTROLLER = "org.freedesktop.UDisks2.NVMe.Controller"
DBUS_IFACE_PARTITION = "org.freedesktop.UDisks2.Partition" DBUS_IFACE_PARTITION = "org.freedesktop.UDisks2.Partition"
DBUS_IFACE_PARTITION_TABLE = "org.freedesktop.UDisks2.PartitionTable" DBUS_IFACE_PARTITION_TABLE = "org.freedesktop.UDisks2.PartitionTable"
DBUS_IFACE_RAUC_INSTALLER = "de.pengutronix.rauc.Installer" DBUS_IFACE_RAUC_INSTALLER = "de.pengutronix.rauc.Installer"
@ -87,6 +88,7 @@ DBUS_ATTR_CONNECTIVITY = "Connectivity"
DBUS_ATTR_CURRENT_DEVICE = "CurrentDevice" DBUS_ATTR_CURRENT_DEVICE = "CurrentDevice"
DBUS_ATTR_CURRENT_DNS_SERVER = "CurrentDNSServer" DBUS_ATTR_CURRENT_DNS_SERVER = "CurrentDNSServer"
DBUS_ATTR_CURRENT_DNS_SERVER_EX = "CurrentDNSServerEx" DBUS_ATTR_CURRENT_DNS_SERVER_EX = "CurrentDNSServerEx"
DBUS_ATTR_CONTROLLER_ID = "ControllerID"
DBUS_ATTR_DEFAULT = "Default" DBUS_ATTR_DEFAULT = "Default"
DBUS_ATTR_DEPLOYMENT = "Deployment" DBUS_ATTR_DEPLOYMENT = "Deployment"
DBUS_ATTR_DESCRIPTION = "Description" DBUS_ATTR_DESCRIPTION = "Description"
@ -111,6 +113,7 @@ DBUS_ATTR_DRIVER = "Driver"
DBUS_ATTR_EJECTABLE = "Ejectable" DBUS_ATTR_EJECTABLE = "Ejectable"
DBUS_ATTR_FALLBACK_DNS = "FallbackDNS" DBUS_ATTR_FALLBACK_DNS = "FallbackDNS"
DBUS_ATTR_FALLBACK_DNS_EX = "FallbackDNSEx" DBUS_ATTR_FALLBACK_DNS_EX = "FallbackDNSEx"
DBUS_ATTR_FGUID = "FGUID"
DBUS_ATTR_FINISH_TIMESTAMP = "FinishTimestamp" DBUS_ATTR_FINISH_TIMESTAMP = "FinishTimestamp"
DBUS_ATTR_FIRMWARE_TIMESTAMP_MONOTONIC = "FirmwareTimestampMonotonic" DBUS_ATTR_FIRMWARE_TIMESTAMP_MONOTONIC = "FirmwareTimestampMonotonic"
DBUS_ATTR_FREQUENCY = "Frequency" DBUS_ATTR_FREQUENCY = "Frequency"
@ -147,6 +150,7 @@ DBUS_ATTR_NAMESERVERS = "Nameservers"
DBUS_ATTR_NTP = "NTP" DBUS_ATTR_NTP = "NTP"
DBUS_ATTR_NTPSYNCHRONIZED = "NTPSynchronized" DBUS_ATTR_NTPSYNCHRONIZED = "NTPSynchronized"
DBUS_ATTR_NUMBER = "Number" DBUS_ATTR_NUMBER = "Number"
DBUS_ATTR_NVME_REVISION = "NVMeRevision"
DBUS_ATTR_OFFSET = "Offset" DBUS_ATTR_OFFSET = "Offset"
DBUS_ATTR_OPERATING_SYSTEM_PRETTY_NAME = "OperatingSystemPrettyName" DBUS_ATTR_OPERATING_SYSTEM_PRETTY_NAME = "OperatingSystemPrettyName"
DBUS_ATTR_OPERATION = "Operation" DBUS_ATTR_OPERATION = "Operation"
@ -161,15 +165,24 @@ DBUS_ATTR_REMOVABLE = "Removable"
DBUS_ATTR_RESOLV_CONF_MODE = "ResolvConfMode" DBUS_ATTR_RESOLV_CONF_MODE = "ResolvConfMode"
DBUS_ATTR_REVISION = "Revision" DBUS_ATTR_REVISION = "Revision"
DBUS_ATTR_RCMANAGER = "RcManager" DBUS_ATTR_RCMANAGER = "RcManager"
DBUS_ATTR_SANITIZE_PERCENT_REMAINING = "SanitizePercentRemaining"
DBUS_ATTR_SANITIZE_STATUS = "SanitizeStatus"
DBUS_ATTR_SEAT = "Seat" DBUS_ATTR_SEAT = "Seat"
DBUS_ATTR_SERIAL = "Serial" DBUS_ATTR_SERIAL = "Serial"
DBUS_ATTR_SIZE = "Size" DBUS_ATTR_SIZE = "Size"
DBUS_ATTR_SMART_CRITICAL_WARNING = "SmartCriticalWarning"
DBUS_ATTR_SMART_POWER_ON_HOURS = "SmartPowerOnHours"
DBUS_ATTR_SMART_SELFTEST_PERCENT_REMAINING = "SmartSelftestPercentRemaining"
DBUS_ATTR_SMART_SELFTEST_STATUS = "SmartSelftestStatus"
DBUS_ATTR_SMART_TEMPERATURE = "SmartTemperature"
DBUS_ATTR_SMART_UPDATED = "SmartUpdated"
DBUS_ATTR_SSID = "Ssid" DBUS_ATTR_SSID = "Ssid"
DBUS_ATTR_STATE = "State" DBUS_ATTR_STATE = "State"
DBUS_ATTR_STATE_FLAGS = "StateFlags" DBUS_ATTR_STATE_FLAGS = "StateFlags"
DBUS_ATTR_STATIC_HOSTNAME = "StaticHostname" DBUS_ATTR_STATIC_HOSTNAME = "StaticHostname"
DBUS_ATTR_STATIC_OPERATING_SYSTEM_CPE_NAME = "OperatingSystemCPEName" DBUS_ATTR_STATIC_OPERATING_SYSTEM_CPE_NAME = "OperatingSystemCPEName"
DBUS_ATTR_STRENGTH = "Strength" DBUS_ATTR_STRENGTH = "Strength"
DBUS_ATTR_SUBSYSTEM_NQN = "SubsystemNQN"
DBUS_ATTR_SUPPORTED_FILESYSTEMS = "SupportedFilesystems" DBUS_ATTR_SUPPORTED_FILESYSTEMS = "SupportedFilesystems"
DBUS_ATTR_SYMLINKS = "Symlinks" DBUS_ATTR_SYMLINKS = "Symlinks"
DBUS_ATTR_SWAP_SIZE = "SwapSize" DBUS_ATTR_SWAP_SIZE = "SwapSize"
@ -180,6 +193,7 @@ DBUS_ATTR_TIMEUSEC = "TimeUSec"
DBUS_ATTR_TIMEZONE = "Timezone" DBUS_ATTR_TIMEZONE = "Timezone"
DBUS_ATTR_TRANSACTION_STATISTICS = "TransactionStatistics" DBUS_ATTR_TRANSACTION_STATISTICS = "TransactionStatistics"
DBUS_ATTR_TYPE = "Type" DBUS_ATTR_TYPE = "Type"
DBUS_ATTR_UNALLOCATED_CAPACITY = "UnallocatedCapacity"
DBUS_ATTR_USER_LED = "UserLED" DBUS_ATTR_USER_LED = "UserLED"
DBUS_ATTR_USERSPACE_TIMESTAMP_MONOTONIC = "UserspaceTimestampMonotonic" DBUS_ATTR_USERSPACE_TIMESTAMP_MONOTONIC = "UserspaceTimestampMonotonic"
DBUS_ATTR_UUID_UPPERCASE = "UUID" DBUS_ATTR_UUID_UPPERCASE = "UUID"

View File

@ -2,6 +2,7 @@
import asyncio import asyncio
import logging import logging
from pathlib import Path
from typing import Any from typing import Any
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
@ -132,7 +133,10 @@ class UDisks2Manager(DBusInterfaceProxy):
for drive in drives for drive in drives
} }
# Update existing drives # For existing drives, need to check their type and call update
await asyncio.gather(
*[self._drives[path].check_type() for path in unchanged_drives]
)
await asyncio.gather( await asyncio.gather(
*[self._drives[path].update() for path in unchanged_drives] *[self._drives[path].update() for path in unchanged_drives]
) )
@ -160,20 +164,33 @@ class UDisks2Manager(DBusInterfaceProxy):
return list(self._drives.values()) return list(self._drives.values())
@dbus_connected @dbus_connected
def get_drive(self, drive_path: str) -> UDisks2Drive: def get_drive(self, object_path: str) -> UDisks2Drive:
"""Get additional info on drive from object path.""" """Get additional info on drive from object path."""
if drive_path not in self._drives: if object_path not in self._drives:
raise DBusObjectError(f"Drive {drive_path} not found") raise DBusObjectError(f"Drive {object_path} not found")
return self._drives[drive_path] return self._drives[object_path]
@dbus_connected @dbus_connected
def get_block_device(self, device_path: str) -> UDisks2Block: def get_block_device(self, object_path: str) -> UDisks2Block:
"""Get additional info on block device from object path.""" """Get additional info on block device from object path."""
if device_path not in self._block_devices: if object_path not in self._block_devices:
raise DBusObjectError(f"Block device {device_path} not found") raise DBusObjectError(f"Block device {object_path} not found")
return self._block_devices[device_path] return self._block_devices[object_path]
@dbus_connected
def get_block_device_by_path(self, device_path: Path) -> UDisks2Block:
"""Get additional info on block device from device path.
Uses cache only. Use `resolve_device` to force a call for fresh data.
"""
for device in self._block_devices.values():
if device.device == device_path:
return device
raise DBusObjectError(
f"Block device not found with device path {device_path.as_posix()}"
)
@dbus_connected @dbus_connected
async def resolve_device(self, devspec: DeviceSpecification) -> list[UDisks2Block]: async def resolve_device(self, devspec: DeviceSpecification) -> list[UDisks2Block]:

View File

@ -1,6 +1,7 @@
"""Interface to UDisks2 Drive over D-Bus.""" """Interface to UDisks2 Drive over D-Bus."""
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import Any
from dbus_fast.aio import MessageBus from dbus_fast.aio import MessageBus
@ -18,11 +19,13 @@ from ..const import (
DBUS_ATTR_VENDOR, DBUS_ATTR_VENDOR,
DBUS_ATTR_WWN, DBUS_ATTR_WWN,
DBUS_IFACE_DRIVE, DBUS_IFACE_DRIVE,
DBUS_IFACE_NVME_CONTROLLER,
DBUS_NAME_UDISKS2, DBUS_NAME_UDISKS2,
) )
from ..interface import DBusInterfaceProxy, dbus_property from ..interface import DBusInterfaceProxy, dbus_property
from ..utils import dbus_connected from ..utils import dbus_connected
from .const import UDISKS2_DEFAULT_OPTIONS from .const import UDISKS2_DEFAULT_OPTIONS
from .nvme_controller import UDisks2NVMeController
class UDisks2Drive(DBusInterfaceProxy): class UDisks2Drive(DBusInterfaceProxy):
@ -35,11 +38,18 @@ class UDisks2Drive(DBusInterfaceProxy):
bus_name: str = DBUS_NAME_UDISKS2 bus_name: str = DBUS_NAME_UDISKS2
properties_interface: str = DBUS_IFACE_DRIVE properties_interface: str = DBUS_IFACE_DRIVE
_nvme_controller: UDisks2NVMeController | None = None
def __init__(self, object_path: str) -> None: def __init__(self, object_path: str) -> None:
"""Initialize object.""" """Initialize object."""
self._object_path = object_path self._object_path = object_path
super().__init__() super().__init__()
async def connect(self, bus: MessageBus) -> None:
"""Connect to bus."""
await super().connect(bus)
await self._reload_interfaces()
@staticmethod @staticmethod
async def new(object_path: str, bus: MessageBus) -> "UDisks2Drive": async def new(object_path: str, bus: MessageBus) -> "UDisks2Drive":
"""Create and connect object.""" """Create and connect object."""
@ -52,6 +62,11 @@ class UDisks2Drive(DBusInterfaceProxy):
"""Object path for dbus object.""" """Object path for dbus object."""
return self._object_path return self._object_path
@property
def nvme_controller(self) -> UDisks2NVMeController | None:
"""NVMe controller interface if drive is one."""
return self._nvme_controller
@property @property
@dbus_property @dbus_property
def vendor(self) -> str: def vendor(self) -> str:
@ -130,3 +145,40 @@ class UDisks2Drive(DBusInterfaceProxy):
async def eject(self) -> None: async def eject(self) -> None:
"""Eject media from drive.""" """Eject media from drive."""
await self.connected_dbus.Drive.call("eject", UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Drive.call("eject", UDISKS2_DEFAULT_OPTIONS)
@dbus_connected
async def update(self, changed: dict[str, Any] | None = None) -> None:
"""Update properties via D-Bus."""
await super().update(changed)
if not changed and self.nvme_controller:
await self.nvme_controller.update()
@dbus_connected
async def check_type(self) -> None:
"""Check if type of drive has changed and adjust interfaces if so."""
introspection = await self.connected_dbus.introspect()
interfaces = {intr.name for intr in introspection.interfaces}
# If interfaces changed, update the proxy from introspection and reload interfaces
if interfaces != set(self.connected_dbus.proxies.keys()):
await self.connected_dbus.init_proxy(introspection=introspection)
await self._reload_interfaces()
@dbus_connected
async def _reload_interfaces(self) -> None:
"""Reload interfaces from introspection as necessary."""
# Check if drive is an nvme controller
if (
not self.nvme_controller
and DBUS_IFACE_NVME_CONTROLLER in self.connected_dbus.proxies
):
self._nvme_controller = UDisks2NVMeController(self.object_path)
await self._nvme_controller.initialize(self.connected_dbus)
elif (
self.nvme_controller
and DBUS_IFACE_NVME_CONTROLLER not in self.connected_dbus.proxies
):
self.nvme_controller.stop_sync_property_changes()
self._nvme_controller = None

View File

@ -0,0 +1,200 @@
"""Interface to UDisks2 NVME Controller over D-Bus."""
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, cast
from dbus_fast.aio import MessageBus
from ..const import (
DBUS_ATTR_CONTROLLER_ID,
DBUS_ATTR_FGUID,
DBUS_ATTR_NVME_REVISION,
DBUS_ATTR_SANITIZE_PERCENT_REMAINING,
DBUS_ATTR_SANITIZE_STATUS,
DBUS_ATTR_SMART_CRITICAL_WARNING,
DBUS_ATTR_SMART_POWER_ON_HOURS,
DBUS_ATTR_SMART_SELFTEST_PERCENT_REMAINING,
DBUS_ATTR_SMART_SELFTEST_STATUS,
DBUS_ATTR_SMART_TEMPERATURE,
DBUS_ATTR_SMART_UPDATED,
DBUS_ATTR_STATE,
DBUS_ATTR_SUBSYSTEM_NQN,
DBUS_ATTR_UNALLOCATED_CAPACITY,
DBUS_IFACE_NVME_CONTROLLER,
DBUS_NAME_UDISKS2,
)
from ..interface import DBusInterfaceProxy, dbus_property
from ..utils import dbus_connected
from .const import UDISKS2_DEFAULT_OPTIONS
@dataclass(frozen=True, slots=True)
class SmartStatus:
"""Smart status information for NVMe devices.
https://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.NVMe.Controller.html#gdbus-method-org-freedesktop-UDisks2-NVMe-Controller.SmartGetAttributes
"""
available_spare: int
spare_threshold: int
percent_used: int
total_data_read: int
total_data_written: int
controller_busy_minutes: int
power_cycles: int
unsafe_shutdowns: int
media_errors: int
number_error_log_entries: int
temperature_sensors: list[int]
warning_composite_temperature: int
critical_composite_temperature: int
warning_temperature_minutes: int
critical_temperature_minutes: int
@classmethod
def from_smart_get_attributes_resp(cls, resp: dict[str, Any]):
"""Convert SmartGetAttributes response dictionary to instance."""
return cls(
available_spare=resp["avail_spare"],
spare_threshold=resp["spare_thresh"],
percent_used=resp["percent_used"],
total_data_read=resp["total_data_read"],
total_data_written=resp["total_data_written"],
controller_busy_minutes=resp["ctrl_busy_time"],
power_cycles=resp["power_cycles"],
unsafe_shutdowns=resp["unsafe_shutdowns"],
media_errors=resp["media_errors"],
number_error_log_entries=resp["num_err_log_entries"],
temperature_sensors=resp["temp_sensors"],
warning_composite_temperature=resp["wctemp"],
critical_composite_temperature=resp["cctemp"],
warning_temperature_minutes=resp["warning_temp_time"],
critical_temperature_minutes=resp["critical_temp_time"],
)
class UDisks2NVMeController(DBusInterfaceProxy):
"""Handle D-Bus interface for NVMe Controller object.
https://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.NVMe.Controller.html
"""
name: str = DBUS_IFACE_NVME_CONTROLLER
bus_name: str = DBUS_NAME_UDISKS2
properties_interface: str = DBUS_IFACE_NVME_CONTROLLER
def __init__(self, object_path: str) -> None:
"""Initialize object."""
self._object_path = object_path
super().__init__()
@staticmethod
async def new(object_path: str, bus: MessageBus) -> "UDisks2NVMeController":
"""Create and connect object."""
obj = UDisks2NVMeController(object_path)
await obj.connect(bus)
return obj
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property
@dbus_property
def state(self) -> str:
"""Return NVMe controller state."""
return self.properties[DBUS_ATTR_STATE]
@property
@dbus_property
def controller_id(self) -> int:
"""Return controller ID."""
return self.properties[DBUS_ATTR_CONTROLLER_ID]
@property
@dbus_property
def subsystem_nqn(self) -> str:
"""Return NVM Subsystem NVMe Qualified Name."""
return cast(bytes, self.properties[DBUS_ATTR_SUBSYSTEM_NQN]).decode("utf-8")
@property
@dbus_property
def fguid(self) -> str:
"""Return FRU GUID."""
return self.properties[DBUS_ATTR_FGUID]
@property
@dbus_property
def nvme_revision(self) -> str:
"""Return NVMe version information."""
return self.properties[DBUS_ATTR_NVME_REVISION]
@property
@dbus_property
def unallocated_capacity(self) -> int:
"""Return unallocated capacity."""
return self.properties[DBUS_ATTR_UNALLOCATED_CAPACITY]
@property
@dbus_property
def smart_updated(self) -> datetime | None:
"""Return last time smart information was updated (or None if it hasn't been).
If this is None other smart properties are not meaningful.
"""
if not (ts := self.properties[DBUS_ATTR_SMART_UPDATED]):
return None
return datetime.fromtimestamp(ts, UTC)
@property
@dbus_property
def smart_critical_warning(self) -> list[str]:
"""Return critical warnings issued for current state of controller."""
return self.properties[DBUS_ATTR_SMART_CRITICAL_WARNING]
@property
@dbus_property
def smart_power_on_hours(self) -> int:
"""Return hours the disk has been powered on."""
return self.properties[DBUS_ATTR_SMART_POWER_ON_HOURS]
@property
@dbus_property
def smart_temperature(self) -> int:
"""Return current composite temperature of controller in Kelvin."""
return self.properties[DBUS_ATTR_SMART_TEMPERATURE]
@property
@dbus_property
def smart_selftest_status(self) -> str:
"""Return status of last sel-test."""
return self.properties[DBUS_ATTR_SMART_SELFTEST_STATUS]
@property
@dbus_property
def smart_selftest_percent_remaining(self) -> int:
"""Return percent remaining of self-test."""
return self.properties[DBUS_ATTR_SMART_SELFTEST_PERCENT_REMAINING]
@property
@dbus_property
def sanitize_status(self) -> str:
"""Return status of last sanitize operation."""
return self.properties[DBUS_ATTR_SANITIZE_STATUS]
@property
@dbus_property
def sanitize_percent_remaining(self) -> int:
"""Return percent remaining of sanitize operation."""
return self.properties[DBUS_ATTR_SANITIZE_PERCENT_REMAINING]
@dbus_connected
async def smart_get_attributes(self) -> SmartStatus:
"""Return smart/health information of controller."""
return SmartStatus.from_smart_get_attributes_resp(
await self.connected_dbus.NVMe.Controller.call(
"smart_get_attributes", UDISKS2_DEFAULT_OPTIONS
)
)

View File

@ -5,7 +5,7 @@ from pathlib import Path
import shutil import shutil
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HardwareNotFound from ..exceptions import DBusError, DBusObjectError, HardwareNotFound
from .const import UdevSubsystem from .const import UdevSubsystem
from .data import Device from .data import Device
@ -14,6 +14,7 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
_MOUNTINFO: Path = Path("/proc/self/mountinfo") _MOUNTINFO: Path = Path("/proc/self/mountinfo")
_BLOCK_DEVICE_CLASS = "/sys/class/block/{}" _BLOCK_DEVICE_CLASS = "/sys/class/block/{}"
_BLOCK_DEVICE_EMMC_LIFE_TIME = "/sys/block/{}/device/life_time" _BLOCK_DEVICE_EMMC_LIFE_TIME = "/sys/block/{}/device/life_time"
_DEVICE_PATH = "/dev/{}"
class HwDisk(CoreSysAttributes): class HwDisk(CoreSysAttributes):
@ -92,8 +93,67 @@ class HwDisk(CoreSysAttributes):
optionsep += 1 optionsep += 1
return mountinfoarr[optionsep + 2] return mountinfoarr[optionsep + 2]
def _get_mount_source_device_name(self, path: str | Path) -> str | None:
"""Get mount source device name.
Must be run in executor.
"""
mount_source = self._get_mount_source(str(path))
if not mount_source or mount_source == "overlay":
return None
mount_source_path = Path(mount_source)
if not mount_source_path.is_block_device():
return None
# This looks a bit funky but it is more or less what lsblk is doing to get
# the parent dev reliably
# Get class device...
mount_source_device_part = Path(
_BLOCK_DEVICE_CLASS.format(mount_source_path.name)
)
# ... resolve symlink and get parent device from that path.
return mount_source_device_part.resolve().parts[-2]
async def _try_get_nvme_lifetime(self, device_name: str) -> float | None:
"""Get NVMe device lifetime."""
device_path = Path(_DEVICE_PATH.format(device_name))
try:
block_device = self.sys_dbus.udisks2.get_block_device_by_path(device_path)
drive = self.sys_dbus.udisks2.get_drive(block_device.drive)
except DBusObjectError:
_LOGGER.warning(
"Unable to find UDisks2 drive for device at %s", device_path.as_posix()
)
return None
# Exit if this isn't an NVMe device
if not drive.nvme_controller:
return None
try:
smart_log = await drive.nvme_controller.smart_get_attributes()
except DBusError as err:
_LOGGER.warning(
"Unable to get smart log for drive %s due to %s", drive.id, err
)
return None
# UDisks2 documentation specifies that value can exceed 100
if smart_log.percent_used >= 100:
_LOGGER.warning(
"NVMe controller reports that its estimated life-time has been exceeded!"
)
return 100.0
return smart_log.percent_used
def _try_get_emmc_life_time(self, device_name: str) -> float | None: def _try_get_emmc_life_time(self, device_name: str) -> float | None:
# Get eMMC life_time """Get eMMC life_time.
Must be run in executor.
"""
life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name)) life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name))
if not life_time_path.exists(): if not life_time_path.exists():
@ -121,29 +181,20 @@ class HwDisk(CoreSysAttributes):
# Return the pessimistic estimate (0x02 -> 10%-20%, return 20%) # Return the pessimistic estimate (0x02 -> 10%-20%, return 20%)
return life_time_value * 10.0 return life_time_value * 10.0
def get_disk_life_time(self, path: str | Path) -> float | None: async def get_disk_life_time(self, path: str | Path) -> float | None:
"""Return life time estimate of the underlying SSD drive. """Return life time estimate of the underlying SSD drive."""
mount_source_device_name = await self.sys_run_in_executor(
Must be run in executor. self._get_mount_source_device_name, path
"""
mount_source = self._get_mount_source(str(path))
if not mount_source or mount_source == "overlay":
return None
mount_source_path = Path(mount_source)
if not mount_source_path.is_block_device():
return None
# This looks a bit funky but it is more or less what lsblk is doing to get
# the parent dev reliably
# Get class device...
mount_source_device_part = Path(
_BLOCK_DEVICE_CLASS.format(mount_source_path.name)
) )
if mount_source_device_name is None:
return None
# ... resolve symlink and get parent device from that path. # First check if its an NVMe device and get lifetime information that way
mount_source_device_name = mount_source_device_part.resolve().parts[-2] nvme_lifetime = await self._try_get_nvme_lifetime(mount_source_device_name)
if nvme_lifetime is not None:
return nvme_lifetime
# Currently only eMMC block devices supported # Else try to get lifetime information for eMMC devices. Other types of devices will return None
return self._try_get_emmc_life_time(mount_source_device_name) return await self.sys_run_in_executor(
self._try_get_emmc_life_time, mount_source_device_name
)

View File

@ -135,9 +135,8 @@ class InfoCenter(CoreSysAttributes):
async def disk_life_time(self) -> float | None: async def disk_life_time(self) -> float | None:
"""Return the estimated life-time usage (in %) of the SSD storing the data directory.""" """Return the estimated life-time usage (in %) of the SSD storing the data directory."""
return await self.sys_run_in_executor( return await self.sys_hardware.disk.get_disk_life_time(
self.sys_hardware.disk.get_disk_life_time, self.coresys.config.path_supervisor
self.coresys.config.path_supervisor,
) )
async def get_dmesg(self) -> bytes: async def get_dmesg(self) -> bytes:

View File

@ -23,7 +23,11 @@ DEFAULT_RANGE_FOLLOW = "entries=:-99:18446744073709551615"
@pytest.fixture(name="coresys_disk_info") @pytest.fixture(name="coresys_disk_info")
async def fixture_coresys_disk_info(coresys: CoreSys) -> AsyncGenerator[CoreSys]: async def fixture_coresys_disk_info(coresys: CoreSys) -> AsyncGenerator[CoreSys]:
"""Mock basic disk information for host APIs.""" """Mock basic disk information for host APIs."""
coresys.hardware.disk.get_disk_life_time = lambda _: 0
async def mock_disk_lifetime(_):
return 0
coresys.hardware.disk.get_disk_life_time = mock_disk_lifetime
coresys.hardware.disk.get_disk_free_space = lambda _: 5000 coresys.hardware.disk.get_disk_free_space = lambda _: 5000
coresys.hardware.disk.get_disk_total_space = lambda _: 50000 coresys.hardware.disk.get_disk_total_space = lambda _: 50000
coresys.hardware.disk.get_disk_used_space = lambda _: 45000 coresys.hardware.disk.get_disk_used_space = lambda _: 45000

View File

@ -0,0 +1,72 @@
"""Test UDisks2 NVMe Controller."""
from datetime import UTC, datetime
from dbus_fast.aio import MessageBus
import pytest
from supervisor.dbus.udisks2.nvme_controller import UDisks2NVMeController
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.udisks2_nvme_controller import (
NVMeController as NVMeControllerService,
)
@pytest.fixture(name="nvme_controller_service")
async def fixture_nvme_controller_service(
dbus_session_bus: MessageBus,
) -> NVMeControllerService:
"""Mock NVMe Controller service."""
yield (
await mock_dbus_services(
{
"udisks2_nvme_controller": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC"
},
dbus_session_bus,
)
)["udisks2_nvme_controller"]
async def test_nvme_controller_info(
nvme_controller_service: NVMeControllerService, dbus_session_bus: MessageBus
):
"""Test NVMe Controller info."""
controller = UDisks2NVMeController(
"/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC"
)
assert controller.state is None
assert controller.unallocated_capacity is None
assert controller.smart_updated is None
assert controller.smart_temperature is None
await controller.connect(dbus_session_bus)
assert controller.state == "live"
assert controller.unallocated_capacity == 0
assert controller.smart_updated == datetime.fromtimestamp(1753906112, UTC)
assert controller.smart_temperature == 311
nvme_controller_service.emit_properties_changed({"SmartTemperature": 300})
await nvme_controller_service.ping()
await nvme_controller_service.ping()
assert controller.smart_temperature == 300
@pytest.mark.usefixtures("nvme_controller_service")
async def test_nvme_controller_smart_get_attributes(dbus_session_bus: MessageBus):
"""Test NVMe Controller smart get attributes."""
controller = UDisks2NVMeController(
"/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC"
)
await controller.connect(dbus_session_bus)
smart_log = await controller.smart_get_attributes()
assert smart_log.available_spare == 100
assert smart_log.percent_used == 1
assert smart_log.total_data_read == 22890461184000
assert smart_log.total_data_written == 27723431936000
assert smart_log.controller_busy_minutes == 2682
assert smart_log.temperature_sensors == [310, 305, 0, 0, 0, 0, 0, 0]

View File

@ -410,6 +410,33 @@ FIXTURES: dict[str, BlockFixture] = {
HintSymbolicIconName="", HintSymbolicIconName="",
UserspaceMountOptions=[], UserspaceMountOptions=[],
), ),
"/org/freedesktop/UDisks2/block_devices/nvme0n1": BlockFixture(
Device=b"/dev/nvme0n1",
PreferredDevice=b"/dev/nvme0n1",
Symlinks=[],
DeviceNumber=66304,
Id="by-id-nvme-Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
Size=33554432,
ReadOnly=False,
Drive="/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
MDRaid="/",
MDRaidMember="/",
IdUsage="",
IdType="",
IdVersion="",
IdLabel="",
IdUUID="",
Configuration=[],
CryptoBackingDevice="/",
HintPartitionable=True,
HintSystem=True,
HintIgnore=False,
HintAuto=False,
HintName="",
HintIconName="",
HintSymbolicIconName="",
UserspaceMountOptions=[],
),
} }

View File

@ -177,6 +177,37 @@ FIXTURES: dict[str, DriveFixture] = {
CanPowerOff=True, CanPowerOff=True,
SiblingId="", SiblingId="",
), ),
"/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC": DriveFixture(
Vendor="",
Model="Samsung SSD 970 EVO Plus 2TB",
Revision="2B2QEXM7",
Serial="S40123456789ABC",
WWN="",
Id="Samsung-SSD-970-EVO-Plus-2TB-S40123456789ABC",
Configuration={},
Media="",
MediaCompatibility=[],
MediaRemovable=False,
MediaAvailable=True,
MediaChangeDetected=True,
Size=0,
TimeDetected=0,
TimeMediaDetected=0,
Optical=False,
OpticalBlank=False,
OpticalNumTracks=0,
OpticalNumAudioTracks=0,
OpticalNumDataTracks=0,
OpticalNumSessions=0,
RotationRate=0,
ConnectionBus="usb",
Seat="seat0",
Removable=True,
Ejectable=False,
SortKey="",
CanPowerOff=True,
SiblingId="",
),
} }

View File

@ -20,21 +20,25 @@ class UDisks2Manager(DBusServiceMock):
interface = "org.freedesktop.UDisks2.Manager" interface = "org.freedesktop.UDisks2.Manager"
object_path = "/org/freedesktop/UDisks2/Manager" object_path = "/org/freedesktop/UDisks2/Manager"
block_devices = [
"/org/freedesktop/UDisks2/block_devices/loop0", def __init__(self):
"/org/freedesktop/UDisks2/block_devices/mmcblk1", """Initialize object."""
"/org/freedesktop/UDisks2/block_devices/mmcblk1p1", super().__init__()
"/org/freedesktop/UDisks2/block_devices/mmcblk1p2", self.block_devices = [
"/org/freedesktop/UDisks2/block_devices/mmcblk1p3", "/org/freedesktop/UDisks2/block_devices/loop0",
"/org/freedesktop/UDisks2/block_devices/sda", "/org/freedesktop/UDisks2/block_devices/mmcblk1",
"/org/freedesktop/UDisks2/block_devices/sda1", "/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
"/org/freedesktop/UDisks2/block_devices/sdb", "/org/freedesktop/UDisks2/block_devices/mmcblk1p2",
"/org/freedesktop/UDisks2/block_devices/sdb1", "/org/freedesktop/UDisks2/block_devices/mmcblk1p3",
"/org/freedesktop/UDisks2/block_devices/zram1", "/org/freedesktop/UDisks2/block_devices/sda",
] "/org/freedesktop/UDisks2/block_devices/sda1",
resolved_devices: list[list[str]] | list[str] = [ "/org/freedesktop/UDisks2/block_devices/sdb",
"/org/freedesktop/UDisks2/block_devices/sda1" "/org/freedesktop/UDisks2/block_devices/sdb1",
] "/org/freedesktop/UDisks2/block_devices/zram1",
]
self.resolved_devices: list[list[str]] | list[str] = [
"/org/freedesktop/UDisks2/block_devices/sda1"
]
@dbus_property(access=PropertyAccess.READ) @dbus_property(access=PropertyAccess.READ)
def Version(self) -> "s": def Version(self) -> "s":

View File

@ -0,0 +1,138 @@
"""Mock of UDisks2 Drive service."""
from dbus_fast import Variant
from dbus_fast.service import PropertyAccess, dbus_property
from .base import DBusServiceMock, dbus_method
BUS_NAME = "org.freedesktop.UDisks2"
DEFAULT_OBJECT_PATH = (
"/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC"
)
def setup(object_path: str | None = None) -> DBusServiceMock:
"""Create dbus mock object."""
return NVMeController(object_path if object_path else DEFAULT_OBJECT_PATH)
class NVMeController(DBusServiceMock):
"""NVMe Controller mock.
gdbus introspect --system --dest org.freedesktop.UDisks2 --object-path /org/freedesktop/UDisks2/drives/id
"""
interface = "org.freedesktop.UDisks2.NVMe.Controller"
def __init__(self, object_path: str):
"""Initialize object."""
super().__init__()
self.object_path = object_path
self.smart_get_attributes_response = {
"avail_spare": Variant("y", 0x64),
"spare_thresh": Variant("y", 0x0A),
"percent_used": Variant("y", 0x01),
"total_data_read": Variant("t", 22890461184000),
"total_data_written": Variant("t", 27723431936000),
"ctrl_busy_time": Variant("t", 2682),
"power_cycles": Variant("t", 652),
"unsafe_shutdowns": Variant("t", 107),
"media_errors": Variant("t", 0),
"num_err_log_entries": Variant("t", 1069),
"temp_sensors": Variant("aq", [310, 305, 0, 0, 0, 0, 0, 0]),
"wctemp": Variant("q", 358),
"cctemp": Variant("q", 358),
"warning_temp_time": Variant("i", 0),
"critical_temp_time": Variant("i", 0),
}
@dbus_property(access=PropertyAccess.READ)
def State(self) -> "s":
"""Get State."""
return "live"
@dbus_property(access=PropertyAccess.READ)
def ControllerID(self) -> "q":
"""Get ControllerID."""
return 4
@dbus_property(access=PropertyAccess.READ)
def SubsystemNQN(self) -> "ay":
"""Get SubsystemNQN."""
return b"nqn.2014.08.org.nvmexpress:144d144dS4J4NM0RB05961P Samsung SSD 970 EVO Plus 2TB"
@dbus_property(access=PropertyAccess.READ)
def FGUID(self) -> "s":
"""Get FGUID."""
return ""
@dbus_property(access=PropertyAccess.READ)
def NVMeRevision(self) -> "s":
"""Get NVMeRevision."""
return "1.3"
@dbus_property(access=PropertyAccess.READ)
def UnallocatedCapacity(self) -> "t":
"""Get UnallocatedCapacity."""
return 0
@dbus_property(access=PropertyAccess.READ)
def SmartUpdated(self) -> "t":
"""Get SmartUpdated."""
return 1753906112
@dbus_property(access=PropertyAccess.READ)
def SmartCriticalWarning(self) -> "as":
"""Get SmartCriticalWarning."""
return []
@dbus_property(access=PropertyAccess.READ)
def SmartPowerOnHours(self) -> "t":
"""Get SmartPowerOnHours."""
return 3208
@dbus_property(access=PropertyAccess.READ)
def SmartTemperature(self) -> "q":
"""Get SmartTemperature."""
return 311
@dbus_property(access=PropertyAccess.READ)
def SmartSelftestStatus(self) -> "s":
"""Get SmartSelftestStatus."""
return "success"
@dbus_property(access=PropertyAccess.READ)
def SmartSelftestPercentRemaining(self) -> "i":
"""Get SmartSelftestPercentRemaining."""
return -1
@dbus_property(access=PropertyAccess.READ)
def SanitizeStatus(self) -> "s":
"""Get SanitizeStatus."""
return ""
@dbus_property(access=PropertyAccess.READ)
def SanitizePercentRemaining(self) -> "i":
"""Get SanitizePercentRemaining."""
return -1
@dbus_method()
def SmartUpdate(self, options: "a{sv}") -> None:
"""Do SmartUpdate."""
@dbus_method()
def SmartGetAttributes(self, options: "a{sv}") -> "a{sv}":
"""Do SmartGetAttributes."""
return self.smart_get_attributes_response
@dbus_method()
def SmartSelftestStart(self, type_: "s", options: "a{sv}") -> None:
"""Do SmartSelftestStart."""
@dbus_method()
def SmartSelftestAbort(self, options: "a{sv}") -> None:
"""Do SmartSelftestAbort."""
@dbus_method()
def SanitizeStart(self, action: "s", options: "a{sv}") -> None:
"""Do SanitizeStart."""

View File

@ -4,9 +4,71 @@
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
from dbus_fast.aio import MessageBus
import pytest
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.hardware.data import Device from supervisor.hardware.data import Device
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.udisks2_manager import (
UDisks2Manager as UDisks2ManagerService,
)
from tests.dbus_service_mocks.udisks2_nvme_controller import (
NVMeController as NVMeControllerService,
)
MOCK_MOUNTINFO = """790 750 259:8 /supervisor /data rw,relatime master:118 - ext4 /dev/nvme0n1p8 rw,commit=30
810 750 0:24 /systemd-journal-gatewayd.sock /run/systemd-journal-gatewayd.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
811 750 0:24 /supervisor /run/os rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
813 750 0:24 /udev /run/udev ro,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
814 750 0:24 /machine-id /etc/machine-id ro - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
815 750 0:24 /docker.sock /run/docker.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
816 750 0:24 /dbus /run/dbus ro,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
820 750 0:24 /containerd/containerd.sock /run/containerd/containerd.sock rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
821 750 0:24 /systemd/journal/socket /run/systemd/journal/socket rw,nosuid,nodev - tmpfs tmpfs rw,size=405464k,nr_inodes=819200,mode=755
"""
@pytest.fixture(name="nvme_data_disk")
async def fixture_nvme_data_disk(
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
coresys: CoreSys,
dbus_session_bus: MessageBus,
) -> NVMeControllerService:
"""Mock using an NVMe data disk."""
nvme_service = (
await mock_dbus_services(
{
"udisks2_block": "/org/freedesktop/UDisks2/block_devices/nvme0n1",
"udisks2_drive": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
"udisks2_nvme_controller": "/org/freedesktop/UDisks2/drives/Samsung_SSD_970_EVO_Plus_2TB_S40123456789ABC",
},
dbus_session_bus,
)
)["udisks2_nvme_controller"]
udisks2_manager: UDisks2ManagerService = udisks2_services["udisks2_manager"]
udisks2_manager.block_devices.append(
"/org/freedesktop/UDisks2/block_devices/nvme0n1"
)
await coresys.dbus.udisks2.update()
with (
patch(
"supervisor.hardware.disk.Path.read_text",
return_value=MOCK_MOUNTINFO,
),
patch("supervisor.hardware.disk.Path.is_block_device", return_value=True),
patch(
"supervisor.hardware.disk.Path.resolve",
return_value=Path(
"/sys/devices/platform/soc/ffe07000.nvme/nvme_host/nvme0/nvme0:0000/block/nvme0n1/nvme0n1p8"
),
),
):
yield nvme_service
def test_system_partition_disk(coresys: CoreSys): def test_system_partition_disk(coresys: CoreSys):
"""Test if it is a system disk/partition.""" """Test if it is a system disk/partition."""
@ -99,3 +161,19 @@ def test_try_get_emmc_life_time(coresys, tmp_path):
): ):
value = coresys.hardware.disk._try_get_emmc_life_time("mmcblk0") value = coresys.hardware.disk._try_get_emmc_life_time("mmcblk0")
assert value == 20.0 assert value == 20.0
async def test_try_get_nvme_life_time(
coresys: CoreSys, nvme_data_disk: NVMeControllerService
):
"""Test getting lifetime info from an NVMe."""
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime == 1
nvme_data_disk.smart_get_attributes_response["percent_used"].value = 50
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime == 50