Create issue for multiple data disks detected (#4218)

This commit is contained in:
Mike Degatano 2023-03-31 11:31:39 -04:00 committed by GitHub
parent c9ee76f1d3
commit 842e550dda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 523 additions and 35 deletions

View File

@ -1,4 +1,5 @@
"""Interface to UDisks2 over D-Bus."""
import asyncio
import logging
from typing import Any
@ -63,10 +64,24 @@ class UDisks2(DBusInterfaceProxy):
UDISKS2_DEFAULT_OPTIONS
)
unchanged_blocks = self._block_devices.keys() & set(block_devices)
for removed in self._block_devices.keys() - set(block_devices):
self._block_devices[removed].shutdown()
await self._resolve_block_device_paths(block_devices)
self._block_devices = {
device: self._block_devices[device]
if device in unchanged_blocks
else await UDisks2Block.new(device, self.dbus.bus)
for device in block_devices
}
# For existing block devices, need to check their type and call update
await asyncio.gather(
*[self._block_devices[path].check_type() for path in unchanged_blocks]
)
await asyncio.gather(
*[self._block_devices[path].update() for path in unchanged_blocks]
)
# Cache drives
drives = {
@ -75,6 +90,7 @@ class UDisks2(DBusInterfaceProxy):
if device.drive != DBUS_OBJECT_BASE
}
unchanged_drives = self._drives.keys() & set(drives)
for removed in self._drives.keys() - drives:
self._drives[removed].shutdown()
@ -85,6 +101,11 @@ class UDisks2(DBusInterfaceProxy):
for drive in drives
}
# Update existing drives
await asyncio.gather(
*[self._drives[path].update() for path in unchanged_drives]
)
@property
@dbus_property
def version(self) -> AwesomeVersion:
@ -126,26 +147,15 @@ class UDisks2(DBusInterfaceProxy):
@dbus_connected
async def resolve_device(self, devspec: DeviceSpecification) -> list[UDisks2Block]:
"""Return list of device object paths for specification."""
return await self._resolve_block_device_paths(
await self.dbus.Manager.call_resolve_device(
devspec.to_dict(), UDISKS2_DEFAULT_OPTIONS
)
return await asyncio.gather(
*[
UDisks2Block.new(path, self.dbus.bus, sync_properties=False)
for path in await self.dbus.Manager.call_resolve_device(
devspec.to_dict(), UDISKS2_DEFAULT_OPTIONS
)
]
)
async def _resolve_block_device_paths(
self, block_devices: list[str]
) -> list[UDisks2Block]:
"""Resolve block device object paths to objects. Cache new ones if necessary."""
resolved = {
device: self._block_devices[device]
if device in self._block_devices
and self._block_devices[device].is_connected
else await UDisks2Block.new(device, self.dbus.bus)
for device in block_devices
}
self._block_devices.update(resolved)
return list(resolved.values())
def shutdown(self) -> None:
"""Shutdown the object and disconnect from D-Bus.

View File

@ -63,9 +63,9 @@ class UDisks2Filesystem(DBusInterfaceProxy):
await self.dbus.Filesystem.call_unmount(options | UDISKS2_DEFAULT_OPTIONS)
@dbus_connected
async def set_label(self) -> None:
async def set_label(self, label: str) -> None:
"""Set filesystem label."""
await self.dbus.Filesystem.call_set_label(UDISKS2_DEFAULT_OPTIONS)
await self.dbus.Filesystem.call_set_label(label, UDISKS2_DEFAULT_OPTIONS)
@dbus_connected
async def check(self) -> bool:

5
supervisor/os/const.py Normal file
View File

@ -0,0 +1,5 @@
"""Constants for OS."""
FILESYSTEM_LABEL_DATA_DISK = "hassos-data"
FILESYSTEM_LABEL_OLD_DATA_DISK = "hassos-data-old"
PARTITION_NAME_EXTERNAL_DATA_DISK = "hassos-data-external"

View File

@ -23,9 +23,9 @@ from ..exceptions import (
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.decorator import Job
from ..utils.sentry import capture_exception
from .const import PARTITION_NAME_EXTERNAL_DATA_DISK
LINUX_DATA_PARTITION_GUID: Final = "0FC63DAF-8483-4772-8E79-3D69D8477DE4"
EXTERNAL_DATA_DISK_PARTITION_NAME: Final = "hassos-data-external"
OS_AGENT_MARK_DATA_MOVE_VERSION: Final = AwesomeVersion("1.5.0")
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -250,7 +250,7 @@ class DataDisk(CoreSysAttributes):
try:
partition = await block_device.partition_table.create_partition(
0, 0, LINUX_DATA_PARTITION_GUID, EXTERNAL_DATA_DISK_PARTITION_NAME
0, 0, LINUX_DATA_PARTITION_GUID, PARTITION_NAME_EXTERNAL_DATA_DISK
)
except DBusError as err:
capture_exception(err)

View File

@ -0,0 +1,61 @@
"""Helpers to check for multiple data disks."""
from pathlib import Path
from ...const import CoreState
from ...coresys import CoreSys
from ...dbus.udisks2.block import UDisks2Block
from ...dbus.udisks2.data import DeviceSpecification
from ...os.const import FILESYSTEM_LABEL_DATA_DISK
from ..const import ContextType, IssueType, SuggestionType
from .base import CheckBase
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckMultipleDataDisks(coresys)
class CheckMultipleDataDisks(CheckBase):
"""CheckMultipleDataDisks class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
for block_device in self.sys_dbus.udisks2.block_devices:
if self._block_device_has_name_issue(block_device):
self.sys_resolution.create_issue(
IssueType.MULTIPLE_DATA_DISKS,
ContextType.SYSTEM,
reference=block_device.device.as_posix(),
suggestions=[SuggestionType.RENAME_DATA_DISK],
)
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
resolved = await self.sys_dbus.udisks2.resolve_device(
DeviceSpecification(path=Path(reference))
)
return resolved and self._block_device_has_name_issue(resolved[0])
def _block_device_has_name_issue(self, block_device: UDisks2Block) -> bool:
"""Return true if filesystem block device incorrectly has data disk name."""
return (
block_device.filesystem
and block_device.id_label == FILESYSTEM_LABEL_DATA_DISK
and block_device.device != self.sys_dbus.agent.datadisk.current_device
)
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.MULTIPLE_DATA_DISKS
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -78,6 +78,7 @@ class IssueType(str, Enum):
FREE_SPACE = "free_space"
IPV4_CONNECTION_PROBLEM = "ipv4_connection_problem"
MISSING_IMAGE = "missing_image"
MULTIPLE_DATA_DISKS = "multiple_data_disks"
NO_CURRENT_BACKUP = "no_current_backup"
PWNED = "pwned"
REBOOT_REQUIRED = "reboot_required"
@ -101,3 +102,4 @@ class SuggestionType(str, Enum):
EXECUTE_STOP = "execute_stop"
EXECUTE_UPDATE = "execute_update"
REGISTRY_LOGIN = "registry_login"
RENAME_DATA_DISK = "rename_data_disk"

View File

@ -0,0 +1,69 @@
"""Rename data disk fixup."""
import logging
from pathlib import Path
from ...coresys import CoreSys
from ...dbus.udisks2.data import DeviceSpecification
from ...exceptions import DBusError, ResolutionFixupError
from ...os.const import FILESYSTEM_LABEL_OLD_DATA_DISK
from ..const import ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemRenameDataDisk(coresys)
class FixupSystemRenameDataDisk(FixupBase):
"""Storage class for fixup."""
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
resolved = await self.sys_dbus.udisks2.resolve_device(
DeviceSpecification(path=Path(reference))
)
if not resolved:
_LOGGER.info(
"Data disk at %s with name conflict was removed, skipping rename",
reference,
)
return
if not resolved[0].filesystem:
_LOGGER.warning(
"Data disk at %s no longer appears to be a filesystem, skipping rename",
reference,
)
return
_LOGGER.info(
"Renaming %s to %s to prevent data disk name conflict",
reference,
FILESYSTEM_LABEL_OLD_DATA_DISK,
)
try:
await resolved[0].filesystem.set_label(FILESYSTEM_LABEL_OLD_DATA_DISK)
except DBusError as err:
raise ResolutionFixupError(
f"Could not rename filesystem at {reference}: {err!s}", _LOGGER.error
) from err
@property
def suggestion(self) -> SuggestionType:
"""Return a SuggestionType enum."""
return SuggestionType.RENAME_DATA_DISK
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.MULTIPLE_DATA_DISKS]

