Compare commits

..

3 Commits

Author SHA1 Message Date
Jan Čermák
0e5bd48b73 Use explicit Python fix version in GH actions
Specify explicitly Python 3.14.3, as the setup-python action otherwise default
to 3.14.2 when 3.14.3, leading to different version in CI and in production.
2026-02-23 14:24:53 +01:00
Jan Čermák
d57b5e0166 Update wheels ABI in the wheels builder to cp314 2026-02-23 14:12:10 +01:00
Jan Čermák
662a7ae6e6 Use Python 3.14(.3) in CI and base image
Update base image to the latest tag using Python 3.14.3 and update Python
version in CI workflows to 3.14.

With Python 3.14, backports.zstd is no longer necessary as it's now available
in the standard library.
2026-02-23 14:08:11 +01:00
22 changed files with 148 additions and 292 deletions

View File

@@ -91,8 +91,8 @@ availability.
### Python Requirements
- **Compatibility**: Python 3.13+
- **Language Features**: Use modern Python features:
- **Compatibility**: Python 3.14+
- **Language Features**: Use modern Python features:
- Type hints with `typing` module
- f-strings (preferred over `%` or `.format()`)
- Dataclasses and enum classes

View File

@@ -33,7 +33,7 @@ on:
- setup.py
env:
DEFAULT_PYTHON: "3.13"
DEFAULT_PYTHON: "3.14.3"
COSIGN_VERSION: "v2.5.3"
CRANE_VERSION: "v0.20.7"
CRANE_SHA256: "8ef3564d264e6b5ca93f7b7f5652704c4dd29d33935aff6947dd5adefd05953e"
@@ -106,7 +106,7 @@ jobs:
- runs-on: ubuntu-24.04-arm
arch: aarch64
env:
WHEELS_ABI: cp313
WHEELS_ABI: cp314
WHEELS_TAG: musllinux_1_2
WHEELS_APK_DEPS: "libffi-dev;openssl-dev;yaml-dev"
WHEELS_SKIP_BINARY: aiohttp
@@ -205,7 +205,7 @@ jobs:
# home-assistant/builder doesn't support sha pinning
- name: Build supervisor
uses: home-assistant/builder@2026.02.1
uses: home-assistant/builder@2025.11.0
with:
image: ${{ matrix.arch }}
args: |
@@ -259,7 +259,7 @@ jobs:
# home-assistant/builder doesn't support sha pinning
- name: Build the Supervisor
if: needs.init.outputs.publish != 'true'
uses: home-assistant/builder@2026.02.1
uses: home-assistant/builder@2025.11.0
with:
args: |
--test \

View File

@@ -8,7 +8,7 @@ on:
pull_request: ~
env:
DEFAULT_PYTHON: "3.13"
DEFAULT_PYTHON: "3.14.3"
PRE_COMMIT_CACHE: ~/.cache/pre-commit
MYPY_CACHE_VERSION: 1

View File

@@ -1,7 +1,7 @@
image: ghcr.io/home-assistant/{arch}-hassio-supervisor
build_from:
aarch64: ghcr.io/home-assistant/aarch64-base-python:3.13-alpine3.22-2025.12.2
amd64: ghcr.io/home-assistant/amd64-base-python:3.13-alpine3.22-2025.12.2
aarch64: ghcr.io/home-assistant/aarch64-base-python:3.14-alpine3.22-2026.02.0
amd64: ghcr.io/home-assistant/amd64-base-python:3.14-alpine3.22-2026.02.0
cosign:
base_identity: https://github.com/home-assistant/docker-base/.*
identity: https://github.com/home-assistant/supervisor/.*

View File

@@ -4,7 +4,6 @@ aiohttp==3.13.3
atomicwrites-homeassistant==1.4.1
attrs==25.4.0
awesomeversion==25.8.0
backports.zstd==1.3.0
blockbuster==1.5.26
brotli==1.2.0
ciso8601==2.3.3
@@ -23,7 +22,7 @@ pulsectl==24.12.0
pyudev==0.24.4
PyYAML==6.0.3
requests==2.32.5
securetar==2026.2.0
securetar==2025.12.0
sentry-sdk==2.53.0
setuptools==82.0.0
voluptuous==0.16.0

View File

