mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-18 22:56:31 +00:00
Allow client to change boot slot via API (#4945)
* Allow client to change boot slot via API * Wrap call to rauc in job that checks for OS * Reboot after changing the active boot slot * Add test cases and clean up * BootName to BootSlot * Fix test * Rename boot_name to boot_slot * Fix tests after field change
This commit is contained in:
parent
c4143dacee
commit
2148de45a0
@ -183,6 +183,7 @@ class RestAPI(CoreSysAttributes):
|
||||
web.post("/os/datadisk/move", api_os.migrate_data),
|
||||
web.get("/os/datadisk/list", api_os.list_data),
|
||||
web.post("/os/datadisk/wipe", api_os.wipe_data),
|
||||
web.post("/os/boot-slot", api_os.set_boot_slot),
|
||||
]
|
||||
)
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
"""Const for API."""
|
||||
|
||||
from enum import StrEnum
|
||||
|
||||
CONTENT_TYPE_BINARY = "application/octet-stream"
|
||||
CONTENT_TYPE_JSON = "application/json"
|
||||
CONTENT_TYPE_PNG = "image/png"
|
||||
@ -14,6 +16,8 @@ ATTR_APPARMOR_VERSION = "apparmor_version"
|
||||
ATTR_ATTRIBUTES = "attributes"
|
||||
ATTR_AVAILABLE_UPDATES = "available_updates"
|
||||
ATTR_BACKGROUND = "background"
|
||||
ATTR_BOOT_SLOT = "boot_slot"
|
||||
ATTR_BOOT_SLOTS = "boot_slots"
|
||||
ATTR_BOOT_TIMESTAMP = "boot_timestamp"
|
||||
ATTR_BOOTS = "boots"
|
||||
ATTR_BROADCAST_LLMNR = "broadcast_llmnr"
|
||||
@ -51,6 +55,7 @@ ATTR_REVISION = "revision"
|
||||
ATTR_SEAT = "seat"
|
||||
ATTR_SIGNED = "signed"
|
||||
ATTR_STARTUP_TIME = "startup_time"
|
||||
ATTR_STATUS = "status"
|
||||
ATTR_SUBSYSTEM = "subsystem"
|
||||
ATTR_SYSFS = "sysfs"
|
||||
ATTR_SYSTEM_HEALTH_LED = "system_health_led"
|
||||
@ -60,3 +65,10 @@ ATTR_USAGE = "usage"
|
||||
ATTR_USE_NTP = "use_ntp"
|
||||
ATTR_USERS = "users"
|
||||
ATTR_VENDOR = "vendor"
|
||||
|
||||
|
||||
class BootSlot(StrEnum):
|
||||
"""Boot slots used by HAOS."""
|
||||
|
||||
A = "A"
|
||||
B = "B"
|
||||
|
@ -19,6 +19,7 @@ from ..const import (
|
||||
ATTR_POWER_LED,
|
||||
ATTR_SERIAL,
|
||||
ATTR_SIZE,
|
||||
ATTR_STATE,
|
||||
ATTR_UPDATE_AVAILABLE,
|
||||
ATTR_VERSION,
|
||||
ATTR_VERSION_LATEST,
|
||||
@ -28,13 +29,17 @@ from ..exceptions import BoardInvalidError
|
||||
from ..resolution.const import ContextType, IssueType, SuggestionType
|
||||
from ..validate import version_tag
|
||||
from .const import (
|
||||
ATTR_BOOT_SLOT,
|
||||
ATTR_BOOT_SLOTS,
|
||||
ATTR_DATA_DISK,
|
||||
ATTR_DEV_PATH,
|
||||
ATTR_DEVICE,
|
||||
ATTR_DISKS,
|
||||
ATTR_MODEL,
|
||||
ATTR_STATUS,
|
||||
ATTR_SYSTEM_HEALTH_LED,
|
||||
ATTR_VENDOR,
|
||||
BootSlot,
|
||||
)
|
||||
from .utils import api_process, api_validate
|
||||
|
||||
@ -42,6 +47,7 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
# pylint: disable=no-value-for-parameter
|
||||
SCHEMA_VERSION = vol.Schema({vol.Optional(ATTR_VERSION): version_tag})
|
||||
SCHEMA_SET_BOOT_SLOT = vol.Schema({vol.Required(ATTR_BOOT_SLOT): vol.Coerce(BootSlot)})
|
||||
SCHEMA_DISK = vol.Schema({vol.Required(ATTR_DEVICE): str})
|
||||
|
||||
SCHEMA_YELLOW_OPTIONS = vol.Schema(
|
||||
@ -74,6 +80,15 @@ class APIOS(CoreSysAttributes):
|
||||
ATTR_BOARD: self.sys_os.board,
|
||||
ATTR_BOOT: self.sys_dbus.rauc.boot_slot,
|
||||
ATTR_DATA_DISK: self.sys_os.datadisk.disk_used_id,
|
||||
ATTR_BOOT_SLOTS: {
|
||||
slot.bootname: {
|
||||
ATTR_STATE: slot.state,
|
||||
ATTR_STATUS: slot.boot_status,
|
||||
ATTR_VERSION: slot.bundle_version,
|
||||
}
|
||||
for slot in self.sys_os.slots
|
||||
if slot.bootname
|
||||
},
|
||||
}
|
||||
|
||||
@api_process
|
||||
@ -101,6 +116,12 @@ class APIOS(CoreSysAttributes):
|
||||
"""Trigger data disk wipe on Host."""
|
||||
return asyncio.shield(self.sys_os.datadisk.wipe_disk())
|
||||
|
||||
@api_process
|
||||
async def set_boot_slot(self, request: web.Request) -> None:
|
||||
"""Change the active boot slot and reboot into it."""
|
||||
body = await api_validate(SCHEMA_SET_BOOT_SLOT, request)
|
||||
await asyncio.shield(self.sys_os.set_boot_slot(body[ATTR_BOOT_SLOT]))
|
||||
|
||||
@api_process
|
||||
async def list_data(self, request: web.Request) -> dict[str, Any]:
|
||||
"""Return possible data targets."""
|
||||
|
@ -1,6 +1,8 @@
|
||||
"""D-Bus interface for rauc."""
|
||||
|
||||
from ctypes import c_uint32, c_uint64
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import Any, NotRequired, TypedDict
|
||||
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
|
||||
@ -23,6 +25,28 @@ from .utils import dbus_connected
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
SlotStatusDataType = TypedDict(
|
||||
"SlotStatusDataType",
|
||||
{
|
||||
"bundle.compatible": str,
|
||||
"sha256": str,
|
||||
"state": str,
|
||||
"size": c_uint64,
|
||||
"installed.count": c_uint32,
|
||||
"class": str,
|
||||
"device": str,
|
||||
"type": str,
|
||||
"bundle.version": str,
|
||||
"installed.timestamp": str,
|
||||
"status": str,
|
||||
"activated.count": NotRequired[c_uint32],
|
||||
"activated.timestamp": NotRequired[str],
|
||||
"boot-status": NotRequired[str],
|
||||
"bootname": NotRequired[str],
|
||||
"parent": NotRequired[str],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class Rauc(DBusInterfaceProxy):
|
||||
"""Handle D-Bus interface for rauc."""
|
||||
@ -83,7 +107,7 @@ class Rauc(DBusInterfaceProxy):
|
||||
await self.dbus.Installer.call_install(str(raucb_file))
|
||||
|
||||
@dbus_connected
|
||||
async def get_slot_status(self) -> list[tuple[str, dict[str, Any]]]:
|
||||
async def get_slot_status(self) -> list[tuple[str, SlotStatusDataType]]:
|
||||
"""Get slot status."""
|
||||
return await self.dbus.Installer.call_get_slot_status()
|
||||
|
||||
|
@ -133,6 +133,14 @@ class HassOSDataDiskError(HassOSError):
|
||||
"""Issues with the DataDisk feature from HAOS."""
|
||||
|
||||
|
||||
class HassOSSlotNotFound(HassOSError):
|
||||
"""Could not find boot slot."""
|
||||
|
||||
|
||||
class HassOSSlotUpdateError(HassOSError):
|
||||
"""Error while updating a slot via rauc."""
|
||||
|
||||
|
||||
# All Plugins
|
||||
|
||||
|
||||
|
@ -127,6 +127,7 @@ class HostManager(CoreSysAttributes):
|
||||
async def reload(self):
|
||||
"""Reload host functions."""
|
||||
await self.info.update()
|
||||
await self.sys_os.reload()
|
||||
|
||||
if self.sys_dbus.systemd.is_connected:
|
||||
await self.services.update()
|
||||
|
@ -1,8 +1,10 @@
|
||||
"""OS support on supervisor."""
|
||||
from collections.abc import Awaitable
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import errno
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from pathlib import Path, PurePath
|
||||
|
||||
import aiohttp
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionException
|
||||
@ -10,16 +12,98 @@ from cpe import CPE
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..dbus.agent.boards.const import BOARD_NAME_SUPERVISED
|
||||
from ..dbus.rauc import RaucState
|
||||
from ..exceptions import DBusError, HassOSJobError, HassOSUpdateError
|
||||
from ..dbus.rauc import RaucState, SlotStatusDataType
|
||||
from ..exceptions import (
|
||||
DBusError,
|
||||
HassOSJobError,
|
||||
HassOSSlotNotFound,
|
||||
HassOSSlotUpdateError,
|
||||
HassOSUpdateError,
|
||||
)
|
||||
from ..jobs.const import JobCondition, JobExecutionLimit
|
||||
from ..jobs.decorator import Job
|
||||
from ..resolution.const import UnhealthyReason
|
||||
from ..utils.sentry import capture_exception
|
||||
from .data_disk import DataDisk
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class SlotStatus:
|
||||
"""Status of a slot."""
|
||||
|
||||
bundle_compatible: str
|
||||
sha256: str
|
||||
state: str
|
||||
size: int
|
||||
installed_count: int
|
||||
class_: str
|
||||
device: PurePath
|
||||
type_: str
|
||||
bundle_version: AwesomeVersion
|
||||
installed_timestamp: datetime
|
||||
status: str
|
||||
activated_count: int | None = None
|
||||
activated_timestamp: datetime | None = None
|
||||
boot_status: RaucState | None = None
|
||||
bootname: str | None = None
|
||||
parent: str | None = None
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: SlotStatusDataType) -> "SlotStatus":
|
||||
"""Create SlotStatus from dictionary."""
|
||||
return cls(
|
||||
bundle_compatible=data["bundle.compatible"],
|
||||
sha256=data["sha256"],
|
||||
state=data["state"],
|
||||
size=data["size"],
|
||||
installed_count=data["installed.count"],
|
||||
class_=data["class"],
|
||||
device=PurePath(data["device"]),
|
||||
type_=data["type"],
|
||||
bundle_version=AwesomeVersion(data["bundle.version"]),
|
||||
installed_timestamp=datetime.fromisoformat(data["installed.timestamp"]),
|
||||
status=data["status"],
|
||||
activated_count=data.get("activated.count"),
|
||||
activated_timestamp=datetime.fromisoformat(data["activated.timestamp"])
|
||||
if "activated.timestamp" in data
|
||||
else None,
|
||||
boot_status=data.get("boot-status"),
|
||||
bootname=data.get("bootname"),
|
||||
parent=data.get("parent"),
|
||||
)
|
||||
|
||||
def to_dict(self) -> SlotStatusDataType:
|
||||
"""Get dictionary representation."""
|
||||
out: SlotStatusDataType = {
|
||||
"bundle.compatible": self.bundle_compatible,
|
||||
"sha256": self.sha256,
|
||||
"state": self.state,
|
||||
"size": self.size,
|
||||
"installed.count": self.installed_count,
|
||||
"class": self.class_,
|
||||
"device": self.device.as_posix(),
|
||||
"type": self.type_,
|
||||
"bundle.version": str(self.bundle_version),
|
||||
"installed.timestamp": str(self.installed_timestamp),
|
||||
"status": self.status,
|
||||
}
|
||||
|
||||
if self.activated_count is not None:
|
||||
out["activated.count"] = self.activated_count
|
||||
if self.activated_timestamp:
|
||||
out["activated.timestamp"] = str(self.activated_timestamp)
|
||||
if self.boot_status:
|
||||
out["boot-status"] = self.boot_status
|
||||
if self.bootname is not None:
|
||||
out["bootname"] = self.bootname
|
||||
if self.parent is not None:
|
||||
out["parent"] = self.parent
|
||||
|
||||
return out
|
||||
|
||||
|
||||
class OSManager(CoreSysAttributes):
|
||||
"""OS interface inside supervisor."""
|
||||
|
||||
@ -31,6 +115,7 @@ class OSManager(CoreSysAttributes):
|
||||
self._version: AwesomeVersion | None = None
|
||||
self._board: str | None = None
|
||||
self._os_name: str | None = None
|
||||
self._slots: dict[str, SlotStatus] | None = None
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
@ -70,6 +155,20 @@ class OSManager(CoreSysAttributes):
|
||||
"""Return Operating-System datadisk."""
|
||||
return self._datadisk
|
||||
|
||||
@property
|
||||
def slots(self) -> list[SlotStatus]:
|
||||
"""Return status of slots."""
|
||||
if not self._slots:
|
||||
return []
|
||||
return list(self._slots.values())
|
||||
|
||||
def get_slot_name(self, boot_name: str) -> str:
|
||||
"""Get slot name from boot name."""
|
||||
for name, status in self._slots.items():
|
||||
if status.bootname == boot_name:
|
||||
return name
|
||||
raise HassOSSlotNotFound()
|
||||
|
||||
def _get_download_url(self, version: AwesomeVersion) -> str:
|
||||
raw_url = self.sys_updater.ota_url
|
||||
if raw_url is None:
|
||||
@ -128,6 +227,14 @@ class OSManager(CoreSysAttributes):
|
||||
f"Can't write OTA file: {err!s}", _LOGGER.error
|
||||
) from err
|
||||
|
||||
@Job(name="os_manager_reload", conditions=[JobCondition.HAOS], internal=True)
|
||||
async def reload(self) -> None:
|
||||
"""Update cache of slot statuses."""
|
||||
self._slots = {
|
||||
slot[0]: SlotStatus.from_dict(slot[1])
|
||||
for slot in await self.sys_dbus.rauc.get_slot_status()
|
||||
}
|
||||
|
||||
async def load(self) -> None:
|
||||
"""Load HassOS data."""
|
||||
try:
|
||||
@ -149,6 +256,7 @@ class OSManager(CoreSysAttributes):
|
||||
self._version = AwesomeVersion(cpe.get_version()[0])
|
||||
self._board = cpe.get_target_hardware()[0]
|
||||
self._os_name = cpe.get_product()[0]
|
||||
await self.reload()
|
||||
|
||||
await self.datadisk.load()
|
||||
|
||||
@ -239,3 +347,27 @@ class OSManager(CoreSysAttributes):
|
||||
_LOGGER.error("Can't mark booted partition as healthy!")
|
||||
else:
|
||||
_LOGGER.info("Rauc: %s - %s", self.sys_dbus.rauc.boot_slot, response[1])
|
||||
await self.reload()
|
||||
|
||||
@Job(
|
||||
name="os_manager_set_boot_slot",
|
||||
conditions=[JobCondition.HAOS],
|
||||
on_condition=HassOSJobError,
|
||||
internal=True,
|
||||
)
|
||||
async def set_boot_slot(self, boot_name: str) -> None:
|
||||
"""Set active boot slot."""
|
||||
try:
|
||||
response = await self.sys_dbus.rauc.mark(
|
||||
RaucState.ACTIVE, self.get_slot_name(boot_name)
|
||||
)
|
||||
except DBusError as err:
|
||||
capture_exception(err)
|
||||
raise HassOSSlotUpdateError(
|
||||
f"Can't mark {boot_name} as active!", _LOGGER.error
|
||||
) from err
|
||||
|
||||
_LOGGER.info("Rauc: %s - %s", self.sys_dbus.rauc.boot_slot, response[1])
|
||||
|
||||
_LOGGER.info("Rebooting into new boot slot now")
|
||||
await self.sys_host.control.reboot()
|
||||
|
@ -1,8 +1,9 @@
|
||||
"""Test OS API."""
|
||||
|
||||
from unittest.mock import PropertyMock, patch
|
||||
from unittest.mock import Mock, PropertyMock, patch
|
||||
|
||||
from aiohttp.test_utils import TestClient
|
||||
from dbus_fast import DBusError, ErrorType
|
||||
import pytest
|
||||
|
||||
from supervisor.coresys import CoreSys
|
||||
@ -19,6 +20,7 @@ from tests.dbus_service_mocks.agent_boards_yellow import Yellow as YellowService
|
||||
from tests.dbus_service_mocks.agent_datadisk import DataDisk as DataDiskService
|
||||
from tests.dbus_service_mocks.agent_system import System as SystemService
|
||||
from tests.dbus_service_mocks.base import DBusServiceMock
|
||||
from tests.dbus_service_mocks.rauc import Rauc as RaucService
|
||||
|
||||
|
||||
@pytest.fixture(name="boards_service")
|
||||
@ -30,7 +32,7 @@ async def fixture_boards_service(
|
||||
|
||||
|
||||
async def test_api_os_info(api_client: TestClient):
|
||||
"""Test docker info api."""
|
||||
"""Test os info api."""
|
||||
resp = await api_client.get("/os/info")
|
||||
result = await resp.json()
|
||||
|
||||
@ -41,18 +43,37 @@ async def test_api_os_info(api_client: TestClient):
|
||||
"board",
|
||||
"boot",
|
||||
"data_disk",
|
||||
"boot_slots",
|
||||
):
|
||||
assert attr in result["data"]
|
||||
|
||||
|
||||
async def test_api_os_info_with_agent(api_client: TestClient, coresys: CoreSys):
|
||||
"""Test docker info api."""
|
||||
"""Test os info api for data disk."""
|
||||
resp = await api_client.get("/os/info")
|
||||
result = await resp.json()
|
||||
|
||||
assert result["data"]["data_disk"] == "BJTD4R-0x97cde291"
|
||||
|
||||
|
||||
async def test_api_os_info_boot_slots(
|
||||
api_client: TestClient, coresys: CoreSys, os_available
|
||||
):
|
||||
"""Test os info api for boot slots."""
|
||||
await coresys.os.load()
|
||||
resp = await api_client.get("/os/info")
|
||||
result = await resp.json()
|
||||
|
||||
assert result["data"]["boot_slots"] == {
|
||||
"A": {
|
||||
"state": "inactive",
|
||||
"status": "good",
|
||||
"version": "9.0.dev20220818",
|
||||
},
|
||||
"B": {"state": "booted", "status": "good", "version": "9.0.dev20220824"},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"new_disk",
|
||||
["/dev/sdaaaa", "/dev/mmcblk1", "Generic-Flash-Disk-61BCDDB6"],
|
||||
@ -131,6 +152,56 @@ async def test_api_os_datadisk_wipe(
|
||||
reboot.assert_called_once()
|
||||
|
||||
|
||||
async def test_api_set_boot_slot(
|
||||
api_client: TestClient,
|
||||
all_dbus_services: dict[str, DBusServiceMock],
|
||||
coresys: CoreSys,
|
||||
os_available,
|
||||
):
|
||||
"""Test changing the boot slot via API."""
|
||||
rauc_service: RaucService = all_dbus_services["rauc"]
|
||||
await coresys.os.load()
|
||||
|
||||
with patch.object(SystemControl, "reboot") as reboot:
|
||||
resp = await api_client.post("/os/boot-slot", json={"boot_slot": "A"})
|
||||
assert resp.status == 200
|
||||
|
||||
reboot.assert_called_once()
|
||||
assert rauc_service.Mark.calls == [("active", "kernel.0")]
|
||||
|
||||
|
||||
async def test_api_set_boot_slot_invalid(api_client: TestClient):
|
||||
"""Test invalid calls to set boot slot."""
|
||||
resp = await api_client.post("/os/boot-slot", json={"boot_slot": "C"})
|
||||
assert resp.status == 400
|
||||
result = await resp.json()
|
||||
assert "expected BootSlot or one of 'A', 'B'" in result["message"]
|
||||
|
||||
resp = await api_client.post("/os/boot-slot", json={"boot_slot": "A"})
|
||||
assert resp.status == 400
|
||||
result = await resp.json()
|
||||
assert "no Home Assistant OS available" in result["message"]
|
||||
|
||||
|
||||
async def test_api_set_boot_slot_error(
|
||||
api_client: TestClient,
|
||||
all_dbus_services: dict[str, DBusServiceMock],
|
||||
coresys: CoreSys,
|
||||
capture_exception: Mock,
|
||||
os_available,
|
||||
):
|
||||
"""Test changing the boot slot via API."""
|
||||
rauc_service: RaucService = all_dbus_services["rauc"]
|
||||
rauc_service.response_mark = DBusError(ErrorType.FAILED, "fail")
|
||||
await coresys.os.load()
|
||||
|
||||
resp = await api_client.post("/os/boot-slot", json={"boot_slot": "A"})
|
||||
assert resp.status == 400
|
||||
result = await resp.json()
|
||||
assert result["message"] == "Can't mark A as active!"
|
||||
capture_exception.assert_called_once()
|
||||
|
||||
|
||||
async def test_api_board_yellow_info(api_client: TestClient, coresys: CoreSys):
|
||||
"""Test yellow board info."""
|
||||
resp = await api_client.get("/os/boards/yellow")
|
||||
|
@ -97,3 +97,23 @@ async def test_dbus_rauc_connect_error(
|
||||
rauc = Rauc()
|
||||
await rauc.connect(dbus_session_bus)
|
||||
assert "Host has no rauc support" in caplog.text
|
||||
|
||||
|
||||
async def test_test_slot_status(
|
||||
rauc_service: RaucService, dbus_session_bus: MessageBus
|
||||
):
|
||||
"""Test get slot status."""
|
||||
rauc = Rauc()
|
||||
|
||||
await rauc.connect(dbus_session_bus)
|
||||
|
||||
slot_status = await rauc.get_slot_status()
|
||||
out = {}
|
||||
for slot in slot_status:
|
||||
for k in slot[1]:
|
||||
if k in out:
|
||||
out[k] += 1
|
||||
else:
|
||||
out[k] = 1
|
||||
|
||||
assert out
|
||||
|
@ -1,6 +1,6 @@
|
||||
"""Mock of rauc dbus service."""
|
||||
|
||||
from dbus_fast import Variant
|
||||
from dbus_fast import DBusError, Variant
|
||||
from dbus_fast.service import PropertyAccess, dbus_property, signal
|
||||
|
||||
from .base import DBusServiceMock, dbus_method
|
||||
@ -21,6 +21,7 @@ class Rauc(DBusServiceMock):
|
||||
|
||||
object_path = "/"
|
||||
interface = "de.pengutronix.rauc.Installer"
|
||||
response_mark: list[str] | DBusError = ["kernel.1", "marked slot kernel.1 as good"]
|
||||
|
||||
@dbus_property(access=PropertyAccess.READ)
|
||||
def Operation(self) -> "s":
|
||||
@ -70,7 +71,7 @@ class Rauc(DBusServiceMock):
|
||||
@dbus_method()
|
||||
def Mark(self, state: "s", slot_identifier: "s") -> "ss":
|
||||
"""Mark slot."""
|
||||
return ["kernel.1", "marked slot kernel.1 as good"]
|
||||
return self.response_mark
|
||||
|
||||
@dbus_method()
|
||||
def GetPrimary(self) -> "s":
|
||||
|
@ -9,6 +9,7 @@ from supervisor.coresys import CoreSys
|
||||
from supervisor.dbus.const import MulticastProtocolEnabled
|
||||
|
||||
from tests.dbus_service_mocks.base import DBusServiceMock
|
||||
from tests.dbus_service_mocks.rauc import Rauc as RaucService
|
||||
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
|
||||
|
||||
|
||||
@ -56,3 +57,16 @@ async def test_reload(coresys: CoreSys, systemd_service: SystemdService):
|
||||
sound_update.assert_called_once()
|
||||
|
||||
assert systemd_service.ListUnits.calls == [()]
|
||||
|
||||
|
||||
async def test_reload_os(
|
||||
coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock], os_available
|
||||
):
|
||||
"""Test manager reload while on OS also reloads OS info cache."""
|
||||
rauc_service: RaucService = all_dbus_services["rauc"]
|
||||
rauc_service.GetSlotStatus.calls.clear()
|
||||
|
||||
await coresys.host.load()
|
||||
await coresys.host.reload()
|
||||
|
||||
assert rauc_service.GetSlotStatus.calls == [()]
|
||||
|
Loading…
x
Reference in New Issue
Block a user