Compare commits

..

6 Commits

Author SHA1 Message Date
Stefan Agner
2545d6b841 Fix pytest 2025-10-10 08:45:38 +02:00
Stefan Agner
ba24bbd0d0 Fix pytest 2025-10-08 15:23:27 +02:00
Stefan Agner
418a8cc86a Drop unused tests 2025-10-08 10:36:29 +02:00
Stefan Agner
7dc6be51c5 Drop CodeNotary integrity fixups 2025-10-07 16:40:44 +02:00
Stefan Agner
d3e7df34d2 Remove CodeNotary specific integrity checking
The current code is specific to how CodeNotary was doing integrity
checking. A future integrity checking mechanism likely will work
differently (e.g. through EROFS based containers). Remove the current
code to make way for a future implementation.
2025-10-07 15:52:48 +02:00
Stefan Agner
4b24255fa4 Formally deprecate CodeNotary build config 2025-10-07 15:44:39 +02:00
49 changed files with 797 additions and 2212 deletions

View File

@@ -16,7 +16,6 @@ jobs:
days-before-close: 7
stale-issue-label: "stale"
exempt-issue-labels: "no-stale,Help%20wanted,help-wanted,pinned,rfc,security"
only-issue-types: "bug"
stale-issue-message: >
There hasn't been any activity on this issue recently. Due to the
high number of incoming GitHub notifications, we have to clean some

View File

@@ -1 +0,0 @@
.github/copilot-instructions.md

View File

@@ -1,5 +1,4 @@
aiodns==3.5.0
aiodocker==0.24.0
aiohttp==3.13.0
atomicwrites-homeassistant==1.4.1
attrs==25.4.0
@@ -20,11 +19,11 @@ jinja2==3.1.6
log-rate-limit==1.4.2
orjson==3.11.3
pulsectl==24.12.0
pyudev==0.24.4
pyudev==0.24.3
PyYAML==6.0.3
requests==2.32.5
securetar==2025.2.1
sentry-sdk==2.41.0
sentry-sdk==2.40.0
setuptools==80.9.0
voluptuous==0.15.2
dbus-fast==2.44.5

View File

@@ -1,16 +1,16 @@
astroid==4.0.1
astroid==3.3.11
coverage==7.10.7
mypy==1.18.2
pre-commit==4.3.0
pylint==4.0.1
pylint==3.3.9
pytest-aiohttp==1.1.0
pytest-asyncio==0.25.2
pytest-cov==7.0.0
pytest-timeout==2.4.0
pytest==8.4.2
ruff==0.14.0
ruff==0.13.3
time-machine==2.19.0
types-docker==7.1.0.20251009
types-docker==7.1.0.20250916
types-pyyaml==6.0.12.20250915
types-requests==2.32.4.20250913
urllib3==2.5.0

View File

