Fix mypy issues in docker, hardware and homeassistant modules (#5805)

* Fix mypy issues in docker and hardware modules

* Fix mypy issues in homeassistant module

* Fix async_send_command typing

* Fixes from feedback
This commit is contained in:
Mike Degatano 2025-04-08 12:52:58 -04:00 committed by GitHub
parent 59a7e9519d
commit 4a00caa2e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 247 additions and 153 deletions

View File

@ -118,7 +118,7 @@ class APIHomeAssistant(CoreSysAttributes):
body = await api_validate(SCHEMA_OPTIONS, request) body = await api_validate(SCHEMA_OPTIONS, request)
if ATTR_IMAGE in body: if ATTR_IMAGE in body:
self.sys_homeassistant.image = body[ATTR_IMAGE] self.sys_homeassistant.set_image(body[ATTR_IMAGE])
self.sys_homeassistant.override_image = ( self.sys_homeassistant.override_image = (
self.sys_homeassistant.image != self.sys_homeassistant.default_image self.sys_homeassistant.image != self.sys_homeassistant.default_image
) )

View File

@ -145,13 +145,21 @@ class Auth(FileConfiguration, CoreSysAttributes):
async def list_users(self) -> list[dict[str, Any]]: async def list_users(self) -> list[dict[str, Any]]:
"""List users on the Home Assistant instance.""" """List users on the Home Assistant instance."""
try: try:
return await self.sys_homeassistant.websocket.async_send_command( users: (
list[dict[str, Any]] | None
) = await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "config/auth/list"} {ATTR_TYPE: "config/auth/list"}
) )
except HomeAssistantWSError: except HomeAssistantWSError as err:
_LOGGER.error("Can't request listing users on Home Assistant!") raise AuthListUsersError(
f"Can't request listing users on Home Assistant: {err}", _LOGGER.error
) from err
raise AuthListUsersError() if users is not None:
return users
raise AuthListUsersError(
"Can't request listing users on Home Assistant!", _LOGGER.error
)
@staticmethod @staticmethod
def _rehash(value: str, salt2: str = "") -> str: def _rehash(value: str, salt2: str = "") -> str:

View File

@ -70,7 +70,7 @@ async def initialize_coresys() -> CoreSys:
coresys.addons = await AddonManager(coresys).load_config() coresys.addons = await AddonManager(coresys).load_config()
coresys.backups = await BackupManager(coresys).load_config() coresys.backups = await BackupManager(coresys).load_config()
coresys.host = await HostManager(coresys).post_init() coresys.host = await HostManager(coresys).post_init()
coresys.hardware = await HardwareManager(coresys).post_init() coresys.hardware = await HardwareManager.create(coresys)
coresys.ingress = await Ingress(coresys).load_config() coresys.ingress = await Ingress(coresys).load_config()
coresys.tasks = Tasks(coresys) coresys.tasks = Tasks(coresys)
coresys.services = await ServiceManager(coresys).load_config() coresys.services = await ServiceManager(coresys).load_config()

View File

@ -2,7 +2,7 @@
from dataclasses import dataclass from dataclasses import dataclass
from enum import StrEnum from enum import StrEnum
from ipaddress import ip_network from ipaddress import IPv4Network
from pathlib import Path from pathlib import Path
from sys import version_info as systemversion from sys import version_info as systemversion
from typing import Self from typing import Self
@ -41,8 +41,8 @@ SYSTEMD_JOURNAL_PERSISTENT = Path("/var/log/journal")
SYSTEMD_JOURNAL_VOLATILE = Path("/run/log/journal") SYSTEMD_JOURNAL_VOLATILE = Path("/run/log/journal")
DOCKER_NETWORK = "hassio" DOCKER_NETWORK = "hassio"
DOCKER_NETWORK_MASK = ip_network("172.30.32.0/23") DOCKER_NETWORK_MASK = IPv4Network("172.30.32.0/23")
DOCKER_NETWORK_RANGE = ip_network("172.30.33.0/24") DOCKER_NETWORK_RANGE = IPv4Network("172.30.33.0/24")
# This needs to match the dockerd --cpu-rt-runtime= argument. # This needs to match the dockerd --cpu-rt-runtime= argument.
DOCKER_CPU_RUNTIME_TOTAL = 950_000 DOCKER_CPU_RUNTIME_TOTAL = 950_000

View File

