Watchdog for Add-ons (#1970)

* Watchdog for Add-ons

* Run task

* Extend application watchdog

* fix spell

* Add running tasks

* Add tests

* Fix states

* Update supervisor/misc/tasks.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update tests/test_validate.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Adjust timeout

* change timeout

* Modify tasker

* slots the task object

* fix typing

* Add tests

* fix lint

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Pascal Vizeli 2020-08-26 22:20:35 +02:00 committed by GitHub
parent 8dea50ce83
commit efcfc1f841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 417 additions and 139 deletions

6
API.md
View File

@ -609,7 +609,8 @@ Get all available add-ons.
"ingress_entry": "null|/api/hassio_ingress/slug",
"ingress_url": "null|/api/hassio_ingress/slug/entry.html",
"ingress_port": "null|int",
"ingress_panel": "null|bool"
"ingress_panel": "null|bool",
"watchdog": "null|bool"
}
```
@ -629,7 +630,8 @@ Get all available add-ons.
"options": {},
"audio_output": "null|0,0",
"audio_input": "null|0,0",
"ingress_panel": "bool"
"ingress_panel": "bool",
"watchdog": "bool"
}
```

View File

@ -5,7 +5,7 @@ import logging
import tarfile
from typing import Dict, List, Optional, Union
from ..const import BOOT_AUTO, STATE_STARTED, AddonStartup
from ..const import BOOT_AUTO, AddonStartup, AddonState
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
AddonsError,
@ -108,7 +108,7 @@ class AddonManager(CoreSysAttributes):
"""Shutdown addons."""
tasks: List[Addon] = []
for addon in self.installed:
if await addon.state() != STATE_STARTED or addon.startup != stage:
if addon.state != AddonState.STARTED or addon.startup != stage:
continue
tasks.append(addon)
@ -176,6 +176,8 @@ class AddonManager(CoreSysAttributes):
await addon.instance.remove()
except DockerAPIError as err:
raise AddonsError() from err
else:
addon.state = AddonState.UNKNOWN
await addon.remove_data()
@ -238,7 +240,7 @@ class AddonManager(CoreSysAttributes):
raise AddonsNotSupportedError()
# Update instance
last_state = await addon.state()
last_state: AddonState = addon.state
try:
await addon.instance.update(store.version, store.image)
@ -255,7 +257,7 @@ class AddonManager(CoreSysAttributes):
await addon.install_apparmor()
# restore state
if last_state == STATE_STARTED:
if last_state == AddonState.STARTED:
await addon.start()
async def rebuild(self, slug: str) -> None:
@ -279,7 +281,7 @@ class AddonManager(CoreSysAttributes):
raise AddonsNotSupportedError()
# remove docker container but not addon config
last_state = await addon.state()
last_state: AddonState = addon.state
try:
await addon.instance.remove()
await addon.instance.install(addon.version)
@ -290,7 +292,7 @@ class AddonManager(CoreSysAttributes):
_LOGGER.info("Add-on '%s' successfully rebuilt", slug)
# restore state
if last_state == STATE_STARTED:
if last_state == AddonState.STARTED:
await addon.start()
async def restore(self, slug: str, tar_file: tarfile.TarFile) -> None:

View File

@ -1,4 +1,5 @@
"""Init file for Supervisor add-ons."""
import asyncio
from contextlib import suppress
from copy import deepcopy
from ipaddress import IPv4Address
@ -11,6 +12,7 @@ import tarfile
from tempfile import TemporaryDirectory
from typing import Any, Awaitable, Dict, List, Optional
import aiohttp
import voluptuous as vol
from voluptuous.humanize import humanize_error
@ -35,9 +37,9 @@ from ..const import (
ATTR_USER,
ATTR_UUID,
ATTR_VERSION,
ATTR_WATCHDOG,
DNS_SUFFIX,
STATE_STARTED,
STATE_STOPPED,
AddonState,
)
from ..coresys import CoreSys
from ..docker.addon import DockerAddon
@ -50,6 +52,7 @@ from ..exceptions import (
HostAppArmorError,
JsonFileError,
)
from ..utils import check_port
from ..utils.apparmor import adjust_profile
from ..utils.json import read_json_file, write_json_file
from ..utils.tar import atomic_contents_add, secure_path
@ -64,8 +67,15 @@ RE_WEBUI = re.compile(
r":\/\/\[HOST\]:\[PORT:(?P<t_port>\d+)\](?P<s_suffix>.*)$"
)
RE_WATCHDOG = re.compile(
r"^(?:(?P<s_prefix>https?|tcp)|\[PROTO:(?P<t_proto>\w+)\])"
r":\/\/\[HOST\]:\[PORT:(?P<t_port>\d+)\](?P<s_suffix>.*)$"
)
RE_OLD_AUDIO = re.compile(r"\d+,\d+")
WATCHDOG_TIMEOUT = aiohttp.ClientTimeout(total=10)
class Addon(AddonModel):
"""Hold data for add-on inside Supervisor."""
@ -74,12 +84,24 @@ class Addon(AddonModel):
"""Initialize data holder."""
super().__init__(coresys, slug)
self.instance: DockerAddon = DockerAddon(coresys, self)
self.state: AddonState = AddonState.UNKNOWN
@property
def in_progress(self) -> bool:
    """Return True if a task is in progress.

    Delegates to the add-on's Docker instance, which tracks whether a
    container operation is currently running.
    """
    return self.instance.in_progress