@@ -226,7 +226,6 @@ class Addon(AddonModel):
)
await self._check_ingress_port()
default_image = self._image(self.data)
try:
await self.instance.attach(version=self.version)
@@ -775,6 +774,7 @@ class Addon(AddonModel):
raise AddonsError("Missing from store, cannot install!")
await self.sys_addons.data.install(self.addon_store)
await self.load()
def setup_data():
if not self.path_data.is_dir():
@@ -797,9 +797,6 @@ class Addon(AddonModel):
await self.sys_addons.data.uninstall(self)
raise AddonsError() from err
# Finish initialization and set up listeners
await self.load()
# Add to addon manager
self.sys_addons.local[self.slug] = self
@@ -1513,13 +1510,6 @@ class Addon(AddonModel):
_LOGGER.info("Finished restore for add-on %s", self.slug)
return wait_for_start
def check_trust(self) -> Awaitable[None]:
"""Calculate Addon docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
@Job(
name="addon_restart_after_problem",
throttle_period=WATCHDOG_THROTTLE_PERIOD,

View File

@@ -9,6 +9,8 @@ from typing import Self, Union
from attr import evolve
from supervisor.jobs.const import JobConcurrency
from ..const import AddonBoot, AddonStartup, AddonState
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
@@ -19,8 +21,6 @@ from ..exceptions import (
DockerError,
HassioError,
)
from ..jobs import ChildJobSyncFilter
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..store.addon import AddonStore
@@ -182,9 +182,6 @@ class AddonManager(CoreSysAttributes):
conditions=ADDON_UPDATE_CONDITIONS,
on_condition=AddonsJobError,
concurrency=JobConcurrency.QUEUE,
child_job_syncs=[
ChildJobSyncFilter("docker_interface_install", progress_allocation=1.0)
],
)
async def install(
self, slug: str, *, validation_complete: asyncio.Event | None = None
@@ -232,13 +229,6 @@ class AddonManager(CoreSysAttributes):
name="addon_manager_update",
conditions=ADDON_UPDATE_CONDITIONS,
on_condition=AddonsJobError,
# We assume for now the docker image pull is 100% of this task for progress
# allocation. But from a user perspective that isn't true. Other steps
# that take time which is not accounted for in progress include:
# partial backup, image cleanup, apparmor update, and addon restart
child_job_syncs=[
ChildJobSyncFilter("docker_interface_install", progress_allocation=1.0)
],
)
async def update(
self,
@@ -281,10 +271,7 @@ class AddonManager(CoreSysAttributes):
addons=[addon.slug],
)
task = await addon.update()
_LOGGER.info("Add-on '%s' successfully updated", slug)
return task
return await addon.update()
@Job(
name="addon_manager_rebuild",

View File

@@ -72,7 +72,6 @@ from ..const import (
ATTR_TYPE,
ATTR_UART,
ATTR_UDEV,
ATTR_ULIMITS,
ATTR_URL,
ATTR_USB,
ATTR_VERSION,
@@ -463,11 +462,6 @@ class AddonModel(JobGroup, ABC):
"""Return True if the add-on have his own udev."""
return self.data[ATTR_UDEV]
@property
def ulimits(self) -> dict[str, Any]:
"""Return ulimits configuration."""
return self.data[ATTR_ULIMITS]
@property
def with_kernel_modules(self) -> bool:
"""Return True if the add-on access to kernel modules."""

View File

@@ -88,7 +88,6 @@ from ..const import (
ATTR_TYPE,
ATTR_UART,
ATTR_UDEV,
ATTR_ULIMITS,
ATTR_URL,
ATTR_USB,
ATTR_USER,
@@ -207,6 +206,12 @@ def _warn_addon_config(config: dict[str, Any]):
name,
)
if ATTR_CODENOTARY in config:
_LOGGER.warning(
"Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.",
name,
)
return config
@@ -424,20 +429,6 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
False,
),
vol.Optional(ATTR_IMAGE): docker_image,
vol.Optional(ATTR_ULIMITS, default=dict): vol.Any(
{str: vol.Coerce(int)}, # Simple format: {name: limit}
{
str: vol.Any(
vol.Coerce(int), # Simple format for individual entries
vol.Schema(
{ # Detailed format for individual entries
vol.Required("soft"): vol.Coerce(int),
vol.Required("hard"): vol.Coerce(int),
}
),
)
},
),
vol.Optional(ATTR_TIMEOUT, default=10): vol.All(
vol.Coerce(int), vol.Range(min=10, max=300)
),

View File

@@ -1,19 +1,14 @@
"""Init file for Supervisor Security RESTful API."""
import asyncio
import logging
from typing import Any
from aiohttp import web
import attr
import voluptuous as vol
from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED
from ..coresys import CoreSysAttributes
from .utils import api_process, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
# pylint: disable=no-value-for-parameter
SCHEMA_OPTIONS = vol.Schema(
{
@@ -54,6 +49,9 @@ class APISecurity(CoreSysAttributes):
@api_process
async def integrity_check(self, request: web.Request) -> dict[str, Any]:
"""Run backend integrity check."""
result = await asyncio.shield(self.sys_security.integrity_check())
return attr.asdict(result)
"""Run backend integrity check.
CodeNotary integrity checking has been removed. This endpoint now returns
an error indicating the feature is currently non-functional.
"""
return {"error": "No integrity checking available"}

View File

@@ -348,7 +348,6 @@ ATTR_TRANSLATIONS = "translations"
ATTR_TYPE = "type"
ATTR_UART = "uart"
ATTR_UDEV = "udev"
ATTR_ULIMITS = "ulimits"
ATTR_UNHEALTHY = "unhealthy"
ATTR_UNSAVED = "unsaved"
ATTR_UNSUPPORTED = "unsupported"

View File

@@ -9,7 +9,6 @@ import os
from pathlib import Path
from typing import TYPE_CHECKING, cast
import aiodocker
from attr import evolve
from awesomeversion import AwesomeVersion
import docker
@@ -319,18 +318,7 @@ class DockerAddon(DockerInterface):
mem = 128 * 1024 * 1024
limits.append(docker.types.Ulimit(name="memlock", soft=mem, hard=mem))
# Add configurable ulimits from add-on config
for name, config in self.addon.ulimits.items():
if isinstance(config, int):
# Simple format: both soft and hard limits are the same
limits.append(docker.types.Ulimit(name=name, soft=config, hard=config))
elif isinstance(config, dict):
# Detailed format: both soft and hard limits are mandatory
soft = config["soft"]
hard = config["hard"]
limits.append(docker.types.Ulimit(name=name, soft=soft, hard=hard))
# Return None if no ulimits are present
# Return None if no capabilities is present
if limits:
return limits
return None
@@ -718,21 +706,19 @@ class DockerAddon(DockerInterface):
error_message = f"Docker build failed for {addon_image_tag} (exit code {result.exit_code}). Build output:\n{logs}"
raise docker.errors.DockerException(error_message)
return addon_image_tag, logs
addon_image = self.sys_docker.images.get(addon_image_tag)
return addon_image, logs
try:
addon_image_tag, log = await self.sys_run_in_executor(build_image)
docker_image, log = await self.sys_run_in_executor(build_image)
_LOGGER.debug("Build %s:%s done: %s", self.image, version, log)
# Update meta data
self._meta = await self.sys_docker.images.inspect(addon_image_tag)
self._meta = docker_image.attrs
except (
docker.errors.DockerException,
requests.RequestException,
aiodocker.DockerError,
) as err:
except (docker.errors.DockerException, requests.RequestException) as err:
_LOGGER.error("Can't build %s:%s: %s", self.image, version, err)
raise DockerError() from err
@@ -754,8 +740,11 @@ class DockerAddon(DockerInterface):
)
async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image."""
if docker_image := await self.sys_docker.import_image(tar_file):
self._meta = docker_image
docker_image = await self.sys_run_in_executor(
self.sys_docker.import_image, tar_file
)
if docker_image:
self._meta = docker_image.attrs
_LOGGER.info("Importing image %s and version %s", tar_file, self.version)
with suppress(DockerError):
@@ -769,21 +758,17 @@ class DockerAddon(DockerInterface):
version: AwesomeVersion | None = None,
) -> None:
"""Check if old version exists and cleanup other versions of image not in use."""
if not (use_image := image or self.image):
raise DockerError("Cannot determine image from metadata!", _LOGGER.error)
if not (use_version := version or self.version):
raise DockerError("Cannot determine version from metadata!", _LOGGER.error)
await self.sys_docker.cleanup_old_images(
use_image,
use_version,
await self.sys_run_in_executor(
self.sys_docker.cleanup_old_images,
(image := image or self.image),
version or self.version,
{old_image} if old_image else None,
keep_images={
f"{addon.image}:{addon.version}"
for addon in self.sys_addons.installed
if addon.slug != self.addon.slug
and addon.image
and addon.image in {old_image, use_image}
and addon.image in {old_image, image}
},
)
@@ -850,16 +835,6 @@ class DockerAddon(DockerInterface):
):
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
if not self.addon.signed:
return
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_content(
cast(str, self.addon.codenotary), checksum
)
@Job(
name="docker_addon_hardware_events",
conditions=[JobCondition.OS_AGENT],

View File

@@ -1,10 +1,11 @@
"""Init file for Supervisor Docker object."""
from collections.abc import Awaitable
from ipaddress import IPv4Address
import logging
import re
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from awesomeversion import AwesomeVersion
from docker.types import Mount
from ..const import LABEL_MACHINE
@@ -235,20 +236,11 @@ class DockerHomeAssistant(DockerInterface):
environment={ENV_TIME: self.sys_timezone},
)
async def is_initialize(self) -> bool:
def is_initialize(self) -> Awaitable[bool]:
"""Return True if Docker container exists."""
if not self.sys_homeassistant.version:
return False
return await self.sys_docker.container_is_initialized(
self.name, self.image, self.sys_homeassistant.version
return self.sys_run_in_executor(
self.sys_docker.container_is_initialized,
self.name,
self.image,
self.sys_homeassistant.version,
)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
try:
if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST:
return
except AwesomeVersionCompareException:
return
await super()._validate_trust(image_id)

View File

@@ -6,18 +6,17 @@ from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Awaitable
from contextlib import suppress
from http import HTTPStatus
import logging
import re
from time import time
from typing import Any, cast
from uuid import uuid4
import aiodocker
from awesomeversion import AwesomeVersion
from awesomeversion.strategy import AwesomeVersionStrategy
import docker
from docker.models.containers import Container
from docker.models.images import Image
import requests
from ..bus import EventListener
@@ -36,7 +35,6 @@ from ..exceptions import (
CodeNotaryUntrusted,
DockerAPIError,
DockerError,
DockerHubRateLimitExceeded,
DockerJobError,
DockerLogOutOfOrder,
DockerNotFound,
@@ -220,14 +218,12 @@ class DockerInterface(JobGroup, ABC):
if not credentials:
return
await self.sys_run_in_executor(self.sys_docker.dockerpy.login, **credentials)
await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials)
def _process_pull_image_log(
self, install_job_id: str, reference: PullLogEntry
) -> None:
def _process_pull_image_log(self, job_id: str, reference: PullLogEntry) -> None:
"""Process events fired from a docker while pulling an image, filtered to a given job id."""
if (
reference.job_id != install_job_id
reference.job_id != job_id
or not reference.id
or not reference.status
or not (stage := PullImageLayerStage.from_status(reference.status))
@@ -241,22 +237,21 @@ class DockerInterface(JobGroup, ABC):
name="Pulling container image layer",
initial_stage=stage.status,
reference=reference.id,
parent_id=install_job_id,
internal=True,
parent_id=job_id,
)
job.done = False
return
# Find our sub job to update details of
for j in self.sys_jobs.jobs:
if j.parent_id == install_job_id and j.reference == reference.id:
if j.parent_id == job_id and j.reference == reference.id:
job = j
break
# This likely only occurs if the logs came in out of sync and we got progress before the Pulling FS Layer one
if not job:
raise DockerLogOutOfOrder(
f"Received pull image log with status {reference.status} for image id {reference.id} and parent job {install_job_id} but could not find a matching job, skipping",
f"Received pull image log with status {reference.status} for image id {reference.id} and parent job {job_id} but could not find a matching job, skipping",
_LOGGER.debug,
)
@@ -330,56 +325,10 @@ class DockerInterface(JobGroup, ABC):
else job.extra,
)
# Once we have received a progress update for every child job, start to set status of the main one
install_job = self.sys_jobs.get_job(install_job_id)
layer_jobs = [
job
for job in self.sys_jobs.jobs
if job.parent_id == install_job.uuid
and job.name == "Pulling container image layer"
]
# First set the total bytes to be downloaded/extracted on the main job
if not install_job.extra:
total = 0
for job in layer_jobs:
if not job.extra:
return
total += job.extra["total"]
install_job.extra = {"total": total}
else:
total = install_job.extra["total"]
# Then determine total progress based on progress of each sub-job, factoring in size of each compared to total
progress = 0.0
stage = PullImageLayerStage.PULL_COMPLETE
for job in layer_jobs:
if not job.extra:
return
progress += job.progress * (job.extra["total"] / total)
job_stage = PullImageLayerStage.from_status(cast(str, job.stage))
if job_stage < PullImageLayerStage.EXTRACTING:
stage = PullImageLayerStage.DOWNLOADING
elif (
stage == PullImageLayerStage.PULL_COMPLETE
and job_stage < PullImageLayerStage.PULL_COMPLETE
):
stage = PullImageLayerStage.EXTRACTING
# Ensure progress is 100 at this point to prevent float drift
if stage == PullImageLayerStage.PULL_COMPLETE:
progress = 100
# To reduce noise, limit updates to when result has changed by an entire percent or when stage changed
if stage != install_job.stage or progress >= install_job.progress + 1:
install_job.update(stage=stage.status, progress=progress)
@Job(
name="docker_interface_install",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
internal=True,
)
async def install(
self,
@@ -402,11 +351,11 @@ class DockerInterface(JobGroup, ABC):
# Try login if we have defined credentials
await self._docker_login(image)
curr_job_id = self.sys_jobs.current.uuid
job_id = self.sys_jobs.current.uuid
async def process_pull_image_log(reference: PullLogEntry) -> None:
try:
self._process_pull_image_log(curr_job_id, reference)
self._process_pull_image_log(job_id, reference)
except DockerLogOutOfOrder as err:
# Send all these to sentry. Missing a few progress updates
# shouldn't matter to users but matters to us
@@ -417,47 +366,37 @@ class DockerInterface(JobGroup, ABC):
)
# Pull new image
docker_image = await self.sys_docker.pull_image(
docker_image = await self.sys_run_in_executor(
self.sys_docker.pull_image,
self.sys_jobs.current.uuid,
image,
str(version),
platform=MAP_ARCH[image_arch],
)
# Validate content
try:
await self._validate_trust(cast(str, docker_image["Id"]))
except CodeNotaryError:
with suppress(aiodocker.DockerError, requests.RequestException):
await self.sys_docker.images.delete(
f"{image}:{version!s}", force=True
)
raise
# CodeNotary content trust validation has been removed
# Tag latest
if latest:
_LOGGER.info(
"Tagging image %s with version %s as latest", image, version
)
await self.sys_docker.images.tag(
docker_image["Id"], image, tag="latest"
)
await self.sys_run_in_executor(docker_image.tag, image, tag="latest")
except docker.errors.APIError as err:
if err.status_code == HTTPStatus.TOO_MANY_REQUESTS:
if err.status_code == 429:
self.sys_resolution.create_issue(
IssueType.DOCKER_RATELIMIT,
ContextType.SYSTEM,
suggestions=[SuggestionType.REGISTRY_LOGIN],
)
raise DockerHubRateLimitExceeded(_LOGGER.error) from err
_LOGGER.info(
"Your IP address has made too many requests to Docker Hub which activated a rate limit. "
"For more details see https://www.home-assistant.io/more-info/dockerhub-rate-limit"
)
raise DockerError(
f"Can't install {image}:{version!s}: {err}", _LOGGER.error
) from err
except (
aiodocker.DockerError,
docker.errors.DockerException,
requests.RequestException,
) as err:
except (docker.errors.DockerException, requests.RequestException) as err:
await async_capture_exception(err)
raise DockerError(
f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error
@@ -476,12 +415,14 @@ class DockerInterface(JobGroup, ABC):
if listener:
self.sys_bus.remove_listener(listener)
self._meta = docker_image
self._meta = docker_image.attrs
async def exists(self) -> bool:
"""Return True if Docker image exists in local repository."""
with suppress(aiodocker.DockerError, requests.RequestException):
await self.sys_docker.images.inspect(f"{self.image}:{self.version!s}")
with suppress(docker.errors.DockerException, requests.RequestException):
await self.sys_run_in_executor(
self.sys_docker.images.get, f"{self.image}:{self.version!s}"
)
return True
return False
@@ -540,11 +481,11 @@ class DockerInterface(JobGroup, ABC):
),
)
with suppress(aiodocker.DockerError, requests.RequestException):
with suppress(docker.errors.DockerException, requests.RequestException):
if not self._meta and self.image:
self._meta = await self.sys_docker.images.inspect(
self._meta = self.sys_docker.images.get(
f"{self.image}:{version!s}"
)
).attrs
# Successful?
if not self._meta:
@@ -612,17 +553,14 @@ class DockerInterface(JobGroup, ABC):
)
async def remove(self, *, remove_image: bool = True) -> None:
"""Remove Docker images."""
if not self.image or not self.version:
raise DockerError(
"Cannot determine image and/or version from metadata!", _LOGGER.error
)
# Cleanup container
with suppress(DockerError):
await self.stop()
if remove_image:
await self.sys_docker.remove_image(self.image, self.version)
await self.sys_run_in_executor(
self.sys_docker.remove_image, self.image, self.version
)
self._meta = None
@@ -644,16 +582,18 @@ class DockerInterface(JobGroup, ABC):
image_name = f"{expected_image}:{version!s}"
if self.image == expected_image:
try:
image = await self.sys_docker.images.inspect(image_name)
except (aiodocker.DockerError, requests.RequestException) as err:
image: Image = await self.sys_run_in_executor(
self.sys_docker.images.get, image_name
)
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get {image_name} for check due to: {err!s}",
_LOGGER.error,
) from err
image_arch = f"{image['Os']}/{image['Architecture']}"
if "Variant" in image:
image_arch = f"{image_arch}/{image['Variant']}"
image_arch = f"{image.attrs['Os']}/{image.attrs['Architecture']}"
if "Variant" in image.attrs:
image_arch = f"{image_arch}/{image.attrs['Variant']}"
# If we have an image and its the right arch, all set
# It seems that newer Docker version return a variant for arm64 images.
@@ -679,10 +619,7 @@ class DockerInterface(JobGroup, ABC):
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(
self,
version: AwesomeVersion,
image: str | None = None,
latest: bool = False,
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
"""Update a Docker image."""
image = image or self.image
@@ -715,13 +652,11 @@ class DockerInterface(JobGroup, ABC):
version: AwesomeVersion | None = None,
) -> None:
"""Check if old version exists and cleanup."""
if not (use_image := image or self.image):
raise DockerError("Cannot determine image from metadata!", _LOGGER.error)
if not (use_version := version or self.version):
raise DockerError("Cannot determine version from metadata!", _LOGGER.error)
await self.sys_docker.cleanup_old_images(
use_image, use_version, {old_image} if old_image else None
await self.sys_run_in_executor(
self.sys_docker.cleanup_old_images,
image or self.image,
version or self.version,
{old_image} if old_image else None,
)
@Job(
@@ -773,10 +708,10 @@ class DockerInterface(JobGroup, ABC):
"""Return latest version of local image."""
available_version: list[AwesomeVersion] = []
try:
for image in await self.sys_docker.images.list(
filters=f'{{"reference": ["{self.image}"]}}'
for image in await self.sys_run_in_executor(
self.sys_docker.images.list, self.image
):
for tag in image["RepoTags"]:
for tag in image.tags:
version = AwesomeVersion(tag.partition(":")[2])
if version.strategy == AwesomeVersionStrategy.UNKNOWN:
continue
@@ -785,7 +720,7 @@ class DockerInterface(JobGroup, ABC):
if not available_version:
raise ValueError()
except (aiodocker.DockerError, ValueError) as err:
except (docker.errors.DockerException, ValueError) as err:
raise DockerNotFound(
f"No version found for {self.image}", _LOGGER.info
) from err
@@ -810,24 +745,3 @@ class DockerInterface(JobGroup, ABC):
return self.sys_run_in_executor(
self.sys_docker.container_run_inside, self.name, command
)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_own_content(checksum)
@Job(
name="docker_interface_check_trust",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def check_trust(self) -> None:
"""Check trust of exists Docker image."""
try:
image = await self.sys_docker.images.inspect(
f"{self.image}:{self.version!s}"
)
except (aiodocker.DockerError, requests.RequestException):
return
await self._validate_trust(cast(str, image["Id"]))

View File

@@ -6,23 +6,20 @@ import asyncio
from contextlib import suppress
from dataclasses import dataclass
from functools import partial
from http import HTTPStatus
from ipaddress import IPv4Address
import json
import logging
import os
from pathlib import Path
import re
from typing import Any, Final, Self, cast
import aiodocker
from aiodocker.images import DockerImages
import attr
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from docker import errors as docker_errors
from docker.api.client import APIClient
from docker.client import DockerClient
from docker.errors import DockerException, ImageNotFound, NotFound
from docker.models.containers import Container, ContainerCollection
from docker.models.images import Image, ImageCollection
from docker.models.networks import Network
from docker.types.daemon import CancellableStream
import requests
@@ -56,7 +53,6 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
MIN_SUPPORTED_DOCKER: Final = AwesomeVersion("24.0.0")
DOCKER_NETWORK_HOST: Final = "host"
RE_IMPORT_IMAGE_STREAM = re.compile(r"(^Loaded image ID: |^Loaded image: )(.+)$")
@attr.s(frozen=True)
@@ -208,12 +204,7 @@ class DockerAPI(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper."""
self.coresys = coresys
# We keep both until we can fully refactor to aiodocker
self._dockerpy: DockerClient | None = None
self.docker: aiodocker.Docker = aiodocker.Docker(
url=f"unix:/{str(SOCKET_DOCKER)}", api_version="auto"
)
self._docker: DockerClient | None = None
self._network: DockerNetwork | None = None
self._info: DockerInfo | None = None
self.config: DockerConfig = DockerConfig()
@@ -221,7 +212,7 @@ class DockerAPI(CoreSysAttributes):
async def post_init(self) -> Self:
"""Post init actions that must be done in event loop."""
self._dockerpy = await asyncio.get_running_loop().run_in_executor(
self._docker = await asyncio.get_running_loop().run_in_executor(
None,
partial(
DockerClient,
@@ -230,19 +221,19 @@ class DockerAPI(CoreSysAttributes):
timeout=900,
),
)
self._info = DockerInfo.new(self.dockerpy.info())
self._info = DockerInfo.new(self.docker.info())
await self.config.read_data()
self._network = await DockerNetwork(self.dockerpy).post_init(
self._network = await DockerNetwork(self.docker).post_init(
self.config.enable_ipv6, self.config.mtu
)
return self
@property
def dockerpy(self) -> DockerClient:
def docker(self) -> DockerClient:
"""Get docker API client."""
if not self._dockerpy:
if not self._docker:
raise RuntimeError("Docker API Client not initialized!")
return self._dockerpy
return self._docker
@property
def network(self) -> DockerNetwork:
@@ -252,19 +243,19 @@ class DockerAPI(CoreSysAttributes):
return self._network
@property
def images(self) -> DockerImages:
def images(self) -> ImageCollection:
"""Return API images."""
return self.docker.images
@property
def containers(self) -> ContainerCollection:
"""Return API containers."""
return self.dockerpy.containers
return self.docker.containers
@property
def api(self) -> APIClient:
"""Return API containers."""
return self.dockerpy.api
return self.docker.api
@property
def info(self) -> DockerInfo:
@@ -276,7 +267,7 @@ class DockerAPI(CoreSysAttributes):
@property
def events(self) -> CancellableStream:
"""Return docker event stream."""
return self.dockerpy.events(decode=True)
return self.docker.events(decode=True)
@property
def monitor(self) -> DockerMonitor:
@@ -392,7 +383,7 @@ class DockerAPI(CoreSysAttributes):
with suppress(DockerError):
self.network.detach_default_bridge(container)
else:
host_network: Network = self.dockerpy.networks.get(DOCKER_NETWORK_HOST)
host_network: Network = self.docker.networks.get(DOCKER_NETWORK_HOST)
# Check if container is register on host
# https://github.com/moby/moby/issues/23302
@@ -419,36 +410,35 @@ class DockerAPI(CoreSysAttributes):
return container
async def pull_image(
def pull_image(
self,
job_id: str,
repository: str,
tag: str = "latest",
platform: str | None = None,
) -> dict[str, Any]:
) -> Image:
"""Pull the specified image and return it.
This mimics the high level API of images.pull but provides better error handling by raising
based on a docker error on pull. Whereas the high level API ignores all errors on pull and
raises only if the get fails afterwards. Additionally it fires progress reports for the pull
on the bus so listeners can use that to update status for users.
Must be run in executor.
"""
def api_pull():
pull_log = self.dockerpy.api.pull(
repository, tag=tag, platform=platform, stream=True, decode=True
pull_log = self.docker.api.pull(
repository, tag=tag, platform=platform, stream=True, decode=True
)
for e in pull_log:
entry = PullLogEntry.from_pull_log_dict(job_id, e)
if entry.error:
raise entry.exception
self.sys_loop.call_soon_threadsafe(
self.sys_bus.fire_event, BusEvent.DOCKER_IMAGE_PULL_UPDATE, entry
)
for e in pull_log:
entry = PullLogEntry.from_pull_log_dict(job_id, e)
if entry.error:
raise entry.exception
self.sys_loop.call_soon_threadsafe(
self.sys_bus.fire_event, BusEvent.DOCKER_IMAGE_PULL_UPDATE, entry
)
await self.sys_run_in_executor(api_pull)
sep = "@" if tag.startswith("sha256:") else ":"
return await self.images.inspect(f"{repository}{sep}{tag}")
return self.images.get(f"{repository}{sep}{tag}")
def run_command(
self,
@@ -469,7 +459,7 @@ class DockerAPI(CoreSysAttributes):
_LOGGER.info("Runing command '%s' on %s", command, image_with_tag)
container = None
try:
container = self.dockerpy.containers.run(
container = self.docker.containers.run(
image_with_tag,
command=command,
detach=True,
@@ -497,35 +487,35 @@ class DockerAPI(CoreSysAttributes):
"""Repair local docker overlayfs2 issues."""
_LOGGER.info("Prune stale containers")
try:
output = self.dockerpy.api.prune_containers()
output = self.docker.api.prune_containers()
_LOGGER.debug("Containers prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for containers prune: %s", err)
_LOGGER.info("Prune stale images")
try:
output = self.dockerpy.api.prune_images(filters={"dangling": False})
output = self.docker.api.prune_images(filters={"dangling": False})
_LOGGER.debug("Images prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for images prune: %s", err)
_LOGGER.info("Prune stale builds")
try:
output = self.dockerpy.api.prune_builds()
output = self.docker.api.prune_builds()
_LOGGER.debug("Builds prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for builds prune: %s", err)
_LOGGER.info("Prune stale volumes")
try:
output = self.dockerpy.api.prune_volumes()
output = self.docker.api.prune_builds()
_LOGGER.debug("Volumes prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for volumes prune: %s", err)
_LOGGER.info("Prune stale networks")
try:
output = self.dockerpy.api.prune_networks()
output = self.docker.api.prune_networks()
_LOGGER.debug("Networks prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks prune: %s", err)
@@ -547,11 +537,11 @@ class DockerAPI(CoreSysAttributes):
Fix: https://github.com/moby/moby/issues/23302
"""
network: Network = self.dockerpy.networks.get(network_name)
network: Network = self.docker.networks.get(network_name)
for cid, data in network.attrs.get("Containers", {}).items():
try:
self.dockerpy.containers.get(cid)
self.docker.containers.get(cid)
continue
except docker_errors.NotFound:
_LOGGER.debug(
@@ -566,26 +556,22 @@ class DockerAPI(CoreSysAttributes):
with suppress(docker_errors.DockerException, requests.RequestException):
network.disconnect(data.get("Name", cid), force=True)
async def container_is_initialized(
def container_is_initialized(
self, name: str, image: str, version: AwesomeVersion
) -> bool:
"""Return True if docker container exists in good state and is built from expected image."""
try:
docker_container = await self.sys_run_in_executor(self.containers.get, name)
docker_image = await self.images.inspect(f"{image}:{version}")
except docker_errors.NotFound:
docker_container = self.containers.get(name)
docker_image = self.images.get(f"{image}:{version}")
except NotFound:
return False
except aiodocker.DockerError as err:
if err.status == HTTPStatus.NOT_FOUND:
return False
raise DockerError() from err
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
# Check the image is correct and state is good
return (
docker_container.image is not None
and docker_container.image.id == docker_image["Id"]
and docker_container.image.id == docker_image.id
and docker_container.status in ("exited", "running", "created")
)
@@ -595,18 +581,18 @@ class DockerAPI(CoreSysAttributes):
"""Stop/remove Docker container."""
try:
docker_container: Container = self.containers.get(name)
except docker_errors.NotFound:
except NotFound:
raise DockerNotFound() from None
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
if docker_container.status == "running":
_LOGGER.info("Stopping %s application", name)
with suppress(docker_errors.DockerException, requests.RequestException):
with suppress(DockerException, requests.RequestException):
docker_container.stop(timeout=timeout)
if remove_container:
with suppress(docker_errors.DockerException, requests.RequestException):
with suppress(DockerException, requests.RequestException):
_LOGGER.info("Cleaning %s application", name)
docker_container.remove(force=True, v=True)
@@ -618,11 +604,11 @@ class DockerAPI(CoreSysAttributes):
"""Start Docker container."""
try:
docker_container: Container = self.containers.get(name)
except docker_errors.NotFound:
except NotFound:
raise DockerNotFound(
f"{name} not found for starting up", _LOGGER.error
) from None
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get {name} for starting up", _LOGGER.error
) from err
@@ -630,36 +616,36 @@ class DockerAPI(CoreSysAttributes):
_LOGGER.info("Starting %s", name)
try:
docker_container.start()
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(f"Can't start {name}: {err}", _LOGGER.error) from err
def restart_container(self, name: str, timeout: int) -> None:
"""Restart docker container."""
try:
container: Container = self.containers.get(name)
except docker_errors.NotFound:
except NotFound:
raise DockerNotFound() from None
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
_LOGGER.info("Restarting %s", name)
try:
container.restart(timeout=timeout)
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(f"Can't restart {name}: {err}", _LOGGER.warning) from err
def container_logs(self, name: str, tail: int = 100) -> bytes:
"""Return Docker logs of container."""
try:
docker_container: Container = self.containers.get(name)
except docker_errors.NotFound:
except NotFound:
raise DockerNotFound() from None
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
try:
return docker_container.logs(tail=tail, stdout=True, stderr=True)
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't grep logs from {name}: {err}", _LOGGER.warning
) from err
@@ -668,9 +654,9 @@ class DockerAPI(CoreSysAttributes):
"""Read and return stats from container."""
try:
docker_container: Container = self.containers.get(name)
except docker_errors.NotFound:
except NotFound:
raise DockerNotFound() from None
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
# container is not running
@@ -679,7 +665,7 @@ class DockerAPI(CoreSysAttributes):
try:
return docker_container.stats(stream=False)
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't read stats from {name}: {err}", _LOGGER.error
) from err
@@ -688,84 +674,61 @@ class DockerAPI(CoreSysAttributes):
"""Execute a command inside Docker container."""
try:
docker_container: Container = self.containers.get(name)
except docker_errors.NotFound:
except NotFound:
raise DockerNotFound() from None
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
# Execute
try:
code, output = docker_container.exec_run(command)
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError() from err
return CommandReturn(code, output)
async def remove_image(
def remove_image(
self, image: str, version: AwesomeVersion, latest: bool = True
) -> None:
"""Remove a Docker image by version and latest."""
try:
if latest:
_LOGGER.info("Removing image %s with latest", image)
try:
await self.images.delete(f"{image}:latest", force=True)
except aiodocker.DockerError as err:
if err.status != HTTPStatus.NOT_FOUND:
raise
with suppress(ImageNotFound):
self.images.remove(image=f"{image}:latest", force=True)
_LOGGER.info("Removing image %s with %s", image, version)
try:
await self.images.delete(f"{image}:{version!s}", force=True)
except aiodocker.DockerError as err:
if err.status != HTTPStatus.NOT_FOUND:
raise
with suppress(ImageNotFound):
self.images.remove(image=f"{image}:{version!s}", force=True)
except (aiodocker.DockerError, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't remove image {image}: {err}", _LOGGER.warning
) from err
async def import_image(self, tar_file: Path) -> dict[str, Any] | None:
def import_image(self, tar_file: Path) -> Image | None:
"""Import a tar file as image."""
try:
with tar_file.open("rb") as read_tar:
resp: list[dict[str, Any]] = self.images.import_image(read_tar)
except (aiodocker.DockerError, OSError) as err:
docker_image_list: list[Image] = self.images.load(read_tar) # type: ignore
if len(docker_image_list) != 1:
_LOGGER.warning(
"Unexpected image count %d while importing image from tar",
len(docker_image_list),
)
return None
return docker_image_list[0]
except (DockerException, OSError) as err:
raise DockerError(
f"Can't import image from tar: {err}", _LOGGER.error
) from err
docker_image_list: list[str] = []
for chunk in resp:
if "errorDetail" in chunk:
raise DockerError(
f"Can't import image from tar: {chunk['errorDetail']['message']}",
_LOGGER.error,
)
if "stream" in chunk:
if match := RE_IMPORT_IMAGE_STREAM.search(chunk["stream"]):
docker_image_list.append(match.group(2))
if len(docker_image_list) != 1:
_LOGGER.warning(
"Unexpected image count %d while importing image from tar",
len(docker_image_list),
)
return None
try:
return await self.images.inspect(docker_image_list[0])
except (aiodocker.DockerError, requests.RequestException) as err:
raise DockerError(
f"Could not inspect imported image due to: {err!s}", _LOGGER.error
) from err
def export_image(self, image: str, version: AwesomeVersion, tar_file: Path) -> None:
"""Export current images into a tar file."""
try:
docker_image = self.api.get_image(f"{image}:{version}")
except (docker_errors.DockerException, requests.RequestException) as err:
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't fetch image {image}: {err}", _LOGGER.error
) from err
@@ -782,7 +745,7 @@ class DockerAPI(CoreSysAttributes):
_LOGGER.info("Export image %s done", image)
async def cleanup_old_images(
def cleanup_old_images(
self,
current_image: str,
current_version: AwesomeVersion,
@@ -793,57 +756,46 @@ class DockerAPI(CoreSysAttributes):
"""Clean up old versions of an image."""
image = f"{current_image}:{current_version!s}"
try:
try:
image_attr = await self.images.inspect(image)
except aiodocker.DockerError as err:
if err.status == HTTPStatus.NOT_FOUND:
raise DockerNotFound(
f"{current_image} not found for cleanup", _LOGGER.warning
) from None
raise
except (aiodocker.DockerError, requests.RequestException) as err:
keep = {cast(str, self.images.get(image).id)}
except ImageNotFound:
raise DockerNotFound(
f"{current_image} not found for cleanup", _LOGGER.warning
) from None
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't get {current_image} for cleanup", _LOGGER.warning
) from err
keep = {cast(str, image_attr["Id"])}
if keep_images:
keep_images -= {image}
results = await asyncio.gather(
*[self.images.inspect(image) for image in keep_images],
return_exceptions=True,
)
for result in results:
# If its not found, no need to preserve it from getting removed
if (
isinstance(result, aiodocker.DockerError)
and result.status == HTTPStatus.NOT_FOUND
):
continue
if isinstance(result, BaseException):
raise DockerError(
f"Failed to get one or more images from {keep} during cleanup",
_LOGGER.warning,
) from result
keep.add(cast(str, result["Id"]))
try:
for image in keep_images:
# If its not found, no need to preserve it from getting removed
with suppress(ImageNotFound):
keep.add(cast(str, self.images.get(image).id))
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Failed to get one or more images from {keep} during cleanup",
_LOGGER.warning,
) from err
# Cleanup old and current
image_names = list(
old_images | {current_image} if old_images else {current_image}
)
try:
images_list = await self.images.list(
filters=json.dumps({"reference": image_names})
)
except (aiodocker.DockerError, requests.RequestException) as err:
# This API accepts a list of image names. Tested and confirmed working on docker==7.1.0
# Its typing does say only `str` though. Bit concerning, could an update break this?
images_list = self.images.list(name=image_names) # type: ignore
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Corrupt docker overlayfs found: {err}", _LOGGER.warning
) from err
for docker_image in images_list:
if docker_image["Id"] in keep:
if docker_image.id in keep:
continue
with suppress(aiodocker.DockerError, requests.RequestException):
_LOGGER.info("Cleanup images: %s", docker_image["RepoTags"])
await self.images.delete(docker_image["Id"], force=True)
with suppress(DockerException, requests.RequestException):
_LOGGER.info("Cleanup images: %s", docker_image.tags)
self.images.remove(docker_image.id, force=True)

View File

@@ -1,12 +1,10 @@
"""Init file for Supervisor Docker object."""
import asyncio
from collections.abc import Awaitable
from ipaddress import IPv4Address
import logging
import os
import aiodocker
from awesomeversion.awesomeversion import AwesomeVersion
import docker
import requests
@@ -114,18 +112,19 @@ class DockerSupervisor(DockerInterface):
name="docker_supervisor_update_start_tag",
concurrency=JobConcurrency.GROUP_QUEUE,
)
async def update_start_tag(self, image: str, version: AwesomeVersion) -> None:
def update_start_tag(self, image: str, version: AwesomeVersion) -> Awaitable[None]:
"""Update start tag to new version."""
return self.sys_run_in_executor(self._update_start_tag, image, version)
def _update_start_tag(self, image: str, version: AwesomeVersion) -> None:
"""Update start tag to new version.
Need run inside executor.
"""
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
docker_image = await self.sys_docker.images.inspect(f"{image}:{version!s}")
except (
aiodocker.DockerError,
docker.errors.DockerException,
requests.RequestException,
) as err:
docker_container = self.sys_docker.containers.get(self.name)
docker_image = self.sys_docker.images.get(f"{image}:{version!s}")
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't get image or container to fix start tag: {err}", _LOGGER.error
) from err
@@ -145,14 +144,8 @@ class DockerSupervisor(DockerInterface):
# If version tag
if start_tag != "latest":
continue
await asyncio.gather(
self.sys_docker.images.tag(
docker_image["Id"], start_image, tag=start_tag
),
self.sys_docker.images.tag(
docker_image["Id"], start_image, tag=version.string
),
)
docker_image.tag(start_image, start_tag)
docker_image.tag(start_image, version.string)
except (aiodocker.DockerError, requests.RequestException) as err:
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError(f"Can't fix start tag: {err}", _LOGGER.error) from err

View File

@@ -648,32 +648,9 @@ class DockerLogOutOfOrder(DockerError):
class DockerNoSpaceOnDevice(DockerError):
"""Raise if a docker pull fails due to available space."""
error_key = "docker_no_space_on_device"
message_template = "No space left on disk"
def __init__(self, logger: Callable[..., None] | None = None) -> None:
"""Raise & log."""
super().__init__(None, logger=logger)
class DockerHubRateLimitExceeded(DockerError):
"""Raise for docker hub rate limit exceeded error."""
error_key = "dockerhub_rate_limit_exceeded"
message_template = (
"Your IP address has made too many requests to Docker Hub which activated a rate limit. "
"For more details see {dockerhub_rate_limit_url}"
)
def __init__(self, logger: Callable[..., None] | None = None) -> None:
"""Raise & log."""
super().__init__(
None,
logger=logger,
extra_fields={
"dockerhub_rate_limit_url": "https://www.home-assistant.io/more-info/dockerhub-rate-limit"
},
)
super().__init__("No space left on disk", logger=logger)
class DockerJobError(DockerError, JobException):

View File

@@ -28,7 +28,6 @@ from ..exceptions import (
HomeAssistantUpdateError,
JobException,
)
from ..jobs import ChildJobSyncFilter
from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobConcurrency, JobThrottle
from ..jobs.decorator import Job, JobCondition
from ..jobs.job_group import JobGroup
@@ -225,13 +224,6 @@ class HomeAssistantCore(JobGroup):
],
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
# We assume for now the docker image pull is 100% of this task. But from
# a user perspective that isn't true. Other steps that take time which
# is not accounted for in progress include: partial backup, image
# cleanup, and Home Assistant restart
child_job_syncs=[
ChildJobSyncFilter("docker_interface_install", progress_allocation=1.0)
],
)
async def update(
self,
@@ -428,13 +420,6 @@ class HomeAssistantCore(JobGroup):
"""
return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate HomeAssistant docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats:
"""Return stats of Home Assistant."""
try:

View File

@@ -282,10 +282,8 @@ class JobManager(FileConfiguration, CoreSysAttributes):
# reporting shouldn't raise and break the active job
continue
progress = min(
100,
sync.starting_progress
+ (sync.progress_allocation * job_data["progress"]),
progress = sync.starting_progress + (
sync.progress_allocation * job_data["progress"]
)
# Using max would always trigger on change even if progress was unchanged
# pylint: disable-next=R1731

View File

@@ -76,13 +76,6 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
"""Return True if a task is in progress."""
return self.instance.in_progress
def check_trust(self) -> Awaitable[None]:
"""Calculate plugin docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
def logs(self) -> Awaitable[bytes]:
"""Get docker plugin logs.

View File

@@ -1,59 +0,0 @@
"""Helpers to check supervisor trust."""
import logging
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckSupervisorTrust(coresys)
class CheckSupervisorTrust(CheckBase):
"""CheckSystemTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
try:
await self.sys_supervisor.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED)
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
pass
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
try:
await self.sys_supervisor.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SUPERVISOR
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@@ -8,7 +8,7 @@ from ..const import UnsupportedReason
from .base import EvaluateBase
EXPECTED_LOGGING = "journald"
EXPECTED_STORAGE = ("overlay2", "overlayfs")
EXPECTED_STORAGE = "overlay2"
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -41,18 +41,14 @@ class EvaluateDockerConfiguration(EvaluateBase):
storage_driver = self.sys_docker.info.storage
logging_driver = self.sys_docker.info.logging
is_unsupported = False
if storage_driver not in EXPECTED_STORAGE:
is_unsupported = True
if storage_driver != EXPECTED_STORAGE:
_LOGGER.warning(
"Docker storage driver %s is not supported!", storage_driver
)
if logging_driver != EXPECTED_LOGGING:
is_unsupported = True
_LOGGER.warning(
"Docker logging driver %s is not supported!", logging_driver
)
return is_unsupported
return storage_driver != EXPECTED_STORAGE or logging_driver != EXPECTED_LOGGING