View File

@ -67,10 +67,10 @@ async def test_block_device_info(
assert sda.partition_table.type == PartitionTableType.GPT
assert sda.filesystem is None
assert sda1.id_label == "hassos-data"
assert sda1.id_label == "hassos-data-old"
assert sda1.symlinks == [
Path("/dev/disk/by-id/usb-SSK_SSK_Storage_DF56419883D56-0:0-part1"),
Path("/dev/disk/by-label/hassos-data"),
Path("/dev/disk/by-label/hassos-data-old"),
Path("/dev/disk/by-partlabel/hassos-data-external"),
Path("/dev/disk/by-partuuid/6f3f99f4-4d34-476b-b051-77886da57fa9"),
Path(
@ -93,7 +93,7 @@ async def test_block_device_info(
# Prop changes should not sync for this one
block_sda1_service.emit_properties_changed({"IdLabel": "test"})
await block_sda1_service.ping()
assert sda1.id_label == "hassos-data"
assert sda1.id_label == "hassos-data-old"
async def test_format(block_sda_service: BlockService, dbus_session_bus: MessageBus):
@ -139,7 +139,7 @@ async def test_check_type(dbus_session_bus: MessageBus):
assert sda.partition_table is None
assert sda1.filesystem is None
assert sda.id_label == ""
assert sda1.id_label == "hassos-data"
assert sda1.id_label == "hassos-data-old"
# Store current introspection then make sda into a partition table and sda1 into a filesystem
orig_introspection = await sda.dbus.introspect()

View File

@ -135,3 +135,14 @@ async def test_repair(
assert filesystem_sda1_service.Repair.calls == [
({"auth.no_user_interaction": Variant("b", True)},)
]
async def test_set_label(
sda1: UDisks2Filesystem, filesystem_sda1_service: FilesystemService
):
"""Test set label."""
filesystem_sda1_service.SetLabel.calls.clear()
await sda1.set_label("test")
assert filesystem_sda1_service.SetLabel.calls == [
("test", {"auth.no_user_interaction": Variant("b", True)})
]

View File

@ -8,16 +8,18 @@ from dbus_fast.aio.message_bus import MessageBus
import pytest
from supervisor.dbus.udisks2 import UDisks2
from supervisor.dbus.udisks2.const import PartitionTableType
from supervisor.dbus.udisks2.data import DeviceSpecification
from supervisor.exceptions import DBusNotConnectedError, DBusObjectError
from tests.common import mock_dbus_services
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.udisks2_manager import (
UDisks2Manager as UDisks2ManagerService,
)
@pytest.fixture(name="udisks2_manager_service", autouse=True)
@pytest.fixture(name="udisks2_manager_service")
async def fixture_udisks2_manager_service(
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]]
) -> UDisks2ManagerService:
@ -87,7 +89,128 @@ async def test_udisks2_manager_info(
]
async def test_get_block_device(dbus_session_bus: MessageBus):
async def test_update_checks_devices_and_drives(dbus_session_bus: MessageBus):
"""Test update rechecks block devices and drives correctly."""
mocked = await mock_dbus_services(
{
"udisks2_manager": None,
"udisks2_block": [
"/org/freedesktop/UDisks2/block_devices/sda",
"/org/freedesktop/UDisks2/block_devices/sda1",
"/org/freedesktop/UDisks2/block_devices/sdb",
],
"udisks2_drive": [
"/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56",
"/org/freedesktop/UDisks2/drives/Generic_Flash_Disk_61BCDDB6",
],
},
dbus_session_bus,
)
udisks2_manager_service: UDisks2ManagerService = mocked["udisks2_manager"]
udisks2_manager_service.block_devices = [
"/org/freedesktop/UDisks2/block_devices/sda",
"/org/freedesktop/UDisks2/block_devices/sda1",
"/org/freedesktop/UDisks2/block_devices/sdb",
]
udisks2 = UDisks2()
await udisks2.connect(dbus_session_bus)
assert len(udisks2.block_devices) == 3
assert (
udisks2.get_block_device(
"/org/freedesktop/UDisks2/block_devices/sda"
).partition_table
is None
)
assert (
udisks2.get_block_device(
"/org/freedesktop/UDisks2/block_devices/sda1"
).filesystem
is None
)
sdb = udisks2.get_block_device("/org/freedesktop/UDisks2/block_devices/sdb")
assert sdb.is_connected is True
with pytest.raises(DBusObjectError):
udisks2.get_block_device("/org/freedesktop/UDisks2/block_devices/mmcblk1")
assert len(udisks2.drives) == 2
assert (
udisks2.get_drive(
"/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56"
).is_connected
is True
)
flash_disk = udisks2.get_drive(
"/org/freedesktop/UDisks2/drives/Generic_Flash_Disk_61BCDDB6"
)
assert flash_disk.is_connected is True
with pytest.raises(DBusObjectError):
udisks2.get_drive("/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291")
await mock_dbus_services(
{
"udisks2_block": "/org/freedesktop/UDisks2/block_devices/mmcblk1",
"udisks2_drive": "/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291",
"udisks2_filesystem": "/org/freedesktop/UDisks2/block_devices/sda1",
"udisks2_partition_table": "/org/freedesktop/UDisks2/block_devices/sda",
},
dbus_session_bus,
)
udisks2_manager_service.block_devices = [
"/org/freedesktop/UDisks2/block_devices/sda",
"/org/freedesktop/UDisks2/block_devices/sda1",
"/org/freedesktop/UDisks2/block_devices/mmcblk1",
]
await udisks2.update()
assert len(udisks2.block_devices) == 3
assert (
udisks2.get_block_device(
"/org/freedesktop/UDisks2/block_devices/sda"
).partition_table.type
== PartitionTableType.GPT
)
assert (
udisks2.get_block_device(
"/org/freedesktop/UDisks2/block_devices/sda1"
).filesystem.mount_points
== []
)
assert (
udisks2.get_block_device(
"/org/freedesktop/UDisks2/block_devices/mmcblk1"
).is_connected
is True
)
with pytest.raises(DBusObjectError):
udisks2.get_block_device("/org/freedesktop/UDisks2/block_devices/sdb")
assert sdb.is_connected is False
assert sdb.is_shutdown is True
assert len(udisks2.drives) == 2
assert (
udisks2.get_drive(
"/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56"
).is_connected
is True
)
assert (
udisks2.get_drive(
"/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291"
).is_connected
is True
)
with pytest.raises(DBusObjectError):
udisks2.get_drive("/org/freedesktop/UDisks2/drives/Generic_Flash_Disk_61BCDDB6")
assert flash_disk.is_connected is False
assert flash_disk.is_shutdown is True
async def test_get_block_device(
udisks2_manager_service: UDisks2ManagerService, dbus_session_bus: MessageBus
):
"""Test get block device by object path."""
udisks2 = UDisks2()
@ -99,13 +222,15 @@ async def test_get_block_device(dbus_session_bus: MessageBus):
block_device = udisks2.get_block_device(
"/org/freedesktop/UDisks2/block_devices/sda1"
)
assert block_device.id_label == "hassos-data"
assert block_device.id_label == "hassos-data-old"
with pytest.raises(DBusObjectError):
udisks2.get_block_device("non_existent")
async def test_get_drive(dbus_session_bus: MessageBus):
async def test_get_drive(
udisks2_manager_service: UDisks2ManagerService, dbus_session_bus: MessageBus
):
"""Test get drive by object path."""
udisks2 = UDisks2()
@ -135,7 +260,7 @@ async def test_resolve_device(
devices = await udisks2.resolve_device(DeviceSpecification(path=Path("/dev/sda1")))
assert len(devices) == 1
assert devices[0].id_label == "hassos-data"
assert devices[0].id_label == "hassos-data-old"
assert udisks2_manager_service.ResolveDevice.calls == [
(
{"path": Variant("s", "/dev/sda1")},

View File

@ -242,7 +242,7 @@ FIXTURES: dict[str, BlockFixture] = {
PreferredDevice=b"/dev/sda1\x00",
Symlinks=[
b"/dev/disk/by-id/usb-SSK_SSK_Storage_DF56419883D56-0:0-part1\x00",
b"/dev/disk/by-label/hassos-data\x00",
b"/dev/disk/by-label/hassos-data-old\x00",
b"/dev/disk/by-partlabel/hassos-data-external\x00",
b"/dev/disk/by-partuuid/6f3f99f4-4d34-476b-b051-77886da57fa9\x00",
b"/dev/disk/by-path/platform-xhci-hcd.1.auto-usb-0:1.4:1.0-scsi-0:0:0:0-part1\x00",
@ -258,7 +258,7 @@ FIXTURES: dict[str, BlockFixture] = {
IdUsage="filesystem",
IdType="ext4",
IdVersion="1.0",
IdLabel="hassos-data",
IdLabel="hassos-data-old",
IdUUID="b82b23cb-0c47-4bbb-acf5-2a2afa8894a2",
Configuration=[],
CryptoBackingDevice="/",

View File

@ -35,6 +35,7 @@ class UDisks2Manager(DBusServiceMock):
"/org/freedesktop/UDisks2/block_devices/sdb1",
"/org/freedesktop/UDisks2/block_devices/zram1",
]
resolved_devices = ["/org/freedesktop/UDisks2/block_devices/sda1"]
@dbus_property(access=PropertyAccess.READ)
def Version(self) -> "s":
@ -100,4 +101,4 @@ class UDisks2Manager(DBusServiceMock):
@dbus_method()
def ResolveDevice(self, devspec: "a{sv}", options: "a{sv}") -> "ao":
"""Do ResolveDevice method."""
return ["/org/freedesktop/UDisks2/block_devices/sda1"]
return self.resolved_devices

View File

@ -0,0 +1,96 @@
"""Test check for multiple data disks."""
# pylint: disable=import-error
from dataclasses import replace
from unittest.mock import patch
import pytest
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.checks.multiple_data_disks import CheckMultipleDataDisks
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.udisks2_block import Block as BlockService
@pytest.fixture(name="sda1_block_service")
async def fixture_sda1_block_service(
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]]
) -> BlockService:
"""Return sda1 block service."""
yield udisks2_services["udisks2_block"][
"/org/freedesktop/UDisks2/block_devices/sda1"
]
async def test_base(coresys: CoreSys):
"""Test check basics."""
multiple_data_disks = CheckMultipleDataDisks(coresys)
assert multiple_data_disks.slug == "multiple_data_disks"
assert multiple_data_disks.enabled
async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
"""Test check."""
multiple_data_disks = CheckMultipleDataDisks(coresys)
coresys.core.state = CoreState.RUNNING
await multiple_data_disks.run_check()
assert len(coresys.resolution.issues) == 0
assert len(coresys.resolution.suggestions) == 0
sda1_block_service.emit_properties_changed({"IdLabel": "hassos-data"})
await sda1_block_service.ping()
await multiple_data_disks.run_check()
assert coresys.resolution.issues == [
Issue(IssueType.MULTIPLE_DATA_DISKS, ContextType.SYSTEM, reference="/dev/sda1")
]
assert coresys.resolution.suggestions == [
Suggestion(
SuggestionType.RENAME_DATA_DISK, ContextType.SYSTEM, reference="/dev/sda1"
)
]
async def test_approve(coresys: CoreSys, sda1_block_service: BlockService):
"""Test approve."""
multiple_data_disks = CheckMultipleDataDisks(coresys)
coresys.core.state = CoreState.RUNNING
assert not await multiple_data_disks.approve_check(reference="/dev/sda1")
sda1_block_service.fixture = replace(
sda1_block_service.fixture, IdLabel="hassos-data"
)
assert await multiple_data_disks.approve_check(reference="/dev/sda1")
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
multiple_data_disks = CheckMultipleDataDisks(coresys)
should_run = multiple_data_disks.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.multiple_data_disks.CheckMultipleDataDisks.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await multiple_data_disks()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await multiple_data_disks()
check.assert_not_called()
check.reset_mock()

View File

@ -0,0 +1,108 @@
"""Test system fixup rename data disk."""
# pylint: disable=import-error
from dbus_fast import Variant
import pytest
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from supervisor.resolution.fixups.system_rename_data_disk import (
FixupSystemRenameDataDisk,
)
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.udisks2_filesystem import Filesystem as FilesystemService
from tests.dbus_service_mocks.udisks2_manager import (
UDisks2Manager as UDisks2ManagerService,
)
@pytest.fixture(name="sda1_filesystem_service")
async def fixture_sda1_filesystem_service(
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]]
) -> FilesystemService:
"""Return sda1 filesystem service."""
return udisks2_services["udisks2_filesystem"][
"/org/freedesktop/UDisks2/block_devices/sda1"
]
@pytest.fixture(name="udisks2_service")
async def fixture_udisks2_service(
udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]]
) -> UDisks2ManagerService:
"""Return udisks2 manager service."""
return udisks2_services["udisks2_manager"]
async def test_fixup(coresys: CoreSys, sda1_filesystem_service: FilesystemService):
"""Test fixup."""
sda1_filesystem_service.SetLabel.calls.clear()
system_rename_data_disk = FixupSystemRenameDataDisk(coresys)
assert not system_rename_data_disk.auto
coresys.resolution.suggestions = Suggestion(
SuggestionType.RENAME_DATA_DISK, ContextType.SYSTEM, reference="/dev/sda1"
)
coresys.resolution.issues = Issue(
IssueType.MULTIPLE_DATA_DISKS, ContextType.SYSTEM, reference="/dev/sda1"
)
await system_rename_data_disk()
assert sda1_filesystem_service.SetLabel.calls == [
("hassos-data-old", {"auth.no_user_interaction": Variant("b", True)})
]
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
async def test_fixup_device_removed(
coresys: CoreSys,
udisks2_service: UDisks2ManagerService,
caplog: pytest.LogCaptureFixture,
):
"""Test fixup when device removed."""
system_rename_data_disk = FixupSystemRenameDataDisk(coresys)
assert not system_rename_data_disk.auto
coresys.resolution.suggestions = Suggestion(
SuggestionType.RENAME_DATA_DISK, ContextType.SYSTEM, reference="/dev/sda1"
)
coresys.resolution.issues = Issue(
IssueType.MULTIPLE_DATA_DISKS, ContextType.SYSTEM, reference="/dev/sda1"
)
udisks2_service.resolved_devices = []
await system_rename_data_disk()
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
assert "Data disk at /dev/sda1 with name conflict was removed" in caplog.text
async def test_fixup_device_not_filesystem(
coresys: CoreSys,
udisks2_service: UDisks2ManagerService,
caplog: pytest.LogCaptureFixture,
):
"""Test fixup when device is no longer a filesystem."""
system_rename_data_disk = FixupSystemRenameDataDisk(coresys)
assert not system_rename_data_disk.auto
coresys.resolution.suggestions = Suggestion(
SuggestionType.RENAME_DATA_DISK, ContextType.SYSTEM, reference="/dev/sda1"
)
coresys.resolution.issues = Issue(
IssueType.MULTIPLE_DATA_DISKS, ContextType.SYSTEM, reference="/dev/sda1"
)
udisks2_service.resolved_devices = ["/org/freedesktop/UDisks2/block_devices/sda"]
await system_rename_data_disk()
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
assert "Data disk at /dev/sda1 no longer appears to be a filesystem" in caplog.text