async def load(self) -> None:
    """Async initialize of object.

    Attach to an existing Docker container (errors are suppressed so a
    missing container does not block loading), then derive the initial
    add-on state from the container's running status.
    """
    with suppress(DockerAPIError):
        await self.instance.attach(tag=self.version)

    # Evaluate state
    if await self.instance.is_running():
        self.state = AddonState.STARTED
    else:
        self.state = AddonState.STOPPED
@property
def ip_address(self) -> IPv4Address:
"""Return IP of add-on instance."""
@ -155,6 +177,16 @@ class Addon(AddonModel):
"""Set auto update."""
self.persist[ATTR_AUTO_UPDATE] = value
@property
def watchdog(self) -> bool:
    """Return True if the watchdog is enabled for this add-on."""
    return self.persist[ATTR_WATCHDOG]

@watchdog.setter
def watchdog(self, value: bool) -> None:
    """Enable/disable the watchdog for this add-on."""
    self.persist[ATTR_WATCHDOG] = value
@property
def uuid(self) -> str:
"""Return an API token for this add-on."""
@ -230,8 +262,6 @@ class Addon(AddonModel):
if not url:
return None
webui = RE_WEBUI.match(url)
if not webui:
return None
# extract arguments
t_port = webui.group("t_port")
@ -245,10 +275,6 @@ class Addon(AddonModel):
else:
port = self.ports.get(f"{t_port}/tcp", t_port)
# for interface config or port lists
if isinstance(port, (tuple, list)):
port = port[-1]
# lookup the correct protocol from config
if t_proto:
proto = "https" if self.options.get(t_proto) else "http"
@ -353,6 +379,48 @@ class Addon(AddonModel):
"""Save data of add-on."""
self.sys_addons.data.save_data()
async def watchdog_application(self) -> bool:
    """Return True if the application inside the add-on is responsive.

    Probes the add-on using the watchdog URL template from its config:
    either a plain TCP port check (``tcp://``) or an HTTP(S) GET.
    Returns True when no watchdog URL is configured or the template
    cannot be parsed (nothing meaningful to check), False when the
    probe fails or times out.
    """
    url = super().watchdog
    if not url:
        return True
    application = RE_WATCHDOG.match(url)
    # Fix: guard against a non-matching template instead of raising
    # AttributeError on `application.group(...)` below.
    if not application:
        return True

    # extract arguments
    t_port = application.group("t_port")
    t_proto = application.group("t_proto")
    s_prefix = application.group("s_prefix") or ""
    s_suffix = application.group("s_suffix") or ""

    # search host port for this docker port
    if self.host_network:
        port = self.ports.get(f"{t_port}/tcp", t_port)
    else:
        port = t_port

    # TCP monitoring
    if s_prefix == "tcp":
        return await self.sys_run_in_executor(check_port, self.ip_address, port)

    # lookup the correct protocol from config
    if t_proto:
        proto = "https" if self.options.get(t_proto) else "http"
    else:
        proto = s_prefix

    # Make HTTP request
    try:
        url = f"{proto}://{self.ip_address}:{port}{s_suffix}"
        async with self.sys_websession_ssl.get(
            url, timeout=WATCHDOG_TIMEOUT
        ) as req:
            if req.status < 300:
                return True
    except (asyncio.TimeoutError, aiohttp.ClientError):
        pass

    return False
