Add job group execution limit option (#4457)

* Add job group execution limit option

* Fix pylint issues

* Assign variable before usage

* Cleanup jobs when done

* Remove isinstance check for performance

* Explicitly raise from None

* Add some more documentation info
This commit is contained in:
Mike Degatano 2023-08-08 16:49:17 -04:00 committed by GitHub
parent 71077fb0f7
commit 1611beccd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 615 additions and 244 deletions

View File

@ -791,10 +791,7 @@ class Addon(AddonModel):
raise AddonsError() from err raise AddonsError() from err
async def write_stdin(self, data) -> None: async def write_stdin(self, data) -> None:
"""Write data to add-on stdin. """Write data to add-on stdin."""
Return a coroutine.
"""
if not self.with_stdin: if not self.with_stdin:
raise AddonsNotSupportedError( raise AddonsNotSupportedError(
f"Add-on {self.slug} does not support writing to stdin!", _LOGGER.error f"Add-on {self.slug} does not support writing to stdin!", _LOGGER.error
@ -889,7 +886,10 @@ class Addon(AddonModel):
await self._backup_command(self.backup_pre) await self._backup_command(self.backup_pre)
elif is_running and self.backup_mode == AddonBackupMode.COLD: elif is_running and self.backup_mode == AddonBackupMode.COLD:
_LOGGER.info("Shutdown add-on %s for cold backup", self.slug) _LOGGER.info("Shutdown add-on %s for cold backup", self.slug)
await self.instance.stop() try:
await self.instance.stop()
except DockerError as err:
raise AddonsError() from err
try: try:
_LOGGER.info("Building backup for add-on %s", self.slug) _LOGGER.info("Building backup for add-on %s", self.slug)

View File

@ -36,14 +36,15 @@ from ..exceptions import (
CoreDNSError, CoreDNSError,
DBusError, DBusError,
DockerError, DockerError,
DockerJobError,
DockerNotFound, DockerNotFound,
HardwareNotFound, HardwareNotFound,
) )
from ..hardware.const import PolicyGroup from ..hardware.const import PolicyGroup
from ..hardware.data import Device from ..hardware.data import Device
from ..jobs.decorator import Job, JobCondition, JobExecutionLimit from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.decorator import Job
from ..resolution.const import ContextType, IssueType, SuggestionType from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils import process_lock
from ..utils.sentry import capture_exception from ..utils.sentry import capture_exception
from .const import ( from .const import (
ENV_TIME, ENV_TIME,
@ -73,8 +74,8 @@ class DockerAddon(DockerInterface):
def __init__(self, coresys: CoreSys, addon: Addon): def __init__(self, coresys: CoreSys, addon: Addon):
"""Initialize Docker Home Assistant wrapper.""" """Initialize Docker Home Assistant wrapper."""
super().__init__(coresys)
self.addon: Addon = addon self.addon: Addon = addon
super().__init__(coresys)
self._hw_listener: EventListener | None = None self._hw_listener: EventListener | None = None
@ -493,7 +494,8 @@ class DockerAddon(DockerInterface):
return mounts return mounts
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
@ -503,7 +505,7 @@ class DockerAddon(DockerInterface):
_LOGGER.warning("%s running with disabled protected mode!", self.addon.name) _LOGGER.warning("%s running with disabled protected mode!", self.addon.name)
# Cleanup # Cleanup
await self._stop() await self.stop()
# Don't set a hostname if no separate UTS namespace is used # Don't set a hostname if no separate UTS namespace is used
hostname = None if self.uts_mode else self.addon.hostname hostname = None if self.uts_mode else self.addon.hostname
@ -563,7 +565,8 @@ class DockerAddon(DockerInterface):
BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events
) )
async def _update( @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None: ) -> None:
"""Update a docker image.""" """Update a docker image."""
@ -574,15 +577,16 @@ class DockerAddon(DockerInterface):
) )
# Update docker image # Update docker image
await self._install( await self.install(
version, image=image, latest=latest, need_build=self.addon.latest_need_build version, image=image, latest=latest, need_build=self.addon.latest_need_build
) )
# Stop container & cleanup # Stop container & cleanup
with suppress(DockerError): with suppress(DockerError):
await self._stop() await self.stop()
async def _install( @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def install(
self, self,
version: AwesomeVersion, version: AwesomeVersion,
image: str | None = None, image: str | None = None,
@ -595,7 +599,7 @@ class DockerAddon(DockerInterface):
if need_build is None and self.addon.need_build or need_build: if need_build is None and self.addon.need_build or need_build:
await self._build(version) await self._build(version)
else: else:
await super()._install(version, image, latest, arch) await super().install(version, image, latest, arch)
async def _build(self, version: AwesomeVersion) -> None: async def _build(self, version: AwesomeVersion) -> None:
"""Build a Docker container.""" """Build a Docker container."""
@ -632,14 +636,14 @@ class DockerAddon(DockerInterface):
_LOGGER.info("Build %s:%s done", self.image, version) _LOGGER.info("Build %s:%s done", self.image, version)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def export_image(self, tar_file: Path) -> Awaitable[None]: def export_image(self, tar_file: Path) -> Awaitable[None]:
"""Export current images into a tar file.""" """Export current images into a tar file."""
return self.sys_run_in_executor( return self.sys_run_in_executor(
self.sys_docker.export_image, self.image, self.version, tar_file self.sys_docker.export_image, self.image, self.version, tar_file
) )
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def import_image(self, tar_file: Path) -> None: async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image.""" """Import a tar file as image."""
docker_image = await self.sys_run_in_executor( docker_image = await self.sys_run_in_executor(
@ -650,9 +654,9 @@ class DockerAddon(DockerInterface):
_LOGGER.info("Importing image %s and version %s", tar_file, self.version) _LOGGER.info("Importing image %s and version %s", tar_file, self.version)
with suppress(DockerError): with suppress(DockerError):
await self._cleanup() await self.cleanup()
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def write_stdin(self, data: bytes) -> None: async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin.""" """Write to add-on stdin."""
if not await self.is_running(): if not await self.is_running():
@ -682,7 +686,8 @@ class DockerAddon(DockerInterface):
_LOGGER.error("Can't write to %s stdin: %s", self.name, err) _LOGGER.error("Can't write to %s stdin: %s", self.name, err)
raise DockerError() from err raise DockerError() from err
async def _stop(self, remove_container: bool = True) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container.""" """Stop/remove Docker container."""
# DNS # DNS
if self.ip_address != NO_ADDDRESS: if self.ip_address != NO_ADDDRESS:
@ -697,7 +702,7 @@ class DockerAddon(DockerInterface):
self.sys_bus.remove_listener(self._hw_listener) self.sys_bus.remove_listener(self._hw_listener)
self._hw_listener = None self._hw_listener = None
await super()._stop(remove_container) await super().stop(remove_container)
async def _validate_trust( async def _validate_trust(
self, image_id: str, image: str, version: AwesomeVersion self, image_id: str, image: str, version: AwesomeVersion

View File

@ -6,7 +6,10 @@ from docker.types import Mount
from ..const import DOCKER_CPU_RUNTIME_ALLOCATION, MACHINE_ID from ..const import DOCKER_CPU_RUNTIME_ALLOCATION, MACHINE_ID
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..hardware.const import PolicyGroup from ..hardware.const import PolicyGroup
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ( from .const import (
ENV_TIME, ENV_TIME,
MOUNT_DBUS, MOUNT_DBUS,
@ -82,13 +85,14 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
return None return None
return DOCKER_CPU_RUNTIME_ALLOCATION return DOCKER_CPU_RUNTIME_ALLOCATION
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
# Cleanup # Cleanup
await self._stop() await self.stop()
# Create & Run container # Create & Run container
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(

View File

@ -2,6 +2,9 @@
import logging import logging
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ENV_TIME, ENV_TOKEN from .const import ENV_TIME, ENV_TOKEN
from .interface import DockerInterface from .interface import DockerInterface
@ -23,13 +26,14 @@ class DockerCli(DockerInterface, CoreSysAttributes):
"""Return name of Docker container.""" """Return name of Docker container."""
return CLI_DOCKER_NAME return CLI_DOCKER_NAME
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
# Cleanup # Cleanup
await self._stop() await self.stop()
# Create & Run container # Create & Run container
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(

View File

@ -4,6 +4,9 @@ import logging
from docker.types import Mount from docker.types import Mount
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ENV_TIME, MOUNT_DBUS, MountType from .const import ENV_TIME, MOUNT_DBUS, MountType
from .interface import DockerInterface from .interface import DockerInterface
@ -25,13 +28,14 @@ class DockerDNS(DockerInterface, CoreSysAttributes):
"""Return name of Docker container.""" """Return name of Docker container."""
return DNS_DOCKER_NAME return DNS_DOCKER_NAME
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
# Cleanup # Cleanup
await self._stop() await self.stop()
# Create & Run container # Create & Run container
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(

View File

@ -7,9 +7,11 @@ from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from docker.types import Mount from docker.types import Mount
from ..const import LABEL_MACHINE, MACHINE_ID from ..const import LABEL_MACHINE, MACHINE_ID
from ..exceptions import DockerJobError
from ..hardware.const import PolicyGroup from ..hardware.const import PolicyGroup
from ..homeassistant.const import LANDINGPAGE from ..homeassistant.const import LANDINGPAGE
from ..utils import process_lock from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ( from .const import (
ENV_TIME, ENV_TIME,
ENV_TOKEN, ENV_TOKEN,
@ -131,13 +133,14 @@ class DockerHomeAssistant(DockerInterface):
return mounts return mounts
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
# Cleanup # Cleanup
await self._stop() await self.stop()
# Create & Run container # Create & Run container
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(
@ -173,7 +176,7 @@ class DockerHomeAssistant(DockerInterface):
"Starting Home Assistant %s with version %s", self.image, self.version "Starting Home Assistant %s with version %s", self.image, self.version
) )
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def execute_command(self, command: str) -> CommandReturn: async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command.""" """Create a temporary container and run command."""
return await self.sys_run_in_executor( return await self.sys_run_in_executor(

View File

@ -8,6 +8,7 @@ import logging
import re import re
from time import time from time import time
from typing import Any from typing import Any
from uuid import uuid4
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
from awesomeversion.strategy import AwesomeVersionStrategy from awesomeversion.strategy import AwesomeVersionStrategy
@ -24,18 +25,21 @@ from ..const import (
BusEvent, BusEvent,
CpuArch, CpuArch,
) )
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys
from ..exceptions import ( from ..exceptions import (
CodeNotaryError, CodeNotaryError,
CodeNotaryUntrusted, CodeNotaryUntrusted,
DockerAPIError, DockerAPIError,
DockerError, DockerError,
DockerJobError,
DockerNotFound, DockerNotFound,
DockerRequestError, DockerRequestError,
DockerTrustError, DockerTrustError,
) )
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType, SuggestionType from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils import process_lock
from ..utils.sentry import capture_exception from ..utils.sentry import capture_exception
from .const import ContainerState, RestartPolicy from .const import ContainerState, RestartPolicy
from .manager import CommandReturn from .manager import CommandReturn
@ -73,11 +77,12 @@ def _container_state_from_model(docker_container: Container) -> ContainerState:
return ContainerState.STOPPED return ContainerState.STOPPED
class DockerInterface(CoreSysAttributes): class DockerInterface(JobGroup):
"""Docker Supervisor interface.""" """Docker Supervisor interface."""
def __init__(self, coresys: CoreSys): def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper.""" """Initialize Docker base wrapper."""
super().__init__(coresys, f"container_{self.name or uuid4().hex}")
self.coresys: CoreSys = coresys self.coresys: CoreSys = coresys
self._meta: dict[str, Any] | None = None self._meta: dict[str, Any] | None = None
self.lock: asyncio.Lock = asyncio.Lock() self.lock: asyncio.Lock = asyncio.Lock()
@ -204,25 +209,15 @@ class DockerInterface(CoreSysAttributes):
await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials) await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def install( async def install(
self,
version: AwesomeVersion,
image: str | None = None,
latest: bool = False,
arch: CpuArch | None = None,
) -> Awaitable[None]:
"""Pull docker image."""
return self._install(version, image, latest, arch)
async def _install(
self, self,
version: AwesomeVersion, version: AwesomeVersion,
image: str | None = None, image: str | None = None,
latest: bool = False, latest: bool = False,
arch: CpuArch | None = None, arch: CpuArch | None = None,
) -> None: ) -> None:
"""Pull Docker image.""" """Pull docker image."""
image = image or self.image image = image or self.image
arch = arch or self.sys_arch.supervisor arch = arch or self.sys_arch.supervisor
@ -328,17 +323,11 @@ class DockerInterface(CoreSysAttributes):
return _container_state_from_model(docker_container) return _container_state_from_model(docker_container)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def attach( async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> Awaitable[None]:
"""Attach to running Docker container."""
return self._attach(version, skip_state_event_if_down)
async def _attach(
self, version: AwesomeVersion, skip_state_event_if_down: bool = False
) -> None: ) -> None:
"""Attach to running docker container.""" """Attach to running Docker container."""
with suppress(docker.errors.DockerException, requests.RequestException): with suppress(docker.errors.DockerException, requests.RequestException):
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name self.sys_docker.containers.get, self.name
@ -370,21 +359,13 @@ class DockerInterface(CoreSysAttributes):
raise DockerError() raise DockerError()
_LOGGER.info("Attaching to %s with version %s", self.image, self.version) _LOGGER.info("Attaching to %s with version %s", self.image, self.version)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def run(self) -> Awaitable[None]: async def run(self) -> None:
"""Run Docker image."""
return self._run()
async def _run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
raise NotImplementedError() raise NotImplementedError()
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def stop(self, remove_container: bool = True) -> Awaitable[None]: async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
return self._stop(remove_container)
async def _stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container.""" """Stop/remove Docker container."""
with suppress(DockerNotFound): with suppress(DockerNotFound):
await self.sys_run_in_executor( await self.sys_run_in_executor(
@ -394,34 +375,28 @@ class DockerInterface(CoreSysAttributes):
remove_container, remove_container,
) )
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def start(self) -> Awaitable[None]: def start(self) -> Awaitable[None]:
"""Start Docker container.""" """Start Docker container."""
return self.sys_run_in_executor(self.sys_docker.start_container, self.name) return self.sys_run_in_executor(self.sys_docker.start_container, self.name)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def remove(self) -> None: async def remove(self) -> None:
"""Remove Docker images.""" """Remove Docker images."""
# Cleanup container # Cleanup container
with suppress(DockerError): with suppress(DockerError):
await self._stop() await self.stop()
await self.sys_run_in_executor( await self.sys_run_in_executor(
self.sys_docker.remove_image, self.image, self.version self.sys_docker.remove_image, self.image, self.version
) )
self._meta = None self._meta = None
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def update( async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> Awaitable[None]:
"""Update a Docker image."""
return self._update(version, image, latest)
async def _update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None: ) -> None:
"""Update a docker image.""" """Update a Docker image."""
image = image or self.image image = image or self.image
_LOGGER.info( _LOGGER.info(
@ -429,11 +404,11 @@ class DockerInterface(CoreSysAttributes):
) )
# Update docker image # Update docker image
await self._install(version, image=image, latest=latest) await self.install(version, image=image, latest=latest)
# Stop container & cleanup # Stop container & cleanup
with suppress(DockerError): with suppress(DockerError):
await self._stop() await self.stop()
async def logs(self) -> bytes: async def logs(self) -> bytes:
"""Return Docker logs of container.""" """Return Docker logs of container."""
@ -444,12 +419,8 @@ class DockerInterface(CoreSysAttributes):
return b"" return b""
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def cleanup(self, old_image: str | None = None) -> Awaitable[None]: def cleanup(self, old_image: str | None = None) -> Awaitable[None]:
"""Check if old version exists and cleanup."""
return self._cleanup(old_image)
def _cleanup(self, old_image: str | None = None) -> Awaitable[None]:
"""Check if old version exists and cleanup.""" """Check if old version exists and cleanup."""
return self.sys_run_in_executor( return self.sys_run_in_executor(
self.sys_docker.cleanup_old_images, self.sys_docker.cleanup_old_images,
@ -458,14 +429,14 @@ class DockerInterface(CoreSysAttributes):
{old_image} if old_image else None, {old_image} if old_image else None,
) )
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def restart(self) -> Awaitable[None]: def restart(self) -> Awaitable[None]:
"""Restart docker container.""" """Restart docker container."""
return self.sys_run_in_executor( return self.sys_run_in_executor(
self.sys_docker.restart_container, self.name, self.timeout self.sys_docker.restart_container, self.name, self.timeout
) )
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def execute_command(self, command: str) -> CommandReturn: async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command.""" """Create a temporary container and run command."""
raise NotImplementedError() raise NotImplementedError()
@ -526,7 +497,7 @@ class DockerInterface(CoreSysAttributes):
available_version.sort(reverse=True) available_version.sort(reverse=True)
return available_version[0] return available_version[0]
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def run_inside(self, command: str) -> Awaitable[CommandReturn]: def run_inside(self, command: str) -> Awaitable[CommandReturn]:
"""Execute a command inside Docker container.""" """Execute a command inside Docker container."""
return self.sys_run_in_executor( return self.sys_run_in_executor(
@ -540,7 +511,7 @@ class DockerInterface(CoreSysAttributes):
checksum = image_id.partition(":")[2] checksum = image_id.partition(":")[2]
return await self.sys_security.verify_own_content(checksum) return await self.sys_security.verify_own_content(checksum)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def check_trust(self) -> None: async def check_trust(self) -> None:
"""Check trust of exists Docker image.""" """Check trust of exists Docker image."""
try: try:

View File

@ -2,6 +2,9 @@
import logging import logging
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ENV_TIME, Capabilities from .const import ENV_TIME, Capabilities
from .interface import DockerInterface from .interface import DockerInterface
@ -28,13 +31,14 @@ class DockerMulticast(DockerInterface, CoreSysAttributes):
"""Generate needed capabilities.""" """Generate needed capabilities."""
return [Capabilities.NET_ADMIN.value] return [Capabilities.NET_ADMIN.value]
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
# Cleanup # Cleanup
await self._stop() await self.stop()
# Create & Run container # Create & Run container
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(

View File

@ -3,6 +3,9 @@ import logging
from ..const import DOCKER_NETWORK_MASK from ..const import DOCKER_NETWORK_MASK
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ENV_TIME, ENV_TOKEN, MOUNT_DOCKER, RestartPolicy from .const import ENV_TIME, ENV_TOKEN, MOUNT_DOCKER, RestartPolicy
from .interface import DockerInterface from .interface import DockerInterface
@ -25,13 +28,14 @@ class DockerObserver(DockerInterface, CoreSysAttributes):
"""Return name of Docker container.""" """Return name of Docker container."""
return OBSERVER_DOCKER_NAME return OBSERVER_DOCKER_NAME
async def _run(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image.""" """Run Docker image."""
if await self.is_running(): if await self.is_running():
return return
# Cleanup # Cleanup
await self._stop() await self.stop()
# Create & Run container # Create & Run container
docker_container = await self.sys_run_in_executor( docker_container = await self.sys_run_in_executor(

View File

@ -9,7 +9,9 @@ import docker
import requests import requests
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import DockerError from ..exceptions import DockerError, DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import PropagationMode from .const import PropagationMode
from .interface import DockerInterface from .interface import DockerInterface
@ -43,8 +45,9 @@ class DockerSupervisor(DockerInterface, CoreSysAttributes):
if mount.get("Destination") == "/data" if mount.get("Destination") == "/data"
) )
async def _attach( @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
self, version: AwesomeVersion, skip_state_event_if_down: bool = False async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> None: ) -> None:
"""Attach to running docker container.""" """Attach to running docker container."""
try: try:

View File

@ -36,6 +36,18 @@ class JobConditionException(JobException):
"""Exception happening for job conditions.""" """Exception happening for job conditions."""
class JobStartException(JobException):
"""Exception occurred starting a job on in current asyncio task."""
class JobNotFound(JobException):
"""Exception for job not found."""
class JobGroupExecutionLimitExceeded(JobException):
"""Exception when job group execution limit exceeded."""
# HomeAssistant # HomeAssistant
@ -478,6 +490,10 @@ class DockerNotFound(DockerError):
"""Docker object don't Exists.""" """Docker object don't Exists."""
class DockerJobError(DockerError, JobException):
"""Error executing docker job."""
# Hardware # Hardware

View File

@ -11,7 +11,7 @@ import attr
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
from ..const import ATTR_HOMEASSISTANT, BusEvent from ..const import ATTR_HOMEASSISTANT, BusEvent
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys
from ..docker.const import ContainerState from ..docker.const import ContainerState
from ..docker.homeassistant import DockerHomeAssistant from ..docker.homeassistant import DockerHomeAssistant
from ..docker.monitor import DockerContainerStateEvent from ..docker.monitor import DockerContainerStateEvent
@ -22,11 +22,13 @@ from ..exceptions import (
HomeAssistantError, HomeAssistantError,
HomeAssistantJobError, HomeAssistantJobError,
HomeAssistantUpdateError, HomeAssistantUpdateError,
JobException,
) )
from ..jobs.const import JobExecutionLimit from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job, JobCondition from ..jobs.decorator import Job, JobCondition
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType from ..resolution.const import ContextType, IssueType
from ..utils import convert_to_ascii, process_lock from ..utils import convert_to_ascii
from ..utils.sentry import capture_exception from ..utils.sentry import capture_exception
from .const import ( from .const import (
LANDINGPAGE, LANDINGPAGE,
@ -49,12 +51,12 @@ class ConfigResult:
log = attr.ib() log = attr.ib()
class HomeAssistantCore(CoreSysAttributes): class HomeAssistantCore(JobGroup):
"""Home Assistant core object for handle it.""" """Home Assistant core object for handle it."""
def __init__(self, coresys: CoreSys): def __init__(self, coresys: CoreSys):
"""Initialize Home Assistant object.""" """Initialize Home Assistant object."""
self.coresys: CoreSys = coresys super().__init__(coresys, "home_assistant_core")
self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys) self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys)
self.lock: asyncio.Lock = asyncio.Lock() self.lock: asyncio.Lock = asyncio.Lock()
self._error_state: bool = False self._error_state: bool = False
@ -95,9 +97,9 @@ class HomeAssistantCore(CoreSysAttributes):
_LOGGER.info("Starting HomeAssistant landingpage") _LOGGER.info("Starting HomeAssistant landingpage")
if not await self.instance.is_running(): if not await self.instance.is_running():
with suppress(HomeAssistantError): with suppress(HomeAssistantError):
await self._start() await self.start()
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
async def install_landingpage(self) -> None: async def install_landingpage(self) -> None:
"""Install a landing page.""" """Install a landing page."""
# Try to use a preinstalled landingpage # Try to use a preinstalled landingpage
@ -127,7 +129,7 @@ class HomeAssistantCore(CoreSysAttributes):
LANDINGPAGE, image=self.sys_updater.image_homeassistant LANDINGPAGE, image=self.sys_updater.image_homeassistant
) )
break break
except DockerError: except (DockerError, JobException):
pass pass
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
capture_exception(err) capture_exception(err)
@ -139,7 +141,7 @@ class HomeAssistantCore(CoreSysAttributes):
self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.image = self.sys_updater.image_homeassistant
self.sys_homeassistant.save_data() self.sys_homeassistant.save_data()
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
async def install(self) -> None: async def install(self) -> None:
"""Install a landing page.""" """Install a landing page."""
_LOGGER.info("Home Assistant setup") _LOGGER.info("Home Assistant setup")
@ -155,7 +157,7 @@ class HomeAssistantCore(CoreSysAttributes):
image=self.sys_updater.image_homeassistant, image=self.sys_updater.image_homeassistant,
) )
break break
except DockerError: except (DockerError, JobException):
pass pass
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
capture_exception(err) capture_exception(err)
@ -171,7 +173,7 @@ class HomeAssistantCore(CoreSysAttributes):
# finishing # finishing
try: try:
_LOGGER.info("Starting Home Assistant") _LOGGER.info("Starting Home Assistant")
await self._start() await self.start()
except HomeAssistantError: except HomeAssistantError:
_LOGGER.error("Can't start Home Assistant!") _LOGGER.error("Can't start Home Assistant!")
@ -179,7 +181,6 @@ class HomeAssistantCore(CoreSysAttributes):
with suppress(DockerError): with suppress(DockerError):
await self.instance.cleanup() await self.instance.cleanup()
@process_lock
@Job( @Job(
conditions=[ conditions=[
JobCondition.FREE_SPACE, JobCondition.FREE_SPACE,
@ -188,6 +189,7 @@ class HomeAssistantCore(CoreSysAttributes):
JobCondition.PLUGINS_UPDATED, JobCondition.PLUGINS_UPDATED,
JobCondition.SUPERVISOR_UPDATED, JobCondition.SUPERVISOR_UPDATED,
], ],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError, on_condition=HomeAssistantJobError,
) )
async def update( async def update(
@ -231,7 +233,7 @@ class HomeAssistantCore(CoreSysAttributes):
self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.image = self.sys_updater.image_homeassistant
if running: if running:
await self._start() await self.start()
_LOGGER.info("Successfully started Home Assistant %s", to_version) _LOGGER.info("Successfully started Home Assistant %s", to_version)
# Successfull - last step # Successfull - last step
@ -281,23 +283,7 @@ class HomeAssistantCore(CoreSysAttributes):
self.sys_resolution.create_issue(IssueType.UPDATE_FAILED, ContextType.CORE) self.sys_resolution.create_issue(IssueType.UPDATE_FAILED, ContextType.CORE)
raise HomeAssistantUpdateError() raise HomeAssistantUpdateError()
async def _start(self) -> None: @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
"""Start Home Assistant Docker & wait."""
# Create new API token
self.sys_homeassistant.supervisor_token = secrets.token_hex(56)
self.sys_homeassistant.save_data()
# Write audio settings
self.sys_homeassistant.write_pulse()
try:
await self.instance.run()
except DockerError as err:
raise HomeAssistantError() from err
await self._block_till_run(self.sys_homeassistant.version)
@process_lock
async def start(self) -> None: async def start(self) -> None:
"""Run Home Assistant docker.""" """Run Home Assistant docker."""
if await self.instance.is_running(): if await self.instance.is_running():
@ -314,9 +300,21 @@ class HomeAssistantCore(CoreSysAttributes):
await self._block_till_run(self.sys_homeassistant.version) await self._block_till_run(self.sys_homeassistant.version)
# No Instance/Container found, extended start # No Instance/Container found, extended start
else: else:
await self._start() # Create new API token
self.sys_homeassistant.supervisor_token = secrets.token_hex(56)
self.sys_homeassistant.save_data()
@process_lock # Write audio settings
self.sys_homeassistant.write_pulse()
try:
await self.instance.run()
except DockerError as err:
raise HomeAssistantError() from err
await self._block_till_run(self.sys_homeassistant.version)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
async def stop(self) -> None: async def stop(self) -> None:
"""Stop Home Assistant Docker.""" """Stop Home Assistant Docker."""
try: try:
@ -324,7 +322,7 @@ class HomeAssistantCore(CoreSysAttributes):
except DockerError as err: except DockerError as err:
raise HomeAssistantError() from err raise HomeAssistantError() from err
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
async def restart(self) -> None: async def restart(self) -> None:
"""Restart Home Assistant Docker.""" """Restart Home Assistant Docker."""
try: try:
@ -334,12 +332,12 @@ class HomeAssistantCore(CoreSysAttributes):
await self._block_till_run(self.sys_homeassistant.version) await self._block_till_run(self.sys_homeassistant.version)
@process_lock @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
async def rebuild(self) -> None: async def rebuild(self) -> None:
"""Rebuild Home Assistant Docker container.""" """Rebuild Home Assistant Docker container."""
with suppress(DockerError): with suppress(DockerError):
await self.instance.stop() await self.instance.stop()
await self._start() await self.start()
def logs(self) -> Awaitable[bytes]: def logs(self) -> Awaitable[bytes]:
"""Get HomeAssistant docker logs. """Get HomeAssistant docker logs.
@ -356,10 +354,7 @@ class HomeAssistantCore(CoreSysAttributes):
return self.instance.check_trust() return self.instance.check_trust()
async def stats(self) -> DockerStats: async def stats(self) -> DockerStats:
"""Return stats of Home Assistant. """Return stats of Home Assistant."""
Return a coroutine.
"""
try: try:
return await self.instance.stats() return await self.instance.stats()
except DockerError as err: except DockerError as err:
@ -386,9 +381,12 @@ class HomeAssistantCore(CoreSysAttributes):
async def check_config(self) -> ConfigResult: async def check_config(self) -> ConfigResult:
"""Run Home Assistant config check.""" """Run Home Assistant config check."""
result = await self.instance.execute_command( try:
"python3 -m homeassistant -c /config --script check_config" result = await self.instance.execute_command(
) "python3 -m homeassistant -c /config --script check_config"
)
except DockerError as err:
raise HomeAssistantError() from err
# If not valid # If not valid
if result.exit_code is None: if result.exit_code is None:

View File

@ -1,53 +1,69 @@
"""Supervisor job manager.""" """Supervisor job manager."""
from collections.abc import Callable
from contextlib import contextmanager
from contextvars import ContextVar, Token
import logging import logging
from uuid import UUID, uuid4
from attrs import define, field
from attrs.setters import frozen
from attrs.validators import ge, le
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import JobNotFound, JobStartException
from ..utils.common import FileConfiguration from ..utils.common import FileConfiguration
from .const import ATTR_IGNORE_CONDITIONS, FILE_CONFIG_JOBS, JobCondition from .const import ATTR_IGNORE_CONDITIONS, FILE_CONFIG_JOBS, JobCondition
from .validate import SCHEMA_JOBS_CONFIG from .validate import SCHEMA_JOBS_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__package__) # Context vars only act as a global within the same asyncio task
# When a new asyncio task is started the current context is copied over.
# Modifications to it in one task are not visible to others though.
# This allows us to track what job is currently in progress in each task.
_CURRENT_JOB: ContextVar[UUID] = ContextVar("current_job")
_LOGGER: logging.Logger = logging.getLogger(__name__)
class SupervisorJob(CoreSysAttributes): @define
"""Supervisor running job class.""" class SupervisorJob:
"""Representation of a job running in supervisor."""
def __init__(self, coresys: CoreSys, name: str): name: str = field(on_setattr=frozen)
"""Initialize the JobManager class.""" progress: int = field(default=0, validator=[ge(0), le(100)])
self.coresys: CoreSys = coresys stage: str | None = None
self.name: str = name uuid: UUID = field(init=False, factory=lambda: uuid4().hex, on_setattr=frozen)
self._progress: int = 0 parent_id: UUID = field(
self._stage: str | None = None init=False, factory=lambda: _CURRENT_JOB.get(None), on_setattr=frozen
)
done: bool = field(init=False, default=False)
@property @contextmanager
def progress(self) -> int: def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None):
"""Return the current progress.""" """Start the job in the current task.
return self._progress
@property This can only be called if the parent ID matches the job running in the current task.
def stage(self) -> str | None: This is to ensure that each asyncio task can only be doing one job at a time as that
"""Return the current stage.""" determines what resources it can and cannot access.
return self._stage """
if self.done:
raise JobStartException("Job is already complete")
if _CURRENT_JOB.get(None) != self.parent_id:
raise JobStartException("Job has a different parent from current job")
def update(self, progress: int | None = None, stage: str | None = None) -> None: token: Token[UUID] | None = None
"""Update the job object.""" try:
if progress is not None: token = _CURRENT_JOB.set(self.uuid)
if progress >= round(100): yield self
self.sys_jobs.remove_job(self) finally:
return self.done = True
self._progress = round(progress) if token:
if stage is not None: _CURRENT_JOB.reset(token)
self._stage = stage if on_done:
_LOGGER.debug( on_done(self)
"Job updated; name: %s, progress: %s, stage: %s",
self.name,
self.progress,
self.stage,
)
class JobManager(FileConfiguration, CoreSysAttributes): class JobManager(FileConfiguration, CoreSysAttributes):
"""Job class.""" """Job Manager class."""
def __init__(self, coresys: CoreSys): def __init__(self, coresys: CoreSys):
"""Initialize the JobManager class.""" """Initialize the JobManager class."""
@ -58,7 +74,7 @@ class JobManager(FileConfiguration, CoreSysAttributes):
@property @property
def jobs(self) -> list[SupervisorJob]: def jobs(self) -> list[SupervisorJob]:
"""Return a list of current jobs.""" """Return a list of current jobs."""
return self._jobs return list(self._jobs.values())
@property @property
def ignore_conditions(self) -> list[JobCondition]: def ignore_conditions(self) -> list[JobCondition]:
@ -70,18 +86,28 @@ class JobManager(FileConfiguration, CoreSysAttributes):
"""Set a list of ignored condition.""" """Set a list of ignored condition."""
self._data[ATTR_IGNORE_CONDITIONS] = value self._data[ATTR_IGNORE_CONDITIONS] = value
def get_job(self, name: str) -> SupervisorJob: def new_job(self, name: str, initial_stage: str | None = None) -> SupervisorJob:
"""Return a job, create one if it does not exist.""" """Create a new job."""
if name not in self._jobs: job = SupervisorJob(name, stage=initial_stage)
self._jobs[name] = SupervisorJob(self.coresys, name) self._jobs[job.uuid] = job
return job
return self._jobs[name] def get_job(self, uuid: UUID | None = None) -> SupervisorJob | None:
"""Return a job by uuid if it exists. Returns the current job of the asyncio task if uuid omitted."""
if uuid:
return self._jobs.get(uuid)
if uuid := _CURRENT_JOB.get(None):
return self._jobs.get(uuid)
return None
def remove_job(self, job: SupervisorJob) -> None: def remove_job(self, job: SupervisorJob) -> None:
"""Remove a job.""" """Remove a job by UUID."""
if job.name in self._jobs: if job.uuid not in self._jobs:
del self._jobs[job.name] raise JobNotFound(f"Could not find job {job.name}", _LOGGER.error)
def clear(self) -> None: if not job.done:
"""Clear all jobs.""" _LOGGER.warning("Removing incomplete job %s from job manager", job.name)
self._jobs.clear()
del self._jobs[job.uuid]