@ -2,13 +2,12 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Awaitable
from contextlib import suppress from contextlib import suppress
from ipaddress import IPv4Address, ip_address from ipaddress import IPv4Address
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, cast
from attr import evolve from attr import evolve
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
@ -73,7 +72,7 @@ if TYPE_CHECKING:
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
NO_ADDDRESS = ip_address("0.0.0.0") NO_ADDDRESS = IPv4Address("0.0.0.0")
class DockerAddon(DockerInterface): class DockerAddon(DockerInterface):
@ -101,10 +100,12 @@ class DockerAddon(DockerInterface):
"""Return IP address of this container.""" """Return IP address of this container."""
if self.addon.host_network: if self.addon.host_network:
return self.sys_docker.network.gateway return self.sys_docker.network.gateway
if not self._meta:
return NO_ADDDRESS
# Extract IP-Address # Extract IP-Address
try: try:
return ip_address( return IPv4Address(
self._meta["NetworkSettings"]["Networks"]["hassio"]["IPAddress"] self._meta["NetworkSettings"]["Networks"]["hassio"]["IPAddress"]
) )
except (KeyError, TypeError, ValueError): except (KeyError, TypeError, ValueError):
@ -121,7 +122,7 @@ class DockerAddon(DockerInterface):
return self.addon.version return self.addon.version
@property @property
def arch(self) -> str: def arch(self) -> str | None:
"""Return arch of Docker image.""" """Return arch of Docker image."""
if self.addon.legacy: if self.addon.legacy:
return self.sys_arch.default return self.sys_arch.default
@ -133,9 +134,9 @@ class DockerAddon(DockerInterface):
return DockerAddon.slug_to_name(self.addon.slug) return DockerAddon.slug_to_name(self.addon.slug)
@property @property
def environment(self) -> dict[str, str | None]: def environment(self) -> dict[str, str | int | None]:
"""Return environment for Docker add-on.""" """Return environment for Docker add-on."""
addon_env = self.addon.environment or {} addon_env = cast(dict[str, str | int | None], self.addon.environment or {})
# Provide options for legacy add-ons # Provide options for legacy add-ons
if self.addon.legacy: if self.addon.legacy:
@ -336,7 +337,7 @@ class DockerAddon(DockerInterface):
"""Return mounts for container.""" """Return mounts for container."""
addon_mapping = self.addon.map_volumes addon_mapping = self.addon.map_volumes
target_data_path = "" target_data_path: str | None = None
if MappingType.DATA in addon_mapping: if MappingType.DATA in addon_mapping:
target_data_path = addon_mapping[MappingType.DATA].path target_data_path = addon_mapping[MappingType.DATA].path
@ -677,12 +678,12 @@ class DockerAddon(DockerInterface):
) )
try: try:
image, log = await self.sys_run_in_executor(build_image) docker_image, log = await self.sys_run_in_executor(build_image)
_LOGGER.debug("Build %s:%s done: %s", self.image, version, log) _LOGGER.debug("Build %s:%s done: %s", self.image, version, log)
# Update meta data # Update meta data
self._meta = image.attrs self._meta = docker_image.attrs
except (docker.errors.DockerException, requests.RequestException) as err: except (docker.errors.DockerException, requests.RequestException) as err:
_LOGGER.error("Can't build %s:%s: %s", self.image, version, err) _LOGGER.error("Can't build %s:%s: %s", self.image, version, err)
@ -699,9 +700,14 @@ class DockerAddon(DockerInterface):
_LOGGER.info("Build %s:%s done", self.image, version) _LOGGER.info("Build %s:%s done", self.image, version)
def export_image(self, tar_file: Path) -> Awaitable[None]: def export_image(self, tar_file: Path) -> None:
"""Export current images into a tar file.""" """Export current images into a tar file.
return self.sys_docker.export_image(self.image, self.version, tar_file)
Must be run in executor.
"""
if not self.image:
raise RuntimeError("Cannot export without image!")
self.sys_docker.export_image(self.image, self.version, tar_file)
@Job( @Job(
name="docker_addon_import_image", name="docker_addon_import_image",
@ -805,15 +811,15 @@ class DockerAddon(DockerInterface):
): ):
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue) self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
async def _validate_trust( async def _validate_trust(self, image_id: str) -> None:
self, image_id: str, image: str, version: AwesomeVersion
) -> None:
"""Validate trust of content.""" """Validate trust of content."""
if not self.addon.signed: if not self.addon.signed:
return return
checksum = image_id.partition(":")[2] checksum = image_id.partition(":")[2]
return await self.sys_security.verify_content(self.addon.codenotary, checksum) return await self.sys_security.verify_content(
cast(str, self.addon.codenotary), checksum
)
@Job( @Job(
name="docker_addon_hardware_events", name="docker_addon_hardware_events",
@ -834,6 +840,7 @@ class DockerAddon(DockerInterface):
self.sys_docker.containers.get, self.name self.sys_docker.containers.get, self.name
) )
except docker.errors.NotFound: except docker.errors.NotFound:
if self._hw_listener:
self.sys_bus.remove_listener(self._hw_listener) self.sys_bus.remove_listener(self._hw_listener)
self._hw_listener = None self._hw_listener = None
return return

View File

@ -248,14 +248,12 @@ class DockerHomeAssistant(DockerInterface):
self.sys_homeassistant.version, self.sys_homeassistant.version,
) )
async def _validate_trust( async def _validate_trust(self, image_id: str) -> None:
self, image_id: str, image: str, version: AwesomeVersion
) -> None:
"""Validate trust of content.""" """Validate trust of content."""
try: try:
if version != LANDINGPAGE and version < _VERIFY_TRUST: if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST:
return return
except AwesomeVersionCompareException: except AwesomeVersionCompareException:
return return
await super()._validate_trust(image_id, image, version) await super()._validate_trust(image_id)

View File

@ -2,13 +2,14 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from collections.abc import Awaitable from collections.abc import Awaitable
from contextlib import suppress from contextlib import suppress
import logging import logging
import re import re
from time import time from time import time
from typing import Any from typing import Any, cast
from uuid import uuid4 from uuid import uuid4
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
@ -79,7 +80,7 @@ def _container_state_from_model(docker_container: Container) -> ContainerState:
return ContainerState.STOPPED return ContainerState.STOPPED
class DockerInterface(JobGroup): class DockerInterface(JobGroup, ABC):
"""Docker Supervisor interface.""" """Docker Supervisor interface."""
def __init__(self, coresys: CoreSys): def __init__(self, coresys: CoreSys):
@ -100,9 +101,9 @@ class DockerInterface(JobGroup):
return 10 return 10
@property @property
def name(self) -> str | None: @abstractmethod
def name(self) -> str:
"""Return name of Docker container.""" """Return name of Docker container."""
return None
@property @property
def meta_config(self) -> dict[str, Any]: def meta_config(self) -> dict[str, Any]:
@ -153,7 +154,7 @@ class DockerInterface(JobGroup):
@property @property
def in_progress(self) -> bool: def in_progress(self) -> bool:
"""Return True if a task is in progress.""" """Return True if a task is in progress."""
return self.active_job return self.active_job is not None
@property @property
def restart_policy(self) -> RestartPolicy | None: def restart_policy(self) -> RestartPolicy | None:
@ -230,7 +231,10 @@ class DockerInterface(JobGroup):
) -> None: ) -> None:
"""Pull docker image.""" """Pull docker image."""
image = image or self.image image = image or self.image
arch = arch or self.sys_arch.supervisor if not image:
raise ValueError("Cannot pull without an image!")
image_arch = str(arch) if arch else self.sys_arch.supervisor
_LOGGER.info("Downloading docker image %s with tag %s.", image, version) _LOGGER.info("Downloading docker image %s with tag %s.", image, version)
try: try:
@ -242,12 +246,12 @@ class DockerInterface(JobGroup):
docker_image = await self.sys_run_in_executor( docker_image = await self.sys_run_in_executor(
self.sys_docker.images.pull, self.sys_docker.images.pull,
f"{image}:{version!s}", f"{image}:{version!s}",
platform=MAP_ARCH[arch], platform=MAP_ARCH[image_arch],
) )
# Validate content # Validate content
try: try:
await self._validate_trust(docker_image.id, image, version) await self._validate_trust(cast(str, docker_image.id))
except CodeNotaryError: except CodeNotaryError:
with suppress(docker.errors.DockerException): with suppress(docker.errors.DockerException):
await self.sys_run_in_executor( await self.sys_run_in_executor(
@ -355,7 +359,7 @@ class DockerInterface(JobGroup):
self.sys_bus.fire_event( self.sys_bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent( DockerContainerStateEvent(
self.name, state, docker_container.id, int(time()) self.name, state, cast(str, docker_container.id), int(time())
), ),
) )
@ -454,7 +458,9 @@ class DockerInterface(JobGroup):
expected_arch: CpuArch | None = None, expected_arch: CpuArch | None = None,
) -> None: ) -> None:
"""Check we have expected image with correct arch.""" """Check we have expected image with correct arch."""
expected_arch = expected_arch or self.sys_arch.supervisor expected_image_arch = (
str(expected_arch) if expected_arch else self.sys_arch.supervisor
)
image_name = f"{expected_image}:{version!s}" image_name = f"{expected_image}:{version!s}"
if self.image == expected_image: if self.image == expected_image:
try: try:
@ -472,13 +478,13 @@ class DockerInterface(JobGroup):
image_arch = f"{image_arch}/{image.attrs['Variant']}" image_arch = f"{image_arch}/{image.attrs['Variant']}"
# If we have an image and its the right arch, all set # If we have an image and its the right arch, all set
if MAP_ARCH[expected_arch] == image_arch: if MAP_ARCH[expected_image_arch] == image_arch:
return return
# We're missing the image we need. Stop and clean up what we have then pull the right one # We're missing the image we need. Stop and clean up what we have then pull the right one
with suppress(DockerError): with suppress(DockerError):
await self.remove() await self.remove()
await self.install(version, expected_image, arch=expected_arch) await self.install(version, expected_image, arch=expected_image_arch)
@Job( @Job(
name="docker_interface_update", name="docker_interface_update",
@ -613,9 +619,7 @@ class DockerInterface(JobGroup):
self.sys_docker.container_run_inside, self.name, command self.sys_docker.container_run_inside, self.name, command
) )
async def _validate_trust( async def _validate_trust(self, image_id: str) -> None:
self, image_id: str, image: str, version: AwesomeVersion
) -> None:
"""Validate trust of content.""" """Validate trust of content."""
checksum = image_id.partition(":")[2] checksum = image_id.partition(":")[2]
return await self.sys_security.verify_own_content(checksum) return await self.sys_security.verify_own_content(checksum)
@ -634,4 +638,4 @@ class DockerInterface(JobGroup):
except (docker.errors.DockerException, requests.RequestException): except (docker.errors.DockerException, requests.RequestException):
return return
await self._validate_trust(image.id, self.image, self.version) await self._validate_trust(cast(str, image.id))

