mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-14 04:36:31 +00:00
Use backup mounts (#4289)
* Add support for backup mounts * Fix tests * Allow backups to local when there's a default location
This commit is contained in:
parent
6a95f97ec9
commit
b4fd5b28f6
@ -576,6 +576,7 @@ class RestAPI(CoreSysAttributes):
|
||||
self.webapp.add_routes(
|
||||
[
|
||||
web.get("/mounts", api_mounts.info),
|
||||
web.post("/mounts/options", api_mounts.options),
|
||||
web.post("/mounts", api_mounts.create_mount),
|
||||
web.put("/mounts/{mount}", api_mounts.update_mount),
|
||||
web.delete("/mounts/{mount}", api_mounts.delete_mount),
|
||||
|
@ -4,6 +4,7 @@ import logging
|
||||
from pathlib import Path
|
||||
import re
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Any
|
||||
|
||||
from aiohttp import web
|
||||
from aiohttp.hdrs import CONTENT_DISPOSITION
|
||||
@ -19,6 +20,7 @@ from ..const import (
|
||||
ATTR_DAYS_UNTIL_STALE,
|
||||
ATTR_FOLDERS,
|
||||
ATTR_HOMEASSISTANT,
|
||||
ATTR_LOCATON,
|
||||
ATTR_NAME,
|
||||
ATTR_PASSWORD,
|
||||
ATTR_PROTECTED,
|
||||
@ -31,6 +33,7 @@ from ..const import (
|
||||
)
|
||||
from ..coresys import CoreSysAttributes
|
||||
from ..exceptions import APIError
|
||||
from ..mounts.const import MountUsage
|
||||
from .const import CONTENT_TYPE_TAR
|
||||
from .utils import api_process, api_validate
|
||||
|
||||
@ -59,6 +62,7 @@ SCHEMA_BACKUP_FULL = vol.Schema(
|
||||
vol.Optional(ATTR_NAME): str,
|
||||
vol.Optional(ATTR_PASSWORD): vol.Maybe(str),
|
||||
vol.Optional(ATTR_COMPRESSED): vol.Maybe(vol.Boolean()),
|
||||
vol.Optional(ATTR_LOCATON): vol.Maybe(str),
|
||||
}
|
||||
)
|
||||
|
||||
@ -173,11 +177,27 @@ class APIBackups(CoreSysAttributes):
|
||||
ATTR_FOLDERS: backup.folders,
|
||||
}
|
||||
|
||||
def _location_to_mount(self, body: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Change location field to mount if necessary."""
|
||||
if not body.get(ATTR_LOCATON):
|
||||
return body
|
||||
|
||||
body[ATTR_LOCATON] = self.sys_mounts.get(body[ATTR_LOCATON])
|
||||
if body[ATTR_LOCATON].usage != MountUsage.BACKUP:
|
||||
raise APIError(
|
||||
f"Mount {body[ATTR_LOCATON].name} is not used for backups, cannot backup to there"
|
||||
)
|
||||
|
||||
return body
|
||||
|
||||
@api_process
|
||||
async def backup_full(self, request):
|
||||
"""Create full backup."""
|
||||
body = await api_validate(SCHEMA_BACKUP_FULL, request)
|
||||
backup = await asyncio.shield(self.sys_backups.do_backup_full(**body))
|
||||
|
||||
backup = await asyncio.shield(
|
||||
self.sys_backups.do_backup_full(**self._location_to_mount(body))
|
||||
)
|
||||
|
||||
if backup:
|
||||
return {ATTR_SLUG: backup.slug}
|
||||
@ -187,7 +207,9 @@ class APIBackups(CoreSysAttributes):
|
||||
async def backup_partial(self, request):
|
||||
"""Create a partial backup."""
|
||||
body = await api_validate(SCHEMA_BACKUP_PARTIAL, request)
|
||||
backup = await asyncio.shield(self.sys_backups.do_backup_partial(**body))
|
||||
backup = await asyncio.shield(
|
||||
self.sys_backups.do_backup_partial(**self._location_to_mount(body))
|
||||
)
|
||||
|
||||
if backup:
|
||||
return {ATTR_SLUG: backup.slug}
|
||||
|
@ -8,11 +8,18 @@ import voluptuous as vol
|
||||
from ..const import ATTR_NAME, ATTR_STATE
|
||||
from ..coresys import CoreSysAttributes
|
||||
from ..exceptions import APIError
|
||||
from ..mounts.const import ATTR_DEFAULT_BACKUP_MOUNT, MountUsage
|
||||
from ..mounts.mount import Mount
|
||||
from ..mounts.validate import SCHEMA_MOUNT_CONFIG
|
||||
from .const import ATTR_MOUNTS
|
||||
from .utils import api_process, api_validate
|
||||
|
||||
SCHEMA_OPTIONS = vol.Schema(
|
||||
{
|
||||
vol.Optional(ATTR_DEFAULT_BACKUP_MOUNT): vol.Maybe(str),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class APIMounts(CoreSysAttributes):
|
||||
"""Handle REST API for mounting options."""
|
||||
@ -21,12 +28,33 @@ class APIMounts(CoreSysAttributes):
|
||||
async def info(self, request: web.Request) -> dict[str, Any]:
|
||||
"""Return MountManager info."""
|
||||
return {
|
||||
ATTR_DEFAULT_BACKUP_MOUNT: self.sys_mounts.default_backup_mount.name
|
||||
if self.sys_mounts.default_backup_mount
|
||||
else None,
|
||||
ATTR_MOUNTS: [
|
||||
mount.to_dict() | {ATTR_STATE: mount.state}
|
||||
for mount in self.sys_mounts.mounts
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
@api_process
|
||||
async def options(self, request: web.Request) -> None:
|
||||
"""Set Mount Manager options."""
|
||||
body = await api_validate(SCHEMA_OPTIONS, request)
|
||||
|
||||
if ATTR_DEFAULT_BACKUP_MOUNT in body:
|
||||
name: str | None = body[ATTR_DEFAULT_BACKUP_MOUNT]
|
||||
if name is None:
|
||||
self.sys_mounts.default_backup_mount = None
|
||||
elif (mount := self.sys_mounts.get(name)).usage != MountUsage.BACKUP:
|
||||
raise APIError(
|
||||
f"Mount {name} is not used for backups, cannot use it as default backup mount"
|
||||
)
|
||||
else:
|
||||
self.sys_mounts.default_backup_mount = mount
|
||||
|
||||
self.sys_mounts.save_data()
|
||||
|
||||
@api_process
|
||||
async def create_mount(self, request: web.Request) -> None:
|
||||
"""Create a new mount in supervisor."""
|
||||
@ -35,41 +63,62 @@ class APIMounts(CoreSysAttributes):
|
||||
if body[ATTR_NAME] in self.sys_mounts:
|
||||
raise APIError(f"A mount already exists with name {body[ATTR_NAME]}")
|
||||
|
||||
await self.sys_mounts.create_mount(Mount.from_dict(self.coresys, body))
|
||||
mount = Mount.from_dict(self.coresys, body)
|
||||
await self.sys_mounts.create_mount(mount)
|
||||
|
||||
# If it's a backup mount, reload backups
|
||||
if mount.usage == MountUsage.BACKUP:
|
||||
self.sys_create_task(self.sys_backups.reload())
|
||||
|
||||
# If there's no default backup mount, set it to the new mount
|
||||
if not self.sys_mounts.default_backup_mount:
|
||||
self.sys_mounts.default_backup_mount = mount
|
||||
|
||||
self.sys_mounts.save_data()
|
||||
|
||||
@api_process
|
||||
async def update_mount(self, request: web.Request) -> None:
|
||||
"""Update an existing mount in supervisor."""
|
||||
mount = request.match_info.get("mount")
|
||||
name = request.match_info.get("mount")
|
||||
name_schema = vol.Schema(
|
||||
{vol.Optional(ATTR_NAME, default=mount): mount}, extra=vol.ALLOW_EXTRA
|
||||
{vol.Optional(ATTR_NAME, default=name): name}, extra=vol.ALLOW_EXTRA
|
||||
)
|
||||
body = await api_validate(vol.All(name_schema, SCHEMA_MOUNT_CONFIG), request)
|
||||
|
||||
if mount not in self.sys_mounts:
|
||||
raise APIError(f"No mount exists with name {mount}")
|
||||
if name not in self.sys_mounts:
|
||||
raise APIError(f"No mount exists with name {name}")
|
||||
|
||||
mount = Mount.from_dict(self.coresys, body)
|
||||
await self.sys_mounts.create_mount(mount)
|
||||
|
||||
# If it's a backup mount, reload backups
|
||||
if mount.usage == MountUsage.BACKUP:
|
||||
self.sys_create_task(self.sys_backups.reload())
|
||||
|
||||
# If this mount was the default backup mount and isn't for backups any more, remove it
|
||||
elif self.sys_mounts.default_backup_mount == mount:
|
||||
self.sys_mounts.default_backup_mount = None
|
||||
|
||||
await self.sys_mounts.create_mount(Mount.from_dict(self.coresys, body))
|
||||
self.sys_mounts.save_data()
|
||||
|
||||
@api_process
|
||||
async def delete_mount(self, request: web.Request) -> None:
|
||||
"""Delete an existing mount in supervisor."""
|
||||
mount = request.match_info.get("mount")
|
||||
name = request.match_info.get("mount")
|
||||
mount = await self.sys_mounts.remove_mount(name)
|
||||
|
||||
if mount not in self.sys_mounts:
|
||||
raise APIError(f"No mount exists with name {mount}")
|
||||
# If it was a backup mount, reload backups
|
||||
if mount.usage == MountUsage.BACKUP:
|
||||
self.sys_create_task(self.sys_backups.reload())
|
||||
|
||||
await self.sys_mounts.remove_mount(mount)
|
||||
self.sys_mounts.save_data()
|
||||
|
||||
@api_process
|
||||
async def reload_mount(self, request: web.Request) -> None:
|
||||
"""Reload an existing mount in supervisor."""
|
||||
mount = request.match_info.get("mount")
|
||||
name = request.match_info.get("mount")
|
||||
await self.sys_mounts.reload_mount(name)
|
||||
|
||||
if mount not in self.sys_mounts:
|
||||
raise APIError(f"No mount exists with name {mount}")
|
||||
|
||||
await self.sys_mounts.reload_mount(mount)
|
||||
# If it's a backup mount, reload backups
|
||||
if self.sys_mounts.get(name).usage == MountUsage.BACKUP:
|
||||
self.sys_create_task(self.sys_backups.reload())
|
||||
|
@ -13,10 +13,13 @@ from ..const import (
|
||||
CoreState,
|
||||
)
|
||||
from ..coresys import CoreSysAttributes
|
||||
from ..dbus.const import UnitActiveState
|
||||
from ..exceptions import AddonsError
|
||||
from ..jobs.decorator import Job, JobCondition
|
||||
from ..mounts.mount import Mount
|
||||
from ..utils.common import FileConfiguration
|
||||
from ..utils.dt import utcnow
|
||||
from ..utils.sentinel import DEFAULT
|
||||
from ..utils.sentry import capture_exception
|
||||
from .backup import Backup
|
||||
from .const import BackupType
|
||||
@ -51,21 +54,41 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
|
||||
"""Set days until backup is considered stale."""
|
||||
self._data[ATTR_DAYS_UNTIL_STALE] = value
|
||||
|
||||
@property
|
||||
def backup_locations(self) -> list[Path]:
|
||||
"""List of locations containing backups."""
|
||||
return [self.sys_config.path_backup] + [
|
||||
mount.local_where
|
||||
for mount in self.sys_mounts.backup_mounts
|
||||
if mount.state == UnitActiveState.ACTIVE
|
||||
]
|
||||
|
||||
def get(self, slug):
|
||||
"""Return backup object."""
|
||||
return self._backups.get(slug)
|
||||
|
||||
def _get_base_path(self, location: Mount | type[DEFAULT] | None = DEFAULT) -> Path:
|
||||
"""Get base path for backup using location or default location."""
|
||||
if location:
|
||||
return location.local_where
|
||||
|
||||
if location == DEFAULT and self.sys_mounts.default_backup_mount:
|
||||
return self.sys_mounts.default_backup_mount.local_where
|
||||
|
||||
return self.sys_config.path_backup
|
||||
|
||||
def _create_backup(
|
||||
self,
|
||||
name: str,
|
||||
sys_type: BackupType,
|
||||
password: str | None,
|
||||
compressed: bool = True,
|
||||
location: Mount | type[DEFAULT] | None = DEFAULT,
|
||||
) -> Backup:
|
||||
"""Initialize a new backup object from name."""
|
||||
date_str = utcnow().isoformat()
|
||||
slug = create_slug(name, date_str)
|
||||
tar_file = Path(self.sys_config.path_backup, f"{slug}.tar")
|
||||
tar_file = Path(self._get_base_path(location), f"{slug}.tar")
|
||||
|
||||
# init object
|
||||
backup = Backup(self.coresys, tar_file)
|
||||
@ -95,7 +118,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
|
||||
|
||||
tasks = [
|
||||
_load_backup(tar_file)
|
||||
for tar_file in self.sys_config.path_backup.glob("*.tar")
|
||||
for path in self.backup_locations
|
||||
for tar_file in path.glob("*.tar")
|
||||
]
|
||||
|
||||
_LOGGER.info("Found %d backup files", len(tasks))
|
||||
@ -182,13 +206,21 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
|
||||
self.sys_core.state = CoreState.RUNNING
|
||||
|
||||
@Job(conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING])
|
||||
async def do_backup_full(self, name="", password=None, compressed=True):
|
||||
async def do_backup_full(
|
||||
self,
|
||||
name="",
|
||||
password=None,
|
||||
compressed=True,
|
||||
location: Mount | type[DEFAULT] | None = DEFAULT,
|
||||
):
|
||||
"""Create a full backup."""
|
||||
if self.lock.locked():
|
||||
_LOGGER.error("A backup/restore process is already running")
|
||||
return None
|
||||
|
||||
backup = self._create_backup(name, BackupType.FULL, password, compressed)
|
||||
backup = self._create_backup(
|
||||
name, BackupType.FULL, password, compressed, location
|
||||
)
|
||||
|
||||
_LOGGER.info("Creating new full backup with slug %s", backup.slug)
|
||||
async with self.lock:
|
||||
@ -208,6 +240,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
|
||||
password: str | None = None,
|
||||
homeassistant: bool = False,
|
||||
compressed: bool = True,
|
||||
location: Mount | type[DEFAULT] | None = DEFAULT,
|
||||
):
|
||||
"""Create a partial backup."""
|
||||
if self.lock.locked():
|
||||
@ -225,7 +258,9 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
|
||||
if len(addons) == 0 and len(folders) == 0 and not homeassistant:
|
||||
_LOGGER.error("Nothing to create backup for")
|
||||
|
||||
backup = self._create_backup(name, BackupType.PARTIAL, password, compressed)
|
||||
backup = self._create_backup(
|
||||
name, BackupType.PARTIAL, password, compressed, location
|
||||
)
|
||||
|
||||
_LOGGER.info("Creating new partial backup with slug %s", backup.slug)
|
||||
async with self.lock:
|
||||
|
@ -33,6 +33,7 @@ RUN_WATCHDOG_ADDON_APPLICATON = 120
|
||||
RUN_WATCHDOG_OBSERVER_APPLICATION = 180
|
||||
|
||||
RUN_REFRESH_ADDON = 15
|
||||
RUN_REFRESH_MOUNTS = 900
|
||||
|
||||
PLUGIN_AUTO_UPDATE_CONDITIONS = PLUGIN_UPDATE_CONDITIONS + [JobCondition.RUNNING]
|
||||
|
||||
@ -62,6 +63,7 @@ class Tasks(CoreSysAttributes):
|
||||
self.sys_scheduler.register_task(self.sys_backups.reload, RUN_RELOAD_BACKUPS)
|
||||
self.sys_scheduler.register_task(self.sys_host.reload, RUN_RELOAD_HOST)
|
||||
self.sys_scheduler.register_task(self.sys_ingress.reload, RUN_RELOAD_INGRESS)
|
||||
self.sys_scheduler.register_task(self.sys_mounts.reload, RUN_REFRESH_MOUNTS)
|
||||
|
||||
# Watchdog
|
||||
self.sys_scheduler.register_task(
|
||||
|
@ -5,6 +5,7 @@ from pathlib import PurePath
|
||||
|
||||
FILE_CONFIG_MOUNTS = PurePath("mounts.json")
|
||||
|
||||
ATTR_DEFAULT_BACKUP_MOUNT = "default_backup_mount"
|
||||
ATTR_MOUNTS = "mounts"
|
||||
ATTR_PATH = "path"
|
||||
ATTR_SERVER = "server"
|
||||
@ -25,11 +26,3 @@ class MountUsage(str, Enum):
|
||||
|
||||
BACKUP = "backup"
|
||||
MEDIA = "media"
|
||||
|
||||
|
||||
class MountState(str, Enum):
|
||||
"""Mount state."""
|
||||
|
||||
ACTIVE = "active"
|
||||
FAILED = "failed"
|
||||
UNKNOWN = "unknown"
|
||||
|
@ -1,6 +1,7 @@
|
||||
"""Supervisor mount manager."""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
from pathlib import PurePath
|
||||
@ -9,10 +10,15 @@ from ..const import ATTR_NAME
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..dbus.const import UnitActiveState
|
||||
from ..exceptions import MountActivationError, MountError, MountNotFound
|
||||
from ..resolution.const import ContextType, IssueType, SuggestionType
|
||||
from ..resolution.const import SuggestionType
|
||||
from ..utils.common import FileConfiguration
|
||||
from ..utils.sentry import capture_exception
|
||||
from .const import ATTR_MOUNTS, FILE_CONFIG_MOUNTS, MountUsage
|
||||
from .const import (
|
||||
ATTR_DEFAULT_BACKUP_MOUNT,
|
||||
ATTR_MOUNTS,
|
||||
FILE_CONFIG_MOUNTS,
|
||||
MountUsage,
|
||||
)
|
||||
from .mount import BindMount, Mount
|
||||
from .validate import SCHEMA_MOUNTS_CONFIG
|
||||
|
||||
@ -64,6 +70,21 @@ class MountManager(FileConfiguration, CoreSysAttributes):
|
||||
"""Return list of bound mounts and where else they have been bind mounted."""
|
||||
return list(self._bound_mounts.values())
|
||||
|
||||
@property
|
||||
def default_backup_mount(self) -> Mount | None:
|
||||
"""Get default backup mount if set."""
|
||||
if ATTR_DEFAULT_BACKUP_MOUNT not in self._data:
|
||||
return None
|
||||
return self.get(self._data[ATTR_DEFAULT_BACKUP_MOUNT])
|
||||
|
||||
@default_backup_mount.setter
|
||||
def default_backup_mount(self, value: Mount | None):
|
||||
"""Set or unset default backup mount."""
|
||||
if value:
|
||||
self._data[ATTR_DEFAULT_BACKUP_MOUNT] = value.name
|
||||
else:
|
||||
self._data.pop(ATTR_DEFAULT_BACKUP_MOUNT, None)
|
||||
|
||||
def get(self, name: str) -> Mount:
|
||||
"""Get mount by name."""
|
||||
if name not in self._mounts:
|
||||
@ -82,31 +103,49 @@ class MountManager(FileConfiguration, CoreSysAttributes):
|
||||
return
|
||||
|
||||
_LOGGER.info("Initializing all user-configured mounts")
|
||||
mounts = self.mounts
|
||||
errors = await asyncio.gather(
|
||||
*[mount.load() for mount in mounts], return_exceptions=True
|
||||
await self._mount_errors_to_issues(
|
||||
self.mounts.copy(), [mount.load() for mount in self.mounts]
|
||||
)
|
||||
|
||||
# Bind all media mounts to directories in media
|
||||
if self.media_mounts:
|
||||
await asyncio.wait([self._bind_media(mount) for mount in self.media_mounts])
|
||||
|
||||
async def reload(self) -> None:
|
||||
"""Update mounts info via dbus and reload failed mounts."""
|
||||
await asyncio.wait([mount.update() for mount in self.mounts])
|
||||
|
||||
# Try to reload any newly failed mounts and report issues if failure persists
|
||||
new_failures = [
|
||||
mount
|
||||
for mount in self.mounts
|
||||
if mount.state != UnitActiveState.ACTIVE
|
||||
and mount.failed_issue not in self.sys_resolution.issues
|
||||
]
|
||||
await self._mount_errors_to_issues(
|
||||
new_failures, [mount.reload() for mount in new_failures]
|
||||
)
|
||||
|
||||
async def _mount_errors_to_issues(
|
||||
self, mounts: list[Mount], mount_tasks: list[Awaitable[None]]
|
||||
) -> None:
|
||||
"""Await a list of tasks on mounts and turn each error into a failed mount issue."""
|
||||
errors = await asyncio.gather(*mount_tasks, return_exceptions=True)
|
||||
|
||||
for i in range(len(errors)): # pylint: disable=consider-using-enumerate
|
||||
if not errors[i]:
|
||||
continue
|
||||
if not isinstance(errors[i], MountError):
|
||||
capture_exception(errors[i])
|
||||
|
||||
self.sys_resolution.create_issue(
|
||||
IssueType.MOUNT_FAILED,
|
||||
ContextType.MOUNT,
|
||||
reference=mounts[i].name,
|
||||
self.sys_resolution.add_issue(
|
||||
mounts[i].failed_issue,
|
||||
suggestions=[
|
||||
SuggestionType.EXECUTE_RELOAD,
|
||||
SuggestionType.EXECUTE_REMOVE,
|
||||
],
|
||||
)
|
||||
|
||||
# Bind all media mounts to directories in media
|
||||
if self.media_mounts:
|
||||
await asyncio.wait([self._bind_media(mount) for mount in self.media_mounts])
|
||||
|
||||
async def create_mount(self, mount: Mount) -> None:
|
||||
"""Add/update a mount."""
|
||||
if mount.name in self._mounts:
|
||||
@ -136,10 +175,16 @@ class MountManager(FileConfiguration, CoreSysAttributes):
|
||||
await self._bound_mounts[name].bind_mount.unmount()
|
||||
del self._bound_mounts[name]
|
||||
|
||||
await self._mounts[name].unmount()
|
||||
mount = self._mounts[name]
|
||||
await mount.unmount()
|
||||
if not retain_entry:
|
||||
del self._mounts[name]
|
||||
|
||||
if self._data.get(ATTR_DEFAULT_BACKUP_MOUNT) == mount.name:
|
||||
self.default_backup_mount = None
|
||||
|
||||
return mount
|
||||
|
||||
async def reload_mount(self, name: str) -> None:
|
||||
"""Reload a mount to retry mounting with same config."""
|
||||
if name not in self._mounts:
|
||||
|
@ -27,6 +27,8 @@ from ..exceptions import (
|
||||
MountError,
|
||||
MountInvalidError,
|
||||
)
|
||||
from ..resolution.const import ContextType, IssueType
|
||||
from ..resolution.data import Issue
|
||||
from ..utils.sentry import capture_exception
|
||||
from .const import ATTR_PATH, ATTR_SERVER, ATTR_SHARE, ATTR_USAGE, MountType, MountUsage
|
||||
from .validate import MountData
|
||||
@ -132,6 +134,15 @@ class Mount(CoreSysAttributes, ABC):
|
||||
else None
|
||||
)
|
||||
|
||||
@property
|
||||
def failed_issue(self) -> Issue:
|
||||
"""Get issue used if this mount has failed."""
|
||||
return Issue(IssueType.MOUNT_FAILED, ContextType.MOUNT, reference=self.name)
|
||||
|
||||
def __eq__(self, other):
|
||||
"""Return true if mounts are the same."""
|
||||
return isinstance(other, Mount) and self.name == other.name
|
||||
|
||||
async def load(self) -> None:
|
||||
"""Initialize object."""
|
||||
await self._update_await_activating()
|
||||
@ -164,6 +175,13 @@ class Mount(CoreSysAttributes, ABC):
|
||||
f"Could not get active state of mount due to: {err!s}"
|
||||
) from err
|
||||
|
||||
# If active, dismiss corresponding failed mount issue if found
|
||||
if (
|
||||
self.state == UnitActiveState.ACTIVE
|
||||
and self.failed_issue in self.sys_resolution.issues
|
||||
):
|
||||
self.sys_resolution.dismiss_issue(self.failed_issue)
|
||||
|
||||
async def _update_await_activating(self):
|
||||
"""Update info about mount from dbus. If 'activating' wait up to 30 seconds."""
|
||||
await self.update()
|
||||
|
@ -178,11 +178,19 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
|
||||
suggestions: list[SuggestionType] | None = None,
|
||||
) -> None:
|
||||
"""Create issues and suggestion."""
|
||||
self.add_issue(Issue(issue, context, reference), suggestions)
|
||||
|
||||
def add_issue(
|
||||
self, issue: Issue, suggestions: list[SuggestionType] | None = None
|
||||
) -> None:
|
||||
"""Add an issue and suggestions."""
|
||||
if suggestions:
|
||||
for suggestion in suggestions:
|
||||
self.suggestions = Suggestion(suggestion, context, reference)
|
||||
self.suggestions = Suggestion(
|
||||
suggestion, issue.context, issue.reference
|
||||
)
|
||||
|
||||
self.issues = Issue(issue, context, reference)
|
||||
self.issues = issue
|
||||
|
||||
async def load(self):
|
||||
"""Load the resoulution manager."""
|
||||
|
22
supervisor/utils/sentinel.py
Normal file
22
supervisor/utils/sentinel.py
Normal file
@ -0,0 +1,22 @@
|
||||
"""Sentinel to use when None is a valid value."""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
|
||||
class SentinelMeta(type):
|
||||
"""Metaclass for sentinel to improve representation and make falsy.
|
||||
|
||||
Credit to https://stackoverflow.com/a/69243488 .
|
||||
"""
|
||||
|
||||
def __repr__(cls) -> str:
|
||||
"""Represent class more like an enum."""
|
||||
return f"<{cls.__name__}>"
|
||||
|
||||
def __bool__(cls) -> Literal[False]:
|
||||
"""Return false as a sentinel is akin to an empty value."""
|
||||
return False
|
||||
|
||||
|
||||
class DEFAULT(metaclass=SentinelMeta):
|
||||
"""Sentinel for default value when None is valid."""
|
@ -1,9 +1,15 @@
|
||||
"""Test backups API."""
|
||||
|
||||
from pathlib import Path, PurePath
|
||||
from unittest.mock import patch
|
||||
|
||||
from aiohttp.test_utils import TestClient
|
||||
import pytest
|
||||
|
||||
from supervisor.backups.backup import Backup
|
||||
from supervisor.const import CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.mounts.mount import Mount
|
||||
|
||||
|
||||
async def test_info(api_client, coresys: CoreSys, mock_full_backup: Backup):
|
||||
@ -43,3 +49,79 @@ async def test_options(api_client, coresys: CoreSys):
|
||||
save_data.assert_called_once()
|
||||
|
||||
assert coresys.backups.days_until_stale == 10
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"location,backup_dir",
|
||||
[("backup_test", PurePath("mounts", "backup_test")), (None, PurePath("backup"))],
|
||||
)
|
||||
async def test_backup_to_location(
|
||||
api_client: TestClient,
|
||||
coresys: CoreSys,
|
||||
location: str | None,
|
||||
backup_dir: PurePath,
|
||||
tmp_supervisor_data: Path,
|
||||
path_extern,
|
||||
):
|
||||
"""Test making a backup to a specific location with default mount."""
|
||||
await coresys.mounts.load()
|
||||
(coresys.config.path_mounts / "backup_test").mkdir()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
coresys.mounts.default_backup_mount = mount
|
||||
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
resp = await api_client.post(
|
||||
"/backups/new/full",
|
||||
json={
|
||||
"name": "Mount test",
|
||||
"location": location,
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
slug = result["data"]["slug"]
|
||||
|
||||
assert (tmp_supervisor_data / backup_dir / f"{slug}.tar").exists()
|
||||
|
||||
|
||||
async def test_backup_to_default(
|
||||
api_client: TestClient, coresys: CoreSys, tmp_supervisor_data, path_extern
|
||||
):
|
||||
"""Test making backup to default mount."""
|
||||
await coresys.mounts.load()
|
||||
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
coresys.mounts.default_backup_mount = mount
|
||||
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
resp = await api_client.post(
|
||||
"/backups/new/full",
|
||||
json={"name": "Mount test"},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
slug = result["data"]["slug"]
|
||||
|
||||
assert (mount_dir / f"{slug}.tar").exists()
|
||||
|
@ -1,9 +1,13 @@
|
||||
"""Test mounts API."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import patch
|
||||
|
||||
from aiohttp.test_utils import TestClient
|
||||
from dbus_fast import DBusError, ErrorType
|
||||
import pytest
|
||||
|
||||
from supervisor.backups.manager import BackupManager
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.mounts.mount import Mount
|
||||
|
||||
@ -26,6 +30,7 @@ async def fixture_mount(coresys: CoreSys, tmp_supervisor_data, path_extern) -> M
|
||||
},
|
||||
)
|
||||
coresys.mounts._mounts = {"backup_test": mount} # pylint: disable=protected-access
|
||||
coresys.mounts.default_backup_mount = mount
|
||||
await coresys.mounts.load()
|
||||
yield mount
|
||||
|
||||
@ -305,7 +310,10 @@ async def test_api_reload_error_mount_missing(api_client: TestClient):
|
||||
assert resp.status == 400
|
||||
result = await resp.json()
|
||||
assert result["result"] == "error"
|
||||
assert result["message"] == "No mount exists with name backup_test"
|
||||
assert (
|
||||
result["message"]
|
||||
== "Cannot reload 'backup_test', no mount exists with that name"
|
||||
)
|
||||
|
||||
|
||||
async def test_api_delete_mount(api_client: TestClient, coresys: CoreSys, mount):
|
||||
@ -328,4 +336,302 @@ async def test_api_delete_error_mount_missing(api_client: TestClient):
|
||||
assert resp.status == 400
|
||||
result = await resp.json()
|
||||
assert result["result"] == "error"
|
||||
assert result["message"] == "No mount exists with name backup_test"
|
||||
assert (
|
||||
result["message"]
|
||||
== "Cannot remove 'backup_test', no mount exists with that name"
|
||||
)
|
||||
|
||||
|
||||
async def test_api_create_backup_mount_sets_default(
|
||||
api_client: TestClient, coresys: CoreSys, tmp_supervisor_data, path_extern
|
||||
):
|
||||
"""Test creating backup mounts sets default if not set."""
|
||||
await coresys.mounts.load()
|
||||
assert coresys.mounts.default_backup_mount is None
|
||||
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
assert coresys.mounts.default_backup_mount.name == "backup_test"
|
||||
|
||||
# Confirm the default does not change if mount created after its been set
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "backup_test_2",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
assert coresys.mounts.default_backup_mount.name == "backup_test"
|
||||
|
||||
|
||||
async def test_update_backup_mount_changes_default(
|
||||
api_client: TestClient, coresys: CoreSys, mount
|
||||
):
|
||||
"""Test updating a backup mount may unset the default."""
|
||||
# Make another backup mount for testing
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "other_backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
# Changing this mount should have no effect on the default
|
||||
resp = await api_client.put(
|
||||
"/mounts/other_backup_test",
|
||||
json={
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "other-media.local",
|
||||
"share": "media",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
assert coresys.mounts.default_backup_mount.name == "backup_test"
|
||||
|
||||
# Changing this one to non-backup should unset the default
|
||||
resp = await api_client.put(
|
||||
"/mounts/backup_test",
|
||||
json={
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"share": "media",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
assert coresys.mounts.default_backup_mount is None
|
||||
|
||||
|
||||
async def test_delete_backup_mount_changes_default(
|
||||
api_client: TestClient, coresys: CoreSys, mount
|
||||
):
|
||||
"""Test deleting a backup mount may unset the default."""
|
||||
# Make another backup mount for testing
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "other_backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
# Deleting this one should have no effect on the default
|
||||
resp = await api_client.delete("/mounts/other_backup_test")
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
assert coresys.mounts.default_backup_mount.name == "backup_test"
|
||||
|
||||
# Deleting this current default should unset it
|
||||
resp = await api_client.delete("/mounts/backup_test")
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
assert coresys.mounts.default_backup_mount is None
|
||||
|
||||
|
||||
async def test_backup_mounts_reload_backups(
|
||||
api_client: TestClient,
|
||||
coresys: CoreSys,
|
||||
tmp_supervisor_data,
|
||||
path_extern,
|
||||
):
|
||||
"""Test actions on a backup mount reload backups."""
|
||||
await coresys.mounts.load()
|
||||
|
||||
with patch.object(BackupManager, "reload") as reload:
|
||||
# Only creating a backup mount triggers reload
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "media_test",
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"share": "media",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_not_called()
|
||||
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_called_once()
|
||||
|
||||
# Only updating a backup mount triggers reload
|
||||
reload.reset_mock()
|
||||
resp = await api_client.put(
|
||||
"/mounts/media_test",
|
||||
json={
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"share": "media2",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_not_called()
|
||||
|
||||
resp = await api_client.put(
|
||||
"/mounts/backup_test",
|
||||
json={
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups2",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_called_once()
|
||||
|
||||
# Only reloading a backup mount triggers reload
|
||||
reload.reset_mock()
|
||||
resp = await api_client.post("/mounts/media_test/reload")
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_not_called()
|
||||
|
||||
resp = await api_client.post("/mounts/backup_test/reload")
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_called_once()
|
||||
|
||||
# Only deleting a backup mount triggers reload
|
||||
reload.reset_mock()
|
||||
resp = await api_client.delete("/mounts/media_test")
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_not_called()
|
||||
|
||||
resp = await api_client.delete("/mounts/backup_test")
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
await asyncio.sleep(0)
|
||||
reload.assert_called_once()
|
||||
|
||||
|
||||
async def test_options(api_client: TestClient, coresys: CoreSys, mount):
|
||||
"""Test changing options."""
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "other_backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "media_test",
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"share": "media",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
coresys.mounts.save_data.reset_mock()
|
||||
|
||||
# Not a backup mount, will fail
|
||||
resp = await api_client.post(
|
||||
"/mounts/options",
|
||||
json={
|
||||
"default_backup_mount": "media_test",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "error"
|
||||
|
||||
# Mount doesn't exist, will fail
|
||||
resp = await api_client.post(
|
||||
"/mounts/options",
|
||||
json={
|
||||
"default_backup_mount": "junk",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "error"
|
||||
|
||||
assert coresys.mounts.default_backup_mount.name == "backup_test"
|
||||
coresys.mounts.save_data.assert_not_called()
|
||||
|
||||
# Changes to new backup mount
|
||||
resp = await api_client.post(
|
||||
"/mounts/options",
|
||||
json={
|
||||
"default_backup_mount": "other_backup_test",
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
assert coresys.mounts.default_backup_mount.name == "other_backup_test"
|
||||
coresys.mounts.save_data.assert_called_once()
|
||||
|
||||
# Unsets default backup mount
|
||||
resp = await api_client.post(
|
||||
"/mounts/options",
|
||||
json={
|
||||
"default_backup_mount": None,
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
assert coresys.mounts.default_backup_mount is None
|
||||
assert coresys.mounts.save_data.call_count == 2
|
||||
|
@ -3,6 +3,7 @@
|
||||
from shutil import rmtree
|
||||
from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, patch
|
||||
|
||||
from awesomeversion import AwesomeVersion
|
||||
from dbus_fast import DBusError
|
||||
|
||||
from supervisor.addons.addon import Addon
|
||||
@ -13,6 +14,7 @@ from supervisor.const import FOLDER_HOMEASSISTANT, FOLDER_SHARE, CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import AddonsError, DockerError
|
||||
from supervisor.homeassistant.core import HomeAssistantCore
|
||||
from supervisor.homeassistant.module import HomeAssistant
|
||||
from supervisor.mounts.mount import Mount
|
||||
|
||||
from tests.const import TEST_ADDON_SLUG
|
||||
@ -423,3 +425,167 @@ async def test_backup_media_with_mounts(
|
||||
assert test_dir.is_dir()
|
||||
assert test_file_2.exists()
|
||||
assert not mount_dir.exists()
|
||||
|
||||
|
||||
async def test_full_backup_to_mount(coresys: CoreSys, tmp_supervisor_data, path_extern):
|
||||
"""Test full backup to and restoring from a mount."""
|
||||
(marker := coresys.config.path_homeassistant / "test.txt").touch()
|
||||
|
||||
# Add a backup mount
|
||||
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
assert mount_dir in coresys.backups.backup_locations
|
||||
|
||||
# Make a backup and add it to mounts. Confirm it exists in the right place
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
backup: Backup = await coresys.backups.do_backup_full("test", location=mount)
|
||||
assert (mount_dir / f"{backup.slug}.tar").exists()
|
||||
|
||||
# Reload and check that backups in mounts are listed
|
||||
await coresys.backups.reload()
|
||||
assert coresys.backups.get(backup.slug)
|
||||
|
||||
# Remove marker file and restore. Confirm it comes back
|
||||
marker.unlink()
|
||||
|
||||
async def mock_async_true(*args, **kwargs):
|
||||
return True
|
||||
|
||||
with patch.object(HomeAssistantCore, "is_running", new=mock_async_true):
|
||||
await coresys.backups.do_restore_full(backup)
|
||||
|
||||
assert marker.exists()
|
||||
|
||||
|
||||
async def test_partial_backup_to_mount(
|
||||
coresys: CoreSys, tmp_supervisor_data, path_extern
|
||||
):
|
||||
"""Test partial backup to and restoring from a mount."""
|
||||
(marker := coresys.config.path_homeassistant / "test.txt").touch()
|
||||
|
||||
# Add a backup mount
|
||||
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
assert mount_dir in coresys.backups.backup_locations
|
||||
|
||||
# Make a backup and add it to mounts. Confirm it exists in the right place
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
|
||||
with patch.object(
|
||||
HomeAssistant,
|
||||
"version",
|
||||
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
|
||||
):
|
||||
backup: Backup = await coresys.backups.do_backup_partial(
|
||||
"test", homeassistant=True, location=mount
|
||||
)
|
||||
|
||||
assert (mount_dir / f"{backup.slug}.tar").exists()
|
||||
|
||||
# Reload and check that backups in mounts are listed
|
||||
await coresys.backups.reload()
|
||||
assert coresys.backups.get(backup.slug)
|
||||
|
||||
# Remove marker file and restore. Confirm it comes back
|
||||
marker.unlink()
|
||||
|
||||
async def mock_async_true(*args, **kwargs):
|
||||
return True
|
||||
|
||||
with patch.object(HomeAssistantCore, "is_running", new=mock_async_true):
|
||||
await coresys.backups.do_restore_partial(backup, homeassistant=True)
|
||||
|
||||
assert marker.exists()
|
||||
|
||||
|
||||
async def test_backup_to_local_with_default(
|
||||
coresys: CoreSys, tmp_supervisor_data, path_extern
|
||||
):
|
||||
"""Test making backup to local when a default mount is specified."""
|
||||
# Add a default backup mount
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
coresys.mounts.default_backup_mount = mount
|
||||
|
||||
# Make a backup for local. Confirm it exists in the right place
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
|
||||
with patch.object(
|
||||
HomeAssistant,
|
||||
"version",
|
||||
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
|
||||
):
|
||||
backup: Backup = await coresys.backups.do_backup_partial(
|
||||
"test", homeassistant=True, location=None
|
||||
)
|
||||
|
||||
assert (coresys.config.path_backup / f"{backup.slug}.tar").exists()
|
||||
|
||||
|
||||
async def test_backup_to_default(coresys: CoreSys, tmp_supervisor_data, path_extern):
|
||||
"""Test making backup to default mount."""
|
||||
# Add a default backup mount
|
||||
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
coresys.mounts.default_backup_mount = mount
|
||||
|
||||
# Make a backup for default. Confirm it exists in the right place
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
|
||||
with patch.object(
|
||||
HomeAssistant,
|
||||
"version",
|
||||
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
|
||||
):
|
||||
backup: Backup = await coresys.backups.do_backup_partial(
|
||||
"test", homeassistant=True
|
||||
)
|
||||
|
||||
assert (mount_dir / f"{backup.slug}.tar").exists()
|
||||
|
@ -372,6 +372,7 @@ async def tmp_supervisor_data(coresys: CoreSys, tmp_path: Path) -> Path:
|
||||
coresys.config.path_mounts.mkdir()
|
||||
coresys.config.path_backup.mkdir()
|
||||
coresys.config.path_tmp.mkdir()
|
||||
coresys.config.path_homeassistant.mkdir()
|
||||
yield tmp_path
|
||||
|
||||
|
||||
|
@ -345,7 +345,7 @@ async def test_remove_mount(
|
||||
systemd_service.StopUnit.calls.clear()
|
||||
|
||||
# Remove the mount
|
||||
await coresys.mounts.remove_mount(mount.name)
|
||||
assert mount == await coresys.mounts.remove_mount(mount.name)
|
||||
|
||||
assert mount.state is None
|
||||
assert mount not in coresys.mounts
|
||||
@ -472,3 +472,37 @@ async def test_create_mount_activation_failure(
|
||||
assert len(systemd_service.StartTransientUnit.calls) == 1
|
||||
assert len(systemd_service.ResetFailedUnit.calls) == 1
|
||||
assert not systemd_service.StopUnit.calls
|
||||
|
||||
|
||||
async def test_reload_mounts(
|
||||
coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock], mount: Mount
|
||||
):
|
||||
"""Test reloading mounts."""
|
||||
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
|
||||
systemd_service: SystemdService = all_dbus_services["systemd"]
|
||||
systemd_service.ReloadOrRestartUnit.calls.clear()
|
||||
|
||||
await coresys.mounts.load()
|
||||
|
||||
assert mount.state == UnitActiveState.ACTIVE
|
||||
assert mount.failed_issue not in coresys.resolution.issues
|
||||
|
||||
systemd_unit_service.active_state = "failed"
|
||||
await coresys.mounts.reload()
|
||||
|
||||
assert mount.state == UnitActiveState.FAILED
|
||||
assert mount.failed_issue in coresys.resolution.issues
|
||||
assert len(coresys.resolution.suggestions_for_issue(mount.failed_issue)) == 2
|
||||
assert len(systemd_service.ReloadOrRestartUnit.calls) == 1
|
||||
|
||||
# This shouldn't reload the mount again since this isn't a new failure
|
||||
await coresys.mounts.reload()
|
||||
assert len(systemd_service.ReloadOrRestartUnit.calls) == 1
|
||||
|
||||
# This should now remove the issue from the list
|
||||
systemd_unit_service.active_state = "active"
|
||||
await coresys.mounts.reload()
|
||||
|
||||
assert mount.state == UnitActiveState.ACTIVE
|
||||
assert mount.failed_issue not in coresys.resolution.issues
|
||||
assert not coresys.resolution.suggestions_for_issue(mount.failed_issue)
|
||||
|
@ -12,6 +12,7 @@ from supervisor.dbus.const import UnitActiveState
|
||||
from supervisor.exceptions import MountError, MountInvalidError
|
||||
from supervisor.mounts.const import MountType, MountUsage
|
||||
from supervisor.mounts.mount import CIFSMount, Mount, NFSMount
|
||||
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
|
||||
|
||||
from tests.dbus_service_mocks.base import DBusServiceMock
|
||||
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
|
||||
@ -457,3 +458,35 @@ async def test_mount_local_where_invalid(
|
||||
await mount.mount()
|
||||
|
||||
assert systemd_service.StartTransientUnit.calls == []
|
||||
|
||||
|
||||
async def test_update_clears_issue(coresys: CoreSys, path_extern):
|
||||
"""Test updating mount data clears corresponding failed mount issue if active."""
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "test",
|
||||
"usage": "media",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "share",
|
||||
},
|
||||
)
|
||||
|
||||
assert mount.failed_issue not in coresys.resolution.issues
|
||||
|
||||
coresys.resolution.create_issue(
|
||||
IssueType.MOUNT_FAILED,
|
||||
ContextType.MOUNT,
|
||||
reference="test",
|
||||
suggestions=[SuggestionType.EXECUTE_RELOAD, SuggestionType.EXECUTE_REMOVE],
|
||||
)
|
||||
|
||||
assert mount.failed_issue in coresys.resolution.issues
|
||||
assert len(coresys.resolution.suggestions_for_issue(mount.failed_issue)) == 2
|
||||
|
||||
await mount.update()
|
||||
|
||||
assert mount.state == UnitActiveState.ACTIVE
|
||||
assert mount.failed_issue not in coresys.resolution.issues
|
||||
assert not coresys.resolution.suggestions_for_issue(mount.failed_issue)
|
||||
|
Loading…
x
Reference in New Issue
Block a user