async def write_options(self) -> None:
"""Return True if add-on options is written to data."""
schema = self.schema
@ -462,12 +530,6 @@ class Addon(AddonModel):
return False
return True
async def state(self) -> str:
"""Return running state of add-on."""
if await self.instance.is_running():
return STATE_STARTED
return STATE_STOPPED
async def start(self) -> None:
"""Set options and start add-on."""
if await self.instance.is_running():
@ -490,6 +552,8 @@ class Addon(AddonModel):
await self.instance.run()
except DockerAPIError as err:
raise AddonsError() from err
else:
self.state = AddonState.STARTED
async def stop(self) -> None:
"""Stop add-on."""
@ -497,6 +561,8 @@ class Addon(AddonModel):
return await self.instance.stop()
except DockerAPIError as err:
raise AddonsError() from err
else:
self.state = AddonState.STOPPED
async def restart(self) -> None:
"""Restart add-on."""
@ -511,6 +577,13 @@ class Addon(AddonModel):
"""
return self.instance.logs()
def is_running(self) -> Awaitable[bool]:
    """Return True if Docker container is running.

    Return a coroutine.
    """
    # Not awaited here; callers decide when to await/schedule it.
    return self.instance.is_running()
async def stats(self) -> DockerStats:
"""Return stats of container."""
try:
@ -548,7 +621,7 @@ class Addon(AddonModel):
ATTR_USER: self.persist,
ATTR_SYSTEM: self.data,
ATTR_VERSION: self.version,
ATTR_STATE: await self.state(),
ATTR_STATE: self.state,
}
# Store local configs/state
@ -683,7 +756,7 @@ class Addon(AddonModel):
raise AddonsError() from err
# Run add-on
if data[ATTR_STATE] == STATE_STARTED:
if data[ATTR_STATE] == AddonState.STARTED:
return await self.start()
_LOGGER.info("Finish restore for add-on %s", self.slug)

View File

@ -61,6 +61,7 @@ from ..const import (
ATTR_USB,
ATTR_VERSION,
ATTR_VIDEO,
ATTR_WATCHDOG,
ATTR_WEBUI,
SECURITY_DEFAULT,
SECURITY_DISABLE,
@ -248,6 +249,11 @@ class AddonModel(CoreSysAttributes, ABC):
"""Return URL to webui or None."""
return self.data.get(ATTR_WEBUI)
@property
def watchdog(self) -> Optional[str]:
    """Return the watchdog URL template, or None if not configured."""
    return self.data.get(ATTR_WATCHDOG)
@property
def ingress_port(self) -> Optional[int]:
"""Return Ingress port."""

View File

@ -80,22 +80,22 @@ from ..const import (
ATTR_UUID,
ATTR_VERSION,
ATTR_VIDEO,
ATTR_WATCHDOG,
ATTR_WEBUI,
BOOT_AUTO,
BOOT_MANUAL,
PRIVILEGED_ALL,
ROLE_ALL,
ROLE_DEFAULT,
STATE_STARTED,
STATE_STOPPED,
AddonStages,
AddonStartup,
AddonState,
)
from ..coresys import CoreSys
from ..discovery.validate import valid_discovery_service
from ..validate import (
DOCKER_PORTS,
DOCKER_PORTS_DESCRIPTION,
docker_ports,
docker_ports_description,
network_port,
token,
uuid_match,
@ -197,8 +197,11 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_INIT, default=True): vol.Boolean(),
vol.Optional(ATTR_ADVANCED, default=False): vol.Boolean(),
vol.Optional(ATTR_STAGE, default=AddonStages.STABLE): vol.Coerce(AddonStages),
vol.Optional(ATTR_PORTS): DOCKER_PORTS,
vol.Optional(ATTR_PORTS_DESCRIPTION): DOCKER_PORTS_DESCRIPTION,
vol.Optional(ATTR_PORTS): docker_ports,
vol.Optional(ATTR_PORTS_DESCRIPTION): docker_ports_description,
vol.Optional(ATTR_WATCHDOG): vol.Match(
r"^(?:https?|\[PROTO:\w+\]|tcp):\/\/\[HOST\]:\[PORT:\d+\].*$"
),
vol.Optional(ATTR_WEBUI): vol.Match(
r"^(?:https?|\[PROTO:\w+\]):\/\/\[HOST\]:\[PORT:\d+\].*$"
),
@ -301,11 +304,12 @@ SCHEMA_ADDON_USER = vol.Schema(
vol.Optional(ATTR_OPTIONS, default=dict): dict,
vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(),
vol.Optional(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): DOCKER_PORTS,
vol.Optional(ATTR_NETWORK): docker_ports,
vol.Optional(ATTR_AUDIO_OUTPUT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(),
vol.Optional(ATTR_WATCHDOG, default=False): vol.Boolean(),
},
extra=vol.REMOVE_EXTRA,
)
@ -331,7 +335,7 @@ SCHEMA_ADDON_SNAPSHOT = vol.Schema(
{
vol.Required(ATTR_USER): SCHEMA_ADDON_USER,
vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM,
vol.Required(ATTR_STATE): vol.In([STATE_STARTED, STATE_STOPPED]),
vol.Required(ATTR_STATE): vol.Coerce(AddonState),
vol.Required(ATTR_VERSION): vol.Coerce(str),
},
extra=vol.REMOVE_EXTRA,

View File

@ -85,6 +85,7 @@ from ..const import (
ATTR_VERSION,
ATTR_VERSION_LATEST,
ATTR_VIDEO,
ATTR_WATCHDOG,
ATTR_WEBUI,
BOOT_AUTO,
BOOT_MANUAL,
@ -92,12 +93,12 @@ from ..const import (
CONTENT_TYPE_PNG,
CONTENT_TYPE_TEXT,
REQUEST_FROM,
STATE_NONE,
AddonState,
)
from ..coresys import CoreSysAttributes
from ..docker.stats import DockerStats
from ..exceptions import APIError
from ..validate import DOCKER_PORTS
from ..validate import docker_ports
from .utils import api_process, api_process_raw, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -108,11 +109,12 @@ SCHEMA_VERSION = vol.Schema({vol.Optional(ATTR_VERSION): vol.Coerce(str)})
SCHEMA_OPTIONS = vol.Schema(
{
vol.Optional(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): vol.Maybe(DOCKER_PORTS),
vol.Optional(ATTR_NETWORK): vol.Maybe(docker_ports),
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
vol.Optional(ATTR_AUDIO_OUTPUT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_INGRESS_PANEL): vol.Boolean(),
vol.Optional(ATTR_WATCHDOG): vol.Boolean(),
}
)
@ -213,7 +215,7 @@ class APIAddons(CoreSysAttributes):
ATTR_MACHINE: addon.supported_machine,
ATTR_HOMEASSISTANT: addon.homeassistant_version,
ATTR_URL: addon.url,
ATTR_STATE: STATE_NONE,
ATTR_STATE: AddonState.UNKNOWN,
ATTR_DETACHED: addon.is_detached,
ATTR_AVAILABLE: addon.available,
ATTR_BUILD: addon.need_build,
@ -255,12 +257,13 @@ class APIAddons(CoreSysAttributes):
ATTR_INGRESS_URL: None,
ATTR_INGRESS_PORT: None,
ATTR_INGRESS_PANEL: None,
ATTR_WATCHDOG: None,
}
if isinstance(addon, Addon) and addon.is_installed:
data.update(
{
ATTR_STATE: await addon.state(),
ATTR_STATE: addon.state,
ATTR_WEBUI: addon.webui,
ATTR_INGRESS_ENTRY: addon.ingress_entry,
ATTR_INGRESS_URL: addon.ingress_url,
@ -271,6 +274,7 @@ class APIAddons(CoreSysAttributes):
ATTR_AUTO_UPDATE: addon.auto_update,
ATTR_IP_ADDRESS: str(addon.ip_address),
ATTR_VERSION: addon.version,
ATTR_WATCHDOG: addon.watchdog,
}
)
@ -306,6 +310,8 @@ class APIAddons(CoreSysAttributes):
if ATTR_INGRESS_PANEL in body:
addon.ingress_panel = body[ATTR_INGRESS_PANEL]
await self.sys_ingress.update_hass_panel(addon)
if ATTR_WATCHDOG in body:
addon.watchdog = body[ATTR_WATCHDOG]
addon.save_persist()

View File

@ -86,7 +86,7 @@ class APISupervisor(CoreSysAttributes):
ATTR_NAME: addon.name,
ATTR_SLUG: addon.slug,
ATTR_DESCRIPTON: addon.description,
ATTR_STATE: await addon.state(),
ATTR_STATE: addon.state,
ATTR_VERSION: addon.latest_version,
ATTR_INSTALLED: addon.version,
ATTR_REPOSITORY: addon.repository,

View File

@ -235,10 +235,6 @@ def check_environment() -> None:
if not SOCKET_DOCKER.is_socket():
_LOGGER.critical("Can't find Docker socket!")
# check socat exec
if not shutil.which("socat"):
_LOGGER.critical("Can't find socat!")
# check socat exec
if not shutil.which("gdbus"):
_LOGGER.critical("Can't find gdbus!")

View File

@ -279,9 +279,6 @@ WANT_SERVICE = "want"
BOOT_AUTO = "auto"
BOOT_MANUAL = "manual"
STATE_STARTED = "started"
STATE_STOPPED = "stopped"
STATE_NONE = "none"
MAP_CONFIG = "config"
MAP_SSL = "ssl"
@ -370,6 +367,14 @@ class AddonStages(str, Enum):
DEPRECATED = "deprecated"
class AddonState(str, Enum):
    """Possible runtime states of an add-on.

    Inherits from ``str`` so members compare equal to their plain
    string values and serialize directly.
    """

    STARTED = "started"
    STOPPED = "stopped"
    UNKNOWN = "unknown"
class UpdateChannels(str, Enum):
"""Core supported update channels."""

View File

@ -71,7 +71,7 @@ class Core(CoreSysAttributes):
elif self.sys_config.version != self.sys_supervisor.version:
self.healthy = False
_LOGGER.error(
"Update %s of Supervisor %s fails!",
"Update %s of Supervisor %s failed!",
self.sys_config.version,
self.sys_supervisor.version,
)
@ -245,7 +245,7 @@ class Core(CoreSysAttributes):
# Stage 1
try:
async with async_timeout.timeout(10):
await asyncio.wait([self.sys_api.stop()])
await asyncio.wait([self.sys_api.stop(), self.sys_scheduler.shutdown()])
except asyncio.TimeoutError:
_LOGGER.warning("Stage 1: Force Shutdown!")

View File

@ -405,22 +405,24 @@ class DockerInterface(CoreSysAttributes):
_LOGGER.error("Can't read stats from %s: %s", self.name, err)
raise DockerAPIError() from err
def is_fails(self) -> Awaitable[bool]:
def is_failed(self) -> Awaitable[bool]:
"""Return True if Docker is failing state.
Return a Future.
"""
return self.sys_run_in_executor(self._is_fails)
return self.sys_run_in_executor(self._is_failed)
def _is_fails(self) -> bool:
def _is_failed(self) -> bool:
"""Return True if Docker is failing state.
Need run inside executor.
"""
try:
docker_container = self.sys_docker.containers.get(self.name)
except (docker.errors.DockerException, requests.RequestException):
except docker.errors.NotFound:
return False
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerAPIError() from err
# container is not running
if docker_container.status != "exited":

View File

@ -135,7 +135,7 @@ class AuthError(HassioError):
class AuthPasswordResetError(HassioError):
"""Auth error if password reset fails."""
"""Auth error if password reset failed."""
# Host
@ -150,11 +150,11 @@ class HostNotSupportedError(HassioNotSupportedError):
class HostServiceError(HostError):
"""Host service functions fails."""
"""Host service functions failed."""
class HostAppArmorError(HostError):
"""Host apparmor functions fails."""
"""Host apparmor functions failed."""
# API

View File

@ -153,9 +153,9 @@ class HassOS(CoreSysAttributes):
self.sys_create_task(self.sys_host.control.reboot())
return
# Update fails
# Update failed
await self.sys_dbus.rauc.update()
_LOGGER.error("HassOS update fails with: %s", self.sys_dbus.rauc.last_error)
_LOGGER.error("HassOS update failed with: %s", self.sys_dbus.rauc.last_error)
raise HassOSUpdateError()
async def mark_healthy(self) -> None:

View File

@ -163,7 +163,7 @@ class HomeAssistantCore(CoreSysAttributes):
to_version, image=self.sys_updater.image_homeassistant
)
except DockerAPIError as err:
_LOGGER.warning("Update Home Assistant image fails")
_LOGGER.warning("Update Home Assistant image failed")
raise HomeAssistantUpdateError() from err
else:
self.sys_homeassistant.version = self.instance.version
@ -185,7 +185,7 @@ class HomeAssistantCore(CoreSysAttributes):
# Update going wrong, revert it
if self.error_state and rollback:
_LOGGER.critical("HomeAssistant update fails -> rollback!")
_LOGGER.critical("HomeAssistant update failed -> rollback!")
# Make a copy of the current log file if it exsist
logfile = self.sys_config.path_homeassistant / "home-assistant.log"
if logfile.exists():
@ -288,12 +288,12 @@ class HomeAssistantCore(CoreSysAttributes):
"""
return self.instance.is_running()
def is_fails(self) -> Awaitable[bool]:
"""Return True if a Docker container is fails state.
def is_failed(self) -> Awaitable[bool]:
"""Return True if a Docker container is failed state.
Return a coroutine.
"""
return self.instance.is_fails()
return self.instance.is_failed()
@property
def in_progress(self) -> bool:
@ -408,4 +408,4 @@ class HomeAssistantCore(CoreSysAttributes):
try:
await self.instance.install(self.sys_homeassistant.version)
except DockerAPIError:
_LOGGER.error("Repairing of Home Assistant fails")
_LOGGER.error("Repairing of Home Assistant failed")

View File

@ -103,4 +103,4 @@ class HostManager(CoreSysAttributes):
try:
await self.apparmor.load()
except HassioError as err:
_LOGGER.waring("Load host AppArmor on start fails: %s", err)
_LOGGER.waring("Load host AppArmor on start failed: %s", err)

View File

@ -228,5 +228,5 @@ class Hardware:
if proc.returncode == 0:
return
_LOGGER.warning("udevadm device triggering fails!")
_LOGGER.warning("udevadm device triggering failed!")
raise HardwareNotSupportedError()

View File

@ -1,16 +1,29 @@
"""Schedule for Supervisor."""
import asyncio
from datetime import date, datetime, time, timedelta
import logging
from typing import Awaitable, Callable, List, Optional, Union
from uuid import UUID, uuid4
import async_timeout
import attr
from ..const import CoreStates
from ..coresys import CoreSys, CoreSysAttributes
_LOGGER: logging.Logger = logging.getLogger(__name__)
INTERVAL = "interval"
REPEAT = "repeat"
CALL = "callback"
TASK = "task"
@attr.s(slots=True)
class _Task:
    """Task object."""

    # Unique scheduler id; the only field taking part in equality.
    id: UUID = attr.ib()
    # Coroutine function invoked on each trigger (called without arguments).
    coro_callback: Callable[..., Awaitable[None]] = attr.ib(eq=False)
    # Seconds between runs, or a time-of-day for daily scheduling.
    interval: Union[float, time] = attr.ib(eq=False)
    # True to reschedule after each run.
    repeat: bool = attr.ib(eq=False)
    # Currently running asyncio task, if any.
    job: Optional[asyncio.tasks.Task] = attr.ib(eq=False)
    # Timer handle for the next scheduled trigger, if any.
    next: Optional[asyncio.TimerHandle] = attr.ib(eq=False)
class Scheduler(CoreSysAttributes):
@ -19,43 +32,49 @@ class Scheduler(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize task schedule."""
self.coresys: CoreSys = coresys
self._data = {}
self._tasks: List[_Task] = []
def register_task(self, coro_callback, interval, repeat=True):
def register_task(
self,
coro_callback: Callable[..., Awaitable[None]],
interval: Union[float, time],
repeat: bool = True,
) -> UUID:
"""Schedule a coroutine.
The coroutine need to be a callback without arguments.
"""
task_id = hash(coro_callback)
# Generate data
opts = {CALL: coro_callback, INTERVAL: interval, REPEAT: repeat}
task = _Task(uuid4(), coro_callback, interval, repeat, None, None)
# Schedule task
self._data[task_id] = opts
self._schedule_task(interval, task_id)
self._tasks.append(task)
self._schedule_task(task)
return task_id
return task.id
def _run_task(self, task_id):
def _run_task(self, task: _Task) -> None:
"""Run a scheduled task."""
data = self._data[task_id]
if self.sys_core.state == CoreStates.RUNNING:
self.sys_create_task(data[CALL]())
async def _wrap_task():
"""Run schedule task and reschedule."""
try:
if self.sys_core.state == CoreStates.RUNNING:
await task.coro_callback()
finally:
if task.repeat and self.sys_core.state != CoreStates.STOPPING:
self._schedule_task(task)
else:
self._tasks.remove(task)
if data[REPEAT]:
self._schedule_task(data[INTERVAL], task_id)
else:
self._data.pop(task_id)
task.job = self.sys_create_task(_wrap_task())
def _schedule_task(self, interval, task_id):
def _schedule_task(self, task: _Task) -> None:
"""Schedule a task on loop."""
if isinstance(interval, (int, float)):
job = self.sys_loop.call_later(interval, self._run_task, task_id)
elif isinstance(interval, time):
today = datetime.combine(date.today(), interval)
tomorrow = datetime.combine(date.today() + timedelta(days=1), interval)
if isinstance(task.interval, (int, float)):
task.next = self.sys_loop.call_later(task.interval, self._run_task, task)
elif isinstance(task.interval, time):
today = datetime.combine(date.today(), task.interval)
tomorrow = datetime.combine(date.today() + timedelta(days=1), task.interval)
# Check if we run it today or next day
if today > datetime.today():
@ -63,14 +82,35 @@ class Scheduler(CoreSysAttributes):
else:
calc = tomorrow
job = self.sys_loop.call_at(calc.timestamp(), self._run_task, task_id)
task.next = self.sys_loop.call_at(calc.timestamp(), self._run_task, task)
else:
_LOGGER.critical(
"Unknown interval %s (type: %s) for scheduler %s",
interval,
type(interval),
task_id,
task.interval,
type(task.interval),
task.id,
)
# Store job
self._data[task_id][TASK] = job
async def shutdown(self, timeout: float = 10) -> None:
    """Shutdown all task inside the scheduler.

    Cancels every pending timer handle and every running job, then
    waits up to *timeout* seconds for the cancelled jobs to finish.
    """
    running: List[asyncio.tasks.Task] = []

    # Cancel next task / get running list
    _LOGGER.info("Shutdown scheduled tasks")
    for task in self._tasks:
        if task.next:
            task.next.cancel()
        if not task.job or task.job.done():
            continue
        running.append(task.job)
        task.job.cancel()

    if not running:
        return

    # Wait until all are shutdown
    try:
        async with async_timeout.timeout(timeout):
            await asyncio.wait(running)
    except asyncio.TimeoutError:
        _LOGGER.error("Timeout while waiting for jobs shutdown")

