Fix mypy issues in backups and dbus (#5792)

* Fix mypy issues in backups module

* Fix mypy issues in dbus module

* Fix mypy issues in api after rebase

* TypedDict to dataclass and other small fixes

* Finish fixing mypy errors in dbus

* local_where must exist

* Fix references to name in tests
Mike Degatano 2025-03-31 17:03:54 -04:00 committed by GitHub
parent 67b9a44160
commit 01a682cfaa
48 changed files with 683 additions and 420 deletions
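
The "TypedDict to dataclass" bullet above is the largest mechanical change in this diff: the per-location backup metadata that used to be a loosely typed dict (values of Path | bool) becomes a slotted dataclass, so mypy can check attribute access instead of string keys. A minimal standalone sketch of that pattern, not the Supervisor code itself (the BackupLocation fields match the diff below; the surrounding example values are illustrative only):

    from dataclasses import dataclass
    from pathlib import Path


    @dataclass(slots=True)
    class BackupLocation:
        """Backup location metadata."""

        path: Path
        protected: bool
        size_bytes: int


    # Before: metadata[ATTR_PROTECTED] had the imprecise type Path | bool.
    # After: each field carries its own precise type, so no casts at call sites.
    locations: dict[str | None, BackupLocation] = {
        None: BackupLocation(path=Path("example.tar"), protected=False, size_bytes=0),
    }
    assert locations[None].protected is False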

View File

@@ -36,7 +36,6 @@ from ..const import (
     ATTR_LOCATION,
     ATTR_NAME,
     ATTR_PASSWORD,
-    ATTR_PATH,
     ATTR_PROTECTED,
     ATTR_REPOSITORIES,
     ATTR_SIZE,
@@ -156,8 +155,8 @@ class APIBackups(CoreSysAttributes):
         """Make location attributes dictionary."""
         return {
             loc if loc else LOCATION_LOCAL: {
-                ATTR_PROTECTED: backup.all_locations[loc][ATTR_PROTECTED],
-                ATTR_SIZE_BYTES: backup.all_locations[loc][ATTR_SIZE_BYTES],
+                ATTR_PROTECTED: backup.all_locations[loc].protected,
+                ATTR_SIZE_BYTES: backup.all_locations[loc].size_bytes,
             }
             for loc in backup.locations
         }
@@ -262,7 +261,7 @@ class APIBackups(CoreSysAttributes):
     def _location_to_mount(self, location: str | None) -> LOCATION_TYPE:
         """Convert a single location to a mount if possible."""
         if not location or location == LOCATION_CLOUD_BACKUP:
-            return location
+            return cast(LOCATION_TYPE, location)
 
         mount = self.sys_mounts.get(location)
         if mount.usage != MountUsage.BACKUP:
@@ -474,7 +473,7 @@ class APIBackups(CoreSysAttributes):
             raise APIError(f"Backup {backup.slug} is not in location {location}")
 
         _LOGGER.info("Downloading backup %s", backup.slug)
-        filename = backup.all_locations[location][ATTR_PATH]
+        filename = backup.all_locations[location].path
         # If the file is missing, return 404 and trigger reload of location
         if not await self.sys_run_in_executor(filename.is_file):
             self.sys_create_task(self.sys_backups.reload(location))
@@ -512,7 +511,7 @@ class APIBackups(CoreSysAttributes):
             location = locations.pop(0)
 
             if location and location != LOCATION_CLOUD_BACKUP:
-                tmp_path = cast(Mount, location).local_where or tmp_path
+                tmp_path = cast(Mount, location).local_where
 
         filename: str | None = None
         if ATTR_FILENAME in request.query:

View File

@@ -228,7 +228,11 @@ class APIOS(CoreSysAttributes):
     @api_process
     async def config_swap_info(self, request: web.Request) -> dict[str, Any]:
         """Get swap settings."""
-        if not self.coresys.os.available or self.coresys.os.version < "15.0":
+        if (
+            not self.coresys.os.available
+            or not self.coresys.os.version
+            or self.coresys.os.version < "15.0"
+        ):
             raise APINotFound(
                 "Home Assistant OS 15.0 or newer required for swap settings"
             )
@@ -241,7 +245,11 @@ class APIOS(CoreSysAttributes):
     @api_process
     async def config_swap_options(self, request: web.Request) -> None:
         """Update swap settings."""
-        if not self.coresys.os.available or self.coresys.os.version < "15.0":
+        if (
+            not self.coresys.os.available
+            or not self.coresys.os.version
+            or self.coresys.os.version < "15.0"
+        ):
             raise APINotFound(
                 "Home Assistant OS 15.0 or newer required for swap settings"
             )

View File

@@ -6,7 +6,7 @@ from contextlib import asynccontextmanager
 import logging
 
 import aiohttp
-from aiohttp import WSMessageTypeError, web
+from aiohttp import WSCloseCode, WSMessageTypeError, web
 from aiohttp.client_exceptions import ClientConnectorError
 from aiohttp.client_ws import ClientWebSocketResponse
 from aiohttp.hdrs import AUTHORIZATION, CONTENT_TYPE
@@ -205,7 +205,9 @@ class APIProxy(CoreSysAttributes):
                     logger.warning(
                         "Error WebSocket message received while proxying: %r", msg.data
                     )
-                    await target.close(code=source.close_code)
+                    await target.close(
+                        code=source.close_code or WSCloseCode.INTERNAL_ERROR
+                    )
                 case _:
                     logger.warning(
                         "Cannot proxy WebSocket message of unsupported type: %r",

View File

@@ -5,6 +5,7 @@ from collections import defaultdict
 from collections.abc import AsyncGenerator, Awaitable
 from contextlib import asynccontextmanager
 from copy import deepcopy
+from dataclasses import dataclass
 from datetime import timedelta
 import io
 import json
@@ -14,7 +15,7 @@ import tarfile
 from tarfile import TarFile
 from tempfile import TemporaryDirectory
 import time
-from typing import Any, Self
+from typing import Any, Self, cast
 
 from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
 from cryptography.hazmat.backends import default_backend
@@ -35,11 +36,9 @@ from ..const import (
     ATTR_FOLDERS,
     ATTR_HOMEASSISTANT,
     ATTR_NAME,
-    ATTR_PATH,
     ATTR_PROTECTED,
     ATTR_REPOSITORIES,
     ATTR_SIZE,
-    ATTR_SIZE_BYTES,
     ATTR_SLUG,
     ATTR_SUPERVISOR_VERSION,
     ATTR_TYPE,
@@ -69,6 +68,15 @@ from .validate import SCHEMA_BACKUP
 _LOGGER: logging.Logger = logging.getLogger(__name__)
 
 
+@dataclass(slots=True)
+class BackupLocation:
+    """Backup location metadata."""
+
+    path: Path
+    protected: bool
+    size_bytes: int
+
+
 def location_sort_key(value: str | None) -> str:
     """Sort locations, None is always first else alphabetical."""
     return value if value else ""
@@ -91,16 +99,16 @@ class Backup(JobGroup):
             coresys, JOB_GROUP_BACKUP.format_map(defaultdict(str, slug=slug)), slug
         )
         self._data: dict[str, Any] = data or {ATTR_SLUG: slug}
-        self._tmp: TemporaryDirectory = None
+        self._tmp: TemporaryDirectory | None = None
         self._outer_secure_tarfile: SecureTarFile | None = None
         self._key: bytes | None = None
         self._aes: Cipher | None = None
-        self._locations: dict[str | None, dict[str, Path | bool]] = {
-            location: {
-                ATTR_PATH: tar_file,
-                ATTR_PROTECTED: data.get(ATTR_PROTECTED, False) if data else False,
-                ATTR_SIZE_BYTES: size_bytes,
-            }
+        self._locations: dict[str | None, BackupLocation] = {
+            location: BackupLocation(
+                path=tar_file,
+                protected=data.get(ATTR_PROTECTED, False) if data else False,
+                size_bytes=size_bytes,
+            )
         }
 
     @property
@@ -131,7 +139,7 @@ class Backup(JobGroup):
     @property
     def protected(self) -> bool:
         """Return backup date."""
-        return self._locations[self.location][ATTR_PROTECTED]
+        return self._locations[self.location].protected
 
     @property
     def compressed(self) -> bool:
@@ -208,7 +216,7 @@ class Backup(JobGroup):
         return self.locations[0]
 
     @property
-    def all_locations(self) -> dict[str | None, dict[str, Path | bool]]:
+    def all_locations(self) -> dict[str | None, BackupLocation]:
         """Return all locations this backup was found in."""
         return self._locations
@@ -234,7 +242,7 @@ class Backup(JobGroup):
     @property
     def size_bytes(self) -> int:
         """Return backup size in bytes."""
-        return self._locations[self.location][ATTR_SIZE_BYTES]
+        return self._locations[self.location].size_bytes
 
     @property
     def is_new(self) -> bool:
@@ -244,7 +252,7 @@ class Backup(JobGroup):
     @property
     def tarfile(self) -> Path:
         """Return path to backup tarfile."""
-        return self._locations[self.location][ATTR_PATH]
+        return self._locations[self.location].path
 
     @property
     def is_current(self) -> bool:
@@ -296,7 +304,7 @@ class Backup(JobGroup):
         # In case of conflict we always ignore the ones from the first one. But log them to let the user know
         if conflict := {
-            loc: val[ATTR_PATH]
+            loc: val.path
             for loc, val in self.all_locations.items()
             if loc in backup.all_locations and backup.all_locations[loc] != val
         }:
@@ -334,7 +342,7 @@ class Backup(JobGroup):
             self._init_password(password)
             self._data[ATTR_PROTECTED] = True
             self._data[ATTR_CRYPTO] = CRYPTO_AES128
-            self._locations[self.location][ATTR_PROTECTED] = True
+            self._locations[self.location].protected = True
 
         if not compressed:
             self._data[ATTR_COMPRESSED] = False
@@ -361,7 +369,7 @@ class Backup(JobGroup):
         Checks if we can access the backup file and decrypt if necessary.
         """
-        backup_file: Path = self.all_locations[location][ATTR_PATH]
+        backup_file: Path = self.all_locations[location].path
 
         def _validate_file() -> None:
             ending = f".tar{'.gz' if self.compressed else ''}"
@@ -416,6 +424,9 @@ class Backup(JobGroup):
                     json_file = backup.extractfile("./snapshot.json")
                 else:
                     json_file = backup.extractfile("./backup.json")
+                if not json_file:
+                    raise BackupInvalidError("Metadata file cannot be read")
+
                 return size_bytes, json_file.read()
 
         # read backup.json
@@ -424,7 +435,7 @@ class Backup(JobGroup):
         except FileNotFoundError:
             _LOGGER.error("No tarfile located at %s", self.tarfile)
             return False
-        except (tarfile.TarError, KeyError) as err:
+        except (BackupInvalidError, tarfile.TarError, KeyError) as err:
             _LOGGER.error("Can't read backup tarfile %s: %s", self.tarfile, err)
             return False
@@ -447,8 +458,8 @@ class Backup(JobGroup):
             return False
 
         if self._data[ATTR_PROTECTED]:
-            self._locations[self.location][ATTR_PROTECTED] = True
+            self._locations[self.location].protected = True
 
-        self._locations[self.location][ATTR_SIZE_BYTES] = size_bytes
+        self._locations[self.location].size_bytes = size_bytes
 
         return True
@@ -456,7 +467,7 @@ class Backup(JobGroup):
     async def create(self) -> AsyncGenerator[None]:
         """Create new backup file."""
 
-        def _open_outer_tarfile():
+        def _open_outer_tarfile() -> tuple[SecureTarFile, tarfile.TarFile]:
             """Create and open outer tarfile."""
             if self.tarfile.is_file():
                 raise BackupFileExistError(
@@ -485,20 +496,22 @@ class Backup(JobGroup):
             return _outer_secure_tarfile, _outer_tarfile
 
-        def _close_outer_tarfile() -> int:
-            """Close outer tarfile."""
-            self._outer_secure_tarfile.close()
-            return self.tarfile.stat().st_size
-
-        self._outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
+        outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
             _open_outer_tarfile
         )
+        self._outer_secure_tarfile = outer_secure_tarfile
+
+        def _close_outer_tarfile() -> int:
+            """Close outer tarfile."""
+            outer_secure_tarfile.close()
+            return self.tarfile.stat().st_size
+
         try:
             yield
         finally:
             await self._create_cleanup(outer_tarfile)
             size_bytes = await self.sys_run_in_executor(_close_outer_tarfile)
-            self._locations[self.location][ATTR_SIZE_BYTES] = size_bytes
+            self._locations[self.location].size_bytes = size_bytes
             self._outer_secure_tarfile = None
 
     @asynccontextmanager
@@ -513,7 +526,7 @@ class Backup(JobGroup):
         backup_tarfile = (
             self.tarfile
             if location == DEFAULT
-            else self.all_locations[location][ATTR_PATH]
+            else self.all_locations[cast(str | None, location)].path
         )
 
         # extract an existing backup
@@ -579,6 +592,10 @@ class Backup(JobGroup):
     async def _addon_save(self, addon: Addon) -> asyncio.Task | None:
         """Store an add-on into backup."""
         self.sys_jobs.current.reference = addon.slug
+        if not self._outer_secure_tarfile:
+            raise RuntimeError(
+                "Cannot backup components without initializing backup tar"
+            )
 
         tar_name = f"{addon.slug}.tar{'.gz' if self.compressed else ''}"
@@ -610,7 +627,7 @@ class Backup(JobGroup):
         return start_task
 
     @Job(name="backup_store_addons", cleanup=False)
-    async def store_addons(self, addon_list: list[str]) -> list[asyncio.Task]:
+    async def store_addons(self, addon_list: list[Addon]) -> list[asyncio.Task]:
         """Add a list of add-ons into backup.
 
         For each addon that needs to be started after backup, returns a Task which
@@ -631,6 +648,8 @@ class Backup(JobGroup):
     async def _addon_restore(self, addon_slug: str) -> asyncio.Task | None:
         """Restore an add-on from backup."""
         self.sys_jobs.current.reference = addon_slug
+        if not self._tmp:
+            raise RuntimeError("Cannot restore components without opening backup tar")
 
         tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}"
         addon_file = SecureTarFile(
@@ -696,6 +715,12 @@ class Backup(JobGroup):
     async def _folder_save(self, name: str):
         """Take backup of a folder."""
         self.sys_jobs.current.reference = name
+        if not self._outer_secure_tarfile:
+            raise RuntimeError(
+                "Cannot backup components without initializing backup tar"
+            )
+        outer_secure_tarfile = self._outer_secure_tarfile
 
         slug_name = name.replace("/", "_")
         tar_name = f"{slug_name}.tar{'.gz' if self.compressed else ''}"
         origin_dir = Path(self.sys_config.path_supervisor, name)
@@ -725,7 +750,7 @@ class Backup(JobGroup):
             return False
 
-        with self._outer_secure_tarfile.create_inner_tar(
+        with outer_secure_tarfile.create_inner_tar(
             f"./{tar_name}",
             gzip=self.compressed,
             key=self._key,
@@ -759,6 +784,8 @@ class Backup(JobGroup):
     async def _folder_restore(self, name: str) -> None:
         """Restore a folder."""
         self.sys_jobs.current.reference = name
+        if not self._tmp:
+            raise RuntimeError("Cannot restore components without opening backup tar")
 
         slug_name = name.replace("/", "_")
         tar_name = Path(
@@ -767,7 +794,7 @@ class Backup(JobGroup):
         origin_dir = Path(self.sys_config.path_supervisor, name)
 
         # Perform a restore
-        def _restore() -> bool:
+        def _restore() -> None:
             # Check if exists inside backup
             if not tar_name.exists():
                 raise BackupInvalidError(
@@ -795,7 +822,6 @@ class Backup(JobGroup):
                 raise BackupError(
                     f"Can't restore folder {name}: {err}", _LOGGER.warning
                 ) from err
-            return True
 
         # Unmount any mounts within folder
         bind_mounts = [
@@ -808,7 +834,7 @@ class Backup(JobGroup):
             await asyncio.gather(*[bind_mount.unmount() for bind_mount in bind_mounts])
 
         try:
-            return await self.sys_run_in_executor(_restore)
+            await self.sys_run_in_executor(_restore)
         finally:
             if bind_mounts:
                 await asyncio.gather(
@@ -832,6 +858,11 @@ class Backup(JobGroup):
     @Job(name="backup_store_homeassistant", cleanup=False)
     async def store_homeassistant(self, exclude_database: bool = False):
         """Backup Home Assistant Core configuration folder."""
+        if not self._outer_secure_tarfile:
+            raise RuntimeError(
+                "Cannot backup components without initializing backup tar"
+            )
+
         self._data[ATTR_HOMEASSISTANT] = {
             ATTR_VERSION: self.sys_homeassistant.version,
             ATTR_EXCLUDE_DATABASE: exclude_database,
@@ -855,6 +886,9 @@ class Backup(JobGroup):
     @Job(name="backup_restore_homeassistant", cleanup=False)
     async def restore_homeassistant(self) -> Awaitable[None]:
         """Restore Home Assistant Core configuration folder."""
+        if not self._tmp:
+            raise RuntimeError("Cannot restore components without opening backup tar")
+
         await self.sys_homeassistant.core.stop(remove_container=True)
 
         # Restore Home Assistant Core config directory

View File

@@ -9,7 +9,7 @@ BUF_SIZE = 2**20 * 4  # 4MB
 DEFAULT_FREEZE_TIMEOUT = 600
 
 LOCATION_CLOUD_BACKUP = ".cloud_backup"
-LOCATION_TYPE = Mount | Literal[LOCATION_CLOUD_BACKUP] | None
+LOCATION_TYPE = Mount | Literal[".cloud_backup"] | None
 
 
 class BackupType(StrEnum):

View File

@@ -8,17 +8,16 @@ import errno
 import logging
 from pathlib import Path
 from shutil import copy
+from typing import cast
 
 from ..addons.addon import Addon
 from ..const import (
     ATTR_DAYS_UNTIL_STALE,
-    ATTR_PATH,
-    ATTR_PROTECTED,
-    ATTR_SIZE_BYTES,
     FILE_HASSIO_BACKUPS,
     FOLDER_HOMEASSISTANT,
     CoreState,
 )
+from ..coresys import CoreSys
 from ..dbus.const import UnitActiveState
 from ..exceptions import (
     BackupDataDiskBadMessageError,
@@ -37,7 +36,7 @@ from ..utils.common import FileConfiguration
 from ..utils.dt import utcnow
 from ..utils.sentinel import DEFAULT
 from ..utils.sentry import async_capture_exception
-from .backup import Backup
+from .backup import Backup, BackupLocation
 from .const import (
     DEFAULT_FREEZE_TIMEOUT,
     LOCATION_CLOUD_BACKUP,
@@ -58,7 +57,7 @@ JOB_PARTIAL_RESTORE = "backup_manager_partial_restore"
 class BackupManager(FileConfiguration, JobGroup):
     """Manage backups."""
 
-    def __init__(self, coresys):
+    def __init__(self, coresys: CoreSys):
         """Initialize a backup manager."""
         super().__init__(FILE_HASSIO_BACKUPS, SCHEMA_BACKUPS_CONFIG)
         super(FileConfiguration, self).__init__(coresys, JOB_GROUP_BACKUP_MANAGER)
@@ -69,7 +68,7 @@ class BackupManager(FileConfiguration, JobGroup):
     @property
     def list_backups(self) -> list[Backup]:
         """Return a list of all backup objects."""
-        return self._backups.values()
+        return list(self._backups.values())
 
     @property
     def days_until_stale(self) -> int:
@@ -90,7 +89,7 @@ class BackupManager(FileConfiguration, JobGroup):
         } | {
             mount.name: mount.local_where
             for mount in self.sys_mounts.backup_mounts
-            if mount.state == UnitActiveState.ACTIVE
+            if mount.state == UnitActiveState.ACTIVE and mount.local_where
         }
 
     @property
@@ -103,7 +102,7 @@ class BackupManager(FileConfiguration, JobGroup):
                 return job.uuid
         return None
 
-    def get(self, slug: str) -> Backup:
+    def get(self, slug: str) -> Backup | None:
         """Return backup object."""
         return self._backups.get(slug)
@@ -119,8 +118,7 @@ class BackupManager(FileConfiguration, JobGroup):
             location = self.sys_mounts.default_backup_mount
 
         if location:
-            location_mount: Mount = location
-            return location_mount.local_where
+            return cast(Mount, location).local_where
 
         return self.sys_config.path_backup
@@ -129,13 +127,14 @@ class BackupManager(FileConfiguration, JobGroup):
         if location == DEFAULT and self.sys_mounts.default_backup_mount:
             location = self.sys_mounts.default_backup_mount
 
-        if location not in (DEFAULT, LOCATION_CLOUD_BACKUP, None):
-            location_mount: Mount = location
-            if not await location_mount.is_mounted():
-                raise BackupMountDownError(
-                    f"{location_mount.name} is down, cannot back-up to it",
-                    _LOGGER.error,
-                )
+        if (
+            location not in (DEFAULT, LOCATION_CLOUD_BACKUP, None)
+            and not await (location_mount := cast(Mount, location)).is_mounted()
+        ):
+            raise BackupMountDownError(
+                f"{location_mount.name} is down, cannot back-up to it",
+                _LOGGER.error,
+            )
 
     def _get_location_name(
         self,
@@ -143,13 +142,13 @@ class BackupManager(FileConfiguration, JobGroup):
     ) -> str | None:
         """Get name of location (or None for local backup folder)."""
         if location == LOCATION_CLOUD_BACKUP:
-            return location
+            return cast(str, location)
 
         if location == DEFAULT and self.sys_mounts.default_backup_mount:
             location = self.sys_mounts.default_backup_mount
 
         if location:
-            return location.name
+            return cast(Mount, location).name
         return None
 
     def _change_stage(
@@ -161,7 +160,7 @@ class BackupManager(FileConfiguration, JobGroup):
 
         Must be called from an existing backup/restore job.
         """
-        job_name = self.sys_jobs.current.name
+        job_name = cast(str, self.sys_jobs.current.name)
         if "restore" in job_name:
             action = "Restore"
         elif "freeze" in job_name:
@@ -237,12 +236,9 @@ class BackupManager(FileConfiguration, JobGroup):
 
         return backup
 
-    def load(self) -> Awaitable[None]:
-        """Load exists backups data.
-
-        Return a coroutine.
-        """
-        return self.reload()
+    async def load(self) -> None:
+        """Load exists backups data."""
+        await self.reload()
 
     async def reload(self, location: str | None | type[DEFAULT] = DEFAULT) -> bool:
         """Load exists backups."""
@@ -278,10 +274,12 @@ class BackupManager(FileConfiguration, JobGroup):
                 return False
 
+        # This is just so we don't have to cast repeatedly. Variable will only be used when location is not DEFAULT
+        location_name = "" if location == DEFAULT else cast(str | None, location)
         locations = (
             self.backup_locations
             if location == DEFAULT
-            else {location: self.backup_locations[location]}
+            else {location_name: self.backup_locations[location_name]}
         )
         tasks = [
             self.sys_create_task(_load_backup(_location, tar_file))
@@ -311,9 +309,9 @@ class BackupManager(FileConfiguration, JobGroup):
                     err,
                 )
-            elif location in backup.all_locations:
+            elif location_name in backup.all_locations:
                 if len(backup.all_locations) > 1:
-                    del backup.all_locations[location]
+                    del backup.all_locations[location_name]
                 else:
                     del self._backups[backup.slug]
@@ -336,7 +334,7 @@ class BackupManager(FileConfiguration, JobGroup):
             else list(backup.all_locations.keys())
         )
         for location in targets:
-            backup_tarfile = backup.all_locations[location][ATTR_PATH]
+            backup_tarfile = backup.all_locations[location].path
             try:
                 await self.sys_run_in_executor(backup_tarfile.unlink)
                 del backup.all_locations[location]
@@ -370,7 +368,7 @@ class BackupManager(FileConfiguration, JobGroup):
 
         all_new_locations: dict[str | None, Path] = {}
 
-        def copy_to_additional_locations() -> dict[str | None, Path]:
+        def copy_to_additional_locations() -> None:
             """Copy backup file to additional locations."""
             nonlocal all_new_locations
             for location in locations:
@@ -380,7 +378,7 @@ class BackupManager(FileConfiguration, JobGroup):
                         copy(backup.tarfile, self.sys_config.path_core_backup)
                     )
                 elif location:
-                    location_mount: Mount = location
+                    location_mount = cast(Mount, location)
                     if not location_mount.local_where.is_mount():
                         raise BackupMountDownError(
                             f"{location_mount.name} is down, cannot copy to it",
@@ -413,11 +411,11 @@ class BackupManager(FileConfiguration, JobGroup):
         finally:
             backup.all_locations.update(
                 {
-                    loc: {
-                        ATTR_PATH: path,
-                        ATTR_PROTECTED: backup.protected,
-                        ATTR_SIZE_BYTES: backup.size_bytes,
-                    }
+                    loc: BackupLocation(
+                        path=path,
+                        protected=backup.protected,
+                        size_bytes=backup.size_bytes,
+                    )
                     for loc, path in all_new_locations.items()
                 }
             )
@@ -591,13 +589,13 @@ class BackupManager(FileConfiguration, JobGroup):
             self, {JobCondition.FREE_SPACE}, "BackupManager.do_backup_full"
         )
 
-        backup = self._create_backup(
+        new_backup = self._create_backup(
             name, filename, BackupType.FULL, password, compressed, location, extra
         )
 
-        _LOGGER.info("Creating new full backup with slug %s", backup.slug)
+        _LOGGER.info("Creating new full backup with slug %s", new_backup.slug)
         backup = await self._do_backup(
-            backup,
+            new_backup,
             self.sys_addons.installed,
             ALL_FOLDERS,
             True,
@@ -652,21 +650,21 @@ class BackupManager(FileConfiguration, JobGroup):
         if len(addons) == 0 and len(folders) == 0 and not homeassistant:
             _LOGGER.error("Nothing to create backup for")
 
-        backup = self._create_backup(
+        new_backup = self._create_backup(
             name, filename, BackupType.PARTIAL, password, compressed, location, extra
         )
 
-        _LOGGER.info("Creating new partial backup with slug %s", backup.slug)
+        _LOGGER.info("Creating new partial backup with slug %s", new_backup.slug)
 
         addon_list = []
         for addon_slug in addons:
             addon = self.sys_addons.get(addon_slug)
 
             if addon and addon.is_installed:
-                addon_list.append(addon)
+                addon_list.append(cast(Addon, addon))
                 continue
             _LOGGER.warning("Add-on %s not found/installed", addon_slug)
 
         backup = await self._do_backup(
-            backup,
+            new_backup,
             addon_list,
             folders,
             homeassistant,
@@ -772,13 +770,15 @@ class BackupManager(FileConfiguration, JobGroup):
                 f"Backup {backup.slug} does not exist in {location}", _LOGGER.error
             )
 
-        location = location if location != DEFAULT else backup.location
-        if backup.all_locations[location][ATTR_PROTECTED]:
+        location_name = (
+            cast(str | None, location) if location != DEFAULT else backup.location
+        )
+        if backup.all_locations[location_name].protected:
             backup.set_password(password)
         else:
             backup.set_password(None)
 
-        await backup.validate_backup(location)
+        await backup.validate_backup(location_name)
 
     @Job(
         name=JOB_FULL_RESTORE,
@@ -857,7 +857,7 @@ class BackupManager(FileConfiguration, JobGroup):
         backup: Backup,
         homeassistant: bool = False,
         addons: list[str] | None = None,
-        folders: list[Path] | None = None,
+        folders: list[str] | None = None,
         password: str | None = None,
         location: str | None | type[DEFAULT] = DEFAULT,
     ) -> bool:

View File

@@ -89,9 +89,10 @@ class OSAgent(DBusInterfaceProxy):
         """Return if diagnostics is enabled on OS-Agent."""
         return self.properties[DBUS_ATTR_DIAGNOSTICS]
 
+    @dbus_connected
     def set_diagnostics(self, value: bool) -> Awaitable[None]:
         """Enable or disable OS-Agent diagnostics."""
-        return self.dbus.set_diagnostics(value)
+        return self.connected_dbus.set("diagnostics", value)
 
     @property
     def all(self) -> list[DBusInterface]:

View File

@@ -30,11 +30,13 @@ class AppArmor(DBusInterfaceProxy):
     @dbus_connected
     async def load_profile(self, profile: Path, cache: Path) -> None:
         """Load/Update AppArmor profile."""
-        await self.dbus.AppArmor.call_load_profile(profile.as_posix(), cache.as_posix())
+        await self.connected_dbus.AppArmor.call(
+            "load_profile", profile.as_posix(), cache.as_posix()
+        )
 
     @dbus_connected
     async def unload_profile(self, profile: Path, cache: Path) -> None:
         """Remove AppArmor profile."""
-        await self.dbus.AppArmor.call_unload_profile(
-            profile.as_posix(), cache.as_posix()
+        await self.connected_dbus.AppArmor.call(
+            "unload_profile", profile.as_posix(), cache.as_posix()
         )

View File

@@ -1,6 +1,7 @@
 """Board management for OS Agent."""
 
 import logging
+from typing import cast
 
 from dbus_fast.aio.message_bus import MessageBus
@@ -47,7 +48,7 @@ class BoardManager(DBusInterfaceProxy):
         if self.board != BOARD_NAME_GREEN:
             raise BoardInvalidError("Green board is not in use", _LOGGER.error)
 
-        return self._board_proxy
+        return cast(Green, self._board_proxy)
 
     @property
     def supervised(self) -> Supervised:
@@ -55,7 +56,7 @@ class BoardManager(DBusInterfaceProxy):
         if self.board != BOARD_NAME_SUPERVISED:
             raise BoardInvalidError("Supervised board is not in use", _LOGGER.error)
 
-        return self._board_proxy
+        return cast(Supervised, self._board_proxy)
 
     @property
     def yellow(self) -> Yellow:
@@ -63,7 +64,7 @@ class BoardManager(DBusInterfaceProxy):
         if self.board != BOARD_NAME_YELLOW:
             raise BoardInvalidError("Yellow board is not in use", _LOGGER.error)
 
-        return self._board_proxy
+        return cast(Yellow, self._board_proxy)
 
     async def connect(self, bus: MessageBus) -> None:
         """Connect to D-Bus."""

View File

@@ -8,6 +8,7 @@ from dbus_fast.aio.message_bus import MessageBus
 from ....const import ATTR_ACTIVITY_LED, ATTR_POWER_LED, ATTR_USER_LED
 from ...const import DBUS_ATTR_ACTIVITY_LED, DBUS_ATTR_POWER_LED, DBUS_ATTR_USER_LED
 from ...interface import dbus_property
+from ...utils import dbus_connected
 from .const import BOARD_NAME_GREEN
 from .interface import BoardProxy
 from .validate import SCHEMA_GREEN_BOARD
@@ -26,10 +27,11 @@ class Green(BoardProxy):
         """Get activity LED enabled."""
         return self.properties[DBUS_ATTR_ACTIVITY_LED]
 
+    @dbus_connected
     def set_activity_led(self, enabled: bool) -> Awaitable[None]:
         """Enable/disable activity LED."""
         self._data[ATTR_ACTIVITY_LED] = enabled
-        return self.dbus.Boards.Green.set_activity_led(enabled)
+        return self.connected_dbus.Boards.Green.set("activity_led", enabled)
 
     @property
     @dbus_property
@@ -37,10 +39,11 @@ class Green(BoardProxy):
         """Get power LED enabled."""
         return self.properties[DBUS_ATTR_POWER_LED]
 
+    @dbus_connected
     def set_power_led(self, enabled: bool) -> Awaitable[None]:
         """Enable/disable power LED."""
         self._data[ATTR_POWER_LED] = enabled
-        return self.dbus.Boards.Green.set_power_led(enabled)
+        return self.connected_dbus.Boards.Green.set("power_led", enabled)
 
     @property
     @dbus_property
@@ -48,10 +51,11 @@ class Green(BoardProxy):
         """Get user LED enabled."""
         return self.properties[DBUS_ATTR_USER_LED]
 
+    @dbus_connected
     def set_user_led(self, enabled: bool) -> Awaitable[None]:
         """Enable/disable disk LED."""
         self._data[ATTR_USER_LED] = enabled
-        return self.dbus.Boards.Green.set_user_led(enabled)
+        return self.connected_dbus.Boards.Green.set("user_led", enabled)
 
     async def connect(self, bus: MessageBus) -> None:
         """Connect to D-Bus."""

View File

@@ -14,16 +14,25 @@ class BoardProxy(FileConfiguration, DBusInterfaceProxy):
     bus_name: str = DBUS_NAME_HAOS
 
-    def __init__(self, name: str, file_schema: Schema | None = None) -> None:
+    def __init__(self, board_name: str, file_schema: Schema | None = None) -> None:
         """Initialize properties."""
+        self._board_name: str = board_name
+        self._object_path: str = f"{DBUS_OBJECT_HAOS_BOARDS}/{board_name}"
+        self._properties_interface: str = f"{DBUS_IFACE_HAOS_BOARDS}.{board_name}"
         super().__init__(FILE_HASSIO_BOARD, file_schema or SCHEMA_BASE_BOARD)
         super(FileConfiguration, self).__init__()
-        self._name: str = name
-        self.object_path: str = f"{DBUS_OBJECT_HAOS_BOARDS}/{name}"
-        self.properties_interface: str = f"{DBUS_IFACE_HAOS_BOARDS}.{name}"
 
     @property
-    def name(self) -> str:
-        """Get name."""
-        return self._name
+    def object_path(self) -> str:
+        """Object path for dbus object."""
+        return self._object_path
+
+    @property
+    def properties_interface(self) -> str:
+        """Primary interface of object to get property values from."""
+        return self._properties_interface
+
+    @property
+    def board_name(self) -> str:
+        """Get board name."""
+        return self._board_name

View File

@@ -8,6 +8,7 @@ from dbus_fast.aio.message_bus import MessageBus
 from ....const import ATTR_DISK_LED, ATTR_HEARTBEAT_LED, ATTR_POWER_LED
 from ...const import DBUS_ATTR_DISK_LED, DBUS_ATTR_HEARTBEAT_LED, DBUS_ATTR_POWER_LED
 from ...interface import dbus_property
+from ...utils import dbus_connected
 from .const import BOARD_NAME_YELLOW
 from .interface import BoardProxy
 from .validate import SCHEMA_YELLOW_BOARD
@@ -26,10 +27,11 @@ class Yellow(BoardProxy):
         """Get heartbeat LED enabled."""
         return self.properties[DBUS_ATTR_HEARTBEAT_LED]
 
+    @dbus_connected
     def set_heartbeat_led(self, enabled: bool) -> Awaitable[None]:
         """Enable/disable heartbeat LED."""
         self._data[ATTR_HEARTBEAT_LED] = enabled
-        return self.dbus.Boards.Yellow.set_heartbeat_led(enabled)
+        return self.connected_dbus.Boards.Yellow.set("heartbeat_led", enabled)
 
     @property
     @dbus_property
@@ -37,10 +39,11 @@ class Yellow(BoardProxy):
         """Get power LED enabled."""
         return self.properties[DBUS_ATTR_POWER_LED]
 
+    @dbus_connected
     def set_power_led(self, enabled: bool) -> Awaitable[None]:
         """Enable/disable power LED."""
         self._data[ATTR_POWER_LED] = enabled
-        return self.dbus.Boards.Yellow.set_power_led(enabled)
+        return self.connected_dbus.Boards.Yellow.set("power_led", enabled)
 
     @property
     @dbus_property
@@ -48,10 +51,11 @@ class Yellow(BoardProxy):
         """Get disk LED enabled."""
         return self.properties[DBUS_ATTR_DISK_LED]
 
+    @dbus_connected
     def set_disk_led(self, enabled: bool) -> Awaitable[None]:
         """Enable/disable disk LED."""
         self._data[ATTR_DISK_LED] = enabled
-        return self.dbus.Boards.Yellow.set_disk_led(enabled)
+        return self.connected_dbus.Boards.Yellow.set("disk_led", enabled)
 
     async def connect(self, bus: MessageBus) -> None:
         """Connect to D-Bus."""

View File

@@ -14,4 +14,6 @@ class CGroup(DBusInterface):
     @dbus_connected
     async def add_devices_allowed(self, container_id: str, permission: str) -> None:
         """Update cgroup devices and add new devices."""
-        await self.dbus.CGroup.call_add_devices_allowed(container_id, permission)
+        await self.connected_dbus.CGroup.call(
+            "add_devices_allowed", container_id, permission
+        )

View File

@@ -28,14 +28,14 @@ class DataDisk(DBusInterfaceProxy):
     @dbus_connected
     async def change_device(self, device: Path) -> None:
         """Migrate data disk to a new device."""
-        await self.dbus.DataDisk.call_change_device(device.as_posix())
+        await self.connected_dbus.DataDisk.call("change_device", device.as_posix())
 
     @dbus_connected
     async def reload_device(self) -> None:
         """Reload device data."""
-        await self.dbus.DataDisk.call_reload_device()
+        await self.connected_dbus.DataDisk.call("reload_device")
 
     @dbus_connected
     async def mark_data_move(self) -> None:
         """Create marker to signal to do data disk migration next reboot."""
-        await self.dbus.DataDisk.call_mark_data_move()
+        await self.connected_dbus.DataDisk.call("mark_data_move")

View File

@@ -27,7 +27,7 @@ class Swap(DBusInterfaceProxy):
     def set_swap_size(self, size: str) -> Awaitable[None]:
         """Set swap size."""
-        return self.dbus.Config.Swap.set_swap_size(size)
+        return self.connected_dbus.Config.Swap.set("swap_size", size)
 
     @property
     @dbus_property
@@ -37,4 +37,4 @@ class Swap(DBusInterfaceProxy):
     def set_swappiness(self, swappiness: int) -> Awaitable[None]:
         """Set swappiness."""
-        return self.dbus.Config.Swap.set_swappiness(swappiness)
+        return self.connected_dbus.Config.Swap.set("swappiness", swappiness)

View File

@@ -14,4 +14,4 @@ class System(DBusInterface):
     @dbus_connected
     async def schedule_wipe_device(self) -> bool:
         """Schedule a factory reset on next system boot."""
-        return await self.dbus.System.call_schedule_wipe_device()
+        return await self.connected_dbus.System.call("schedule_wipe_device")

View File

@@ -84,4 +84,4 @@ class Hostname(DBusInterfaceProxy):
     @dbus_connected
     async def set_static_hostname(self, hostname: str) -> None:
         """Change local hostname."""
-        await self.dbus.call_set_static_hostname(hostname, False)
+        await self.connected_dbus.call("set_static_hostname", hostname, False)

View File

@@ -1,13 +1,13 @@
 """Interface class for D-Bus wrappers."""
 
-from abc import ABC
+from abc import ABC, abstractmethod
 from collections.abc import Callable
 from functools import wraps
 from typing import Any
 
 from dbus_fast.aio.message_bus import MessageBus
 
-from supervisor.exceptions import DBusInterfaceError
+from supervisor.exceptions import DBusInterfaceError, DBusNotConnectedError
 
 from ..utils.dbus import DBus
 from .utils import dbus_connected
@@ -31,10 +31,18 @@ class DBusInterface(ABC):
     dbus: DBus | None = None
     name: str | None = None
-    bus_name: str | None = None
-    object_path: str | None = None
     _shutdown: bool = False
 
+    @property
+    @abstractmethod
+    def bus_name(self) -> str:
+        """Bus name for dbus object."""
+
+    @property
+    @abstractmethod
+    def object_path(self) -> str:
+        """Object path for dbus object."""
+
     @property
     def is_connected(self) -> bool:
         """Return True, if they is connected to D-Bus."""
@@ -45,6 +53,13 @@ class DBusInterface(ABC):
         """Return True, if the object has been shutdown."""
         return self._shutdown
 
+    @property
+    def connected_dbus(self) -> DBus:
+        """Return dbus object. Raise if not connected."""
+        if not self.dbus:
+            raise DBusNotConnectedError()
+        return self.dbus
+
     async def connect(self, bus: MessageBus) -> None:
         """Connect to D-Bus."""
         await self.initialize(await DBus.connect(bus, self.bus_name, self.object_path))
@@ -67,7 +82,7 @@ class DBusInterface(ABC):
     def disconnect(self) -> None:
         """Disconnect from D-Bus."""
         if self.is_connected:
-            self.dbus.disconnect()
+            self.connected_dbus.disconnect()
             self.dbus = None
 
     def shutdown(self) -> None:
@@ -79,17 +94,20 @@ class DBusInterface(ABC):
         self.disconnect()
 
 
-class DBusInterfaceProxy(DBusInterface):
+class DBusInterfaceProxy(DBusInterface, ABC):
     """Handle D-Bus interface proxy."""
 
-    properties_interface: str | None = None
-    properties: dict[str, Any] | None = None
     sync_properties: bool = True
     _sync_properties_callback: Callable | None = None
 
-    def __init__(self):
+    def __init__(self) -> None:
         """Initialize properties."""
-        self.properties = {}
+        self.properties: dict[str, Any] = {}
+
+    @property
+    @abstractmethod
+    def properties_interface(self) -> str:
+        """Primary interface of object to get property values from."""
 
     async def connect(self, bus: MessageBus) -> None:
         """Connect to D-Bus."""
@@ -99,7 +117,7 @@ class DBusInterfaceProxy(DBusInterface):
         """Initialize object with already connected dbus object."""
         await super().initialize(connected_dbus)
 
-        if not self.dbus.properties:
+        if not self.connected_dbus.properties:
             self.disconnect()
             raise DBusInterfaceError(
                 f"D-Bus object {self.object_path} is not usable, introspection is missing required properties interface"
@@ -107,13 +125,13 @@ class DBusInterfaceProxy(DBusInterface):
         await self.update()
 
         if self.sync_properties and self.is_connected:
-            self._sync_properties_callback = self.dbus.sync_property_changes(
+            self._sync_properties_callback = self.connected_dbus.sync_property_changes(
                 self.properties_interface, self.update
             )
 
     def stop_sync_property_changes(self) -> None:
         """Stop syncing property changes to object."""
-        if not self._sync_properties_callback:
+        if not self._sync_properties_callback or not self.dbus:
             return
 
         self.dbus.stop_sync_property_changes(self._sync_properties_callback)
@@ -125,4 +143,6 @@ class DBusInterfaceProxy(DBusInterface):
         if changed and self.properties:
             self.properties.update(changed)
         else:
-            self.properties = await self.dbus.get_properties(self.properties_interface)
+            self.properties = await self.connected_dbus.get_properties(
+                self.properties_interface
+            )
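
The connected_dbus accessor added above is what most of the dbus-side changes in this commit lean on: call sites stop dereferencing the Optional dbus attribute directly and instead go through a property that either narrows it to a non-None handle or raises. A minimal sketch of that pattern, using simplified stand-ins for the real DBus handle and base classes (only connected_dbus and DBusNotConnectedError correspond to names in the diff; everything else here is illustrative):

    class DBusNotConnectedError(RuntimeError):
        """Raised when a wrapper is used before connect()."""


    class DBus:
        """Stand-in for the connected D-Bus handle."""

        def disconnect(self) -> None:
            """Drop the underlying connection."""


    class DBusInterface:
        """Sketch of a wrapper whose handle is Optional until connect()."""

        dbus: DBus | None = None

        @property
        def connected_dbus(self) -> DBus:
            # Narrow Optional[DBus] to DBus in one place, so mypy stops
            # flagging "Item None has no attribute ..." at every call site.
            if not self.dbus:
                raise DBusNotConnectedError()
            return self.dbus

        def disconnect(self) -> None:
            if self.dbus:
                self.connected_dbus.disconnect()
            self.dbus = None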

View File

@@ -35,9 +35,9 @@ class Logind(DBusInterface):
     @dbus_connected
     async def reboot(self) -> None:
         """Reboot host computer."""
-        await self.dbus.Manager.call_reboot(False)
+        await self.connected_dbus.Manager.call("reboot", False)
 
     @dbus_connected
     async def power_off(self) -> None:
         """Power off host computer."""
-        await self.dbus.Manager.call_power_off(False)
+        await self.connected_dbus.Manager.call("power_off", False)

View File

@@ -115,7 +115,9 @@ class DBusManager(CoreSysAttributes):
             return
 
         try:
-            self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
+            self._bus = connected_bus = await MessageBus(
+                bus_type=BusType.SYSTEM
+            ).connect()
         except Exception as err:
             raise DBusFatalError(
                 "Cannot connect to system D-Bus. Disabled any kind of host control!"
@@ -124,17 +126,17 @@ class DBusManager(CoreSysAttributes):
         _LOGGER.info("Connected to system D-Bus.")
 
         errors = await asyncio.gather(
-            *[dbus.connect(self.bus) for dbus in self.all], return_exceptions=True
+            *[dbus.connect(connected_bus) for dbus in self.all], return_exceptions=True
         )
 
-        for err in errors:
-            if err:
-                dbus = self.all[errors.index(err)]
+        for error in errors:
+            if error:
+                dbus = self.all[errors.index(error)]
                 _LOGGER.warning(
                     "Can't load dbus interface %s %s: %s",
                     dbus.name,
                     dbus.object_path,
-                    err,
+                    error,
                 )
 
         self.sys_host.supported_features.cache_clear()

View File

@@ -1,7 +1,7 @@
 """Network Manager implementation for DBUS."""
 
 import logging
-from typing import Any
+from typing import Any, cast
 
 from awesomeversion import AwesomeVersion, AwesomeVersionException
 from dbus_fast.aio.message_bus import MessageBus
@@ -106,11 +106,11 @@ class NetworkManager(DBusInterfaceProxy):
         self, connection_object: str, device_object: str
     ) -> NetworkConnection:
         """Activate a connction on a device."""
-        obj_active_con = await self.dbus.call_activate_connection(
-            connection_object, device_object, DBUS_OBJECT_BASE
+        obj_active_con = await self.connected_dbus.call(
+            "activate_connection", connection_object, device_object, DBUS_OBJECT_BASE
         )
         active_con = NetworkConnection(obj_active_con)
-        await active_con.connect(self.dbus.bus)
+        await active_con.connect(self.connected_dbus.bus)
         return active_con
 
     @dbus_connected
@@ -121,21 +121,22 @@ class NetworkManager(DBusInterfaceProxy):
         (
             _,
             obj_active_con,
-        ) = await self.dbus.call_add_and_activate_connection(
-            settings, device_object, DBUS_OBJECT_BASE
+        ) = await self.connected_dbus.call(
+            "add_and_activate_connection", settings, device_object, DBUS_OBJECT_BASE
         )
 
         active_con = NetworkConnection(obj_active_con)
-        await active_con.connect(self.dbus.bus)
-        return active_con.settings, active_con
+        await active_con.connect(self.connected_dbus.bus)
+        # Settings were provided so settings will not be None here or call would've failed
+        return cast(NetworkSetting, active_con.settings), active_con
 
     @dbus_connected
     async def check_connectivity(self, *, force: bool = False) -> ConnectivityState:
         """Check the connectivity of the host."""
         if force:
-            return await self.dbus.call_check_connectivity()
+            return await self.connected_dbus.call("check_connectivity")
         else:
-            return await self.dbus.get_connectivity()
+            return await self.connected_dbus.get("connectivity")
 
     async def connect(self, bus: MessageBus) -> None:
         """Connect to system's D-Bus."""
@@ -160,9 +161,10 @@ class NetworkManager(DBusInterfaceProxy):
         self.dns.disconnect()
         self.settings.disconnect()
 
+    @dbus_connected
     async def _validate_version(self) -> None:
         """Validate Version of NetworkManager."""
-        self.properties = await self.dbus.get_properties(DBUS_IFACE_NM)
+        self.properties = await self.connected_dbus.get_properties(DBUS_IFACE_NM)
 
         try:
             if self.version >= MINIMAL_VERSION:
@@ -206,7 +208,7 @@ class NetworkManager(DBusInterfaceProxy):
             # Connect to interface
             try:
-                await interface.connect(self.dbus.bus)
+                await interface.connect(self.connected_dbus.bus)
             except (DBusFatalError, DBusInterfaceError) as err:
                 # Docker creates and deletes interfaces quite often, sometimes
                 # this causes a race condition: A device disappears while we

View File

@@ -25,9 +25,13 @@ class NetworkWirelessAP(DBusInterfaceProxy):
     def __init__(self, object_path: str) -> None:
         """Initialize NetworkWireless AP object."""
+        self._object_path: str = object_path
+
         super().__init__()
-        self.object_path: str = object_path
+
+    @property
+    def object_path(self) -> str:
+        """Object path for dbus object."""
+        return self._object_path
 
     @property
     @dbus_property

View File

@@ -35,14 +35,17 @@ class NetworkConnection(DBusInterfaceProxy):
     def __init__(self, object_path: str) -> None:
         """Initialize NetworkConnection object."""
-        super().__init__()
-        self.object_path: str = object_path
+        self._object_path: str = object_path
         self._ipv4: IpConfiguration | None = None
         self._ipv6: IpConfiguration | None = None
         self._state_flags: set[ConnectionStateFlags] = {ConnectionStateFlags.NONE}
         self._settings: NetworkSetting | None = None
+        super().__init__()
+
+    @property
+    def object_path(self) -> str:
+        """Object path for dbus object."""
+        return self._object_path
 
     @property
     @dbus_property
@@ -134,7 +137,7 @@ class NetworkConnection(DBusInterfaceProxy):
             await self.ipv4.update()
         elif self.properties[DBUS_ATTR_IP4CONFIG] != DBUS_OBJECT_BASE:
             self.ipv4 = IpConfiguration(self.properties[DBUS_ATTR_IP4CONFIG])
-            await self.ipv4.connect(self.dbus.bus)
+            await self.ipv4.connect(self.connected_dbus.bus)
         else:
             self.ipv4 = None
@@ -148,7 +151,7 @@ class NetworkConnection(DBusInterfaceProxy):
             await self.ipv6.update()
         elif self.properties[DBUS_ATTR_IP6CONFIG] != DBUS_OBJECT_BASE:
             self.ipv6 = IpConfiguration(self.properties[DBUS_ATTR_IP6CONFIG], False)
-            await self.ipv6.connect(self.dbus.bus)
+            await self.ipv6.connect(self.connected_dbus.bus)
         else:
             self.ipv6 = None
@@ -162,7 +165,7 @@ class NetworkConnection(DBusInterfaceProxy):
             await self.settings.reload()
         elif self.properties[DBUS_ATTR_CONNECTION] != DBUS_OBJECT_BASE:
             self.settings = NetworkSetting(self.properties[DBUS_ATTR_CONNECTION])
-            await self.settings.connect(self.dbus.bus)
+            await self.settings.connect(self.connected_dbus.bus)
         else:
             self.settings = None
View File
@ -36,14 +36,16 @@ class NetworkInterface(DBusInterfaceProxy):
def __init__(self, object_path: str) -> None: def __init__(self, object_path: str) -> None:
"""Initialize NetworkConnection object.""" """Initialize NetworkConnection object."""
super().__init__() self._object_path: str = object_path
self.object_path: str = object_path
self.primary: bool = False self.primary: bool = False
self._connection: NetworkConnection | None = None self._connection: NetworkConnection | None = None
self._wireless: NetworkWireless | None = None self._wireless: NetworkWireless | None = None
super().__init__()
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
@dbus_property @dbus_property
@ -130,7 +132,9 @@ class NetworkInterface(DBusInterfaceProxy):
self.sync_properties = self.managed self.sync_properties = self.managed
if self.sync_properties and self.is_connected: if self.sync_properties and self.is_connected:
self.dbus.sync_property_changes(self.properties_interface, self.update) self.connected_dbus.sync_property_changes(
self.properties_interface, self.update
)
@dbus_connected @dbus_connected
async def update(self, changed: dict[str, Any] | None = None) -> None: async def update(self, changed: dict[str, Any] | None = None) -> None:
@ -157,7 +161,7 @@ class NetworkInterface(DBusInterfaceProxy):
self.connection = NetworkConnection( self.connection = NetworkConnection(
self.properties[DBUS_ATTR_ACTIVE_CONNECTION] self.properties[DBUS_ATTR_ACTIVE_CONNECTION]
) )
await self.connection.connect(self.dbus.bus) await self.connection.connect(self.connected_dbus.bus)
else: else:
self.connection = None self.connection = None
@ -169,7 +173,7 @@ class NetworkInterface(DBusInterfaceProxy):
await self.wireless.update() await self.wireless.update()
else: else:
self.wireless = NetworkWireless(self.object_path) self.wireless = NetworkWireless(self.object_path)
await self.wireless.connect(self.dbus.bus) await self.wireless.connect(self.connected_dbus.bus)
def shutdown(self) -> None: def shutdown(self) -> None:
"""Shutdown the object and disconnect from D-Bus. """Shutdown the object and disconnect from D-Bus.
View File
@ -29,13 +29,22 @@ class IpConfiguration(DBusInterfaceProxy):
def __init__(self, object_path: str, ip4: bool = True) -> None: def __init__(self, object_path: str, ip4: bool = True) -> None:
"""Initialize properties.""" """Initialize properties."""
super().__init__()
self._ip4: bool = ip4 self._ip4: bool = ip4
self.object_path: str = object_path self._object_path: str = object_path
self.properties_interface: str = ( self._properties_interface: str = (
DBUS_IFACE_IP4CONFIG if ip4 else DBUS_IFACE_IP6CONFIG DBUS_IFACE_IP4CONFIG if ip4 else DBUS_IFACE_IP6CONFIG
) )
super().__init__()
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property
def properties_interface(self) -> str:
"""Primary interface of object to get property values from."""
return self._properties_interface
@property @property
@dbus_property @dbus_property
View File
@ -79,7 +79,7 @@ def _merge_settings_attribute(
new_settings: dict[str, dict[str, Variant]], new_settings: dict[str, dict[str, Variant]],
attribute: str, attribute: str,
*, *,
ignore_current_value: list[str] = None, ignore_current_value: list[str] | None = None,
) -> None: ) -> None:
"""Merge settings attribute if present.""" """Merge settings attribute if present."""
if attribute in new_settings: if attribute in new_settings:
@ -103,7 +103,7 @@ class NetworkSetting(DBusInterface):
def __init__(self, object_path: str) -> None: def __init__(self, object_path: str) -> None:
"""Initialize NetworkConnection object.""" """Initialize NetworkConnection object."""
self.object_path: str = object_path self._object_path: str = object_path
self._connection: ConnectionProperties | None = None self._connection: ConnectionProperties | None = None
self._wireless: WirelessProperties | None = None self._wireless: WirelessProperties | None = None
@ -113,6 +113,12 @@ class NetworkSetting(DBusInterface):
self._ipv4: IpProperties | None = None self._ipv4: IpProperties | None = None
self._ipv6: IpProperties | None = None self._ipv6: IpProperties | None = None
self._match: MatchProperties | None = None self._match: MatchProperties | None = None
super().__init__()
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
def connection(self) -> ConnectionProperties | None: def connection(self) -> ConnectionProperties | None:
@ -157,14 +163,16 @@ class NetworkSetting(DBusInterface):
@dbus_connected @dbus_connected
async def get_settings(self) -> dict[str, Any]: async def get_settings(self) -> dict[str, Any]:
"""Return connection settings.""" """Return connection settings."""
return await self.dbus.Settings.Connection.call_get_settings() return await self.connected_dbus.Settings.Connection.call("get_settings")
@dbus_connected @dbus_connected
async def update(self, settings: dict[str, dict[str, Variant]]) -> None: async def update(self, settings: dict[str, dict[str, Variant]]) -> None:
"""Update connection settings.""" """Update connection settings."""
new_settings: dict[ new_settings: dict[
str, dict[str, Variant] str, dict[str, Variant]
] = await self.dbus.Settings.Connection.call_get_settings(unpack_variants=False) ] = await self.connected_dbus.Settings.Connection.call(
"get_settings", unpack_variants=False
)
_merge_settings_attribute( _merge_settings_attribute(
new_settings, new_settings,
@ -192,19 +200,19 @@ class NetworkSetting(DBusInterface):
) )
_merge_settings_attribute(new_settings, settings, CONF_ATTR_MATCH) _merge_settings_attribute(new_settings, settings, CONF_ATTR_MATCH)
await self.dbus.Settings.Connection.call_update(new_settings) await self.connected_dbus.Settings.Connection.call("update", new_settings)
@dbus_connected @dbus_connected
async def delete(self) -> None: async def delete(self) -> None:
"""Delete connection settings.""" """Delete connection settings."""
await self.dbus.Settings.Connection.call_delete() await self.connected_dbus.Settings.Connection.call("delete")
async def connect(self, bus: MessageBus) -> None: async def connect(self, bus: MessageBus) -> None:
"""Get connection information.""" """Get connection information."""
await super().connect(bus) await super().connect(bus)
await self.reload() await self.reload()
self.dbus.Settings.Connection.on_updated(self.reload) self.connected_dbus.Settings.Connection.on("updated", self.reload)
@dbus_connected @dbus_connected
async def reload(self): async def reload(self):
View File
@ -3,11 +3,12 @@
from __future__ import annotations from __future__ import annotations
import socket import socket
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, cast
from uuid import uuid4 from uuid import uuid4
from dbus_fast import Variant from dbus_fast import Variant
from ....host.configuration import VlanConfig
from ....host.const import InterfaceMethod, InterfaceType from ....host.const import InterfaceMethod, InterfaceType
from .. import NetworkManager from .. import NetworkManager
from . import ( from . import (
@ -140,12 +141,15 @@ def get_connection_from_interface(
uuid: str | None = None, uuid: str | None = None,
) -> dict[str, dict[str, Variant]]: ) -> dict[str, dict[str, Variant]]:
"""Generate message argument for network interface update.""" """Generate message argument for network interface update."""
# Simple input check to ensure it is safe to cast this for type checker
if interface.type == InterfaceType.VLAN and not interface.vlan:
raise ValueError("Interface has type vlan but no vlan config!")
# Generate/Update ID/name # Generate/Update ID/name
if not name or not name.startswith("Supervisor"): if not name or not name.startswith("Supervisor"):
name = f"Supervisor {interface.name}" name = f"Supervisor {interface.name}"
if interface.type == InterfaceType.VLAN: if interface.type == InterfaceType.VLAN:
name = f"{name}.{interface.vlan.id}" name = f"{name}.{cast(VlanConfig, interface.vlan).id}"
if interface.type == InterfaceType.ETHERNET: if interface.type == InterfaceType.ETHERNET:
iftype = "802-3-ethernet" iftype = "802-3-ethernet"
@ -186,14 +190,14 @@ def get_connection_from_interface(
CONF_ATTR_802_ETHERNET_ASSIGNED_MAC: Variant("s", "preserve") CONF_ATTR_802_ETHERNET_ASSIGNED_MAC: Variant("s", "preserve")
} }
elif interface.type == "vlan": elif interface.type == "vlan":
parent = interface.vlan.interface parent = cast(VlanConfig, interface.vlan).interface
if parent in network_manager and ( if parent in network_manager and (
parent_connection := network_manager.get(parent).connection parent_connection := network_manager.get(parent).connection
): ):
parent = parent_connection.uuid parent = parent_connection.uuid
conn[CONF_ATTR_VLAN] = { conn[CONF_ATTR_VLAN] = {
CONF_ATTR_VLAN_ID: Variant("u", interface.vlan.id), CONF_ATTR_VLAN_ID: Variant("u", cast(VlanConfig, interface.vlan).id),
CONF_ATTR_VLAN_PARENT: Variant("s", parent), CONF_ATTR_VLAN_PARENT: Variant("s", parent),
} }
elif interface.type == InterfaceType.WIRELESS: elif interface.type == InterfaceType.WIRELESS:

View File

@ -37,12 +37,14 @@ class NetworkManagerSettings(DBusInterface):
@dbus_connected @dbus_connected
async def add_connection(self, settings: Any) -> NetworkSetting: async def add_connection(self, settings: Any) -> NetworkSetting:
"""Add new connection.""" """Add new connection."""
obj_con_setting = await self.dbus.Settings.call_add_connection(settings) obj_con_setting = await self.connected_dbus.Settings.call(
"add_connection", settings
)
con_setting = NetworkSetting(obj_con_setting) con_setting = NetworkSetting(obj_con_setting)
await con_setting.connect(self.dbus.bus) await con_setting.connect(self.connected_dbus.bus)
return con_setting return con_setting
@dbus_connected @dbus_connected
async def reload_connections(self) -> bool: async def reload_connections(self) -> bool:
"""Reload all local connection files.""" """Reload all local connection files."""
return await self.dbus.Settings.call_reload_connections() return await self.connected_dbus.Settings.call("reload_connections")
View File
@ -29,11 +29,14 @@ class NetworkWireless(DBusInterfaceProxy):
def __init__(self, object_path: str) -> None: def __init__(self, object_path: str) -> None:
"""Initialize NetworkConnection object.""" """Initialize NetworkConnection object."""
self._object_path: str = object_path
self._active: NetworkWirelessAP | None = None
super().__init__() super().__init__()
self.object_path: str = object_path @property
def object_path(self) -> str:
self._active: NetworkWirelessAP | None = None """Object path for dbus object."""
return self._object_path
@property @property
@dbus_property @dbus_property
@ -57,22 +60,26 @@ class NetworkWireless(DBusInterfaceProxy):
@dbus_connected @dbus_connected
async def request_scan(self) -> None: async def request_scan(self) -> None:
"""Request a new AP scan.""" """Request a new AP scan."""
await self.dbus.Device.Wireless.call_request_scan({}) await self.connected_dbus.Device.Wireless.call("request_scan", {})
@dbus_connected @dbus_connected
async def get_all_accesspoints(self) -> list[NetworkWirelessAP]: async def get_all_accesspoints(self) -> list[NetworkWirelessAP]:
"""Return a list of all access points path.""" """Return a list of all access points path."""
accesspoints_data = await self.dbus.Device.Wireless.call_get_all_access_points() accesspoints_data = await self.connected_dbus.Device.Wireless.call(
"get_all_access_points"
)
accesspoints = [NetworkWirelessAP(ap_obj) for ap_obj in accesspoints_data] accesspoints = [NetworkWirelessAP(ap_obj) for ap_obj in accesspoints_data]
for err in await asyncio.gather( for err in await asyncio.gather(
*[ap.connect(self.dbus.bus) for ap in accesspoints], return_exceptions=True *[ap.connect(self.connected_dbus.bus) for ap in accesspoints],
return_exceptions=True,
): ):
if err: if err:
_LOGGER.warning("Can't process an AP: %s", err) _LOGGER.warning("Can't process an AP: %s", err)
return accesspoints return accesspoints
@dbus_connected
async def update(self, changed: dict[str, Any] | None = None) -> None: async def update(self, changed: dict[str, Any] | None = None) -> None:
"""Update properties via D-Bus.""" """Update properties via D-Bus."""
await super().update(changed) await super().update(changed)
@ -90,6 +97,6 @@ class NetworkWireless(DBusInterfaceProxy):
self.active = NetworkWirelessAP( self.active = NetworkWirelessAP(
self.properties[DBUS_ATTR_ACTIVE_ACCESSPOINT] self.properties[DBUS_ATTR_ACTIVE_ACCESSPOINT]
) )
await self.active.connect(self.dbus.bus) await self.active.connect(self.connected_dbus.bus)
else: else:
self.active = None self.active = None
View File
@ -56,7 +56,7 @@ class Rauc(DBusInterfaceProxy):
object_path: str = DBUS_OBJECT_BASE object_path: str = DBUS_OBJECT_BASE
properties_interface: str = DBUS_IFACE_RAUC_INSTALLER properties_interface: str = DBUS_IFACE_RAUC_INSTALLER
def __init__(self): def __init__(self) -> None:
"""Initialize Properties.""" """Initialize Properties."""
super().__init__() super().__init__()
@ -104,22 +104,22 @@ class Rauc(DBusInterfaceProxy):
@dbus_connected @dbus_connected
async def install(self, raucb_file) -> None: async def install(self, raucb_file) -> None:
"""Install rauc bundle file.""" """Install rauc bundle file."""
await self.dbus.Installer.call_install(str(raucb_file)) await self.connected_dbus.Installer.call("install", str(raucb_file))
@dbus_connected @dbus_connected
async def get_slot_status(self) -> list[tuple[str, SlotStatusDataType]]: async def get_slot_status(self) -> list[tuple[str, SlotStatusDataType]]:
"""Get slot status.""" """Get slot status."""
return await self.dbus.Installer.call_get_slot_status() return await self.connected_dbus.Installer.call("get_slot_status")
@dbus_connected @dbus_connected
def signal_completed(self) -> DBusSignalWrapper: def signal_completed(self) -> DBusSignalWrapper:
"""Return a signal wrapper for completed signal.""" """Return a signal wrapper for completed signal."""
return self.dbus.signal(DBUS_SIGNAL_RAUC_INSTALLER_COMPLETED) return self.connected_dbus.signal(DBUS_SIGNAL_RAUC_INSTALLER_COMPLETED)
@dbus_connected @dbus_connected
async def mark(self, state: RaucState, slot_identifier: str) -> tuple[str, str]: async def mark(self, state: RaucState, slot_identifier: str) -> tuple[str, str]:
"""Get slot status.""" """Get slot status."""
return await self.dbus.Installer.call_mark(state, slot_identifier) return await self.connected_dbus.Installer.call("mark", state, slot_identifier)
@dbus_connected @dbus_connected
async def update(self, changed: dict[str, Any] | None = None) -> None: async def update(self, changed: dict[str, Any] | None = None) -> None:
View File
@ -60,17 +60,22 @@ class SystemdUnit(DBusInterface):
def __init__(self, object_path: str) -> None: def __init__(self, object_path: str) -> None:
"""Initialize object.""" """Initialize object."""
super().__init__() super().__init__()
self.object_path = object_path self._object_path = object_path
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@dbus_connected @dbus_connected
async def get_active_state(self) -> UnitActiveState: async def get_active_state(self) -> UnitActiveState:
"""Get active state of the unit.""" """Get active state of the unit."""
return await self.dbus.Unit.get_active_state() return await self.connected_dbus.Unit.get("active_state")
@dbus_connected @dbus_connected
def properties_changed(self) -> DBusSignalWrapper: def properties_changed(self) -> DBusSignalWrapper:
"""Return signal wrapper for properties changed.""" """Return signal wrapper for properties changed."""
return self.dbus.signal(DBUS_SIGNAL_PROPERTIES_CHANGED) return self.connected_dbus.signal(DBUS_SIGNAL_PROPERTIES_CHANGED)
class Systemd(DBusInterfaceProxy): class Systemd(DBusInterfaceProxy):
@ -124,64 +129,66 @@ class Systemd(DBusInterfaceProxy):
@dbus_connected @dbus_connected
async def reboot(self) -> None: async def reboot(self) -> None:
"""Reboot host computer.""" """Reboot host computer."""
await self.dbus.Manager.call_reboot() await self.connected_dbus.Manager.call("reboot")
@dbus_connected @dbus_connected
async def power_off(self) -> None: async def power_off(self) -> None:
"""Power off host computer.""" """Power off host computer."""
await self.dbus.Manager.call_power_off() await self.connected_dbus.Manager.call("power_off")
@dbus_connected @dbus_connected
@systemd_errors @systemd_errors
async def start_unit(self, unit: str, mode: StartUnitMode) -> str: async def start_unit(self, unit: str, mode: StartUnitMode) -> str:
"""Start a systemd service unit. Returns object path of job.""" """Start a systemd service unit. Returns object path of job."""
return await self.dbus.Manager.call_start_unit(unit, mode) return await self.connected_dbus.Manager.call("start_unit", unit, mode)
@dbus_connected @dbus_connected
@systemd_errors @systemd_errors
async def stop_unit(self, unit: str, mode: StopUnitMode) -> str: async def stop_unit(self, unit: str, mode: StopUnitMode) -> str:
"""Stop a systemd service unit. Returns object path of job.""" """Stop a systemd service unit. Returns object path of job."""
return await self.dbus.Manager.call_stop_unit(unit, mode) return await self.connected_dbus.Manager.call("stop_unit", unit, mode)
@dbus_connected @dbus_connected
@systemd_errors @systemd_errors
async def reload_unit(self, unit: str, mode: StartUnitMode) -> str: async def reload_unit(self, unit: str, mode: StartUnitMode) -> str:
"""Reload a systemd service unit. Returns object path of job.""" """Reload a systemd service unit. Returns object path of job."""
return await self.dbus.Manager.call_reload_or_restart_unit(unit, mode) return await self.connected_dbus.Manager.call(
"reload_or_restart_unit", unit, mode
)
@dbus_connected @dbus_connected
@systemd_errors @systemd_errors
async def restart_unit(self, unit: str, mode: StartUnitMode) -> str: async def restart_unit(self, unit: str, mode: StartUnitMode) -> str:
"""Restart a systemd service unit. Returns object path of job.""" """Restart a systemd service unit. Returns object path of job."""
return await self.dbus.Manager.call_restart_unit(unit, mode) return await self.connected_dbus.Manager.call("restart_unit", unit, mode)
@dbus_connected @dbus_connected
async def list_units( async def list_units(
self, self,
) -> list[tuple[str, str, str, str, str, str, str, int, str, str]]: ) -> list[tuple[str, str, str, str, str, str, str, int, str, str]]:
"""Return a list of available systemd services.""" """Return a list of available systemd services."""
return await self.dbus.Manager.call_list_units() return await self.connected_dbus.Manager.call("list_units")
@dbus_connected @dbus_connected
async def start_transient_unit( async def start_transient_unit(
self, unit: str, mode: StartUnitMode, properties: list[tuple[str, Variant]] self, unit: str, mode: StartUnitMode, properties: list[tuple[str, Variant]]
) -> str: ) -> str:
"""Start a transient unit which is released when stopped or on reboot. Returns object path of job.""" """Start a transient unit which is released when stopped or on reboot. Returns object path of job."""
return await self.dbus.Manager.call_start_transient_unit( return await self.connected_dbus.Manager.call(
unit, mode, properties, [] "start_transient_unit", unit, mode, properties, []
) )
@dbus_connected @dbus_connected
@systemd_errors @systemd_errors
async def reset_failed_unit(self, unit: str) -> None: async def reset_failed_unit(self, unit: str) -> None:
"""Reset the failed state of a unit.""" """Reset the failed state of a unit."""
await self.dbus.Manager.call_reset_failed_unit(unit) await self.connected_dbus.Manager.call("reset_failed_unit", unit)
@dbus_connected @dbus_connected
@systemd_errors @systemd_errors
async def get_unit(self, unit: str) -> SystemdUnit: async def get_unit(self, unit: str) -> SystemdUnit:
"""Return systemd unit for unit name.""" """Return systemd unit for unit name."""
obj_path = await self.dbus.Manager.call_get_unit(unit) obj_path = await self.connected_dbus.Manager.call("get_unit", unit)
unit = SystemdUnit(obj_path) systemd_unit = SystemdUnit(obj_path)
await unit.connect(self.dbus.bus) await systemd_unit.connect(self.connected_dbus.bus)
return unit return systemd_unit
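The `get_unit` change above illustrates a recurring mypy fix in this commit: re-binding a typed parameter (here `unit: str`) to a value of another type makes the variable's type ambiguous, so the result gets its own name. A small, self-contained sketch of the same idea with made-up types:

class Job:
    """Hypothetical result type (not the real SystemdUnit)."""

    def __init__(self, path: str) -> None:
        self.path = path


def get_job(name: str) -> Job:
    """Return a Job for the given unit name.

    Writing `name = Job(...)` would change the variable's type from str to Job
    and trip mypy; binding the result to a new name keeps both types explicit.
    """
    job = Job(f"/org/freedesktop/systemd1/job/{name}")
    return job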
View File
@ -35,7 +35,7 @@ class TimeDate(DBusInterfaceProxy):
object_path: str = DBUS_OBJECT_TIMEDATE object_path: str = DBUS_OBJECT_TIMEDATE
properties_interface: str = DBUS_IFACE_TIMEDATE properties_interface: str = DBUS_IFACE_TIMEDATE
def __init__(self): def __init__(self) -> None:
"""Initialize object.""" """Initialize object."""
super().__init__() super().__init__()
self._timezone_tzinfo: tzinfo | None = None self._timezone_tzinfo: tzinfo | None = None
@ -97,9 +97,11 @@ class TimeDate(DBusInterfaceProxy):
@dbus_connected @dbus_connected
async def set_time(self, utc: datetime) -> None: async def set_time(self, utc: datetime) -> None:
"""Set time & date on host as UTC.""" """Set time & date on host as UTC."""
await self.dbus.call_set_time(int(utc.timestamp() * 1000000), False, False) await self.connected_dbus.call(
"set_time", int(utc.timestamp() * 1000000), False, False
)
@dbus_connected @dbus_connected
async def set_ntp(self, use_ntp: bool) -> None: async def set_ntp(self, use_ntp: bool) -> None:
"""Turn NTP on or off.""" """Turn NTP on or off."""
await self.dbus.call_set_ntp(use_ntp, False) await self.connected_dbus.call("set_ntp", use_ntp, False)
View File
@ -74,11 +74,11 @@ class UDisks2Manager(DBusInterfaceProxy):
) )
else: else:
# Register for signals on devices added/removed # Register for signals on devices added/removed
self.udisks2_object_manager.dbus.object_manager.on_interfaces_added( self.udisks2_object_manager.dbus.object_manager.on(
self._interfaces_added "interfaces_added", self._interfaces_added
) )
self.udisks2_object_manager.dbus.object_manager.on_interfaces_removed( self.udisks2_object_manager.dbus.object_manager.on(
self._interfaces_removed "interfaces_removed", self._interfaces_removed
) )
@dbus_connected @dbus_connected
@ -91,8 +91,8 @@ class UDisks2Manager(DBusInterfaceProxy):
if not changed: if not changed:
# Cache block devices # Cache block devices
block_devices = await self.dbus.Manager.call_get_block_devices( block_devices = await self.connected_dbus.Manager.call(
UDISKS2_DEFAULT_OPTIONS "get_block_devices", UDISKS2_DEFAULT_OPTIONS
) )
unchanged_blocks = self._block_devices.keys() & set(block_devices) unchanged_blocks = self._block_devices.keys() & set(block_devices)
@ -102,7 +102,7 @@ class UDisks2Manager(DBusInterfaceProxy):
self._block_devices = { self._block_devices = {
device: self._block_devices[device] device: self._block_devices[device]
if device in unchanged_blocks if device in unchanged_blocks
else await UDisks2Block.new(device, self.dbus.bus) else await UDisks2Block.new(device, self.connected_dbus.bus)
for device in block_devices for device in block_devices
} }
@ -128,7 +128,7 @@ class UDisks2Manager(DBusInterfaceProxy):
self._drives = { self._drives = {
drive: self._drives[drive] drive: self._drives[drive]
if drive in self._drives if drive in self._drives
else await UDisks2Drive.new(drive, self.dbus.bus) else await UDisks2Drive.new(drive, self.connected_dbus.bus)
for drive in drives for drive in drives
} }
@ -180,13 +180,14 @@ class UDisks2Manager(DBusInterfaceProxy):
"""Return list of device object paths for specification.""" """Return list of device object paths for specification."""
return await asyncio.gather( return await asyncio.gather(
*[ *[
UDisks2Block.new(path, self.dbus.bus, sync_properties=False) UDisks2Block.new(path, self.connected_dbus.bus, sync_properties=False)
for path in await self.dbus.Manager.call_resolve_device( for path in await self.connected_dbus.Manager.call(
devspec.to_dict(), UDISKS2_DEFAULT_OPTIONS "resolve_device", devspec.to_dict(), UDISKS2_DEFAULT_OPTIONS
) )
] ]
) )
@dbus_connected
async def _interfaces_added( async def _interfaces_added(
self, object_path: str, properties: dict[str, dict[str, Any]] self, object_path: str, properties: dict[str, dict[str, Any]]
) -> None: ) -> None:
@ -200,13 +201,13 @@ class UDisks2Manager(DBusInterfaceProxy):
if DBUS_IFACE_BLOCK in properties: if DBUS_IFACE_BLOCK in properties:
self._block_devices[object_path] = await UDisks2Block.new( self._block_devices[object_path] = await UDisks2Block.new(
object_path, self.dbus.bus object_path, self.connected_dbus.bus
) )
return return
if DBUS_IFACE_DRIVE in properties: if DBUS_IFACE_DRIVE in properties:
self._drives[object_path] = await UDisks2Drive.new( self._drives[object_path] = await UDisks2Drive.new(
object_path, self.dbus.bus object_path, self.connected_dbus.bus
) )
async def _interfaces_removed( async def _interfaces_removed(
View File
@ -60,7 +60,7 @@ class UDisks2Block(DBusInterfaceProxy):
def __init__(self, object_path: str, *, sync_properties: bool = True) -> None: def __init__(self, object_path: str, *, sync_properties: bool = True) -> None:
"""Initialize object.""" """Initialize object."""
self.object_path = object_path self._object_path = object_path
self.sync_properties = sync_properties self.sync_properties = sync_properties
super().__init__() super().__init__()
@ -78,6 +78,11 @@ class UDisks2Block(DBusInterfaceProxy):
await obj.connect(bus) await obj.connect(bus)
return obj return obj
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
def filesystem(self) -> UDisks2Filesystem | None: def filesystem(self) -> UDisks2Filesystem | None:
"""Filesystem interface if block device is one.""" """Filesystem interface if block device is one."""
@ -212,48 +217,54 @@ class UDisks2Block(DBusInterfaceProxy):
@dbus_connected @dbus_connected
async def check_type(self) -> None: async def check_type(self) -> None:
"""Check if type of block device has changed and adjust interfaces if so.""" """Check if type of block device has changed and adjust interfaces if so."""
introspection = await self.dbus.introspect() introspection = await self.connected_dbus.introspect()
interfaces = {intr.name for intr in introspection.interfaces} interfaces = {intr.name for intr in introspection.interfaces}
# If interfaces changed, update the proxy from introspection and reload interfaces # If interfaces changed, update the proxy from introspection and reload interfaces
if interfaces != set(self.dbus.proxies.keys()): if interfaces != set(self.connected_dbus.proxies.keys()):
await self.dbus.init_proxy(introspection=introspection) await self.connected_dbus.init_proxy(introspection=introspection)
await self._reload_interfaces() await self._reload_interfaces()
@dbus_connected @dbus_connected
async def _reload_interfaces(self) -> None: async def _reload_interfaces(self) -> None:
"""Reload interfaces from introspection as necessary.""" """Reload interfaces from introspection as necessary."""
# Check if block device is a filesystem # Check if block device is a filesystem
if not self.filesystem and DBUS_IFACE_FILESYSTEM in self.dbus.proxies: if not self.filesystem and DBUS_IFACE_FILESYSTEM in self.connected_dbus.proxies:
self._filesystem = UDisks2Filesystem( self._filesystem = UDisks2Filesystem(
self.object_path, sync_properties=self.sync_properties self.object_path, sync_properties=self.sync_properties
) )
await self._filesystem.initialize(self.dbus) await self._filesystem.initialize(self.connected_dbus)
elif self.filesystem and DBUS_IFACE_FILESYSTEM not in self.dbus.proxies: elif (
self.filesystem and DBUS_IFACE_FILESYSTEM not in self.connected_dbus.proxies
):
self.filesystem.stop_sync_property_changes() self.filesystem.stop_sync_property_changes()
self._filesystem = None self._filesystem = None
# Check if block device is a partition # Check if block device is a partition
if not self.partition and DBUS_IFACE_PARTITION in self.dbus.proxies: if not self.partition and DBUS_IFACE_PARTITION in self.connected_dbus.proxies:
self._partition = UDisks2Partition( self._partition = UDisks2Partition(
self.object_path, sync_properties=self.sync_properties self.object_path, sync_properties=self.sync_properties
) )
await self._partition.initialize(self.dbus) await self._partition.initialize(self.connected_dbus)
elif self.partition and DBUS_IFACE_PARTITION not in self.dbus.proxies: elif self.partition and DBUS_IFACE_PARTITION not in self.connected_dbus.proxies:
self.partition.stop_sync_property_changes() self.partition.stop_sync_property_changes()
self._partition = None self._partition = None
# Check if block device is a partition table # Check if block device is a partition table
if not self.partition_table and DBUS_IFACE_PARTITION_TABLE in self.dbus.proxies: if (
not self.partition_table
and DBUS_IFACE_PARTITION_TABLE in self.connected_dbus.proxies
):
self._partition_table = UDisks2PartitionTable( self._partition_table = UDisks2PartitionTable(
self.object_path, sync_properties=self.sync_properties self.object_path, sync_properties=self.sync_properties
) )
await self._partition_table.initialize(self.dbus) await self._partition_table.initialize(self.connected_dbus)
elif ( elif (
self.partition_table and DBUS_IFACE_PARTITION_TABLE not in self.dbus.proxies self.partition_table
and DBUS_IFACE_PARTITION_TABLE not in self.connected_dbus.proxies
): ):
self.partition_table.stop_sync_property_changes() self.partition_table.stop_sync_property_changes()
self._partition_table = None self._partition_table = None
@ -263,5 +274,7 @@ class UDisks2Block(DBusInterfaceProxy):
self, type_: FormatType = FormatType.GPT, options: FormatOptions | None = None self, type_: FormatType = FormatType.GPT, options: FormatOptions | None = None
) -> None: ) -> None:
"""Format block device.""" """Format block device."""
options = options.to_dict() if options else {} format_options = options.to_dict() if options else {}
await self.dbus.Block.call_format(type_, options | UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Block.call(
"format", type_, format_options | UDISKS2_DEFAULT_OPTIONS
)
View File
@ -1,7 +1,6 @@
"""Data for UDisks2.""" """Data for UDisks2."""
from dataclasses import dataclass from dataclasses import dataclass
from inspect import get_annotations
from pathlib import Path from pathlib import Path
from typing import Any, NotRequired, TypedDict from typing import Any, NotRequired, TypedDict
@ -23,41 +22,6 @@ def _optional_variant(signature: str, value: Any | None) -> Variant | None:
return Variant(signature, value) if value is not None else None return Variant(signature, value) if value is not None else None
UDisks2StandardOptionsDataType = TypedDict(
"UDisks2StandardOptionsDataType",
{"auth.no_user_interaction": NotRequired[bool]},
)
@dataclass(slots=True)
class UDisks2StandardOptions:
"""UDisks2 standard options.
http://storaged.org/doc/udisks2-api/latest/udisks-std-options.html
"""
auth_no_user_interaction: bool | None = None
@staticmethod
def from_dict(data: UDisks2StandardOptionsDataType) -> "UDisks2StandardOptions":
"""Create UDisks2StandardOptions from dict."""
return UDisks2StandardOptions(
auth_no_user_interaction=data.get("auth.no_user_interaction"),
)
def to_dict(self) -> dict[str, Variant]:
"""Return dict representation."""
data = {
"auth.no_user_interaction": _optional_variant(
"b", self.auth_no_user_interaction
),
}
return {k: v for k, v in data.items() if v}
_udisks2_standard_options_annotations = get_annotations(UDisks2StandardOptionsDataType)
class DeviceSpecificationDataType(TypedDict, total=False): class DeviceSpecificationDataType(TypedDict, total=False):
"""Device specification data type.""" """Device specification data type."""
@ -81,7 +45,7 @@ class DeviceSpecification:
def from_dict(data: DeviceSpecificationDataType) -> "DeviceSpecification": def from_dict(data: DeviceSpecificationDataType) -> "DeviceSpecification":
"""Create DeviceSpecification from dict.""" """Create DeviceSpecification from dict."""
return DeviceSpecification( return DeviceSpecification(
path=Path(data.get("path")), path=Path(data["path"]) if "path" in data else None,
label=data.get("label"), label=data.get("label"),
uuid=data.get("uuid"), uuid=data.get("uuid"),
) )
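The `DeviceSpecification.from_dict` fix above is another common pattern: `data.get("path")` can return `None`, and `Path(None)` fails both at runtime and under mypy, so the key is checked before constructing the `Path`. Roughly, assuming a `total=False` TypedDict like the one in the module:

from pathlib import Path
from typing import TypedDict


class SpecData(TypedDict, total=False):
    """Hypothetical stand-in for DeviceSpecificationDataType."""

    path: str


def path_from_spec(data: SpecData) -> Path | None:
    # Path(data.get("path")) would pass None to Path() when the key is absent;
    # checking membership first keeps the value a plain str for the type checker.
    return Path(data["path"]) if "path" in data else None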
@ -109,13 +73,14 @@ FormatOptionsDataType = TypedDict(
"dry-run-first": NotRequired[bool], "dry-run-first": NotRequired[bool],
"no-discard": NotRequired[bool], "no-discard": NotRequired[bool],
"tear-down": NotRequired[bool], "tear-down": NotRequired[bool],
} # UDisks2 standard options
| _udisks2_standard_options_annotations, "auth.no_user_interaction": NotRequired[bool],
},
) )
@dataclass(slots=True) @dataclass(slots=True)
class FormatOptions(UDisks2StandardOptions): class FormatOptions:
"""Options for formatting a block device. """Options for formatting a block device.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Block.html#gdbus-method-org-freedesktop-UDisks2-Block.Format http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Block.html#gdbus-method-org-freedesktop-UDisks2-Block.Format
@ -131,6 +96,8 @@ class FormatOptions(UDisks2StandardOptions):
dry_run_first: bool | None = None dry_run_first: bool | None = None
no_discard: bool | None = None no_discard: bool | None = None
tear_down: bool | None = None tear_down: bool | None = None
# UDisks2 standard options
auth_no_user_interaction: bool | None = None
@staticmethod @staticmethod
def from_dict(data: FormatOptionsDataType) -> "FormatOptions": def from_dict(data: FormatOptionsDataType) -> "FormatOptions":
@ -146,7 +113,7 @@ class FormatOptions(UDisks2StandardOptions):
encrypt_type=EncryptType(data["encrypt.type"]) encrypt_type=EncryptType(data["encrypt.type"])
if "encrypt.type" in data if "encrypt.type" in data
else None, else None,
erase=EncryptType(data["erase"]) if "erase" in data else None, erase=EraseMode(data["erase"]) if "erase" in data else None,
update_partition_type=data.get("update-partition-type"), update_partition_type=data.get("update-partition-type"),
no_block=data.get("no-block"), no_block=data.get("no-block"),
dry_run_first=data.get("dry-run-first"), dry_run_first=data.get("dry-run-first"),
@ -188,13 +155,14 @@ MountOptionsDataType = TypedDict(
{ {
"fstype": NotRequired[str], "fstype": NotRequired[str],
"options": NotRequired[str], "options": NotRequired[str],
} # UDisks2 standard options
| _udisks2_standard_options_annotations, "auth.no_user_interaction": NotRequired[bool],
},
) )
@dataclass(slots=True) @dataclass(slots=True)
class MountOptions(UDisks2StandardOptions): class MountOptions:
"""Filesystem mount options. """Filesystem mount options.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Filesystem.html#gdbus-method-org-freedesktop-UDisks2-Filesystem.Mount http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Filesystem.html#gdbus-method-org-freedesktop-UDisks2-Filesystem.Mount
@ -202,6 +170,8 @@ class MountOptions(UDisks2StandardOptions):
fstype: str | None = None fstype: str | None = None
options: list[str] | None = None options: list[str] | None = None
# UDisks2 standard options
auth_no_user_interaction: bool | None = None
@staticmethod @staticmethod
def from_dict(data: MountOptionsDataType) -> "MountOptions": def from_dict(data: MountOptionsDataType) -> "MountOptions":
@ -227,22 +197,25 @@ class MountOptions(UDisks2StandardOptions):
UnmountOptionsDataType = TypedDict( UnmountOptionsDataType = TypedDict(
"UnountOptionsDataType", "UnmountOptionsDataType",
{ {
"force": NotRequired[bool], "force": NotRequired[bool],
} # UDisks2 standard options
| _udisks2_standard_options_annotations, "auth.no_user_interaction": NotRequired[bool],
},
) )
@dataclass(slots=True) @dataclass(slots=True)
class UnmountOptions(UDisks2StandardOptions): class UnmountOptions:
"""Filesystem unmount options. """Filesystem unmount options.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Filesystem.html#gdbus-method-org-freedesktop-UDisks2-Filesystem.Unmount http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Filesystem.html#gdbus-method-org-freedesktop-UDisks2-Filesystem.Unmount
""" """
force: bool | None = None force: bool | None = None
# UDisks2 standard options
auth_no_user_interaction: bool | None = None
@staticmethod @staticmethod
def from_dict(data: UnmountOptionsDataType) -> "UnmountOptions": def from_dict(data: UnmountOptionsDataType) -> "UnmountOptions":
@ -267,18 +240,24 @@ class UnmountOptions(UDisks2StandardOptions):
CreatePartitionOptionsDataType = TypedDict( CreatePartitionOptionsDataType = TypedDict(
"CreatePartitionOptionsDataType", "CreatePartitionOptionsDataType",
{"partition-type": NotRequired[str]} | _udisks2_standard_options_annotations, {
"partition-type": NotRequired[str],
# UDisks2 standard options
"auth.no_user_interaction": NotRequired[bool],
},
) )
@dataclass(slots=True) @dataclass(slots=True)
class CreatePartitionOptions(UDisks2StandardOptions): class CreatePartitionOptions:
"""Create partition options. """Create partition options.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.PartitionTable.html#gdbus-method-org-freedesktop-UDisks2-PartitionTable.CreatePartition http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.PartitionTable.html#gdbus-method-org-freedesktop-UDisks2-PartitionTable.CreatePartition
""" """
partition_type: str | None = None partition_type: str | None = None
# UDisks2 standard options
auth_no_user_interaction: bool | None = None
@staticmethod @staticmethod
def from_dict(data: CreatePartitionOptionsDataType) -> "CreatePartitionOptions": def from_dict(data: CreatePartitionOptionsDataType) -> "CreatePartitionOptions":
@ -303,18 +282,24 @@ class CreatePartitionOptions(UDisks2StandardOptions):
DeletePartitionOptionsDataType = TypedDict( DeletePartitionOptionsDataType = TypedDict(
"DeletePartitionOptionsDataType", "DeletePartitionOptionsDataType",
{"tear-down": NotRequired[bool]} | _udisks2_standard_options_annotations, {
"tear-down": NotRequired[bool],
# UDisks2 standard options
"auth.no_user_interaction": NotRequired[bool],
},
) )
@dataclass(slots=True) @dataclass(slots=True)
class DeletePartitionOptions(UDisks2StandardOptions): class DeletePartitionOptions:
"""Delete partition options. """Delete partition options.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Partition.html#gdbus-method-org-freedesktop-UDisks2-Partition.Delete http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Partition.html#gdbus-method-org-freedesktop-UDisks2-Partition.Delete
""" """
tear_down: bool | None = None tear_down: bool | None = None
# UDisks2 standard options
auth_no_user_interaction: bool | None = None
@staticmethod @staticmethod
def from_dict(data: DeletePartitionOptionsDataType) -> "DeletePartitionOptions": def from_dict(data: DeletePartitionOptionsDataType) -> "DeletePartitionOptions":
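With `UDisks2StandardOptions` and the `get_annotations()` TypedDict merging removed, each options TypedDict now spells out the shared "auth.no_user_interaction" key itself and the dataclasses no longer inherit from a common base. A small usage sketch of the resulting shape; field names follow the diff, but the class is hypothetical and the exact `to_dict` output is an assumption:

from dataclasses import dataclass

from dbus_fast import Variant


@dataclass(slots=True)
class ExampleOptions:
    """Hypothetical options dataclass mirroring the new flat shape of FormatOptions."""

    label: str | None = None
    # UDisks2 standard options, now declared per dataclass instead of inherited
    auth_no_user_interaction: bool | None = None

    def to_dict(self) -> dict[str, Variant]:
        """Return dict representation, dropping unset fields."""
        data = {
            "label": Variant("s", self.label) if self.label is not None else None,
            "auth.no_user_interaction": Variant("b", self.auth_no_user_interaction)
            if self.auth_no_user_interaction is not None
            else None,
        }
        return {k: v for k, v in data.items() if v}


options = ExampleOptions(label="hassos-data", auth_no_user_interaction=True)
print(options.to_dict())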
View File
@ -37,7 +37,7 @@ class UDisks2Drive(DBusInterfaceProxy):
def __init__(self, object_path: str) -> None: def __init__(self, object_path: str) -> None:
"""Initialize object.""" """Initialize object."""
self.object_path = object_path self._object_path = object_path
super().__init__() super().__init__()
@staticmethod @staticmethod
@ -47,6 +47,11 @@ class UDisks2Drive(DBusInterfaceProxy):
await obj.connect(bus) await obj.connect(bus)
return obj return obj
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
@dbus_property @dbus_property
def vendor(self) -> str: def vendor(self) -> str:
@ -124,4 +129,4 @@ class UDisks2Drive(DBusInterfaceProxy):
@dbus_connected @dbus_connected
async def eject(self) -> None: async def eject(self) -> None:
"""Eject media from drive.""" """Eject media from drive."""
await self.dbus.Drive.call_eject(UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Drive.call("eject", UDISKS2_DEFAULT_OPTIONS)
View File
@ -26,10 +26,15 @@ class UDisks2Filesystem(DBusInterfaceProxy):
def __init__(self, object_path: str, *, sync_properties: bool = True) -> None: def __init__(self, object_path: str, *, sync_properties: bool = True) -> None:
"""Initialize object.""" """Initialize object."""
self.object_path = object_path self._object_path = object_path
self.sync_properties = sync_properties self.sync_properties = sync_properties
super().__init__() super().__init__()
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
@dbus_property @dbus_property
def mount_points(self) -> list[Path]: def mount_points(self) -> list[Path]:
@ -53,26 +58,36 @@ class UDisks2Filesystem(DBusInterfaceProxy):
if not overridden in /etc/fstab. Therefore unclear if this can be useful to supervisor. if not overridden in /etc/fstab. Therefore unclear if this can be useful to supervisor.
http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Filesystem.html#gdbus-method-org-freedesktop-UDisks2-Filesystem.Mount http://storaged.org/doc/udisks2-api/latest/gdbus-org.freedesktop.UDisks2.Filesystem.html#gdbus-method-org-freedesktop-UDisks2-Filesystem.Mount
""" """
options = options.to_dict() if options else {} mount_options = options.to_dict() if options else {}
return await self.dbus.Filesystem.call_mount(options | UDISKS2_DEFAULT_OPTIONS) return await self.connected_dbus.Filesystem.call(
"mount", mount_options | UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def unmount(self, options: UnmountOptions | None = None) -> None: async def unmount(self, options: UnmountOptions | None = None) -> None:
"""Unmount filesystem.""" """Unmount filesystem."""
options = options.to_dict() if options else {} unmount_options = options.to_dict() if options else {}
await self.dbus.Filesystem.call_unmount(options | UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Filesystem.call(
"unmount", unmount_options | UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def set_label(self, label: str) -> None: async def set_label(self, label: str) -> None:
"""Set filesystem label.""" """Set filesystem label."""
await self.dbus.Filesystem.call_set_label(label, UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Filesystem.call(
"set_label", label, UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def check(self) -> bool: async def check(self) -> bool:
"""Check filesystem for consistency. Returns true if it passed.""" """Check filesystem for consistency. Returns true if it passed."""
return await self.dbus.Filesystem.call_check(UDISKS2_DEFAULT_OPTIONS) return await self.connected_dbus.Filesystem.call(
"check", UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def repair(self) -> bool: async def repair(self) -> bool:
"""Attempt to repair filesystem. Returns true if repair was successful.""" """Attempt to repair filesystem. Returns true if repair was successful."""
return await self.dbus.Filesystem.call_repair(UDISKS2_DEFAULT_OPTIONS) return await self.connected_dbus.Filesystem.call(
"repair", UDISKS2_DEFAULT_OPTIONS
)
View File
@ -29,10 +29,15 @@ class UDisks2Partition(DBusInterfaceProxy):
def __init__(self, object_path: str, *, sync_properties: bool = True) -> None: def __init__(self, object_path: str, *, sync_properties: bool = True) -> None:
"""Initialize object.""" """Initialize object."""
self.object_path = object_path self._object_path = object_path
self.sync_properties = sync_properties self.sync_properties = sync_properties
super().__init__() super().__init__()
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
@dbus_property @dbus_property
def number(self) -> int: def number(self) -> int:
@ -86,12 +91,16 @@ class UDisks2Partition(DBusInterfaceProxy):
for GPT type tables or a hexadecimal number for dos type tables. Can also use empty string for GPT type tables or a hexadecimal number for dos type tables. Can also use empty string
and let UDisks2 choose a default based on partition table and OS. and let UDisks2 choose a default based on partition table and OS.
""" """
await self.dbus.Partition.call_set_type(type_, UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Partition.call(
"set_type", type_, UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def set_name(self, name: str) -> None: async def set_name(self, name: str) -> None:
"""Set the name/label of the partition.""" """Set the name/label of the partition."""
await self.dbus.Partition.call_set_name(name, UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Partition.call(
"set_name", name, UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def resize(self, size: int = 0) -> None: async def resize(self, size: int = 0) -> None:
@ -100,10 +109,14 @@ class UDisks2Partition(DBusInterfaceProxy):
Position/offset cannot be changed, only size. May be slightly bigger than requested. Position/offset cannot be changed, only size. May be slightly bigger than requested.
Raises error if allocation fails. Raises error if allocation fails.
""" """
await self.dbus.Partition.call_resize(size, UDISKS2_DEFAULT_OPTIONS) await self.connected_dbus.Partition.call(
"resize", size, UDISKS2_DEFAULT_OPTIONS
)
@dbus_connected @dbus_connected
async def delete(self, options: DeletePartitionOptions | None = None) -> None: async def delete(self, options: DeletePartitionOptions | None = None) -> None:
"""Delete the partition.""" """Delete the partition."""
options = options.to_dict() if options else {} delete_options = options.to_dict() if options else {}
return await self.dbus.Partition.call_delete(options | UDISKS2_DEFAULT_OPTIONS) return await self.connected_dbus.Partition.call(
"delete", delete_options | UDISKS2_DEFAULT_OPTIONS
)
View File
@ -24,10 +24,15 @@ class UDisks2PartitionTable(DBusInterfaceProxy):
def __init__(self, object_path: str, *, sync_properties: bool = True) -> None: def __init__(self, object_path: str, *, sync_properties: bool = True) -> None:
"""Initialize object.""" """Initialize object."""
self.object_path = object_path self._object_path = object_path
self.sync_properties = sync_properties self.sync_properties = sync_properties
super().__init__() super().__init__()
@property
def object_path(self) -> str:
"""Object path for dbus object."""
return self._object_path
@property @property
@dbus_property @dbus_property
def partitions(self) -> list[str]: def partitions(self) -> list[str]:
@ -59,7 +64,12 @@ class UDisks2PartitionTable(DBusInterfaceProxy):
and let UDisks2 choose a default based on partition table and OS. and let UDisks2 choose a default based on partition table and OS.
Provide return value with UDisks2Block.new. Or UDisks2.get_block_device after UDisks2.update. Provide return value with UDisks2Block.new. Or UDisks2.get_block_device after UDisks2.update.
""" """
options = options.to_dict() if options else {} partition_options = options.to_dict() if options else {}
return await self.dbus.PartitionTable.call_create_partition( return await self.connected_dbus.PartitionTable.call(
offset, size, type_, name, options | UDISKS2_DEFAULT_OPTIONS "create_partition",
offset,
size,
type_,
name,
partition_options | UDISKS2_DEFAULT_OPTIONS,
) )
View File
@ -153,16 +153,9 @@ class Mount(CoreSysAttributes, ABC):
return self._state return self._state
@cached_property @cached_property
def local_where(self) -> Path | None: def local_where(self) -> Path:
"""Return where this is mounted within supervisor container. """Return where this is mounted within supervisor container."""
return self.sys_config.extern_to_local_path(self.where)
This returns none if 'where' is not within supervisor's host data directory.
"""
return (
self.sys_config.extern_to_local_path(self.where)
if self.where.is_relative_to(self.sys_config.path_extern_supervisor)
else None
)
@property @property
def container_where(self) -> PurePath | None: def container_where(self) -> PurePath | None:
@ -276,27 +269,25 @@ class Mount(CoreSysAttributes, ABC):
async def mount(self) -> None: async def mount(self) -> None:
"""Mount using systemd.""" """Mount using systemd."""
# If supervisor can see where it will mount, ensure there's an empty folder there
if self.local_where:
def ensure_empty_folder() -> None: def ensure_empty_folder() -> None:
if not self.local_where.exists(): if not self.local_where.exists():
_LOGGER.info( _LOGGER.info(
"Creating folder for mount: %s", self.local_where.as_posix() "Creating folder for mount: %s", self.local_where.as_posix()
) )
self.local_where.mkdir(parents=True) self.local_where.mkdir(parents=True)
elif not self.local_where.is_dir(): elif not self.local_where.is_dir():
raise MountInvalidError( raise MountInvalidError(
f"Cannot mount {self.name} at {self.local_where.as_posix()} as it is not a directory", f"Cannot mount {self.name} at {self.local_where.as_posix()} as it is not a directory",
_LOGGER.error, _LOGGER.error,
) )
elif any(self.local_where.iterdir()): elif any(self.local_where.iterdir()):
raise MountInvalidError( raise MountInvalidError(
f"Cannot mount {self.name} at {self.local_where.as_posix()} because it is not empty", f"Cannot mount {self.name} at {self.local_where.as_posix()} because it is not empty",
_LOGGER.error, _LOGGER.error,
) )
await self.sys_run_in_executor(ensure_empty_folder) await self.sys_run_in_executor(ensure_empty_folder)
try: try:
options = ( options = (
@ -542,6 +533,9 @@ class BindMount(Mount):
self, coresys: CoreSys, data: MountData, *, where: PurePath | None = None self, coresys: CoreSys, data: MountData, *, where: PurePath | None = None
) -> None: ) -> None:
"""Initialize object.""" """Initialize object."""
if where and not where.is_relative_to(coresys.config.path_extern_supervisor):
raise ValueError("Path must be within Supervisor's host data directory!")
super().__init__(coresys, data) super().__init__(coresys, data)
self._where = where self._where = where
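`local_where` previously returned `None` for mounts outside Supervisor's host data directory, which forced `if self.local_where:` guards at every call site. The diff moves that invariant to construction time: `BindMount` rejects a `where` outside `path_extern_supervisor`, so `local_where` can be typed as a plain `Path`. A rough sketch of the validation idea, with simplified stand-in paths instead of the real `coresys.config` values:

from pathlib import PurePath


def validate_where(where: PurePath | None, extern_supervisor: PurePath) -> None:
    """Reject bind-mount targets outside the supervisor data directory.

    Simplified stand-in for the check in BindMount.__init__; the real code
    compares against coresys.config.path_extern_supervisor.
    """
    if where and not where.is_relative_to(extern_supervisor):
        raise ValueError("Path must be within Supervisor's host data directory!")


# Passes: the target lives under the supervisor data directory (hypothetical paths)
validate_where(PurePath("/mnt/data/supervisor/media/cam"), PurePath("/mnt/data/supervisor"))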
View File
@ -5,7 +5,7 @@ from __future__ import annotations
import asyncio import asyncio
from collections.abc import Awaitable, Callable, Coroutine from collections.abc import Awaitable, Callable, Coroutine
import logging import logging
from typing import Any from typing import Any, cast
from dbus_fast import ( from dbus_fast import (
ErrorType, ErrorType,
@ -305,9 +305,34 @@ class DBus:
else: else:
self._signal_monitors[interface][dbus_name].append(callback) self._signal_monitors[interface][dbus_name].append(callback)
@property
def _call_wrapper(self) -> DBusCallWrapper:
"""Get dbus call wrapper for current dbus object."""
return DBusCallWrapper(self, self.bus_name)
def __getattr__(self, name: str) -> DBusCallWrapper: def __getattr__(self, name: str) -> DBusCallWrapper:
"""Map to dbus method.""" """Map to dbus method."""
return getattr(DBusCallWrapper(self, self.bus_name), name) return getattr(self._call_wrapper, name)
def call(self, name: str, *args, unpack_variants: bool = True) -> Awaitable[Any]:
"""Call a dbus method."""
return self._call_wrapper.call(name, *args, unpack_variants=unpack_variants)
def get(self, name: str, *, unpack_variants: bool = True) -> Awaitable[Any]:
"""Get a dbus property value."""
return self._call_wrapper.get(name, unpack_variants=unpack_variants)
def set(self, name: str, value: Any) -> Awaitable[None]:
"""Set a dbus property."""
return self._call_wrapper.set(name, value)
def on(self, name: str, callback: Callable) -> None:
"""Add listener for a signal."""
self._call_wrapper.on(name, callback)
def off(self, name: str, callback: Callable) -> None:
"""Remove listener for a signal."""
self._call_wrapper.off(name, callback)
class DBusCallWrapper: class DBusCallWrapper:
@ -324,7 +349,9 @@ class DBusCallWrapper:
_LOGGER.error("D-Bus method %s not exists!", self.interface) _LOGGER.error("D-Bus method %s not exists!", self.interface)
raise DBusInterfaceMethodError() raise DBusInterfaceMethodError()
def __getattr__(self, name: str) -> Awaitable | Callable: def _dbus_action(
self, name: str
) -> DBusCallWrapper | Callable[..., Awaitable[Any]] | Callable[[Callable], None]:
"""Map to dbus method.""" """Map to dbus method."""
if not self._proxy: if not self._proxy:
return DBusCallWrapper(self.dbus, f"{self.interface}.{name}") return DBusCallWrapper(self.dbus, f"{self.interface}.{name}")
@ -409,6 +436,36 @@ class DBusCallWrapper:
# Didn't reach the dbus call yet, just happened to hit another interface. Return a wrapper # Didn't reach the dbus call yet, just happened to hit another interface. Return a wrapper
return DBusCallWrapper(self.dbus, f"{self.interface}.{name}") return DBusCallWrapper(self.dbus, f"{self.interface}.{name}")
def __getattr__(self, name: str) -> DBusCallWrapper:
"""Map to a dbus method."""
return cast(DBusCallWrapper, self._dbus_action(name))
def call(self, name: str, *args, unpack_variants: bool = True) -> Awaitable[Any]:
"""Call a dbus method."""
return cast(Callable[..., Awaitable[Any]], self._dbus_action(f"call_{name}"))(
*args, unpack_variants=unpack_variants
)
def get(self, name: str, *, unpack_variants: bool = True) -> Awaitable[Any]:
"""Get a dbus property value."""
return cast(Callable[[bool], Awaitable[Any]], self._dbus_action(f"get_{name}"))(
unpack_variants=unpack_variants
)
def set(self, name: str, value: Any) -> Awaitable[None]:
"""Set a dbus property."""
return cast(Callable[[Any], Awaitable[Any]], self._dbus_action(f"set_{name}"))(
value
)
def on(self, name: str, callback: Callable) -> None:
"""Add listener for a signal."""
cast(Callable[[Callable], None], self._dbus_action(f"on_{name}"))(callback)
def off(self, name: str, callback: Callable) -> None:
"""Remove listener for a signal."""
cast(Callable[[Callable], None], self._dbus_action(f"off_{name}"))(callback)
class DBusSignalWrapper: class DBusSignalWrapper:
"""Wrapper for D-Bus Signal.""" """Wrapper for D-Bus Signal."""
View File
@@ -12,7 +12,7 @@ from awesomeversion import AwesomeVersion
 import pytest
 
 from supervisor.addons.addon import Addon
-from supervisor.backups.backup import Backup
+from supervisor.backups.backup import Backup, BackupLocation
 from supervisor.const import CoreState
 from supervisor.coresys import CoreSys
 from supervisor.docker.manager import DockerAPI
@@ -505,7 +505,9 @@ async def test_restore_immediate_errors(
     with (
         patch.object(
-            Backup, "all_locations", new={None: {"path": None, "protected": True}}
+            Backup,
+            "all_locations",
+            new={None: BackupLocation(path=Path("/"), protected=True, size_bytes=0)},
         ),
         patch.object(
             Backup,
@@ -586,7 +588,9 @@ async def test_cloud_backup_core_only(api_client: TestClient, mock_full_backup:
     # pylint: disable-next=protected-access
     mock_full_backup._locations = {
-        ".cloud_backup": {"path": None, "protected": False, "size_bytes": 10240}
+        ".cloud_backup": BackupLocation(
+            path=Path("/"), protected=False, size_bytes=10240
+        )
     }
     assert mock_full_backup.location == ".cloud_backup"
@@ -672,8 +676,10 @@ async def test_backup_to_multiple_locations(
     assert orig_backup.exists()
     assert copy_backup.exists()
     assert coresys.backups.get(slug).all_locations == {
-        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": copy_backup, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=copy_backup, protected=False, size_bytes=10240
+        ),
     }
     assert coresys.backups.get(slug).location is None
@@ -709,7 +715,7 @@ async def test_backup_to_multiple_locations_error_on_copy(
     orig_backup = coresys.config.path_backup / f"{slug}.tar"
     assert await coresys.run_in_executor(orig_backup.exists)
     assert coresys.backups.get(slug).all_locations == {
-        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
     }
     assert coresys.backups.get(slug).location is None
@@ -783,8 +789,10 @@ async def test_upload_to_multiple_locations(
     assert orig_backup.exists()
     assert copy_backup.exists()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": copy_backup, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=copy_backup, protected=False, size_bytes=10240
+        ),
     }
     assert coresys.backups.get("7fed74c8").location is None
@@ -798,7 +806,7 @@ async def test_upload_duplicate_backup_new_location(
     orig_backup = Path(copy(backup_file, coresys.config.path_backup))
     await coresys.backups.reload()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False, "size_bytes": 10240}
+        None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
     }
 
     with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
@@ -815,8 +823,10 @@ async def test_upload_duplicate_backup_new_location(
     assert orig_backup.exists()
     assert copy_backup.exists()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": copy_backup, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=copy_backup, protected=False, size_bytes=10240
+        ),
     }
     assert coresys.backups.get("7fed74c8").location is None
@@ -853,7 +863,7 @@ async def test_upload_with_filename(
     orig_backup = coresys.config.path_backup / filename
     assert orig_backup.exists()
    assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False, "size_bytes": 10240}
+        None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
     }
     assert coresys.backups.get("7fed74c8").location is None
@@ -886,8 +896,10 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=location_2, protected=False, size_bytes=10240
+        ),
     }
 
     resp = await api_client.delete(
@@ -899,7 +911,7 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
     assert not location_2.exists()
     assert coresys.backups.get("7fed74c8")
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False, "size_bytes": 10240}
+        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
     }
@@ -912,7 +924,7 @@ async def test_remove_backup_file_not_found(api_client: TestClient, coresys: Cor
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=location, protected=False, size_bytes=10240),
     }
 
     location.unlink()
@@ -940,8 +952,10 @@ async def test_download_backup_from_location(
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=location_2, protected=False, size_bytes=10240
+        ),
     }
 
     # The use case of this is user might want to pick a particular mount if one is flaky
@@ -1019,7 +1033,7 @@ async def test_restore_backup_from_location(
     # The use case of this is user might want to pick a particular mount if one is flaky
     # To simulate this, remove the file from one location and show one works and the other doesn't
     assert backup.location is None
-    (backup_local_path := backup.all_locations[None]["path"]).unlink()
+    (backup_local_path := backup.all_locations[None].path).unlink()
     test_file.unlink()
 
     resp = await api_client.post(
@@ -1055,12 +1069,12 @@ async def test_restore_backup_unencrypted_after_encrypted(
 
     backup = coresys.backups.get("d9c48f8b")
     assert backup.all_locations == {
-        None: {"path": Path(enc_tar), "protected": True, "size_bytes": 10240},
-        ".cloud_backup": {
-            "path": Path(unc_tar),
-            "protected": False,
-            "size_bytes": 10240,
-        },
+        None: BackupLocation(path=Path(enc_tar), protected=True, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=Path(unc_tar),
+            protected=False,
+            size_bytes=10240,
+        ),
     }
 
     # pylint: disable=fixme
@@ -1173,12 +1187,12 @@ async def test_backup_mixed_encryption(api_client: TestClient, coresys: CoreSys)
 
     backup = coresys.backups.get("d9c48f8b")
     assert backup.all_locations == {
-        None: {"path": Path(enc_tar), "protected": True, "size_bytes": 10240},
-        ".cloud_backup": {
-            "path": Path(unc_tar),
-            "protected": False,
-            "size_bytes": 10240,
-        },
+        None: BackupLocation(path=Path(enc_tar), protected=True, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=Path(unc_tar),
+            protected=False,
+            size_bytes=10240,
+        ),
     }
 
     resp = await api_client.get("/backups")
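The tests above construct BackupLocation objects where plain dicts were used before. A minimal sketch of the dataclass shape this implies, inferred only from the fields the tests pass (path, protected, size_bytes); the real definition in supervisor/backups/backup.py may carry more, and the example path below is purely illustrative:

from dataclasses import dataclass
from pathlib import Path


@dataclass
class BackupLocation:
    """Assumed shape of a backup location record (sketch, not the real class)."""

    path: Path
    protected: bool
    size_bytes: int


# Mirrors how the expected values are built in the tests above; attribute access
# (location.size_bytes) is type-checked, unlike the old dict["size_bytes"] lookups.
location = BackupLocation(
    path=Path("/tmp/7fed74c8.tar"),  # illustrative path
    protected=False,
    size_bytes=10240,
)
assert location.size_bytes == 10240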
View File
@@ -1,9 +1,11 @@
 """Mock test."""
 
+from pathlib import Path
 from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 
+from supervisor.backups.backup import BackupLocation
 from supervisor.backups.const import LOCATION_CLOUD_BACKUP, LOCATION_TYPE, BackupType
 from supervisor.backups.validate import ALL_FOLDERS
 from supervisor.coresys import CoreSys
@@ -41,7 +43,9 @@ def partial_backup_mock(backup_mock):
     backup_instance.addon_list = [TEST_ADDON_SLUG]
     backup_instance.supervisor_version = "9999.09.9.dev9999"
     backup_instance.location = None
-    backup_instance.all_locations = {None: {"protected": False}}
+    backup_instance.all_locations = {
+        None: BackupLocation(path=Path("/"), protected=False, size_bytes=0)
+    }
     backup_instance.validate_backup = AsyncMock()
 
     yield backup_mock
@@ -55,7 +59,9 @@ def full_backup_mock(backup_mock):
     backup_instance.addon_list = [TEST_ADDON_SLUG]
     backup_instance.supervisor_version = "9999.09.9.dev9999"
     backup_instance.location = None
-    backup_instance.all_locations = {None: {"protected": False}}
+    backup_instance.all_locations = {
+        None: BackupLocation(path=Path("/"), protected=False, size_bytes=0)
+    }
     backup_instance.validate_backup = AsyncMock()
 
     yield backup_mock
View File
@@ -9,7 +9,7 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from supervisor.backups.backup import Backup
+from supervisor.backups.backup import Backup, BackupLocation
 from supervisor.backups.const import BackupType
 from supervisor.coresys import CoreSys
 from supervisor.exceptions import (
@@ -86,7 +86,7 @@ async def test_consolidate_conflict_varied_encryption(
         in caplog.text
     )
     assert enc_backup.all_locations == {
-        None: {"path": unc_tar, "protected": False, "size_bytes": 10240}
+        None: BackupLocation(path=unc_tar, protected=False, size_bytes=10240),
     }
@@ -112,8 +112,8 @@ async def test_consolidate(
         not in caplog.text
     )
     assert enc_backup.all_locations == {
-        None: {"path": enc_tar, "protected": True, "size_bytes": 10240},
-        "backup_test": {"path": unc_tar, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=enc_tar, protected=True, size_bytes=10240),
+        "backup_test": BackupLocation(path=unc_tar, protected=False, size_bytes=10240),
     }
View File
@@ -14,7 +14,7 @@ import pytest
 from supervisor.addons.addon import Addon
 from supervisor.addons.const import AddonBackupMode
 from supervisor.addons.model import AddonModel
-from supervisor.backups.backup import Backup
+from supervisor.backups.backup import Backup, BackupLocation
 from supervisor.backups.const import LOCATION_TYPE, BackupType
 from supervisor.backups.manager import BackupManager
 from supervisor.const import FOLDER_HOMEASSISTANT, FOLDER_SHARE, AddonState, CoreState
@@ -344,13 +344,13 @@ async def test_fail_invalid_full_backup(
         await manager.do_restore_full(partial_backup_mock.return_value)
 
     backup_instance = full_backup_mock.return_value
-    backup_instance.all_locations[None]["protected"] = True
+    backup_instance.all_locations[None].protected = True
     backup_instance.validate_backup.side_effect = BackupInvalidError()
 
     with pytest.raises(BackupInvalidError):
         await manager.do_restore_full(backup_instance)
 
-    backup_instance.all_locations[None]["protected"] = False
+    backup_instance.all_locations[None].protected = False
     backup_instance.supervisor_version = "2022.08.4"
     with (
         patch.object(
@@ -373,13 +373,13 @@ async def test_fail_invalid_partial_backup(
     manager = await BackupManager(coresys).load_config()
 
     backup_instance = partial_backup_mock.return_value
-    backup_instance.all_locations[None]["protected"] = True
+    backup_instance.all_locations[None].protected = True
     backup_instance.validate_backup.side_effect = BackupInvalidError()
 
     with pytest.raises(BackupInvalidError):
         await manager.do_restore_partial(backup_instance)
 
-    backup_instance.all_locations[None]["protected"] = False
+    backup_instance.all_locations[None].protected = False
     backup_instance.homeassistant = None
 
     with pytest.raises(BackupInvalidError):
@@ -1747,7 +1747,7 @@ async def test_backup_remove_error(
     assert (backup := coresys.backups.get("7fed74c8"))
     assert location_name in backup.all_locations
-    backup.all_locations[location_name]["path"] = (tar_file_mock := MagicMock())
+    backup.all_locations[location_name].path = (tar_file_mock := MagicMock())
     tar_file_mock.unlink.side_effect = (err := OSError())
     err.errno = errno.EBUSY
@@ -2001,8 +2001,10 @@ async def test_backup_remove_multiple_locations(coresys: CoreSys):
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=location_2, protected=False, size_bytes=10240
+        ),
     }
 
     await coresys.backups.remove(backup)
@@ -2021,8 +2023,10 @@ async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False, "size_bytes": 10240},
-        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
+        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
+        ".cloud_backup": BackupLocation(
+            path=location_2, protected=False, size_bytes=10240
+        ),
     }
 
     await coresys.backups.remove(backup, locations=[".cloud_backup"])
@@ -2030,7 +2034,7 @@ async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
     assert not location_2.exists()
     assert coresys.backups.get("7fed74c8")
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False, "size_bytes": 10240}
+        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
     }
@@ -2074,7 +2078,7 @@ async def test_remove_non_existing_backup_raises(
     assert (backup := coresys.backups.get("7fed74c8"))
     assert None in backup.all_locations
-    backup.all_locations[None]["path"] = (tar_file_mock := MagicMock())
+    backup.all_locations[None].path = (tar_file_mock := MagicMock())
     tar_file_mock.unlink.side_effect = (err := FileNotFoundError())
     err.errno = errno.ENOENT
View File
@@ -24,7 +24,7 @@ async def test_dbus_green(green_service: GreenService, dbus_session_bus: Message
     green = await Green().load_config()
     await green.connect(dbus_session_bus)
 
-    assert green.name == "Green"
+    assert green.board_name == "Green"
     assert green.activity_led is True
     assert green.power_led is True
     assert green.user_led is True
View File
@@ -24,7 +24,7 @@ async def test_dbus_yellow(yellow_service: YellowService, dbus_session_bus: Mess
     yellow = await Yellow().load_config()
     await yellow.connect(dbus_session_bus)
 
-    assert yellow.name == "Yellow"
+    assert yellow.board_name == "Yellow"
     assert yellow.disk_led is True
     assert yellow.heartbeat_led is True
     assert yellow.power_led is True
View File
@@ -38,6 +38,14 @@ class TestInterface(DBusServiceMock):
         return 4
 
 
+class ServiceTest(DBusInterfaceProxy):
+    """DBus test class."""
+
+    bus_name = "service.test.TestInterface"
+    object_path = "/service/test/TestInterface"
+    properties_interface = "service.test.TestInterface"
+
+
 @pytest.fixture(name="test_service")
 async def fixture_test_service(dbus_session_bus: MessageBus) -> TestInterface:
     """Export test interface on dbus."""
@@ -54,12 +62,8 @@ async def fixture_proxy(
     dbus_session_bus: MessageBus,
 ) -> DBusInterfaceProxy:
     """Get a proxy."""
-    proxy = DBusInterfaceProxy()
-    proxy.bus_name = "service.test.TestInterface"
-    proxy.object_path = "/service/test/TestInterface"
-    proxy.properties_interface = "service.test.TestInterface"
+    proxy = ServiceTest()
     proxy.sync_properties = getattr(request, "param", True)
     await proxy.connect(dbus_session_bus)
 
     yield proxy
@@ -122,10 +126,7 @@ async def test_dbus_connected_no_raise_after_shutdown(
     test_service: TestInterface, dbus_session_bus: MessageBus
 ):
     """Test dbus connected methods do not raise DBusNotConnectedError after shutdown."""
-    proxy = DBusInterfaceProxy()
-    proxy.bus_name = "service.test.TestInterface"
-    proxy.object_path = "/service/test/TestInterface"
-    proxy.properties_interface = "service.test.TestInterface"
+    proxy = ServiceTest()
     proxy.sync_properties = False
 
     with pytest.raises(DBusNotConnectedError):
@@ -141,10 +142,13 @@ async def test_dbus_connected_no_raise_after_shutdown(
 
 async def test_proxy_missing_properties_interface(dbus_session_bus: MessageBus):
     """Test proxy instance disconnects and errors when missing properties interface."""
-    proxy = DBusInterfaceProxy()
-    proxy.bus_name = "test.no.properties.interface"
-    proxy.object_path = DBUS_OBJECT_BASE
-    proxy.properties_interface = "test.no.properties.interface"
+
+    class NoPropertiesService(DBusInterfaceProxy):
+        bus_name = "test.no.properties.interface"
+        object_path = DBUS_OBJECT_BASE
+        properties_interface = "test.no.properties.interface"
+
+    proxy = NoPropertiesService()
 
     def mock_introspect(*args, **kwargs):
         """Return introspection without properties."""
@@ -163,10 +167,12 @@ async def test_proxy_missing_properties_interface(dbus_session_bus: MessageBus):
 
 async def test_initialize(test_service: TestInterface, dbus_session_bus: MessageBus):
     """Test initialize for reusing connected dbus object."""
-    proxy = DBusInterface()
-    proxy.bus_name = "service.test.TestInterface"
-    proxy.object_path = "/service/test/TestInterface"
+
+    class ServiceTestInterfaceOnly(DBusInterface):
+        bus_name = "service.test.TestInterface"
+        object_path = "/service/test/TestInterface"
+
+    proxy = ServiceTestInterfaceOnly()
 
     assert proxy.is_connected is False
 
     # Not connected
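The last set of changes swaps per-instance attribute assignment for small subclasses that declare bus_name, object_path and properties_interface as class attributes. A generic sketch of why that pattern is friendlier to mypy; ProxyBase here is a stand-in, not Supervisor's DBusInterfaceProxy:

class ProxyBase:
    """Stand-in base: concrete subclasses supply the identifying attributes."""

    bus_name: str
    object_path: str
    properties_interface: str

    def describe(self) -> str:
        # Because the attributes are declared on the class, mypy sees plain str
        # here instead of a str | None that only becomes set after construction.
        return f"{self.bus_name} at {self.object_path}"


class ServiceTest(ProxyBase):
    """Mirrors the pattern the updated tests use for their proxies."""

    bus_name = "service.test.TestInterface"
    object_path = "/service/test/TestInterface"
    properties_interface = "service.test.TestInterface"


print(ServiceTest().describe())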