View File

@ -34,3 +34,5 @@ class JobExecutionLimit(str, Enum):
THROTTLE = "throttle" THROTTLE = "throttle"
THROTTLE_WAIT = "throttle_wait" THROTTLE_WAIT = "throttle_wait"
THROTTLE_RATE_LIMIT = "throttle_rate_limit" THROTTLE_RATE_LIMIT = "throttle_rate_limit"
GROUP_ONCE = "group_once"
GROUP_WAIT = "group_wait"

View File

@ -8,11 +8,17 @@ from typing import Any
from ..const import CoreState from ..const import CoreState
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HassioError, JobConditionException, JobException from ..exceptions import (
HassioError,
JobConditionException,
JobException,
JobGroupExecutionLimitExceeded,
)
from ..host.const import HostFeature from ..host.const import HostFeature
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from ..utils.sentry import capture_exception from ..utils.sentry import capture_exception
from .const import JobCondition, JobExecutionLimit from .const import JobCondition, JobExecutionLimit
from .job_group import JobGroup
_LOGGER: logging.Logger = logging.getLogger(__package__) _LOGGER: logging.Logger = logging.getLogger(__package__)
@ -44,6 +50,10 @@ class Job(CoreSysAttributes):
self._method = None self._method = None
self._last_call = datetime.min self._last_call = datetime.min
self._rate_limited_calls: list[datetime] | None = None self._rate_limited_calls: list[datetime] | None = None
self._job_group_limit = self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
)
# Validate Options # Validate Options
if ( if (
@ -76,19 +86,28 @@ class Job(CoreSysAttributes):
self.coresys, self._last_call, self._rate_limited_calls self.coresys, self._last_call, self._rate_limited_calls
) )
def _post_init(self, args: tuple[Any]) -> None: def _post_init(self, obj: JobGroup | CoreSysAttributes) -> None:
"""Runtime init.""" """Runtime init."""
if self.name is None: if self.name is None:
self.name = str(self._method.__qualname__).lower().replace(".", "_") self.name = str(self._method.__qualname__).lower().replace(".", "_")
# Coresys # Coresys
try: try:
self.coresys = args[0].coresys self.coresys = obj.coresys
except AttributeError: except AttributeError:
pass pass
if not self.coresys: if not self.coresys:
raise RuntimeError(f"Job on {self.name} need to be an coresys object!") raise RuntimeError(f"Job on {self.name} need to be an coresys object!")
# Job groups
if self._job_group_limit:
try:
_ = obj.acquire and obj.release
except AttributeError:
raise RuntimeError(
f"Job on {self.name} need to be a JobGroup to use group based limits!"
) from None
# Others # Others
if self._lock is None: if self._lock is None:
self._lock = asyncio.Semaphore() self._lock = asyncio.Semaphore()
@ -98,11 +117,15 @@ class Job(CoreSysAttributes):
self._method = method self._method = method
@wraps(method) @wraps(method)
async def wrapper(*args, **kwargs) -> Any: async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any:
"""Wrap the method.""" """Wrap the method.
self._post_init(args)
job = self.sys_jobs.get_job(self.name) This method must be on an instance of CoreSysAttributes. If a JOB_GROUP limit
is used, then it must be on an instance of JobGroup.
"""
self._post_init(obj)
job = self.sys_jobs.new_job(self.name)
# Handle condition # Handle condition
if self.conditions: if self.conditions:
@ -118,6 +141,13 @@ class Job(CoreSysAttributes):
# Handle exection limits # Handle exection limits
if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE): if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE):
await self._acquire_exection_limit() await self._acquire_exection_limit()
elif self._job_group_limit:
try:
await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.limit == JobExecutionLimit.THROTTLE: elif self.limit == JobExecutionLimit.THROTTLE:
time_since_last_call = datetime.now() - self._last_call time_since_last_call = datetime.now() - self._last_call
if time_since_last_call < self.throttle_period: if time_since_last_call < self.throttle_period:
@ -146,22 +176,23 @@ class Job(CoreSysAttributes):
) )
# Execute Job # Execute Job
try: with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None):
self._last_call = datetime.now() try:
if self._rate_limited_calls is not None: self._last_call = datetime.now()
self._rate_limited_calls.append(self._last_call) if self._rate_limited_calls is not None:
self._rate_limited_calls.append(self._last_call)
return await self._method(*args, **kwargs) return await self._method(obj, *args, **kwargs)
except HassioError as err: except HassioError as err:
raise err raise err
except Exception as err: except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err) _LOGGER.exception("Unhandled exception: %s", err)
capture_exception(err) capture_exception(err)
raise JobException() from err raise JobException() from err
finally: finally:
if self.cleanup: self._release_exception_limits()
self.sys_jobs.remove_job(job) if self._job_group_limit:
self._release_exception_limits() obj.release()
return wrapper return wrapper