View File

@ -1,6 +1,7 @@
"""A collection of tasks."""
import logging
from ..const import AddonState
from ..coresys import CoreSysAttributes
from ..exceptions import (
AddonsError,
@ -36,6 +37,9 @@ RUN_WATCHDOG_AUDIO_DOCKER = 30
RUN_WATCHDOG_CLI_DOCKER = 40
RUN_WATCHDOG_MULTICAST_DOCKER = 50
RUN_WATCHDOG_ADDON_DOCKER = 30
RUN_WATCHDOG_ADDON_APPLICATON = 90
class Tasks(CoreSysAttributes):
"""Handle Tasks inside Supervisor."""
@ -126,6 +130,16 @@ class Tasks(CoreSysAttributes):
self._watchdog_multicast_docker, RUN_WATCHDOG_MULTICAST_DOCKER
)
)
self.jobs.add(
self.sys_scheduler.register_task(
self._watchdog_addon_docker, RUN_WATCHDOG_ADDON_DOCKER
)
)
self.jobs.add(
self.sys_scheduler.register_task(
self._watchdog_addon_application, RUN_WATCHDOG_ADDON_APPLICATON
)
)
_LOGGER.info("All core tasks are scheduled")
@ -140,7 +154,7 @@ class Tasks(CoreSysAttributes):
continue
if not addon.test_update_schema():
_LOGGER.warning(
"Add-on %s will be ignored, schema tests fails", addon.slug
"Add-on %s will be ignored, schema tests failed", addon.slug
)
continue
@ -169,7 +183,7 @@ class Tasks(CoreSysAttributes):
"""Check running state of Docker and start if they is close."""
# if Home Assistant is active
if (
not await self.sys_homeassistant.core.is_fails()
not await self.sys_homeassistant.core.is_failed()
or not self.sys_homeassistant.watchdog
or self.sys_homeassistant.error_state
):
@ -185,8 +199,9 @@ class Tasks(CoreSysAttributes):
_LOGGER.warning("Watchdog found a problem with Home Assistant Docker!")
try:
await self.sys_homeassistant.core.start()
except HomeAssistantError:
_LOGGER.error("Watchdog Home Assistant reanimation fails!")
except HomeAssistantError as err:
_LOGGER.error("Watchdog Home Assistant reanimation failed!")
self.sys_capture_exception(err)
async def _watchdog_homeassistant_api(self):
"""Create scheduler task for monitoring running state of API.
@ -196,7 +211,7 @@ class Tasks(CoreSysAttributes):
"""
# If Home-Assistant is active
if (
not await self.sys_homeassistant.core.is_fails()
not await self.sys_homeassistant.core.is_failed()
or not self.sys_homeassistant.watchdog
or self.sys_homeassistant.error_state
):
@ -222,8 +237,9 @@ class Tasks(CoreSysAttributes):
_LOGGER.error("Watchdog found a problem with Home Assistant API!")
try:
await self.sys_homeassistant.core.restart()
except HomeAssistantError:
_LOGGER.error("Watchdog Home Assistant reanimation fails!")
except HomeAssistantError as err:
_LOGGER.error("Watchdog Home Assistant reanimation failed!")
self.sys_capture_exception(err)
finally:
self._cache[HASS_WATCHDOG_API] = 0
@ -266,16 +282,16 @@ class Tasks(CoreSysAttributes):
return
_LOGGER.warning("Watchdog found a problem with CoreDNS plugin!")
# Reset of fails
if await self.sys_plugins.dns.is_fails():
_LOGGER.error("CoreDNS plugin is in fails state / Reset config")
# Reset of failed
if await self.sys_plugins.dns.is_failed():
_LOGGER.error("CoreDNS plugin is in failed state / Reset config")
await self.sys_plugins.dns.reset()
await self.sys_plugins.dns.loop_detection()
try:
await self.sys_plugins.dns.start()
except CoreDNSError:
_LOGGER.error("Watchdog CoreDNS reanimation fails!")
_LOGGER.error("Watchdog CoreDNS reanimation failed!")
async def _watchdog_audio_docker(self):
"""Check running state of Docker and start if they is close."""
@ -290,7 +306,7 @@ class Tasks(CoreSysAttributes):
try:
await self.sys_plugins.audio.start()
except AudioError:
_LOGGER.error("Watchdog PulseAudio reanimation fails!")
_LOGGER.error("Watchdog PulseAudio reanimation failed!")
async def _watchdog_cli_docker(self):
"""Check running state of Docker and start if they is close."""
@ -302,7 +318,7 @@ class Tasks(CoreSysAttributes):
try:
await self.sys_plugins.cli.start()
except CliError:
_LOGGER.error("Watchdog cli reanimation fails!")
_LOGGER.error("Watchdog cli reanimation failed!")
async def _watchdog_multicast_docker(self):
"""Check running state of Docker and start if they is close."""
@ -317,4 +333,54 @@ class Tasks(CoreSysAttributes):
try:
await self.sys_plugins.multicast.start()
except MulticastError:
_LOGGER.error("Watchdog Multicast reanimation fails!")
_LOGGER.error("Watchdog Multicast reanimation failed!")
async def _watchdog_addon_docker(self):
    """Restart add-ons whose container stopped while expected to run."""
    for addon in self.sys_addons.installed:
        # if watchdog need looking for
        if not addon.watchdog or await addon.is_running():
            continue

        # if Addon have running actions
        # (skip while an operation is in progress, or when the add-on is
        # not supposed to be in STARTED state)
        if addon.in_progress or addon.state != AddonState.STARTED:
            continue

        _LOGGER.warning("Watchdog found a problem with %s!", addon.slug)
        try:
            await addon.start()
        except AddonsError as err:
            _LOGGER.error("Watchdog %s reanimation failed!", addon.slug)
            self.sys_capture_exception(err)