View File

@@ -1,14 +1,11 @@
"""Evaluation class for Content Trust."""
import errno
import logging
from pathlib import Path
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ...utils.codenotary import calc_checksum_path_sourcecode
from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason
from ..const import UnsupportedReason
from .base import EvaluateBase
_SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor")
@@ -44,29 +41,4 @@ class EvaluateSourceMods(EvaluateBase):
_LOGGER.warning("Disabled content-trust, skipping evaluation")
return False
# Calculate sume of the sourcecode
try:
checksum = await self.sys_run_in_executor(
calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE
)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.add_unhealthy_reason(
UnhealthyReason.OSERROR_BAD_MESSAGE
)
self.sys_resolution.create_issue(
IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM
)
_LOGGER.error("Can't calculate checksum of source code: %s", err)
return False
# Validate checksum
try:
await self.sys_security.verify_own_content(checksum)
except CodeNotaryUntrusted:
return True
except CodeNotaryError:
pass
return False

View File

@@ -1,67 +0,0 @@
"""Helpers to check and fix issues with free space."""
from datetime import timedelta
import logging
from ...coresys import CoreSys
from ...exceptions import ResolutionFixupError, ResolutionFixupJobError
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ...security.const import ContentTrustResult
from ..const import ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemExecuteIntegrity(coresys)
class FixupSystemExecuteIntegrity(FixupBase):
"""Storage class for fixup."""
@Job(
name="fixup_system_execute_integrity_process",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=ResolutionFixupJobError,
throttle_period=timedelta(hours=8),
throttle=JobThrottle.THROTTLE,
)
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
result = await self.sys_security.integrity_check()
if ContentTrustResult.FAILED in (result.core, result.supervisor):
raise ResolutionFixupError()
for plugin in result.plugins:
if plugin != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
for addon in result.addons:
if addon != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
@property
def suggestion(self) -> SuggestionType:
"""Return a SuggestionType enum."""
return SuggestionType.EXECUTE_INTEGRITY
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.TRUST]
@property
def auto(self) -> bool:
"""Return if a fixup can be apply as auto fix."""
return True

