Compare commits

...

8 Commits

Author SHA1 Message Date
Stefan Agner
e51859c91b Use default_verbose for host logs as well
Use the new default_verbose flag for advanced logs, to make it more
explicit that we want timestamps for host logs as well.
2026-02-26 13:36:58 +01:00
Stefan Agner
bd8b58f45a Use verbose log output for plug-ins
All three plug-ins which support logging (dns, multicast and audio)
should use the verbose log format by default to make sure the log lines
are annotated with timestamps. Introduce a new flag, default_verbose,
for advanced logs.
2026-02-26 13:35:16 +01:00
Stefan Agner
7f6327e94e Handle missing Accept header in host logs (#6594)
* Handle missing Accept header in host logs

Avoid indexing request headers directly in the host advanced logs handler when Accept is absent, preventing KeyError crashes on valid requests without that header. Fixes SUPERVISOR-1939.

* Add pytest
2026-02-26 11:30:08 +01:00
Mike Degatano
9f00b6e34f Ensure uuid of dismissed suggestion/issue matches an existing one (#6582)
* Ensure uuid of dismissed suggestion/issue matches an existing one

* Fix lint, test and feedback issues

* Adjust existing tests and remove new ones for not found errors

* fix device access issue usage
2026-02-25 10:26:44 +01:00
Stefan Agner
7a0b2e474a Remove unused Docker config from backup metadata (#6591)
Remove the docker property and schema validation from backup metadata.
The Docker config (registry credentials, IPv6 setting) was already
dropped from backup/restore operations in #5605, but the property and
schema entry remained. Old backups with the docker key still load fine
since the schema uses extra=vol.ALLOW_EXTRA.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 09:12:05 +01:00
dependabot[bot]
b74277ced0 Bump home-assistant/builder from 2025.11.0 to 2026.02.1 (#6592)
Bumps [home-assistant/builder](https://github.com/home-assistant/builder) from 2025.11.0 to 2026.02.1.
- [Release notes](https://github.com/home-assistant/builder/releases)
- [Commits](https://github.com/home-assistant/builder/compare/2025.11.0...2026.02.1)

---
updated-dependencies:
- dependency-name: home-assistant/builder
  dependency-version: 2026.02.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-25 09:07:24 +01:00
Stefan Agner
c9a874b352 Remove RuntimeError from APIError inheritance (#6588)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 22:46:16 +01:00
Stefan Agner
3de2deaf02 Bump securetar to 2026.2.0 (#6575)
* Bump securetar from 2025.12.0 to 2026.2.0

Adapt to the new securetar API:
- Use SecureTarArchive for outer backup tar (replaces SecureTarFile
  with gzip=False for the outer container)
- create_inner_tar() renamed to create_tar(), password now inherited
  from the archive rather than passed per inner tar
- SecureTarFile no longer accepts a mode parameter (read-only by
  default, InnerSecureTarFile for writing)
- Pass create_version=2 to keep protected backups at version 2

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Reformat imports

* Rename _create_cleanup to _create_finalize and update docstring

* Use constant for SecureTar create version

* Add test for SecureTarReadError in validate_backup

securetar >= 2026.2.0 raises SecureTarReadError instead of
tarfile.ReadError for invalid passwords. Catching this exception
and raising BackupInvalidError is required so Core shows the
encryption key dialog to the user.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Handle InvalidPasswordError for v3 backups

* Address typos

* Add securetar v3 encrypted password test fixture

Add a test fixture for a securetar v3 encrypted backup with password.
This will be used in the test suite to verify that the backup
extraction process correctly handles encrypted backups.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:08:14 +01:00
26 changed files with 351 additions and 166 deletions

View File

@@ -205,7 +205,7 @@ jobs:
# home-assistant/builder doesn't support sha pinning
- name: Build supervisor
uses: home-assistant/builder@2025.11.0
uses: home-assistant/builder@2026.02.1
with:
image: ${{ matrix.arch }}
args: |
@@ -259,7 +259,7 @@ jobs:
# home-assistant/builder doesn't support sha pinning
- name: Build the Supervisor
if: needs.init.outputs.publish != 'true'
uses: home-assistant/builder@2025.11.0
uses: home-assistant/builder@2026.02.1
with:
args: |
--test \

View File

@@ -23,7 +23,7 @@ pulsectl==24.12.0
pyudev==0.24.4
PyYAML==6.0.3
requests==2.32.5
securetar==2025.12.0
securetar==2026.2.0
sentry-sdk==2.53.0
setuptools==82.0.0
voluptuous==0.16.0

View File

@@ -191,18 +191,18 @@ class Addon(AddonModel):
self._startup_event.set()
# Dismiss boot failed issue if present and we started
if (
new_state == AddonState.STARTED
and self.boot_failed_issue in self.sys_resolution.issues
if new_state == AddonState.STARTED and (
issue := self.sys_resolution.get_issue_if_present(self.boot_failed_issue)
):
self.sys_resolution.dismiss_issue(self.boot_failed_issue)
self.sys_resolution.dismiss_issue(issue)
# Dismiss device access missing issue if present and we stopped
if (
new_state == AddonState.STOPPED
and self.device_access_missing_issue in self.sys_resolution.issues
if new_state == AddonState.STOPPED and (
issue := self.sys_resolution.get_issue_if_present(
self.device_access_missing_issue
)
):
self.sys_resolution.dismiss_issue(self.device_access_missing_issue)
self.sys_resolution.dismiss_issue(issue)
self.sys_homeassistant.websocket.supervisor_event_custom(
WSEvent.ADDON,
@@ -363,11 +363,10 @@ class Addon(AddonModel):
self.persist[ATTR_BOOT] = value
# Dismiss boot failed issue if present and boot at start disabled
if (
value == AddonBoot.MANUAL
and self._boot_failed_issue in self.sys_resolution.issues
if value == AddonBoot.MANUAL and (
issue := self.sys_resolution.get_issue_if_present(self._boot_failed_issue)
):
self.sys_resolution.dismiss_issue(self._boot_failed_issue)
self.sys_resolution.dismiss_issue(issue)
@property
def auto_update(self) -> bool:

View File

@@ -129,14 +129,23 @@ class RestAPI(CoreSysAttributes):
await self.start()
def _register_advanced_logs(self, path: str, syslog_identifier: str):
def _register_advanced_logs(
self,
path: str,
syslog_identifier: str,
default_verbose: bool = False,
):
"""Register logs endpoint for a given path, returning logs for single syslog identifier."""
self.webapp.add_routes(
[
web.get(
f"{path}/logs",
partial(self._api_host.advanced_logs, identifier=syslog_identifier),
partial(
self._api_host.advanced_logs,
identifier=syslog_identifier,
default_verbose=default_verbose,
),
),
web.get(
f"{path}/logs/follow",
@@ -144,6 +153,7 @@ class RestAPI(CoreSysAttributes):
self._api_host.advanced_logs,
identifier=syslog_identifier,
follow=True,
default_verbose=default_verbose,
),
),
web.get(
@@ -153,11 +163,16 @@ class RestAPI(CoreSysAttributes):
identifier=syslog_identifier,
latest=True,
no_colors=True,
default_verbose=default_verbose,
),
),
web.get(
f"{path}/logs/boots/{{bootid}}",
partial(self._api_host.advanced_logs, identifier=syslog_identifier),
partial(
self._api_host.advanced_logs,
identifier=syslog_identifier,
default_verbose=default_verbose,
),
),
web.get(
f"{path}/logs/boots/{{bootid}}/follow",
@@ -165,6 +180,7 @@ class RestAPI(CoreSysAttributes):
self._api_host.advanced_logs,
identifier=syslog_identifier,
follow=True,
default_verbose=default_verbose,
),
),
]
@@ -177,10 +193,13 @@ class RestAPI(CoreSysAttributes):
self.webapp.add_routes(
[
web.get("/host/info", api_host.info),
web.get("/host/logs", api_host.advanced_logs),
web.get(
"/host/logs",
partial(api_host.advanced_logs, default_verbose=True),
),
web.get(
"/host/logs/follow",
partial(api_host.advanced_logs, follow=True),
partial(api_host.advanced_logs, follow=True, default_verbose=True),
),
web.get("/host/logs/identifiers", api_host.list_identifiers),
web.get("/host/logs/identifiers/{identifier}", api_host.advanced_logs),
@@ -189,10 +208,13 @@ class RestAPI(CoreSysAttributes):
partial(api_host.advanced_logs, follow=True),
),
web.get("/host/logs/boots", api_host.list_boots),
web.get("/host/logs/boots/{bootid}", api_host.advanced_logs),
web.get(
"/host/logs/boots/{bootid}",
partial(api_host.advanced_logs, default_verbose=True),
),
web.get(
"/host/logs/boots/{bootid}/follow",
partial(api_host.advanced_logs, follow=True),
partial(api_host.advanced_logs, follow=True, default_verbose=True),
),
web.get(
"/host/logs/boots/{bootid}/identifiers/{identifier}",
@@ -335,7 +357,9 @@ class RestAPI(CoreSysAttributes):
web.post("/multicast/restart", api_multicast.restart),
]
)
self._register_advanced_logs("/multicast", "hassio_multicast")
self._register_advanced_logs(
"/multicast", "hassio_multicast", default_verbose=True
)
def _register_hardware(self) -> None:
"""Register hardware functions."""
@@ -695,7 +719,7 @@ class RestAPI(CoreSysAttributes):
]
)
self._register_advanced_logs("/dns", "hassio_dns")
self._register_advanced_logs("/dns", "hassio_dns", default_verbose=True)
def _register_audio(self) -> None:
"""Register Audio functions."""
@@ -718,7 +742,7 @@ class RestAPI(CoreSysAttributes):
]
)
self._register_advanced_logs("/audio", "hassio_audio")
self._register_advanced_logs("/audio", "hassio_audio", default_verbose=True)
def _register_mounts(self) -> None:
"""Register mounts endpoints."""

View File

@@ -208,9 +208,10 @@ class APIHost(CoreSysAttributes):
follow: bool = False,
latest: bool = False,
no_colors: bool = False,
default_verbose: bool = False,
) -> web.StreamResponse:
"""Return systemd-journald logs."""
log_formatter = LogFormatter.PLAIN
log_formatter = LogFormatter.VERBOSE if default_verbose else LogFormatter.PLAIN
params: dict[str, Any] = {}
if identifier:
params[PARAM_SYSLOG_IDENTIFIER] = identifier
@@ -218,8 +219,6 @@ class APIHost(CoreSysAttributes):
params[PARAM_SYSLOG_IDENTIFIER] = request.match_info[IDENTIFIER]
else:
params[PARAM_SYSLOG_IDENTIFIER] = self.sys_host.logs.default_identifiers
# host logs should be always verbose, no matter what Accept header is used
log_formatter = LogFormatter.VERBOSE
if BOOTID in request.match_info:
params[PARAM_BOOT_ID] = await self._get_boot_id(request.match_info[BOOTID])
@@ -240,7 +239,9 @@ class APIHost(CoreSysAttributes):
f"Cannot determine CONTAINER_LOG_EPOCH of {identifier}, latest logs not available."
) from err
if ACCEPT in request.headers and request.headers[ACCEPT] not in [
accept_header = request.headers.get(ACCEPT)
if accept_header and accept_header not in [
CONTENT_TYPE_TEXT,
CONTENT_TYPE_X_LOG,
"*/*",
@@ -250,7 +251,7 @@ class APIHost(CoreSysAttributes):
"supported for now."
)
if "verbose" in request.query or request.headers[ACCEPT] == CONTENT_TYPE_X_LOG:
if "verbose" in request.query or accept_header == CONTENT_TYPE_X_LOG:
log_formatter = LogFormatter.VERBOSE
if "no_colors" in request.query:
@@ -326,10 +327,11 @@ class APIHost(CoreSysAttributes):
follow: bool = False,
latest: bool = False,
no_colors: bool = False,
default_verbose: bool = False,
) -> web.StreamResponse:
"""Return systemd-journald logs. Wrapped as standard API handler."""
return await self.advanced_logs_handler(
request, identifier, follow, latest, no_colors
request, identifier, follow, latest, no_colors, default_verbose
)
@api_process

View File

@@ -19,7 +19,6 @@ from ..const import (
ATTR_UNSUPPORTED,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APINotFound, ResolutionNotFound
from ..resolution.checks.base import CheckBase
from ..resolution.data import Issue, Suggestion
from .utils import api_process, api_validate
@@ -32,24 +31,17 @@ class APIResoulution(CoreSysAttributes):
def _extract_issue(self, request: web.Request) -> Issue:
"""Extract issue from request or raise."""
try:
return self.sys_resolution.get_issue(request.match_info["issue"])
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid issue") from None
return self.sys_resolution.get_issue_by_id(request.match_info["issue"])
def _extract_suggestion(self, request: web.Request) -> Suggestion:
"""Extract suggestion from request or raise."""
try:
return self.sys_resolution.get_suggestion(request.match_info["suggestion"])
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid suggestion") from None
return self.sys_resolution.get_suggestion_by_id(
request.match_info["suggestion"]
)
def _extract_check(self, request: web.Request) -> CheckBase:
"""Extract check from request or raise."""
try:
return self.sys_resolution.check.get(request.match_info["check"])
except ResolutionNotFound:
raise APINotFound("The supplied check slug is not available") from None
return self.sys_resolution.check.get(request.match_info["check"])
def _generate_suggestion_information(self, suggestion: Suggestion):
"""Generate suggestion information for response."""

View File

@@ -12,13 +12,19 @@ import json
import logging
from pathlib import Path, PurePath
import tarfile
from tarfile import TarFile
from tempfile import TemporaryDirectory
import time
from typing import Any, Self, cast
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from securetar import AddFileError, SecureTarFile, atomic_contents_add
from securetar import (
AddFileError,
InvalidPasswordError,
SecureTarArchive,
SecureTarFile,
SecureTarReadError,
atomic_contents_add,
)
import voluptuous as vol
from voluptuous.humanize import humanize_error
@@ -59,7 +65,7 @@ from ..utils import remove_folder
from ..utils.dt import parse_datetime, utcnow
from ..utils.json import json_bytes
from ..utils.sentinel import DEFAULT
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, BackupType
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, SECURETAR_CREATE_VERSION, BackupType
from .validate import SCHEMA_BACKUP
IGNORED_COMPARISON_FIELDS = {ATTR_PROTECTED, ATTR_CRYPTO, ATTR_DOCKER}
@@ -99,7 +105,7 @@ class Backup(JobGroup):
)
self._data: dict[str, Any] = data or {ATTR_SLUG: slug}
self._tmp: TemporaryDirectory | None = None
self._outer_secure_tarfile: SecureTarFile | None = None
self._outer_secure_tarfile: SecureTarArchive | None = None
self._password: str | None = None
self._locations: dict[str | None, BackupLocation] = {
location: BackupLocation(
@@ -198,16 +204,6 @@ class Backup(JobGroup):
"""Get extra metadata added by client."""
return self._data[ATTR_EXTRA]
@property
def docker(self) -> dict[str, Any]:
"""Return backup Docker config data."""
return self._data.get(ATTR_DOCKER, {})
@docker.setter
def docker(self, value: dict[str, Any]) -> None:
"""Set the Docker config data."""
self._data[ATTR_DOCKER] = value
@property
def location(self) -> str | None:
"""Return the location of the backup."""
@@ -364,15 +360,17 @@ class Backup(JobGroup):
test_tar_file = backup.extractfile(test_tar_name)
try:
with SecureTarFile(
ending, # Not used
gzip=self.compressed,
mode="r",
fileobj=test_tar_file,
password=self._password,
):
# If we can read the tar file, the password is correct
return
except tarfile.ReadError as ex:
except (
tarfile.ReadError,
SecureTarReadError,
InvalidPasswordError,
) as ex:
raise BackupInvalidError(
f"Invalid password for backup {self.slug}", _LOGGER.error
) from ex
@@ -441,7 +439,7 @@ class Backup(JobGroup):
async def create(self) -> AsyncGenerator[None]:
"""Create new backup file."""
def _open_outer_tarfile() -> tuple[SecureTarFile, tarfile.TarFile]:
def _open_outer_tarfile() -> SecureTarArchive:
"""Create and open outer tarfile."""
if self.tarfile.is_file():
raise BackupFileExistError(
@@ -449,14 +447,15 @@ class Backup(JobGroup):
_LOGGER.error,
)
_outer_secure_tarfile = SecureTarFile(
_outer_secure_tarfile = SecureTarArchive(
self.tarfile,
"w",
gzip=False,
bufsize=BUF_SIZE,
create_version=SECURETAR_CREATE_VERSION,
password=self._password,
)
try:
_outer_tarfile = _outer_secure_tarfile.open()
_outer_secure_tarfile.open()
except PermissionError as ex:
raise BackupPermissionError(
f"Cannot open backup file {self.tarfile.as_posix()}, permission error!",
@@ -468,11 +467,9 @@ class Backup(JobGroup):
_LOGGER.error,
) from ex
return _outer_secure_tarfile, _outer_tarfile
return _outer_secure_tarfile
outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
_open_outer_tarfile
)
outer_secure_tarfile = await self.sys_run_in_executor(_open_outer_tarfile)
self._outer_secure_tarfile = outer_secure_tarfile
def _close_outer_tarfile() -> int:
@@ -483,7 +480,7 @@ class Backup(JobGroup):
try:
yield
finally:
await self._create_cleanup(outer_tarfile)
await self._create_finalize(outer_secure_tarfile)
size_bytes = await self.sys_run_in_executor(_close_outer_tarfile)
self._locations[self.location].size_bytes = size_bytes
self._outer_secure_tarfile = None
@@ -543,11 +540,11 @@ class Backup(JobGroup):
if self._tmp:
await self.sys_run_in_executor(self._tmp.cleanup)
async def _create_cleanup(self, outer_tarfile: TarFile) -> None:
"""Cleanup after backup creation.
async def _create_finalize(self, outer_archive: SecureTarArchive) -> None:
"""Finalize backup creation.
Separate method to be called from create to ensure
that cleanup is always performed, even if an exception is raised.
Separate method to be called from create to ensure that the backup is
finalized.
"""
# validate data
try:
@@ -566,7 +563,7 @@ class Backup(JobGroup):
tar_info = tarfile.TarInfo(name="./backup.json")
tar_info.size = len(raw_bytes)
tar_info.mtime = int(time.time())
outer_tarfile.addfile(tar_info, fileobj=fileobj)
outer_archive.tar.addfile(tar_info, fileobj=fileobj)
try:
await self.sys_run_in_executor(_add_backup_json)
@@ -593,10 +590,9 @@ class Backup(JobGroup):
tar_name = f"{slug}.tar{'.gz' if self.compressed else ''}"
addon_file = self._outer_secure_tarfile.create_inner_tar(
addon_file = self._outer_secure_tarfile.create_tar(
f"./{tar_name}",
gzip=self.compressed,
password=self._password,
)
# Take backup
try:
@@ -646,7 +642,6 @@ class Backup(JobGroup):
tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}"
addon_file = SecureTarFile(
Path(self._tmp.name, tar_name),
"r",
gzip=self.compressed,
bufsize=BUF_SIZE,
password=self._password,
@@ -742,10 +737,9 @@ class Backup(JobGroup):
return False
with outer_secure_tarfile.create_inner_tar(
with outer_secure_tarfile.create_tar(
f"./{tar_name}",
gzip=self.compressed,
password=self._password,
) as tar_file:
atomic_contents_add(
tar_file,
@@ -805,7 +799,6 @@ class Backup(JobGroup):
_LOGGER.info("Restore folder %s", name)
with SecureTarFile(
tar_name,
"r",
gzip=self.compressed,
bufsize=BUF_SIZE,
password=self._password,
@@ -873,10 +866,9 @@ class Backup(JobGroup):
tar_name = f"homeassistant.tar{'.gz' if self.compressed else ''}"
# Backup Home Assistant Core config directory
homeassistant_file = self._outer_secure_tarfile.create_inner_tar(
homeassistant_file = self._outer_secure_tarfile.create_tar(
f"./{tar_name}",
gzip=self.compressed,
password=self._password,
)
await self.sys_homeassistant.backup(homeassistant_file, exclude_database)
@@ -900,7 +892,6 @@ class Backup(JobGroup):
)
homeassistant_file = SecureTarFile(
tar_name,
"r",
gzip=self.compressed,
bufsize=BUF_SIZE,
password=self._password,

View File

@@ -6,6 +6,7 @@ from typing import Literal
from ..mounts.mount import Mount
BUF_SIZE = 2**20 * 4 # 4MB
SECURETAR_CREATE_VERSION = 2
DEFAULT_FREEZE_TIMEOUT = 600
LOCATION_CLOUD_BACKUP = ".cloud_backup"

View File

@@ -14,7 +14,6 @@ from ..const import (
ATTR_CRYPTO,
ATTR_DATE,
ATTR_DAYS_UNTIL_STALE,
ATTR_DOCKER,
ATTR_EXCLUDE_DATABASE,
ATTR_EXTRA,
ATTR_FOLDERS,
@@ -35,7 +34,7 @@ from ..const import (
FOLDER_SSL,
)
from ..store.validate import repositories
from ..validate import SCHEMA_DOCKER_CONFIG, version_tag
from ..validate import version_tag
ALL_FOLDERS = [
FOLDER_SHARE,
@@ -114,7 +113,6 @@ SCHEMA_BACKUP = vol.Schema(
)
),
),
vol.Optional(ATTR_DOCKER, default=dict): SCHEMA_DOCKER_CONFIG,
vol.Optional(ATTR_FOLDERS, default=list): vol.All(
v1_folderlist, [vol.In(ALL_FOLDERS)], vol.Unique()
),

View File

@@ -874,11 +874,12 @@ class DockerAddon(DockerInterface):
await super().stop(remove_container)
# If there is a device access issue and the container is removed, clear it
if (
remove_container
and self.addon.device_access_missing_issue in self.sys_resolution.issues
if remove_container and (
issue := self.sys_resolution.get_issue_if_present(
self.addon.device_access_missing_issue
)
):
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
self.sys_resolution.dismiss_issue(issue)
@Job(
name="docker_addon_hardware_events",

View File

@@ -46,7 +46,7 @@ class HassioNotSupportedError(HassioError):
# API
class APIError(HassioError, RuntimeError):
class APIError(HassioError):
"""API errors."""
status = 400
@@ -964,6 +964,44 @@ class ResolutionFixupJobError(ResolutionFixupError, JobException):
"""Raise on job error."""
class ResolutionCheckNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
"""Raise if check does not exist."""
error_key = "resolution_check_not_found_error"
message_template = "Check '{check}' does not exist"
def __init__(
self, logger: Callable[..., None] | None = None, *, check: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"check": check}
super().__init__(None, logger)
class ResolutionIssueNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
"""Raise if issue does not exist."""
error_key = "resolution_issue_not_found_error"
message_template = "Issue {uuid} does not exist"
def __init__(self, logger: Callable[..., None] | None = None, *, uuid: str) -> None:
"""Initialize exception."""
self.extra_fields = {"uuid": uuid}
super().__init__(None, logger)
class ResolutionSuggestionNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
"""Raise if suggestion does not exist."""
error_key = "resolution_suggestion_not_found_error"
message_template = "Suggestion {uuid} does not exist"
def __init__(self, logger: Callable[..., None] | None = None, *, uuid: str) -> None:
"""Initialize exception."""
self.extra_fields = {"uuid": uuid}
super().__init__(None, logger)
# Store

View File

@@ -215,10 +215,10 @@ class Mount(CoreSysAttributes, ABC):
await self._update_state(unit)
# If active, dismiss corresponding failed mount issue if found
if (
mounted := await self.is_mounted()
) and self.failed_issue in self.sys_resolution.issues:
self.sys_resolution.dismiss_issue(self.failed_issue)
if (mounted := await self.is_mounted()) and (
issue := self.sys_resolution.get_issue_if_present(self.failed_issue)
):
self.sys_resolution.dismiss_issue(issue)
return mounted
@@ -361,8 +361,8 @@ class Mount(CoreSysAttributes, ABC):
await self._restart()
# If it is mounted now, dismiss corresponding issue if present
if self.failed_issue in self.sys_resolution.issues:
self.sys_resolution.dismiss_issue(self.failed_issue)
if issue := self.sys_resolution.get_issue_if_present(self.failed_issue):
self.sys_resolution.dismiss_issue(issue)
async def _restart(self) -> None:
"""Restart mount unit to re-mount."""

View File

@@ -6,7 +6,7 @@ from typing import Any
from ..const import ATTR_CHECKS
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import ResolutionNotFound
from ..exceptions import ResolutionCheckNotFound
from ..utils.sentry import async_capture_exception
from .checks.base import CheckBase
from .validate import get_valid_modules
@@ -50,7 +50,7 @@ class ResolutionCheck(CoreSysAttributes):
if slug in self._checks:
return self._checks[slug]
raise ResolutionNotFound(f"Check with slug {slug} not found!")
raise ResolutionCheckNotFound(check=slug)
async def check_system(self) -> None:
"""Check the system."""

View File

@@ -7,7 +7,11 @@ import attr
from ..bus import EventListener
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import ResolutionError, ResolutionNotFound
from ..exceptions import (
ResolutionError,
ResolutionIssueNotFound,
ResolutionSuggestionNotFound,
)
from ..homeassistant.const import WSEvent
from ..utils.common import FileConfiguration
from .check import ResolutionCheck
@@ -165,21 +169,37 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
]
}
def get_suggestion(self, uuid: str) -> Suggestion:
def get_suggestion_by_id(self, uuid: str) -> Suggestion:
"""Return suggestion with uuid."""
for suggestion in self._suggestions:
if suggestion.uuid != uuid:
continue
return suggestion
raise ResolutionNotFound()
raise ResolutionSuggestionNotFound(uuid=uuid)
def get_issue(self, uuid: str) -> Issue:
def get_suggestion_if_present(self, suggestion: Suggestion) -> Suggestion | None:
"""Get suggestion matching provided one if it exists in resolution manager."""
for s in self._suggestions:
if s != suggestion:
continue
return s
return None
def get_issue_by_id(self, uuid: str) -> Issue:
"""Return issue with uuid."""
for issue in self._issues:
if issue.uuid != uuid:
continue
return issue
raise ResolutionNotFound()
raise ResolutionIssueNotFound(uuid=uuid)
def get_issue_if_present(self, issue: Issue) -> Issue | None:
"""Get issue matching provided one if it exists in resolution manager."""
for i in self._issues:
if i != issue:
continue
return i
return None
def create_issue(
self,
@@ -234,20 +254,13 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
async def apply_suggestion(self, suggestion: Suggestion) -> None:
"""Apply suggested action."""
if suggestion not in self._suggestions:
raise ResolutionError(
f"Suggestion {suggestion.uuid} is not valid", _LOGGER.warning
)
suggestion = self.get_suggestion_by_id(suggestion.uuid)
await self.fixup.apply_fixup(suggestion)
await self.healthcheck()
def dismiss_suggestion(self, suggestion: Suggestion) -> None:
"""Dismiss suggested action."""
if suggestion not in self._suggestions:
raise ResolutionError(
f"The UUID {suggestion.uuid} is not valid suggestion", _LOGGER.warning
)
suggestion = self.get_suggestion_by_id(suggestion.uuid)
self._suggestions.remove(suggestion)
# Remove event listeners if present
@@ -263,10 +276,7 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
def dismiss_issue(self, issue: Issue) -> None:
"""Dismiss suggested action."""
if issue not in self._issues:
raise ResolutionError(
f"The UUID {issue.uuid} is not a valid issue", _LOGGER.warning
)
issue = self.get_issue_by_id(issue.uuid)
self._issues.remove(issue)
# Event on issue removal

View File

@@ -12,7 +12,7 @@ import aiodocker
from aiodocker.containers import DockerContainer
from awesomeversion import AwesomeVersion
import pytest
from securetar import SecureTarFile
from securetar import SecureTarArchive, SecureTarFile
from supervisor.addons.addon import Addon
from supervisor.addons.const import AddonBackupMode
@@ -34,6 +34,8 @@ from supervisor.exceptions import (
)
from supervisor.hardware.helper import HwHelper
from supervisor.ingress import Ingress
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue
from supervisor.utils.dt import utcnow
from .test_manager import BOOT_FAIL_ISSUE, BOOT_FAIL_SUGGESTIONS
@@ -436,8 +438,11 @@ async def test_backup(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
assert await install_addon_ssh.backup(tarfile) is None
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
assert await install_addon_ssh.backup(tar_file) is None
archive.close()
@pytest.mark.parametrize("status", ["running", "stopped"])
@@ -457,8 +462,11 @@ async def test_backup_no_config(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
assert await install_addon_ssh.backup(tarfile) is None
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
assert await install_addon_ssh.backup(tar_file) is None
archive.close()
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
@@ -473,14 +481,17 @@ async def test_backup_with_pre_post_command(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
with (
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
patch.object(
Addon, "backup_post", new=PropertyMock(return_value="backup_post")
),
):
assert await install_addon_ssh.backup(tarfile) is None
assert await install_addon_ssh.backup(tar_file) is None
archive.close()
assert container.exec.call_count == 2
assert container.exec.call_args_list[0].args[0] == "backup_pre"
@@ -543,15 +554,18 @@ async def test_backup_with_pre_command_error(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
with (
patch.object(DockerAddon, "is_running", return_value=True),
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
pytest.raises(exc_type_raised),
):
assert await install_addon_ssh.backup(tarfile) is None
assert await install_addon_ssh.backup(tar_file) is None
assert not tarfile.path.exists()
assert not tar_file.path.exists()
archive.close()
@pytest.mark.parametrize("status", ["running", "stopped"])
@@ -568,7 +582,9 @@ async def test_backup_cold_mode(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
with (
patch.object(
AddonModel,
@@ -579,7 +595,8 @@ async def test_backup_cold_mode(
DockerAddon, "is_running", side_effect=[status == "running", False, False]
),
):
start_task = await install_addon_ssh.backup(tarfile)
start_task = await install_addon_ssh.backup(tar_file)
archive.close()
assert bool(start_task) is (status == "running")
@@ -607,7 +624,9 @@ async def test_backup_cold_mode_with_watchdog(
# Patching out the normal end of backup process leaves the container in a stopped state
# Watchdog should still not try to restart it though, it should remain this way
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
with (
patch.object(Addon, "start") as start,
patch.object(Addon, "restart") as restart,
@@ -619,10 +638,11 @@ async def test_backup_cold_mode_with_watchdog(
new=PropertyMock(return_value=AddonBackupMode.COLD),
),
):
await install_addon_ssh.backup(tarfile)
await install_addon_ssh.backup(tar_file)
await asyncio.sleep(0)
start.assert_not_called()
restart.assert_not_called()
archive.close()
@pytest.mark.parametrize("status", ["running", "stopped"])
@@ -635,7 +655,7 @@ async def test_restore(coresys: CoreSys, install_addon_ssh: Addon, status: str)
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"), "r")
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"))
with patch.object(DockerAddon, "is_running", return_value=False):
start_task = await coresys.addons.restore(TEST_ADDON_SLUG, tarfile)
@@ -655,7 +675,7 @@ async def test_restore_while_running(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
with (
patch.object(DockerAddon, "is_running", return_value=True),
patch.object(Ingress, "update_hass_panel"),
@@ -688,7 +708,7 @@ async def test_restore_while_running_with_watchdog(
# We restore a stopped backup so restore will not restart it
# Watchdog will see it stop and should not attempt reanimation either
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
with (
patch.object(Addon, "start") as start,
patch.object(Addon, "restart") as restart,
@@ -976,16 +996,40 @@ async def test_addon_manual_only_boot(install_addon_example: Addon):
assert install_addon_example.boot == "manual"
async def test_addon_start_dismisses_boot_fail(
coresys: CoreSys, install_addon_ssh: Addon
@pytest.mark.parametrize(
("initial_state", "target_state", "issue", "suggestions"),
[
(
AddonState.ERROR,
AddonState.STARTED,
BOOT_FAIL_ISSUE,
[suggestion.type for suggestion in BOOT_FAIL_SUGGESTIONS],
),
(
AddonState.STARTED,
AddonState.STOPPED,
Issue(
IssueType.DEVICE_ACCESS_MISSING,
ContextType.ADDON,
reference=TEST_ADDON_SLUG,
),
[SuggestionType.EXECUTE_RESTART],
),
],
)
async def test_addon_state_dismisses_issue(
coresys: CoreSys,
install_addon_ssh: Addon,
initial_state: AddonState,
target_state: AddonState,
issue: Issue,
suggestions: list[SuggestionType],
):
"""Test a successful start dismisses the boot fail issue."""
install_addon_ssh.state = AddonState.ERROR
coresys.resolution.add_issue(
BOOT_FAIL_ISSUE, [suggestion.type for suggestion in BOOT_FAIL_SUGGESTIONS]
)
"""Test an addon state change dismisses the issues."""
install_addon_ssh.state = initial_state
coresys.resolution.add_issue(issue, suggestions)
install_addon_ssh.state = AddonState.STARTED
install_addon_ssh.state = target_state
assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == []

View File

@@ -16,6 +16,7 @@ DEFAULT_LOG_RANGE_FOLLOW = "entries=:-99:18446744073709551615"
async def _common_test_api_advanced_logs(
path_prefix: str,
syslog_identifier: str,
formatter: LogFormatter,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
@@ -32,7 +33,7 @@ async def _common_test_api_advanced_logs(
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, False)
journal_logs_reader.assert_called_with(ANY, formatter, False)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
@@ -46,7 +47,7 @@ async def _common_test_api_advanced_logs(
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, True)
journal_logs_reader.assert_called_with(ANY, formatter, True)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
@@ -60,7 +61,7 @@ async def _common_test_api_advanced_logs(
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, False)
journal_logs_reader.assert_called_with(ANY, formatter, False)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
@@ -86,7 +87,7 @@ async def _common_test_api_advanced_logs(
assert logs_call[1]["params"]["SYSLOG_IDENTIFIER"] == syslog_identifier
assert logs_call[1]["params"]["CONTAINER_LOG_EPOCH"] == "12345"
assert logs_call[1]["range_header"] == "entries=:0:18446744073709551615"
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, True)
journal_logs_reader.assert_called_with(ANY, formatter, True)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
@@ -125,7 +126,7 @@ async def advanced_logs_tester(
coresys: CoreSys,
os_available,
journal_logs_reader: MagicMock,
) -> Callable[[str, str], Awaitable[None]]:
) -> Callable[..., Awaitable[None]]:
"""Fixture that returns a function to test advanced logs endpoints.
This allows tests to avoid explicitly passing all the required fixtures.
@@ -135,10 +136,15 @@ async def advanced_logs_tester(
await advanced_logs_tester("/path/prefix", "syslog_identifier")
"""
async def test_logs(path_prefix: str, syslog_identifier: str):
async def test_logs(
path_prefix: str,
syslog_identifier: str,
formatter: LogFormatter = LogFormatter.PLAIN,
):
await _common_test_api_advanced_logs(
path_prefix,
syslog_identifier,
formatter,
api_client,
journald_logs,
coresys,

View File

@@ -1,6 +1,8 @@
"""Test audio api."""
from supervisor.host.const import LogFormatter
async def test_api_audio_logs(advanced_logs_tester) -> None:
"""Test audio logs."""
await advanced_logs_tester("/audio", "hassio_audio")
await advanced_logs_tester("/audio", "hassio_audio", LogFormatter.VERBOSE)

View File

@@ -6,6 +6,7 @@ from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from supervisor.dbus.resolved import Resolved
from supervisor.host.const import LogFormatter
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.resolved import Resolved as ResolvedService
@@ -67,4 +68,4 @@ async def test_options(api_client: TestClient, coresys: CoreSys):
async def test_api_dns_logs(advanced_logs_tester):
"""Test dns logs."""
await advanced_logs_tester("/dns", "hassio_dns")
await advanced_logs_tester("/dns", "hassio_dns", LogFormatter.VERBOSE)

View File

@@ -374,6 +374,11 @@ async def test_advanced_logs_formatters(
await api_client.get("/host/logs/identifiers/test", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.reset_mock()
await api_client.get("/host/logs/identifiers/test", skip_auto_headers={"Accept"})
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN, False)
async def test_advanced_logs_errors(coresys: CoreSys, api_client: TestClient):
"""Test advanced logging API errors."""

View File

@@ -1,6 +1,8 @@
"""Test multicast api."""
from supervisor.host.const import LogFormatter
async def test_api_multicast_logs(advanced_logs_tester):
"""Test multicast logs."""
await advanced_logs_tester("/multicast", "hassio_multicast")
await advanced_logs_tester("/multicast", "hassio_multicast", LogFormatter.VERBOSE)

View File

@@ -1,5 +1,6 @@
"""Test Resolution API."""
from http import HTTPStatus
from unittest.mock import AsyncMock
from aiohttp.test_utils import TestClient
@@ -46,7 +47,7 @@ async def test_api_resolution_base(coresys: CoreSys, api_client: TestClient):
async def test_api_resolution_dismiss_suggestion(
coresys: CoreSys, api_client: TestClient
):
"""Test resolution manager suggestion apply api."""
"""Test resolution manager dismiss suggestion api."""
coresys.resolution.add_suggestion(
clear_backup := Suggestion(SuggestionType.CLEAR_FULL_BACKUP, ContextType.SYSTEM)
)
@@ -189,7 +190,9 @@ async def test_issue_not_found(api_client: TestClient, method: str, url: str):
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "The supplied UUID is not a valid issue"
assert body["message"] == "Issue bad does not exist"
assert body["error_key"] == "resolution_issue_not_found_error"
assert body["extra_fields"] == {"uuid": "bad"}
@pytest.mark.parametrize(
@@ -201,7 +204,9 @@ async def test_suggestion_not_found(api_client: TestClient, method: str, url: st
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "The supplied UUID is not a valid suggestion"
assert body["message"] == "Suggestion bad does not exist"
assert body["error_key"] == "resolution_suggestion_not_found_error"
assert body["extra_fields"] == {"uuid": "bad"}
@pytest.mark.parametrize(
@@ -211,6 +216,8 @@ async def test_suggestion_not_found(api_client: TestClient, method: str, url: st
async def test_check_not_found(api_client: TestClient, method: str, url: str):
"""Test check not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
assert resp.status == HTTPStatus.NOT_FOUND
body = await resp.json()
assert body["message"] == "The supplied check slug is not available"
assert body["message"] == "Check 'bad' does not exist"
assert body["error_key"] == "resolution_check_not_found_error"
assert body["extra_fields"] == {"check": "bad"}

View File

@@ -8,7 +8,7 @@ import tarfile
from unittest.mock import MagicMock, patch
import pytest
from securetar import AddFileError
from securetar import AddFileError, InvalidPasswordError, SecureTarReadError
from supervisor.addons.addon import Addon
from supervisor.backups.backup import Backup, BackupLocation
@@ -234,7 +234,21 @@ async def test_consolidate_failure(coresys: CoreSys, tmp_path: Path):
pytest.raises(
BackupInvalidError, match="Invalid password for backup 93b462f8"
),
), # Invalid password
), # Invalid password (legacy securetar exception)
(
None,
SecureTarReadError,
pytest.raises(
BackupInvalidError, match="Invalid password for backup 93b462f8"
),
), # Invalid password (securetar >= 2026.2.0 raises SecureTarReadError)
(
None,
InvalidPasswordError,
pytest.raises(
BackupInvalidError, match="Invalid password for backup 93b462f8"
),
), # Invalid password (securetar >= 2026.2.0 with v3 backup raises InvalidPasswordError)
],
)
async def test_validate_backup(
@@ -244,7 +258,12 @@ async def test_validate_backup(
securetar_side_effect: type[Exception] | None,
expected_exception: AbstractContextManager,
):
"""Parameterized test for validate_backup."""
"""Parameterized test for validate_backup.
Note that it is paramount that BackupInvalidError is raised for invalid password
cases, as this is used by the Core to determine if a backup password is invalid
and offer an input field to the user to input the correct password.
"""
enc_tar = Path(copy(get_fixture_path("backup_example_enc.tar"), tmp_path))
enc_backup = Backup(coresys, enc_tar, "test", None)
await enc_backup.load()
@@ -273,3 +292,44 @@ async def test_validate_backup(
expected_exception,
):
await enc_backup.validate_backup(None)
@pytest.mark.parametrize(
    ("password", "expected_exception"),
    [
        # Correct password: validation completes without raising.
        ("supervisor", does_not_raise()),
        # Wrong password must surface as BackupInvalidError.
        (
            "wrong_password",
            pytest.raises(
                BackupInvalidError, match="Invalid password for backup f92f0339"
            ),
        ),
        # Missing password on an encrypted backup is equally invalid.
        (
            None,
            pytest.raises(
                BackupInvalidError, match="Invalid password for backup f92f0339"
            ),
        ),
    ],
)
async def test_validate_backup_v3(
    coresys: CoreSys,
    tmp_path: Path,
    password: str | None,
    expected_exception: AbstractContextManager,
):
    """Validate a real SecureTar v3 encrypted backup fixture.

    SecureTar v3 derives keys with Argon2id and raises InvalidPasswordError
    on wrong passwords. It is paramount that BackupInvalidError is raised for
    every invalid password case, as Home Assistant Core relies on it to detect
    a bad backup password and offer the user a dialog to input the correct
    password.
    """
    backup_path = Path(copy(get_fixture_path("backup_example_sec_v3.tar"), tmp_path))
    backup = Backup(coresys, backup_path, "test", None)
    await backup.load()
    backup.set_password(password)
    with expected_exception:
        await backup.validate_backup(None)

View File

@@ -167,7 +167,7 @@ async def test_homeassistant_restore_rejects_path_traversal(
traversal_info.size = 9
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
tar_file = SecureTarFile(tar_path, "r", gzip=True)
tar_file = SecureTarFile(tar_path, gzip=True)
with pytest.raises(BackupInvalidError):
await coresys.homeassistant.restore(tar_file)
@@ -181,7 +181,7 @@ async def test_addon_restore_rejects_path_traversal(
traversal_info.size = 9
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
tar_file = SecureTarFile(tar_path, "r", gzip=True)
tar_file = SecureTarFile(tar_path, gzip=True)
with pytest.raises(BackupInvalidError):
await install_addon_ssh.restore(tar_file)
@@ -203,7 +203,7 @@ async def test_addon_restore_rejects_symlink_escape(
{"escape/evil.py": b"malicious"},
)
tar_file = SecureTarFile(tar_path, "r", gzip=True)
tar_file = SecureTarFile(tar_path, gzip=True)
with pytest.raises(BackupInvalidError):
await install_addon_ssh.restore(tar_file)

View File

@@ -23,7 +23,7 @@ from blockbuster import BlockBuster, BlockBusterFunction
from dbus_fast import BusType
from dbus_fast.aio.message_bus import MessageBus
import pytest
from securetar import SecureTarFile
from securetar import SecureTarArchive
from supervisor import config as su_config
from supervisor.addons.addon import Addon
@@ -848,7 +848,7 @@ async def backups(
for i in range(request.param if hasattr(request, "param") else 5):
slug = f"sn{i + 1}"
temp_tar = Path(tmp_path, f"{slug}.tar")
with SecureTarFile(temp_tar, "w"):
with SecureTarArchive(temp_tar, "w"):
pass
backup = Backup(coresys, temp_tar, slug, None)
backup._data = { # pylint: disable=protected-access

BIN
tests/fixtures/backup_example_sec_v3.tar vendored Normal file

Binary file not shown.

View File

@@ -43,7 +43,9 @@ async def test_reading_addon_files_error(coresys: CoreSys):
assert reset_repo in coresys.resolution.suggestions
assert coresys.core.healthy is True
coresys.resolution.dismiss_issue(corrupt_repo)
coresys.resolution.dismiss_issue(
coresys.resolution.get_issue_if_present(corrupt_repo)
)
err.errno = errno.EBADMSG
assert (await coresys.store.data._find_addon_configs(Path("test"), {})) is None
assert corrupt_repo in coresys.resolution.issues