async def _watchdog_addon_application(self):
    """Check application health of running add-ons and restart hung ones.

    Each add-on gets one grace cycle: the first missed health probe is
    only recorded; a second consecutive miss triggers a restart.
    """
    for addon in self.sys_addons.installed:
        # if watchdog need looking for
        if not addon.watchdog or addon.state != AddonState.STARTED:
            continue

        # Init cache data
        retry_scan = self._cache.get(addon.slug, 0)

        # if Addon have running actions / Application work
        if addon.in_progress or await addon.watchdog_application():
            continue

        # Look like we run into a problem
        retry_scan += 1
        if retry_scan == 1:
            self._cache[addon.slug] = retry_scan
            _LOGGER.warning(
                "Watchdog missing application response from %s", addon.slug
            )
            # Fix: was `return`, which aborted the scan and skipped every
            # remaining add-on for this cycle; `continue` checks the rest.
            continue

        _LOGGER.warning("Watchdog found a problem with %s application!", addon.slug)
        try:
            await addon.restart()
        except AddonsError as err:
            _LOGGER.error("Watchdog %s reanimation failed!", addon.slug)
            self.sys_capture_exception(err)
        finally:
            self._cache[addon.slug] = 0

View File

@ -155,7 +155,7 @@ class Audio(JsonConfig, CoreSysAttributes):
try:
await self.instance.update(version, image=self.sys_updater.image_audio)
except DockerAPIError as err:
_LOGGER.error("Audio update fails")
_LOGGER.error("Audio update failed")
raise AudioUpdateError() from err
else:
self.version = version
@ -226,7 +226,7 @@ class Audio(JsonConfig, CoreSysAttributes):
try:
await self.instance.install(self.version)
except DockerAPIError:
_LOGGER.error("Repairing of Audio fails")
_LOGGER.error("Repairing of Audio failed")
def pulse_client(self, input_profile=None, output_profile=None) -> str:
"""Generate an /etc/pulse/client.conf data."""