View File

@ -0,0 +1,73 @@
"""Job group object."""
from asyncio import Lock
from . import SupervisorJob
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import JobException, JobGroupExecutionLimitExceeded
class JobGroup(CoreSysAttributes):
"""Object with methods that require a common lock.
This is used in classes like our DockerInterface class. Where each method
requires a lock as it involves some extensive I/O with Docker. But some
methods may need to call others as a part of processing to complete a
higher-level task and should not need to relinquish the lock in between.
"""
def __init__(self, coresys: CoreSys, group_name: str) -> None:
"""Initialize object."""
self.coresys: CoreSys = coresys
self._group_name: str = group_name
self._lock: Lock = Lock()
self._active_job: SupervisorJob | None = None
self._parent_jobs: list[SupervisorJob] = []
@property
def active_job(self) -> SupervisorJob | None:
"""Get active job ID."""
return self._active_job
@property
def group_name(self) -> str:
"""Return group name."""
return self._group_name
@property
def has_lock(self) -> bool:
"""Return true if current task has the lock on this job group."""
return (
self.active_job
and (task_job := self.sys_jobs.get_job())
and self.active_job == task_job
)
async def acquire(self, job: SupervisorJob, wait: bool = False) -> None:
"""Acquire the lock for the group for the specified job."""
# If there's another job running and we're not waiting, raise
if self.active_job and not self.has_lock and not wait:
raise JobGroupExecutionLimitExceeded(
f"Another job is running for job group {self.group_name}"
)
# Else if we don't have the lock, acquire it
if not self.has_lock:
await self._lock.acquire()
# Store the job ID we acquired the lock for
if self.active_job:
self._parent_jobs.append(self.active_job)
self._active_job = job
def release(self) -> None:
"""Release the lock for the group or return it to parent."""
if not self.has_lock:
raise JobException("Cannot release as caller does not own lock")
if self._parent_jobs:
self._active_job = self._parent_jobs.pop()
else:
self._active_job = None
self._lock.release()

