Docker events based watchdog and docker healthchecks (#3725)

* Docker events based watchdog

* Separate monitor from DockerAPI since it needs coresys

* Move monitor into dockerAPI

* Fix properties on coresys

* Add watchdog tests

* Added tests

* pylint issue

* Current state failures test

* Thread-safe event processing

* Use labels property
Authored by Mike Degatano on 2022-07-15 03:21:59 -04:00, committed by GitHub
parent 14bc771ba9
commit d19166bb86
31 changed files with 1860 additions and 671 deletions
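
The shape of the change, as a minimal standalone sketch (illustrative stand-ins only, not Supervisor code): a monitor thread consumes the docker event stream, maps event actions to container states, and hands each state change to the asyncio loop thread-safely, where the add-on, Home Assistant, and plugin watchdogs react to it.

# Minimal standalone sketch of the event-driven watchdog flow (illustrative names only)
import asyncio
from dataclasses import dataclass
from enum import Enum
from threading import Thread


class ContainerState(str, Enum):
    RUNNING = "running"
    STOPPED = "stopped"
    FAILED = "failed"
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"


@dataclass
class ContainerStateEvent:
    name: str
    state: ContainerState


def monitor(raw_events: list[dict], loop: asyncio.AbstractEventLoop, listeners: list) -> None:
    """Worker thread, analogous to DockerMonitor.run(): raw docker events -> bus events."""
    for event in raw_events:
        action = event["Action"]
        if action == "start":
            state = ContainerState.RUNNING
        elif action == "die":
            clean = int(event["Actor"]["Attributes"]["exitCode"]) == 0
            state = ContainerState.STOPPED if clean else ContainerState.FAILED
        elif action.startswith("health_status:"):
            healthy = not action.endswith("unhealthy")
            state = ContainerState.HEALTHY if healthy else ContainerState.UNHEALTHY
        else:
            continue
        bus_event = ContainerStateEvent(event["Actor"]["Attributes"]["name"], state)
        for listener in listeners:
            # Hand off to the asyncio loop; the worker thread never runs coroutines itself
            asyncio.run_coroutine_threadsafe(listener(bus_event), loop)


async def watchdog(event: ContainerStateEvent) -> None:
    """Stand-in for the Addon/HomeAssistantCore/PluginBase watchdog_container handlers."""
    if event.state in (ContainerState.FAILED, ContainerState.UNHEALTHY):
        print(f"watchdog: would restart {event.name} ({event.state.value})")


async def main() -> None:
    raw = [
        {"Action": "start", "Actor": {"Attributes": {"name": "addon_demo"}}},
        {"Action": "die", "Actor": {"Attributes": {"name": "addon_demo", "exitCode": "1"}}},
    ]
    Thread(target=monitor, args=(raw, asyncio.get_running_loop(), [watchdog])).start()
    await asyncio.sleep(0.1)  # give the scheduled coroutines a chance to run


asyncio.run(main())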

View File

@ -48,9 +48,12 @@ from ..const import (
AddonBoot,
AddonStartup,
AddonState,
BusEvent,
)
from ..coresys import CoreSys
from ..docker.addon import DockerAddon
from ..docker.const import ContainerState
from ..docker.monitor import DockerContainerStateEvent
from ..docker.stats import DockerStats
from ..exceptions import (
AddonConfigurationError,
@ -58,7 +61,6 @@ from ..exceptions import (
AddonsNotSupportedError,
ConfigurationFileError,
DockerError,
DockerRequestError,
HostAppArmorError,
)
from ..hardware.data import Device
@ -66,7 +68,7 @@ from ..homeassistant.const import WSEvent, WSType
from ..utils import check_port
from ..utils.apparmor import adjust_profile
from ..utils.json import read_json_file, write_json_file
from .const import AddonBackupMode
from .const import WATCHDOG_RETRY_SECONDS, AddonBackupMode
from .model import AddonModel, Data
from .options import AddonOptions
from .utils import remove_data
@ -135,15 +137,16 @@ class Addon(AddonModel):
async def load(self) -> None:
"""Async initialize of object."""
self.sys_bus.register_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, self.container_state_changed
)
self.sys_bus.register_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, self.watchdog_container
)
with suppress(DockerError):
await self.instance.attach(version=self.version)
# Evaluate state
if await self.instance.is_running():
self.state = AddonState.STARTED
else:
self.state = AddonState.STOPPED
@property
def ip_address(self) -> IPv4Address:
"""Return IP of add-on instance."""
@ -613,27 +616,17 @@ class Addon(AddonModel):
# Start Add-on
try:
await self.instance.run()
except DockerRequestError as err:
self.state = AddonState.ERROR
raise AddonsError() from err
except DockerError as err:
self.state = AddonState.ERROR
raise AddonsError() from err
else:
self.state = AddonState.STARTED
async def stop(self) -> None:
"""Stop add-on."""
try:
await self.instance.stop()
except DockerRequestError as err:
self.state = AddonState.ERROR
raise AddonsError() from err
except DockerError as err:
self.state = AddonState.ERROR
raise AddonsError() from err
else:
self.state = AddonState.STOPPED
async def restart(self) -> None:
"""Restart add-on."""
@ -886,3 +879,64 @@ class Addon(AddonModel):
Return Coroutine.
"""
return self.instance.check_trust()
async def container_state_changed(self, event: DockerContainerStateEvent) -> None:
"""Set addon state from container state."""
if event.name != self.instance.name:
return
if event.state == ContainerState.RUNNING:
self.state = AddonState.STARTED
elif event.state == ContainerState.STOPPED:
self.state = AddonState.STOPPED
elif event.state == ContainerState.FAILED:
self.state = AddonState.ERROR
async def watchdog_container(self, event: DockerContainerStateEvent) -> None:
"""Process state changes in addon container and restart if necessary."""
if not (event.name == self.instance.name and self.watchdog):
return
if event.state == ContainerState.UNHEALTHY:
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found addon %s is unhealthy, restarting...", self.name
)
try:
await self.restart()
except AddonsError as err:
_LOGGER.error("Watchdog restart of addon %s failed!", self.name)
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
elif event.state == ContainerState.FAILED:
rebuild = False
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found addon %s failed, restarting...", self.name
)
if not rebuild:
try:
await self.start()
except AddonsError as err:
self.sys_capture_exception(err)
rebuild = True
else:
break
try:
await self.rebuild()
except AddonsError as err:
_LOGGER.error(
"Watchdog reanimation of addon %s failed!", self.name
)
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)

View File

@ -11,3 +11,4 @@ class AddonBackupMode(str, Enum):
ATTR_BACKUP = "backup"
ATTR_CODENOTARY = "codenotary"
WATCHDOG_RETRY_SECONDS = 10

View File

@ -13,6 +13,7 @@ from sentry_sdk.integrations.excepthook import ExcepthookIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.threading import ThreadingIntegration
from supervisor.docker.manager import DockerAPI
from supervisor.jobs import JobManager
from .addons import AddonManager
@ -63,6 +64,7 @@ async def initialize_coresys() -> CoreSys:
coresys = CoreSys()
# Initialize core objects
coresys.docker = DockerAPI(coresys)
coresys.resolution = ResolutionManager(coresys)
coresys.jobs = JobManager(coresys)
coresys.core = Core(coresys)

View File

@ -439,6 +439,7 @@ class BusEvent(str, Enum):
HARDWARE_NEW_DEVICE = "hardware_new_device"
HARDWARE_REMOVE_DEVICE = "hardware_remove_device"
DOCKER_CONTAINER_STATE_CHANGE = "docker_container_state_change"
class CpuArch(str, Enum):

View File

@ -116,6 +116,8 @@ class Core(CoreSysAttributes):
self.sys_host.load(),
# Adjust timezone / time settings
self._adjust_system_datetime(),
# Start docker monitoring
self.sys_docker.load(),
# Load Plugins container
self.sys_plugins.load(),
# load last available data
@ -278,7 +280,13 @@ class Core(CoreSysAttributes):
# Stage 1
try:
async with async_timeout.timeout(10):
await asyncio.wait([self.sys_api.stop(), self.sys_scheduler.shutdown()])
await asyncio.wait(
[
self.sys_api.stop(),
self.sys_scheduler.shutdown(),
self.sys_docker.unload(),
]
)
except asyncio.TimeoutError:
_LOGGER.warning("Stage 1: Force Shutdown!")
@ -305,6 +313,9 @@ class Core(CoreSysAttributes):
if self.state == CoreState.RUNNING:
self.state = CoreState.SHUTDOWN
# Stop docker monitoring
await self.sys_docker.unload()
# Shutdown Application Add-ons, using Home Assistant API
await self.sys_addons.shutdown(AddonStartup.APPLICATION)

View File

@ -13,7 +13,6 @@ import sentry_sdk
from .config import CoreConfig
from .const import ENV_SUPERVISOR_DEV, SERVER_SOFTWARE
from .docker import DockerAPI
from .utils.dt import UTC, get_time_zone
if TYPE_CHECKING:
@ -26,6 +25,7 @@ if TYPE_CHECKING:
from .core import Core
from .dbus.manager import DBusManager
from .discovery import Discovery
from .docker.manager import DockerAPI
from .hardware.manager import HardwareManager
from .homeassistant.module import HomeAssistant
from .host.manager import HostManager
@ -63,9 +63,9 @@ class CoreSys:
# Global objects
self._config: CoreConfig = CoreConfig()
self._docker: DockerAPI = DockerAPI()
# Internal objects pointers
self._docker: DockerAPI | None = None
self._core: Core | None = None
self._arch: CpuArch | None = None
self._auth: Auth | None = None
@ -128,8 +128,17 @@ class CoreSys:
@property
def docker(self) -> DockerAPI:
"""Return DockerAPI object."""
if self._docker is None:
raise RuntimeError("Docker not set!")
return self._docker
@docker.setter
def docker(self, value: DockerAPI) -> None:
"""Set docker object."""
if self._docker:
raise RuntimeError("Docker already set!")
self._docker = value
@property
def scheduler(self) -> Scheduler:
"""Return Scheduler object."""

View File

@ -1,323 +1 @@
"""Init file for Supervisor Docker object."""
from contextlib import suppress
from ipaddress import IPv4Address
import logging
import os
from pathlib import Path
from typing import Any, Optional
import attr
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from docker import errors as docker_errors
from docker.api.client import APIClient
from docker.client import DockerClient
from docker.models.containers import Container, ContainerCollection
from docker.models.images import ImageCollection
from docker.models.networks import Network
import requests
from ..const import (
ATTR_REGISTRIES,
DNS_SUFFIX,
DOCKER_NETWORK,
ENV_SUPERVISOR_CPU_RT,
FILE_HASSIO_DOCKER,
SOCKET_DOCKER,
)
from ..exceptions import DockerAPIError, DockerError, DockerNotFound, DockerRequestError
from ..utils.common import FileConfiguration
from ..validate import SCHEMA_DOCKER_CONFIG
from .network import DockerNetwork
_LOGGER: logging.Logger = logging.getLogger(__name__)
MIN_SUPPORTED_DOCKER = "19.03.0"
DOCKER_NETWORK_HOST = "host"
@attr.s(frozen=True)
class CommandReturn:
"""Return object from command run."""
exit_code: int = attr.ib()
output: bytes = attr.ib()
@attr.s(frozen=True)
class DockerInfo:
"""Return docker information."""
version: AwesomeVersion = attr.ib()
storage: str = attr.ib()
logging: str = attr.ib()
cgroup: str = attr.ib()
@staticmethod
def new(data: dict[str, Any]):
"""Create a object from docker info."""
return DockerInfo(
AwesomeVersion(data.get("ServerVersion", "0.0.0")),
data.get("Driver", "unknown"),
data.get("LoggingDriver", "unknown"),
data.get("CgroupVersion", "1"),
)
@property
def supported_version(self) -> bool:
"""Return true, if docker version is supported."""
try:
return self.version >= MIN_SUPPORTED_DOCKER
except AwesomeVersionCompareException:
return False
@property
def support_cpu_realtime(self) -> bool:
"""Return true, if CONFIG_RT_GROUP_SCHED is loaded."""
if not Path("/sys/fs/cgroup/cpu/cpu.rt_runtime_us").exists():
return False
return bool(os.environ.get(ENV_SUPERVISOR_CPU_RT, 0))
class DockerConfig(FileConfiguration):
"""Home Assistant core object for Docker configuration."""
def __init__(self):
"""Initialize the JSON configuration."""
super().__init__(FILE_HASSIO_DOCKER, SCHEMA_DOCKER_CONFIG)
@property
def registries(self) -> dict[str, Any]:
"""Return credentials for docker registries."""
return self._data.get(ATTR_REGISTRIES, {})
class DockerAPI:
"""Docker Supervisor wrapper.
This class is not AsyncIO safe!
"""
def __init__(self):
"""Initialize Docker base wrapper."""
self.docker: DockerClient = DockerClient(
base_url=f"unix:/{str(SOCKET_DOCKER)}", version="auto", timeout=900
)
self.network: DockerNetwork = DockerNetwork(self.docker)
self._info: DockerInfo = DockerInfo.new(self.docker.info())
self.config: DockerConfig = DockerConfig()
@property
def images(self) -> ImageCollection:
"""Return API images."""
return self.docker.images
@property
def containers(self) -> ContainerCollection:
"""Return API containers."""
return self.docker.containers
@property
def api(self) -> APIClient:
"""Return API containers."""
return self.docker.api
@property
def info(self) -> DockerInfo:
"""Return local docker info."""
return self._info
def run(
self,
image: str,
tag: str = "latest",
dns: bool = True,
ipv4: Optional[IPv4Address] = None,
**kwargs: Any,
) -> Container:
"""Create a Docker container and run it.
Need run inside executor.
"""
name: Optional[str] = kwargs.get("name")
network_mode: Optional[str] = kwargs.get("network_mode")
hostname: Optional[str] = kwargs.get("hostname")
# Setup DNS
if dns:
kwargs["dns"] = [str(self.network.dns)]
kwargs["dns_search"] = [DNS_SUFFIX]
kwargs["domainname"] = DNS_SUFFIX
# Setup network
if not network_mode:
kwargs["network"] = None
# Create container
try:
container = self.docker.containers.create(
f"{image}:{tag}", use_config_proxy=False, **kwargs
)
except docker_errors.NotFound as err:
raise DockerNotFound(
f"Image {image}:{tag} does not exist for {name}", _LOGGER.error
) from err
except docker_errors.DockerException as err:
raise DockerAPIError(
f"Can't create container from {name}: {err}", _LOGGER.error
) from err
except requests.RequestException as err:
raise DockerRequestError(
f"Dockerd connection issue for {name}: {err}", _LOGGER.error
) from err
# Attach network
if not network_mode:
alias = [hostname] if hostname else None
try:
self.network.attach_container(container, alias=alias, ipv4=ipv4)
except DockerError:
_LOGGER.warning("Can't attach %s to hassio-network!", name)
else:
with suppress(DockerError):
self.network.detach_default_bridge(container)
else:
host_network: Network = self.docker.networks.get(DOCKER_NETWORK_HOST)
# Check if container is register on host
# https://github.com/moby/moby/issues/23302
if name in (
val.get("Name")
for val in host_network.attrs.get("Containers", {}).values()
):
with suppress(docker_errors.NotFound):
host_network.disconnect(name, force=True)
# Run container
try:
container.start()
except docker_errors.DockerException as err:
raise DockerAPIError(f"Can't start {name}: {err}", _LOGGER.error) from err
except requests.RequestException as err:
raise DockerRequestError(
f"Dockerd connection issue for {name}: {err}", _LOGGER.error
) from err
# Update metadata
with suppress(docker_errors.DockerException, requests.RequestException):
container.reload()
return container
def run_command(
self,
image: str,
tag: str = "latest",
command: Optional[str] = None,
**kwargs: Any,
) -> CommandReturn:
"""Create a temporary container and run command.
Need run inside executor.
"""
stdout = kwargs.get("stdout", True)
stderr = kwargs.get("stderr", True)
_LOGGER.info("Runing command '%s' on %s", command, image)
container = None
try:
container = self.docker.containers.run(
f"{image}:{tag}",
command=command,
network=self.network.name,
use_config_proxy=False,
**kwargs,
)
# wait until command is done
result = container.wait()
output = container.logs(stdout=stdout, stderr=stderr)
except (docker_errors.DockerException, requests.RequestException) as err:
raise DockerError(f"Can't execute command: {err}", _LOGGER.error) from err
finally:
# cleanup container
if container:
with suppress(docker_errors.DockerException, requests.RequestException):
container.remove(force=True)
return CommandReturn(result.get("StatusCode"), output)
def repair(self) -> None:
"""Repair local docker overlayfs2 issues."""
_LOGGER.info("Prune stale containers")
try:
output = self.docker.api.prune_containers()
_LOGGER.debug("Containers prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for containers prune: %s", err)
_LOGGER.info("Prune stale images")
try:
output = self.docker.api.prune_images(filters={"dangling": False})
_LOGGER.debug("Images prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for images prune: %s", err)
_LOGGER.info("Prune stale builds")
try:
output = self.docker.api.prune_builds()
_LOGGER.debug("Builds prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for builds prune: %s", err)
_LOGGER.info("Prune stale volumes")
try:
output = self.docker.api.prune_builds()
_LOGGER.debug("Volumes prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for volumes prune: %s", err)
_LOGGER.info("Prune stale networks")
try:
output = self.docker.api.prune_networks()
_LOGGER.debug("Networks prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks prune: %s", err)
_LOGGER.info("Fix stale container on hassio network")
try:
self.prune_networks(DOCKER_NETWORK)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks hassio prune: %s", err)
_LOGGER.info("Fix stale container on host network")
try:
self.prune_networks(DOCKER_NETWORK_HOST)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks host prune: %s", err)
def prune_networks(self, network_name: str) -> None:
"""Prune stale container from network.
Fix: https://github.com/moby/moby/issues/23302
"""
network: Network = self.docker.networks.get(network_name)
for cid, data in network.attrs.get("Containers", {}).items():
try:
self.docker.containers.get(cid)
continue
except docker_errors.NotFound:
_LOGGER.debug(
"Docker network %s is corrupt on container: %s", network_name, cid
)
except (docker_errors.DockerException, requests.RequestException):
_LOGGER.warning(
"Docker fatal error on container %s on %s", cid, network_name
)
continue
with suppress(docker_errors.DockerException, requests.RequestException):
network.disconnect(data.get("Name", cid), force=True)
"""Init file for supervisor docker."""

View File

@ -17,9 +17,22 @@ class Capabilities(str, Enum):
SYS_TIME = "SYS_TIME"
class ContainerState(str, Enum):
"""State of supervisor managed docker container."""
FAILED = "failed"
HEALTHY = "healthy"
RUNNING = "running"
STOPPED = "stopped"
UNHEALTHY = "unhealthy"
UNKNOWN = "unknown"
DBUS_PATH = "/run/dbus"
DBUS_VOLUME = {"bind": DBUS_PATH, "mode": "ro"}
ENV_TIME = "TZ"
ENV_TOKEN = "SUPERVISOR_TOKEN"
ENV_TOKEN_OLD = "HASSIO_TOKEN"
LABEL_MANAGED = "supervisor_managed"

View File

@ -5,20 +5,24 @@ import asyncio
from contextlib import suppress
import logging
import re
from time import time
from typing import Any, Awaitable
from awesomeversion import AwesomeVersion
from awesomeversion.strategy import AwesomeVersionStrategy
import docker
from docker.models.containers import Container
import requests
from . import CommandReturn
from supervisor.docker.monitor import DockerContainerStateEvent
from ..const import (
ATTR_PASSWORD,
ATTR_REGISTRY,
ATTR_USERNAME,
LABEL_ARCH,
LABEL_VERSION,
BusEvent,
CpuArch,
)
from ..coresys import CoreSys, CoreSysAttributes
@ -33,6 +37,8 @@ from ..exceptions import (
)
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils import process_lock
from .const import ContainerState
from .manager import CommandReturn
from .stats import DockerStats
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -49,6 +55,23 @@ MAP_ARCH = {
}
def _container_state_from_model(docker_container: Container) -> ContainerState:
"""Get container state from model."""
if docker_container.status == "running":
if "Health" in docker_container.attrs["State"]:
return (
ContainerState.HEALTHY
if docker_container.attrs["State"]["Health"]["Status"] == "healthy"
else ContainerState.UNHEALTHY
)
return ContainerState.RUNNING
if docker_container.attrs["State"]["ExitCode"] > 0:
return ContainerState.FAILED
return ContainerState.STOPPED
class DockerInterface(CoreSysAttributes):
"""Docker Supervisor interface."""
@ -281,18 +304,60 @@ class DockerInterface(CoreSysAttributes):
return docker_container.status == "running"
@process_lock
def attach(self, version: AwesomeVersion):
"""Attach to running Docker container."""
return self.sys_run_in_executor(self._attach, version)
def current_state(self) -> Awaitable[ContainerState]:
"""Return current state of container.
def _attach(self, version: AwesomeVersion) -> None:
Return a Future.
"""
return self.sys_run_in_executor(self._current_state)
def _current_state(self) -> ContainerState:
"""Return current state of container.
Need run inside executor.
"""
try:
docker_container = self.sys_docker.containers.get(self.name)
except docker.errors.NotFound:
return ContainerState.UNKNOWN
except docker.errors.DockerException as err:
raise DockerAPIError() from err
except requests.RequestException as err:
raise DockerRequestError() from err
return _container_state_from_model(docker_container)
@process_lock
def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> Awaitable[None]:
"""Attach to running Docker container."""
return self.sys_run_in_executor(self._attach, version, skip_state_event_if_down)
def _attach(
self, version: AwesomeVersion, skip_state_event_if_down: bool = False
) -> None:
"""Attach to running docker container.
Need run inside executor.
"""
with suppress(docker.errors.DockerException, requests.RequestException):
self._meta = self.sys_docker.containers.get(self.name).attrs
docker_container = self.sys_docker.containers.get(self.name)
self._meta = docker_container.attrs
self.sys_docker.monitor.watch_container(docker_container)
state = _container_state_from_model(docker_container)
if not (
skip_state_event_if_down
and state in [ContainerState.STOPPED, ContainerState.FAILED]
):
# Fire event with current state of container
self.sys_bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
self.name, state, docker_container.id, int(time())
),
)
with suppress(docker.errors.DockerException, requests.RequestException):
if not self._meta and self.image:
@ -300,7 +365,7 @@ class DockerInterface(CoreSysAttributes):
f"{self.image}:{version!s}"
).attrs
# Successfull?
# Successful?
if not self._meta:
raise DockerError()
_LOGGER.info("Attaching to %s with version %s", self.image, self.version)

View File

@ -0,0 +1,353 @@
"""Manager for Supervisor Docker."""
from contextlib import suppress
from ipaddress import IPv4Address
import logging
import os
from pathlib import Path
from typing import Any, Optional
import attr
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from docker import errors as docker_errors
from docker.api.client import APIClient
from docker.client import DockerClient
from docker.models.containers import Container, ContainerCollection
from docker.models.images import ImageCollection
from docker.models.networks import Network
from docker.types.daemon import CancellableStream
import requests
from supervisor.coresys import CoreSys
from supervisor.docker.monitor import DockerMonitor
from ..const import (
ATTR_REGISTRIES,
DNS_SUFFIX,
DOCKER_NETWORK,
ENV_SUPERVISOR_CPU_RT,
FILE_HASSIO_DOCKER,
SOCKET_DOCKER,
)
from ..exceptions import DockerAPIError, DockerError, DockerNotFound, DockerRequestError
from ..utils.common import FileConfiguration
from ..validate import SCHEMA_DOCKER_CONFIG
from .const import LABEL_MANAGED
from .network import DockerNetwork
_LOGGER: logging.Logger = logging.getLogger(__name__)
MIN_SUPPORTED_DOCKER = "19.03.0"
DOCKER_NETWORK_HOST = "host"
@attr.s(frozen=True)
class CommandReturn:
"""Return object from command run."""
exit_code: int = attr.ib()
output: bytes = attr.ib()
@attr.s(frozen=True)
class DockerInfo:
"""Return docker information."""
version: AwesomeVersion = attr.ib()
storage: str = attr.ib()
logging: str = attr.ib()
cgroup: str = attr.ib()
@staticmethod
def new(data: dict[str, Any]):
"""Create a object from docker info."""
return DockerInfo(
AwesomeVersion(data.get("ServerVersion", "0.0.0")),
data.get("Driver", "unknown"),
data.get("LoggingDriver", "unknown"),
data.get("CgroupVersion", "1"),
)
@property
def supported_version(self) -> bool:
"""Return true, if docker version is supported."""
try:
return self.version >= MIN_SUPPORTED_DOCKER
except AwesomeVersionCompareException:
return False
@property
def support_cpu_realtime(self) -> bool:
"""Return true, if CONFIG_RT_GROUP_SCHED is loaded."""
if not Path("/sys/fs/cgroup/cpu/cpu.rt_runtime_us").exists():
return False
return bool(os.environ.get(ENV_SUPERVISOR_CPU_RT, 0))
class DockerConfig(FileConfiguration):
"""Home Assistant core object for Docker configuration."""
def __init__(self):
"""Initialize the JSON configuration."""
super().__init__(FILE_HASSIO_DOCKER, SCHEMA_DOCKER_CONFIG)
@property
def registries(self) -> dict[str, Any]:
"""Return credentials for docker registries."""
return self._data.get(ATTR_REGISTRIES, {})
class DockerAPI:
"""Docker Supervisor wrapper.
This class is not AsyncIO safe!
"""
def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper."""
self.docker: DockerClient = DockerClient(
base_url=f"unix:/{str(SOCKET_DOCKER)}", version="auto", timeout=900
)
self.network: DockerNetwork = DockerNetwork(self.docker)
self._info: DockerInfo = DockerInfo.new(self.docker.info())
self.config: DockerConfig = DockerConfig()
self._monitor: DockerMonitor = DockerMonitor(coresys)
@property
def images(self) -> ImageCollection:
"""Return API images."""
return self.docker.images
@property
def containers(self) -> ContainerCollection:
"""Return API containers."""
return self.docker.containers
@property
def api(self) -> APIClient:
"""Return API containers."""
return self.docker.api
@property
def info(self) -> DockerInfo:
"""Return local docker info."""
return self._info
@property
def events(self) -> CancellableStream:
"""Return docker event stream."""
return self.docker.events(decode=True)
@property
def monitor(self) -> DockerMonitor:
"""Return docker events monitor."""
return self._monitor
async def load(self) -> None:
"""Start docker events monitor."""
await self.monitor.load()
async def unload(self) -> None:
"""Stop docker events monitor."""
await self.monitor.unload()
def run(
self,
image: str,
tag: str = "latest",
dns: bool = True,
ipv4: Optional[IPv4Address] = None,
**kwargs: Any,
) -> Container:
"""Create a Docker container and run it.
Need run inside executor.
"""
name: Optional[str] = kwargs.get("name")
network_mode: Optional[str] = kwargs.get("network_mode")
hostname: Optional[str] = kwargs.get("hostname")
if "labels" not in kwargs:
kwargs["labels"] = {}
elif isinstance(kwargs["labels"], list):
kwargs["labels"] = {label: "" for label in kwargs["labels"]}
kwargs["labels"][LABEL_MANAGED] = ""
# Setup DNS
if dns:
kwargs["dns"] = [str(self.network.dns)]
kwargs["dns_search"] = [DNS_SUFFIX]
kwargs["domainname"] = DNS_SUFFIX
# Setup network
if not network_mode:
kwargs["network"] = None
# Create container
try:
container = self.docker.containers.create(
f"{image}:{tag}", use_config_proxy=False, **kwargs
)
except docker_errors.NotFound as err:
raise DockerNotFound(
f"Image {image}:{tag} does not exist for {name}", _LOGGER.error
) from err
except docker_errors.DockerException as err:
raise DockerAPIError(
f"Can't create container from {name}: {err}", _LOGGER.error
) from err
except requests.RequestException as err:
raise DockerRequestError(
f"Dockerd connection issue for {name}: {err}", _LOGGER.error
) from err
# Attach network
if not network_mode:
alias = [hostname] if hostname else None
try:
self.network.attach_container(container, alias=alias, ipv4=ipv4)
except DockerError:
_LOGGER.warning("Can't attach %s to hassio-network!", name)
else:
with suppress(DockerError):
self.network.detach_default_bridge(container)
else:
host_network: Network = self.docker.networks.get(DOCKER_NETWORK_HOST)
# Check if container is registered on host
# https://github.com/moby/moby/issues/23302
if name in (
val.get("Name")
for val in host_network.attrs.get("Containers", {}).values()
):
with suppress(docker_errors.NotFound):
host_network.disconnect(name, force=True)
# Run container
try:
container.start()
except docker_errors.DockerException as err:
raise DockerAPIError(f"Can't start {name}: {err}", _LOGGER.error) from err
except requests.RequestException as err:
raise DockerRequestError(
f"Dockerd connection issue for {name}: {err}", _LOGGER.error
) from err
# Update metadata
with suppress(docker_errors.DockerException, requests.RequestException):
container.reload()
return container
def run_command(
self,
image: str,
tag: str = "latest",
command: Optional[str] = None,
**kwargs: Any,
) -> CommandReturn:
"""Create a temporary container and run command.
Need run inside executor.
"""
stdout = kwargs.get("stdout", True)
stderr = kwargs.get("stderr", True)
_LOGGER.info("Runing command '%s' on %s", command, image)
container = None
try:
container = self.docker.containers.run(
f"{image}:{tag}",
command=command,
network=self.network.name,
use_config_proxy=False,
**kwargs,
)
# wait until command is done
result = container.wait()
output = container.logs(stdout=stdout, stderr=stderr)
except (docker_errors.DockerException, requests.RequestException) as err:
raise DockerError(f"Can't execute command: {err}", _LOGGER.error) from err
finally:
# cleanup container
if container:
with suppress(docker_errors.DockerException, requests.RequestException):
container.remove(force=True)
return CommandReturn(result.get("StatusCode"), output)
def repair(self) -> None:
"""Repair local docker overlayfs2 issues."""
_LOGGER.info("Prune stale containers")
try:
output = self.docker.api.prune_containers()
_LOGGER.debug("Containers prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for containers prune: %s", err)
_LOGGER.info("Prune stale images")
try:
output = self.docker.api.prune_images(filters={"dangling": False})
_LOGGER.debug("Images prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for images prune: %s", err)
_LOGGER.info("Prune stale builds")
try:
output = self.docker.api.prune_builds()
_LOGGER.debug("Builds prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for builds prune: %s", err)
_LOGGER.info("Prune stale volumes")
try:
output = self.docker.api.prune_volumes()
_LOGGER.debug("Volumes prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for volumes prune: %s", err)
_LOGGER.info("Prune stale networks")
try:
output = self.docker.api.prune_networks()
_LOGGER.debug("Networks prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks prune: %s", err)
_LOGGER.info("Fix stale container on hassio network")
try:
self.prune_networks(DOCKER_NETWORK)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks hassio prune: %s", err)
_LOGGER.info("Fix stale container on host network")
try:
self.prune_networks(DOCKER_NETWORK_HOST)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks host prune: %s", err)
def prune_networks(self, network_name: str) -> None:
"""Prune stale container from network.
Fix: https://github.com/moby/moby/issues/23302
"""
network: Network = self.docker.networks.get(network_name)
for cid, data in network.attrs.get("Containers", {}).items():
try:
self.docker.containers.get(cid)
continue
except docker_errors.NotFound:
_LOGGER.debug(
"Docker network %s is corrupt on container: %s", network_name, cid
)
except (docker_errors.DockerException, requests.RequestException):
_LOGGER.warning(
"Docker fatal error on container %s on %s", cid, network_name
)
continue
with suppress(docker_errors.DockerException, requests.RequestException):
network.disconnect(data.get("Name", cid), force=True)
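
A quick standalone illustration of the labels handling added to DockerAPI.run above: docker-py accepts labels as either a list of names or a dict, so both forms are normalized to a dict before the managed marker is added (label names and values below are hypothetical).

# Standalone sketch of the labels normalization performed in DockerAPI.run (hypothetical values)
LABEL_MANAGED = "supervisor_managed"


def normalize_labels(kwargs: dict) -> dict:
    """Ensure labels is a dict and tag the container as supervisor-managed."""
    if "labels" not in kwargs:
        kwargs["labels"] = {}
    elif isinstance(kwargs["labels"], list):
        kwargs["labels"] = {label: "" for label in kwargs["labels"]}
    kwargs["labels"][LABEL_MANAGED] = ""
    return kwargs


print(normalize_labels({}))
# {'labels': {'supervisor_managed': ''}}
print(normalize_labels({"labels": ["com.example.role"]}))
# {'labels': {'com.example.role': '', 'supervisor_managed': ''}}
print(normalize_labels({"labels": {"com.example.version": "1.0"}}))
# {'labels': {'com.example.version': '1.0', 'supervisor_managed': ''}}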

View File

@ -0,0 +1,94 @@
"""Supervisor docker monitor based on events."""
from dataclasses import dataclass
import logging
from threading import Thread
from typing import Optional
from docker.models.containers import Container
from docker.types.daemon import CancellableStream
from supervisor.const import BusEvent
from supervisor.coresys import CoreSys, CoreSysAttributes
from .const import LABEL_MANAGED, ContainerState
_LOGGER: logging.Logger = logging.getLogger(__name__)
@dataclass
class DockerContainerStateEvent:
"""Event for docker container state change."""
name: str
state: ContainerState
id: str
time: int
class DockerMonitor(CoreSysAttributes, Thread):
"""Docker monitor for supervisor."""
def __init__(self, coresys: CoreSys):
"""Initialize Docker monitor object."""
super().__init__()
self.coresys = coresys
self._events: Optional[CancellableStream] = None
self._unlabeled_managed_containers: list[str] = []
def watch_container(self, container: Container):
"""If container is missing the managed label, add name to list."""
if LABEL_MANAGED not in container.labels:
self._unlabeled_managed_containers += [container.name]
async def load(self):
"""Start docker events monitor."""
self._events = self.sys_docker.events
Thread.start(self)
_LOGGER.info("Started docker events monitor")
async def unload(self):
"""Stop docker events monitor."""
self._events.close()
try:
self.join(timeout=5)
except RuntimeError:
pass
_LOGGER.info("Stopped docker events monitor")
def run(self):
"""Monitor and process docker events."""
for event in self._events:
attributes: dict[str, str] = event.get("Actor", {}).get("Attributes", {})
if event["Type"] == "container" and (
LABEL_MANAGED in attributes
or attributes["name"] in self._unlabeled_managed_containers
):
container_state: Optional[ContainerState] = None
action: str = event["Action"]
if action == "start":
container_state = ContainerState.RUNNING
elif action == "die":
container_state = (
ContainerState.STOPPED
if int(event["Actor"]["Attributes"]["exitCode"]) == 0
else ContainerState.FAILED
)
elif action == "health_status: healthy":
container_state = ContainerState.HEALTHY
elif action == "health_status: unhealthy":
container_state = ContainerState.UNHEALTHY
if container_state:
self.sys_loop.call_soon_threadsafe(
self.sys_bus.fire_event,
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=attributes["name"],
state=container_state,
id=event["id"],
time=event["time"],
),
)
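
For reference, the raw docker event payloads matched by the loop above look roughly like the dicts below (approximate shape, illustrative values). Container labels are copied into Actor.Attributes, which is what makes the LABEL_MANAGED check work, and die events carry the exit code as a string attribute.

# Approximate docker event payloads (illustrative values) as consumed by DockerMonitor.run()
die_event = {
    "Type": "container",
    "Action": "die",
    "Actor": {
        "ID": "abc123",
        "Attributes": {"name": "addon_local_example", "exitCode": "1", "supervisor_managed": ""},
    },
    "id": "abc123",
    "time": 1657862519,
}
health_event = {
    "Type": "container",
    "Action": "health_status: unhealthy",
    "Actor": {"ID": "abc123", "Attributes": {"name": "addon_local_example", "supervisor_managed": ""}},
    "id": "abc123",
    "time": 1657862520,
}
# die_event maps to ContainerState.FAILED (non-zero exitCode); health_event to ContainerState.UNHEALTHY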

View File

@ -33,7 +33,9 @@ class DockerSupervisor(DockerInterface, CoreSysAttributes):
"""Return True if the container run with Privileged."""
return self.meta_host.get("Privileged", False)
def _attach(self, version: AwesomeVersion) -> None:
def _attach(
self, version: AwesomeVersion, skip_state_event_if_down: bool = False
) -> None:
"""Attach to running docker container.
Need run inside executor.

View File

@ -115,10 +115,17 @@ class HassOSDataDiskError(HassOSError):
"""Issues with the DataDisk feature from HAOS."""
# All Plugins
class PluginError(HassioError):
"""Plugin error."""
# HaCli
class CliError(HassioError):
class CliError(PluginError):
"""HA cli exception."""
@ -129,7 +136,7 @@ class CliUpdateError(CliError):
# Observer
class ObserverError(HassioError):
class ObserverError(PluginError):
"""General Observer exception."""
@ -140,7 +147,7 @@ class ObserverUpdateError(ObserverError):
# Multicast
class MulticastError(HassioError):
class MulticastError(PluginError):
"""Multicast exception."""
@ -151,7 +158,7 @@ class MulticastUpdateError(MulticastError):
# DNS
class CoreDNSError(HassioError):
class CoreDNSError(PluginError):
"""CoreDNS exception."""
@ -159,10 +166,10 @@ class CoreDNSUpdateError(CoreDNSError):
"""Error on update of a CoreDNS."""
# DNS
# Audio
class AudioError(HassioError):
class AudioError(PluginError):
"""PulseAudio exception."""

View File

@ -6,6 +6,7 @@ from awesomeversion import AwesomeVersion
from ..const import CoreState
LANDINGPAGE: AwesomeVersion = AwesomeVersion("landingpage")
WATCHDOG_RETRY_SECONDS = 10
CLOSING_STATES = [
CoreState.SHUTDOWN,

View File

@ -10,7 +10,9 @@ from typing import Awaitable, Optional
import attr
from awesomeversion import AwesomeVersion
from supervisor.const import ATTR_HOMEASSISTANT
from supervisor.const import ATTR_HOMEASSISTANT, BusEvent
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from ..coresys import CoreSys, CoreSysAttributes
from ..docker.homeassistant import DockerHomeAssistant
@ -25,7 +27,7 @@ from ..exceptions import (
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType
from ..utils import convert_to_ascii, process_lock
from .const import LANDINGPAGE
from .const import LANDINGPAGE, WATCHDOG_RETRY_SECONDS
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -57,6 +59,10 @@ class HomeAssistantCore(CoreSysAttributes):
async def load(self) -> None:
"""Prepare Home Assistant object."""
self.sys_bus.register_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, self.watchdog_container
)
try:
# Evaluate Version if we lost this information
if not self.sys_homeassistant.version:
@ -432,3 +438,52 @@ class HomeAssistantCore(CoreSysAttributes):
await self.instance.install(self.sys_homeassistant.version)
except DockerError:
_LOGGER.error("Repairing of Home Assistant failed")
async def watchdog_container(self, event: DockerContainerStateEvent) -> None:
"""Process state changes in Home Assistant container and restart if necessary."""
if not (event.name == self.instance.name and self.sys_homeassistant.watchdog):
return
if event.state == ContainerState.UNHEALTHY:
while await self.instance.current_state() == event.state:
# Don't interrupt a task in progress or if rollback is handling it
if not (self.in_progress or self.error_state):
_LOGGER.warning(
"Watchdog found Home Assistant is unhealthy, restarting..."
)
try:
await self.restart()
except HomeAssistantError as err:
_LOGGER.error("Watchdog restart of Home Assistant failed!")
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
elif event.state == ContainerState.FAILED:
rebuild = False
while await self.instance.current_state() == event.state:
# Don't interrupt a task in progress or if rollback is handling it
if not (self.in_progress or self.error_state):
_LOGGER.warning(
"Watchdog found Home Assistant failed, restarting..."
)
if not rebuild:
try:
await self.start()
except HomeAssistantError as err:
self.sys_capture_exception(err)
rebuild = True
else:
break
try:
await self.rebuild()
except HomeAssistantError as err:
_LOGGER.error("Watchdog reanimation of Home Assistant failed!")
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)

View File

@ -3,15 +3,7 @@ import logging
from ..const import AddonState
from ..coresys import CoreSysAttributes
from ..exceptions import (
AddonsError,
AudioError,
CliError,
CoreDNSError,
HomeAssistantError,
MulticastError,
ObserverError,
)
from ..exceptions import AddonsError, HomeAssistantError, ObserverError
from ..host.const import HostFeature
from ..jobs.decorator import Job, JobCondition
@ -33,16 +25,8 @@ RUN_RELOAD_HOST = 7600
RUN_RELOAD_UPDATER = 7200
RUN_RELOAD_INGRESS = 930
RUN_WATCHDOG_HOMEASSISTANT_DOCKER = 15
RUN_WATCHDOG_HOMEASSISTANT_API = 120
RUN_WATCHDOG_DNS_DOCKER = 30
RUN_WATCHDOG_AUDIO_DOCKER = 60
RUN_WATCHDOG_CLI_DOCKER = 60
RUN_WATCHDOG_OBSERVER_DOCKER = 60
RUN_WATCHDOG_MULTICAST_DOCKER = 60
RUN_WATCHDOG_ADDON_DOCKER = 30
RUN_WATCHDOG_ADDON_APPLICATON = 120
RUN_WATCHDOG_OBSERVER_APPLICATION = 180
@ -78,33 +62,12 @@ class Tasks(CoreSysAttributes):
self.sys_scheduler.register_task(self.sys_ingress.reload, RUN_RELOAD_INGRESS)
# Watchdog
self.sys_scheduler.register_task(
self._watchdog_homeassistant_docker, RUN_WATCHDOG_HOMEASSISTANT_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_homeassistant_api, RUN_WATCHDOG_HOMEASSISTANT_API
)
self.sys_scheduler.register_task(
self._watchdog_dns_docker, RUN_WATCHDOG_DNS_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_audio_docker, RUN_WATCHDOG_AUDIO_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_cli_docker, RUN_WATCHDOG_CLI_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_observer_docker, RUN_WATCHDOG_OBSERVER_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_observer_application, RUN_WATCHDOG_OBSERVER_APPLICATION
)
self.sys_scheduler.register_task(
self._watchdog_multicast_docker, RUN_WATCHDOG_MULTICAST_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_addon_docker, RUN_WATCHDOG_ADDON_DOCKER
)
self.sys_scheduler.register_task(
self._watchdog_addon_application, RUN_WATCHDOG_ADDON_APPLICATON
)
@ -168,36 +131,6 @@ class Tasks(CoreSysAttributes):
)
await self.sys_supervisor.update()
async def _watchdog_homeassistant_docker(self):
"""Check running state of Docker and start if they is close."""
if not self.sys_homeassistant.watchdog:
# Watchdog is not enabled for Home Assistant
return
if self.sys_homeassistant.error_state:
# Home Assistant is in an error state, this is handled by the rollback feature
return
if not await self.sys_homeassistant.core.is_failed():
# The home assistant container is not in a failed state
return
if self.sys_homeassistant.core.in_progress:
# Home Assistant has a task in progress
return
if await self.sys_homeassistant.core.is_running():
# Home Assistant is running
return
_LOGGER.warning("Watchdog found a problem with Home Assistant Docker!")
try:
await self.sys_homeassistant.core.start()
except HomeAssistantError as err:
_LOGGER.error("Home Assistant watchdog reanimation failed!")
self.sys_capture_exception(err)
else:
return
_LOGGER.info("Rebuilding the Home Assistant Container")
await self.sys_homeassistant.core.rebuild()
async def _watchdog_homeassistant_api(self):
"""Create scheduler task for monitoring running state of API.
@ -298,63 +231,6 @@ class Tasks(CoreSysAttributes):
)
await self.sys_plugins.multicast.update()
async def _watchdog_dns_docker(self):
"""Check running state of Docker and start if they is close."""
# if CoreDNS is active
if await self.sys_plugins.dns.is_running() or self.sys_plugins.dns.in_progress:
return
_LOGGER.warning("Watchdog found a problem with CoreDNS plugin!")
# Detect loop
await self.sys_plugins.dns.loop_detection()
try:
await self.sys_plugins.dns.start()
except CoreDNSError:
_LOGGER.error("CoreDNS watchdog reanimation failed!")
async def _watchdog_audio_docker(self):
"""Check running state of Docker and start if they is close."""
# if PulseAudio plugin is active
if (
await self.sys_plugins.audio.is_running()
or self.sys_plugins.audio.in_progress
):
return
_LOGGER.warning("Watchdog found a problem with PulseAudio plugin!")
try:
await self.sys_plugins.audio.start()
except AudioError:
_LOGGER.error("PulseAudio watchdog reanimation failed!")
async def _watchdog_cli_docker(self):
"""Check running state of Docker and start if they is close."""
# if cli plugin is active
if await self.sys_plugins.cli.is_running() or self.sys_plugins.cli.in_progress:
return
_LOGGER.warning("Watchdog found a problem with cli plugin!")
try:
await self.sys_plugins.cli.start()
except CliError:
_LOGGER.error("CLI watchdog reanimation failed!")
async def _watchdog_observer_docker(self):
"""Check running state of Docker and start if they is close."""
# if observer plugin is active
if (
await self.sys_plugins.observer.is_running()
or self.sys_plugins.observer.in_progress
):
return
_LOGGER.warning("Watchdog/Docker found a problem with observer plugin!")
try:
await self.sys_plugins.observer.start()
except ObserverError:
_LOGGER.error("Observer watchdog reanimation failed!")
async def _watchdog_observer_application(self):
"""Check running state of application and rebuild if they is not response."""
# if observer plugin is active
@ -370,39 +246,6 @@ class Tasks(CoreSysAttributes):
except ObserverError:
_LOGGER.error("Observer watchdog reanimation failed!")
async def _watchdog_multicast_docker(self):
"""Check running state of Docker and start if they is close."""
# if multicast plugin is active
if (
await self.sys_plugins.multicast.is_running()
or self.sys_plugins.multicast.in_progress
):
return
_LOGGER.warning("Watchdog found a problem with Multicast plugin!")
try:
await self.sys_plugins.multicast.start()
except MulticastError:
_LOGGER.error("Multicast watchdog reanimation failed!")
async def _watchdog_addon_docker(self):
"""Check running state of Docker and start if they is close."""
for addon in self.sys_addons.installed:
# if watchdog need looking for
if not addon.watchdog or await addon.is_running():
continue
# if Addon have running actions
if addon.in_progress or addon.state != AddonState.STARTED:
continue
_LOGGER.warning("Watchdog found a problem with %s!", addon.slug)
try:
await addon.start()
except AddonsError as err:
_LOGGER.error("%s watchdog reanimation failed with %s", addon.slug, err)
self.sys_capture_exception(err)
async def _watchdog_addon_application(self):
"""Check running state of the application and start if they is hangs."""
for addon in self.sys_addons.installed:

View File

@ -60,28 +60,7 @@ class PluginAudio(PluginBase):
except OSError as err:
_LOGGER.error("Can't read pulse-client.tmpl: %s", err)
# Check Audio state
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
await self.instance.attach(version=self.version)
except DockerError:
_LOGGER.info("No Audio plugin Docker image %s found.", self.instance.image)
# Install PulseAudio
with suppress(AudioError):
await self.install()
else:
self.version = self.instance.version
self.image = self.instance.image
self.save_data()
# Run PulseAudio
with suppress(AudioError):
if not await self.instance.is_running():
await self.start()
await super().load()
# Setup default asound config
asound = self.sys_config.path_audio.joinpath("asound")
@ -147,7 +126,7 @@ class PluginAudio(PluginBase):
raise AudioError("Can't start Audio plugin", _LOGGER.error) from err
async def start(self) -> None:
"""Run CoreDNS."""
"""Run Audio plugin."""
_LOGGER.info("Starting Audio plugin")
try:
await self.instance.run()
@ -155,7 +134,7 @@ class PluginAudio(PluginBase):
raise AudioError("Can't start Audio plugin", _LOGGER.error) from err
async def stop(self) -> None:
"""Stop CoreDNS."""
"""Stop Audio plugin."""
_LOGGER.info("Stopping Audio plugin")
try:
await self.instance.stop()
@ -163,14 +142,14 @@ class PluginAudio(PluginBase):
raise AudioError("Can't stop Audio plugin", _LOGGER.error) from err
async def stats(self) -> DockerStats:
"""Return stats of CoreDNS."""
"""Return stats of Audio plugin."""
try:
return await self.instance.stats()
except DockerError as err:
raise AudioError() from err
async def repair(self) -> None:
"""Repair CoreDNS plugin."""
"""Repair Audio plugin."""
if await self.instance.exists():
return

View File

@ -1,13 +1,23 @@
"""Supervisor plugins base class."""
from abc import ABC, abstractmethod
import asyncio
from contextlib import suppress
import logging
from typing import Awaitable, Optional
from awesomeversion import AwesomeVersion, AwesomeVersionException
from ..const import ATTR_IMAGE, ATTR_VERSION
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from ..const import ATTR_IMAGE, ATTR_VERSION, BusEvent
from ..coresys import CoreSysAttributes
from ..docker.interface import DockerInterface
from ..exceptions import DockerError, PluginError
from ..utils.common import FileConfiguration
from .const import WATCHDOG_RETRY_SECONDS
_LOGGER: logging.Logger = logging.getLogger(__name__)
class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
@ -84,9 +94,101 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
"""
return self.instance.is_failed()
def start_watchdog(self) -> None:
"""Register docker container listener for plugin."""
self.sys_bus.register_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, self.watchdog_container
)
async def watchdog_container(self, event: DockerContainerStateEvent) -> None:
"""Process state changes in plugin container and restart if necessary."""
if not (event.name == self.instance.name):
return
if event.state == ContainerState.UNHEALTHY:
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found %s plugin is unhealthy, restarting...",
self.slug,
)
try:
await self.rebuild()
except PluginError as err:
_LOGGER.error(
"Watchdog restart of %s plugin failed!", self.slug
)
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
elif event.state in [ContainerState.FAILED, ContainerState.STOPPED]:
rebuild = event.state == ContainerState.FAILED
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found %s plugin %s, restarting...",
self.slug,
event.state.value,
)
try:
if rebuild:
await self.rebuild()
else:
await self.start()
except PluginError as err:
_LOGGER.error(
"Watchdog reanimation of %s plugin failed!", self.slug
)
self.sys_capture_exception(err)
rebuild = True
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
async def rebuild(self) -> None:
"""Rebuild system plugin."""
with suppress(DockerError):
await self.instance.stop()
await self.start()
@abstractmethod
async def start(self) -> None:
"""Start system plugin."""
async def load(self) -> None:
"""Load system plugin."""
self.start_watchdog()
# Check plugin state
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
await self.instance.attach(
version=self.version, skip_state_event_if_down=True
)
except DockerError:
_LOGGER.info(
"No %s plugin Docker image %s found.", self.slug, self.instance.image
)
# Install plugin
with suppress(PluginError):
await self.install()
else:
self.version = self.instance.version
self.image = self.instance.image
self.save_data()
# Run plugin
with suppress(PluginError):
if not await self.instance.is_running():
await self.start()
@abstractmethod
async def install(self) -> None:

View File

@ -42,36 +42,11 @@ class PluginCli(PluginBase):
"""Return an access token for the Supervisor API."""
return self._data.get(ATTR_ACCESS_TOKEN)
async def load(self) -> None:
"""Load cli setup."""
# Check cli state
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
await self.instance.attach(version=self.version)
except DockerError:
_LOGGER.info("No cli plugin Docker image %s found.", self.instance.image)
# Install cli
with suppress(CliError):
await self.install()
else:
self.version = self.instance.version
self.image = self.instance.image
self.save_data()
# Run CLI
with suppress(CliError):
if not await self.instance.is_running():
await self.start()
async def install(self) -> None:
"""Install cli."""
_LOGGER.info("Running setup for CLI plugin")
while True:
# read audio tag and install it
# read cli tag and install it
if not self.latest_version:
await self.sys_updater.reload()

View File

@ -10,3 +10,4 @@ FILE_HASSIO_OBSERVER = Path(SUPERVISOR_DATA, "observer.json")
FILE_HASSIO_MULTICAST = Path(SUPERVISOR_DATA, "multicast.json")
ATTR_FALLBACK = "fallback"
WATCHDOG_RETRY_SECONDS = 10

View File

@ -15,6 +15,8 @@ import jinja2
import voluptuous as vol
from supervisor.dbus.const import MulticastProtocolEnabled
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from ..const import ATTR_SERVERS, DNS_SUFFIX, LogLevel
from ..coresys import CoreSys
@ -138,31 +140,8 @@ class PluginDns(PluginBase):
except OSError as err:
_LOGGER.error("Can't read hosts.tmpl: %s", err)
# Check CoreDNS state
self._init_hosts()
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
await self.instance.attach(version=self.version)
except DockerError:
_LOGGER.info(
"No CoreDNS plugin Docker image %s found.", self.instance.image
)
# Install CoreDNS
with suppress(CoreDNSError):
await self.install()
else:
self.version = self.instance.version
self.image = self.instance.image
self.save_data()
# Run CoreDNS
with suppress(CoreDNSError):
if not await self.instance.is_running():
await self.start()
await super().load()
# Update supervisor
self._write_resolv(HOST_RESOLV)
@ -264,6 +243,13 @@ class PluginDns(PluginBase):
await self.sys_addons.sync_dns()
async def watchdog_container(self, event: DockerContainerStateEvent) -> None:
"""Check for loop on failure before processing state change event."""
if event.name == self.instance.name and event.state == ContainerState.FAILED:
await self.loop_detection()
return await super().watchdog_container(event)
async def loop_detection(self) -> None:
"""Check if there was a loop found."""
log = await self.instance.logs()

View File

@ -35,38 +35,11 @@ class PluginMulticast(PluginBase):
"""Return latest version of Multicast."""
return self.sys_updater.version_multicast
async def load(self) -> None:
"""Load multicast setup."""
# Check Multicast state
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
await self.instance.attach(version=self.version)
except DockerError:
_LOGGER.info(
"No Multicast plugin Docker image %s found.", self.instance.image
)
# Install Multicast plugin
with suppress(MulticastError):
await self.install()
else:
self.version = self.instance.version
self.image = self.instance.image
self.save_data()
# Run Multicast plugin
with suppress(MulticastError):
if not await self.instance.is_running():
await self.start()
async def install(self) -> None:
"""Install Multicast."""
_LOGGER.info("Running setup for Multicast plugin")
while True:
# read homeassistant tag and install it
# read multicast tag and install it
if not self.latest_version:
await self.sys_updater.reload()

View File

@ -43,33 +43,6 @@ class PluginObserver(PluginBase):
"""Return an access token for the Observer API."""
return self._data.get(ATTR_ACCESS_TOKEN)
async def load(self) -> None:
"""Load observer setup."""
# Check observer state
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
await self.instance.attach(version=self.version)
except DockerError:
_LOGGER.info(
"No observer plugin Docker image %s found.", self.instance.image
)
# Install observer
with suppress(ObserverError):
await self.install()
else:
self.version = self.instance.version
self.image = self.instance.image
self.save_data()
# Run Observer
with suppress(ObserverError):
if not await self.instance.is_running():
await self.start()
async def install(self) -> None:
"""Install observer."""
_LOGGER.info("Running setup for observer plugin")
@ -139,12 +112,6 @@ class PluginObserver(PluginBase):
except DockerError as err:
raise ObserverError() from err
async def rebuild(self) -> None:
"""Rebuild Observer Docker container."""
with suppress(DockerError):
await self.instance.stop()
await self.start()
async def check_system_runtime(self) -> bool:
"""Check if the observer is running."""
try:

View File

@ -1,11 +1,19 @@
"""Test Home Assistant Add-ons."""
import asyncio
from unittest.mock import patch
from supervisor.addons.addon import Addon
from supervisor.const import AddonState, BusEvent
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import AddonsError
from ..const import TEST_ADDON_SLUG
def test_options_merge(coresys: CoreSys, install_addon_ssh) -> None:
def test_options_merge(coresys: CoreSys, install_addon_ssh: Addon) -> None:
"""Test options merge."""
addon = coresys.addons.get(TEST_ADDON_SLUG)
@ -56,3 +64,182 @@ def test_options_merge(coresys: CoreSys, install_addon_ssh) -> None:
"server": {"tcp_forwarding": True},
}
addon.options = {"password": "test", "server": {"tcp_forwarding": True}}
async def test_addon_state_listener(coresys: CoreSys, install_addon_ssh: Addon) -> None:
"""Test addon is setting state from docker events."""
with patch.object(type(install_addon_ssh.instance), "attach"):
await install_addon_ssh.load()
assert install_addon_ssh.state == AddonState.UNKNOWN
with patch.object(type(install_addon_ssh), "watchdog_container"):
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.RUNNING,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
assert install_addon_ssh.state == AddonState.STARTED
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.STOPPED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
assert install_addon_ssh.state == AddonState.STOPPED
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
assert install_addon_ssh.state == AddonState.ERROR
# Test other addons are ignored
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="addon_local_non_installed",
state=ContainerState.RUNNING,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
assert install_addon_ssh.state == AddonState.ERROR
async def mock_current_state(state: ContainerState) -> ContainerState:
"""Mock for current state method."""
return state
async def test_addon_watchdog(coresys: CoreSys, install_addon_ssh: Addon) -> None:
"""Test addon watchdog works correctly."""
with patch.object(type(install_addon_ssh.instance), "attach"):
await install_addon_ssh.load()
install_addon_ssh.watchdog = True
with patch.object(Addon, "restart") as restart, patch.object(
Addon, "start"
) as start, patch.object(
type(install_addon_ssh.instance), "current_state"
) as current_state:
current_state.return_value = mock_current_state(ContainerState.UNHEALTHY)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.UNHEALTHY,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_called_once()
start.assert_not_called()
restart.reset_mock()
current_state.return_value = mock_current_state(ContainerState.FAILED)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_called_once()
start.reset_mock()
# Do not process event if container state has changed since fired
current_state.return_value = mock_current_state(ContainerState.HEALTHY)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_not_called()
# Do not restart when addon stopped normally
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.STOPPED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_not_called()
# Other addons ignored
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="addon_local_non_installed",
state=ContainerState.UNHEALTHY,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_not_called()
async def test_addon_watchdog_rebuild_on_failure(
coresys: CoreSys, install_addon_ssh: Addon
) -> None:
"""Test addon watchdog rebuilds if start fails."""
with patch.object(type(install_addon_ssh.instance), "attach"):
await install_addon_ssh.load()
install_addon_ssh.watchdog = True
with patch.object(Addon, "start", side_effect=AddonsError()) as start, patch.object(
Addon, "rebuild"
) as rebuild, patch.object(
type(install_addon_ssh.instance),
"current_state",
return_value=mock_current_state(ContainerState.FAILED),
):
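# The watchdog tries to start the failed addon; start raises, so it falls back to rebuild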
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
start.assert_called_once()
rebuild.assert_called_once()


@@ -27,7 +27,8 @@ from supervisor.dbus.network import NetworkManager
from supervisor.dbus.resolved import Resolved
from supervisor.dbus.systemd import Systemd
from supervisor.dbus.timedate import TimeDate
-from supervisor.docker import DockerAPI
+from supervisor.docker.manager import DockerAPI
+from supervisor.docker.monitor import DockerMonitor
from supervisor.store.addon import AddonStore
from supervisor.store.repository import Repository
from supervisor.utils.dbus import DBus
@@ -48,20 +49,28 @@ def docker() -> DockerAPI:
"""Mock DockerAPI."""
images = [MagicMock(tags=["ghcr.io/home-assistant/amd64-hassio-supervisor:latest"])]
with patch("supervisor.docker.DockerClient", return_value=MagicMock()), patch(
"supervisor.docker.DockerAPI.images", return_value=MagicMock()
), patch("supervisor.docker.DockerAPI.containers", return_value=MagicMock()), patch(
"supervisor.docker.DockerAPI.api", return_value=MagicMock()
with patch(
"supervisor.docker.manager.DockerClient", return_value=MagicMock()
), patch(
"supervisor.docker.DockerAPI.images.list", return_value=images
"supervisor.docker.manager.DockerAPI.images", return_value=MagicMock()
), patch(
"supervisor.docker.DockerAPI.info",
"supervisor.docker.manager.DockerAPI.containers", return_value=MagicMock()
), patch(
"supervisor.docker.manager.DockerAPI.api", return_value=MagicMock()
), patch(
"supervisor.docker.manager.DockerAPI.images.list", return_value=images
), patch(
"supervisor.docker.manager.DockerAPI.info",
return_value=MagicMock(),
), patch(
"supervisor.docker.DockerConfig",
"supervisor.docker.manager.DockerConfig",
return_value=MagicMock(),
), patch(
"supervisor.docker.manager.DockerAPI.load"
), patch(
"supervisor.docker.manager.DockerAPI.unload"
):
docker_obj = DockerAPI()
docker_obj = DockerAPI(MagicMock())
docker_obj.info.logging = "journald"
docker_obj.info.storage = "overlay2"
docker_obj.info.version = "1.0.0"
@@ -219,6 +228,7 @@ async def coresys(loop, docker, network_manager, aiohttp_client, run_dir) -> Cor
# Mock docker
coresys_obj._docker = docker
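# Attach a real DockerMonitor so tests can process container state events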
coresys_obj.docker._monitor = DockerMonitor(coresys_obj)
# Set internet state
coresys_obj.supervisor._connectivity = True


@@ -1,12 +1,20 @@
"""Test Docker interface."""
-from unittest.mock import Mock, PropertyMock, call, patch
+from typing import Any
+from unittest.mock import MagicMock, Mock, PropertyMock, call, patch
from awesomeversion import AwesomeVersion
from docker.errors import DockerException, NotFound
from docker.models.containers import Container
from docker.models.images import Image
import pytest
from requests import RequestException
-from supervisor.const import CpuArch
+from supervisor.const import BusEvent, CpuArch
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.interface import DockerInterface
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import DockerAPIError, DockerError, DockerRequestError
@pytest.fixture(autouse=True)
@@ -50,3 +58,151 @@ async def test_docker_image_default_platform(coresys: CoreSys):
await instance.install(AwesomeVersion("1.2.3"), "test")
assert pull.call_count == 1
assert pull.call_args == call("test:1.2.3", platform="linux/386")
@pytest.mark.parametrize(
"attrs,expected",
[
({"State": {"Status": "running"}}, ContainerState.RUNNING),
({"State": {"Status": "exited", "ExitCode": 0}}, ContainerState.STOPPED),
({"State": {"Status": "exited", "ExitCode": 137}}, ContainerState.FAILED),
(
{"State": {"Status": "running", "Health": {"Status": "healthy"}}},
ContainerState.HEALTHY,
),
(
{"State": {"Status": "running", "Health": {"Status": "unhealthy"}}},
ContainerState.UNHEALTHY,
),
],
)
async def test_current_state(
coresys: CoreSys, attrs: dict[str, Any], expected: ContainerState
):
"""Test current state for container."""
container_collection = MagicMock()
container_collection.get.return_value = Container(attrs)
with patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
):
assert await coresys.homeassistant.core.instance.current_state() == expected
async def test_current_state_failures(coresys: CoreSys):
"""Test failure states for current state."""
container_collection = MagicMock()
with patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
):
container_collection.get.side_effect = NotFound("dne")
assert (
await coresys.homeassistant.core.instance.current_state()
== ContainerState.UNKNOWN
)
container_collection.get.side_effect = DockerException()
with pytest.raises(DockerAPIError):
await coresys.homeassistant.core.instance.current_state()
container_collection.get.side_effect = RequestException()
with pytest.raises(DockerRequestError):
await coresys.homeassistant.core.instance.current_state()
@pytest.mark.parametrize(
"attrs,expected,fired_when_skip_down",
[
({"State": {"Status": "running"}}, ContainerState.RUNNING, True),
({"State": {"Status": "exited", "ExitCode": 0}}, ContainerState.STOPPED, False),
(
{"State": {"Status": "exited", "ExitCode": 137}},
ContainerState.FAILED,
False,
),
(
{"State": {"Status": "running", "Health": {"Status": "healthy"}}},
ContainerState.HEALTHY,
True,
),
(
{"State": {"Status": "running", "Health": {"Status": "unhealthy"}}},
ContainerState.UNHEALTHY,
True,
),
],
)
async def test_attach_existing_container(
coresys: CoreSys,
attrs: dict[str, Any],
expected: ContainerState,
fired_when_skip_down: bool,
):
"""Test attaching to existing container."""
attrs["Id"] = "abc123"
attrs["Config"] = {}
container_collection = MagicMock()
container_collection.get.return_value = Container(attrs)
with patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
), patch.object(type(coresys.bus), "fire_event") as fire_event, patch(
"supervisor.docker.interface.time", return_value=1
):
await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3"))
fire_event.assert_called_once_with(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent("homeassistant", expected, "abc123", 1),
)
fire_event.reset_mock()
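# With skip_state_event_if_down=True, only events for containers that are still up are fired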
await coresys.homeassistant.core.instance.attach(
AwesomeVersion("2022.7.3"), skip_state_event_if_down=True
)
if fired_when_skip_down:
fire_event.assert_called_once_with(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent("homeassistant", expected, "abc123", 1),
)
else:
fire_event.assert_not_called()
async def test_attach_container_failure(coresys: CoreSys):
"""Test attach fails to find container but finds image."""
container_collection = MagicMock()
container_collection.get.side_effect = DockerException()
image_collection = MagicMock()
image_config = {"Image": "sha256:abc123"}
image_collection.get.return_value = Image({"Config": image_config})
with patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
), patch(
"supervisor.docker.manager.DockerAPI.images",
new=PropertyMock(return_value=image_collection),
), patch.object(
type(coresys.bus), "fire_event"
) as fire_event:
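# Container lookup fails, so attach falls back to image metadata and fires no state event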
await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3"))
fire_event.assert_not_called()
assert coresys.homeassistant.core.instance.meta_config == image_config
async def test_attach_total_failure(coresys: CoreSys):
"""Test attach fails to find container or image."""
container_collection = MagicMock()
container_collection.get.side_effect = DockerException()
image_collection = MagicMock()
image_collection.get.side_effect = DockerException()
with patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
), patch(
"supervisor.docker.manager.DockerAPI.images",
new=PropertyMock(return_value=image_collection),
), pytest.raises(
DockerError
):
await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3"))


@@ -0,0 +1,148 @@
"""Test docker events monitor."""
import asyncio
from typing import Any, Optional
from unittest.mock import MagicMock, PropertyMock, patch
from awesomeversion import AwesomeVersion
from docker.models.containers import Container
import pytest
from supervisor.const import BusEvent
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
@pytest.mark.parametrize(
"event,expected",
[
(
{
"Type": "container",
"Action": "start",
"Actor": {"Attributes": {"supervisor_managed": ""}},
},
ContainerState.RUNNING,
),
(
{
"Type": "container",
"Action": "die",
"Actor": {"Attributes": {"supervisor_managed": "", "exitCode": "0"}},
},
ContainerState.STOPPED,
),
(
{
"Type": "container",
"Action": "die",
"Actor": {"Attributes": {"supervisor_managed": "", "exitCode": "137"}},
},
ContainerState.FAILED,
),
(
{
"Type": "container",
"Action": "health_status: healthy",
"Actor": {"Attributes": {"supervisor_managed": ""}},
},
ContainerState.HEALTHY,
),
(
{
"Type": "container",
"Action": "health_status: unhealthy",
"Actor": {"Attributes": {"supervisor_managed": ""}},
},
ContainerState.UNHEALTHY,
),
(
{
"Type": "container",
"Action": "exec_die",
"Actor": {"Attributes": {"supervisor_managed": ""}},
},
None,
),
(
{
"Type": "container",
"Action": "start",
"Actor": {"Attributes": {}},
},
None,
),
(
{
"Type": "network",
"Action": "start",
"Actor": {"Attributes": {}},
},
None,
),
],
)
async def test_events(
coresys: CoreSys, event: dict[str, Any], expected: Optional[ContainerState]
):
"""Test events created from docker events."""
event["Actor"]["Attributes"]["name"] = "some_container"
event["id"] = "abc123"
event["time"] = 123
with patch(
"supervisor.docker.manager.DockerAPI.events",
new=PropertyMock(return_value=[event]),
), patch.object(type(coresys.bus), "fire_event") as fire_event:
await coresys.docker.monitor.load()
await asyncio.sleep(0.1)
if expected:
fire_event.assert_called_once_with(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent("some_container", expected, "abc123", 123),
)
else:
fire_event.assert_not_called()
async def test_unlabeled_container(coresys: CoreSys):
"""Test attaching to unlabeled container is still watched."""
container_collection = MagicMock()
container_collection.get.return_value = Container(
{
"Name": "homeassistant",
"Id": "abc123",
"State": {"Status": "running"},
"Config": {},
}
)
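# Attach first so the monitor watches this container despite the missing supervisor label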
with patch(
"supervisor.docker.manager.DockerAPI.containers",
new=PropertyMock(return_value=container_collection),
):
await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3"))
with patch(
"supervisor.docker.manager.DockerAPI.events",
new=PropertyMock(
return_value=[
{
"id": "abc123",
"time": 123,
"Type": "container",
"Action": "die",
"Actor": {
"Attributes": {"name": "homeassistant", "exitCode": "137"}
},
}
]
),
), patch.object(type(coresys.bus), "fire_event") as fire_event:
await coresys.docker.monitor.load()
await asyncio.sleep(0.1)
fire_event.assert_called_once_with(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
"homeassistant", ContainerState.FAILED, "abc123", 123
),
)


@@ -0,0 +1,142 @@
"""Test Home Assistant watchdog."""
import asyncio
from unittest.mock import PropertyMock, patch
from awesomeversion import AwesomeVersion
from supervisor.const import BusEvent
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import HomeAssistantError
async def mock_current_state(state: ContainerState) -> ContainerState:
"""Mock for current state method."""
return state
async def test_home_assistant_watchdog(coresys: CoreSys) -> None:
"""Test homeassistant watchdog works correctly."""
coresys.homeassistant.version = AwesomeVersion("2022.7.3")
with patch(
"supervisor.docker.interface.DockerInterface.version",
new=PropertyMock(return_value=AwesomeVersion("2022.7.3")),
), patch.object(type(coresys.homeassistant.core.instance), "attach"):
await coresys.homeassistant.core.load()
coresys.homeassistant.core.watchdog = True
with patch.object(
type(coresys.homeassistant.core), "restart"
) as restart, patch.object(
type(coresys.homeassistant.core), "start"
) as start, patch.object(
type(coresys.homeassistant.core.instance), "current_state"
) as current_state:
current_state.return_value = mock_current_state(ContainerState.UNHEALTHY)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="homeassistant",
state=ContainerState.UNHEALTHY,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_called_once()
start.assert_not_called()
restart.reset_mock()
current_state.return_value = mock_current_state(ContainerState.FAILED)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="homeassistant",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_called_once()
start.reset_mock()
# Do not process event if container state has changed since fired
current_state.return_value = mock_current_state(ContainerState.HEALTHY)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="homeassistant",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_not_called()
# Do not restart when home assistant stopped normally
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="homeassistant",
state=ContainerState.STOPPED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_not_called()
# Other containers ignored
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="addon_local_other",
state=ContainerState.UNHEALTHY,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
restart.assert_not_called()
start.assert_not_called()
async def test_home_assistant_watchdog_rebuild_on_failure(coresys: CoreSys) -> None:
"""Test home assistant watchdog rebuilds if start fails."""
coresys.homeassistant.version = AwesomeVersion("2022.7.3")
with patch(
"supervisor.docker.interface.DockerInterface.version",
new=PropertyMock(return_value=AwesomeVersion("2022.7.3")),
), patch.object(type(coresys.homeassistant.core.instance), "attach"):
await coresys.homeassistant.core.load()
coresys.homeassistant.core.watchdog = True
with patch.object(
type(coresys.homeassistant.core), "start", side_effect=HomeAssistantError()
) as start, patch.object(
type(coresys.homeassistant.core), "rebuild"
) as rebuild, patch.object(
type(coresys.homeassistant.core.instance),
"current_state",
return_value=mock_current_state(ContainerState.FAILED),
):
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="homeassistant",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
start.assert_called_once()
rebuild.assert_called_once()


@@ -1,14 +1,21 @@
"""Test DNS plugin."""
import asyncio
from ipaddress import IPv4Address
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
import pytest
-from supervisor.const import LogLevel
+from supervisor.const import BusEvent, LogLevel
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.interface import DockerInterface
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.plugins.dns import HostEntry
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from tests.plugins.test_plugin_base import mock_current_state, mock_is_running
@pytest.fixture(name="docker_interface")
@@ -122,3 +129,64 @@ async def test_reset(coresys: CoreSys):
names=["observer", "observer.local.hass.io"],
),
]
async def mock_logs(logs: bytes) -> bytes:
"""Mock for logs method."""
return logs
async def test_loop_detection_on_failure(coresys: CoreSys):
"""Test loop detection when coredns fails."""
assert len(coresys.resolution.issues) == 0
assert len(coresys.resolution.suggestions) == 0
with patch.object(type(coresys.plugins.dns.instance), "attach"), patch.object(
type(coresys.plugins.dns.instance),
"is_running",
return_value=mock_is_running(True),
):
await coresys.plugins.dns.load()
with patch.object(type(coresys.plugins.dns), "rebuild") as rebuild, patch.object(
type(coresys.plugins.dns.instance),
"current_state",
side_effect=[
mock_current_state(ContainerState.FAILED),
mock_current_state(ContainerState.FAILED),
],
), patch.object(type(coresys.plugins.dns.instance), "logs") as logs:
logs.return_value = mock_logs(b"")
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="hassio_dns",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
assert len(coresys.resolution.issues) == 0
assert len(coresys.resolution.suggestions) == 0
rebuild.assert_called_once()
rebuild.reset_mock()
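# A loop message in the logs creates the DNS loop issue and the reset suggestion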
logs.return_value = mock_logs(b"plugin/loop: Loop")
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="hassio_dns",
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
assert coresys.resolution.issues == [
Issue(IssueType.DNS_LOOP, ContextType.PLUGIN, "dns")
]
assert coresys.resolution.suggestions == [
Suggestion(SuggestionType.EXECUTE_RESET, ContextType.PLUGIN, "dns")
]
rebuild.assert_called_once()


@@ -0,0 +1,304 @@
"""Test base plugin functionality."""
import asyncio
from unittest.mock import patch
from awesomeversion import AwesomeVersion
import pytest
from supervisor.const import BusEvent
from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import (
AudioError,
CliError,
CoreDNSError,
DockerError,
MulticastError,
ObserverError,
PluginError,
)
from supervisor.plugins.audio import PluginAudio
from supervisor.plugins.base import PluginBase
from supervisor.plugins.cli import PluginCli
from supervisor.plugins.dns import PluginDns
from supervisor.plugins.multicast import PluginMulticast
from supervisor.plugins.observer import PluginObserver
@pytest.fixture(name="plugin")
async def fixture_plugin(
coresys: CoreSys, request: pytest.FixtureRequest
) -> PluginBase:
"""Get plugin from param."""
if request.param == PluginAudio:
yield coresys.plugins.audio
elif request.param == PluginCli:
yield coresys.plugins.cli
elif request.param == PluginDns:
with patch.object(PluginDns, "loop_detection"):
yield coresys.plugins.dns
elif request.param == PluginMulticast:
yield coresys.plugins.multicast
elif request.param == PluginObserver:
yield coresys.plugins.observer
async def mock_current_state(state: ContainerState) -> ContainerState:
"""Mock for current state method."""
return state
async def mock_is_running(running: bool) -> bool:
"""Mock for is running method."""
return running
async def mock_get_latest_version(version: AwesomeVersion) -> AwesomeVersion:
"""Mock for get latest version method."""
return version
@pytest.mark.parametrize(
"plugin",
[PluginAudio, PluginCli, PluginDns, PluginMulticast, PluginObserver],
indirect=True,
)
async def test_plugin_watchdog(coresys: CoreSys, plugin: PluginBase) -> None:
"""Test plugin watchdog works correctly."""
with patch.object(type(plugin.instance), "attach"), patch.object(
type(plugin.instance), "is_running", return_value=mock_is_running(True)
):
await plugin.load()
with patch.object(type(plugin), "rebuild") as rebuild, patch.object(
type(plugin), "start"
) as start, patch.object(type(plugin.instance), "current_state") as current_state:
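# Unhealthy plugin containers are rebuilt, not started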
current_state.return_value = mock_current_state(ContainerState.UNHEALTHY)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=plugin.instance.name,
state=ContainerState.UNHEALTHY,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
rebuild.assert_called_once()
start.assert_not_called()
rebuild.reset_mock()
current_state.return_value = mock_current_state(ContainerState.FAILED)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=plugin.instance.name,
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
rebuild.assert_called_once()
start.assert_not_called()
rebuild.reset_mock()
# Plugins are restarted anytime they stop, not just on failure
current_state.return_value = mock_current_state(ContainerState.STOPPED)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=plugin.instance.name,
state=ContainerState.STOPPED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
rebuild.assert_not_called()
start.assert_called_once()
start.reset_mock()
# Do not process event if container state has changed since fired
current_state.return_value = mock_current_state(ContainerState.HEALTHY)
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=plugin.instance.name,
state=ContainerState.FAILED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
rebuild.assert_not_called()
start.assert_not_called()
# Other containers ignored
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name="addon_local_other",
state=ContainerState.UNHEALTHY,
id="abc123",
time=1,
),
)
await asyncio.sleep(0)
rebuild.assert_not_called()
start.assert_not_called()
@pytest.mark.parametrize(
"plugin,error",
[
(PluginAudio, AudioError),
(PluginCli, CliError),
(PluginDns, CoreDNSError),
(PluginMulticast, MulticastError),
(PluginObserver, ObserverError),
],
indirect=["plugin"],
)
async def test_plugin_watchdog_rebuild_on_failure(
coresys: CoreSys, plugin: PluginBase, error: PluginError
) -> None:
"""Test plugin watchdog rebuilds if start fails."""
with patch.object(type(plugin.instance), "attach"), patch.object(
type(plugin.instance), "is_running", return_value=mock_is_running(True)
):
await plugin.load()
with patch("supervisor.plugins.base.WATCHDOG_RETRY_SECONDS", 0), patch.object(
type(plugin), "rebuild"
) as rebuild, patch.object(
type(plugin), "start", side_effect=error
) as start, patch.object(
type(plugin.instance),
"current_state",
side_effect=[
mock_current_state(ContainerState.STOPPED),
mock_current_state(ContainerState.STOPPED),
],
):
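# Retry delay is patched to 0 so the failed start falls back to rebuild within the short sleep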
coresys.bus.fire_event(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
DockerContainerStateEvent(
name=plugin.instance.name,
state=ContainerState.STOPPED,
id="abc123",
time=1,
),
)
await asyncio.sleep(0.1)
start.assert_called_once()
rebuild.assert_called_once()
@pytest.mark.parametrize(
"plugin",
[PluginAudio, PluginCli, PluginDns, PluginMulticast, PluginObserver],
indirect=True,
)
async def test_plugin_load_running_container(
coresys: CoreSys, plugin: PluginBase
) -> None:
"""Test plugins load and attach to a running container."""
test_version = AwesomeVersion("2022.7.3")
with patch.object(
type(coresys.bus), "register_event"
) as register_event, patch.object(
type(plugin.instance), "attach"
) as attach, patch.object(
type(plugin), "install"
) as install, patch.object(
type(plugin), "start"
) as start, patch.object(
type(plugin.instance),
"get_latest_version",
return_value=mock_get_latest_version(test_version),
), patch.object(
type(plugin.instance), "is_running", return_value=mock_is_running(True)
):
await plugin.load()
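# load() must hook the watchdog and reuse the already running container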
register_event.assert_any_call(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, plugin.watchdog_container
)
attach.assert_called_once_with(
version=test_version, skip_state_event_if_down=True
)
install.assert_not_called()
start.assert_not_called()
@pytest.mark.parametrize(
"plugin",
[PluginAudio, PluginCli, PluginDns, PluginMulticast, PluginObserver],
indirect=True,
)
async def test_plugin_load_stopped_container(
coresys: CoreSys, plugin: PluginBase
) -> None:
"""Test plugins load and start existing container."""
test_version = AwesomeVersion("2022.7.3")
with patch.object(
type(coresys.bus), "register_event"
) as register_event, patch.object(
type(plugin.instance), "attach"
) as attach, patch.object(
type(plugin), "install"
) as install, patch.object(
type(plugin), "start"
) as start, patch.object(
type(plugin.instance),
"get_latest_version",
return_value=mock_get_latest_version(test_version),
), patch.object(
type(plugin.instance), "is_running", return_value=mock_is_running(False)
):
await plugin.load()
register_event.assert_any_call(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, plugin.watchdog_container
)
attach.assert_called_once_with(
version=test_version, skip_state_event_if_down=True
)
install.assert_not_called()
start.assert_called_once()
@pytest.mark.parametrize(
"plugin",
[PluginAudio, PluginCli, PluginDns, PluginMulticast, PluginObserver],
indirect=True,
)
async def test_plugin_load_missing_container(
coresys: CoreSys, plugin: PluginBase
) -> None:
"""Test plugins load and create and start container."""
test_version = AwesomeVersion("2022.7.3")
with patch.object(
type(coresys.bus), "register_event"
) as register_event, patch.object(
type(plugin.instance), "attach", side_effect=DockerError()
) as attach, patch.object(
type(plugin), "install"
) as install, patch.object(
type(plugin), "start"
) as start, patch.object(
type(plugin.instance),
"get_latest_version",
return_value=mock_get_latest_version(test_version),
), patch.object(
type(plugin.instance), "is_running", return_value=mock_is_running(False)
):
await plugin.load()
register_event.assert_any_call(
BusEvent.DOCKER_CONTAINER_STATE_CHANGE, plugin.watchdog_container
)
attach.assert_called_once_with(
version=test_version, skip_state_event_if_down=True
)
install.assert_called_once()
start.assert_called_once()


@@ -21,7 +21,9 @@ def test_get_images(coresys: CoreSys):
images = container._get_images()
assert images[0].tags[0] == "test"
with patch("supervisor.docker.DockerAPI.images.list", side_effect=DockerException):
with patch(
"supervisor.docker.manager.DockerAPI.images.list", side_effect=DockerException
):
images = container._get_images()
assert not images