View File

@ -7,7 +7,7 @@ from ipaddress import IPv4Address
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from typing import Any, Final, Self from typing import Any, Final, Self, cast
import attr import attr
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
@ -255,7 +255,7 @@ class DockerAPI:
# Check if container is register on host # Check if container is register on host
# https://github.com/moby/moby/issues/23302 # https://github.com/moby/moby/issues/23302
if name in ( if name and name in (
val.get("Name") val.get("Name")
for val in host_network.attrs.get("Containers", {}).values() for val in host_network.attrs.get("Containers", {}).values()
): ):
@ -405,7 +405,8 @@ class DockerAPI:
# Check the image is correct and state is good # Check the image is correct and state is good
return ( return (
docker_container.image.id == docker_image.id docker_container.image is not None
and docker_container.image.id == docker_image.id
and docker_container.status in ("exited", "running", "created") and docker_container.status in ("exited", "running", "created")
) )
@ -540,7 +541,7 @@ class DockerAPI:
"""Import a tar file as image.""" """Import a tar file as image."""
try: try:
with tar_file.open("rb") as read_tar: with tar_file.open("rb") as read_tar:
docker_image_list: list[Image] = self.images.load(read_tar) docker_image_list: list[Image] = self.images.load(read_tar) # type: ignore
if len(docker_image_list) != 1: if len(docker_image_list) != 1:
_LOGGER.warning( _LOGGER.warning(
@ -557,7 +558,7 @@ class DockerAPI:
def export_image(self, image: str, version: AwesomeVersion, tar_file: Path) -> None: def export_image(self, image: str, version: AwesomeVersion, tar_file: Path) -> None:
"""Export current images into a tar file.""" """Export current images into a tar file."""
try: try:
image = self.api.get_image(f"{image}:{version}") docker_image = self.api.get_image(f"{image}:{version}")
except (DockerException, requests.RequestException) as err: except (DockerException, requests.RequestException) as err:
raise DockerError( raise DockerError(
f"Can't fetch image {image}: {err}", _LOGGER.error f"Can't fetch image {image}: {err}", _LOGGER.error
@ -566,7 +567,7 @@ class DockerAPI:
_LOGGER.info("Export image %s to %s", image, tar_file) _LOGGER.info("Export image %s to %s", image, tar_file)
try: try:
with tar_file.open("wb") as write_tar: with tar_file.open("wb") as write_tar:
for chunk in image: for chunk in docker_image:
write_tar.write(chunk) write_tar.write(chunk)
except (OSError, requests.RequestException) as err: except (OSError, requests.RequestException) as err:
raise DockerError( raise DockerError(
@ -586,7 +587,7 @@ class DockerAPI:
"""Clean up old versions of an image.""" """Clean up old versions of an image."""
image = f"{current_image}:{current_version!s}" image = f"{current_image}:{current_version!s}"
try: try:
keep: set[str] = {self.images.get(image).id} keep = {cast(str, self.images.get(image).id)}
except ImageNotFound: except ImageNotFound:
raise DockerNotFound( raise DockerNotFound(
f"{current_image} not found for cleanup", _LOGGER.warning f"{current_image} not found for cleanup", _LOGGER.warning
@ -602,7 +603,7 @@ class DockerAPI:
for image in keep_images: for image in keep_images:
# If its not found, no need to preserve it from getting removed # If its not found, no need to preserve it from getting removed
with suppress(ImageNotFound): with suppress(ImageNotFound):
keep.add(self.images.get(image).id) keep.add(cast(str, self.images.get(image).id))
except (DockerException, requests.RequestException) as err: except (DockerException, requests.RequestException) as err:
raise DockerError( raise DockerError(
f"Failed to get one or more images from {keep} during cleanup", f"Failed to get one or more images from {keep} during cleanup",
@ -614,16 +615,18 @@ class DockerAPI:
old_images | {current_image} if old_images else {current_image} old_images | {current_image} if old_images else {current_image}
) )
try: try:
images_list = self.images.list(name=image_names) # This API accepts a list of image names. Tested and confirmed working on docker==7.1.0
# Its typing does say only `str` though. Bit concerning, could an update break this?
images_list = self.images.list(name=image_names) # type: ignore
except (DockerException, requests.RequestException) as err: except (DockerException, requests.RequestException) as err:
raise DockerError( raise DockerError(
f"Corrupt docker overlayfs found: {err}", _LOGGER.warning f"Corrupt docker overlayfs found: {err}", _LOGGER.warning
) from err ) from err
for image in images_list: for docker_image in images_list:
if image.id in keep: if docker_image.id in keep:
continue continue
with suppress(DockerException, requests.RequestException): with suppress(DockerException, requests.RequestException):
_LOGGER.info("Cleanup images: %s", image.tags) _LOGGER.info("Cleanup images: %s", docker_image.tags)
self.images.remove(image.id, force=True) self.images.remove(docker_image.id, force=True)

View File

@ -37,7 +37,7 @@ class DockerMonitor(CoreSysAttributes, Thread):
def watch_container(self, container: Container): def watch_container(self, container: Container):
"""If container is missing the managed label, add name to list.""" """If container is missing the managed label, add name to list."""
if LABEL_MANAGED not in container.labels: if LABEL_MANAGED not in container.labels and container.name:
self._unlabeled_managed_containers += [container.name] self._unlabeled_managed_containers += [container.name]
async def load(self): async def load(self):
@ -54,8 +54,11 @@ class DockerMonitor(CoreSysAttributes, Thread):
_LOGGER.info("Stopped docker events monitor") _LOGGER.info("Stopped docker events monitor")
def run(self): def run(self) -> None:
"""Monitor and process docker events.""" """Monitor and process docker events."""
if not self._events:
raise RuntimeError("Monitor has not been loaded!")
for event in self._events: for event in self._events:
attributes: dict[str, str] = event.get("Actor", {}).get("Attributes", {}) attributes: dict[str, str] = event.get("Actor", {}).get("Attributes", {})

View File

@ -109,7 +109,7 @@ class DockerNetwork:
self.network.reload() self.network.reload()
# Check stale Network # Check stale Network
if container.name in ( if container.name and container.name in (
val.get("Name") for val in self.network.attrs.get("Containers", {}).values() val.get("Name") for val in self.network.attrs.get("Containers", {}).values()
): ):
self.stale_cleanup(container.name) self.stale_cleanup(container.name)

View File

@ -39,7 +39,7 @@ class DockerSupervisor(DockerInterface):
@property @property
def host_mounts_available(self) -> bool: def host_mounts_available(self) -> bool:
"""Return True if container can see mounts on host within its data directory.""" """Return True if container can see mounts on host within its data directory."""
return self._meta and any( return self._meta is not None and any(
mount.get("Propagation") == PropagationMode.SLAVE mount.get("Propagation") == PropagationMode.SLAVE
for mount in self.meta_mounts for mount in self.meta_mounts
if mount.get("Destination") == "/data" if mount.get("Destination") == "/data"
@ -89,7 +89,18 @@ class DockerSupervisor(DockerInterface):
""" """
try: try:
docker_container = self.sys_docker.containers.get(self.name) docker_container = self.sys_docker.containers.get(self.name)
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get Supervisor container for retag: {err}", _LOGGER.error
) from err
if not self.image or not docker_container.image:
raise DockerError(
"Could not locate image from container metadata for retag",
_LOGGER.error,
)
try:
docker_container.image.tag(self.image, tag=str(self.version)) docker_container.image.tag(self.image, tag=str(self.version))
docker_container.image.tag(self.image, tag="latest") docker_container.image.tag(self.image, tag="latest")
except (docker.errors.DockerException, requests.RequestException) as err: except (docker.errors.DockerException, requests.RequestException) as err:
@ -110,7 +121,18 @@ class DockerSupervisor(DockerInterface):
try: try:
docker_container = self.sys_docker.containers.get(self.name) docker_container = self.sys_docker.containers.get(self.name)
docker_image = self.sys_docker.images.get(f"{image}:{version!s}") docker_image = self.sys_docker.images.get(f"{image}:{version!s}")
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't get image or container to fix start tag: {err}", _LOGGER.error
) from err
if not docker_container.image:
raise DockerError(
"Cannot locate image from container metadata to fix start tag",
_LOGGER.error,
)
try:
# Find start tag # Find start tag
for tag in docker_container.image.tags: for tag in docker_container.image.tags:
start_image = tag.partition(":")[0] start_image = tag.partition(":")[0]

View File

@ -72,7 +72,7 @@ class HwDisk(CoreSysAttributes):
_, _, free = shutil.disk_usage(path) _, _, free = shutil.disk_usage(path)
return round(free / (1024.0**3), 1) return round(free / (1024.0**3), 1)
def _get_mountinfo(self, path: str) -> str: def _get_mountinfo(self, path: str) -> list[str] | None:
mountinfo = _MOUNTINFO.read_text(encoding="utf-8") mountinfo = _MOUNTINFO.read_text(encoding="utf-8")
for line in mountinfo.splitlines(): for line in mountinfo.splitlines():
mountinfoarr = line.split() mountinfoarr = line.split()
@ -80,7 +80,7 @@ class HwDisk(CoreSysAttributes):
return mountinfoarr return mountinfoarr
return None return None
def _get_mount_source(self, path: str) -> str: def _get_mount_source(self, path: str) -> str | None:
mountinfoarr = self._get_mountinfo(path) mountinfoarr = self._get_mountinfo(path)
if mountinfoarr is None: if mountinfoarr is None:
@ -92,7 +92,7 @@ class HwDisk(CoreSysAttributes):
optionsep += 1 optionsep += 1
return mountinfoarr[optionsep + 2] return mountinfoarr[optionsep + 2]
def _try_get_emmc_life_time(self, device_name: str) -> float: def _try_get_emmc_life_time(self, device_name: str) -> float | None:
# Get eMMC life_time # Get eMMC life_time
life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name)) life_time_path = Path(_BLOCK_DEVICE_EMMC_LIFE_TIME.format(device_name))
@ -121,13 +121,13 @@ class HwDisk(CoreSysAttributes):
# Return the pessimistic estimate (0x02 -> 10%-20%, return 20%) # Return the pessimistic estimate (0x02 -> 10%-20%, return 20%)
return life_time_value * 10.0 return life_time_value * 10.0
def get_disk_life_time(self, path: str | Path) -> float: def get_disk_life_time(self, path: str | Path) -> float | None:
"""Return life time estimate of the underlying SSD drive. """Return life time estimate of the underlying SSD drive.
Must be run in executor. Must be run in executor.
""" """
mount_source = self._get_mount_source(str(path)) mount_source = self._get_mount_source(str(path))
if mount_source == "overlay": if not mount_source or mount_source == "overlay":
return None return None
mount_source_path = Path(mount_source) mount_source_path = Path(mount_source)

View File

@ -1,8 +1,9 @@
"""Hardware Manager of Supervisor.""" """Hardware Manager of Supervisor."""
from __future__ import annotations
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Self
import pyudev import pyudev
@ -48,28 +49,30 @@ _STATIC_NODES: list[Device] = [
class HardwareManager(CoreSysAttributes): class HardwareManager(CoreSysAttributes):
"""Hardware manager for supervisor.""" """Hardware manager for supervisor."""
def __init__(self, coresys: CoreSys): def __init__(self, coresys: CoreSys, udev: pyudev.Context) -> None:
"""Initialize Hardware Monitor object.""" """Initialize Hardware Monitor object."""
self.coresys: CoreSys = coresys self.coresys: CoreSys = coresys
self._devices: dict[str, Device] = {} self._devices: dict[str, Device] = {}
self._udev: pyudev.Context | None = None self._udev: pyudev.Context = udev
self._monitor: HwMonitor | None = None self._monitor: HwMonitor = HwMonitor(coresys, udev)
self._helper: HwHelper = HwHelper(coresys) self._helper: HwHelper = HwHelper(coresys)
self._policy: HwPolicy = HwPolicy(coresys) self._policy: HwPolicy = HwPolicy(coresys)
self._disk: HwDisk = HwDisk(coresys) self._disk: HwDisk = HwDisk(coresys)
async def post_init(self) -> Self: @classmethod
"""Complete initialization of obect within event loop.""" async def create(cls: type[HardwareManager], coresys: CoreSys) -> HardwareManager:
self._udev = await self.sys_run_in_executor(pyudev.Context) """Complete initialization of a HardwareManager object within event loop."""
self._monitor: HwMonitor = HwMonitor(self.coresys, self._udev) return cls(coresys, await coresys.run_in_executor(pyudev.Context))
return self
@property
def udev(self) -> pyudev.Context:
"""Return Udev context instance."""
return self._udev
@property @property
def monitor(self) -> HwMonitor: def monitor(self) -> HwMonitor:
"""Return Hardware Monitor instance.""" """Return Hardware Monitor instance."""
if not self._monitor:
raise RuntimeError("Hardware monitor not initialized!")
return self._monitor return self._monitor
@property @property
@ -129,7 +132,7 @@ class HardwareManager(CoreSysAttributes):
def check_subsystem_parents(self, device: Device, subsystem: UdevSubsystem) -> bool: def check_subsystem_parents(self, device: Device, subsystem: UdevSubsystem) -> bool:
"""Return True if the device is part of the given subsystem parent.""" """Return True if the device is part of the given subsystem parent."""
udev_device: pyudev.Device = pyudev.Devices.from_sys_path( udev_device: pyudev.Device = pyudev.Devices.from_sys_path(
self._udev, str(device.sysfs) self.udev, str(device.sysfs)
) )
return udev_device.find_parent(subsystem) is not None return udev_device.find_parent(subsystem) is not None
@ -138,7 +141,7 @@ class HardwareManager(CoreSysAttributes):
self._devices.clear() self._devices.clear()
# Exctract all devices # Exctract all devices
for device in self._udev.list_devices(): for device in self.udev.list_devices():
# Skip devices without mapping # Skip devices without mapping
try: try:
if not device.device_node or self.helper.hide_virtual_device(device): if not device.device_node or self.helper.hide_virtual_device(device):

View File

@ -51,15 +51,17 @@ class HomeAssistantAPI(CoreSysAttributes):
) )
async def ensure_access_token(self) -> None: async def ensure_access_token(self) -> None:
"""Ensure there is an access token.""" """Ensure there is an access token."""
if self.access_token is not None and self._access_token_expires > datetime.now( if (
tz=UTC self.access_token
and self._access_token_expires
and self._access_token_expires > datetime.now(tz=UTC)
): ):
return return
with suppress(asyncio.TimeoutError, aiohttp.ClientError): with suppress(asyncio.TimeoutError, aiohttp.ClientError):
async with self.sys_websession.post( async with self.sys_websession.post(
f"{self.sys_homeassistant.api_url}/auth/token", f"{self.sys_homeassistant.api_url}/auth/token",
timeout=30, timeout=aiohttp.ClientTimeout(total=30),
data={ data={
"grant_type": "refresh_token", "grant_type": "refresh_token",
"refresh_token": self.sys_homeassistant.refresh_token, "refresh_token": self.sys_homeassistant.refresh_token,

View File

@ -101,7 +101,7 @@ class HomeAssistantCore(JobGroup):
await self.instance.check_image( await self.instance.check_image(
self.sys_homeassistant.version, self.sys_homeassistant.default_image self.sys_homeassistant.version, self.sys_homeassistant.default_image
) )
self.sys_homeassistant.image = self.sys_homeassistant.default_image self.sys_homeassistant.set_image(self.sys_homeassistant.default_image)
except DockerError: except DockerError:
_LOGGER.info( _LOGGER.info(
"No Home Assistant Docker image %s found.", self.sys_homeassistant.image "No Home Assistant Docker image %s found.", self.sys_homeassistant.image
@ -109,7 +109,7 @@ class HomeAssistantCore(JobGroup):
await self.install_landingpage() await self.install_landingpage()
else: else:
self.sys_homeassistant.version = self.instance.version self.sys_homeassistant.version = self.instance.version
self.sys_homeassistant.image = self.instance.image self.sys_homeassistant.set_image(self.instance.image)
await self.sys_homeassistant.save_data() await self.sys_homeassistant.save_data()
# Start landingpage # Start landingpage
@ -138,7 +138,7 @@ class HomeAssistantCore(JobGroup):
else: else:
_LOGGER.info("Using preinstalled landingpage") _LOGGER.info("Using preinstalled landingpage")
self.sys_homeassistant.version = LANDINGPAGE self.sys_homeassistant.version = LANDINGPAGE
self.sys_homeassistant.image = self.instance.image self.sys_homeassistant.set_image(self.instance.image)
await self.sys_homeassistant.save_data() await self.sys_homeassistant.save_data()
return return
@ -166,7 +166,7 @@ class HomeAssistantCore(JobGroup):
await asyncio.sleep(30) await asyncio.sleep(30)
self.sys_homeassistant.version = LANDINGPAGE self.sys_homeassistant.version = LANDINGPAGE
self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)
await self.sys_homeassistant.save_data() await self.sys_homeassistant.save_data()
@Job( @Job(
@ -199,7 +199,7 @@ class HomeAssistantCore(JobGroup):
_LOGGER.info("Home Assistant docker now installed") _LOGGER.info("Home Assistant docker now installed")
self.sys_homeassistant.version = self.instance.version self.sys_homeassistant.version = self.instance.version
self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)
await self.sys_homeassistant.save_data() await self.sys_homeassistant.save_data()
# finishing # finishing
@ -232,6 +232,12 @@ class HomeAssistantCore(JobGroup):
) -> None: ) -> None:
"""Update HomeAssistant version.""" """Update HomeAssistant version."""
version = version or self.sys_homeassistant.latest_version version = version or self.sys_homeassistant.latest_version
if not version:
raise HomeAssistantUpdateError(
"Cannot determine latest version of Home Assistant for update",
_LOGGER.error,
)
old_image = self.sys_homeassistant.image old_image = self.sys_homeassistant.image
rollback = self.sys_homeassistant.version if not self.error_state else None rollback = self.sys_homeassistant.version if not self.error_state else None
running = await self.instance.is_running() running = await self.instance.is_running()
@ -263,7 +269,7 @@ class HomeAssistantCore(JobGroup):
) from err ) from err
self.sys_homeassistant.version = self.instance.version self.sys_homeassistant.version = self.instance.version
self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)
if running: if running:
await self.start() await self.start()
@ -303,11 +309,11 @@ class HomeAssistantCore(JobGroup):
# Make a copy of the current log file if it exists # Make a copy of the current log file if it exists
logfile = self.sys_config.path_homeassistant / "home-assistant.log" logfile = self.sys_config.path_homeassistant / "home-assistant.log"
if logfile.exists(): if logfile.exists():
backup = ( rollback_log = (
self.sys_config.path_homeassistant / "home-assistant-rollback.log" self.sys_config.path_homeassistant / "home-assistant-rollback.log"
) )
shutil.copy(logfile, backup) shutil.copy(logfile, rollback_log)
_LOGGER.info( _LOGGER.info(
"A backup of the logfile is stored in /config/home-assistant-rollback.log" "A backup of the logfile is stored in /config/home-assistant-rollback.log"
) )
@ -334,7 +340,7 @@ class HomeAssistantCore(JobGroup):
except DockerError as err: except DockerError as err:
raise HomeAssistantError() from err raise HomeAssistantError() from err
await self._block_till_run(self.sys_homeassistant.version) await self._block_till_run()
# No Instance/Container found, extended start # No Instance/Container found, extended start
else: else:
# Create new API token # Create new API token
@ -349,7 +355,7 @@ class HomeAssistantCore(JobGroup):
except DockerError as err: except DockerError as err:
raise HomeAssistantError() from err raise HomeAssistantError() from err
await self._block_till_run(self.sys_homeassistant.version) await self._block_till_run()
@Job( @Job(
name="home_assistant_core_stop", name="home_assistant_core_stop",
@ -382,7 +388,7 @@ class HomeAssistantCore(JobGroup):
except DockerError as err: except DockerError as err:
raise HomeAssistantError() from err raise HomeAssistantError() from err
await self._block_till_run(self.sys_homeassistant.version) await self._block_till_run()
@Job( @Job(
name="home_assistant_core_rebuild", name="home_assistant_core_rebuild",
@ -440,7 +446,7 @@ class HomeAssistantCore(JobGroup):
@property @property
def in_progress(self) -> bool: def in_progress(self) -> bool:
"""Return True if a task is in progress.""" """Return True if a task is in progress."""
return self.instance.in_progress or self.active_job return self.instance.in_progress or self.active_job is not None
async def check_config(self) -> ConfigResult: async def check_config(self) -> ConfigResult:
"""Run Home Assistant config check.""" """Run Home Assistant config check."""
@ -467,10 +473,10 @@ class HomeAssistantCore(JobGroup):
_LOGGER.info("Home Assistant config is valid") _LOGGER.info("Home Assistant config is valid")
return ConfigResult(True, log) return ConfigResult(True, log)
async def _block_till_run(self, version: AwesomeVersion) -> None: async def _block_till_run(self) -> None:
"""Block until Home-Assistant is booting up or startup timeout.""" """Block until Home-Assistant is booting up or startup timeout."""
# Skip landingpage # Skip landingpage
if version == LANDINGPAGE: if self.sys_homeassistant.version == LANDINGPAGE:
return return
_LOGGER.info("Wait until Home Assistant is ready") _LOGGER.info("Wait until Home Assistant is ready")

View File

@ -112,12 +112,12 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
return self._secrets return self._secrets
@property @property
def machine(self) -> str: def machine(self) -> str | None:
"""Return the system machines.""" """Return the system machines."""
return self.core.instance.machine return self.core.instance.machine
@property @property
def arch(self) -> str: def arch(self) -> str | None:
"""Return arch of running Home Assistant.""" """Return arch of running Home Assistant."""
return self.core.instance.arch return self.core.instance.arch
@ -190,8 +190,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
return self._data[ATTR_IMAGE] return self._data[ATTR_IMAGE]
return self.default_image return self.default_image
@image.setter def set_image(self, value: str | None) -> None:
def image(self, value: str | None) -> None:
"""Set image name of Home Assistant container.""" """Set image name of Home Assistant container."""
self._data[ATTR_IMAGE] = value self._data[ATTR_IMAGE] = value
@ -284,7 +283,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
def need_update(self) -> bool: def need_update(self) -> bool:
"""Return true if a Home Assistant update is available.""" """Return true if a Home Assistant update is available."""
try: try:
return self.version < self.latest_version return self.version is not None and self.version < self.latest_version
except (AwesomeVersionException, TypeError): except (AwesomeVersionException, TypeError):
return False return False
@ -347,7 +346,9 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
): ):
return return
configuration = await self.sys_homeassistant.websocket.async_send_command( configuration: (
dict[str, Any] | None
) = await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "get_config"} {ATTR_TYPE: "get_config"}
) )
if not configuration or "usb" not in configuration.get("components", []): if not configuration or "usb" not in configuration.get("components", []):
@ -359,7 +360,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
async def begin_backup(self) -> None: async def begin_backup(self) -> None:
"""Inform Home Assistant a backup is beginning.""" """Inform Home Assistant a backup is beginning."""
try: try:
resp = await self.websocket.async_send_command( resp: dict[str, Any] | None = await self.websocket.async_send_command(
{ATTR_TYPE: WSType.BACKUP_START} {ATTR_TYPE: WSType.BACKUP_START}
) )
except HomeAssistantWSError as err: except HomeAssistantWSError as err:
@ -378,7 +379,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
async def end_backup(self) -> None: async def end_backup(self) -> None:
"""Inform Home Assistant the backup is ending.""" """Inform Home Assistant the backup is ending."""
try: try:
resp = await self.websocket.async_send_command( resp: dict[str, Any] | None = await self.websocket.async_send_command(
{ATTR_TYPE: WSType.BACKUP_END} {ATTR_TYPE: WSType.BACKUP_END}
) )
except HomeAssistantWSError as err: except HomeAssistantWSError as err:
@ -555,7 +556,9 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
) )
async def get_users(self) -> list[IngressSessionDataUser]: async def get_users(self) -> list[IngressSessionDataUser]:
"""Get list of all configured users.""" """Get list of all configured users."""
list_of_users = await self.sys_homeassistant.websocket.async_send_command( list_of_users: (
list[dict[str, Any]] | None
) = await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "config/auth/list"} {ATTR_TYPE: "config/auth/list"}
) )

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
from typing import Any from typing import Any, TypeVar, cast
import aiohttp import aiohttp
from aiohttp.http_websocket import WSMsgType from aiohttp.http_websocket import WSMsgType
@ -38,6 +38,8 @@ MIN_VERSION = {
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
T = TypeVar("T")
class WSClient: class WSClient:
"""Home Assistant Websocket client.""" """Home Assistant Websocket client."""
@ -53,7 +55,7 @@ class WSClient:
self._client = client self._client = client
self._message_id: int = 0 self._message_id: int = 0
self._loop = loop self._loop = loop
self._futures: dict[int, asyncio.Future[dict]] = {} self._futures: dict[int, asyncio.Future[T]] = {} # type: ignore
@property @property
def connected(self) -> bool: def connected(self) -> bool:
@ -78,9 +80,9 @@ class WSClient:
try: try:
await self._client.send_json(message, dumps=json_dumps) await self._client.send_json(message, dumps=json_dumps)
except ConnectionError as err: except ConnectionError as err:
raise HomeAssistantWSConnectionError(err) from err raise HomeAssistantWSConnectionError(str(err)) from err
async def async_send_command(self, message: dict[str, Any]) -> dict | None: async def async_send_command(self, message: dict[str, Any]) -> T | None:
"""Send a websocket message, and return the response.""" """Send a websocket message, and return the response."""
self._message_id += 1 self._message_id += 1
message["id"] = self._message_id message["id"] = self._message_id
@ -89,7 +91,7 @@ class WSClient:
try: try:
await self._client.send_json(message, dumps=json_dumps) await self._client.send_json(message, dumps=json_dumps)
except ConnectionError as err: except ConnectionError as err:
raise HomeAssistantWSConnectionError(err) from err raise HomeAssistantWSConnectionError(str(err)) from err
try: try:
return await self._futures[message["id"]] return await self._futures[message["id"]]
@ -206,7 +208,7 @@ class HomeAssistantWebSocket(CoreSysAttributes):
self.sys_websession, self.sys_websession,
self.sys_loop, self.sys_loop,
self.sys_homeassistant.ws_url, self.sys_homeassistant.ws_url,
self.sys_homeassistant.api.access_token, cast(str, self.sys_homeassistant.api.access_token),
) )
self.sys_create_task(client.start_listener()) self.sys_create_task(client.start_listener())
@ -264,24 +266,27 @@ class HomeAssistantWebSocket(CoreSysAttributes):
return return
try: try:
if self._client:
await self._client.async_send_command(message) await self._client.async_send_command(message)
except HomeAssistantWSConnectionError: except HomeAssistantWSConnectionError:
if self._client: if self._client:
await self._client.close() await self._client.close()
self._client = None self._client = None
async def async_send_command(self, message: dict[str, Any]) -> dict[str, Any]: async def async_send_command(self, message: dict[str, Any]) -> T | None:
"""Send a command with the WS client and wait for the response.""" """Send a command with the WS client and wait for the response."""
if not await self._can_send(message): if not await self._can_send(message):
return return None
try: try:
if self._client:
return await self._client.async_send_command(message) return await self._client.async_send_command(message)
except HomeAssistantWSConnectionError: except HomeAssistantWSConnectionError:
if self._client: if self._client:
await self._client.close() await self._client.close()
self._client = None self._client = None
raise raise
return None
async def async_supervisor_update_event( async def async_supervisor_update_event(
self, self,
@ -323,7 +328,7 @@ class HomeAssistantWebSocket(CoreSysAttributes):
async def async_supervisor_event( async def async_supervisor_event(
self, event: WSEvent, data: dict[str, Any] | None = None self, event: WSEvent, data: dict[str, Any] | None = None
): ) -> None:
"""Send a supervisor/event command to Home Assistant.""" """Send a supervisor/event command to Home Assistant."""
try: try:
await self.async_send_message( await self.async_send_message(
@ -340,7 +345,9 @@ class HomeAssistantWebSocket(CoreSysAttributes):
except HomeAssistantWSError as err: except HomeAssistantWSError as err:
_LOGGER.error("Could not send message to Home Assistant due to %s", err) _LOGGER.error("Could not send message to Home Assistant due to %s", err)
def supervisor_event(self, event: WSEvent, data: dict[str, Any] | None = None): def supervisor_event(
self, event: WSEvent, data: dict[str, Any] | None = None
) -> None:
"""Send a supervisor/event command to Home Assistant.""" """Send a supervisor/event command to Home Assistant."""
if self.sys_core.state in CLOSING_STATES: if self.sys_core.state in CLOSING_STATES:
return return

View File

@ -133,7 +133,7 @@ class InfoCenter(CoreSysAttributes):
self.coresys.config.path_supervisor, self.coresys.config.path_supervisor,
) )
async def disk_life_time(self) -> float: async def disk_life_time(self) -> float | None:
"""Return the estimated life-time usage (in %) of the SSD storing the data directory.""" """Return the estimated life-time usage (in %) of the SSD storing the data directory."""
return await self.sys_run_in_executor( return await self.sys_run_in_executor(
self.sys_hardware.disk.get_disk_life_time, self.sys_hardware.disk.get_disk_life_time,

21
tests/docker/conftest.py Normal file
View File

@ -0,0 +1,21 @@
"""Fixtures for docker tests."""
import pytest
from supervisor.coresys import CoreSys
from supervisor.docker.interface import DockerInterface
class TestDockerInterface(DockerInterface):
"""Test docker interface."""
@property
def name(self) -> str:
"""Name of test interface."""
return "test_interface"
@pytest.fixture
def test_docker_interface(coresys: CoreSys) -> TestDockerInterface:
"""Return test docker interface."""
return TestDockerInterface(coresys)

View File

@ -5,37 +5,44 @@ from supervisor.coresys import CoreSys
from supervisor.docker.interface import DOCKER_HUB, DockerInterface from supervisor.docker.interface import DOCKER_HUB, DockerInterface
def test_no_credentials(coresys: CoreSys): def test_no_credentials(coresys: CoreSys, test_docker_interface: DockerInterface):
"""Test no credentials.""" """Test no credentials."""
docker = DockerInterface(coresys)
coresys.docker.config._data["registries"] = { coresys.docker.config._data["registries"] = {
DOCKER_HUB: {"username": "Spongebob Squarepants", "password": "Password1!"} DOCKER_HUB: {"username": "Spongebob Squarepants", "password": "Password1!"}
} }
assert not docker._get_credentials("ghcr.io/homeassistant") assert not test_docker_interface._get_credentials("ghcr.io/homeassistant")
assert not docker._get_credentials("ghcr.io/homeassistant/amd64-supervisor") assert not test_docker_interface._get_credentials(
"ghcr.io/homeassistant/amd64-supervisor"
)
def test_no_matching_credentials(coresys: CoreSys): def test_no_matching_credentials(
coresys: CoreSys, test_docker_interface: DockerInterface
):
"""Test no matching credentials.""" """Test no matching credentials."""
docker = DockerInterface(coresys)
coresys.docker.config._data["registries"] = { coresys.docker.config._data["registries"] = {
DOCKER_HUB: {"username": "Spongebob Squarepants", "password": "Password1!"} DOCKER_HUB: {"username": "Spongebob Squarepants", "password": "Password1!"}
} }
assert not docker._get_credentials("ghcr.io/homeassistant") assert not test_docker_interface._get_credentials("ghcr.io/homeassistant")
assert not docker._get_credentials("ghcr.io/homeassistant/amd64-supervisor") assert not test_docker_interface._get_credentials(
"ghcr.io/homeassistant/amd64-supervisor"
)
def test_matching_credentials(coresys: CoreSys): def test_matching_credentials(coresys: CoreSys, test_docker_interface: DockerInterface):
"""Test no matching credentials.""" """Test no matching credentials."""
docker = DockerInterface(coresys)
coresys.docker.config._data["registries"] = { coresys.docker.config._data["registries"] = {
"ghcr.io": {"username": "Octocat", "password": "Password1!"}, "ghcr.io": {"username": "Octocat", "password": "Password1!"},
DOCKER_HUB: {"username": "Spongebob Squarepants", "password": "Password1!"}, DOCKER_HUB: {"username": "Spongebob Squarepants", "password": "Password1!"},
} }
credentials = docker._get_credentials("ghcr.io/homeassistant/amd64-supervisor") credentials = test_docker_interface._get_credentials(
"ghcr.io/homeassistant/amd64-supervisor"
)
assert credentials["registry"] == "ghcr.io" assert credentials["registry"] == "ghcr.io"
credentials = docker._get_credentials("homeassistant/amd64-supervisor") credentials = test_docker_interface._get_credentials(
"homeassistant/amd64-supervisor"
)
assert credentials["username"] == "Spongebob Squarepants" assert credentials["username"] == "Spongebob Squarepants"
assert "registry" not in credentials assert "registry" not in credentials

View File

@ -44,18 +44,26 @@ def mock_verify_content(coresys: CoreSys):
(CpuArch.AMD64, "linux/amd64"), (CpuArch.AMD64, "linux/amd64"),
], ],
) )
async def test_docker_image_platform(coresys: CoreSys, cpu_arch: str, platform: str): async def test_docker_image_platform(
coresys: CoreSys,
test_docker_interface: DockerInterface,
cpu_arch: str,
platform: str,
):
"""Test platform set correctly from arch.""" """Test platform set correctly from arch."""
with patch.object( with patch.object(
coresys.docker.images, "pull", return_value=Mock(id="test:1.2.3") coresys.docker.images, "pull", return_value=Mock(id="test:1.2.3")
) as pull: ) as pull:
instance = DockerInterface(coresys) await test_docker_interface.install(
await instance.install(AwesomeVersion("1.2.3"), "test", arch=cpu_arch) AwesomeVersion("1.2.3"), "test", arch=cpu_arch
)
assert pull.call_count == 1 assert pull.call_count == 1
assert pull.call_args == call("test:1.2.3", platform=platform) assert pull.call_args == call("test:1.2.3", platform=platform)
async def test_docker_image_default_platform(coresys: CoreSys): async def test_docker_image_default_platform(
coresys: CoreSys, test_docker_interface: DockerInterface
):
"""Test platform set using supervisor arch when omitted.""" """Test platform set using supervisor arch when omitted."""
with ( with (
patch.object( patch.object(
@ -65,8 +73,7 @@ async def test_docker_image_default_platform(coresys: CoreSys):
coresys.docker.images, "pull", return_value=Mock(id="test:1.2.3") coresys.docker.images, "pull", return_value=Mock(id="test:1.2.3")
) as pull, ) as pull,
): ):
instance = DockerInterface(coresys) await test_docker_interface.install(AwesomeVersion("1.2.3"), "test")
await instance.install(AwesomeVersion("1.2.3"), "test")
assert pull.call_count == 1 assert pull.call_count == 1
assert pull.call_args == call("test:1.2.3", platform="linux/386") assert pull.call_args == call("test:1.2.3", platform="linux/386")

View File

@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from unittest.mock import MagicMock, Mock, PropertyMock, patch from unittest.mock import MagicMock, Mock, PropertyMock, patch
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
from docker.errors import DockerException, ImageNotFound, NotFound from docker.errors import APIError, DockerException, ImageNotFound, NotFound
import pytest import pytest
from time_machine import travel from time_machine import travel
@ -15,7 +15,6 @@ from supervisor.docker.interface import DockerInterface
from supervisor.docker.manager import DockerAPI from supervisor.docker.manager import DockerAPI
from supervisor.exceptions import ( from supervisor.exceptions import (
AudioUpdateError, AudioUpdateError,
CodeNotaryError,
DockerError, DockerError,
HomeAssistantCrashError, HomeAssistantCrashError,
HomeAssistantError, HomeAssistantError,
@ -69,11 +68,8 @@ async def test_install_landingpage_docker_error(
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64) DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
), ),
patch("supervisor.homeassistant.core.asyncio.sleep") as sleep, patch("supervisor.homeassistant.core.asyncio.sleep") as sleep,
patch(
"supervisor.security.module.cas_validate",
side_effect=[CodeNotaryError, None],
),
): ):
coresys.docker.images.pull.side_effect = [APIError("fail"), MagicMock()]
await coresys.homeassistant.core.install_landingpage() await coresys.homeassistant.core.install_landingpage()
sleep.assert_awaited_once_with(30) sleep.assert_awaited_once_with(30)
@ -126,11 +122,8 @@ async def test_install_docker_error(
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64) DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
), ),
patch("supervisor.homeassistant.core.asyncio.sleep") as sleep, patch("supervisor.homeassistant.core.asyncio.sleep") as sleep,
patch(
"supervisor.security.module.cas_validate",
side_effect=[CodeNotaryError, None],
),
): ):
coresys.docker.images.pull.side_effect = [APIError("fail"), MagicMock()]
await coresys.homeassistant.core.install() await coresys.homeassistant.core.install()
sleep.assert_awaited_once_with(30) sleep.assert_awaited_once_with(30)
@ -398,7 +391,7 @@ async def test_core_loads_wrong_image_for_machine(
coresys: CoreSys, container: MagicMock coresys: CoreSys, container: MagicMock
): ):
"""Test core is loaded with wrong image for machine.""" """Test core is loaded with wrong image for machine."""
coresys.homeassistant.image = "ghcr.io/home-assistant/odroid-n2-homeassistant" coresys.homeassistant.set_image("ghcr.io/home-assistant/odroid-n2-homeassistant")
coresys.homeassistant.version = AwesomeVersion("2024.4.0") coresys.homeassistant.version = AwesomeVersion("2024.4.0")
container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}} container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}}
@ -424,7 +417,7 @@ async def test_core_loads_wrong_image_for_machine(
async def test_core_load_allows_image_override(coresys: CoreSys, container: MagicMock): async def test_core_load_allows_image_override(coresys: CoreSys, container: MagicMock):
"""Test core does not change image if user overrode it.""" """Test core does not change image if user overrode it."""
coresys.homeassistant.image = "ghcr.io/home-assistant/odroid-n2-homeassistant" coresys.homeassistant.set_image("ghcr.io/home-assistant/odroid-n2-homeassistant")
coresys.homeassistant.version = AwesomeVersion("2024.4.0") coresys.homeassistant.version = AwesomeVersion("2024.4.0")
container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}} container.attrs["Config"] = {"Labels": {"io.hass.version": "2024.4.0"}}