Files
supervisor/supervisor/docker/interface.py
Stefan Agner 1448a33dbf Remove Codenotary integrity check (#6236)
* Formally deprecate CodeNotary build config

* Remove CodeNotary specific integrity checking

The current code is specific to how CodeNotary was doing integrity
checking. A future integrity checking mechanism likely will work
differently (e.g. through EROFS based containers). Remove the current
code to make way for a future implementation.

* Drop CodeNotary integrity fixups

* Drop unused tests

* Fix pytest

* Fix pytest

* Remove CodeNotary related exceptions and handling

Remove CodeNotary related exceptions and handling from the Docker
interface.

* Drop unnecessary comment

* Remove Codenotary specific IssueType/SuggestionType

* Drop Codenotary specific environment and secret reference

* Remove unused constants

* Introduce APIGone exception for removed APIs

Introduce a new exception class APIGone to indicate that certain API
features have been removed and are no longer available. Update the
security integrity check endpoint to raise this new exception instead
of a generic APIError, providing clearer communication to clients that
the feature has been intentionally removed.

* Drop content trust

A cosign based signature verification will likely be named differently
to avoid confusion with existing implementations. For now, remove the
content trust option entirely.

* Drop code sign test

* Remove source_mods/content_trust evaluations

* Remove content_trust reference in bootstrap.py

* Fix security tests

* Drop unused tests

* Drop codenotary from schema

Since we have "remove extra" in voluptuous, we can remove the
codenotary field from the addon schema.

* Remove content_trust from tests

* Remove content_trust unsupported reason

* Remove unnecessary comment

* Remove unrelated pytest

* Remove unrelated fixtures
2025-11-03 20:13:15 +01:00

787 lines
28 KiB
Python

"""Interface class for Supervisor Docker object."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Awaitable
from contextlib import suppress
import logging
import re
from time import time
from typing import Any, cast
from uuid import uuid4
from awesomeversion import AwesomeVersion
from awesomeversion.strategy import AwesomeVersionStrategy
import docker
from docker.models.containers import Container
from docker.models.images import Image
import requests
from ..bus import EventListener
from ..const import (
ATTR_PASSWORD,
ATTR_REGISTRY,
ATTR_USERNAME,
LABEL_ARCH,
LABEL_VERSION,
BusEvent,
CpuArch,
)
from ..coresys import CoreSys
from ..exceptions import (
DockerAPIError,
DockerError,
DockerJobError,
DockerLogOutOfOrder,
DockerNotFound,
DockerRequestError,
)
from ..jobs import SupervisorJob
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.sentry import async_capture_exception
from .const import ContainerState, PullImageLayerStage, RestartPolicy
from .manager import CommandReturn, PullLogEntry
from .monitor import DockerContainerStateEvent
from .stats import DockerStats
_LOGGER: logging.Logger = logging.getLogger(__name__)
IMAGE_WITH_HOST = re.compile(r"^((?:[a-z0-9]+(?:-[a-z0-9]+)*\.)+[a-z]{2,})\/.+")
DOCKER_HUB = "hub.docker.com"
MAP_ARCH: dict[CpuArch | str, str] = {
CpuArch.ARMV7: "linux/arm/v7",
CpuArch.ARMHF: "linux/arm/v6",
CpuArch.AARCH64: "linux/arm64",
CpuArch.I386: "linux/386",
CpuArch.AMD64: "linux/amd64",
}
def _container_state_from_model(docker_container: Container) -> ContainerState:
"""Get container state from model."""
if docker_container.status == "running":
if "Health" in docker_container.attrs["State"]:
return (
ContainerState.HEALTHY
if docker_container.attrs["State"]["Health"]["Status"] == "healthy"
else ContainerState.UNHEALTHY
)
return ContainerState.RUNNING
if docker_container.attrs["State"]["ExitCode"] > 0:
return ContainerState.FAILED
return ContainerState.STOPPED
class DockerInterface(JobGroup, ABC):
"""Docker Supervisor interface."""
def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper."""
super().__init__(
coresys,
JOB_GROUP_DOCKER_INTERFACE.format_map(
defaultdict(str, name=self.name or uuid4().hex)
),
self.name,
)
self.coresys: CoreSys = coresys
self._meta: dict[str, Any] | None = None
@property
def timeout(self) -> int:
"""Return timeout for Docker actions."""
return 10
@property
@abstractmethod
def name(self) -> str:
"""Return name of Docker container."""
@property
def meta_config(self) -> dict[str, Any]:
"""Return meta data of configuration for container/image."""
if not self._meta:
return {}
return self._meta.get("Config", {})
@property
def meta_host(self) -> dict[str, Any]:
"""Return meta data of configuration for host."""
if not self._meta:
return {}
return self._meta.get("HostConfig", {})
@property
def meta_labels(self) -> dict[str, str]:
"""Return meta data of labels for container/image."""
return self.meta_config.get("Labels") or {}
@property
def meta_mounts(self) -> list[dict[str, Any]]:
"""Return meta data of mounts for container/image."""
if not self._meta:
return []
return self._meta.get("Mounts", [])
@property
def image(self) -> str | None:
"""Return name of Docker image."""
try:
return self.meta_config["Image"].partition(":")[0]
except KeyError:
return None
@property
def version(self) -> AwesomeVersion | None:
"""Return version of Docker image."""
if LABEL_VERSION not in self.meta_labels:
return None
return AwesomeVersion(self.meta_labels[LABEL_VERSION])
@property
def arch(self) -> str | None:
"""Return arch of Docker image."""
return self.meta_labels.get(LABEL_ARCH)
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.active_job is not None
@property
def restart_policy(self) -> RestartPolicy | None:
"""Return restart policy of container."""
if "RestartPolicy" not in self.meta_host:
return None
policy = self.meta_host["RestartPolicy"].get("Name")
return policy if policy else RestartPolicy.NO
@property
def security_opt(self) -> list[str]:
"""Control security options."""
# Disable Seccomp / We don't support it official and it
# causes problems on some types of host systems.
return ["seccomp=unconfined"]
@property
def healthcheck(self) -> dict[str, Any] | None:
"""Healthcheck of instance if it has one."""
return self.meta_config.get("Healthcheck")
def _get_credentials(self, image: str) -> dict:
"""Return a dictionay with credentials for docker login."""
registry = None
credentials = {}
matcher = IMAGE_WITH_HOST.match(image)
# Custom registry
if matcher:
if matcher.group(1) in self.sys_docker.config.registries:
registry = matcher.group(1)
credentials[ATTR_REGISTRY] = registry
# If no match assume "dockerhub" as registry
elif DOCKER_HUB in self.sys_docker.config.registries:
registry = DOCKER_HUB
if registry:
stored = self.sys_docker.config.registries[registry]
credentials[ATTR_USERNAME] = stored[ATTR_USERNAME]
credentials[ATTR_PASSWORD] = stored[ATTR_PASSWORD]
_LOGGER.debug(
"Logging in to %s as %s",
registry,
stored[ATTR_USERNAME],
)
return credentials
async def _docker_login(self, image: str) -> None:
"""Try to log in to the registry if there are credentials available."""
if not self.sys_docker.config.registries:
return
credentials = self._get_credentials(image)
if not credentials:
return
await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials)
def _process_pull_image_log(
self, install_job_id: str, reference: PullLogEntry
) -> None:
"""Process events fired from a docker while pulling an image, filtered to a given job id."""
if (
reference.job_id != install_job_id
or not reference.id
or not reference.status
or not (stage := PullImageLayerStage.from_status(reference.status))
):
return
# Pulling FS Layer is our marker for a layer that needs to be downloaded and extracted. Otherwise it already exists and we can ignore
job: SupervisorJob | None = None
if stage == PullImageLayerStage.PULLING_FS_LAYER:
job = self.sys_jobs.new_job(
name="Pulling container image layer",
initial_stage=stage.status,
reference=reference.id,
parent_id=install_job_id,
internal=True,
)
job.done = False
return
# Find our sub job to update details of
for j in self.sys_jobs.jobs:
if j.parent_id == install_job_id and j.reference == reference.id:
job = j
break
# This likely only occurs if the logs came in out of sync and we got progress before the Pulling FS Layer one
if not job:
raise DockerLogOutOfOrder(
f"Received pull image log with status {reference.status} for image id {reference.id} and parent job {install_job_id} but could not find a matching job, skipping",
_LOGGER.debug,
)
# Hopefully these come in order but if they sometimes get out of sync, avoid accidentally going backwards
# If it happens a lot though we may need to reconsider the value of this feature
if job.done:
raise DockerLogOutOfOrder(
f"Received pull image log with status {reference.status} for job {job.uuid} but job was done, skipping",
_LOGGER.debug,
)
if job.stage and stage < PullImageLayerStage.from_status(job.stage):
raise DockerLogOutOfOrder(
f"Received pull image log with status {reference.status} for job {job.uuid} but job was already on stage {job.stage}, skipping",
_LOGGER.debug,
)
# For progress calcuation we assume downloading and extracting are each 50% of the time and others stages negligible
progress = job.progress
match stage:
case PullImageLayerStage.DOWNLOADING | PullImageLayerStage.EXTRACTING:
if (
reference.progress_detail
and reference.progress_detail.current
and reference.progress_detail.total
):
progress = 50 * (
reference.progress_detail.current
/ reference.progress_detail.total
)
if stage == PullImageLayerStage.EXTRACTING:
progress += 50
case (
PullImageLayerStage.VERIFYING_CHECKSUM
| PullImageLayerStage.DOWNLOAD_COMPLETE
):
progress = 50
case PullImageLayerStage.PULL_COMPLETE:
progress = 100
case PullImageLayerStage.RETRYING_DOWNLOAD:
progress = 0
if stage != PullImageLayerStage.RETRYING_DOWNLOAD and progress < job.progress:
raise DockerLogOutOfOrder(
f"Received pull image log with status {reference.status} for job {job.uuid} that implied progress was {progress} but current progress is {job.progress}, skipping",
_LOGGER.debug,
)
# Our filters have all passed. Time to update the job
# Only downloading and extracting have progress details. Use that to set extra
# We'll leave it around on later stages as the total bytes may be useful after that stage
# Enforce range to prevent float drift error
progress = max(0, min(progress, 100))
if (
stage in {PullImageLayerStage.DOWNLOADING, PullImageLayerStage.EXTRACTING}
and reference.progress_detail
):
job.update(
progress=progress,
stage=stage.status,
extra={
"current": reference.progress_detail.current,
"total": reference.progress_detail.total,
},
)
else:
job.update(
progress=progress,
stage=stage.status,
done=stage == PullImageLayerStage.PULL_COMPLETE,
extra=None
if stage == PullImageLayerStage.RETRYING_DOWNLOAD
else job.extra,
)
# Once we have received a progress update for every child job, start to set status of the main one
install_job = self.sys_jobs.get_job(install_job_id)
layer_jobs = [
job
for job in self.sys_jobs.jobs
if job.parent_id == install_job.uuid
and job.name == "Pulling container image layer"
]
# First set the total bytes to be downloaded/extracted on the main job
if not install_job.extra:
total = 0
for job in layer_jobs:
if not job.extra:
return
total += job.extra["total"]
install_job.extra = {"total": total}
else:
total = install_job.extra["total"]
# Then determine total progress based on progress of each sub-job, factoring in size of each compared to total
progress = 0.0
stage = PullImageLayerStage.PULL_COMPLETE
for job in layer_jobs:
if not job.extra:
return
progress += job.progress * (job.extra["total"] / total)
job_stage = PullImageLayerStage.from_status(cast(str, job.stage))
if job_stage < PullImageLayerStage.EXTRACTING:
stage = PullImageLayerStage.DOWNLOADING
elif (
stage == PullImageLayerStage.PULL_COMPLETE
and job_stage < PullImageLayerStage.PULL_COMPLETE
):
stage = PullImageLayerStage.EXTRACTING
# Ensure progress is 100 at this point to prevent float drift
if stage == PullImageLayerStage.PULL_COMPLETE:
progress = 100
# To reduce noise, limit updates to when result has changed by an entire percent or when stage changed
if stage != install_job.stage or progress >= install_job.progress + 1:
install_job.update(stage=stage.status, progress=max(0, min(progress, 100)))
@Job(
name="docker_interface_install",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
internal=True,
)
async def install(
self,
version: AwesomeVersion,
image: str | None = None,
latest: bool = False,
arch: CpuArch | None = None,
) -> None:
"""Pull docker image."""
image = image or self.image
if not image:
raise ValueError("Cannot pull without an image!")
image_arch = str(arch) if arch else self.sys_arch.supervisor
listener: EventListener | None = None
_LOGGER.info("Downloading docker image %s with tag %s.", image, version)
try:
if self.sys_docker.config.registries:
# Try login if we have defined credentials
await self._docker_login(image)
curr_job_id = self.sys_jobs.current.uuid
async def process_pull_image_log(reference: PullLogEntry) -> None:
try:
self._process_pull_image_log(curr_job_id, reference)
except DockerLogOutOfOrder as err:
# Send all these to sentry. Missing a few progress updates
# shouldn't matter to users but matters to us
await async_capture_exception(err)
listener = self.sys_bus.register_event(
BusEvent.DOCKER_IMAGE_PULL_UPDATE, process_pull_image_log
)
# Pull new image
docker_image = await self.sys_run_in_executor(
self.sys_docker.pull_image,
self.sys_jobs.current.uuid,
image,
str(version),
platform=MAP_ARCH[image_arch],
)
# Tag latest
if latest:
_LOGGER.info(
"Tagging image %s with version %s as latest", image, version
)
await self.sys_run_in_executor(docker_image.tag, image, tag="latest")
except docker.errors.APIError as err:
if err.status_code == 429:
self.sys_resolution.create_issue(
IssueType.DOCKER_RATELIMIT,
ContextType.SYSTEM,
suggestions=[SuggestionType.REGISTRY_LOGIN],
)
_LOGGER.info(
"Your IP address has made too many requests to Docker Hub which activated a rate limit. "
"For more details see https://www.home-assistant.io/more-info/dockerhub-rate-limit"
)
raise DockerError(
f"Can't install {image}:{version!s}: {err}", _LOGGER.error
) from err
except (docker.errors.DockerException, requests.RequestException) as err:
await async_capture_exception(err)
raise DockerError(
f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error
) from err
finally:
if listener:
self.sys_bus.remove_listener(listener)
self._meta = docker_image.attrs
async def exists(self) -> bool:
"""Return True if Docker image exists in local repository."""
with suppress(docker.errors.DockerException, requests.RequestException):
await self.sys_run_in_executor(
self.sys_docker.images.get, f"{self.image}:{self.version!s}"
)
return True
return False
async def is_running(self) -> bool:
"""Return True if Docker is running."""
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
except docker.errors.NotFound:
return False
except docker.errors.DockerException as err:
raise DockerAPIError() from err
except requests.RequestException as err:
raise DockerRequestError() from err
return docker_container.status == "running"
async def current_state(self) -> ContainerState:
"""Return current state of container."""
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
except docker.errors.NotFound:
return ContainerState.UNKNOWN
except docker.errors.DockerException as err:
raise DockerAPIError() from err
except requests.RequestException as err:
raise DockerRequestError() from err
return _container_state_from_model(docker_container)
@Job(name="docker_interface_attach", concurrency=JobConcurrency.GROUP_QUEUE)
async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> None:
"""Attach to running Docker container."""
with suppress(docker.errors.DockerException, requests.RequestException):
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
self._meta = docker_container.attrs
self.sys_docker.monitor.watch_container(docker_container)
state = _container_state_from_model(docker_container)
if not (
skip_state_event_if_down
and state in [ContainerState.STOPPED, ContainerState.FAILED]
):
# Fire event with current state of container
self.sys_bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
self.name, state, cast(str, docker_container.id), int(time())
),
)
with suppress(docker.errors.DockerException, requests.RequestException):
if not self._meta and self.image:
self._meta = self.sys_docker.images.get(
f"{self.image}:{version!s}"
).attrs
# Successful?
if not self._meta:
raise DockerError()
_LOGGER.info("Attaching to %s with version %s", self.image, self.version)
@Job(
name="docker_interface_run",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""
raise NotImplementedError()
async def _run(self, **kwargs) -> None:
"""Run Docker image with retry inf necessary."""
if await self.is_running():
return
# Cleanup
await self.stop()
# Create & Run container
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.run, self.image, **kwargs
)
except DockerNotFound as err:
# If image is missing, capture the exception as this shouldn't happen
await async_capture_exception(err)
raise
# Store metadata
self._meta = docker_container.attrs
@Job(
name="docker_interface_stop",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
with suppress(DockerNotFound):
await self.sys_run_in_executor(
self.sys_docker.stop_container,
self.name,
self.timeout,
remove_container,
)
@Job(
name="docker_interface_start",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def start(self) -> Awaitable[None]:
"""Start Docker container."""
return self.sys_run_in_executor(self.sys_docker.start_container, self.name)
@Job(
name="docker_interface_remove",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def remove(self, *, remove_image: bool = True) -> None:
"""Remove Docker images."""
# Cleanup container
with suppress(DockerError):
await self.stop()
if remove_image:
await self.sys_run_in_executor(
self.sys_docker.remove_image, self.image, self.version
)
self._meta = None
@Job(
name="docker_interface_check_image",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def check_image(
self,
version: AwesomeVersion,
expected_image: str,
expected_cpu_arch: CpuArch | None = None,
) -> None:
"""Check we have expected image with correct arch."""
expected_image_cpu_arch = (
str(expected_cpu_arch) if expected_cpu_arch else self.sys_arch.supervisor
)
image_name = f"{expected_image}:{version!s}"
if self.image == expected_image:
try:
image: Image = await self.sys_run_in_executor(
self.sys_docker.images.get, image_name
)
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get {image_name} for check due to: {err!s}",
_LOGGER.error,
) from err
image_arch = f"{image.attrs['Os']}/{image.attrs['Architecture']}"
if "Variant" in image.attrs:
image_arch = f"{image_arch}/{image.attrs['Variant']}"
# If we have an image and its the right arch, all set
# It seems that newer Docker version return a variant for arm64 images.
# Make sure we match linux/arm64 and linux/arm64/v8.
expected_image_arch = MAP_ARCH[expected_image_cpu_arch]
if image_arch.startswith(expected_image_arch):
return
_LOGGER.info(
"Image %s has arch %s, expected %s. Reinstalling.",
image_name,
image_arch,
expected_image_arch,
)
# We're missing the image we need. Stop and clean up what we have then pull the right one
with suppress(DockerError):
await self.remove()
await self.install(version, expected_image, arch=expected_image_cpu_arch)
@Job(
name="docker_interface_update",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(
self,
version: AwesomeVersion,
image: str | None = None,
latest: bool = False,
) -> None:
"""Update a Docker image."""
image = image or self.image
_LOGGER.info(
"Updating image %s:%s to %s:%s", self.image, self.version, image, version
)
# Update docker image
await self.install(version, image=image, latest=latest)
# Stop container & cleanup
with suppress(DockerError):
await self.stop()
async def logs(self) -> bytes:
"""Return Docker logs of container."""
with suppress(DockerError):
return await self.sys_run_in_executor(
self.sys_docker.container_logs, self.name
)
return b""
@Job(name="docker_interface_cleanup", concurrency=JobConcurrency.GROUP_QUEUE)
async def cleanup(
self,
old_image: str | None = None,
image: str | None = None,
version: AwesomeVersion | None = None,
) -> None:
"""Check if old version exists and cleanup."""
await self.sys_run_in_executor(
self.sys_docker.cleanup_old_images,
image or self.image,
version or self.version,
{old_image} if old_image else None,
)
@Job(
name="docker_interface_restart",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def restart(self) -> Awaitable[None]:
"""Restart docker container."""
return self.sys_run_in_executor(
self.sys_docker.restart_container, self.name, self.timeout
)
@Job(
name="docker_interface_execute_command",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command."""
raise NotImplementedError()
async def stats(self) -> DockerStats:
"""Read and return stats from container."""
stats = await self.sys_run_in_executor(
self.sys_docker.container_stats, self.name
)
return DockerStats(stats)
async def is_failed(self) -> bool:
"""Return True if Docker is failing state."""
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
except docker.errors.NotFound:
return False
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError() from err
# container is not running
if docker_container.status != "exited":
return False
# Check return value
return int(docker_container.attrs["State"]["ExitCode"]) != 0
async def get_latest_version(self) -> AwesomeVersion:
"""Return latest version of local image."""
available_version: list[AwesomeVersion] = []
try:
for image in await self.sys_run_in_executor(
self.sys_docker.images.list, self.image
):
for tag in image.tags:
version = AwesomeVersion(tag.partition(":")[2])
if version.strategy == AwesomeVersionStrategy.UNKNOWN:
continue
available_version.append(version)
if not available_version:
raise ValueError()
except (docker.errors.DockerException, ValueError) as err:
raise DockerNotFound(
f"No version found for {self.image}", _LOGGER.info
) from err
except requests.RequestException as err:
raise DockerRequestError(
f"Communication issues with dockerd on Host: {err}", _LOGGER.warning
) from err
_LOGGER.info("Found %s versions: %s", self.image, available_version)
# Sort version and return latest version
available_version.sort(reverse=True)
return available_version[0]
@Job(
name="docker_interface_run_inside",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def run_inside(self, command: str) -> Awaitable[CommandReturn]:
"""Execute a command inside Docker container."""
return self.sys_run_in_executor(
self.sys_docker.container_run_inside, self.name, command
)