View File

@ -133,7 +133,7 @@ class HaCli(CoreSysAttributes, JsonConfig):
version, image=self.sys_updater.image_cli, latest=True
)
except DockerAPIError as err:
_LOGGER.error("HA cli update fails")
_LOGGER.error("HA cli update failed")
raise CliUpdateError() from err
else:
self.version = version
@ -193,4 +193,4 @@ class HaCli(CoreSysAttributes, JsonConfig):
try:
await self.instance.install(self.version, latest=True)
except DockerAPIError:
_LOGGER.error("Repairing of HA cli fails")
_LOGGER.error("Repairing of HA cli failed")

View File

@ -197,7 +197,7 @@ class CoreDNS(JsonConfig, CoreSysAttributes):
try:
await self.instance.update(version, image=self.sys_updater.image_dns)
except DockerAPIError as err:
_LOGGER.error("CoreDNS update fails")
_LOGGER.error("CoreDNS update failed")
raise CoreDNSUpdateError() from err
else:
self.version = version
@ -404,12 +404,12 @@ class CoreDNS(JsonConfig, CoreSysAttributes):
"""
return self.instance.is_running()
def is_fails(self) -> Awaitable[bool]:
"""Return True if a Docker container is fails state.
def is_failed(self) -> Awaitable[bool]:
"""Return True if a Docker container is failed state.
Return a coroutine.
"""
return self.instance.is_fails()
return self.instance.is_failed()
async def repair(self) -> None:
"""Repair CoreDNS plugin."""
@ -420,7 +420,7 @@ class CoreDNS(JsonConfig, CoreSysAttributes):
try:
await self.instance.install(self.version)
except DockerAPIError:
_LOGGER.error("Repairing of CoreDNS fails")
_LOGGER.error("Repairing of CoreDNS failed")
def _write_resolv(self, resolv_conf: Path) -> None:
"""Update/Write resolv.conf file."""

