Use Udisks2 for available data disks (#4202)

* Use Udisks2 for available data disks

* pylint issues
This commit is contained in:
Mike Degatano 2023-03-27 12:09:16 -04:00 committed by GitHub
parent 84e4d70a37
commit a3204f4ebd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 403 additions and 128 deletions

View File

@ -24,6 +24,7 @@ ATTR_DATA_DISK = "data_disk"
ATTR_DEVICE = "device"
ATTR_DEV_PATH = "dev_path"
ATTR_DISK_LED = "disk_led"
ATTR_DISKS = "disks"
ATTR_DRIVES = "drives"
ATTR_DT_SYNCHRONIZED = "dt_synchronized"
ATTR_DT_UTC = "dt_utc"

View File

@ -2,7 +2,6 @@
import asyncio
from collections.abc import Awaitable
import logging
from pathlib import Path
from typing import Any
from aiohttp import web
@ -12,6 +11,10 @@ from ..const import (
ATTR_BOARD,
ATTR_BOOT,
ATTR_DEVICES,
ATTR_ID,
ATTR_NAME,
ATTR_SERIAL,
ATTR_SIZE,
ATTR_UPDATE_AVAILABLE,
ATTR_VERSION,
ATTR_VERSION_LATEST,
@ -22,17 +25,21 @@ from ..resolution.const import ContextType, IssueType, SuggestionType
from ..validate import version_tag
from .const import (
ATTR_DATA_DISK,
ATTR_DEV_PATH,
ATTR_DEVICE,
ATTR_DISK_LED,
ATTR_DISKS,
ATTR_HEARTBEAT_LED,
ATTR_MODEL,
ATTR_POWER_LED,
ATTR_VENDOR,
)
from .utils import api_process, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
SCHEMA_VERSION = vol.Schema({vol.Optional(ATTR_VERSION): version_tag})
SCHEMA_DISK = vol.Schema({vol.Required(ATTR_DEVICE): vol.All(str, vol.Coerce(Path))})
SCHEMA_DISK = vol.Schema({vol.Required(ATTR_DEVICE): str})
# pylint: disable=no-value-for-parameter
SCHEMA_YELLOW_OPTIONS = vol.Schema(
@ -56,7 +63,7 @@ class APIOS(CoreSysAttributes):
ATTR_UPDATE_AVAILABLE: self.sys_os.need_update,
ATTR_BOARD: self.sys_os.board,
ATTR_BOOT: self.sys_dbus.rauc.boot_slot,
ATTR_DATA_DISK: self.sys_os.datadisk.disk_used,
ATTR_DATA_DISK: self.sys_os.datadisk.disk_used.id,
}
@api_process
@ -83,7 +90,19 @@ class APIOS(CoreSysAttributes):
async def list_data(self, request: web.Request) -> dict[str, Any]:
"""Return possible data targets."""
return {
ATTR_DEVICES: self.sys_os.datadisk.available_disks,
ATTR_DEVICES: [disk.id for disk in self.sys_os.datadisk.available_disks],
ATTR_DISKS: [
{
ATTR_NAME: disk.name,
ATTR_VENDOR: disk.vendor,
ATTR_MODEL: disk.model,
ATTR_SERIAL: disk.serial,
ATTR_SIZE: disk.size,
ATTR_ID: disk.id,
ATTR_DEV_PATH: disk.device_path.as_posix(),
}
for disk in self.sys_os.datadisk.available_disks
],
}
@api_process

View File

@ -1,25 +1,86 @@
"""Home Assistant Operating-System DataDisk."""
from contextlib import suppress
from dataclasses import dataclass
import logging
from pathlib import Path
from awesomeversion import AwesomeVersion
from ..coresys import CoreSys, CoreSysAttributes
from ..dbus.udisks2.block import UDisks2Block
from ..dbus.udisks2.drive import UDisks2Drive
from ..exceptions import (
DBusError,
HardwareNotFound,
DBusObjectError,
HassOSDataDiskError,
HassOSError,
HassOSJobError,
HostError,
)
from ..hardware.const import UdevSubsystem
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.decorator import Job
_LOGGER: logging.Logger = logging.getLogger(__name__)
@dataclass(slots=True)
class Disk:
"""Representation of disk."""
vendor: str
model: str
serial: str
id: str
size: int
device_path: Path
object_path: str
@staticmethod
def from_udisks2_drive(
drive: UDisks2Drive, drive_block_device: UDisks2Block
) -> "Disk":
"""Convert UDisks2Drive into a Disk object."""
return Disk(
vendor=drive.vendor,
model=drive.model,
serial=drive.serial,
id=drive.id or drive_block_device.device,
size=drive.size,
device_path=drive_block_device.device,
object_path=drive.object_path,
)
@property
def name(self) -> str:
"""Get disk name."""
name = self.vendor
if self.model:
name = f"{name} {self.model}".lstrip()
if self.serial:
name = f"{name} ({self.serial})" if name else self.serial
if name:
return name
return self.id
def _get_primary_block_device(devices: list[UDisks2Block]) -> UDisks2Block | None:
"""Return primary block device out of list or none if it cannot be determined."""
# If there's only one block device return that
if len(devices) == 1:
return devices[0]
# If there's multiple then find the (hopefully) one partition table
partition_tables = [device for device in devices if device.partition_table]
if len(partition_tables) == 1:
return partition_tables[0]
# Can't be determined if count of block devices or partition tables does not equal 1
return None
class DataDisk(CoreSysAttributes):
"""Handle DataDisk feature from OS."""
@ -28,26 +89,62 @@ class DataDisk(CoreSysAttributes):
self.coresys = coresys
@property
def disk_used(self) -> Path | None:
"""Return Path to used Disk for data."""
return self.sys_dbus.agent.datadisk.current_device
def disk_used(self) -> Disk | None:
"""Return current Disk for data."""
if not self.sys_dbus.agent.datadisk.current_device:
return None
block_device = next(
(
block
for block in self.sys_dbus.udisks2.block_devices
if block.device == self.sys_dbus.agent.datadisk.current_device
),
None,
)
if block_device and block_device.drive:
with suppress(DBusObjectError):
drive = self.sys_dbus.udisks2.get_drive(block_device.drive)
return Disk.from_udisks2_drive(drive, block_device)
return Disk(
vendor="",
model="",
serial="",
id=self.sys_dbus.agent.datadisk.current_device,
size=0,
device_path=self.sys_dbus.agent.datadisk.current_device,
object_path="",
)
@property
def available_disks(self) -> list[Path]:
"""Return a list of possible new disk locations."""
device_paths: list[Path] = []
for device in self.sys_hardware.devices:
# Filter devices out which can't be a target
if (
device.subsystem != UdevSubsystem.DISK
or device.attributes.get("DEVTYPE") != "disk"
or device.minor != 0
or self.sys_hardware.disk.is_used_by_system(device)
):
continue
device_paths.append(device.path)
def available_disks(self) -> list[Disk]:
"""Return a list of possible new disk locations.
return device_paths
Available disks are drives where nothing on it has been mounted
and it can be formatted.
"""
available: list[UDisks2Drive] = []
for drive in self.sys_dbus.udisks2.drives:
block_devices = self._get_block_devices_for_drive(drive)
primary = _get_primary_block_device(block_devices)
if primary and not any(
block.filesystem.mount_points
for block in block_devices
if block.filesystem
):
available.append(Disk.from_udisks2_drive(drive, primary))
return available
def _get_block_devices_for_drive(self, drive: UDisks2Drive) -> list[UDisks2Block]:
"""Get block devices for a drive."""
return [
block
for block in self.sys_dbus.udisks2.block_devices
if block.drive == drive.object_path
]
@Job(conditions=[JobCondition.OS_AGENT])
async def load(self) -> None:
@ -61,28 +158,25 @@ class DataDisk(CoreSysAttributes):
limit=JobExecutionLimit.ONCE,
on_condition=HassOSJobError,
)
async def migrate_disk(self, new_disk: Path) -> None:
async def migrate_disk(self, new_disk: str) -> None:
"""Move data partition to a new disk."""
# Validate integrity of the data input
try:
device = self.sys_hardware.get_by_path(new_disk)
except HardwareNotFound:
raise HassOSDataDiskError(
f"'{new_disk!s}' don't exists on the host!", _LOGGER.error
) from None
# Force a dbus update first so all info is up to date
await self.sys_dbus.udisks2.update()
if device.subsystem != UdevSubsystem.DISK or device.minor != 0:
raise HassOSDataDiskError(
f"'{new_disk!s}' is not a harddisk!", _LOGGER.error
try:
target_disk: Disk = next(
disk
for disk in self.available_disks
if disk.id == new_disk or disk.device_path.as_posix() == new_disk
)
if self.sys_hardware.disk.is_used_by_system(device):
except StopIteration:
raise HassOSDataDiskError(
f"'{new_disk}' is a system disk and can't be used!", _LOGGER.error
)
f"'{new_disk!s}' not a valid data disk target!", _LOGGER.error
) from None
# Migrate data on Host
try:
await self.sys_dbus.agent.datadisk.change_device(new_disk)
await self.sys_dbus.agent.datadisk.change_device(target_disk.device_path)
except DBusError as err:
raise HassOSDataDiskError(
f"Can't move data partition to {new_disk!s}: {err!s}", _LOGGER.error

View File

@ -1,13 +1,12 @@
"""Test OS API."""
from pathlib import Path
from unittest.mock import PropertyMock, patch
from aiohttp.test_utils import TestClient
import pytest
from supervisor.coresys import CoreSys
from supervisor.hardware.data import Device
from supervisor.host.control import SystemControl
from supervisor.os.manager import OSManager
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
@ -15,6 +14,7 @@ from supervisor.resolution.data import Issue, Suggestion
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.agent_boards import Boards as BoardsService
from tests.dbus_service_mocks.agent_boards_yellow import Yellow as YellowService
from tests.dbus_service_mocks.agent_datadisk import DataDisk as DataDiskService
from tests.dbus_service_mocks.base import DBusServiceMock
# pylint: disable=protected-access
@ -49,50 +49,67 @@ async def test_api_os_info_with_agent(api_client: TestClient, coresys: CoreSys):
resp = await api_client.get("/os/info")
result = await resp.json()
assert result["data"]["data_disk"] == "/dev/sda"
assert result["data"]["data_disk"] == "BJTD4R-0x97cde291"
async def test_api_os_datadisk_move(api_client: TestClient, coresys: CoreSys):
"""Test datadisk move without exists disk."""
@pytest.mark.parametrize(
"new_disk",
["/dev/sdaaaa", "/dev/mmcblk1", "Generic-Flash-Disk-61BCDDB6"],
ids=["non-existent", "unavailable drive by path", "unavailable drive by id"],
)
async def test_api_os_datadisk_move_fail(
api_client: TestClient, coresys: CoreSys, new_disk: str
):
"""Test datadisk move to non-existent or invalid devices."""
coresys.os._available = True
resp = await api_client.post("/os/datadisk/move", json={"device": "/dev/sdaaaa"})
resp = await api_client.post("/os/datadisk/move", json={"device": new_disk})
result = await resp.json()
assert result["message"] == "'/dev/sdaaaa' don't exists on the host!"
assert result["message"] == f"'{new_disk}' not a valid data disk target!"
async def test_api_os_datadisk_list(api_client: TestClient, coresys: CoreSys):
"""Test datadisk list function."""
coresys.hardware.update_device(
Device(
"sda",
Path("/dev/sda"),
Path("/sys/bus/usb/000"),
"block",
None,
[Path("/dev/serial/by-id/test")],
{"ID_NAME": "xy", "MINOR": "0", "DEVTYPE": "disk"},
[],
)
)
coresys.hardware.update_device(
Device(
"sda1",
Path("/dev/sda1"),
Path("/sys/bus/usb/000/1"),
"block",
None,
[Path("/dev/serial/by-id/test1")],
{"ID_NAME": "xy", "MINOR": "1", "DEVTYPE": "partition"},
[],
)
)
resp = await api_client.get("/os/datadisk/list")
result = await resp.json()
assert result["data"]["devices"] == ["/dev/sda"]
assert result["data"]["devices"] == ["SSK-SSK-Storage-DF56419883D56"]
assert result["data"]["disks"] == [
{
"vendor": "SSK",
"model": "SSK Storage",
"serial": "DF56419883D56",
"id": "SSK-SSK-Storage-DF56419883D56",
"size": 250059350016,
"dev_path": "/dev/sda",
"name": "SSK SSK Storage (DF56419883D56)",
}
]
@pytest.mark.parametrize(
"new_disk",
["SSK-SSK-Storage-DF56419883D56", "/dev/sda"],
ids=["by drive id", "by device path"],
)
async def test_api_os_datadisk_migrate(
api_client: TestClient,
coresys: CoreSys,
os_agent_services: dict[str, DBusServiceMock],
new_disk: str,
):
"""Test migrating datadisk."""
datadisk_service: DataDiskService = os_agent_services["agent_datadisk"]
datadisk_service.ChangeDevice.calls.clear()
coresys.os._available = True
with patch.object(SystemControl, "reboot") as reboot:
resp = await api_client.post("/os/datadisk/move", json={"device": new_disk})
assert resp.status == 200
assert datadisk_service.ChangeDevice.calls == [("/dev/sda",)]
reboot.assert_called_once()
async def test_api_board_yellow_info(api_client: TestClient, coresys: CoreSys):

View File

@ -30,16 +30,16 @@ async def test_dbus_osagent_datadisk(
await os_agent.connect(dbus_session_bus)
assert os_agent.datadisk.current_device.as_posix() == "/dev/sda"
assert os_agent.datadisk.current_device.as_posix() == "/dev/mmcblk1"
datadisk_service.emit_properties_changed({"CurrentDevice": "/dev/sda1"})
datadisk_service.emit_properties_changed({"CurrentDevice": "/dev/mmcblk1p1"})
await datadisk_service.ping()
assert os_agent.datadisk.current_device.as_posix() == "/dev/sda1"
assert os_agent.datadisk.current_device.as_posix() == "/dev/mmcblk1p1"
datadisk_service.emit_properties_changed({}, ["CurrentDevice"])
await datadisk_service.ping()
await datadisk_service.ping()
assert os_agent.datadisk.current_device.as_posix() == "/dev/sda"
assert os_agent.datadisk.current_device.as_posix() == "/dev/mmcblk1"
async def test_dbus_osagent_datadisk_change_device(

View File

@ -27,7 +27,7 @@ class DataDisk(DBusServiceMock):
@dbus_property(access=PropertyAccess.READ)
def CurrentDevice(self) -> "s":
"""Get Current Device."""
return "/dev/sda"
return "/dev/mmcblk1"
@dbus_method()
def ChangeDevice(self, arg_0: "s") -> "b":

View File

@ -359,6 +359,60 @@ FIXTURES: dict[str, BlockFixture] = {
HintSymbolicIconName="",
UserspaceMountOptions=[],
),
"/org/freedesktop/UDisks2/block_devices/multi_part_table1": BlockFixture(
Device=b"/dev/parttable1",
PreferredDevice=b"/dev/parttable1",
Symlinks=[],
DeviceNumber=64769,
Id="",
Size=33554432,
ReadOnly=False,
Drive="/org/freedesktop/UDisks2/drives/Test_Multiple_Partition_Tables_123456789",
MDRaid="/",
MDRaidMember="/",
IdUsage="",
IdType="",
IdVersion="",
IdLabel="",
IdUUID="",
Configuration=[],
CryptoBackingDevice="/",
HintPartitionable=True,
HintSystem=True,
HintIgnore=False,
HintAuto=False,
HintName="",
HintIconName="",
HintSymbolicIconName="",
UserspaceMountOptions=[],
),
"/org/freedesktop/UDisks2/block_devices/multi_part_table2": BlockFixture(
Device=b"/dev/parttable2",
PreferredDevice=b"/dev/parttable2",
Symlinks=[],
DeviceNumber=64769,
Id="",
Size=33554432,
ReadOnly=False,
Drive="/org/freedesktop/UDisks2/drives/Test_Multiple_Partition_Tables_123456789",
MDRaid="/",
MDRaidMember="/",
IdUsage="",
IdType="",
IdVersion="",
IdLabel="",
IdUUID="",
Configuration=[],
CryptoBackingDevice="/",
HintPartitionable=True,
HintSystem=True,
HintIgnore=False,
HintAuto=False,
HintName="",
HintIconName="",
HintSymbolicIconName="",
UserspaceMountOptions=[],
),
}

View File

@ -149,6 +149,37 @@ FIXTURES: dict[str, DriveFixture] = {
CanPowerOff=True,
SiblingId="/sys/devices/platform/soc/ffe09000.usb/ff500000.usb/xhci-hcd.1.auto/usb2/2-1/2-1.4/2-1.4:1.0",
),
"/org/freedesktop/UDisks2/drives/Test_Multiple_Partition_Tables_123456789": DriveFixture(
Vendor="Test",
Model="Multiple Partition Tables",
Revision="",
Serial="123456789",
WWN="",
Id="Test-Multiple-Partition-Tables-123456789",
Configuration={},
Media="",
MediaCompatibility=[],
MediaRemovable=False,
MediaAvailable=True,
MediaChangeDetected=True,
Size=0,
TimeDetected=0,
TimeMediaDetected=0,
Optical=False,
OpticalBlank=False,
OpticalNumTracks=0,
OpticalNumAudioTracks=0,
OpticalNumDataTracks=0,
OpticalNumSessions=0,
RotationRate=0,
ConnectionBus="usb",
Seat="seat0",
Removable=True,
Ejectable=False,
SortKey="",
CanPowerOff=True,
SiblingId="",
),
}

View File

@ -23,6 +23,18 @@ class UDisks2Manager(DBusServiceMock):
interface = "org.freedesktop.UDisks2.Manager"
object_path = "/org/freedesktop/UDisks2/Manager"
block_devices = [
"/org/freedesktop/UDisks2/block_devices/loop0",
"/org/freedesktop/UDisks2/block_devices/mmcblk1",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p2",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p3",
"/org/freedesktop/UDisks2/block_devices/sda",
"/org/freedesktop/UDisks2/block_devices/sda1",
"/org/freedesktop/UDisks2/block_devices/sdb",
"/org/freedesktop/UDisks2/block_devices/sdb1",
"/org/freedesktop/UDisks2/block_devices/zram1",
]
@dbus_property(access=PropertyAccess.READ)
def Version(self) -> "s":
@ -83,18 +95,7 @@ class UDisks2Manager(DBusServiceMock):
@dbus_method()
def GetBlockDevices(self, options: "a{sv}") -> "ao":
"""Do GetBlockDevices method."""
return [
"/org/freedesktop/UDisks2/block_devices/loop0",
"/org/freedesktop/UDisks2/block_devices/mmcblk1",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p2",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p3",
"/org/freedesktop/UDisks2/block_devices/sda",
"/org/freedesktop/UDisks2/block_devices/sda1",
"/org/freedesktop/UDisks2/block_devices/sdb",
"/org/freedesktop/UDisks2/block_devices/sdb1",
"/org/freedesktop/UDisks2/block_devices/zram1",
]
return self.block_devices
@dbus_method()
def ResolveDevice(self, devspec: "a{sv}", options: "a{sv}") -> "ao":

View File

@ -41,6 +41,12 @@ FIXTURES: dict[str, PartitionTableFixture] = {
"/org/freedesktop/UDisks2/block_devices/sdb": PartitionTableFixture(
Partitions=["/org/freedesktop/UDisks2/block_devices/sdb1"], Type="gpt"
),
"/org/freedesktop/UDisks2/block_devices/multi_part_table1": PartitionTableFixture(
Partitions=[], Type="gpt"
),
"/org/freedesktop/UDisks2/block_devices/multi_part_table2": PartitionTableFixture(
Partitions=[], Type="gpt"
),
}

View File

@ -1,64 +1,116 @@
"""Test OS API."""
from pathlib import Path, PosixPath
from pathlib import PosixPath
from unittest.mock import patch
import pytest
from supervisor.core import Core
from supervisor.coresys import CoreSys
from supervisor.exceptions import HassOSDataDiskError
from supervisor.hardware.data import Device
from supervisor.os.data_disk import Disk
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.agent_datadisk import DataDisk as DataDiskService
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.logind import Logind as LogindService
# pylint: disable=protected-access
@pytest.mark.asyncio
@pytest.fixture(autouse=True)
async def add_unusable_drive(
coresys: CoreSys,
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
"""Add mock drive with multiple partition tables for negative tests."""
await mock_dbus_services(
{
"udisks2_block": [
"/org/freedesktop/UDisks2/block_devices/multi_part_table1",
"/org/freedesktop/UDisks2/block_devices/multi_part_table2",
],
"udisks2_drive": "/org/freedesktop/UDisks2/drives/Test_Multiple_Partition_Tables_123456789",
},
coresys.dbus.bus,
)
udisks2_services["udisks2_manager"].block_devices = udisks2_services[
"udisks2_manager"
].block_devices + [
"/org/freedesktop/UDisks2/block_devices/multi_part_table1",
"/org/freedesktop/UDisks2/block_devices/multi_part_table2",
]
await coresys.dbus.udisks2.update()
async def tests_datadisk_current(coresys: CoreSys):
"""Test current datadisk."""
await coresys.dbus.agent.connect(coresys.dbus.bus)
await coresys.dbus.agent.update()
assert coresys.os.datadisk.disk_used == PosixPath("/dev/sda")
assert coresys.os.datadisk.disk_used == Disk(
vendor="",
model="BJTD4R",
serial="0x97cde291",
id="BJTD4R-0x97cde291",
size=31268536320,
device_path=PosixPath("/dev/mmcblk1"),
object_path="/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291",
)
@pytest.mark.asyncio
async def test_datadisk_move(coresys: CoreSys):
"""Test datadisk moved without exists device."""
await coresys.dbus.agent.connect(coresys.dbus.bus)
await coresys.dbus.agent.update()
@pytest.mark.parametrize(
"new_disk",
["/dev/sdaaaa", "/dev/mmcblk1", "Generic-Flash-Disk-61BCDDB6"],
ids=["non-existent", "unavailable drive by path", "unavailable drive by id"],
)
async def test_datadisk_move_fail(coresys: CoreSys, new_disk: str):
"""Test datadisk move to non-existent or invalid devices."""
coresys.os._available = True
with pytest.raises(HassOSDataDiskError):
await coresys.os.datadisk.migrate_disk(Path("/dev/sdaaaa"))
await coresys.os.datadisk.migrate_disk(new_disk)
@pytest.mark.asyncio
async def test_datadisk_list(coresys: CoreSys):
"""Test docker info api."""
await coresys.dbus.agent.connect(coresys.dbus.bus)
await coresys.dbus.agent.update()
assert {drive.object_path for drive in coresys.dbus.udisks2.drives} == {
"/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291",
"/org/freedesktop/UDisks2/drives/Generic_Flash_Disk_61BCDDB6",
"/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56",
"/org/freedesktop/UDisks2/drives/Test_Multiple_Partition_Tables_123456789",
}
coresys.hardware.update_device(
Device(
"sda",
Path("/dev/sda"),
Path("/sys/bus/usb/000"),
"block",
None,
[Path("/dev/serial/by-id/test")],
{"ID_NAME": "xy", "MINOR": "0", "DEVTYPE": "disk"},
[],
assert coresys.os.datadisk.available_disks == [
Disk(
vendor="SSK",
model="SSK Storage",
serial="DF56419883D56",
id="SSK-SSK-Storage-DF56419883D56",
size=250059350016,
device_path=PosixPath("/dev/sda"),
object_path="/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56",
)
)
coresys.hardware.update_device(
Device(
"sda1",
Path("/dev/sda1"),
Path("/sys/bus/usb/000/1"),
"block",
None,
[Path("/dev/serial/by-id/test1")],
{"ID_NAME": "xy", "MINOR": "1", "DEVTYPE": "partition"},
[],
)
)
]
assert coresys.os.datadisk.available_disks == [PosixPath("/dev/sda")]
@pytest.mark.parametrize(
"new_disk",
["SSK-SSK-Storage-DF56419883D56", "/dev/sda"],
ids=["by drive id", "by device path"],
)
async def test_datadisk_migrate(
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
new_disk: str,
):
"""Test migrating data disk."""
datadisk_service: DataDiskService = all_dbus_services["agent_datadisk"]
datadisk_service.ChangeDevice.calls.clear()
logind_service: LogindService = all_dbus_services["logind"]
logind_service.Reboot.calls.clear()
coresys.os._available = True
with patch.object(Core, "shutdown") as shutdown:
await coresys.os.datadisk.migrate_disk(new_disk)
shutdown.assert_called_once()
assert datadisk_service.ChangeDevice.calls == [("/dev/sda",)]
assert logind_service.Reboot.calls == [(False,)]