View File

@@ -11,20 +11,10 @@ from ..const import (
FILE_HASSIO_SECURITY,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
PwnedError,
SecurityJobError,
)
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.codenotary import cas_validate
from ..exceptions import PwnedError
from ..utils.common import FileConfiguration
from ..utils.pwned import check_pwned_password
from ..validate import SCHEMA_SECURITY_CONFIG
from .const import ContentTrustResult, IntegrityResult
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -67,30 +57,6 @@ class Security(FileConfiguration, CoreSysAttributes):
"""Set pwned is enabled/disabled."""
self._data[ATTR_PWNED] = value
async def verify_content(self, signer: str, checksum: str) -> None:
"""Verify content on CAS."""
if not self.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await cas_validate(signer, checksum)
except CodeNotaryUntrusted:
raise
except CodeNotaryError:
if self.force:
raise
self.sys_resolution.create_issue(
IssueType.TRUST,
ContextType.SYSTEM,
suggestions=[SuggestionType.EXECUTE_INTEGRITY],
)
return
async def verify_own_content(self, checksum: str) -> None:
"""Verify content from HA org."""
return await self.verify_content("notary@home-assistant.io", checksum)
async def verify_secret(self, pwned_hash: str) -> None:
"""Verify pwned state of a secret."""
if not self.pwned:
@@ -103,73 +69,3 @@ class Security(FileConfiguration, CoreSysAttributes):
if self.force:
raise
return
@Job(
name="security_manager_integrity_check",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=SecurityJobError,
concurrency=JobConcurrency.REJECT,
)
async def integrity_check(self) -> IntegrityResult:
"""Run a full system integrity check of the platform.
We only allow to install trusted content.
This is a out of the band manual check.
"""
result: IntegrityResult = IntegrityResult()
if not self.content_trust:
_LOGGER.warning(
"Skipping integrity check, content_trust is globally disabled"
)
return result
# Supervisor
try:
await self.sys_supervisor.check_trust()
result.supervisor = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.supervisor = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
result.supervisor = ContentTrustResult.FAILED
# Core
try:
await self.sys_homeassistant.core.check_trust()
result.core = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.core = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
except CodeNotaryError:
result.core = ContentTrustResult.FAILED
# Plugins
for plugin in self.sys_plugins.all_plugins:
try:
await plugin.check_trust()
result.plugins[plugin.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.plugins[plugin.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
)
except CodeNotaryError:
result.plugins[plugin.slug] = ContentTrustResult.FAILED
# Add-ons
for addon in self.sys_addons.installed:
if not addon.signed:
result.addons[addon.slug] = ContentTrustResult.UNTESTED
continue
try:
await addon.check_trust()
result.addons[addon.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.addons[addon.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.ADDON, reference=addon.slug
)
except CodeNotaryError:
result.addons[addon.slug] = ContentTrustResult.FAILED
return result

View File

@@ -13,8 +13,6 @@ import aiohttp
from aiohttp.client_exceptions import ClientError
from awesomeversion import AwesomeVersion, AwesomeVersionException
from supervisor.jobs import ChildJobSyncFilter
from .const import (
ATTR_SUPERVISOR_INTERNET,
SUPERVISOR_VERSION,
@@ -25,8 +23,6 @@ from .coresys import CoreSys, CoreSysAttributes
from .docker.stats import DockerStats
from .docker.supervisor import DockerSupervisor
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
DockerError,
HostAppArmorError,
SupervisorAppArmorError,
@@ -37,7 +33,6 @@ from .exceptions import (
from .jobs.const import JobCondition, JobThrottle
from .jobs.decorator import Job
from .resolution.const import ContextType, IssueType, UnhealthyReason
from .utils.codenotary import calc_checksum
from .utils.sentry import async_capture_exception
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -150,20 +145,6 @@ class Supervisor(CoreSysAttributes):
_LOGGER.error,
) from err
# Validate
try:
await self.sys_security.verify_own_content(calc_checksum(data))
except CodeNotaryUntrusted as err:
raise SupervisorAppArmorError(
"Content-Trust is broken for the AppArmor profile fetch!",
_LOGGER.critical,
) from err
except CodeNotaryError as err:
raise SupervisorAppArmorError(
f"CodeNotary error while processing AppArmor fetch: {err!s}",
_LOGGER.error,
) from err
# Load
temp_dir: TemporaryDirectory | None = None
@@ -197,15 +178,6 @@ class Supervisor(CoreSysAttributes):
if temp_dir:
await self.sys_run_in_executor(temp_dir.cleanup)
@Job(
name="supervisor_update",
# We assume for now the docker image pull is 100% of this task. But from
# a user perspective that isn't true. Other steps that take time which
# is not accounted for in progress include: app armor update and restart
child_job_syncs=[
ChildJobSyncFilter("docker_interface_install", progress_allocation=1.0)
],
)
async def update(self, version: AwesomeVersion | None = None) -> None:
"""Update Supervisor version."""
version = version or self.latest_version or self.version
@@ -232,7 +204,6 @@ class Supervisor(CoreSysAttributes):
# Update container
_LOGGER.info("Update Supervisor to version %s", version)
try:
await self.instance.install(version, image=image)
await self.instance.update_start_tag(image, version)
@@ -273,13 +244,6 @@ class Supervisor(CoreSysAttributes):
"""
return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate Supervisor docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats:
"""Return stats of Supervisor."""
try:

View File

@@ -31,14 +31,8 @@ from .const import (
UpdateChannel,
)
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
UpdaterError,
UpdaterJobError,
)
from .exceptions import UpdaterError, UpdaterJobError
from .jobs.decorator import Job, JobCondition
from .utils.codenotary import calc_checksum
from .utils.common import FileConfiguration
from .validate import SCHEMA_UPDATER_CONFIG
@@ -289,19 +283,6 @@ class Updater(FileConfiguration, CoreSysAttributes):
self.sys_bus.remove_listener(self._connectivity_listener)
self._connectivity_listener = None
# Validate
try:
await self.sys_security.verify_own_content(calc_checksum(data))
except CodeNotaryUntrusted as err:
raise UpdaterError(
"Content-Trust is broken for the version file fetch!", _LOGGER.critical
) from err
except CodeNotaryError as err:
raise UpdaterError(
f"CodeNotary error while processing version fetch: {err!s}",
_LOGGER.error,
) from err
# Parse data
try:
data = json.loads(data)

View File

@@ -1,109 +0,0 @@
"""Small wrapper for CodeNotary."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
from pathlib import Path
import shlex
from typing import Final
from dirhash import dirhash
from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted
from . import clean_env
_LOGGER: logging.Logger = logging.getLogger(__name__)
_CAS_CMD: str = (
"cas authenticate --signerID {signer} --silent --output json --hash {sum}"
)
_CACHE: set[tuple[str, str]] = set()
_ATTR_ERROR: Final = "error"
_ATTR_STATUS: Final = "status"
_FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue"
def calc_checksum(data: str | bytes) -> str:
"""Generate checksum for CodeNotary."""
if isinstance(data, str):
return hashlib.sha256(data.encode()).hexdigest()
return hashlib.sha256(data).hexdigest()
def calc_checksum_path_sourcecode(folder: Path) -> str:
"""Calculate checksum for a path source code.
Need catch OSError.
"""
return dirhash(folder.as_posix(), "sha256", match=["*.py"])
# pylint: disable=unreachable
async def cas_validate(
signer: str,
checksum: str,
) -> None:
"""Validate data against CodeNotary."""
return
if (checksum, signer) in _CACHE:
return
# Generate command for request
command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum))
# Request notary authorization
_LOGGER.debug("Send cas command: %s", command)
try:
proc = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=clean_env(),
)
async with asyncio.timeout(15):
data, error = await proc.communicate()
except TimeoutError:
raise CodeNotaryBackendError(
"Timeout while processing CodeNotary", _LOGGER.warning
) from None
except OSError as err:
raise CodeNotaryError(
f"CodeNotary fatal error: {err!s}", _LOGGER.critical
) from err
# Check if Notarized
if proc.returncode != 0 and not data:
if error:
try:
error = error.decode("utf-8")
except UnicodeDecodeError as err:
raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err
if "not notarized" in error:
raise CodeNotaryUntrusted()
else:
error = _FALLBACK_ERROR
raise CodeNotaryBackendError(error, _LOGGER.warning)
# Parse data
try:
data_json = json.loads(data)
_LOGGER.debug("CodeNotary response with: %s", data_json)
except (json.JSONDecodeError, UnicodeDecodeError) as err:
raise CodeNotaryError(
f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error
) from err
if _ATTR_ERROR in data_json:
raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning)
if data_json[_ATTR_STATUS] == 0:
_CACHE.add((checksum, signer))
else:
raise CodeNotaryUntrusted()

View File