@@ -191,18 +191,18 @@ class Addon(AddonModel):
self._startup_event.set()
# Dismiss boot failed issue if present and we started
if new_state == AddonState.STARTED and (
issue := self.sys_resolution.get_issue_if_present(self.boot_failed_issue)
if (
new_state == AddonState.STARTED
and self.boot_failed_issue in self.sys_resolution.issues
):
self.sys_resolution.dismiss_issue(issue)
self.sys_resolution.dismiss_issue(self.boot_failed_issue)
# Dismiss device access missing issue if present and we stopped
if new_state == AddonState.STOPPED and (
issue := self.sys_resolution.get_issue_if_present(
self.device_access_missing_issue
)
if (
new_state == AddonState.STOPPED
and self.device_access_missing_issue in self.sys_resolution.issues
):
self.sys_resolution.dismiss_issue(issue)
self.sys_resolution.dismiss_issue(self.device_access_missing_issue)
self.sys_homeassistant.websocket.supervisor_event_custom(
WSEvent.ADDON,
@@ -363,10 +363,11 @@ class Addon(AddonModel):
self.persist[ATTR_BOOT] = value
# Dismiss boot failed issue if present and boot at start disabled
if value == AddonBoot.MANUAL and (
issue := self.sys_resolution.get_issue_if_present(self._boot_failed_issue)
if (
value == AddonBoot.MANUAL
and self._boot_failed_issue in self.sys_resolution.issues
):
self.sys_resolution.dismiss_issue(issue)
self.sys_resolution.dismiss_issue(self._boot_failed_issue)
@property
def auto_update(self) -> bool:

View File

@@ -19,6 +19,7 @@ from ..const import (
ATTR_UNSUPPORTED,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APINotFound, ResolutionNotFound
from ..resolution.checks.base import CheckBase
from ..resolution.data import Issue, Suggestion
from .utils import api_process, api_validate
@@ -31,17 +32,24 @@ class APIResoulution(CoreSysAttributes):
def _extract_issue(self, request: web.Request) -> Issue:
"""Extract issue from request or raise."""
return self.sys_resolution.get_issue_by_id(request.match_info["issue"])
try:
return self.sys_resolution.get_issue(request.match_info["issue"])
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid issue") from None
def _extract_suggestion(self, request: web.Request) -> Suggestion:
"""Extract suggestion from request or raise."""
return self.sys_resolution.get_suggestion_by_id(
request.match_info["suggestion"]
)
try:
return self.sys_resolution.get_suggestion(request.match_info["suggestion"])
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid suggestion") from None
def _extract_check(self, request: web.Request) -> CheckBase:
"""Extract check from request or raise."""
return self.sys_resolution.check.get(request.match_info["check"])
try:
return self.sys_resolution.check.get(request.match_info["check"])
except ResolutionNotFound:
raise APINotFound("The supplied check slug is not available") from None
def _generate_suggestion_information(self, suggestion: Suggestion):
"""Generate suggestion information for response."""

View File

@@ -12,19 +12,13 @@ import json
import logging
from pathlib import Path, PurePath
import tarfile
from tarfile import TarFile
from tempfile import TemporaryDirectory
import time
from typing import Any, Self, cast
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from securetar import (
AddFileError,
InvalidPasswordError,
SecureTarArchive,
SecureTarFile,
SecureTarReadError,
atomic_contents_add,
)
from securetar import AddFileError, SecureTarFile, atomic_contents_add
import voluptuous as vol
from voluptuous.humanize import humanize_error
@@ -65,7 +59,7 @@ from ..utils import remove_folder
from ..utils.dt import parse_datetime, utcnow
from ..utils.json import json_bytes
from ..utils.sentinel import DEFAULT
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, SECURETAR_CREATE_VERSION, BackupType
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, BackupType
from .validate import SCHEMA_BACKUP
IGNORED_COMPARISON_FIELDS = {ATTR_PROTECTED, ATTR_CRYPTO, ATTR_DOCKER}
@@ -105,7 +99,7 @@ class Backup(JobGroup):
)
self._data: dict[str, Any] = data or {ATTR_SLUG: slug}
self._tmp: TemporaryDirectory | None = None
self._outer_secure_tarfile: SecureTarArchive | None = None
self._outer_secure_tarfile: SecureTarFile | None = None
self._password: str | None = None
self._locations: dict[str | None, BackupLocation] = {
location: BackupLocation(
@@ -204,6 +198,16 @@ class Backup(JobGroup):
"""Get extra metadata added by client."""
return self._data[ATTR_EXTRA]
@property
def docker(self) -> dict[str, Any]:
"""Return backup Docker config data."""
return self._data.get(ATTR_DOCKER, {})
@docker.setter
def docker(self, value: dict[str, Any]) -> None:
"""Set the Docker config data."""
self._data[ATTR_DOCKER] = value
@property
def location(self) -> str | None:
"""Return the location of the backup."""
@@ -360,17 +364,15 @@ class Backup(JobGroup):
test_tar_file = backup.extractfile(test_tar_name)
try:
with SecureTarFile(
ending, # Not used
gzip=self.compressed,
mode="r",
fileobj=test_tar_file,
password=self._password,
):
# If we can read the tar file, the password is correct
return
except (
tarfile.ReadError,
SecureTarReadError,
InvalidPasswordError,
) as ex:
except tarfile.ReadError as ex:
raise BackupInvalidError(
f"Invalid password for backup {self.slug}", _LOGGER.error
) from ex
@@ -439,7 +441,7 @@ class Backup(JobGroup):
async def create(self) -> AsyncGenerator[None]:
"""Create new backup file."""
def _open_outer_tarfile() -> SecureTarArchive:
def _open_outer_tarfile() -> tuple[SecureTarFile, tarfile.TarFile]:
"""Create and open outer tarfile."""
if self.tarfile.is_file():
raise BackupFileExistError(
@@ -447,15 +449,14 @@ class Backup(JobGroup):
_LOGGER.error,
)
_outer_secure_tarfile = SecureTarArchive(
_outer_secure_tarfile = SecureTarFile(
self.tarfile,
"w",
gzip=False,
bufsize=BUF_SIZE,
create_version=SECURETAR_CREATE_VERSION,
password=self._password,
)
try:
_outer_secure_tarfile.open()
_outer_tarfile = _outer_secure_tarfile.open()
except PermissionError as ex:
raise BackupPermissionError(
f"Cannot open backup file {self.tarfile.as_posix()}, permission error!",
@@ -467,9 +468,11 @@ class Backup(JobGroup):
_LOGGER.error,
) from ex
return _outer_secure_tarfile
return _outer_secure_tarfile, _outer_tarfile
outer_secure_tarfile = await self.sys_run_in_executor(_open_outer_tarfile)
outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
_open_outer_tarfile
)
self._outer_secure_tarfile = outer_secure_tarfile
def _close_outer_tarfile() -> int:
@@ -480,7 +483,7 @@ class Backup(JobGroup):
try:
yield
finally:
await self._create_finalize(outer_secure_tarfile)
await self._create_cleanup(outer_tarfile)
size_bytes = await self.sys_run_in_executor(_close_outer_tarfile)
self._locations[self.location].size_bytes = size_bytes
self._outer_secure_tarfile = None
@@ -540,11 +543,11 @@ class Backup(JobGroup):
if self._tmp:
await self.sys_run_in_executor(self._tmp.cleanup)
async def _create_finalize(self, outer_archive: SecureTarArchive) -> None:
"""Finalize backup creation.
async def _create_cleanup(self, outer_tarfile: TarFile) -> None:
"""Cleanup after backup creation.
Separate method to be called from create to ensure that the backup is
finalized.
Separate method to be called from create to ensure
that cleanup is always performed, even if an exception is raised.
"""
# validate data
try:
@@ -563,7 +566,7 @@ class Backup(JobGroup):
tar_info = tarfile.TarInfo(name="./backup.json")
tar_info.size = len(raw_bytes)
tar_info.mtime = int(time.time())
outer_archive.tar.addfile(tar_info, fileobj=fileobj)
outer_tarfile.addfile(tar_info, fileobj=fileobj)
try:
await self.sys_run_in_executor(_add_backup_json)
@@ -590,9 +593,10 @@ class Backup(JobGroup):
tar_name = f"{slug}.tar{'.gz' if self.compressed else ''}"
addon_file = self._outer_secure_tarfile.create_tar(
addon_file = self._outer_secure_tarfile.create_inner_tar(
f"./{tar_name}",
gzip=self.compressed,
password=self._password,
)
# Take backup
try:
@@ -642,6 +646,7 @@ class Backup(JobGroup):
tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}"
addon_file = SecureTarFile(
Path(self._tmp.name, tar_name),
"r",
gzip=self.compressed,
bufsize=BUF_SIZE,
password=self._password,
@@ -737,9 +742,10 @@ class Backup(JobGroup):
return False
with outer_secure_tarfile.create_tar(
with outer_secure_tarfile.create_inner_tar(
f"./{tar_name}",
gzip=self.compressed,
password=self._password,
) as tar_file:
atomic_contents_add(
tar_file,
@@ -799,6 +805,7 @@ class Backup(JobGroup):
_LOGGER.info("Restore folder %s", name)
with SecureTarFile(
tar_name,
"r",
gzip=self.compressed,
bufsize=BUF_SIZE,
password=self._password,
@@ -866,9 +873,10 @@ class Backup(JobGroup):
tar_name = f"homeassistant.tar{'.gz' if self.compressed else ''}"
# Backup Home Assistant Core config directory
homeassistant_file = self._outer_secure_tarfile.create_tar(
homeassistant_file = self._outer_secure_tarfile.create_inner_tar(
f"./{tar_name}",
gzip=self.compressed,
password=self._password,
)
await self.sys_homeassistant.backup(homeassistant_file, exclude_database)
@@ -892,6 +900,7 @@ class Backup(JobGroup):
)
homeassistant_file = SecureTarFile(
tar_name,
"r",
gzip=self.compressed,
bufsize=BUF_SIZE,
password=self._password,

View File

@@ -6,7 +6,6 @@ from typing import Literal
from ..mounts.mount import Mount
BUF_SIZE = 2**20 * 4 # 4MB
SECURETAR_CREATE_VERSION = 2
DEFAULT_FREEZE_TIMEOUT = 600
LOCATION_CLOUD_BACKUP = ".cloud_backup"

View File

@@ -14,6 +14,7 @@ from ..const import (
ATTR_CRYPTO,
ATTR_DATE,
ATTR_DAYS_UNTIL_STALE,
ATTR_DOCKER,
ATTR_EXCLUDE_DATABASE,
ATTR_EXTRA,
ATTR_FOLDERS,
@@ -34,7 +35,7 @@ from ..const import (
FOLDER_SSL,
)
from ..store.validate import repositories
from ..validate import version_tag
from ..validate import SCHEMA_DOCKER_CONFIG, version_tag
ALL_FOLDERS = [
FOLDER_SHARE,
@@ -113,6 +114,7 @@ SCHEMA_BACKUP = vol.Schema(
)
),
),
vol.Optional(ATTR_DOCKER, default=dict): SCHEMA_DOCKER_CONFIG,
vol.Optional(ATTR_FOLDERS, default=list): vol.All(
v1_folderlist, [vol.In(ALL_FOLDERS)], vol.Unique()
),

View File

@@ -874,12 +874,11 @@ class DockerAddon(DockerInterface):
await super().stop(remove_container)
# If there is a device access issue and the container is removed, clear it
if remove_container and (
issue := self.sys_resolution.get_issue_if_present(
self.addon.device_access_missing_issue
)
if (
remove_container
and self.addon.device_access_missing_issue in self.sys_resolution.issues
):
self.sys_resolution.dismiss_issue(issue)
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
@Job(
name="docker_addon_hardware_events",

View File

@@ -46,7 +46,7 @@ class HassioNotSupportedError(HassioError):
# API
class APIError(HassioError):
class APIError(HassioError, RuntimeError):
"""API errors."""
status = 400
@@ -964,44 +964,6 @@ class ResolutionFixupJobError(ResolutionFixupError, JobException):
"""Raise on job error."""
class ResolutionCheckNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
"""Raise if check does not exist."""
error_key = "resolution_check_not_found_error"
message_template = "Check '{check}' does not exist"
def __init__(
self, logger: Callable[..., None] | None = None, *, check: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"check": check}
super().__init__(None, logger)
class ResolutionIssueNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
"""Raise if issue does not exist."""
error_key = "resolution_issue_not_found_error"
message_template = "Issue {uuid} does not exist"
def __init__(self, logger: Callable[..., None] | None = None, *, uuid: str) -> None:
"""Initialize exception."""
self.extra_fields = {"uuid": uuid}
super().__init__(None, logger)
class ResolutionSuggestionNotFound(ResolutionNotFound, APINotFound): # pylint: disable=too-many-ancestors
"""Raise if suggestion does not exist."""
error_key = "resolution_suggestion_not_found_error"
message_template = "Suggestion {uuid} does not exist"
def __init__(self, logger: Callable[..., None] | None = None, *, uuid: str) -> None:
"""Initialize exception."""
self.extra_fields = {"uuid": uuid}
super().__init__(None, logger)
# Store

View File

@@ -215,10 +215,10 @@ class Mount(CoreSysAttributes, ABC):
await self._update_state(unit)
# If active, dismiss corresponding failed mount issue if found
if (mounted := await self.is_mounted()) and (
issue := self.sys_resolution.get_issue_if_present(self.failed_issue)
):
self.sys_resolution.dismiss_issue(issue)
if (
mounted := await self.is_mounted()
) and self.failed_issue in self.sys_resolution.issues:
self.sys_resolution.dismiss_issue(self.failed_issue)
return mounted
@@ -361,8 +361,8 @@ class Mount(CoreSysAttributes, ABC):
await self._restart()
# If it is mounted now, dismiss corresponding issue if present
if issue := self.sys_resolution.get_issue_if_present(self.failed_issue):
self.sys_resolution.dismiss_issue(issue)
if self.failed_issue in self.sys_resolution.issues:
self.sys_resolution.dismiss_issue(self.failed_issue)
async def _restart(self) -> None:
"""Restart mount unit to re-mount."""

View File

@@ -6,7 +6,7 @@ from typing import Any
from ..const import ATTR_CHECKS
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import ResolutionCheckNotFound
from ..exceptions import ResolutionNotFound
from ..utils.sentry import async_capture_exception
from .checks.base import CheckBase
from .validate import get_valid_modules
@@ -50,7 +50,7 @@ class ResolutionCheck(CoreSysAttributes):
if slug in self._checks:
return self._checks[slug]
raise ResolutionCheckNotFound(check=slug)
raise ResolutionNotFound(f"Check with slug {slug} not found!")
async def check_system(self) -> None:
"""Check the system."""

View File

@@ -7,11 +7,7 @@ import attr
from ..bus import EventListener
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
ResolutionError,
ResolutionIssueNotFound,
ResolutionSuggestionNotFound,
)
from ..exceptions import ResolutionError, ResolutionNotFound
from ..homeassistant.const import WSEvent
from ..utils.common import FileConfiguration
from .check import ResolutionCheck
@@ -169,37 +165,21 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
]
}
def get_suggestion_by_id(self, uuid: str) -> Suggestion:
def get_suggestion(self, uuid: str) -> Suggestion:
"""Return suggestion with uuid."""
for suggestion in self._suggestions:
if suggestion.uuid != uuid:
continue
return suggestion
raise ResolutionSuggestionNotFound(uuid=uuid)
raise ResolutionNotFound()
def get_suggestion_if_present(self, suggestion: Suggestion) -> Suggestion | None:
"""Get suggestion matching provided one if it exists in resolution manager."""
for s in self._suggestions:
if s != suggestion:
continue
return s
return None
def get_issue_by_id(self, uuid: str) -> Issue:
def get_issue(self, uuid: str) -> Issue:
"""Return issue with uuid."""
for issue in self._issues:
if issue.uuid != uuid:
continue
return issue
raise ResolutionIssueNotFound(uuid=uuid)
def get_issue_if_present(self, issue: Issue) -> Issue | None:
"""Get issue matching provided one if it exists in resolution manager."""
for i in self._issues:
if i != issue:
continue
return i
return None
raise ResolutionNotFound()
def create_issue(
self,
@@ -254,13 +234,20 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
async def apply_suggestion(self, suggestion: Suggestion) -> None:
"""Apply suggested action."""
suggestion = self.get_suggestion_by_id(suggestion.uuid)
if suggestion not in self._suggestions:
raise ResolutionError(
f"Suggestion {suggestion.uuid} is not valid", _LOGGER.warning
)
await self.fixup.apply_fixup(suggestion)
await self.healthcheck()
def dismiss_suggestion(self, suggestion: Suggestion) -> None:
"""Dismiss suggested action."""
suggestion = self.get_suggestion_by_id(suggestion.uuid)
if suggestion not in self._suggestions:
raise ResolutionError(
f"The UUID {suggestion.uuid} is not valid suggestion", _LOGGER.warning
)
self._suggestions.remove(suggestion)
# Remove event listeners if present
@@ -276,7 +263,10 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
def dismiss_issue(self, issue: Issue) -> None:
"""Dismiss suggested action."""
issue = self.get_issue_by_id(issue.uuid)
if issue not in self._issues:
raise ResolutionError(
f"The UUID {issue.uuid} is not a valid issue", _LOGGER.warning
)
self._issues.remove(issue)
# Event on issue removal

View File

@@ -12,7 +12,7 @@ import aiodocker
from aiodocker.containers import DockerContainer
from awesomeversion import AwesomeVersion
import pytest
from securetar import SecureTarArchive, SecureTarFile
from securetar import SecureTarFile
from supervisor.addons.addon import Addon
from supervisor.addons.const import AddonBackupMode
@@ -34,8 +34,6 @@ from supervisor.exceptions import (
)
from supervisor.hardware.helper import HwHelper
from supervisor.ingress import Ingress
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue
from supervisor.utils.dt import utcnow
from .test_manager import BOOT_FAIL_ISSUE, BOOT_FAIL_SUGGESTIONS
@@ -438,11 +436,8 @@ async def test_backup(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
assert await install_addon_ssh.backup(tar_file) is None
archive.close()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
assert await install_addon_ssh.backup(tarfile) is None
@pytest.mark.parametrize("status", ["running", "stopped"])
@@ -462,11 +457,8 @@ async def test_backup_no_config(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
assert await install_addon_ssh.backup(tar_file) is None
archive.close()
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
assert await install_addon_ssh.backup(tarfile) is None
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
@@ -481,17 +473,14 @@ async def test_backup_with_pre_post_command(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
with (
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
patch.object(
Addon, "backup_post", new=PropertyMock(return_value="backup_post")
),
):
assert await install_addon_ssh.backup(tar_file) is None
archive.close()
assert await install_addon_ssh.backup(tarfile) is None
assert container.exec.call_count == 2
assert container.exec.call_args_list[0].args[0] == "backup_pre"
@@ -554,18 +543,15 @@ async def test_backup_with_pre_command_error(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
with (
patch.object(DockerAddon, "is_running", return_value=True),
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
pytest.raises(exc_type_raised),
):
assert await install_addon_ssh.backup(tar_file) is None
assert await install_addon_ssh.backup(tarfile) is None
assert not tar_file.path.exists()
archive.close()
assert not tarfile.path.exists()
@pytest.mark.parametrize("status", ["running", "stopped"])
@@ -582,9 +568,7 @@ async def test_backup_cold_mode(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
with (
patch.object(
AddonModel,
@@ -595,8 +579,7 @@ async def test_backup_cold_mode(
DockerAddon, "is_running", side_effect=[status == "running", False, False]
),
):
start_task = await install_addon_ssh.backup(tar_file)
archive.close()
start_task = await install_addon_ssh.backup(tarfile)
assert bool(start_task) is (status == "running")
@@ -624,9 +607,7 @@ async def test_backup_cold_mode_with_watchdog(
# Patching out the normal end of backup process leaves the container in a stopped state
# Watchdog should still not try to restart it though, it should remain this way
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
archive.open()
tar_file = archive.create_tar("./test.tar.gz")
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
with (
patch.object(Addon, "start") as start,
patch.object(Addon, "restart") as restart,
@@ -638,11 +619,10 @@ async def test_backup_cold_mode_with_watchdog(
new=PropertyMock(return_value=AddonBackupMode.COLD),
),
):
await install_addon_ssh.backup(tar_file)
await install_addon_ssh.backup(tarfile)
await asyncio.sleep(0)
start.assert_not_called()
restart.assert_not_called()
archive.close()
@pytest.mark.parametrize("status", ["running", "stopped"])
@@ -655,7 +635,7 @@ async def test_restore(coresys: CoreSys, install_addon_ssh: Addon, status: str)
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"))
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"), "r")
with patch.object(DockerAddon, "is_running", return_value=False):
start_task = await coresys.addons.restore(TEST_ADDON_SLUG, tarfile)
@@ -675,7 +655,7 @@ async def test_restore_while_running(
install_addon_ssh.path_data.mkdir()
await install_addon_ssh.load()
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
with (
patch.object(DockerAddon, "is_running", return_value=True),
patch.object(Ingress, "update_hass_panel"),
@@ -708,7 +688,7 @@ async def test_restore_while_running_with_watchdog(
# We restore a stopped backup so restore will not restart it
# Watchdog will see it stop and should not attempt reanimation either
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
with (
patch.object(Addon, "start") as start,
patch.object(Addon, "restart") as restart,
@@ -996,40 +976,16 @@ async def test_addon_manual_only_boot(install_addon_example: Addon):
assert install_addon_example.boot == "manual"
@pytest.mark.parametrize(
("initial_state", "target_state", "issue", "suggestions"),
[
(
AddonState.ERROR,
AddonState.STARTED,
BOOT_FAIL_ISSUE,
[suggestion.type for suggestion in BOOT_FAIL_SUGGESTIONS],
),
(
AddonState.STARTED,
AddonState.STOPPED,
Issue(
IssueType.DEVICE_ACCESS_MISSING,
ContextType.ADDON,
reference=TEST_ADDON_SLUG,
),
[SuggestionType.EXECUTE_RESTART],
),
],
)
async def test_addon_state_dismisses_issue(
coresys: CoreSys,
install_addon_ssh: Addon,
initial_state: AddonState,
target_state: AddonState,
issue: Issue,
suggestions: list[SuggestionType],
async def test_addon_start_dismisses_boot_fail(
coresys: CoreSys, install_addon_ssh: Addon
):
"""Test an addon state change dismisses the issues."""
install_addon_ssh.state = initial_state
coresys.resolution.add_issue(issue, suggestions)
"""Test a successful start dismisses the boot fail issue."""
install_addon_ssh.state = AddonState.ERROR
coresys.resolution.add_issue(
BOOT_FAIL_ISSUE, [suggestion.type for suggestion in BOOT_FAIL_SUGGESTIONS]
)
install_addon_ssh.state = target_state
install_addon_ssh.state = AddonState.STARTED
assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == []

View File

@@ -1,6 +1,5 @@
"""Test Resolution API."""
from http import HTTPStatus
from unittest.mock import AsyncMock
from aiohttp.test_utils import TestClient
@@ -47,7 +46,7 @@ async def test_api_resolution_base(coresys: CoreSys, api_client: TestClient):
async def test_api_resolution_dismiss_suggestion(
coresys: CoreSys, api_client: TestClient
):
"""Test resolution manager dismiss suggestion api."""
"""Test resolution manager suggestion apply api."""
coresys.resolution.add_suggestion(
clear_backup := Suggestion(SuggestionType.CLEAR_FULL_BACKUP, ContextType.SYSTEM)
)
@@ -190,9 +189,7 @@ async def test_issue_not_found(api_client: TestClient, method: str, url: str):
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Issue bad does not exist"
assert body["error_key"] == "resolution_issue_not_found_error"
assert body["extra_fields"] == {"uuid": "bad"}
assert body["message"] == "The supplied UUID is not a valid issue"
@pytest.mark.parametrize(
@@ -204,9 +201,7 @@ async def test_suggestion_not_found(api_client: TestClient, method: str, url: st
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Suggestion bad does not exist"
assert body["error_key"] == "resolution_suggestion_not_found_error"
assert body["extra_fields"] == {"uuid": "bad"}
assert body["message"] == "The supplied UUID is not a valid suggestion"
@pytest.mark.parametrize(
@@ -216,8 +211,6 @@ async def test_suggestion_not_found(api_client: TestClient, method: str, url: st
async def test_check_not_found(api_client: TestClient, method: str, url: str):
"""Test check not found error."""
resp = await api_client.request(method, url)
assert resp.status == HTTPStatus.NOT_FOUND
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Check 'bad' does not exist"
assert body["error_key"] == "resolution_check_not_found_error"
assert body["extra_fields"] == {"check": "bad"}
assert body["message"] == "The supplied check slug is not available"

View File

@@ -8,7 +8,7 @@ import tarfile
from unittest.mock import MagicMock, patch
import pytest
from securetar import AddFileError, InvalidPasswordError, SecureTarReadError
from securetar import AddFileError
from supervisor.addons.addon import Addon
from supervisor.backups.backup import Backup, BackupLocation
@@ -234,21 +234,7 @@ async def test_consolidate_failure(coresys: CoreSys, tmp_path: Path):
pytest.raises(
BackupInvalidError, match="Invalid password for backup 93b462f8"
),
), # Invalid password (legacy securetar exception)
(
None,
SecureTarReadError,
pytest.raises(
BackupInvalidError, match="Invalid password for backup 93b462f8"
),
), # Invalid password (securetar >= 2026.2.0 raises SecureTarReadError)
(
None,
InvalidPasswordError,
pytest.raises(
BackupInvalidError, match="Invalid password for backup 93b462f8"
),
), # Invalid password (securetar >= 2026.2.0 with v3 backup raises InvalidPasswordError)
), # Invalid password
],
)
async def test_validate_backup(
@@ -258,12 +244,7 @@ async def test_validate_backup(
securetar_side_effect: type[Exception] | None,
expected_exception: AbstractContextManager,
):
"""Parameterized test for validate_backup.
Note that it is paramount that BackupInvalidError is raised for invalid password
cases, as this is used by the Core to determine if a backup password is invalid
and offer a input field to the user to input the correct password.
"""
"""Parameterized test for validate_backup."""
enc_tar = Path(copy(get_fixture_path("backup_example_enc.tar"), tmp_path))
enc_backup = Backup(coresys, enc_tar, "test", None)
await enc_backup.load()
@@ -292,44 +273,3 @@ async def test_validate_backup(
expected_exception,
):
await enc_backup.validate_backup(None)
@pytest.mark.parametrize(
("password", "expected_exception"),
[
("supervisor", does_not_raise()),
(
"wrong_password",
pytest.raises(
BackupInvalidError, match="Invalid password for backup f92f0339"
),
),
(
None,
pytest.raises(
BackupInvalidError, match="Invalid password for backup f92f0339"
),
),
],
)
async def test_validate_backup_v3(
coresys: CoreSys,
tmp_path: Path,
password: str | None,
expected_exception: AbstractContextManager,
):
"""Test validate_backup with a real SecureTar v3 encrypted backup.
SecureTar v3 uses Argon2id key derivation and raises InvalidPasswordError
on wrong passwords. It is paramount that BackupInvalidError is raised for
invalid password cases, as this is used by the Core to determine if a backup
password is invalid and offer a dialog to the user to input the correct
password.
"""
v3_tar = Path(copy(get_fixture_path("backup_example_sec_v3.tar"), tmp_path))
v3_backup = Backup(coresys, v3_tar, "test", None)
await v3_backup.load()
v3_backup.set_password(password)
with expected_exception:
await v3_backup.validate_backup(None)

View File

@@ -167,7 +167,7 @@ async def test_homeassistant_restore_rejects_path_traversal(
traversal_info.size = 9
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
tar_file = SecureTarFile(tar_path, gzip=True)
tar_file = SecureTarFile(tar_path, "r", gzip=True)
with pytest.raises(BackupInvalidError):
await coresys.homeassistant.restore(tar_file)
@@ -181,7 +181,7 @@ async def test_addon_restore_rejects_path_traversal(
traversal_info.size = 9
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
tar_file = SecureTarFile(tar_path, gzip=True)
tar_file = SecureTarFile(tar_path, "r", gzip=True)
with pytest.raises(BackupInvalidError):
await install_addon_ssh.restore(tar_file)
@@ -203,7 +203,7 @@ async def test_addon_restore_rejects_symlink_escape(
{"escape/evil.py": b"malicious"},
)
tar_file = SecureTarFile(tar_path, gzip=True)
tar_file = SecureTarFile(tar_path, "r", gzip=True)
with pytest.raises(BackupInvalidError):
await install_addon_ssh.restore(tar_file)

View File

@@ -23,7 +23,7 @@ from blockbuster import BlockBuster, BlockBusterFunction
from dbus_fast import BusType
from dbus_fast.aio.message_bus import MessageBus
import pytest
from securetar import SecureTarArchive
from securetar import SecureTarFile
from supervisor import config as su_config
from supervisor.addons.addon import Addon
@@ -848,7 +848,7 @@ async def backups(
for i in range(request.param if hasattr(request, "param") else 5):
slug = f"sn{i + 1}"
temp_tar = Path(tmp_path, f"{slug}.tar")
with SecureTarArchive(temp_tar, "w"):
with SecureTarFile(temp_tar, "w"):
pass
backup = Backup(coresys, temp_tar, slug, None)
backup._data = { # pylint: disable=protected-access

Binary file not shown.

View File

@@ -43,9 +43,7 @@ async def test_reading_addon_files_error(coresys: CoreSys):
assert reset_repo in coresys.resolution.suggestions
assert coresys.core.healthy is True
coresys.resolution.dismiss_issue(
coresys.resolution.get_issue_if_present(corrupt_repo)
)
coresys.resolution.dismiss_issue(corrupt_repo)
err.errno = errno.EBADMSG
assert (await coresys.store.data._find_addon_configs(Path("test"), {})) is None
assert corrupt_repo in coresys.resolution.issues