View File

@ -128,7 +128,7 @@ class Multicast(JsonConfig, CoreSysAttributes):
try:
await self.instance.update(version, image=self.sys_updater.image_multicast)
except DockerAPIError as err:
_LOGGER.error("Multicast update fails")
_LOGGER.error("Multicast update failed")
raise MulticastUpdateError() from err
else:
self.version = version
@ -190,12 +190,12 @@ class Multicast(JsonConfig, CoreSysAttributes):
"""
return self.instance.is_running()
def is_fails(self) -> Awaitable[bool]:
"""Return True if a Docker container is fails state.
def is_failed(self) -> Awaitable[bool]:
"""Return True if a Docker container is failed state.
Return a coroutine.
"""
return self.instance.is_fails()
return self.instance.is_failed()
async def repair(self) -> None:
"""Repair Multicast plugin."""
@ -206,4 +206,4 @@ class Multicast(JsonConfig, CoreSysAttributes):
try:
await self.instance.install(self.version)
except DockerAPIError:
_LOGGER.error("Repairing of Multicast fails")
_LOGGER.error("Repairing of Multicast failed")

View File

@ -122,7 +122,7 @@ class Supervisor(CoreSysAttributes):
self.sys_updater.image_supervisor, version
)
except DockerAPIError as err:
_LOGGER.error("Update of Supervisor fails!")
_LOGGER.error("Update of Supervisor failed!")
raise SupervisorUpdateError() from err
else:
self.sys_config.version = version
@ -160,4 +160,4 @@ class Supervisor(CoreSysAttributes):
try:
await self.instance.retag()
except DockerAPIError:
_LOGGER.error("Repairing of Supervisor fails")
_LOGGER.error("Repairing of Supervisor failed")

