mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-02-26 20:27:31 +00:00
Compare commits
3 Commits
2026.02.3
...
python-3.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0e5bd48b73 | ||
|
|
d57b5e0166 | ||
|
|
662a7ae6e6 |
4
.github/copilot-instructions.md
vendored
4
.github/copilot-instructions.md
vendored
@@ -91,8 +91,8 @@ availability.
|
||||
|
||||
### Python Requirements
|
||||
|
||||
- **Compatibility**: Python 3.13+
|
||||
- **Language Features**: Use modern Python features:
|
||||
- **Compatibility**: Python 3.14+
|
||||
- **Language Features**: Use modern Python features:
|
||||
- Type hints with `typing` module
|
||||
- f-strings (preferred over `%` or `.format()`)
|
||||
- Dataclasses and enum classes
|
||||
|
||||
8
.github/workflows/builder.yml
vendored
8
.github/workflows/builder.yml
vendored
@@ -33,7 +33,7 @@ on:
|
||||
- setup.py
|
||||
|
||||
env:
|
||||
DEFAULT_PYTHON: "3.13"
|
||||
DEFAULT_PYTHON: "3.14.3"
|
||||
COSIGN_VERSION: "v2.5.3"
|
||||
CRANE_VERSION: "v0.20.7"
|
||||
CRANE_SHA256: "8ef3564d264e6b5ca93f7b7f5652704c4dd29d33935aff6947dd5adefd05953e"
|
||||
@@ -106,7 +106,7 @@ jobs:
|
||||
- runs-on: ubuntu-24.04-arm
|
||||
arch: aarch64
|
||||
env:
|
||||
WHEELS_ABI: cp313
|
||||
WHEELS_ABI: cp314
|
||||
WHEELS_TAG: musllinux_1_2
|
||||
WHEELS_APK_DEPS: "libffi-dev;openssl-dev;yaml-dev"
|
||||
WHEELS_SKIP_BINARY: aiohttp
|
||||
@@ -205,7 +205,7 @@ jobs:
|
||||
|
||||
# home-assistant/builder doesn't support sha pinning
|
||||
- name: Build supervisor
|
||||
uses: home-assistant/builder@2026.02.1
|
||||
uses: home-assistant/builder@2025.11.0
|
||||
with:
|
||||
image: ${{ matrix.arch }}
|
||||
args: |
|
||||
@@ -259,7 +259,7 @@ jobs:
|
||||
# home-assistant/builder doesn't support sha pinning
|
||||
- name: Build the Supervisor
|
||||
if: needs.init.outputs.publish != 'true'
|
||||
uses: home-assistant/builder@2026.02.1
|
||||
uses: home-assistant/builder@2025.11.0
|
||||
with:
|
||||
args: |
|
||||
--test \
|
||||
|
||||
2
.github/workflows/ci.yaml
vendored
2
.github/workflows/ci.yaml
vendored
@@ -8,7 +8,7 @@ on:
|
||||
pull_request: ~
|
||||
|
||||
env:
|
||||
DEFAULT_PYTHON: "3.13"
|
||||
DEFAULT_PYTHON: "3.14.3"
|
||||
PRE_COMMIT_CACHE: ~/.cache/pre-commit
|
||||
MYPY_CACHE_VERSION: 1
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
image: ghcr.io/home-assistant/{arch}-hassio-supervisor
|
||||
build_from:
|
||||
aarch64: ghcr.io/home-assistant/aarch64-base-python:3.13-alpine3.22-2025.12.2
|
||||
amd64: ghcr.io/home-assistant/amd64-base-python:3.13-alpine3.22-2025.12.2
|
||||
aarch64: ghcr.io/home-assistant/aarch64-base-python:3.14-alpine3.22-2026.02.0
|
||||
amd64: ghcr.io/home-assistant/amd64-base-python:3.14-alpine3.22-2026.02.0
|
||||
cosign:
|
||||
base_identity: https://github.com/home-assistant/docker-base/.*
|
||||
identity: https://github.com/home-assistant/supervisor/.*
|
||||
|
||||
@@ -4,7 +4,6 @@ aiohttp==3.13.3
|
||||
atomicwrites-homeassistant==1.4.1
|
||||
attrs==25.4.0
|
||||
awesomeversion==25.8.0
|
||||
backports.zstd==1.3.0
|
||||
blockbuster==1.5.26
|
||||
brotli==1.2.0
|
||||
ciso8601==2.3.3
|
||||
@@ -23,7 +22,7 @@ pulsectl==24.12.0
|
||||
pyudev==0.24.4
|
||||
PyYAML==6.0.3
|
||||
requests==2.32.5
|
||||
securetar==2026.2.0
|
||||
securetar==2025.12.0
|
||||
sentry-sdk==2.53.0
|
||||
setuptools==82.0.0
|
||||
voluptuous==0.16.0
|
||||
|
||||
@@ -191,18 +191,18 @@ class Addon(AddonModel):
|
||||
self._startup_event.set()
|
||||
|
||||
# Dismiss boot failed issue if present and we started
|
||||
if new_state == AddonState.STARTED and (
|
||||
issue := self.sys_resolution.get_issue_if_present(self.boot_failed_issue)
|
||||
if (
|
||||
new_state == AddonState.STARTED
|
||||
and self.boot_failed_issue in self.sys_resolution.issues
|
||||
):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
self.sys_resolution.dismiss_issue(self.boot_failed_issue)
|
||||
|
||||
# Dismiss device access missing issue if present and we stopped
|
||||
if new_state == AddonState.STOPPED and (
|
||||
issue := self.sys_resolution.get_issue_if_present(
|
||||
self.device_access_missing_issue
|
||||
)
|
||||
if (
|
||||
new_state == AddonState.STOPPED
|
||||
and self.device_access_missing_issue in self.sys_resolution.issues
|
||||
):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
self.sys_resolution.dismiss_issue(self.device_access_missing_issue)
|
||||
|
||||
self.sys_homeassistant.websocket.supervisor_event_custom(
|
||||
WSEvent.ADDON,
|
||||
@@ -363,10 +363,11 @@ class Addon(AddonModel):
|
||||
self.persist[ATTR_BOOT] = value
|
||||
|
||||
# Dismiss boot failed issue if present and boot at start disabled
|
||||
if value == AddonBoot.MANUAL and (
|
||||
issue := self.sys_resolution.get_issue_if_present(self._boot_failed_issue)
|
||||
if (
|
||||
value == AddonBoot.MANUAL
|
||||
and self._boot_failed_issue in self.sys_resolution.issues
|
||||
):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
self.sys_resolution.dismiss_issue(self._boot_failed_issue)
|
||||
|
||||
@property
|
||||
def auto_update(self) -> bool:
|
||||
|
||||
@@ -19,6 +19,7 @@ from ..const import (
|
||||
ATTR_UNSUPPORTED,
|
||||
)
|
||||
from ..coresys import CoreSysAttributes
|
||||
from ..exceptions import APINotFound, ResolutionNotFound
|
||||
from ..resolution.checks.base import CheckBase
|
||||
from ..resolution.data import Issue, Suggestion
|
||||
from .utils import api_process, api_validate
|
||||
@@ -31,17 +32,24 @@ class APIResoulution(CoreSysAttributes):
|
||||
|
||||
def _extract_issue(self, request: web.Request) -> Issue:
|
||||
"""Extract issue from request or raise."""
|
||||
return self.sys_resolution.get_issue_by_id(request.match_info["issue"])
|
||||
try:
|
||||
return self.sys_resolution.get_issue(request.match_info["issue"])
|
||||
except ResolutionNotFound:
|
||||
raise APINotFound("The supplied UUID is not a valid issue") from None
|
||||
|
||||
def _extract_suggestion(self, request: web.Request) -> Suggestion:
|
||||
"""Extract suggestion from request or raise."""
|
||||
return self.sys_resolution.get_suggestion_by_id(
|
||||
request.match_info["suggestion"]
|
||||
)
|
||||
try:
|
||||
return self.sys_resolution.get_suggestion(request.match_info["suggestion"])
|
||||
except ResolutionNotFound:
|
||||
raise APINotFound("The supplied UUID is not a valid suggestion") from None
|
||||
|
||||
def _extract_check(self, request: web.Request) -> CheckBase:
|
||||
"""Extract check from request or raise."""
|
||||
return self.sys_resolution.check.get(request.match_info["check"])
|
||||
try:
|
||||
return self.sys_resolution.check.get(request.match_info["check"])
|
||||
except ResolutionNotFound:
|
||||
raise APINotFound("The supplied check slug is not available") from None
|
||||
|
||||
def _generate_suggestion_information(self, suggestion: Suggestion):
|
||||
"""Generate suggestion information for response."""
|
||||
|
||||
@@ -12,19 +12,13 @@ import json
|
||||
import logging
|
||||
from pathlib import Path, PurePath
|
||||
import tarfile
|
||||
from tarfile import TarFile
|
||||
from tempfile import TemporaryDirectory
|
||||
import time
|
||||
from typing import Any, Self, cast
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from securetar import (
|
||||
AddFileError,
|
||||
InvalidPasswordError,
|
||||
SecureTarArchive,
|
||||
SecureTarFile,
|
||||
SecureTarReadError,
|
||||
atomic_contents_add,
|
||||
)
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -65,7 +59,7 @@ from ..utils import remove_folder
|
||||
from ..utils.dt import parse_datetime, utcnow
|
||||
from ..utils.json import json_bytes
|
||||
from ..utils.sentinel import DEFAULT
|
||||
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, SECURETAR_CREATE_VERSION, BackupType
|
||||
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, BackupType
|
||||
from .validate import SCHEMA_BACKUP
|
||||
|
||||
IGNORED_COMPARISON_FIELDS = {ATTR_PROTECTED, ATTR_CRYPTO, ATTR_DOCKER}
|
||||
@@ -105,7 +99,7 @@ class Backup(JobGroup):
|
||||
)
|
||||
self._data: dict[str, Any] = data or {ATTR_SLUG: slug}
|
||||
self._tmp: TemporaryDirectory | None = None
|
||||
self._outer_secure_tarfile: SecureTarArchive | None = None
|
||||
self._outer_secure_tarfile: SecureTarFile | None = None
|
||||
self._password: str | None = None
|
||||
self._locations: dict[str | None, BackupLocation] = {
|
||||
location: BackupLocation(
|
||||
@@ -204,6 +198,16 @@ class Backup(JobGroup):
|
||||
"""Get extra metadata added by client."""
|
||||
return self._data[ATTR_EXTRA]
|
||||
|
||||
@property
|
||||
def docker(self) -> dict[str, Any]:
|
||||
"""Return backup Docker config data."""
|
||||
return self._data.get(ATTR_DOCKER, {})
|
||||
|
||||
@docker.setter
|
||||
def docker(self, value: dict[str, Any]) -> None:
|
||||
"""Set the Docker config data."""
|
||||
self._data[ATTR_DOCKER] = value
|
||||
|
||||
@property
|
||||
def location(self) -> str | None:
|
||||
"""Return the location of the backup."""
|
||||
@@ -360,17 +364,15 @@ class Backup(JobGroup):
|
||||
test_tar_file = backup.extractfile(test_tar_name)
|
||||
try:
|
||||
with SecureTarFile(
|
||||
ending, # Not used
|
||||
gzip=self.compressed,
|
||||
mode="r",
|
||||
fileobj=test_tar_file,
|
||||
password=self._password,
|
||||
):
|
||||
# If we can read the tar file, the password is correct
|
||||
return
|
||||
except (
|
||||
tarfile.ReadError,
|
||||
SecureTarReadError,
|
||||
InvalidPasswordError,
|
||||
) as ex:
|
||||
except tarfile.ReadError as ex:
|
||||
raise BackupInvalidError(
|
||||
f"Invalid password for backup {self.slug}", _LOGGER.error
|
||||
) from ex
|
||||
@@ -439,7 +441,7 @@ class Backup(JobGroup):
|
||||
async def create(self) -> AsyncGenerator[None]:
|
||||
"""Create new backup file."""
|
||||
|
||||
def _open_outer_tarfile() -> SecureTarArchive:
|
||||
def _open_outer_tarfile() -> tuple[SecureTarFile, tarfile.TarFile]:
|
||||
"""Create and open outer tarfile."""
|
||||
if self.tarfile.is_file():
|
||||
raise BackupFileExistError(
|
||||
@@ -447,15 +449,14 @@ class Backup(JobGroup):
|
||||
_LOGGER.error,
|
||||
)
|
||||
|
||||
_outer_secure_tarfile = SecureTarArchive(
|
||||
_outer_secure_tarfile = SecureTarFile(
|
||||
self.tarfile,
|
||||
"w",
|
||||
gzip=False,
|
||||
bufsize=BUF_SIZE,
|
||||
create_version=SECURETAR_CREATE_VERSION,
|
||||
password=self._password,
|
||||
)
|
||||
try:
|
||||
_outer_secure_tarfile.open()
|
||||
_outer_tarfile = _outer_secure_tarfile.open()
|
||||
except PermissionError as ex:
|
||||
raise BackupPermissionError(
|
||||
f"Cannot open backup file {self.tarfile.as_posix()}, permission error!",
|
||||
@@ -467,9 +468,11 @@ class Backup(JobGroup):
|
||||
_LOGGER.error,
|
||||
) from ex
|
||||
|
||||
return _outer_secure_tarfile
|
||||
return _outer_secure_tarfile, _outer_tarfile
|
||||
|
||||
outer_secure_tarfile = await self.sys_run_in_executor(_open_outer_tarfile)
|
||||
outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
|
||||
_open_outer_tarfile
|
||||
)
|
||||
self._outer_secure_tarfile = outer_secure_tarfile
|
||||
|
||||
def _close_outer_tarfile() -> int:
|
||||
@@ -480,7 +483,7 @@ class Backup(JobGroup):
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
await self._create_finalize(outer_secure_tarfile)
|
||||
await self._create_cleanup(outer_tarfile)
|
||||
size_bytes = await self.sys_run_in_executor(_close_outer_tarfile)
|
||||
self._locations[self.location].size_bytes = size_bytes
|
||||
self._outer_secure_tarfile = None
|
||||
@@ -540,11 +543,11 @@ class Backup(JobGroup):
|
||||
if self._tmp:
|
||||
await self.sys_run_in_executor(self._tmp.cleanup)
|
||||
|
||||
async def _create_finalize(self, outer_archive: SecureTarArchive) -> None:
|
||||
"""Finalize backup creation.
|
||||
async def _create_cleanup(self, outer_tarfile: TarFile) -> None:
|
||||
"""Cleanup after backup creation.
|
||||
|
||||
Separate method to be called from create to ensure that the backup is
|
||||
finalized.
|
||||
Separate method to be called from create to ensure
|
||||
that cleanup is always performed, even if an exception is raised.
|
||||
"""
|
||||
# validate data
|
||||
try:
|
||||
@@ -563,7 +566,7 @@ class Backup(JobGroup):
|
||||
tar_info = tarfile.TarInfo(name="./backup.json")
|
||||
tar_info.size = len(raw_bytes)
|
||||
tar_info.mtime = int(time.time())
|
||||
outer_archive.tar.addfile(tar_info, fileobj=fileobj)
|
||||
outer_tarfile.addfile(tar_info, fileobj=fileobj)
|
||||
|
||||
try:
|
||||
await self.sys_run_in_executor(_add_backup_json)
|
||||
@@ -590,9 +593,10 @@ class Backup(JobGroup):
|
||||
|
||||
tar_name = f"{slug}.tar{'.gz' if self.compressed else ''}"
|
||||
|
||||
addon_file = self._outer_secure_tarfile.create_tar(
|
||||
addon_file = self._outer_secure_tarfile.create_inner_tar(
|
||||
f"./{tar_name}",
|
||||
gzip=self.compressed,
|
||||
password=self._password,
|
||||
)
|
||||
# Take backup
|
||||
try:
|
||||
@@ -642,6 +646,7 @@ class Backup(JobGroup):
|
||||
tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}"
|
||||
addon_file = SecureTarFile(
|
||||
Path(self._tmp.name, tar_name),
|
||||
"r",
|
||||
gzip=self.compressed,
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
@@ -737,9 +742,10 @@ class Backup(JobGroup):
|
||||
|
||||
return False
|
||||
|
||||
with outer_secure_tarfile.create_tar(
|
||||
with outer_secure_tarfile.create_inner_tar(
|
||||
f"./{tar_name}",
|
||||
gzip=self.compressed,
|
||||
password=self._password,
|
||||
) as tar_file:
|
||||
atomic_contents_add(
|
||||
tar_file,
|
||||
@@ -799,6 +805,7 @@ class Backup(JobGroup):
|
||||
_LOGGER.info("Restore folder %s", name)
|
||||
with SecureTarFile(
|
||||
tar_name,
|
||||
"r",
|
||||
gzip=self.compressed,
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
@@ -866,9 +873,10 @@ class Backup(JobGroup):
|
||||
|
||||
tar_name = f"homeassistant.tar{'.gz' if self.compressed else ''}"
|
||||
# Backup Home Assistant Core config directory
|
||||
homeassistant_file = self._outer_secure_tarfile.create_tar(
|
||||
homeassistant_file = self._outer_secure_tarfile.create_inner_tar(
|
||||
f"./{tar_name}",
|
||||
gzip=self.compressed,
|
||||
password=self._password,
|
||||
)
|
||||
|
||||
await self.sys_homeassistant.backup(homeassistant_file, exclude_database)
|
||||
@@ -892,6 +900,7 @@ class Backup(JobGroup):
|
||||
)
|
||||
homeassistant_file = SecureTarFile(
|
||||
tar_name,
|
||||
"r",
|
||||
gzip=self.compressed,
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
|
||||
@@ -6,7 +6,6 @@ from typing import Literal
|
||||
from ..mounts.mount import Mount
|
||||
|
||||
BUF_SIZE = 2**20 * 4 # 4MB
|
||||
SECURETAR_CREATE_VERSION = 2
|
||||
DEFAULT_FREEZE_TIMEOUT = 600
|
||||
LOCATION_CLOUD_BACKUP = ".cloud_backup"
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ from ..const import (
|
||||
ATTR_CRYPTO,
|
||||
ATTR_DATE,
|
||||
ATTR_DAYS_UNTIL_STALE,
|
||||
ATTR_DOCKER,
|
||||
ATTR_EXCLUDE_DATABASE,
|
||||
ATTR_EXTRA,
|
||||
ATTR_FOLDERS,
|
||||
@@ -34,7 +35,7 @@ from ..const import (
|
||||
FOLDER_SSL,
|
||||
)
|
||||
from ..store.validate import repositories
|
||||
from ..validate import version_tag
|
||||
from ..validate import SCHEMA_DOCKER_CONFIG, version_tag
|
||||
|
||||
ALL_FOLDERS = [
|
||||
FOLDER_SHARE,
|
||||
@@ -113,6 +114,7 @@ SCHEMA_BACKUP = vol.Schema(
|
||||
)
|
||||
),
|
||||
),
|
||||
vol.Optional(ATTR_DOCKER, default=dict): SCHEMA_DOCKER_CONFIG,
|
||||
vol.Optional(ATTR_FOLDERS, default=list): vol.All(
|
||||
v1_folderlist, [vol.In(ALL_FOLDERS)], vol.Unique()
|
||||
),
|
||||
|
||||
@@ -874,12 +874,11 @@ class DockerAddon(DockerInterface):
|
||||
await super().stop(remove_container)
|
||||
|
||||
# If there is a device access issue and the container is removed, clear it
|
||||
if remove_container and (
|
||||
issue := self.sys_resolution.get_issue_if_present(
|
||||
self.addon.device_access_missing_issue
|
||||
)
|
||||
if (
|
||||
remove_container
|
||||
and self.addon.device_access_missing_issue in self.sys_resolution.issues
|
||||
):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
|
||||
|
||||
@Job(
|
||||
name="docker_addon_hardware_events",
|
||||
|
||||
@@ -46,7 +46,7 @@ class HassioNotSupportedError(HassioError):
|
||||
# API
|
||||
|
||||
|
||||
class APIError(HassioError):
|
||||
class APIError(HassioError, RuntimeError):
|
||||
"""API errors."""
|
||||
|
||||
status = 400
|
||||
@@ -964,44 +964,6 @@ class ResolutionFixupJobError(ResolutionFixupError, JobException):
|
||||
"""Raise on job error."""
|
||||
|
||||
|
||||
class ResolutionCheckNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
|
||||
"""Raise if check does not exist."""
|
||||
|
||||
error_key = "resolution_check_not_found_error"
|
||||
message_template = "Check '{check}' does not exist"
|
||||
|
||||
def __init__(
|
||||
self, logger: Callable[..., None] | None = None, *, check: str
|
||||
) -> None:
|
||||
"""Initialize exception."""
|
||||
self.extra_fields = {"check": check}
|
||||
super().__init__(None, logger)
|
||||
|
||||
|
||||
class ResolutionIssueNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
|
||||
"""Raise if issue does not exist."""
|
||||
|
||||
error_key = "resolution_issue_not_found_error"
|
||||
message_template = "Issue {uuid} does not exist"
|
||||
|
||||
def __init__(self, logger: Callable[..., None] | None = None, *, uuid: str) -> None:
|
||||
"""Initialize exception."""
|
||||
self.extra_fields = {"uuid": uuid}
|
||||
super().__init__(None, logger)
|
||||
|
||||
|
||||
class ResolutionSuggestionNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
|
||||
"""Raise if suggestion does not exist."""
|
||||
|
||||
error_key = "resolution_suggestion_not_found_error"
|
||||
message_template = "Suggestion {uuid} does not exist"
|
||||
|
||||
def __init__(self, logger: Callable[..., None] | None = None, *, uuid: str) -> None:
|
||||
"""Initialize exception."""
|
||||
self.extra_fields = {"uuid": uuid}
|
||||
super().__init__(None, logger)
|
||||
|
||||
|
||||
# Store
|
||||
|
||||
|
||||
|
||||
@@ -215,10 +215,10 @@ class Mount(CoreSysAttributes, ABC):
|
||||
await self._update_state(unit)
|
||||
|
||||
# If active, dismiss corresponding failed mount issue if found
|
||||
if (mounted := await self.is_mounted()) and (
|
||||
issue := self.sys_resolution.get_issue_if_present(self.failed_issue)
|
||||
):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
if (
|
||||
mounted := await self.is_mounted()
|
||||
) and self.failed_issue in self.sys_resolution.issues:
|
||||
self.sys_resolution.dismiss_issue(self.failed_issue)
|
||||
|
||||
return mounted
|
||||
|
||||
@@ -361,8 +361,8 @@ class Mount(CoreSysAttributes, ABC):
|
||||
await self._restart()
|
||||
|
||||
# If it is mounted now, dismiss corresponding issue if present
|
||||
if issue := self.sys_resolution.get_issue_if_present(self.failed_issue):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
if self.failed_issue in self.sys_resolution.issues:
|
||||
self.sys_resolution.dismiss_issue(self.failed_issue)
|
||||
|
||||
async def _restart(self) -> None:
|
||||
"""Restart mount unit to re-mount."""
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import Any
|
||||
|
||||
from ..const import ATTR_CHECKS
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import ResolutionCheckNotFound
|
||||
from ..exceptions import ResolutionNotFound
|
||||
from ..utils.sentry import async_capture_exception
|
||||
from .checks.base import CheckBase
|
||||
from .validate import get_valid_modules
|
||||
@@ -50,7 +50,7 @@ class ResolutionCheck(CoreSysAttributes):
|
||||
if slug in self._checks:
|
||||
return self._checks[slug]
|
||||
|
||||
raise ResolutionCheckNotFound(check=slug)
|
||||
raise ResolutionNotFound(f"Check with slug {slug} not found!")
|
||||
|
||||
async def check_system(self) -> None:
|
||||
"""Check the system."""
|
||||
|
||||
@@ -7,11 +7,7 @@ import attr
|
||||
|
||||
from ..bus import EventListener
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import (
|
||||
ResolutionError,
|
||||
ResolutionIssueNotFound,
|
||||
ResolutionSuggestionNotFound,
|
||||
)
|
||||
from ..exceptions import ResolutionError, ResolutionNotFound
|
||||
from ..homeassistant.const import WSEvent
|
||||
from ..utils.common import FileConfiguration
|
||||
from .check import ResolutionCheck
|
||||
@@ -169,37 +165,21 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
|
||||
]
|
||||
}
|
||||
|
||||
def get_suggestion_by_id(self, uuid: str) -> Suggestion:
|
||||
def get_suggestion(self, uuid: str) -> Suggestion:
|
||||
"""Return suggestion with uuid."""
|
||||
for suggestion in self._suggestions:
|
||||
if suggestion.uuid != uuid:
|
||||
continue
|
||||
return suggestion
|
||||
raise ResolutionSuggestionNotFound(uuid=uuid)
|
||||
raise ResolutionNotFound()
|
||||
|
||||
def get_suggestion_if_present(self, suggestion: Suggestion) -> Suggestion | None:
|
||||
"""Get suggestion matching provided one if it exists in resolution manager."""
|
||||
for s in self._suggestions:
|
||||
if s != suggestion:
|
||||
continue
|
||||
return s
|
||||
return None
|
||||
|
||||
def get_issue_by_id(self, uuid: str) -> Issue:
|
||||
def get_issue(self, uuid: str) -> Issue:
|
||||
"""Return issue with uuid."""
|
||||
for issue in self._issues:
|
||||
if issue.uuid != uuid:
|
||||
continue
|
||||
return issue
|
||||
raise ResolutionIssueNotFound(uuid=uuid)
|
||||
|
||||
def get_issue_if_present(self, issue: Issue) -> Issue | None:
|
||||
"""Get issue matching provided one if it exists in resolution manager."""
|
||||
for i in self._issues:
|
||||
if i != issue:
|
||||
continue
|
||||
return i
|
||||
return None
|
||||
raise ResolutionNotFound()
|
||||
|
||||
def create_issue(
|
||||
self,
|
||||
@@ -254,13 +234,20 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
|
||||
|
||||
async def apply_suggestion(self, suggestion: Suggestion) -> None:
|
||||
"""Apply suggested action."""
|
||||
suggestion = self.get_suggestion_by_id(suggestion.uuid)
|
||||
if suggestion not in self._suggestions:
|
||||
raise ResolutionError(
|
||||
f"Suggestion {suggestion.uuid} is not valid", _LOGGER.warning
|
||||
)
|
||||
|
||||
await self.fixup.apply_fixup(suggestion)
|
||||
await self.healthcheck()
|
||||
|
||||
def dismiss_suggestion(self, suggestion: Suggestion) -> None:
|
||||
"""Dismiss suggested action."""
|
||||
suggestion = self.get_suggestion_by_id(suggestion.uuid)
|
||||
if suggestion not in self._suggestions:
|
||||
raise ResolutionError(
|
||||
f"The UUID {suggestion.uuid} is not valid suggestion", _LOGGER.warning
|
||||
)
|
||||
self._suggestions.remove(suggestion)
|
||||
|
||||
# Remove event listeners if present
|
||||
@@ -276,7 +263,10 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
|
||||
|
||||
def dismiss_issue(self, issue: Issue) -> None:
|
||||
"""Dismiss suggested action."""
|
||||
issue = self.get_issue_by_id(issue.uuid)
|
||||
if issue not in self._issues:
|
||||
raise ResolutionError(
|
||||
f"The UUID {issue.uuid} is not a valid issue", _LOGGER.warning
|
||||
)
|
||||
self._issues.remove(issue)
|
||||
|
||||
# Event on issue removal
|
||||
|
||||
@@ -12,7 +12,7 @@ import aiodocker
|
||||
from aiodocker.containers import DockerContainer
|
||||
from awesomeversion import AwesomeVersion
|
||||
import pytest
|
||||
from securetar import SecureTarArchive, SecureTarFile
|
||||
from securetar import SecureTarFile
|
||||
|
||||
from supervisor.addons.addon import Addon
|
||||
from supervisor.addons.const import AddonBackupMode
|
||||
@@ -34,8 +34,6 @@ from supervisor.exceptions import (
|
||||
)
|
||||
from supervisor.hardware.helper import HwHelper
|
||||
from supervisor.ingress import Ingress
|
||||
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
|
||||
from supervisor.resolution.data import Issue
|
||||
from supervisor.utils.dt import utcnow
|
||||
|
||||
from .test_manager import BOOT_FAIL_ISSUE, BOOT_FAIL_SUGGESTIONS
|
||||
@@ -438,11 +436,8 @@ async def test_backup(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
archive.close()
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", ["running", "stopped"])
|
||||
@@ -462,11 +457,8 @@ async def test_backup_no_config(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
archive.close()
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
|
||||
@@ -481,17 +473,14 @@ async def test_backup_with_pre_post_command(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
with (
|
||||
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
|
||||
patch.object(
|
||||
Addon, "backup_post", new=PropertyMock(return_value="backup_post")
|
||||
),
|
||||
):
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
archive.close()
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
|
||||
assert container.exec.call_count == 2
|
||||
assert container.exec.call_args_list[0].args[0] == "backup_pre"
|
||||
@@ -554,18 +543,15 @@ async def test_backup_with_pre_command_error(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
with (
|
||||
patch.object(DockerAddon, "is_running", return_value=True),
|
||||
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
|
||||
pytest.raises(exc_type_raised),
|
||||
):
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
|
||||
assert not tar_file.path.exists()
|
||||
archive.close()
|
||||
assert not tarfile.path.exists()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", ["running", "stopped"])
|
||||
@@ -582,9 +568,7 @@ async def test_backup_cold_mode(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
with (
|
||||
patch.object(
|
||||
AddonModel,
|
||||
@@ -595,8 +579,7 @@ async def test_backup_cold_mode(
|
||||
DockerAddon, "is_running", side_effect=[status == "running", False, False]
|
||||
),
|
||||
):
|
||||
start_task = await install_addon_ssh.backup(tar_file)
|
||||
archive.close()
|
||||
start_task = await install_addon_ssh.backup(tarfile)
|
||||
|
||||
assert bool(start_task) is (status == "running")
|
||||
|
||||
@@ -624,9 +607,7 @@ async def test_backup_cold_mode_with_watchdog(
|
||||
|
||||
# Patching out the normal end of backup process leaves the container in a stopped state
|
||||
# Watchdog should still not try to restart it though, it should remain this way
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
with (
|
||||
patch.object(Addon, "start") as start,
|
||||
patch.object(Addon, "restart") as restart,
|
||||
@@ -638,11 +619,10 @@ async def test_backup_cold_mode_with_watchdog(
|
||||
new=PropertyMock(return_value=AddonBackupMode.COLD),
|
||||
),
|
||||
):
|
||||
await install_addon_ssh.backup(tar_file)
|
||||
await install_addon_ssh.backup(tarfile)
|
||||
await asyncio.sleep(0)
|
||||
start.assert_not_called()
|
||||
restart.assert_not_called()
|
||||
archive.close()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", ["running", "stopped"])
|
||||
@@ -655,7 +635,7 @@ async def test_restore(coresys: CoreSys, install_addon_ssh: Addon, status: str)
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"))
|
||||
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"), "r")
|
||||
with patch.object(DockerAddon, "is_running", return_value=False):
|
||||
start_task = await coresys.addons.restore(TEST_ADDON_SLUG, tarfile)
|
||||
|
||||
@@ -675,7 +655,7 @@ async def test_restore_while_running(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
|
||||
with (
|
||||
patch.object(DockerAddon, "is_running", return_value=True),
|
||||
patch.object(Ingress, "update_hass_panel"),
|
||||
@@ -708,7 +688,7 @@ async def test_restore_while_running_with_watchdog(
|
||||
|
||||
# We restore a stopped backup so restore will not restart it
|
||||
# Watchdog will see it stop and should not attempt reanimation either
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
|
||||
with (
|
||||
patch.object(Addon, "start") as start,
|
||||
patch.object(Addon, "restart") as restart,
|
||||
@@ -996,40 +976,16 @@ async def test_addon_manual_only_boot(install_addon_example: Addon):
|
||||
assert install_addon_example.boot == "manual"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("initial_state", "target_state", "issue", "suggestions"),
|
||||
[
|
||||
(
|
||||
AddonState.ERROR,
|
||||
AddonState.STARTED,
|
||||
BOOT_FAIL_ISSUE,
|
||||
[suggestion.type for suggestion in BOOT_FAIL_SUGGESTIONS],
|
||||
),
|
||||
(
|
||||
AddonState.STARTED,
|
||||
AddonState.STOPPED,
|
||||
Issue(
|
||||
IssueType.DEVICE_ACCESS_MISSING,
|
||||
ContextType.ADDON,
|
||||
reference=TEST_ADDON_SLUG,
|
||||
),
|
||||
[SuggestionType.EXECUTE_RESTART],
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_addon_state_dismisses_issue(
|
||||
coresys: CoreSys,
|
||||
install_addon_ssh: Addon,
|
||||
initial_state: AddonState,
|
||||
target_state: AddonState,
|
||||
issue: Issue,
|
||||
suggestions: list[SuggestionType],
|
||||
async def test_addon_start_dismisses_boot_fail(
|
||||
coresys: CoreSys, install_addon_ssh: Addon
|
||||
):
|
||||
"""Test an addon state change dismisses the issues."""
|
||||
install_addon_ssh.state = initial_state
|
||||
coresys.resolution.add_issue(issue, suggestions)
|
||||
"""Test a successful start dismisses the boot fail issue."""
|
||||
install_addon_ssh.state = AddonState.ERROR
|
||||
coresys.resolution.add_issue(
|
||||
BOOT_FAIL_ISSUE, [suggestion.type for suggestion in BOOT_FAIL_SUGGESTIONS]
|
||||
)
|
||||
|
||||
install_addon_ssh.state = target_state
|
||||
install_addon_ssh.state = AddonState.STARTED
|
||||
assert coresys.resolution.issues == []
|
||||
assert coresys.resolution.suggestions == []
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Test Resolution API."""
|
||||
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from aiohttp.test_utils import TestClient
|
||||
@@ -47,7 +46,7 @@ async def test_api_resolution_base(coresys: CoreSys, api_client: TestClient):
|
||||
async def test_api_resolution_dismiss_suggestion(
|
||||
coresys: CoreSys, api_client: TestClient
|
||||
):
|
||||
"""Test resolution manager dismiss suggestion api."""
|
||||
"""Test resolution manager suggestion apply api."""
|
||||
coresys.resolution.add_suggestion(
|
||||
clear_backup := Suggestion(SuggestionType.CLEAR_FULL_BACKUP, ContextType.SYSTEM)
|
||||
)
|
||||
@@ -190,9 +189,7 @@ async def test_issue_not_found(api_client: TestClient, method: str, url: str):
|
||||
resp = await api_client.request(method, url)
|
||||
assert resp.status == 404
|
||||
body = await resp.json()
|
||||
assert body["message"] == "Issue bad does not exist"
|
||||
assert body["error_key"] == "resolution_issue_not_found_error"
|
||||
assert body["extra_fields"] == {"uuid": "bad"}
|
||||
assert body["message"] == "The supplied UUID is not a valid issue"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -204,9 +201,7 @@ async def test_suggestion_not_found(api_client: TestClient, method: str, url: st
|
||||
resp = await api_client.request(method, url)
|
||||
assert resp.status == 404
|
||||
body = await resp.json()
|
||||
assert body["message"] == "Suggestion bad does not exist"
|
||||
assert body["error_key"] == "resolution_suggestion_not_found_error"
|
||||
assert body["extra_fields"] == {"uuid": "bad"}
|
||||
assert body["message"] == "The supplied UUID is not a valid suggestion"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -216,8 +211,6 @@ async def test_suggestion_not_found(api_client: TestClient, method: str, url: st
|
||||
async def test_check_not_found(api_client: TestClient, method: str, url: str):
|
||||
"""Test check not found error."""
|
||||
resp = await api_client.request(method, url)
|
||||
assert resp.status == HTTPStatus.NOT_FOUND
|
||||
assert resp.status == 404
|
||||
body = await resp.json()
|
||||
assert body["message"] == "Check 'bad' does not exist"
|
||||
assert body["error_key"] == "resolution_check_not_found_error"
|
||||
assert body["extra_fields"] == {"check": "bad"}
|
||||
assert body["message"] == "The supplied check slug is not available"
|
||||
|
||||
@@ -8,7 +8,7 @@ import tarfile
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from securetar import AddFileError, InvalidPasswordError, SecureTarReadError
|
||||
from securetar import AddFileError
|
||||
|
||||
from supervisor.addons.addon import Addon
|
||||
from supervisor.backups.backup import Backup, BackupLocation
|
||||
@@ -234,21 +234,7 @@ async def test_consolidate_failure(coresys: CoreSys, tmp_path: Path):
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup 93b462f8"
|
||||
),
|
||||
), # Invalid password (legacy securetar exception)
|
||||
(
|
||||
None,
|
||||
SecureTarReadError,
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup 93b462f8"
|
||||
),
|
||||
), # Invalid password (securetar >= 2026.2.0 raises SecureTarReadError)
|
||||
(
|
||||
None,
|
||||
InvalidPasswordError,
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup 93b462f8"
|
||||
),
|
||||
), # Invalid password (securetar >= 2026.2.0 with v3 backup raises InvalidPasswordError)
|
||||
), # Invalid password
|
||||
],
|
||||
)
|
||||
async def test_validate_backup(
|
||||
@@ -258,12 +244,7 @@ async def test_validate_backup(
|
||||
securetar_side_effect: type[Exception] | None,
|
||||
expected_exception: AbstractContextManager,
|
||||
):
|
||||
"""Parameterized test for validate_backup.
|
||||
|
||||
Note that it is paramount that BackupInvalidError is raised for invalid password
|
||||
cases, as this is used by the Core to determine if a backup password is invalid
|
||||
and offer a input field to the user to input the correct password.
|
||||
"""
|
||||
"""Parameterized test for validate_backup."""
|
||||
enc_tar = Path(copy(get_fixture_path("backup_example_enc.tar"), tmp_path))
|
||||
enc_backup = Backup(coresys, enc_tar, "test", None)
|
||||
await enc_backup.load()
|
||||
@@ -292,44 +273,3 @@ async def test_validate_backup(
|
||||
expected_exception,
|
||||
):
|
||||
await enc_backup.validate_backup(None)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("password", "expected_exception"),
|
||||
[
|
||||
("supervisor", does_not_raise()),
|
||||
(
|
||||
"wrong_password",
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup f92f0339"
|
||||
),
|
||||
),
|
||||
(
|
||||
None,
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup f92f0339"
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_validate_backup_v3(
|
||||
coresys: CoreSys,
|
||||
tmp_path: Path,
|
||||
password: str | None,
|
||||
expected_exception: AbstractContextManager,
|
||||
):
|
||||
"""Test validate_backup with a real SecureTar v3 encrypted backup.
|
||||
|
||||
SecureTar v3 uses Argon2id key derivation and raises InvalidPasswordError
|
||||
on wrong passwords. It is paramount that BackupInvalidError is raised for
|
||||
invalid password cases, as this is used by the Core to determine if a backup
|
||||
password is invalid and offer a dialog to the user to input the correct
|
||||
password.
|
||||
"""
|
||||
v3_tar = Path(copy(get_fixture_path("backup_example_sec_v3.tar"), tmp_path))
|
||||
v3_backup = Backup(coresys, v3_tar, "test", None)
|
||||
await v3_backup.load()
|
||||
v3_backup.set_password(password)
|
||||
|
||||
with expected_exception:
|
||||
await v3_backup.validate_backup(None)
|
||||
|
||||
@@ -167,7 +167,7 @@ async def test_homeassistant_restore_rejects_path_traversal(
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, gzip=True)
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await coresys.homeassistant.restore(tar_file)
|
||||
|
||||
@@ -181,7 +181,7 @@ async def test_addon_restore_rejects_path_traversal(
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, gzip=True)
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await install_addon_ssh.restore(tar_file)
|
||||
|
||||
@@ -203,7 +203,7 @@ async def test_addon_restore_rejects_symlink_escape(
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
|
||||
tar_file = SecureTarFile(tar_path, gzip=True)
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await install_addon_ssh.restore(tar_file)
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ from blockbuster import BlockBuster, BlockBusterFunction
|
||||
from dbus_fast import BusType
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
import pytest
|
||||
from securetar import SecureTarArchive
|
||||
from securetar import SecureTarFile
|
||||
|
||||
from supervisor import config as su_config
|
||||
from supervisor.addons.addon import Addon
|
||||
@@ -848,7 +848,7 @@ async def backups(
|
||||
for i in range(request.param if hasattr(request, "param") else 5):
|
||||
slug = f"sn{i + 1}"
|
||||
temp_tar = Path(tmp_path, f"{slug}.tar")
|
||||
with SecureTarArchive(temp_tar, "w"):
|
||||
with SecureTarFile(temp_tar, "w"):
|
||||
pass
|
||||
backup = Backup(coresys, temp_tar, slug, None)
|
||||
backup._data = { # pylint: disable=protected-access
|
||||
|
||||
BIN
tests/fixtures/backup_example_sec_v3.tar
vendored
BIN
tests/fixtures/backup_example_sec_v3.tar
vendored
Binary file not shown.
@@ -43,9 +43,7 @@ async def test_reading_addon_files_error(coresys: CoreSys):
|
||||
assert reset_repo in coresys.resolution.suggestions
|
||||
assert coresys.core.healthy is True
|
||||
|
||||
coresys.resolution.dismiss_issue(
|
||||
coresys.resolution.get_issue_if_present(corrupt_repo)
|
||||
)
|
||||
coresys.resolution.dismiss_issue(corrupt_repo)
|
||||
err.errno = errno.EBADMSG
|
||||
assert (await coresys.store.data._find_addon_configs(Path("test"), {})) is None
|
||||
assert corrupt_repo in coresys.resolution.issues
|
||||
|
||||
Reference in New Issue
Block a user