Prevent multiple data disks with name hassos-data-external (#4222)

* Prevent multiple data disks with name hassos-data-external

* Fix pylint issues

* Do rename before format
This commit is contained in:
Mike Degatano 2023-04-01 02:40:19 -04:00 committed by GitHub
parent 842e550dda
commit fce0d2aaed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 487 additions and 15 deletions

View File

@ -30,6 +30,7 @@ DBUS_IFACE_HOSTNAME = "org.freedesktop.hostname1"
DBUS_IFACE_IP4CONFIG = "org.freedesktop.NetworkManager.IP4Config"
DBUS_IFACE_IP6CONFIG = "org.freedesktop.NetworkManager.IP6Config"
DBUS_IFACE_NM = "org.freedesktop.NetworkManager"
DBUS_IFACE_PARTITION = "org.freedesktop.UDisks2.Partition"
DBUS_IFACE_PARTITION_TABLE = "org.freedesktop.UDisks2.PartitionTable"
DBUS_IFACE_RAUC_INSTALLER = "de.pengutronix.rauc.Installer"
DBUS_IFACE_RESOLVED_MANAGER = "org.freedesktop.resolve1.Manager"
@ -130,10 +131,13 @@ DBUS_ATTR_MODE = "Mode"
DBUS_ATTR_MODEL = "Model"
DBUS_ATTR_MOUNT_POINTS = "MountPoints"
DBUS_ATTR_MULTICAST_DNS = "MulticastDNS"
DBUS_ATTR_NAME = "Name"
DBUS_ATTR_NAMESERVER_DATA = "NameserverData"
DBUS_ATTR_NAMESERVERS = "Nameservers"
DBUS_ATTR_NTP = "NTP"
DBUS_ATTR_NTPSYNCHRONIZED = "NTPSynchronized"
DBUS_ATTR_NUMBER = "Number"
DBUS_ATTR_OFFSET = "Offset"
DBUS_ATTR_OPERATING_SYSTEM_PRETTY_NAME = "OperatingSystemPrettyName"
DBUS_ATTR_OPERATION = "Operation"
DBUS_ATTR_PARSER_VERSION = "ParserVersion"
@ -156,12 +160,14 @@ DBUS_ATTR_STATIC_OPERATING_SYSTEM_CPE_NAME = "OperatingSystemCPEName"
DBUS_ATTR_STRENGTH = "Strength"
DBUS_ATTR_SUPPORTED_FILESYSTEMS = "SupportedFilesystems"
DBUS_ATTR_SYMLINKS = "Symlinks"
DBUS_ATTR_TABLE = "Table"
DBUS_ATTR_TIME_DETECTED = "TimeDetected"
DBUS_ATTR_TIMEUSEC = "TimeUSec"
DBUS_ATTR_TIMEZONE = "Timezone"
DBUS_ATTR_TRANSACTION_STATISTICS = "TransactionStatistics"
DBUS_ATTR_TYPE = "Type"
DBUS_ATTR_USERSPACE_TIMESTAMP_MONOTONIC = "UserspaceTimestampMonotonic"
DBUS_ATTR_UUID_UPPERCASE = "UUID"
DBUS_ATTR_UUID = "Uuid"
DBUS_ATTR_VARIANT = "Variant"
DBUS_ATTR_VENDOR = "Vendor"

View File

@ -26,6 +26,7 @@ from ..const import (
DBUS_ATTR_SYMLINKS,
DBUS_IFACE_BLOCK,
DBUS_IFACE_FILESYSTEM,
DBUS_IFACE_PARTITION,
DBUS_IFACE_PARTITION_TABLE,
DBUS_NAME_UDISKS2,
)
@ -34,6 +35,7 @@ from ..utils import dbus_connected
from .const import UDISKS2_DEFAULT_OPTIONS, FormatType
from .data import FormatOptions, udisks2_bytes_to_path
from .filesystem import UDisks2Filesystem
from .partition import UDisks2Partition
from .partition_table import UDisks2PartitionTable
ADDITIONAL_INTERFACES: dict[str, Callable[[str], DBusInterfaceProxy]] = {
@ -52,6 +54,7 @@ class UDisks2Block(DBusInterfaceProxy):
properties_interface: str = DBUS_IFACE_BLOCK
_filesystem: UDisks2Filesystem | None = None
_partition: UDisks2Partition | None = None
_partition_table: UDisks2PartitionTable | None = None
def __init__(self, object_path: str, *, sync_properties: bool = True) -> None:
@ -79,6 +82,11 @@ class UDisks2Block(DBusInterfaceProxy):
"""Filesystem interface if block device is one."""
return self._filesystem
@property
def partition(self) -> UDisks2Partition | None:
"""Partition interface if block device is one."""
return self._partition
@property
def partition_table(self) -> UDisks2PartitionTable | None:
"""Partition table interface if block device is one."""
@ -195,7 +203,7 @@ class UDisks2Block(DBusInterfaceProxy):
await asyncio.gather(
*[
intr.update()
for intr in (self.filesystem, self.partition_table)
for intr in (self.filesystem, self.partition, self.partition_table)
if intr
]
)
@ -225,6 +233,17 @@ class UDisks2Block(DBusInterfaceProxy):
self.filesystem.stop_sync_property_changes()
self._filesystem = None
# Check if block device is a partition
if not self.partition and DBUS_IFACE_PARTITION in self.dbus.proxies:
self._partition = UDisks2Partition(
self.object_path, sync_properties=self.sync_properties
)
await self._partition.initialize(self.dbus)
elif self.partition and DBUS_IFACE_PARTITION not in self.dbus.proxies:
self.partition.stop_sync_property_changes()
self._partition = None
# Check if block device is a partition table
if not self.partition_table and DBUS_IFACE_PARTITION_TABLE in self.dbus.proxies:
self._partition_table = UDisks2PartitionTable(

View File

@ -194,7 +194,7 @@ MountOptionsDataType = TypedDict(
)
@dataclass
@dataclass(slots=True)
class MountOptions(UDisks2StandardOptions):
"""Filesystem mount options.
@ -236,7 +236,7 @@ UnmountOptionsDataType = TypedDict(
)
@dataclass
@dataclass(slots=True)
class UnmountOptions(UDisks2StandardOptions):
"""Filesystem unmount options.
@ -272,7 +272,7 @@ CreatePartitionOptionsDataType = TypedDict(
)
@dataclass
@dataclass(slots=True)
class CreatePartitionOptions(UDisks2StandardOptions):
"""Create partition options.
@ -300,3 +300,39 @@ class CreatePartitionOptions(UDisks2StandardOptions):
),
}
return {k: v for k, v in data.items() if v}
DeletePartitionOptionsDataType = TypedDict(
"DeletePartitionOptionsDataType",
{"tear-down": NotRequired[bool]} | _udisks2_standard_options_annotations,
)
@dataclass(slots=True)
class DeletePartitionOptions(UDisks2StandardOptions):
"""Delete partition options.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Partition.html#gdbus-method-org-freedesktop-UDisks2-Partition.Delete
"""
tear_down: bool | None = None
@staticmethod
def from_dict(data: DeletePartitionOptionsDataType) -> "DeletePartitionOptions":
"""Create DeletePartitionOptions from dict."""
return DeletePartitionOptions(
tear_down=data.get("tear-down"),
# UDisks2 standard options
auth_no_user_interaction=data.get("auth.no_user_interaction"),
)
def to_dict(self) -> dict[str, Variant]:
"""Return dict representation."""
data = {
"tear-down": _optional_variant("b", self.tear_down),
# UDisks2 standard options
"auth.no_user_interaction": _optional_variant(
"b", self.auth_no_user_interaction
),
}
return {k: v for k, v in data.items() if v}

View File

@ -0,0 +1,109 @@
"""Interface to UDisks2 Partition over D-Bus."""
from ..const import (
DBUS_ATTR_NAME,
DBUS_ATTR_NUMBER,
DBUS_ATTR_OFFSET,
DBUS_ATTR_SIZE,
DBUS_ATTR_TABLE,
DBUS_ATTR_TYPE,
DBUS_ATTR_UUID_UPPERCASE,
DBUS_IFACE_PARTITION,
DBUS_NAME_UDISKS2,
)
from ..interface import DBusInterfaceProxy, dbus_property
from ..utils import dbus_connected
from .const import UDISKS2_DEFAULT_OPTIONS
from .data import DeletePartitionOptions
class UDisks2Partition(DBusInterfaceProxy):
"""Handle D-Bus interface for UDisks2 Partition device object.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Partition.html
"""
name = DBUS_IFACE_PARTITION
bus_name = DBUS_NAME_UDISKS2
properties_interface = DBUS_IFACE_PARTITION
def __init__(self, object_path: str, *, sync_properties: bool = True) -> None:
"""Initialize object."""
self.object_path = object_path
self.sync_properties = sync_properties
super().__init__()
@property
@dbus_property
def number(self) -> int:
"""Parition number in partition table."""
return self.properties[DBUS_ATTR_NUMBER]
@property
@dbus_property
def type_(self) -> str:
"""Partition type."""
return self.properties[DBUS_ATTR_TYPE]
@property
@dbus_property
def offset(self) -> int:
"""Offset of partition in bytes."""
return self.properties[DBUS_ATTR_OFFSET]
@property
@dbus_property
def size(self) -> int:
"""Size of partition in bytes."""
return self.properties[DBUS_ATTR_SIZE]
@property
@dbus_property
def name_(self) -> str:
"""Name/label of partition if known."""
return self.properties[DBUS_ATTR_NAME]
@property
@dbus_property
def uuid(self) -> str:
"""UUID of partition if known."""
return self.properties[DBUS_ATTR_UUID_UPPERCASE]
@property
@dbus_property
def table(self) -> str:
"""Object path of the partition table this belongs to.
Provide to UDisks2.get_block_device for UDisks2Block object.
"""
return self.properties[DBUS_ATTR_TABLE]
@dbus_connected
async def set_type(self, type_: str) -> None:
"""Set the type of the partition.
Type should be a GUID from https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs
for GPT type tables or a hexadecimal number for dos type tables. Can also use empty string
and let UDisks2 choose a default based on partition table and OS.
"""
await self.dbus.Partition.call_set_type(type_, UDISKS2_DEFAULT_OPTIONS)
@dbus_connected
async def set_name(self, name: str) -> None:
"""Set the name/label of the partition."""
await self.dbus.Partition.call_set_name(name, UDISKS2_DEFAULT_OPTIONS)
@dbus_connected
async def resize(self, size: int = 0) -> None:
"""Attempt to increase size of partition by specified bytes. 0 means determine and use maximal size.
Position/offset cannot be changed, only size. May be slightly bigger then requested.
Raises error if allocation fails.
"""
await self.dbus.Partition.call_resize(size, UDISKS2_DEFAULT_OPTIONS)
@dbus_connected
async def delete(self, options: DeletePartitionOptions | None = None) -> None:
"""Delete the partition."""
options = options.to_dict() if options else {}
return await self.dbus.Partition.call_delete(options | UDISKS2_DEFAULT_OPTIONS)

View File

@ -55,7 +55,8 @@ class UDisks2PartitionTable(DBusInterfaceProxy):
"""Create a new partition and return object path of new block device.
Type should be a GUID from https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs
or empty string and let UDisks2 choose a default based on partition table and OS.
for GPT type tables or a hexadecimal number for dos type tables. Can also use empty string
and let UDisks2 choose a default based on partition table and OS.
Provide return value with UDisks2Block.new. Or UDisks2.get_block_device after UDisks2.update.
"""
options = options.to_dict() if options else {}

View File

@ -3,3 +3,4 @@
FILESYSTEM_LABEL_DATA_DISK = "hassos-data"
FILESYSTEM_LABEL_OLD_DATA_DISK = "hassos-data-old"
PARTITION_NAME_EXTERNAL_DATA_DISK = "hassos-data-external"
PARTITION_NAME_OLD_EXTERNAL_DATA_DISK = "hassos-data-external-old"

View File

@ -23,7 +23,10 @@ from ..exceptions import (
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.decorator import Job
from ..utils.sentry import capture_exception
from .const import PARTITION_NAME_EXTERNAL_DATA_DISK
from .const import (
PARTITION_NAME_EXTERNAL_DATA_DISK,
PARTITION_NAME_OLD_EXTERNAL_DATA_DISK,
)
LINUX_DATA_PARTITION_GUID: Final = "0FC63DAF-8483-4772-8E79-3D69D8477DE4"
OS_AGENT_MARK_DATA_MOVE_VERSION: Final = AwesomeVersion("1.5.0")
@ -186,9 +189,26 @@ class DataDisk(CoreSysAttributes):
]
if len(target_disk) != 1:
raise HassOSDataDiskError(
f"'{new_disk!s}' not a valid data disk target!", _LOGGER.error
f"'{new_disk}' not a valid data disk target!", _LOGGER.error
) from None
# If any other partition is named "hassos-data-external" error and ask for its removal
# otherwise it will create a race condition at startup
if self.disk_used and (
conflicts := [
block
for block in self.sys_dbus.udisks2.block_devices
if block.partition
and block.partition.name_ == PARTITION_NAME_EXTERNAL_DATA_DISK
and block.device != self.disk_used.device_path
and block.drive != target_disk[0].object_path
]
):
raise HassOSDataDiskError(
f"Partition(s) {', '.join([conflict.device.as_posix() for conflict in conflicts])} have name 'hassos-data-external' which prevents migration. Remove or rename them first.",
_LOGGER.error,
)
# Older OS did not have mark data move API. Must let OS do disk format & migration
if self.sys_dbus.agent.version < OS_AGENT_MARK_DATA_MOVE_VERSION:
try:
@ -201,11 +221,37 @@ class DataDisk(CoreSysAttributes):
) from err
else:
# Format disk then tell OS to migrate next reboot
current_block = (
self.sys_dbus.udisks2.get_block_device(
self.disk_used.device_object_path
)
if self.disk_used
else None
)
# If migrating from one external data disk to another, rename the old one to prevent conflicts
# Do this first because otherwise a subsequent failure could create a race condition on reboot
if (
current_block
and current_block.partition
and current_block.partition.name_ == PARTITION_NAME_EXTERNAL_DATA_DISK
):
try:
await current_block.partition.set_name(
PARTITION_NAME_OLD_EXTERNAL_DATA_DISK
)
except DBusError as err:
raise HassOSDataDiskError(
f"Could not rename existing external data disk to prevent name conflict: {err!s}",
_LOGGER.error,
) from err
partition = await self._format_device_with_single_partition(target_disk[0])
if self.disk_used and partition.size < self.disk_used.size:
if current_block and current_block.size > partition.size:
raise HassOSDataDiskError(
f"Cannot use {new_disk} as data disk as it is smaller then the current one (new: {partition.size}, current: {self.disk_used.size})"
f"Cannot use {new_disk} as data disk as it is smaller then the current one (new: {partition.size}, current: {current_block.size})",
_LOGGER.error,
)
try:
@ -255,7 +301,7 @@ class DataDisk(CoreSysAttributes):
except DBusError as err:
capture_exception(err)
raise HassOSDataDiskError(
f"Could not create new data partition: {err!s}"
f"Could not create new data partition: {err!s}", _LOGGER.error
) from err
try:
@ -264,7 +310,8 @@ class DataDisk(CoreSysAttributes):
)
except DBusError as err:
raise HassOSDataDiskError(
f"New data partition at {partition} is missing or unusable"
f"New data partition at {partition} is missing or unusable",
_LOGGER.error,
) from err
_LOGGER.debug(

View File

@ -225,7 +225,7 @@ async def fixture_udisks2_services(
"/org/freedesktop/UDisks2/block_devices/sda",
"/org/freedesktop/UDisks2/block_devices/sdb",
],
"udisks2_parition": [
"udisks2_partition": [
"/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p2",
"/org/freedesktop/UDisks2/block_devices/mmcblk1p3",

View File

@ -11,6 +11,7 @@ from supervisor.dbus.udisks2.block import UDisks2Block
from supervisor.dbus.udisks2.const import FormatType, PartitionTableType
from supervisor.dbus.udisks2.data import FormatOptions
from supervisor.dbus.udisks2.filesystem import UDisks2Filesystem
from supervisor.dbus.udisks2.partition import UDisks2Partition
from supervisor.dbus.udisks2.partition_table import UDisks2PartitionTable
from supervisor.utils.dbus import DBus
@ -57,6 +58,7 @@ async def test_block_device_info(
assert sda1.id_label is None
assert sda1.symlinks is None
assert sda1.filesystem is None
assert sda1.partition is None
await sda.connect(dbus_session_bus)
await sda1.connect(dbus_session_bus)
@ -66,6 +68,7 @@ async def test_block_device_info(
assert sda.id_label == ""
assert sda.partition_table.type == PartitionTableType.GPT
assert sda.filesystem is None
assert sda.partition is None
assert sda1.id_label == "hassos-data-old"
assert sda1.symlinks == [
@ -80,6 +83,7 @@ async def test_block_device_info(
]
assert sda1.partition_table is None
assert sda1.filesystem.mount_points == []
assert sda1.partition.number == 1
block_sda_service.emit_properties_changed({"IdLabel": "test"})
await block_sda_service.ping()
@ -135,9 +139,10 @@ async def test_check_type(dbus_session_bus: MessageBus):
await sda.connect(dbus_session_bus)
await sda1.connect(dbus_session_bus)
# Connected but neither are filesystems are partition tables
# Connected but neither are filesystems, partitions or partition tables
assert sda.partition_table is None
assert sda1.filesystem is None
assert sda1.partition is None
assert sda.id_label == ""
assert sda1.id_label == "hassos-data-old"
@ -147,11 +152,13 @@ async def test_check_type(dbus_session_bus: MessageBus):
{
"udisks2_partition_table": "/org/freedesktop/UDisks2/block_devices/sda",
"udisks2_filesystem": "/org/freedesktop/UDisks2/block_devices/sda1",
"udisks2_partition": "/org/freedesktop/UDisks2/block_devices/sda1",
},
dbus_session_bus,
)
sda_pt_service = services["udisks2_partition_table"]
sda1_fs_service = services["udisks2_filesystem"]
sda1_part_service = services["udisks2_partition"]
await sda.check_type()
await sda1.check_type()
@ -159,11 +166,14 @@ async def test_check_type(dbus_session_bus: MessageBus):
# Check that the type is now correct and property changes are syncing
assert sda.partition_table
assert sda1.filesystem
assert sda1.partition
partition_table: UDisks2PartitionTable = sda.partition_table
filesystem: UDisks2Filesystem = sda1.filesystem
partition: UDisks2Partition = sda1.partition
assert partition_table.type == PartitionTableType.GPT
assert filesystem.size == 250058113024
assert partition.name_ == "hassos-data-external"
sda_pt_service.emit_properties_changed({"Type": "dos"})
await sda_pt_service.ping()
@ -173,6 +183,10 @@ async def test_check_type(dbus_session_bus: MessageBus):
await sda1_fs_service.ping()
assert filesystem.size == 100
sda1_part_service.emit_properties_changed({"Name": "test"})
await sda1_part_service.ping()
assert partition.name_ == "test"
# Force introspection to return the original block device only introspection and re-check type
with patch.object(DBus, "introspect", return_value=orig_introspection):
await sda.check_type()
@ -183,6 +197,7 @@ async def test_check_type(dbus_session_bus: MessageBus):
assert sda1.is_connected is True
assert sda.partition_table is None
assert sda1.filesystem is None
assert sda1.partition is None
# Property changes should still sync for the block devices
sda_block_service.emit_properties_changed({"IdLabel": "test"})
@ -201,3 +216,7 @@ async def test_check_type(dbus_session_bus: MessageBus):
sda1_fs_service.emit_properties_changed({"Size": 250058113024})
await sda1_fs_service.ping()
assert filesystem.size == 100
sda1_part_service.emit_properties_changed({"Name": "hassos-data-external"})
await sda1_part_service.ping()
assert partition.name_ == "test"

View File

@ -0,0 +1,161 @@
"""Test UDisks2 Partition."""
from dbus_fast import Variant
from dbus_fast.aio.message_bus import MessageBus
import pytest
from supervisor.dbus.udisks2.data import DeletePartitionOptions
from supervisor.dbus.udisks2.partition import UDisks2Partition
from supervisor.exceptions import DBusNotConnectedError
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.udisks2_partition import Partition as PartitionService
@pytest.fixture(name="partition_sda1_service")
async def fixture_partition_sda1_service(
dbus_session_bus: MessageBus,
) -> PartitionService:
"""Mock sda1 Partition service."""
yield (
await mock_dbus_services(
{"udisks2_partition": "/org/freedesktop/UDisks2/block_devices/sda1"},
dbus_session_bus,
)
)["udisks2_partition"]
@pytest.fixture(name="partition_sdb1_service")
async def fixture_partition_sdb_service(
dbus_session_bus: MessageBus,
) -> PartitionService:
"""Mock sdb1 Partition service."""
yield (
await mock_dbus_services(
{"udisks2_partition": "/org/freedesktop/UDisks2/block_devices/sdb1"},
dbus_session_bus,
)
)["udisks2_partition"]
async def test_partition_table_info(
partition_sda1_service: PartitionService,
partition_sdb1_service: PartitionService,
dbus_session_bus: MessageBus,
):
"""Test partition table info."""
sda1 = UDisks2Partition("/org/freedesktop/UDisks2/block_devices/sda1")
sdb1 = UDisks2Partition(
"/org/freedesktop/UDisks2/block_devices/sdb1", sync_properties=False
)
assert sda1.name_ is None
assert sda1.size is None
assert sdb1.name_ is None
assert sdb1.size is None
await sda1.connect(dbus_session_bus)
await sdb1.connect(dbus_session_bus)
assert sda1.name_ == "hassos-data-external"
assert sda1.size == 250058113024
assert sdb1.name_ == ""
assert sdb1.size == 67108864
partition_sda1_service.emit_properties_changed({"Name": "test"})
await partition_sda1_service.ping()
assert sda1.name_ == "test"
partition_sda1_service.emit_properties_changed({}, ["Name"])
await partition_sda1_service.ping()
await partition_sda1_service.ping()
assert sda1.name_ == "hassos-data-external"
# Prop changes should not sync for this one
partition_sdb1_service.emit_properties_changed({"Name": "test"})
await partition_sdb1_service.ping()
assert sdb1.name_ == ""
async def test_set_type(
partition_sda1_service: PartitionService, dbus_session_bus: MessageBus
):
"""Test setting partition type."""
partition_sda1_service.SetType.calls.clear()
sda1 = UDisks2Partition("/org/freedesktop/UDisks2/block_devices/sda1")
with pytest.raises(DBusNotConnectedError):
await sda1.set_type("0FC63DAF-8483-4772-8E79-3D69D8477DE4")
await sda1.connect(dbus_session_bus)
await sda1.set_type("0FC63DAF-8483-4772-8E79-3D69D8477DE4")
assert partition_sda1_service.SetType.calls == [
(
"0FC63DAF-8483-4772-8E79-3D69D8477DE4",
{"auth.no_user_interaction": Variant("b", True)},
)
]
async def test_set_name(
partition_sda1_service: PartitionService, dbus_session_bus: MessageBus
):
"""Test setting partition name."""
partition_sda1_service.SetName.calls.clear()
sda1 = UDisks2Partition("/org/freedesktop/UDisks2/block_devices/sda1")
with pytest.raises(DBusNotConnectedError):
await sda1.set_name("test")
await sda1.connect(dbus_session_bus)
await sda1.set_name("test")
assert partition_sda1_service.SetName.calls == [
("test", {"auth.no_user_interaction": Variant("b", True)})
]
async def test_resize(
partition_sda1_service: PartitionService, dbus_session_bus: MessageBus
):
"""Test resizing partition."""
partition_sda1_service.Resize.calls.clear()
sda1 = UDisks2Partition("/org/freedesktop/UDisks2/block_devices/sda1")
with pytest.raises(DBusNotConnectedError):
await sda1.resize()
await sda1.connect(dbus_session_bus)
await sda1.resize()
assert partition_sda1_service.Resize.calls == [
(0, {"auth.no_user_interaction": Variant("b", True)})
]
async def test_delete(
partition_sda1_service: PartitionService, dbus_session_bus: MessageBus
):
"""Test deleting partition."""
partition_sda1_service.Delete.calls.clear()
sda1 = UDisks2Partition("/org/freedesktop/UDisks2/block_devices/sda1")
with pytest.raises(DBusNotConnectedError):
await sda1.delete(DeletePartitionOptions(tear_down=True))
await sda1.connect(dbus_session_bus)
await sda1.delete(DeletePartitionOptions(tear_down=True))
assert partition_sda1_service.Delete.calls == [
(
{
"tear-down": Variant("b", True),
"auth.no_user_interaction": Variant("b", True),
},
)
]

View File

@ -1,4 +1,5 @@
"""Test OS API."""
from dataclasses import replace
from pathlib import PosixPath
from unittest.mock import patch
@ -15,6 +16,8 @@ from tests.dbus_service_mocks.agent_datadisk import DataDisk as DataDiskService
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.logind import Logind as LogindService
from tests.dbus_service_mocks.udisks2_block import Block as BlockService
from tests.dbus_service_mocks.udisks2_filesystem import Filesystem as FilesystemService
from tests.dbus_service_mocks.udisks2_partition import Partition as PartitionService
from tests.dbus_service_mocks.udisks2_partition_table import (
PartitionTable as PartitionTableService,
)
@ -71,7 +74,9 @@ async def test_datadisk_move_fail(coresys: CoreSys, new_disk: str):
"""Test datadisk move to non-existent or invalid devices."""
coresys.os._available = True
with pytest.raises(HassOSDataDiskError):
with pytest.raises(
HassOSDataDiskError, match=f"'{new_disk}' not a valid data disk target!"
):
await coresys.os.datadisk.migrate_disk(new_disk)
@ -195,9 +200,77 @@ async def test_datadisk_migrate_too_small(
await all_dbus_services["os_agent"].ping()
coresys.os._available = True
with pytest.raises(HassOSDataDiskError):
with pytest.raises(
HassOSDataDiskError,
match=r"Cannot use SSK-SSK-Storage-DF56419883D56 as data disk as it is smaller then the current one",
):
await coresys.os.datadisk.migrate_disk("SSK-SSK-Storage-DF56419883D56")
assert partition_table_service.CreatePartition.calls
assert datadisk_service.MarkDataMove.calls == []
assert logind_service.Reboot.calls == []
async def test_datadisk_migrate_multiple_external_data_disks(
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
"""Test migration stops when another hassos-data-external partition detected."""
datadisk_service: DataDiskService = all_dbus_services["agent_datadisk"]
datadisk_service.ChangeDevice.calls.clear()
datadisk_service.MarkDataMove.calls.clear()
sdb1_filesystem_service: FilesystemService = all_dbus_services[
"udisks2_filesystem"
]["/org/freedesktop/UDisks2/block_devices/sdb1"]
sdb1_filesystem_service.fixture = replace(
sdb1_filesystem_service.fixture, MountPoints=[]
)
coresys.os._available = True
with pytest.raises(
HassOSDataDiskError,
match=r"Partition\(s\) /dev/sda1 have name 'hassos-data-external' which prevents migration",
):
await coresys.os.datadisk.migrate_disk("Generic-Flash-Disk-61BCDDB6")
assert datadisk_service.ChangeDevice.calls == []
assert datadisk_service.MarkDataMove.calls == []
async def test_datadisk_migrate_between_external_renames(
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
"""Test migration from one external data disk to another renames the original."""
sdb1_partition_service: PartitionService = all_dbus_services["udisks2_partition"][
"/org/freedesktop/UDisks2/block_devices/sdb1"
]
sdb1_partition_service.SetName.calls.clear()
sdb1_filesystem_service: FilesystemService = all_dbus_services[
"udisks2_filesystem"
]["/org/freedesktop/UDisks2/block_devices/sdb1"]
sdb1_filesystem_service.fixture = replace(
sdb1_filesystem_service.fixture, MountPoints=[]
)
sdb1_block_service: BlockService = all_dbus_services["udisks2_block"][
"/org/freedesktop/UDisks2/block_devices/sdb1"
]
sdb1_block_service.fixture = replace(sdb1_block_service.fixture, Size=250058113024)
datadisk_service: DataDiskService = all_dbus_services["agent_datadisk"]
datadisk_service.MarkDataMove.calls.clear()
datadisk_service.emit_properties_changed({"CurrentDevice": "/dev/sda1"})
await datadisk_service.ping()
all_dbus_services["os_agent"].emit_properties_changed({"Version": "1.5.0"})
await all_dbus_services["os_agent"].ping()
coresys.os._available = True
await coresys.os.datadisk.migrate_disk("Generic-Flash-Disk-61BCDDB6")
assert datadisk_service.MarkDataMove.calls == [tuple()]
assert sdb1_partition_service.SetName.calls == [
("hassos-data-external-old", {"auth.no_user_interaction": Variant("b", True)})
]