View File

@ -57,7 +57,7 @@ async def test_image_added_removed_on_update(
assert install_addon_ssh.image == "local/amd64-addon-ssh" assert install_addon_ssh.image == "local/amd64-addon-ssh"
assert coresys.addons.store.get(TEST_ADDON_SLUG).image == "test/amd64-my-ssh-addon" assert coresys.addons.store.get(TEST_ADDON_SLUG).image == "test/amd64-my-ssh-addon"
with patch.object(DockerInterface, "_install") as install, patch.object( with patch.object(DockerInterface, "install") as install, patch.object(
DockerAddon, "_build" DockerAddon, "_build"
) as build: ) as build:
await install_addon_ssh.update() await install_addon_ssh.update()
@ -77,7 +77,7 @@ async def test_image_added_removed_on_update(
assert install_addon_ssh.image == "test/amd64-my-ssh-addon" assert install_addon_ssh.image == "test/amd64-my-ssh-addon"
assert coresys.addons.store.get(TEST_ADDON_SLUG).image == "local/amd64-addon-ssh" assert coresys.addons.store.get(TEST_ADDON_SLUG).image == "local/amd64-addon-ssh"
with patch.object(DockerInterface, "_install") as install, patch.object( with patch.object(DockerInterface, "install") as install, patch.object(
DockerAddon, "_build" DockerAddon, "_build"
) as build: ) as build:
await install_addon_ssh.update() await install_addon_ssh.update()
@ -249,7 +249,7 @@ async def test_update(
assert install_addon_ssh.need_update is True assert install_addon_ssh.need_update is True
with patch.object(DockerInterface, "_install"), patch.object( with patch.object(DockerInterface, "install"), patch.object(
DockerAddon, "is_running", return_value=False DockerAddon, "is_running", return_value=False
): ):
start_task = await coresys.addons.update(TEST_ADDON_SLUG) start_task = await coresys.addons.update(TEST_ADDON_SLUG)

View File

@ -177,7 +177,7 @@ async def test_api_store_update_healthcheck(
asyncio.create_task(container_events()) asyncio.create_task(container_events())
with patch.object(DockerAddon, "run", new=container_events_task), patch.object( with patch.object(DockerAddon, "run", new=container_events_task), patch.object(
DockerInterface, "_install" DockerInterface, "install"
), patch.object(DockerAddon, "is_running", return_value=False), patch.object( ), patch.object(DockerAddon, "is_running", return_value=False), patch.object(
CpuArch, "supported", new=PropertyMock(return_value=["amd64"]) CpuArch, "supported", new=PropertyMock(return_value=["amd64"])
): ):

View File

@ -70,6 +70,13 @@ async def path_extern() -> None:
yield yield
@pytest.fixture
async def supervisor_name() -> None:
"""Set env for supervisor name."""
os.environ["SUPERVISOR_NAME"] = "hassio_supervisor"
yield
@pytest.fixture @pytest.fixture
async def docker() -> DockerAPI: async def docker() -> DockerAPI:
"""Mock DockerAPI.""" """Mock DockerAPI."""
@ -286,7 +293,13 @@ async def fixture_all_dbus_services(
@pytest.fixture @pytest.fixture
async def coresys( async def coresys(
event_loop, docker, dbus_session_bus, all_dbus_services, aiohttp_client, run_dir event_loop,
docker,
dbus_session_bus,
all_dbus_services,
aiohttp_client,
run_dir,
supervisor_name,
) -> CoreSys: ) -> CoreSys:
"""Create a CoreSys Mock.""" """Create a CoreSys Mock."""
with patch("supervisor.bootstrap.initialize_system"), patch( with patch("supervisor.bootstrap.initialize_system"), patch(
@ -409,7 +422,9 @@ def sys_supervisor():
@pytest.fixture @pytest.fixture
async def api_client( async def api_client(
aiohttp_client, coresys: CoreSys, request: pytest.FixtureRequest aiohttp_client,
coresys: CoreSys,
request: pytest.FixtureRequest,
) -> TestClient: ) -> TestClient:
"""Fixture for RestAPI client.""" """Fixture for RestAPI client."""
@ -428,9 +443,7 @@ async def api_client(
api = RestAPI(coresys) api = RestAPI(coresys)
api.webapp = web.Application(middlewares=[_security_middleware]) api.webapp = web.Application(middlewares=[_security_middleware])
api.start = AsyncMock() api.start = AsyncMock()
with patch("supervisor.docker.supervisor.os") as os: await api.load()
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
await api.load()
yield await aiohttp_client(api.webapp) yield await aiohttp_client(api.webapp)
@ -593,16 +606,12 @@ async def journald_logs(coresys: CoreSys) -> MagicMock:
@pytest.fixture @pytest.fixture
async def docker_logs(docker: DockerAPI) -> MagicMock: async def docker_logs(docker: DockerAPI, supervisor_name) -> MagicMock:
"""Mock log output for a container from docker.""" """Mock log output for a container from docker."""
container_mock = MagicMock() container_mock = MagicMock()
container_mock.logs.return_value = load_binary_fixture("logs_docker_container.txt") container_mock.logs.return_value = load_binary_fixture("logs_docker_container.txt")
docker.containers.get.return_value = container_mock docker.containers.get.return_value = container_mock
yield container_mock.logs
with patch("supervisor.docker.supervisor.os") as os:
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
yield container_mock.logs
@pytest.fixture @pytest.fixture
@ -640,7 +649,6 @@ async def os_available(request: pytest.FixtureRequest) -> None:
@pytest.fixture @pytest.fixture
async def mount_propagation(docker: DockerAPI, coresys: CoreSys) -> None: async def mount_propagation(docker: DockerAPI, coresys: CoreSys) -> None:
"""Mock supervisor connected to container with propagation set.""" """Mock supervisor connected to container with propagation set."""
os.environ["SUPERVISOR_NAME"] = "hassio_supervisor"
docker.containers.get.return_value = supervisor = MagicMock() docker.containers.get.return_value = supervisor = MagicMock()
supervisor.attrs = { supervisor.attrs = {
"Mounts": [ "Mounts": [

View File

@ -194,7 +194,7 @@ async def test_addon_run_docker_error(
coresys, addonsdata_system, "basic-addon-config.json" coresys, addonsdata_system, "basic-addon-config.json"
) )
with patch.object(DockerAddon, "_stop"), patch.object( with patch.object(DockerAddon, "stop"), patch.object(
AddonOptions, "validate", new=PropertyMock(return_value=lambda _: None) AddonOptions, "validate", new=PropertyMock(return_value=lambda _: None)
), pytest.raises(DockerNotFound): ), pytest.raises(DockerNotFound):
await docker_addon.run() await docker_addon.run()
@ -218,7 +218,7 @@ async def test_addon_run_add_host_error(
coresys, addonsdata_system, "basic-addon-config.json" coresys, addonsdata_system, "basic-addon-config.json"
) )
with patch.object(DockerAddon, "_stop"), patch.object( with patch.object(DockerAddon, "stop"), patch.object(
AddonOptions, "validate", new=PropertyMock(return_value=lambda _: None) AddonOptions, "validate", new=PropertyMock(return_value=lambda _: None)
), patch.object(PluginDns, "add_host", side_effect=(err := CoreDNSError())): ), patch.object(PluginDns, "add_host", side_effect=(err := CoreDNSError())):
await docker_addon.run() await docker_addon.run()

View File

@ -92,7 +92,7 @@ async def test_install_docker_error(
): ):
"""Test install fails due to docker error.""" """Test install fails due to docker error."""
coresys.security.force = True coresys.security.force = True
with patch.object(HomeAssistantCore, "_start"), patch.object( with patch.object(HomeAssistantCore, "start"), patch.object(
DockerHomeAssistant, "cleanup" DockerHomeAssistant, "cleanup"
), patch.object( ), patch.object(
Updater, "image_homeassistant", new=PropertyMock(return_value="homeassistant") Updater, "image_homeassistant", new=PropertyMock(return_value="homeassistant")
@ -119,7 +119,7 @@ async def test_install_other_error(
"""Test install fails due to other error.""" """Test install fails due to other error."""
coresys.docker.images.pull.side_effect = [(err := OSError()), MagicMock()] coresys.docker.images.pull.side_effect = [(err := OSError()), MagicMock()]
with patch.object(HomeAssistantCore, "_start"), patch.object( with patch.object(HomeAssistantCore, "start"), patch.object(
DockerHomeAssistant, "cleanup" DockerHomeAssistant, "cleanup"
), patch.object( ), patch.object(
Updater, "image_homeassistant", new=PropertyMock(return_value="homeassistant") Updater, "image_homeassistant", new=PropertyMock(return_value="homeassistant")

View File

@ -18,8 +18,10 @@ from supervisor.exceptions import (
) )
from supervisor.host.const import HostFeature from supervisor.host.const import HostFeature
from supervisor.host.manager import HostManager from supervisor.host.manager import HostManager
from supervisor.jobs import SupervisorJob
from supervisor.jobs.const import JobExecutionLimit from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.decorator import Job, JobCondition from supervisor.jobs.decorator import Job, JobCondition
from supervisor.jobs.job_group import JobGroup
from supervisor.plugins.audio import PluginAudio from supervisor.plugins.audio import PluginAudio
from supervisor.resolution.const import UnhealthyReason from supervisor.resolution.const import UnhealthyReason
from supervisor.utils.dt import utcnow from supervisor.utils.dt import utcnow
@ -552,3 +554,171 @@ async def test_host_network(coresys: CoreSys):
coresys.jobs.ignore_conditions = [JobCondition.HOST_NETWORK] coresys.jobs.ignore_conditions = [JobCondition.HOST_NETWORK]
assert await test.execute() assert await test.execute()
async def test_job_group_once(coresys: CoreSys, loop: asyncio.BaseEventLoop):
"""Test job group once execution limitation."""
class TestClass(JobGroup):
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
super().__init__(coresys, "TestClass")
self.event = asyncio.Event()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException)
async def inner_execute(self) -> bool:
"""Inner class method called by execute, group level lock allows this."""
await self.event.wait()
return True
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException)
async def execute(self) -> bool:
"""Execute the class method."""
return await self.inner_execute()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException)
async def separate_execute(self) -> bool:
"""Alternate execute method that shares group lock."""
return True
@Job(limit=JobExecutionLimit.ONCE, on_condition=JobException)
async def unrelated_method(self) -> bool:
"""Unrelated method, sparate job with separate lock."""
return True
test = TestClass(coresys)
run_task = loop.create_task(test.execute())
await asyncio.sleep(0)
# All methods with group limits should be locked
with pytest.raises(JobException):
await test.execute()
with pytest.raises(JobException):
await test.inner_execute()
with pytest.raises(JobException):
await test.separate_execute()
# The once method is still callable
assert await test.unrelated_method()
test.event.set()
assert await run_task
async def test_job_group_wait(coresys: CoreSys, loop: asyncio.BaseEventLoop):
"""Test job group wait execution limitation."""
class TestClass(JobGroup):
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
super().__init__(coresys, "TestClass")
self.execute_count = 0
self.other_count = 0
self.event = asyncio.Event()
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException)
async def inner_execute(self) -> None:
"""Inner class method called by execute, group level lock allows this."""
self.execute_count += 1
await self.event.wait()
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException)
async def execute(self) -> None:
"""Execute the class method."""
await self.inner_execute()
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException)
async def separate_execute(self) -> None:
"""Alternate execute method that shares group lock."""
self.other_count += 1
test = TestClass(coresys)
run_task = loop.create_task(test.execute())
await asyncio.sleep(0)
repeat_task = loop.create_task(test.execute())
other_task = loop.create_task(test.separate_execute())
await asyncio.sleep(0)
assert test.execute_count == 1
assert test.other_count == 0
test.event.set()
await run_task
await repeat_task
await other_task
assert test.execute_count == 2
assert test.other_count == 1
async def test_job_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop):
"""Test job is cleaned up."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.event = asyncio.Event()
self.job: SupervisorJob | None = None
@Job(limit=JobExecutionLimit.ONCE)
async def execute(self):
"""Execute the class method."""
self.job = coresys.jobs.get_job()
await self.event.wait()
return True
test = TestClass(coresys)
run_task = loop.create_task(test.execute())
await asyncio.sleep(0)
assert coresys.jobs.jobs == [test.job]
assert not test.job.done
test.event.set()
assert await run_task
assert coresys.jobs.jobs == []
assert test.job.done
async def test_job_skip_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop):
"""Test job is left in job manager when cleanup is false."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.event = asyncio.Event()
self.job: SupervisorJob | None = None
@Job(limit=JobExecutionLimit.ONCE, cleanup=False)
async def execute(self):
"""Execute the class method."""
self.job = coresys.jobs.get_job()
await self.event.wait()
return True
test = TestClass(coresys)
run_task = loop.create_task(test.execute())
await asyncio.sleep(0)
assert coresys.jobs.jobs == [test.job]
assert not test.job.done
test.event.set()
assert await run_task
assert coresys.jobs.jobs == [test.job]
assert test.job.done

View File

@ -1,39 +1,76 @@
"""Test the condition decorators.""" """Test the condition decorators."""
import pytest
# pylint: disable=protected-access,import-error # pylint: disable=protected-access,import-error
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.exceptions import JobStartException
TEST_JOB = "test" TEST_JOB = "test"
async def test_add_job(coresys: CoreSys): async def test_add_job(coresys: CoreSys):
"""Test adding jobs.""" """Test adding jobs."""
job = coresys.jobs.get_job(TEST_JOB) job = coresys.jobs.new_job(TEST_JOB)
assert job.name in coresys.jobs.jobs assert job in coresys.jobs.jobs
async def test_remove_job_directly(coresys: CoreSys): async def test_remove_job_directly(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Test removing jobs from manager.""" """Test removing jobs from manager."""
job = coresys.jobs.get_job(TEST_JOB) job = coresys.jobs.new_job(TEST_JOB)
assert job in coresys.jobs.jobs
assert job.name in coresys.jobs.jobs
coresys.jobs.remove_job(job) coresys.jobs.remove_job(job)
assert job.name not in coresys.jobs.jobs assert job not in coresys.jobs.jobs
assert f"Removing incomplete job {job.name}" in caplog.text
async def test_remove_job_with_progress(coresys: CoreSys): async def test_job_done(coresys: CoreSys):
"""Test removing jobs by setting progress to 100.""" """Test done set correctly with jobs."""
job = coresys.jobs.get_job(TEST_JOB) job = coresys.jobs.new_job(TEST_JOB)
assert not job.done
assert coresys.jobs.get_job() != job
assert job.name in coresys.jobs.jobs with job.start():
job.update(progress=100) assert coresys.jobs.get_job() == job
assert job.name not in coresys.jobs.jobs assert not job.done
assert coresys.jobs.get_job() != job
assert job.done
with pytest.raises(JobStartException):
with job.start():
pass
async def test_job_start_bad_parent(coresys: CoreSys):
"""Test job cannot be started outside of parent."""
job = coresys.jobs.new_job(TEST_JOB)
job2 = coresys.jobs.new_job(f"{TEST_JOB}_2")
with job.start():
with pytest.raises(JobStartException):
with job2.start():
pass
with job2.start():
assert coresys.jobs.get_job() == job2
async def test_update_job(coresys: CoreSys): async def test_update_job(coresys: CoreSys):
"""Test updating jobs.""" """Test updating jobs."""
job = coresys.jobs.get_job(TEST_JOB) job = coresys.jobs.new_job(TEST_JOB)
job.update(progress=50, stage="stage") job.progress = 50
assert job.progress == 50 assert job.progress == 50
job.stage = "stage"
assert job.stage == "stage" assert job.stage == "stage"
with pytest.raises(ValueError):
job.progress = 110
with pytest.raises(ValueError):
job.progress = -10

View File

@ -6,8 +6,16 @@ import pytest
from supervisor.const import LogLevel from supervisor.const import LogLevel
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.docker.audio import DockerAudio
from tests.plugins.test_dns import fixture_docker_interface # noqa: F401
@pytest.fixture(name="docker_interface")
async def fixture_docker_interface() -> tuple[AsyncMock, AsyncMock]:
"""Mock docker interface methods."""
with patch.object(DockerAudio, "run") as run, patch.object(
DockerAudio, "restart"
) as restart:
yield (run, restart)
@pytest.fixture(name="write_json") @pytest.fixture(name="write_json")

View File

@ -9,7 +9,7 @@ import pytest
from supervisor.const import BusEvent, LogLevel from supervisor.const import BusEvent, LogLevel
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.docker.const import ContainerState from supervisor.docker.const import ContainerState
from supervisor.docker.interface import DockerInterface from supervisor.docker.dns import DockerDNS
from supervisor.docker.monitor import DockerContainerStateEvent from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.plugins.dns import HostEntry from supervisor.plugins.dns import HostEntry
from supervisor.resolution.const import ContextType, IssueType, SuggestionType from supervisor.resolution.const import ContextType, IssueType, SuggestionType
@ -19,8 +19,8 @@ from supervisor.resolution.data import Issue, Suggestion
@pytest.fixture(name="docker_interface") @pytest.fixture(name="docker_interface")
async def fixture_docker_interface() -> tuple[AsyncMock, AsyncMock]: async def fixture_docker_interface() -> tuple[AsyncMock, AsyncMock]:
"""Mock docker interface methods.""" """Mock docker interface methods."""
with patch.object(DockerInterface, "run") as run, patch.object( with patch.object(DockerDNS, "run") as run, patch.object(
DockerInterface, "restart" DockerDNS, "restart"
) as restart: ) as restart:
yield (run, restart) yield (run, restart)