View File

@ -99,18 +99,9 @@ def validate_repository(repository: str) -> str:
# pylint: disable=no-value-for-parameter
repositories = vol.All([validate_repository], vol.Unique())
DOCKER_PORTS = vol.Schema(
{
vol.All(vol.Coerce(str), vol.Match(r"^\d+(?:/tcp|/udp)?$")): vol.Maybe(
network_port
)
}
)
DOCKER_PORTS_DESCRIPTION = vol.Schema(
{vol.All(vol.Coerce(str), vol.Match(r"^\d+(?:/tcp|/udp)?$")): vol.Coerce(str)}
)
docker_port = vol.All(str, vol.Match(r"^\d+(?:/tcp|/udp)?$"))
docker_ports = vol.Schema({docker_port: vol.Maybe(network_port)})
docker_ports_description = vol.Schema({docker_port: vol.Coerce(str)})
# pylint: disable=no-value-for-parameter

View File

@ -154,3 +154,16 @@ def test_invalid_machine():
with pytest.raises(vol.Invalid):
assert vd.SCHEMA_ADDON_CONFIG(config)
def test_watchdog_url():
    """Ensure valid watchdog URL templates pass the add-on config schema."""
    config = load_json_fixture("basic-addon-config.json")

    valid_templates = (
        "tcp://[HOST]:[PORT:8123]",
        "http://[HOST]:[PORT:8080]/health",
        "https://[HOST]:[PORT:80]/",
    )
    for template in valid_templates:
        config["watchdog"] = template
        assert vd.SCHEMA_ADDON_CONFIG(config)

View File

@ -0,0 +1,72 @@
"""Test Supervisor scheduler backend."""
import asyncio
from supervisor.const import CoreStates
async def test_simple_task(coresys):
    """A non-repeating task fires exactly once."""
    coresys.core.state = CoreStates.RUNNING
    calls = []

    async def job():
        """Record one invocation."""
        calls.append(True)

    coresys.scheduler.register_task(job, 0.1, False)
    await asyncio.sleep(0.3)

    assert len(calls) == 1
async def test_simple_task_repeat(coresys):
    """A repeating task fires more than once."""
    coresys.core.state = CoreStates.RUNNING
    calls = []

    async def job():
        """Record one invocation."""
        calls.append(True)

    coresys.scheduler.register_task(job, 0.1, True)
    await asyncio.sleep(0.3)

    assert len(calls) > 1
async def test_simple_task_shutdown(coresys):
    """Scheduler shutdown stops a repeating task from firing again."""
    coresys.core.state = CoreStates.RUNNING
    calls = []

    async def job():
        """Record one invocation."""
        calls.append(True)

    coresys.scheduler.register_task(job, 0.1, True)
    await asyncio.sleep(0.3)
    await coresys.scheduler.shutdown()

    assert len(calls) > 1

    # No further invocations may happen after shutdown
    count_after_shutdown = len(calls)
    await asyncio.sleep(0.2)
    assert len(calls) == count_after_shutdown
async def test_simple_task_repeat_block(coresys):
    """A repeating task that blocks does not overlap with itself."""
    coresys.core.state = CoreStates.RUNNING
    calls = []

    async def job():
        """Record the invocation, then block past the test window."""
        calls.append(True)
        await asyncio.sleep(2)

    coresys.scheduler.register_task(job, 0.1, True)
    await asyncio.sleep(0.3)

    assert len(calls) == 1
    await coresys.scheduler.shutdown()