@@ -3,25 +3,22 @@
import asyncio
from datetime import timedelta
import errno
from http import HTTPStatus
from pathlib import Path
from unittest.mock import MagicMock, PropertyMock, call, patch
from unittest.mock import MagicMock, PropertyMock, patch
import aiodocker
from awesomeversion import AwesomeVersion
from docker.errors import DockerException, NotFound
from docker.errors import DockerException, ImageNotFound, NotFound
import pytest
from securetar import SecureTarFile
from supervisor.addons.addon import Addon
from supervisor.addons.const import AddonBackupMode
from supervisor.addons.model import AddonModel
from supervisor.config import CoreConfig
from supervisor.const import AddonBoot, AddonState, BusEvent
from supervisor.coresys import CoreSys
from supervisor.docker.addon import DockerAddon
from supervisor.docker.const import ContainerState
from supervisor.docker.manager import CommandReturn, DockerAPI
from supervisor.docker.manager import CommandReturn
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import AddonsError, AddonsJobError, AudioUpdateError
from supervisor.hardware.helper import HwHelper
@@ -864,14 +861,16 @@ async def test_addon_loads_wrong_image(
container.remove.assert_called_with(force=True, v=True)
# one for removing the addon, one for removing the addon builder
assert coresys.docker.images.delete.call_count == 2
assert coresys.docker.images.remove.call_count == 2
assert coresys.docker.images.delete.call_args_list[0] == call(
"local/aarch64-addon-ssh:latest", force=True
)
assert coresys.docker.images.delete.call_args_list[1] == call(
"local/aarch64-addon-ssh:9.2.1", force=True
)
assert coresys.docker.images.remove.call_args_list[0].kwargs == {
"image": "local/aarch64-addon-ssh:latest",
"force": True,
}
assert coresys.docker.images.remove.call_args_list[1].kwargs == {
"image": "local/aarch64-addon-ssh:9.2.1",
"force": True,
}
mock_run_command.assert_called_once()
assert mock_run_command.call_args.args[0] == "docker.io/library/docker"
assert mock_run_command.call_args.kwargs["version"] == "1.0.0-cli"
@@ -895,9 +894,7 @@ async def test_addon_loads_missing_image(
mock_amd64_arch_supported,
):
"""Test addon corrects a missing image on load."""
coresys.docker.images.inspect.side_effect = aiodocker.DockerError(
HTTPStatus.NOT_FOUND, {"message": "missing"}
)
coresys.docker.images.get.side_effect = ImageNotFound("missing")
with (
patch("pathlib.Path.is_file", return_value=True),
@@ -929,50 +926,40 @@ async def test_addon_loads_missing_image(
assert install_addon_ssh.image == "local/amd64-addon-ssh"
@pytest.mark.parametrize(
"pull_image_exc",
[DockerException(), aiodocker.DockerError(400, {"message": "error"})],
)
@pytest.mark.usefixtures("container", "mock_amd64_arch_supported")
async def test_addon_load_succeeds_with_docker_errors(
coresys: CoreSys,
install_addon_ssh: Addon,
container: MagicMock,
caplog: pytest.LogCaptureFixture,
pull_image_exc: Exception,
mock_amd64_arch_supported,
):
"""Docker errors while building/pulling an image during load should not raise and fail setup."""
# Build env invalid failure
coresys.docker.images.inspect.side_effect = aiodocker.DockerError(
HTTPStatus.NOT_FOUND, {"message": "missing"}
)
coresys.docker.images.get.side_effect = ImageNotFound("missing")
caplog.clear()
await install_addon_ssh.load()
assert "Invalid build environment" in caplog.text
# Image build failure
coresys.docker.images.build.side_effect = DockerException()
caplog.clear()
with (
patch("pathlib.Path.is_file", return_value=True),
patch.object(
CoreConfig, "local_to_extern_path", return_value="/addon/path/on/host"
),
patch.object(
DockerAPI,
"run_command",
return_value=MagicMock(exit_code=1, output=b"error"),
type(coresys.config),
"local_to_extern_path",
return_value="/addon/path/on/host",
),
):
await install_addon_ssh.load()
assert (
"Can't build local/amd64-addon-ssh:9.2.1: Docker build failed for local/amd64-addon-ssh:9.2.1 (exit code 1). Build output:\nerror"
in caplog.text
)
assert "Can't build local/amd64-addon-ssh:9.2.1" in caplog.text
# Image pull failure
install_addon_ssh.data["image"] = "test/amd64-addon-ssh"
coresys.docker.images.build.reset_mock(side_effect=True)
coresys.docker.pull_image.side_effect = DockerException()
caplog.clear()
with patch.object(DockerAPI, "pull_image", side_effect=pull_image_exc):
await install_addon_ssh.load()
await install_addon_ssh.load()
assert "Unknown error with test/amd64-addon-ssh:9.2.1" in caplog.text

View File

@@ -419,71 +419,3 @@ def test_valid_schema():
config["schema"] = {"field": "invalid"}
with pytest.raises(vol.Invalid):
assert vd.SCHEMA_ADDON_CONFIG(config)
def test_ulimits_simple_format():
"""Test ulimits simple format validation."""
config = load_json_fixture("basic-addon-config.json")
config["ulimits"] = {"nofile": 65535, "nproc": 32768, "memlock": 134217728}
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["ulimits"]["nofile"] == 65535
assert valid_config["ulimits"]["nproc"] == 32768
assert valid_config["ulimits"]["memlock"] == 134217728
def test_ulimits_detailed_format():
"""Test ulimits detailed format validation."""
config = load_json_fixture("basic-addon-config.json")
config["ulimits"] = {
"nofile": {"soft": 20000, "hard": 40000},
"nproc": 32768, # Mixed format should work
"memlock": {"soft": 67108864, "hard": 134217728},
}
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["ulimits"]["nofile"]["soft"] == 20000
assert valid_config["ulimits"]["nofile"]["hard"] == 40000
assert valid_config["ulimits"]["nproc"] == 32768
assert valid_config["ulimits"]["memlock"]["soft"] == 67108864
assert valid_config["ulimits"]["memlock"]["hard"] == 134217728
def test_ulimits_empty_dict():
"""Test ulimits with empty dict (default)."""
config = load_json_fixture("basic-addon-config.json")
valid_config = vd.SCHEMA_ADDON_CONFIG(config)
assert valid_config["ulimits"] == {}
def test_ulimits_invalid_values():
"""Test ulimits with invalid values."""
config = load_json_fixture("basic-addon-config.json")
# Invalid string values
config["ulimits"] = {"nofile": "invalid"}
with pytest.raises(vol.Invalid):
vd.SCHEMA_ADDON_CONFIG(config)
# Invalid detailed format
config["ulimits"] = {"nofile": {"invalid_key": 1000}}
with pytest.raises(vol.Invalid):
vd.SCHEMA_ADDON_CONFIG(config)
# Missing hard value in detailed format
config["ulimits"] = {"nofile": {"soft": 1000}}
with pytest.raises(vol.Invalid):
vd.SCHEMA_ADDON_CONFIG(config)
# Missing soft value in detailed format
config["ulimits"] = {"nofile": {"hard": 1000}}
with pytest.raises(vol.Invalid):
vd.SCHEMA_ADDON_CONFIG(config)
# Empty dict in detailed format
config["ulimits"] = {"nofile": {}}
with pytest.raises(vol.Invalid):
vd.SCHEMA_ADDON_CONFIG(config)

View File

@@ -4,7 +4,7 @@ import asyncio
from collections.abc import AsyncGenerator, Generator
from copy import deepcopy
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, call, patch
from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, patch
from awesomeversion import AwesomeVersion
import pytest
@@ -514,13 +514,19 @@ async def test_shared_image_kept_on_uninstall(
latest = f"{install_addon_example.image}:latest"
await coresys.addons.uninstall("local_example2")
coresys.docker.images.delete.assert_not_called()
coresys.docker.images.remove.assert_not_called()
assert not coresys.addons.get("local_example2", local_only=True)
await coresys.addons.uninstall("local_example")
assert coresys.docker.images.delete.call_count == 2
assert coresys.docker.images.delete.call_args_list[0] == call(latest, force=True)
assert coresys.docker.images.delete.call_args_list[1] == call(image, force=True)
assert coresys.docker.images.remove.call_count == 2
assert coresys.docker.images.remove.call_args_list[0].kwargs == {
"image": latest,
"force": True,
}
assert coresys.docker.images.remove.call_args_list[1].kwargs == {
"image": image,
"force": True,
}
assert not coresys.addons.get("local_example", local_only=True)
@@ -548,17 +554,19 @@ async def test_shared_image_kept_on_update(
assert example_2.version == "1.2.0"
assert install_addon_example_image.version == "1.2.0"
image_new = {"Id": "image_new", "RepoTags": ["image_new:latest"]}
image_old = {"Id": "image_old", "RepoTags": ["image_old:latest"]}
docker.images.inspect.side_effect = [image_new, image_old]
image_new = MagicMock()
image_new.id = "image_new"
image_old = MagicMock()
image_old.id = "image_old"
docker.images.get.side_effect = [image_new, image_old]
docker.images.list.return_value = [image_new, image_old]
with patch.object(DockerAPI, "pull_image", return_value=image_new):
await coresys.addons.update("local_example2")
docker.images.delete.assert_not_called()
docker.images.remove.assert_not_called()
assert example_2.version == "1.3.0"
docker.images.inspect.side_effect = [image_new]
docker.images.get.side_effect = [image_new]
await coresys.addons.update("local_example_image")
docker.images.delete.assert_called_once_with("image_old", force=True)
docker.images.remove.assert_called_once_with("image_old", force=True)
assert install_addon_example_image.version == "1.3.0"

View File

@@ -2,19 +2,16 @@
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
import pytest
from supervisor.backups.manager import BackupManager
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.docker.homeassistant import DockerHomeAssistant
from supervisor.docker.interface import DockerInterface
from supervisor.homeassistant.api import APIState, HomeAssistantAPI
from supervisor.homeassistant.const import WSEvent
from supervisor.homeassistant.api import APIState
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
@@ -274,96 +271,3 @@ async def test_background_home_assistant_update_fails_fast(
assert resp.status == 400
body = await resp.json()
assert body["message"] == "Version 2025.8.3 is already installed"
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_api_progress_updates_home_assistant_update(
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
"""Test progress updates sent to Home Assistant for updates."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.core.set_state(CoreState.RUNNING)
coresys.docker.dockerpy.api.pull.return_value = load_json_fixture(
"docker_pull_image_log.json"
)
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
with (
patch.object(
DockerHomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2025.8.0")),
),
patch.object(
HomeAssistantAPI, "get_config", return_value={"components": ["frontend"]}
),
):
resp = await api_client.post("/core/update", json={"version": "2025.8.3"})
assert resp.status == 200
events = [
{
"stage": evt.args[0]["data"]["data"]["stage"],
"progress": evt.args[0]["data"]["data"]["progress"],
"done": evt.args[0]["data"]["data"]["done"],
}
for evt in ha_ws_client.async_send_command.call_args_list
if "data" in evt.args[0]
and evt.args[0]["data"]["event"] == WSEvent.JOB
and evt.args[0]["data"]["data"]["name"] == "home_assistant_core_update"
]
assert events[:5] == [
{
"stage": None,
"progress": 0,
"done": None,
},
{
"stage": None,
"progress": 0,
"done": False,
},
{
"stage": None,
"progress": 0.1,
"done": False,
},
{
"stage": None,
"progress": 1.2,
"done": False,
},
{
"stage": None,
"progress": 2.8,
"done": False,
},
]
assert events[-5:] == [
{
"stage": None,
"progress": 97.2,
"done": False,
},
{
"stage": None,
"progress": 98.4,
"done": False,
},
{
"stage": None,
"progress": 99.4,
"done": False,
},
{
"stage": None,
"progress": 100,
"done": False,
},
{
"stage": None,
"progress": 100,
"done": True,
},
]

View File

@@ -41,11 +41,9 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys):
async def test_api_integrity_check(
api_client, coresys: CoreSys, supervisor_internet: AsyncMock
):
"""Test security integrity check."""
coresys.security.content_trust = False
"""Test security integrity check - now deprecated."""
resp = await api_client.post("/security/integrity")
result = await resp.json()
assert result["data"]["core"] == "untested"
assert result["data"]["supervisor"] == "untested"
# CodeNotary integrity check has been removed
assert "error" in result["data"]

View File

@@ -13,13 +13,12 @@ from supervisor.addons.addon import Addon
from supervisor.arch import CpuArch
from supervisor.backups.manager import BackupManager
from supervisor.config import CoreConfig
from supervisor.const import AddonState, CoreState
from supervisor.const import AddonState
from supervisor.coresys import CoreSys
from supervisor.docker.addon import DockerAddon
from supervisor.docker.const import ContainerState
from supervisor.docker.interface import DockerInterface
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.homeassistant.const import WSEvent
from supervisor.homeassistant.module import HomeAssistant
from supervisor.store.addon import AddonStore
from supervisor.store.repository import Repository
@@ -710,101 +709,3 @@ async def test_api_store_addons_addon_availability_installed_addon(
assert (
"requires Home Assistant version 2023.1.1 or greater" in result["message"]
)
@pytest.mark.parametrize(
("action", "job_name", "addon_slug"),
[
("install", "addon_manager_install", "local_ssh"),
("update", "addon_manager_update", "local_example"),
],
)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_api_progress_updates_addon_install_update(
api_client: TestClient,
coresys: CoreSys,
ha_ws_client: AsyncMock,
install_addon_example: Addon,
action: str,
job_name: str,
addon_slug: str,
):
"""Test progress updates sent to Home Assistant for installs/updates."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.core.set_state(CoreState.RUNNING)
coresys.docker.dockerpy.api.pull.return_value = load_json_fixture(
"docker_pull_image_log.json"
)
coresys.arch._supported_arch = ["amd64"] # pylint: disable=protected-access
install_addon_example.data_store["version"] = AwesomeVersion("2.0.0")
with (
patch.object(Addon, "load"),
patch.object(Addon, "need_build", new=PropertyMock(return_value=False)),
patch.object(Addon, "latest_need_build", new=PropertyMock(return_value=False)),
):
resp = await api_client.post(f"/store/addons/{addon_slug}/{action}")
assert resp.status == 200
events = [
{
"stage": evt.args[0]["data"]["data"]["stage"],
"progress": evt.args[0]["data"]["data"]["progress"],
"done": evt.args[0]["data"]["data"]["done"],
}
for evt in ha_ws_client.async_send_command.call_args_list
if "data" in evt.args[0]
and evt.args[0]["data"]["event"] == WSEvent.JOB
and evt.args[0]["data"]["data"]["name"] == job_name
and evt.args[0]["data"]["data"]["reference"] == addon_slug
]
assert events[:4] == [
{
"stage": None,
"progress": 0,
"done": False,
},
{
"stage": None,
"progress": 0.1,
"done": False,
},
{
"stage": None,
"progress": 1.2,
"done": False,
},
{
"stage": None,
"progress": 2.8,
"done": False,
},
]
assert events[-5:] == [
{
"stage": None,
"progress": 97.2,
"done": False,
},
{
"stage": None,
"progress": 98.4,
"done": False,
},
{
"stage": None,
"progress": 99.4,
"done": False,
},
{
"stage": None,
"progress": 100,
"done": False,
},
{
"stage": None,
"progress": 100,
"done": True,
},
]

View File

@@ -2,24 +2,17 @@
# pylint: disable=protected-access
import time
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
from blockbuster import BlockingError
import pytest
from supervisor.const import CoreState
from supervisor.core import Core
from supervisor.coresys import CoreSys
from supervisor.exceptions import HassioError, HostNotSupportedError, StoreGitError
from supervisor.homeassistant.const import WSEvent
from supervisor.store.repository import Repository
from supervisor.supervisor import Supervisor
from supervisor.updater import Updater
from tests.api import common_test_api_advanced_logs
from tests.common import load_json_fixture
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
@@ -323,97 +316,3 @@ async def test_api_supervisor_options_blocking_io(
# This should not raise blocking error anymore
time.sleep(0)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_api_progress_updates_supervisor_update(
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
"""Test progress updates sent to Home Assistant for updates."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.core.set_state(CoreState.RUNNING)
coresys.docker.dockerpy.api.pull.return_value = load_json_fixture(
"docker_pull_image_log.json"
)
with (
patch.object(
Supervisor,
"version",
new=PropertyMock(return_value=AwesomeVersion("2025.08.0")),
),
patch.object(
Updater,
"version_supervisor",
new=PropertyMock(return_value=AwesomeVersion("2025.08.3")),
),
patch.object(
Updater, "image_supervisor", new=PropertyMock(return_value="supervisor")
),
patch.object(Supervisor, "update_apparmor"),
patch.object(Core, "stop"),
):
resp = await api_client.post("/supervisor/update")
assert resp.status == 200
events = [
{
"stage": evt.args[0]["data"]["data"]["stage"],
"progress": evt.args[0]["data"]["data"]["progress"],
"done": evt.args[0]["data"]["data"]["done"],
}
for evt in ha_ws_client.async_send_command.call_args_list
if "data" in evt.args[0]
and evt.args[0]["data"]["event"] == WSEvent.JOB
and evt.args[0]["data"]["data"]["name"] == "supervisor_update"
]
assert events[:4] == [
{
"stage": None,
"progress": 0,
"done": False,
},
{
"stage": None,
"progress": 0.1,
"done": False,
},
{
"stage": None,
"progress": 1.2,
"done": False,
},
{
"stage": None,
"progress": 2.8,
"done": False,
},
]
assert events[-5:] == [
{
"stage": None,
"progress": 97.2,
"done": False,
},
{
"stage": None,
"progress": 98.4,
"done": False,
},
{
"stage": None,
"progress": 99.4,
"done": False,
},
{
"stage": None,
"progress": 100,
"done": False,
},
{
"stage": None,
"progress": 100,
"done": True,
},
]

View File

@@ -9,7 +9,6 @@ import subprocess
from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, patch
from uuid import uuid4
from aiodocker.docker import DockerImages
from aiohttp import ClientSession, web
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
@@ -113,15 +112,13 @@ async def supervisor_name() -> None:
@pytest.fixture
async def docker() -> DockerAPI:
"""Mock DockerAPI."""
image_inspect = {
"Os": "linux",
"Architecture": "amd64",
"Id": "test123",
"RepoTags": ["ghcr.io/home-assistant/amd64-hassio-supervisor:latest"],
}
images = [MagicMock(tags=["ghcr.io/home-assistant/amd64-hassio-supervisor:latest"])]
image = MagicMock()
image.attrs = {"Os": "linux", "Architecture": "amd64"}
with (
patch("supervisor.docker.manager.DockerClient", return_value=MagicMock()),
patch("supervisor.docker.manager.DockerAPI.images", return_value=MagicMock()),
patch(
"supervisor.docker.manager.DockerAPI.containers", return_value=MagicMock()
),
@@ -129,30 +126,19 @@ async def docker() -> DockerAPI:
"supervisor.docker.manager.DockerAPI.api",
return_value=(api_mock := MagicMock()),
),
patch("supervisor.docker.manager.DockerAPI.images.get", return_value=image),
patch("supervisor.docker.manager.DockerAPI.images.list", return_value=images),
patch(
"supervisor.docker.manager.DockerAPI.info",
return_value=MagicMock(),
),
patch("supervisor.docker.manager.DockerAPI.unload"),
patch("supervisor.docker.manager.aiodocker.Docker", return_value=MagicMock()),
patch(
"supervisor.docker.manager.DockerAPI.images",
new=PropertyMock(
return_value=(docker_images := MagicMock(spec=DockerImages))
),
),
):
docker_obj = await DockerAPI(MagicMock()).post_init()
docker_obj.config._data = {"registries": {}}
with patch("supervisor.docker.monitor.DockerMonitor.load"):
await docker_obj.load()
docker_images.inspect.return_value = image_inspect
docker_images.list.return_value = [image_inspect]
docker_images.import_image.return_value = [
{"stream": "Loaded image: test:latest\n"}
]
docker_obj.info.logging = "journald"
docker_obj.info.storage = "overlay2"
docker_obj.info.version = AwesomeVersion("1.0.0")
@@ -852,9 +838,11 @@ async def container(docker: DockerAPI) -> MagicMock:
"""Mock attrs and status for container on attach."""
docker.containers.get.return_value = addon = MagicMock()
docker.containers.create.return_value = addon
docker.images.build.return_value = (addon, "")
addon.status = "stopped"
addon.attrs = {"State": {"ExitCode": 0}}
yield addon
with patch.object(DockerAPI, "pull_image", return_value=addon):
yield addon
@pytest.fixture

View File

@@ -503,93 +503,3 @@ async def test_addon_new_device_no_haos(
await install_addon_ssh.stop()
assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == []
async def test_ulimits_integration(
coresys: CoreSys,
install_addon_ssh: Addon,
):
"""Test ulimits integration with Docker addon."""
docker_addon = DockerAddon(coresys, install_addon_ssh)
# Test default case (no ulimits, no realtime)
assert docker_addon.ulimits is None
# Test with realtime enabled (should have built-in ulimits)
install_addon_ssh.data["realtime"] = True
ulimits = docker_addon.ulimits
assert ulimits is not None
assert len(ulimits) == 2
# Check for rtprio limit
rtprio_limit = next((u for u in ulimits if u.name == "rtprio"), None)
assert rtprio_limit is not None
assert rtprio_limit.soft == 90
assert rtprio_limit.hard == 99
# Check for memlock limit
memlock_limit = next((u for u in ulimits if u.name == "memlock"), None)
assert memlock_limit is not None
assert memlock_limit.soft == 128 * 1024 * 1024
assert memlock_limit.hard == 128 * 1024 * 1024
# Test with configurable ulimits (simple format)
install_addon_ssh.data["realtime"] = False
install_addon_ssh.data["ulimits"] = {"nofile": 65535, "nproc": 32768}
ulimits = docker_addon.ulimits
assert ulimits is not None
assert len(ulimits) == 2
nofile_limit = next((u for u in ulimits if u.name == "nofile"), None)
assert nofile_limit is not None
assert nofile_limit.soft == 65535
assert nofile_limit.hard == 65535
nproc_limit = next((u for u in ulimits if u.name == "nproc"), None)
assert nproc_limit is not None
assert nproc_limit.soft == 32768
assert nproc_limit.hard == 32768
# Test with configurable ulimits (detailed format)
install_addon_ssh.data["ulimits"] = {
"nofile": {"soft": 20000, "hard": 40000},
"memlock": {"soft": 67108864, "hard": 134217728},
}
ulimits = docker_addon.ulimits
assert ulimits is not None
assert len(ulimits) == 2
nofile_limit = next((u for u in ulimits if u.name == "nofile"), None)
assert nofile_limit is not None
assert nofile_limit.soft == 20000
assert nofile_limit.hard == 40000
memlock_limit = next((u for u in ulimits if u.name == "memlock"), None)
assert memlock_limit is not None
assert memlock_limit.soft == 67108864
assert memlock_limit.hard == 134217728
# Test mixed format and realtime (realtime + custom ulimits)
install_addon_ssh.data["realtime"] = True
install_addon_ssh.data["ulimits"] = {
"nofile": 65535,
"core": {"soft": 0, "hard": 0}, # Disable core dumps
}
ulimits = docker_addon.ulimits
assert ulimits is not None
assert (
len(ulimits) == 4
) # rtprio, memlock (from realtime) + nofile, core (from config)
# Check realtime limits still present
rtprio_limit = next((u for u in ulimits if u.name == "rtprio"), None)
assert rtprio_limit is not None
# Check custom limits added
nofile_limit = next((u for u in ulimits if u.name == "nofile"), None)
assert nofile_limit is not None
assert nofile_limit.soft == 65535
assert nofile_limit.hard == 65535
core_limit = next((u for u in ulimits if u.name == "core"), None)
assert core_limit is not None
assert core_limit.soft == 0
assert core_limit.hard == 0

View File

@@ -5,10 +5,10 @@ from pathlib import Path
from typing import Any
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, PropertyMock, call, patch
import aiodocker
from awesomeversion import AwesomeVersion
from docker.errors import DockerException, NotFound
from docker.models.containers import Container
from docker.models.images import Image
import pytest
from requests import RequestException
@@ -26,20 +26,12 @@ from supervisor.exceptions import (
DockerNotFound,
DockerRequestError,
)
from supervisor.homeassistant.const import WSEvent
from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from tests.common import load_json_fixture
@pytest.fixture(autouse=True)
def mock_verify_content(coresys: CoreSys):
"""Mock verify_content utility during tests."""
with patch.object(
coresys.security, "verify_content", return_value=None
) as verify_content:
yield verify_content
@pytest.mark.parametrize(
"cpu_arch, platform",
[
@@ -57,30 +49,35 @@ async def test_docker_image_platform(
platform: str,
):
"""Test platform set correctly from arch."""
coresys.docker.images.inspect.return_value = {"Id": "test:1.2.3"}
await test_docker_interface.install(AwesomeVersion("1.2.3"), "test", arch=cpu_arch)
coresys.docker.dockerpy.api.pull.assert_called_once_with(
"test", tag="1.2.3", platform=platform, stream=True, decode=True
)
coresys.docker.images.inspect.assert_called_once_with("test:1.2.3")
with patch.object(
coresys.docker.images, "get", return_value=Mock(id="test:1.2.3")
) as get:
await test_docker_interface.install(
AwesomeVersion("1.2.3"), "test", arch=cpu_arch
)
coresys.docker.docker.api.pull.assert_called_once_with(
"test", tag="1.2.3", platform=platform, stream=True, decode=True
)
get.assert_called_once_with("test:1.2.3")
async def test_docker_image_default_platform(
coresys: CoreSys, test_docker_interface: DockerInterface
):
"""Test platform set using supervisor arch when omitted."""
coresys.docker.images.inspect.return_value = {"Id": "test:1.2.3"}
with (
patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value="i386")
),
patch.object(
coresys.docker.images, "get", return_value=Mock(id="test:1.2.3")
) as get,
):
await test_docker_interface.install(AwesomeVersion("1.2.3"), "test")
coresys.docker.dockerpy.api.pull.assert_called_once_with(
coresys.docker.docker.api.pull.assert_called_once_with(
"test", tag="1.2.3", platform="linux/386", stream=True, decode=True
)
coresys.docker.images.inspect.assert_called_once_with("test:1.2.3")
get.assert_called_once_with("test:1.2.3")
@pytest.mark.parametrize(
@@ -211,40 +208,57 @@ async def test_attach_existing_container(
async def test_attach_container_failure(coresys: CoreSys):
"""Test attach fails to find container but finds image."""
coresys.docker.containers.get.side_effect = DockerException()
coresys.docker.images.inspect.return_value.setdefault("Config", {})["Image"] = (
"sha256:abc123"
)
with patch.object(type(coresys.bus), "fire_event") as fire_event:
container_collection = MagicMock()
container_collection.get.side_effect = DockerException()
image_collection = MagicMock()
image_config = {"Image": "sha256:abc123"}
image_collection.get.return_value = Image({"Config": image_config})
with (
patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
),
patch(
"supervisor.docker.manager.DockerAPI.images",
new=PropertyMock(return_value=image_collection),
),
patch.object(type(coresys.bus), "fire_event") as fire_event,
):
await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3"))
assert not [
event
for event in fire_event.call_args_list
if event.args[0] == BusEvent.DOCKER_CONTAINER_STATE_CHANGE
]
assert (
coresys.homeassistant.core.instance.meta_config["Image"] == "sha256:abc123"
)
assert coresys.homeassistant.core.instance.meta_config == image_config
async def test_attach_total_failure(coresys: CoreSys):
"""Test attach fails to find container or image."""
coresys.docker.containers.get.side_effect = DockerException
coresys.docker.images.inspect.side_effect = aiodocker.DockerError(
400, {"message": ""}
)
with pytest.raises(DockerError):
container_collection = MagicMock()
container_collection.get.side_effect = DockerException()
image_collection = MagicMock()
image_collection.get.side_effect = DockerException()
with (
patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
),
patch(
"supervisor.docker.manager.DockerAPI.images",
new=PropertyMock(return_value=image_collection),
),
pytest.raises(DockerError),
):
await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3"))
@pytest.mark.parametrize(
"err", [aiodocker.DockerError(400, {"message": ""}), RequestException()]
)
@pytest.mark.parametrize("err", [DockerException(), RequestException()])
async def test_image_pull_fail(
coresys: CoreSys, capture_exception: Mock, err: Exception
):
"""Test failure to pull image."""
coresys.docker.images.inspect.side_effect = err
coresys.docker.images.get.side_effect = err
with pytest.raises(DockerError):
await coresys.homeassistant.core.instance.install(
AwesomeVersion("2022.7.3"), arch=CpuArch.AMD64
@@ -277,7 +291,7 @@ async def test_install_fires_progress_events(
):
"""Test progress events are fired during an install for listeners."""
# This is from a sample pull. Filtered log to just one per unique status for test
coresys.docker.dockerpy.api.pull.return_value = [
coresys.docker.docker.api.pull.return_value = [
{
"status": "Pulling from home-assistant/odroid-n2-homeassistant",
"id": "2025.7.2",
@@ -321,10 +335,10 @@ async def test_install_fires_progress_events(
),
):
await test_docker_interface.install(AwesomeVersion("1.2.3"), "test")
coresys.docker.dockerpy.api.pull.assert_called_once_with(
coresys.docker.docker.api.pull.assert_called_once_with(
"test", tag="1.2.3", platform="linux/386", stream=True, decode=True
)
coresys.docker.images.inspect.assert_called_once_with("test:1.2.3")
coresys.docker.images.get.assert_called_once_with("test:1.2.3")
await asyncio.sleep(1)
assert events == [
@@ -394,18 +408,197 @@ async def test_install_fires_progress_events(
]
async def test_install_sends_progress_to_home_assistant(
coresys: CoreSys, test_docker_interface: DockerInterface, ha_ws_client: AsyncMock
):
"""Test progress events are sent as job updates to Home Assistant."""
coresys.core.set_state(CoreState.RUNNING)
coresys.docker.docker.api.pull.return_value = load_json_fixture(
"docker_pull_image_log.json"
)
with (
patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value="i386")
),
):
# Schedule job so we can listen for the end. Then we can assert against the WS mock
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
JobSchedulerOptions(),
AwesomeVersion("1.2.3"),
"test",
)
async def listen_for_job_end(reference: SupervisorJob):
if reference.uuid != job.uuid:
return
event.set()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, listen_for_job_end)
await install_task
await event.wait()
events = [
evt.args[0]["data"]["data"]
for evt in ha_ws_client.async_send_command.call_args_list
if "data" in evt.args[0] and evt.args[0]["data"]["event"] == WSEvent.JOB
]
assert events[0]["name"] == "docker_interface_install"
assert events[0]["uuid"] == job.uuid
assert events[0]["done"] is None
assert events[1]["name"] == "docker_interface_install"
assert events[1]["uuid"] == job.uuid
assert events[1]["done"] is False
assert events[-1]["name"] == "docker_interface_install"
assert events[-1]["uuid"] == job.uuid
assert events[-1]["done"] is True
def make_sub_log(layer_id: str):
return [
{
"stage": evt["stage"],
"progress": evt["progress"],
"done": evt["done"],
"extra": evt["extra"],
}
for evt in events
if evt["name"] == "Pulling container image layer"
and evt["reference"] == layer_id
and evt["parent_id"] == job.uuid
]
layer_1_log = make_sub_log("1e214cd6d7d0")
layer_2_log = make_sub_log("1a38e1d5e18d")
assert len(layer_1_log) == 20
assert len(layer_2_log) == 19
assert len(events) == 42
assert layer_1_log == [
{"stage": "Pulling fs layer", "progress": 0, "done": False, "extra": None},
{
"stage": "Downloading",
"progress": 0.1,
"done": False,
"extra": {"current": 539462, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 0.6,
"done": False,
"extra": {"current": 4864838, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 0.9,
"done": False,
"extra": {"current": 7552896, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 1.2,
"done": False,
"extra": {"current": 10252544, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 2.9,
"done": False,
"extra": {"current": 25369792, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 11.9,
"done": False,
"extra": {"current": 103619904, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 26.1,
"done": False,
"extra": {"current": 227726144, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 49.6,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Verifying Checksum",
"progress": 50,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Download complete",
"progress": 50,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 50.1,
"done": False,
"extra": {"current": 557056, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 60.3,
"done": False,
"extra": {"current": 89686016, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 70.0,
"done": False,
"extra": {"current": 174358528, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 80.0,
"done": False,
"extra": {"current": 261816320, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 88.4,
"done": False,
"extra": {"current": 334790656, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 94.0,
"done": False,
"extra": {"current": 383811584, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 99.9,
"done": False,
"extra": {"current": 435617792, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 100.0,
"done": False,
"extra": {"current": 436480882, "total": 436480882},
},
{
"stage": "Pull complete",
"progress": 100.0,
"done": True,
"extra": {"current": 436480882, "total": 436480882},
},
]
async def test_install_progress_rounding_does_not_cause_misses(
coresys: CoreSys,
test_docker_interface: DockerInterface,
ha_ws_client: AsyncMock,
capture_exception: Mock,
coresys: CoreSys, test_docker_interface: DockerInterface, ha_ws_client: AsyncMock
):
"""Test extremely close progress events do not create rounding issues."""
coresys.core.set_state(CoreState.RUNNING)
# Current numbers chosen to create a rounding issue with original code
# Where a progress update came in with a value between the actual previous
# value and what it was rounded to. It should not raise an out of order exception
coresys.docker.dockerpy.api.pull.return_value = [
coresys.docker.docker.api.pull.return_value = [
{
"status": "Pulling from home-assistant/odroid-n2-homeassistant",
"id": "2025.7.1",
@@ -469,7 +662,65 @@ async def test_install_progress_rounding_does_not_cause_misses(
await install_task
await event.wait()
capture_exception.assert_not_called()
events = [
evt.args[0]["data"]["data"]
for evt in ha_ws_client.async_send_command.call_args_list
if "data" in evt.args[0]
and evt.args[0]["data"]["event"] == WSEvent.JOB
and evt.args[0]["data"]["data"]["reference"] == "1e214cd6d7d0"
and evt.args[0]["data"]["data"]["stage"] in {"Downloading", "Extracting"}
]
assert events == [
{
"name": "Pulling container image layer",
"stage": "Downloading",
"progress": 49.6,
"done": False,
"extra": {"current": 432700000, "total": 436480882},
"reference": "1e214cd6d7d0",
"parent_id": job.uuid,
"errors": [],
"uuid": ANY,
"created": ANY,
},
{
"name": "Pulling container image layer",
"stage": "Downloading",
"progress": 49.6,
"done": False,
"extra": {"current": 432800000, "total": 436480882},
"reference": "1e214cd6d7d0",
"parent_id": job.uuid,
"errors": [],
"uuid": ANY,
"created": ANY,
},
{
"name": "Pulling container image layer",
"stage": "Extracting",
"progress": 99.6,
"done": False,
"extra": {"current": 432700000, "total": 436480882},
"reference": "1e214cd6d7d0",
"parent_id": job.uuid,
"errors": [],
"uuid": ANY,
"created": ANY,
},
{
"name": "Pulling container image layer",
"stage": "Extracting",
"progress": 99.6,
"done": False,
"extra": {"current": 432800000, "total": 436480882},
"reference": "1e214cd6d7d0",
"parent_id": job.uuid,
"errors": [],
"uuid": ANY,
"created": ANY,
},
]
@pytest.mark.parametrize(
@@ -500,7 +751,7 @@ async def test_install_raises_on_pull_error(
exc_msg: str,
):
"""Test exceptions raised from errors in pull log."""
coresys.docker.dockerpy.api.pull.return_value = [
coresys.docker.docker.api.pull.return_value = [
{
"status": "Pulling from home-assistant/odroid-n2-homeassistant",
"id": "2025.7.2",
@@ -519,16 +770,11 @@ async def test_install_raises_on_pull_error(
async def test_install_progress_handles_download_restart(
coresys: CoreSys,
test_docker_interface: DockerInterface,
ha_ws_client: AsyncMock,
capture_exception: Mock,
coresys: CoreSys, test_docker_interface: DockerInterface, ha_ws_client: AsyncMock
):
"""Test install handles docker progress events that include a download restart."""
coresys.core.set_state(CoreState.RUNNING)
# Fixture emulates a download restart as it docker logs it
# A log out of order exception should not be raised
coresys.docker.dockerpy.api.pull.return_value = load_json_fixture(
coresys.docker.docker.api.pull.return_value = load_json_fixture(
"docker_pull_image_log_restart.json"
)
@@ -555,4 +801,106 @@ async def test_install_progress_handles_download_restart(
await install_task
await event.wait()
capture_exception.assert_not_called()
events = [
evt.args[0]["data"]["data"]
for evt in ha_ws_client.async_send_command.call_args_list
if "data" in evt.args[0] and evt.args[0]["data"]["event"] == WSEvent.JOB
]
def make_sub_log(layer_id: str):
return [
{
"stage": evt["stage"],
"progress": evt["progress"],
"done": evt["done"],
"extra": evt["extra"],
}
for evt in events
if evt["name"] == "Pulling container image layer"
and evt["reference"] == layer_id
and evt["parent_id"] == job.uuid
]
layer_1_log = make_sub_log("1e214cd6d7d0")
assert len(layer_1_log) == 14
assert layer_1_log == [
{"stage": "Pulling fs layer", "progress": 0, "done": False, "extra": None},
{
"stage": "Downloading",
"progress": 11.9,
"done": False,
"extra": {"current": 103619904, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 26.1,
"done": False,
"extra": {"current": 227726144, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 49.6,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Retrying download",
"progress": 0,
"done": False,
"extra": None,
},
{
"stage": "Retrying download",
"progress": 0,
"done": False,
"extra": None,
},
{
"stage": "Downloading",
"progress": 11.9,
"done": False,
"extra": {"current": 103619904, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 26.1,
"done": False,
"extra": {"current": 227726144, "total": 436480882},
},
{
"stage": "Downloading",
"progress": 49.6,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Verifying Checksum",
"progress": 50,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Download complete",
"progress": 50,
"done": False,
"extra": {"current": 433170048, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 80.0,
"done": False,
"extra": {"current": 261816320, "total": 436480882},
},
{
"stage": "Extracting",
"progress": 100.0,
"done": False,
"extra": {"current": 436480882, "total": 436480882},
},
{
"stage": "Pull complete",
"progress": 100.0,
"done": True,
"extra": {"current": 436480882, "total": 436480882},
},
]

View File

@@ -1,10 +1,9 @@
"""Test Docker manager."""
import asyncio
from pathlib import Path
from unittest.mock import MagicMock, patch
from docker.errors import APIError, DockerException, NotFound
from docker.errors import DockerException
import pytest
from requests import RequestException
@@ -21,7 +20,7 @@ async def test_run_command_success(docker: DockerAPI):
mock_container.logs.return_value = b"command output"
# Mock docker containers.run to return our mock container
docker.dockerpy.containers.run.return_value = mock_container
docker.docker.containers.run.return_value = mock_container
# Execute the command
result = docker.run_command(
@@ -34,7 +33,7 @@ async def test_run_command_success(docker: DockerAPI):
assert result.output == b"command output"
# Verify docker.containers.run was called correctly
docker.dockerpy.containers.run.assert_called_once_with(
docker.docker.containers.run.assert_called_once_with(
"alpine:3.18",
command="echo hello",
detach=True,
@@ -56,7 +55,7 @@ async def test_run_command_with_defaults(docker: DockerAPI):
mock_container.logs.return_value = b"error output"
# Mock docker containers.run to return our mock container
docker.dockerpy.containers.run.return_value = mock_container
docker.docker.containers.run.return_value = mock_container
# Execute the command with minimal parameters
result = docker.run_command(image="ubuntu")
@@ -67,7 +66,7 @@ async def test_run_command_with_defaults(docker: DockerAPI):
assert result.output == b"error output"
# Verify docker.containers.run was called with defaults
docker.dockerpy.containers.run.assert_called_once_with(
docker.docker.containers.run.assert_called_once_with(
"ubuntu:latest", # default tag
command=None, # default command
detach=True,
@@ -82,7 +81,7 @@ async def test_run_command_with_defaults(docker: DockerAPI):
async def test_run_command_docker_exception(docker: DockerAPI):
"""Test command execution when Docker raises an exception."""
# Mock docker containers.run to raise DockerException
docker.dockerpy.containers.run.side_effect = DockerException("Docker error")
docker.docker.containers.run.side_effect = DockerException("Docker error")
# Execute the command and expect DockerError
with pytest.raises(DockerError, match="Can't execute command: Docker error"):
@@ -92,7 +91,7 @@ async def test_run_command_docker_exception(docker: DockerAPI):
async def test_run_command_request_exception(docker: DockerAPI):
"""Test command execution when requests raises an exception."""
# Mock docker containers.run to raise RequestException
docker.dockerpy.containers.run.side_effect = RequestException("Connection error")
docker.docker.containers.run.side_effect = RequestException("Connection error")
# Execute the command and expect DockerError
with pytest.raises(DockerError, match="Can't execute command: Connection error"):
@@ -105,7 +104,7 @@ async def test_run_command_cleanup_on_exception(docker: DockerAPI):
mock_container = MagicMock()
# Mock docker.containers.run to return container, but container.wait to raise exception
docker.dockerpy.containers.run.return_value = mock_container
docker.docker.containers.run.return_value = mock_container
mock_container.wait.side_effect = DockerException("Wait failed")
# Execute the command and expect DockerError
@@ -124,7 +123,7 @@ async def test_run_command_custom_stdout_stderr(docker: DockerAPI):
mock_container.logs.return_value = b"output"
# Mock docker containers.run to return our mock container
docker.dockerpy.containers.run.return_value = mock_container
docker.docker.containers.run.return_value = mock_container
# Execute the command with custom stdout/stderr
result = docker.run_command(
@@ -151,7 +150,7 @@ async def test_run_container_with_cidfile(
cidfile_path = coresys.config.path_cid_files / f"{container_name}.cid"
extern_cidfile_path = coresys.config.path_extern_cid_files / f"{container_name}.cid"
docker.dockerpy.containers.run.return_value = mock_container
docker.docker.containers.run.return_value = mock_container
# Mock container creation
with patch.object(
@@ -352,101 +351,3 @@ async def test_run_container_with_leftover_cidfile_directory(
assert cidfile_path.read_text() == mock_container.id
assert result == mock_container
async def test_repair(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Test repair API."""
coresys.docker.dockerpy.networks.get.side_effect = [
hassio := MagicMock(
attrs={
"Containers": {
"good": {"Name": "good"},
"corrupt": {"Name": "corrupt"},
"fail": {"Name": "fail"},
}
}
),
host := MagicMock(attrs={"Containers": {}}),
]
coresys.docker.dockerpy.containers.get.side_effect = [
MagicMock(),
NotFound("corrupt"),
DockerException("fail"),
]
await coresys.run_in_executor(coresys.docker.repair)
coresys.docker.dockerpy.api.prune_containers.assert_called_once()
coresys.docker.dockerpy.api.prune_images.assert_called_once_with(
filters={"dangling": False}
)
coresys.docker.dockerpy.api.prune_builds.assert_called_once()
coresys.docker.dockerpy.api.prune_volumes.assert_called_once()
coresys.docker.dockerpy.api.prune_networks.assert_called_once()
hassio.disconnect.assert_called_once_with("corrupt", force=True)
host.disconnect.assert_not_called()
assert "Docker fatal error on container fail on hassio" in caplog.text
async def test_repair_failures(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Test repair proceeds best it can through failures."""
coresys.docker.dockerpy.api.prune_containers.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_images.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_builds.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_volumes.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_networks.side_effect = APIError("fail")
coresys.docker.dockerpy.networks.get.side_effect = NotFound("missing")
await coresys.run_in_executor(coresys.docker.repair)
assert "Error for containers prune: fail" in caplog.text
assert "Error for images prune: fail" in caplog.text
assert "Error for builds prune: fail" in caplog.text
assert "Error for volumes prune: fail" in caplog.text
assert "Error for networks prune: fail" in caplog.text
assert "Error for networks hassio prune: missing" in caplog.text
assert "Error for networks host prune: missing" in caplog.text
@pytest.mark.parametrize("log_starter", [("Loaded image ID"), ("Loaded image")])
async def test_import_image(coresys: CoreSys, tmp_path: Path, log_starter: str):
"""Test importing an image into docker."""
(test_tar := tmp_path / "test.tar").touch()
coresys.docker.images.import_image.return_value = [
{"stream": f"{log_starter}: imported"}
]
coresys.docker.images.inspect.return_value = {"Id": "imported"}
image = await coresys.docker.import_image(test_tar)
assert image["Id"] == "imported"
coresys.docker.images.inspect.assert_called_once_with("imported")
async def test_import_image_error(coresys: CoreSys, tmp_path: Path):
"""Test failure importing an image into docker."""
(test_tar := tmp_path / "test.tar").touch()
coresys.docker.images.import_image.return_value = [
{"errorDetail": {"message": "fail"}}
]
with pytest.raises(DockerError, match="Can't import image from tar: fail"):
await coresys.docker.import_image(test_tar)
coresys.docker.images.inspect.assert_not_called()
async def test_import_multiple_images_in_tar(
coresys: CoreSys, tmp_path: Path, caplog: pytest.LogCaptureFixture
):
"""Test importing an image into docker."""
(test_tar := tmp_path / "test.tar").touch()
coresys.docker.images.import_image.return_value = [
{"stream": "Loaded image: imported-1"},
{"stream": "Loaded image: imported-2"},
]
assert await coresys.docker.import_image(test_tar) is None
assert "Unexpected image count 2 while importing image from tar" in caplog.text
coresys.docker.images.inspect.assert_not_called()

View File

@@ -1,11 +1,10 @@
"""Test Home Assistant core."""
from datetime import datetime, timedelta
from unittest.mock import ANY, MagicMock, Mock, PropertyMock, call, patch
from unittest.mock import ANY, MagicMock, Mock, PropertyMock, patch
import aiodocker
from awesomeversion import AwesomeVersion
from docker.errors import APIError, DockerException, NotFound
from docker.errors import APIError, DockerException, ImageNotFound, NotFound
import pytest
from time_machine import travel
@@ -70,7 +69,7 @@ async def test_install_landingpage_docker_error(
),
patch("supervisor.homeassistant.core.asyncio.sleep") as sleep,
):
coresys.docker.dockerpy.api.pull.side_effect = [APIError("fail"), MagicMock()]
coresys.docker.images.get.side_effect = [APIError("fail"), MagicMock()]
await coresys.homeassistant.core.install_landingpage()
sleep.assert_awaited_once_with(30)
@@ -82,7 +81,7 @@ async def test_install_landingpage_other_error(
coresys: CoreSys, capture_exception: Mock, caplog: pytest.LogCaptureFixture
):
"""Test install landing page fails due to other error."""
coresys.docker.images.inspect.side_effect = [(err := OSError()), MagicMock()]
coresys.docker.images.get.side_effect = [(err := OSError()), MagicMock()]
with (
patch.object(DockerHomeAssistant, "attach", side_effect=DockerError),
@@ -124,7 +123,7 @@ async def test_install_docker_error(
),
patch("supervisor.homeassistant.core.asyncio.sleep") as sleep,
):
coresys.docker.dockerpy.api.pull.side_effect = [APIError("fail"), MagicMock()]
coresys.docker.images.get.side_effect = [APIError("fail"), MagicMock()]
await coresys.homeassistant.core.install()
sleep.assert_awaited_once_with(30)
@@ -136,7 +135,7 @@ async def test_install_other_error(
coresys: CoreSys, capture_exception: Mock, caplog: pytest.LogCaptureFixture
):
"""Test install fails due to other error."""
coresys.docker.images.inspect.side_effect = [(err := OSError()), MagicMock()]
coresys.docker.images.get.side_effect = [(err := OSError()), MagicMock()]
with (
patch.object(HomeAssistantCore, "start"),
@@ -162,29 +161,21 @@ async def test_install_other_error(
@pytest.mark.parametrize(
("container_exc", "image_exc", "remove_calls"),
[
(NotFound("missing"), None, []),
(
None,
aiodocker.DockerError(404, {"message": "missing"}),
[call(force=True, v=True)],
),
(None, None, [call(force=True, v=True)]),
],
"container_exists,image_exists", [(False, True), (True, False), (True, True)]
)
@pytest.mark.usefixtures("path_extern")
async def test_start(
coresys: CoreSys,
container_exc: DockerException | None,
image_exc: aiodocker.DockerError | None,
remove_calls: list[call],
coresys: CoreSys, container_exists: bool, image_exists: bool, path_extern
):
"""Test starting Home Assistant."""
coresys.docker.images.inspect.return_value = {"Id": "123"}
coresys.docker.images.inspect.side_effect = image_exc
coresys.docker.containers.get.return_value.id = "123"
coresys.docker.containers.get.side_effect = container_exc
if image_exists:
coresys.docker.images.get.return_value.id = "123"
else:
coresys.docker.images.get.side_effect = ImageNotFound("missing")
if container_exists:
coresys.docker.containers.get.return_value.image.id = "123"
else:
coresys.docker.containers.get.side_effect = NotFound("missing")
with (
patch.object(
@@ -207,14 +198,18 @@ async def test_start(
assert run.call_args.kwargs["hostname"] == "homeassistant"
coresys.docker.containers.get.return_value.stop.assert_not_called()
assert (
coresys.docker.containers.get.return_value.remove.call_args_list == remove_calls
)
if container_exists:
coresys.docker.containers.get.return_value.remove.assert_called_once_with(
force=True,
v=True,
)
else:
coresys.docker.containers.get.return_value.remove.assert_not_called()
async def test_start_existing_container(coresys: CoreSys, path_extern):
"""Test starting Home Assistant when container exists and is viable."""
coresys.docker.images.inspect.return_value = {"Id": "123"}
coresys.docker.images.get.return_value.id = "123"
coresys.docker.containers.get.return_value.image.id = "123"
coresys.docker.containers.get.return_value.status = "exited"
@@ -399,32 +394,24 @@ async def test_core_loads_wrong_image_for_machine(
"""Test core is loaded with wrong image for machine."""
coresys.homeassistant.set_image("ghcr.io/home-assistant/odroid-n2-homeassistant")
coresys.homeassistant.version = AwesomeVersion("2024.4.0")
container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}}
with patch.object(
DockerAPI,
"pull_image",
return_value={
"Id": "abc123",
"Config": {"Labels": {"io.hass.version": "2024.4.0"}},
},
) as pull_image:
container.attrs |= pull_image.return_value
await coresys.homeassistant.core.load()
pull_image.assert_called_once_with(
ANY,
"ghcr.io/home-assistant/qemux86-64-homeassistant",
"2024.4.0",
platform="linux/amd64",
)
await coresys.homeassistant.core.load()
container.remove.assert_called_once_with(force=True, v=True)
assert coresys.docker.images.delete.call_args_list[0] == call(
"ghcr.io/home-assistant/odroid-n2-homeassistant:latest",
force=True,
)
assert coresys.docker.images.delete.call_args_list[1] == call(
"ghcr.io/home-assistant/odroid-n2-homeassistant:2024.4.0",
force=True,
assert coresys.docker.images.remove.call_args_list[0].kwargs == {
"image": "ghcr.io/home-assistant/odroid-n2-homeassistant:latest",
"force": True,
}
assert coresys.docker.images.remove.call_args_list[1].kwargs == {
"image": "ghcr.io/home-assistant/odroid-n2-homeassistant:2024.4.0",
"force": True,
}
coresys.docker.pull_image.assert_called_once_with(
ANY,
"ghcr.io/home-assistant/qemux86-64-homeassistant",
"2024.4.0",
platform="linux/amd64",
)
assert (
coresys.homeassistant.image == "ghcr.io/home-assistant/qemux86-64-homeassistant"
@@ -441,8 +428,8 @@ async def test_core_load_allows_image_override(coresys: CoreSys, container: Magi
await coresys.homeassistant.core.load()
container.remove.assert_not_called()
coresys.docker.images.delete.assert_not_called()
coresys.docker.images.inspect.assert_not_called()
coresys.docker.images.remove.assert_not_called()
coresys.docker.images.get.assert_not_called()
assert (
coresys.homeassistant.image == "ghcr.io/home-assistant/odroid-n2-homeassistant"
)
@@ -453,36 +440,27 @@ async def test_core_loads_wrong_image_for_architecture(
):
"""Test core is loaded with wrong image for architecture."""
coresys.homeassistant.version = AwesomeVersion("2024.4.0")
coresys.docker.images.inspect.return_value = img_data = (
coresys.docker.images.inspect.return_value
| {
"Architecture": "arm64",
"Config": {"Labels": {"io.hass.version": "2024.4.0"}},
}
)
container.attrs |= img_data
container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}}
coresys.docker.images.get("ghcr.io/home-assistant/qemux86-64-homeassistant").attrs[
"Architecture"
] = "arm64"
with patch.object(
DockerAPI,
"pull_image",
return_value=img_data | {"Architecture": "amd64"},
) as pull_image:
await coresys.homeassistant.core.load()
pull_image.assert_called_once_with(
ANY,
"ghcr.io/home-assistant/qemux86-64-homeassistant",
"2024.4.0",
platform="linux/amd64",
)
await coresys.homeassistant.core.load()
container.remove.assert_called_once_with(force=True, v=True)
assert coresys.docker.images.delete.call_args_list[0] == call(
"ghcr.io/home-assistant/qemux86-64-homeassistant:latest",
force=True,
)
assert coresys.docker.images.delete.call_args_list[1] == call(
"ghcr.io/home-assistant/qemux86-64-homeassistant:2024.4.0",
force=True,
assert coresys.docker.images.remove.call_args_list[0].kwargs == {
"image": "ghcr.io/home-assistant/qemux86-64-homeassistant:latest",
"force": True,
}
assert coresys.docker.images.remove.call_args_list[1].kwargs == {
"image": "ghcr.io/home-assistant/qemux86-64-homeassistant:2024.4.0",
"force": True,
}
coresys.docker.pull_image.assert_called_once_with(
ANY,
"ghcr.io/home-assistant/qemux86-64-homeassistant",
"2024.4.0",
platform="linux/amd64",
)
assert (
coresys.homeassistant.image == "ghcr.io/home-assistant/qemux86-64-homeassistant"

View File

@@ -2,7 +2,7 @@
import asyncio
from pathlib import Path
from unittest.mock import ANY, MagicMock, Mock, PropertyMock, call, patch
from unittest.mock import ANY, MagicMock, Mock, PropertyMock, patch
from awesomeversion import AwesomeVersion
import pytest
@@ -11,14 +11,12 @@ from supervisor.const import BusEvent, CpuArch
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.interface import DockerInterface
from supervisor.docker.manager import DockerAPI
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import (
AudioError,
AudioJobError,
CliError,
CliJobError,
CodeNotaryUntrusted,
CoreDNSError,
CoreDNSJobError,
DockerError,
@@ -338,14 +336,12 @@ async def test_repair_failed(
patch.object(
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
),
patch(
"supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted
),
patch.object(DockerInterface, "install", side_effect=DockerError),
):
await plugin.repair()
capture_exception.assert_called_once()
assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted)
assert check_exception_chain(capture_exception.call_args[0][0], DockerError)
@pytest.mark.parametrize(
@@ -363,26 +359,21 @@ async def test_load_with_incorrect_image(
plugin.version = AwesomeVersion("2024.4.0")
container.status = "running"
coresys.docker.images.inspect.return_value = img_data = (
coresys.docker.images.inspect.return_value
| {"Config": {"Labels": {"io.hass.version": "2024.4.0"}}}
)
container.attrs |= img_data
container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}}
with patch.object(DockerAPI, "pull_image", return_value=img_data) as pull_image:
await plugin.load()
pull_image.assert_called_once_with(
ANY, correct_image, "2024.4.0", platform="linux/amd64"
)
await plugin.load()
container.remove.assert_called_once_with(force=True, v=True)
assert coresys.docker.images.delete.call_args_list[0] == call(
f"{old_image}:latest",
force=True,
)
assert coresys.docker.images.delete.call_args_list[1] == call(
f"{old_image}:2024.4.0",
force=True,
assert coresys.docker.images.remove.call_args_list[0].kwargs == {
"image": f"{old_image}:latest",
"force": True,
}
assert coresys.docker.images.remove.call_args_list[1].kwargs == {
"image": f"{old_image}:2024.4.0",
"force": True,
}
coresys.docker.pull_image.assert_called_once_with(
ANY, correct_image, "2024.4.0", platform="linux/amd64"
)
assert plugin.image == correct_image

View File

@@ -1,96 +0,0 @@
"""Test Check Supervisor trust."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
supervisor_trust = CheckSupervisorTrust(coresys)
assert supervisor_trust.slug == "supervisor_trust"
assert supervisor_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
coresys.supervisor.check_trust = AsyncMock(return_value=None)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await supervisor_trust.approve_check()
coresys.supervisor.check_trust = AsyncMock(return_value=None)
assert not await supervisor_trust.approve_check()
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert not coresys.security.verify_own_content.called
assert (
"Skipping supervisor_trust, content_trust is globally disabled" in caplog.text
)
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
supervisor_trust = CheckSupervisorTrust(coresys)
should_run = supervisor_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check",
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
await supervisor_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await supervisor_trust()
check.assert_not_called()
check.reset_mock()

View File

@@ -25,18 +25,13 @@ async def test_evaluation(coresys: CoreSys):
assert docker_configuration.reason in coresys.resolution.unsupported
coresys.resolution.unsupported.clear()
coresys.docker.info.storage = EXPECTED_STORAGE[0]
coresys.docker.info.storage = EXPECTED_STORAGE
coresys.docker.info.logging = "unsupported"
await docker_configuration()
assert docker_configuration.reason in coresys.resolution.unsupported
coresys.resolution.unsupported.clear()
coresys.docker.info.storage = "overlay2"
coresys.docker.info.logging = EXPECTED_LOGGING
await docker_configuration()
assert docker_configuration.reason not in coresys.resolution.unsupported
coresys.docker.info.storage = "overlayfs"
coresys.docker.info.storage = EXPECTED_STORAGE
coresys.docker.info.logging = EXPECTED_LOGGING
await docker_configuration()
assert docker_configuration.reason not in coresys.resolution.unsupported

View File

@@ -1,40 +1,22 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
import errno
import os
from pathlib import Path
from unittest.mock import AsyncMock, patch
from unittest.mock import patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.const import ContextType, IssueType
from supervisor.resolution.data import Issue
from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
with patch(
"supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE",
Path(f"{os.getcwd()}/supervisor"),
):
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
"""Test evaluation - CodeNotary removed."""
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await sourcemods()
assert sourcemods.reason in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError)
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock()
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
# CodeNotary checking removed, evaluation always returns False now
assert sourcemods.reason not in coresys.resolution.unsupported
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys):
@@ -63,27 +45,11 @@ async def test_did_run(coresys: CoreSys):
async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation."""
"""Test error reading file during evaluation - CodeNotary removed."""
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM)
# CodeNotary checking removed, evaluation always returns False now
assert sourcemods.reason not in coresys.resolution.unsupported
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs not in coresys.resolution.issues
with patch(
"supervisor.utils.codenotary.dirhash",
side_effect=(err := OSError()),
):
err.errno = errno.EBUSY
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs in coresys.resolution.issues
assert coresys.core.healthy is True
coresys.resolution.dismiss_issue(corrupt_fs)
err.errno = errno.EBADMSG
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs in coresys.resolution.issues
assert coresys.core.healthy is False

View File

@@ -1,9 +1,8 @@
"""Test fixup addon execute repair."""
from http import HTTPStatus
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import aiodocker
from docker.errors import NotFound
import pytest
from supervisor.addons.addon import Addon
@@ -18,9 +17,7 @@ from supervisor.resolution.fixups.addon_execute_repair import FixupAddonExecuteR
async def test_fixup(docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon):
"""Test fixup rebuilds addon's container."""
docker.images.inspect.side_effect = aiodocker.DockerError(
HTTPStatus.NOT_FOUND, {"message": "missing"}
)
docker.images.get.side_effect = NotFound("missing")
install_addon_ssh.data["image"] = "test_image"
addon_execute_repair = FixupAddonExecuteRepair(coresys)
@@ -44,9 +41,7 @@ async def test_fixup_max_auto_attempts(
docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon
):
"""Test fixup stops being auto-applied after 5 failures."""
docker.images.inspect.side_effect = aiodocker.DockerError(
HTTPStatus.NOT_FOUND, {"message": "missing"}
)
docker.images.get.side_effect = NotFound("missing")
install_addon_ssh.data["image"] = "test_image"
addon_execute_repair = FixupAddonExecuteRepair(coresys)
@@ -87,6 +82,8 @@ async def test_fixup_image_exists(
docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon
):
"""Test fixup dismisses if image exists."""
docker.images.get.return_value = MagicMock()
addon_execute_repair = FixupAddonExecuteRepair(coresys)
assert addon_execute_repair.auto is True

View File

@@ -1,69 +0,0 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
from datetime import timedelta
from unittest.mock import AsyncMock
import time_machine
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from supervisor.resolution.fixups.system_execute_integrity import (
FixupSystemExecuteIntegrity,
)
from supervisor.security.const import ContentTrustResult, IntegrityResult
from supervisor.utils.dt import utcnow
async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.add_suggestion(
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
)
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.PASS,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.add_suggestion(
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
)
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.FAILED,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
with time_machine.travel(utcnow() + timedelta(hours=24)):
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 1
assert len(coresys.resolution.issues) == 1

View File

@@ -1,21 +1,15 @@
"""Test evaluations."""
from unittest.mock import Mock, patch
from unittest.mock import Mock
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.utils import check_exception_chain
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while evaluating system."""
await coresys.core.set_state(CoreState.RUNNING)
with patch(
"supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode",
side_effect=RuntimeError,
):
await coresys.resolution.evaluate.evaluate_system()
await coresys.resolution.evaluate.evaluate_system()
capture_exception.assert_called_once()
assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError)
capture_exception.assert_not_called()

View File

@@ -1,127 +0,0 @@
"""Testing handling with Security."""
from unittest.mock import AsyncMock, patch
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.security.const import ContentTrustResult
async def test_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
with patch(
"supervisor.security.module.cas_validate", AsyncMock()
) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with(
"notary@home-assistant.io", "ffffffffffffff"
)
async def test_disabled_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
coresys.security.content_trust = False
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert not cas_validate.called
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert not cas_validate.called
async def test_force_content_trust(coresys: CoreSys):
"""Force Content-Trust tests."""
with patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
coresys.security.force = True
with (
patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate,
pytest.raises(CodeNotaryError),
):
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
async def test_integrity_check_disabled(coresys: CoreSys):
"""Test integrity check with disabled content trust."""
coresys.security.content_trust = False
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.UNTESTED
assert result.supervisor == ContentTrustResult.UNTESTED
async def test_integrity_check(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
install_addon_ssh.check_trust = AsyncMock()
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS
async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust issues."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.ERROR
assert result.supervisor == ContentTrustResult.ERROR
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR
async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust failed."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.FAILED
assert result.supervisor == ContentTrustResult.FAILED
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED
async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust but no signed add-ons."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED

View File

@@ -86,10 +86,10 @@ async def test_os_update_path(
"""Test OS upgrade path across major versions."""
coresys.os._board = "rpi4" # pylint: disable=protected-access
coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access
with patch.object(type(coresys.security), "verify_own_content"):
await coresys.updater.fetch_data()
# CodeNotary verification removed
await coresys.updater.fetch_data()
assert coresys.updater.version_hassos == AwesomeVersion(expected)
assert coresys.updater.version_hassos == AwesomeVersion(expected)
@pytest.mark.usefixtures("no_job_throttle")
@@ -105,7 +105,6 @@ async def test_delayed_fetch_for_connectivity(
load_binary_fixture("version_stable.json")
)
coresys.websession.head = AsyncMock()
coresys.security.verify_own_content = AsyncMock()
# Network connectivity change causes a series of async tasks to eventually do a version fetch
# Rather then use some kind of sleep loop, set up listener for start of fetch data job

View File

@@ -1,128 +0,0 @@
"""Test CodeNotary."""
from __future__ import annotations
from dataclasses import dataclass
from unittest.mock import AsyncMock, Mock, patch
import pytest
from supervisor.exceptions import (
CodeNotaryBackendError,
CodeNotaryError,
CodeNotaryUntrusted,
)
from supervisor.utils.codenotary import calc_checksum, cas_validate
pytest.skip("code notary has been disabled due to issues", allow_module_level=True)
@dataclass
class SubprocessResponse:
"""Class for specifying subprocess exec response."""
returncode: int = 0
data: str = ""
error: str | None = None
exception: Exception | None = None
@pytest.fixture(name="subprocess_exec")
def fixture_subprocess_exec(request):
"""Mock subprocess exec with specific return."""
response = request.param
if response.exception:
communicate_return = AsyncMock(side_effect=response.exception)
else:
communicate_return = AsyncMock(return_value=(response.data, response.error))
exec_return = Mock(returncode=response.returncode, communicate=communicate_return)
with patch(
"supervisor.utils.codenotary.asyncio.create_subprocess_exec",
return_value=exec_return,
) as subprocess_exec:
yield subprocess_exec
def test_checksum_calc():
"""Calc Checkusm as test."""
assert calc_checksum("test") == calc_checksum(b"test")
assert (
calc_checksum("test")
== "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
)
async def test_valid_checksum():
"""Test a valid autorization."""
await cas_validate(
"notary@home-assistant.io",
"4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0",
)
async def test_invalid_checksum():
"""Test a invalid autorization."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[SubprocessResponse(returncode=1, error=b"x is not notarized")],
)
async def test_not_notarized_error(subprocess_exec):
"""Test received a not notarized error response from command."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[
SubprocessResponse(returncode=1, error=b"test"),
SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'),
SubprocessResponse(returncode=1, error="test".encode("utf-16")),
],
indirect=True,
)
async def test_cas_backend_error(subprocess_exec):
"""Test backend error executing cas command."""
with pytest.raises(CodeNotaryBackendError):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[SubprocessResponse(returncode=0, data='{"status":1}')],
indirect=True,
)
async def test_cas_notarized_untrusted(subprocess_exec):
"""Test cas found notarized but untrusted content."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True
)
async def test_cas_exec_os_error(subprocess_exec):
"""Test os error attempting to execute cas command."""
with pytest.raises(CodeNotaryError):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)