diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index a3c79797b..54085849d 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -791,10 +791,7 @@ class Addon(AddonModel): raise AddonsError() from err async def write_stdin(self, data) -> None: - """Write data to add-on stdin. - - Return a coroutine. - """ + """Write data to add-on stdin.""" if not self.with_stdin: raise AddonsNotSupportedError( f"Add-on {self.slug} does not support writing to stdin!", _LOGGER.error @@ -889,7 +886,10 @@ class Addon(AddonModel): await self._backup_command(self.backup_pre) elif is_running and self.backup_mode == AddonBackupMode.COLD: _LOGGER.info("Shutdown add-on %s for cold backup", self.slug) - await self.instance.stop() + try: + await self.instance.stop() + except DockerError as err: + raise AddonsError() from err try: _LOGGER.info("Building backup for add-on %s", self.slug) diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index f6c2c562c..2c7ebdfe4 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -36,14 +36,15 @@ from ..exceptions import ( CoreDNSError, DBusError, DockerError, + DockerJobError, DockerNotFound, HardwareNotFound, ) from ..hardware.const import PolicyGroup from ..hardware.data import Device -from ..jobs.decorator import Job, JobCondition, JobExecutionLimit +from ..jobs.const import JobCondition, JobExecutionLimit +from ..jobs.decorator import Job from ..resolution.const import ContextType, IssueType, SuggestionType -from ..utils import process_lock from ..utils.sentry import capture_exception from .const import ( ENV_TIME, @@ -73,8 +74,8 @@ class DockerAddon(DockerInterface): def __init__(self, coresys: CoreSys, addon: Addon): """Initialize Docker Home Assistant wrapper.""" - super().__init__(coresys) self.addon: Addon = addon + super().__init__(coresys) self._hw_listener: EventListener | None = None @@ -493,7 +494,8 @@ class DockerAddon(DockerInterface): return mounts - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return @@ -503,7 +505,7 @@ class DockerAddon(DockerInterface): _LOGGER.warning("%s running with disabled protected mode!", self.addon.name) # Cleanup - await self._stop() + await self.stop() # Don't set a hostname if no separate UTS namespace is used hostname = None if self.uts_mode else self.addon.hostname @@ -563,7 +565,8 @@ class DockerAddon(DockerInterface): BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events ) - async def _update( + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def update( self, version: AwesomeVersion, image: str | None = None, latest: bool = False ) -> None: """Update a docker image.""" @@ -574,15 +577,16 @@ class DockerAddon(DockerInterface): ) # Update docker image - await self._install( + await self.install( version, image=image, latest=latest, need_build=self.addon.latest_need_build ) # Stop container & cleanup with suppress(DockerError): - await self._stop() + await self.stop() - async def _install( + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def install( self, version: AwesomeVersion, image: str | None = None, @@ -595,7 +599,7 @@ class DockerAddon(DockerInterface): if need_build is None and self.addon.need_build or need_build: await self._build(version) else: - await super()._install(version, image, latest, arch) + await super().install(version, image, latest, arch) async def _build(self, version: AwesomeVersion) -> None: """Build a Docker container.""" @@ -632,14 +636,14 @@ class DockerAddon(DockerInterface): _LOGGER.info("Build %s:%s done", self.image, version) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) def export_image(self, tar_file: Path) -> Awaitable[None]: """Export current images into a tar file.""" return self.sys_run_in_executor( self.sys_docker.export_image, self.image, self.version, tar_file ) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) async def import_image(self, tar_file: Path) -> None: """Import a tar file as image.""" docker_image = await self.sys_run_in_executor( @@ -650,9 +654,9 @@ class DockerAddon(DockerInterface): _LOGGER.info("Importing image %s and version %s", tar_file, self.version) with suppress(DockerError): - await self._cleanup() + await self.cleanup() - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) async def write_stdin(self, data: bytes) -> None: """Write to add-on stdin.""" if not await self.is_running(): @@ -682,7 +686,8 @@ class DockerAddon(DockerInterface): _LOGGER.error("Can't write to %s stdin: %s", self.name, err) raise DockerError() from err - async def _stop(self, remove_container: bool = True) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def stop(self, remove_container: bool = True) -> None: """Stop/remove Docker container.""" # DNS if self.ip_address != NO_ADDDRESS: @@ -697,7 +702,7 @@ class DockerAddon(DockerInterface): self.sys_bus.remove_listener(self._hw_listener) self._hw_listener = None - await super()._stop(remove_container) + await super().stop(remove_container) async def _validate_trust( self, image_id: str, image: str, version: AwesomeVersion diff --git a/supervisor/docker/audio.py b/supervisor/docker/audio.py index 14d7c3c8d..d828bec67 100644 --- a/supervisor/docker/audio.py +++ b/supervisor/docker/audio.py @@ -6,7 +6,10 @@ from docker.types import Mount from ..const import DOCKER_CPU_RUNTIME_ALLOCATION, MACHINE_ID from ..coresys import CoreSysAttributes +from ..exceptions import DockerJobError from ..hardware.const import PolicyGroup +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import ( ENV_TIME, MOUNT_DBUS, @@ -82,13 +85,14 @@ class DockerAudio(DockerInterface, CoreSysAttributes): return None return DOCKER_CPU_RUNTIME_ALLOCATION - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return # Cleanup - await self._stop() + await self.stop() # Create & Run container docker_container = await self.sys_run_in_executor( diff --git a/supervisor/docker/cli.py b/supervisor/docker/cli.py index ec146de4e..e83b46763 100644 --- a/supervisor/docker/cli.py +++ b/supervisor/docker/cli.py @@ -2,6 +2,9 @@ import logging from ..coresys import CoreSysAttributes +from ..exceptions import DockerJobError +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import ENV_TIME, ENV_TOKEN from .interface import DockerInterface @@ -23,13 +26,14 @@ class DockerCli(DockerInterface, CoreSysAttributes): """Return name of Docker container.""" return CLI_DOCKER_NAME - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return # Cleanup - await self._stop() + await self.stop() # Create & Run container docker_container = await self.sys_run_in_executor( diff --git a/supervisor/docker/dns.py b/supervisor/docker/dns.py index 2b835d092..bdfe98d50 100644 --- a/supervisor/docker/dns.py +++ b/supervisor/docker/dns.py @@ -4,6 +4,9 @@ import logging from docker.types import Mount from ..coresys import CoreSysAttributes +from ..exceptions import DockerJobError +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import ENV_TIME, MOUNT_DBUS, MountType from .interface import DockerInterface @@ -25,13 +28,14 @@ class DockerDNS(DockerInterface, CoreSysAttributes): """Return name of Docker container.""" return DNS_DOCKER_NAME - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return # Cleanup - await self._stop() + await self.stop() # Create & Run container docker_container = await self.sys_run_in_executor( diff --git a/supervisor/docker/homeassistant.py b/supervisor/docker/homeassistant.py index 1662bf18a..7369f3a3b 100644 --- a/supervisor/docker/homeassistant.py +++ b/supervisor/docker/homeassistant.py @@ -7,9 +7,11 @@ from awesomeversion import AwesomeVersion, AwesomeVersionCompareException from docker.types import Mount from ..const import LABEL_MACHINE, MACHINE_ID +from ..exceptions import DockerJobError from ..hardware.const import PolicyGroup from ..homeassistant.const import LANDINGPAGE -from ..utils import process_lock +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import ( ENV_TIME, ENV_TOKEN, @@ -131,13 +133,14 @@ class DockerHomeAssistant(DockerInterface): return mounts - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return # Cleanup - await self._stop() + await self.stop() # Create & Run container docker_container = await self.sys_run_in_executor( @@ -173,7 +176,7 @@ class DockerHomeAssistant(DockerInterface): "Starting Home Assistant %s with version %s", self.image, self.version ) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) async def execute_command(self, command: str) -> CommandReturn: """Create a temporary container and run command.""" return await self.sys_run_in_executor( diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index a21b57b29..98cb38664 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -8,6 +8,7 @@ import logging import re from time import time from typing import Any +from uuid import uuid4 from awesomeversion import AwesomeVersion from awesomeversion.strategy import AwesomeVersionStrategy @@ -24,18 +25,21 @@ from ..const import ( BusEvent, CpuArch, ) -from ..coresys import CoreSys, CoreSysAttributes +from ..coresys import CoreSys from ..exceptions import ( CodeNotaryError, CodeNotaryUntrusted, DockerAPIError, DockerError, + DockerJobError, DockerNotFound, DockerRequestError, DockerTrustError, ) +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job +from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType, SuggestionType -from ..utils import process_lock from ..utils.sentry import capture_exception from .const import ContainerState, RestartPolicy from .manager import CommandReturn @@ -73,11 +77,12 @@ def _container_state_from_model(docker_container: Container) -> ContainerState: return ContainerState.STOPPED -class DockerInterface(CoreSysAttributes): +class DockerInterface(JobGroup): """Docker Supervisor interface.""" def __init__(self, coresys: CoreSys): """Initialize Docker base wrapper.""" + super().__init__(coresys, f"container_{self.name or uuid4().hex}") self.coresys: CoreSys = coresys self._meta: dict[str, Any] | None = None self.lock: asyncio.Lock = asyncio.Lock() @@ -204,25 +209,15 @@ class DockerInterface(CoreSysAttributes): await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials) - @process_lock - def install( - self, - version: AwesomeVersion, - image: str | None = None, - latest: bool = False, - arch: CpuArch | None = None, - ) -> Awaitable[None]: - """Pull docker image.""" - return self._install(version, image, latest, arch) - - async def _install( + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def install( self, version: AwesomeVersion, image: str | None = None, latest: bool = False, arch: CpuArch | None = None, ) -> None: - """Pull Docker image.""" + """Pull docker image.""" image = image or self.image arch = arch or self.sys_arch.supervisor @@ -328,17 +323,11 @@ class DockerInterface(CoreSysAttributes): return _container_state_from_model(docker_container) - @process_lock - def attach( + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def attach( self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False - ) -> Awaitable[None]: - """Attach to running Docker container.""" - return self._attach(version, skip_state_event_if_down) - - async def _attach( - self, version: AwesomeVersion, skip_state_event_if_down: bool = False ) -> None: - """Attach to running docker container.""" + """Attach to running Docker container.""" with suppress(docker.errors.DockerException, requests.RequestException): docker_container = await self.sys_run_in_executor( self.sys_docker.containers.get, self.name @@ -370,21 +359,13 @@ class DockerInterface(CoreSysAttributes): raise DockerError() _LOGGER.info("Attaching to %s with version %s", self.image, self.version) - @process_lock - def run(self) -> Awaitable[None]: - """Run Docker image.""" - return self._run() - - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" raise NotImplementedError() - @process_lock - def stop(self, remove_container: bool = True) -> Awaitable[None]: - """Stop/remove Docker container.""" - return self._stop(remove_container) - - async def _stop(self, remove_container: bool = True) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def stop(self, remove_container: bool = True) -> None: """Stop/remove Docker container.""" with suppress(DockerNotFound): await self.sys_run_in_executor( @@ -394,34 +375,28 @@ class DockerInterface(CoreSysAttributes): remove_container, ) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) def start(self) -> Awaitable[None]: """Start Docker container.""" return self.sys_run_in_executor(self.sys_docker.start_container, self.name) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) async def remove(self) -> None: """Remove Docker images.""" # Cleanup container with suppress(DockerError): - await self._stop() + await self.stop() await self.sys_run_in_executor( self.sys_docker.remove_image, self.image, self.version ) self._meta = None - @process_lock - def update( - self, version: AwesomeVersion, image: str | None = None, latest: bool = False - ) -> Awaitable[None]: - """Update a Docker image.""" - return self._update(version, image, latest) - - async def _update( + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def update( self, version: AwesomeVersion, image: str | None = None, latest: bool = False ) -> None: - """Update a docker image.""" + """Update a Docker image.""" image = image or self.image _LOGGER.info( @@ -429,11 +404,11 @@ class DockerInterface(CoreSysAttributes): ) # Update docker image - await self._install(version, image=image, latest=latest) + await self.install(version, image=image, latest=latest) # Stop container & cleanup with suppress(DockerError): - await self._stop() + await self.stop() async def logs(self) -> bytes: """Return Docker logs of container.""" @@ -444,12 +419,8 @@ class DockerInterface(CoreSysAttributes): return b"" - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) def cleanup(self, old_image: str | None = None) -> Awaitable[None]: - """Check if old version exists and cleanup.""" - return self._cleanup(old_image) - - def _cleanup(self, old_image: str | None = None) -> Awaitable[None]: """Check if old version exists and cleanup.""" return self.sys_run_in_executor( self.sys_docker.cleanup_old_images, @@ -458,14 +429,14 @@ class DockerInterface(CoreSysAttributes): {old_image} if old_image else None, ) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) def restart(self) -> Awaitable[None]: """Restart docker container.""" return self.sys_run_in_executor( self.sys_docker.restart_container, self.name, self.timeout ) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) async def execute_command(self, command: str) -> CommandReturn: """Create a temporary container and run command.""" raise NotImplementedError() @@ -526,7 +497,7 @@ class DockerInterface(CoreSysAttributes): available_version.sort(reverse=True) return available_version[0] - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) def run_inside(self, command: str) -> Awaitable[CommandReturn]: """Execute a command inside Docker container.""" return self.sys_run_in_executor( @@ -540,7 +511,7 @@ class DockerInterface(CoreSysAttributes): checksum = image_id.partition(":")[2] return await self.sys_security.verify_own_content(checksum) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) async def check_trust(self) -> None: """Check trust of exists Docker image.""" try: diff --git a/supervisor/docker/multicast.py b/supervisor/docker/multicast.py index 1470081d7..f6dfab974 100644 --- a/supervisor/docker/multicast.py +++ b/supervisor/docker/multicast.py @@ -2,6 +2,9 @@ import logging from ..coresys import CoreSysAttributes +from ..exceptions import DockerJobError +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import ENV_TIME, Capabilities from .interface import DockerInterface @@ -28,13 +31,14 @@ class DockerMulticast(DockerInterface, CoreSysAttributes): """Generate needed capabilities.""" return [Capabilities.NET_ADMIN.value] - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return # Cleanup - await self._stop() + await self.stop() # Create & Run container docker_container = await self.sys_run_in_executor( diff --git a/supervisor/docker/observer.py b/supervisor/docker/observer.py index 4412cf8be..a72d2492c 100644 --- a/supervisor/docker/observer.py +++ b/supervisor/docker/observer.py @@ -3,6 +3,9 @@ import logging from ..const import DOCKER_NETWORK_MASK from ..coresys import CoreSysAttributes +from ..exceptions import DockerJobError +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import ENV_TIME, ENV_TOKEN, MOUNT_DOCKER, RestartPolicy from .interface import DockerInterface @@ -25,13 +28,14 @@ class DockerObserver(DockerInterface, CoreSysAttributes): """Return name of Docker container.""" return OBSERVER_DOCKER_NAME - async def _run(self) -> None: + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def run(self) -> None: """Run Docker image.""" if await self.is_running(): return # Cleanup - await self._stop() + await self.stop() # Create & Run container docker_container = await self.sys_run_in_executor( diff --git a/supervisor/docker/supervisor.py b/supervisor/docker/supervisor.py index a94009946..fbbdae8c2 100644 --- a/supervisor/docker/supervisor.py +++ b/supervisor/docker/supervisor.py @@ -9,7 +9,9 @@ import docker import requests from ..coresys import CoreSysAttributes -from ..exceptions import DockerError +from ..exceptions import DockerError, DockerJobError +from ..jobs.const import JobExecutionLimit +from ..jobs.decorator import Job from .const import PropagationMode from .interface import DockerInterface @@ -43,8 +45,9 @@ class DockerSupervisor(DockerInterface, CoreSysAttributes): if mount.get("Destination") == "/data" ) - async def _attach( - self, version: AwesomeVersion, skip_state_event_if_down: bool = False + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + async def attach( + self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False ) -> None: """Attach to running docker container.""" try: diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index fbfbcd8ed..329d35b09 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -36,6 +36,18 @@ class JobConditionException(JobException): """Exception happening for job conditions.""" +class JobStartException(JobException): + """Exception occurred starting a job on in current asyncio task.""" + + +class JobNotFound(JobException): + """Exception for job not found.""" + + +class JobGroupExecutionLimitExceeded(JobException): + """Exception when job group execution limit exceeded.""" + + # HomeAssistant @@ -478,6 +490,10 @@ class DockerNotFound(DockerError): """Docker object don't Exists.""" +class DockerJobError(DockerError, JobException): + """Error executing docker job.""" + + # Hardware diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index a0fad2826..9c514c290 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -11,7 +11,7 @@ import attr from awesomeversion import AwesomeVersion from ..const import ATTR_HOMEASSISTANT, BusEvent -from ..coresys import CoreSys, CoreSysAttributes +from ..coresys import CoreSys from ..docker.const import ContainerState from ..docker.homeassistant import DockerHomeAssistant from ..docker.monitor import DockerContainerStateEvent @@ -22,11 +22,13 @@ from ..exceptions import ( HomeAssistantError, HomeAssistantJobError, HomeAssistantUpdateError, + JobException, ) from ..jobs.const import JobExecutionLimit from ..jobs.decorator import Job, JobCondition +from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType -from ..utils import convert_to_ascii, process_lock +from ..utils import convert_to_ascii from ..utils.sentry import capture_exception from .const import ( LANDINGPAGE, @@ -49,12 +51,12 @@ class ConfigResult: log = attr.ib() -class HomeAssistantCore(CoreSysAttributes): +class HomeAssistantCore(JobGroup): """Home Assistant core object for handle it.""" def __init__(self, coresys: CoreSys): """Initialize Home Assistant object.""" - self.coresys: CoreSys = coresys + super().__init__(coresys, "home_assistant_core") self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys) self.lock: asyncio.Lock = asyncio.Lock() self._error_state: bool = False @@ -95,9 +97,9 @@ class HomeAssistantCore(CoreSysAttributes): _LOGGER.info("Starting HomeAssistant landingpage") if not await self.instance.is_running(): with suppress(HomeAssistantError): - await self._start() + await self.start() - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) async def install_landingpage(self) -> None: """Install a landing page.""" # Try to use a preinstalled landingpage @@ -127,7 +129,7 @@ class HomeAssistantCore(CoreSysAttributes): LANDINGPAGE, image=self.sys_updater.image_homeassistant ) break - except DockerError: + except (DockerError, JobException): pass except Exception as err: # pylint: disable=broad-except capture_exception(err) @@ -139,7 +141,7 @@ class HomeAssistantCore(CoreSysAttributes): self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.save_data() - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) async def install(self) -> None: """Install a landing page.""" _LOGGER.info("Home Assistant setup") @@ -155,7 +157,7 @@ class HomeAssistantCore(CoreSysAttributes): image=self.sys_updater.image_homeassistant, ) break - except DockerError: + except (DockerError, JobException): pass except Exception as err: # pylint: disable=broad-except capture_exception(err) @@ -171,7 +173,7 @@ class HomeAssistantCore(CoreSysAttributes): # finishing try: _LOGGER.info("Starting Home Assistant") - await self._start() + await self.start() except HomeAssistantError: _LOGGER.error("Can't start Home Assistant!") @@ -179,7 +181,6 @@ class HomeAssistantCore(CoreSysAttributes): with suppress(DockerError): await self.instance.cleanup() - @process_lock @Job( conditions=[ JobCondition.FREE_SPACE, @@ -188,6 +189,7 @@ class HomeAssistantCore(CoreSysAttributes): JobCondition.PLUGINS_UPDATED, JobCondition.SUPERVISOR_UPDATED, ], + limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, ) async def update( @@ -231,7 +233,7 @@ class HomeAssistantCore(CoreSysAttributes): self.sys_homeassistant.image = self.sys_updater.image_homeassistant if running: - await self._start() + await self.start() _LOGGER.info("Successfully started Home Assistant %s", to_version) # Successfull - last step @@ -281,23 +283,7 @@ class HomeAssistantCore(CoreSysAttributes): self.sys_resolution.create_issue(IssueType.UPDATE_FAILED, ContextType.CORE) raise HomeAssistantUpdateError() - async def _start(self) -> None: - """Start Home Assistant Docker & wait.""" - # Create new API token - self.sys_homeassistant.supervisor_token = secrets.token_hex(56) - self.sys_homeassistant.save_data() - - # Write audio settings - self.sys_homeassistant.write_pulse() - - try: - await self.instance.run() - except DockerError as err: - raise HomeAssistantError() from err - - await self._block_till_run(self.sys_homeassistant.version) - - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) async def start(self) -> None: """Run Home Assistant docker.""" if await self.instance.is_running(): @@ -314,9 +300,21 @@ class HomeAssistantCore(CoreSysAttributes): await self._block_till_run(self.sys_homeassistant.version) # No Instance/Container found, extended start else: - await self._start() + # Create new API token + self.sys_homeassistant.supervisor_token = secrets.token_hex(56) + self.sys_homeassistant.save_data() - @process_lock + # Write audio settings + self.sys_homeassistant.write_pulse() + + try: + await self.instance.run() + except DockerError as err: + raise HomeAssistantError() from err + + await self._block_till_run(self.sys_homeassistant.version) + + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) async def stop(self) -> None: """Stop Home Assistant Docker.""" try: @@ -324,7 +322,7 @@ class HomeAssistantCore(CoreSysAttributes): except DockerError as err: raise HomeAssistantError() from err - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) async def restart(self) -> None: """Restart Home Assistant Docker.""" try: @@ -334,12 +332,12 @@ class HomeAssistantCore(CoreSysAttributes): await self._block_till_run(self.sys_homeassistant.version) - @process_lock + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) async def rebuild(self) -> None: """Rebuild Home Assistant Docker container.""" with suppress(DockerError): await self.instance.stop() - await self._start() + await self.start() def logs(self) -> Awaitable[bytes]: """Get HomeAssistant docker logs. @@ -356,10 +354,7 @@ class HomeAssistantCore(CoreSysAttributes): return self.instance.check_trust() async def stats(self) -> DockerStats: - """Return stats of Home Assistant. - - Return a coroutine. - """ + """Return stats of Home Assistant.""" try: return await self.instance.stats() except DockerError as err: @@ -386,9 +381,12 @@ class HomeAssistantCore(CoreSysAttributes): async def check_config(self) -> ConfigResult: """Run Home Assistant config check.""" - result = await self.instance.execute_command( - "python3 -m homeassistant -c /config --script check_config" - ) + try: + result = await self.instance.execute_command( + "python3 -m homeassistant -c /config --script check_config" + ) + except DockerError as err: + raise HomeAssistantError() from err # If not valid if result.exit_code is None: diff --git a/supervisor/jobs/__init__.py b/supervisor/jobs/__init__.py index 6b2bb8d11..08b8793e9 100644 --- a/supervisor/jobs/__init__.py +++ b/supervisor/jobs/__init__.py @@ -1,53 +1,69 @@ """Supervisor job manager.""" +from collections.abc import Callable +from contextlib import contextmanager +from contextvars import ContextVar, Token import logging +from uuid import UUID, uuid4 + +from attrs import define, field +from attrs.setters import frozen +from attrs.validators import ge, le from ..coresys import CoreSys, CoreSysAttributes +from ..exceptions import JobNotFound, JobStartException from ..utils.common import FileConfiguration from .const import ATTR_IGNORE_CONDITIONS, FILE_CONFIG_JOBS, JobCondition from .validate import SCHEMA_JOBS_CONFIG -_LOGGER: logging.Logger = logging.getLogger(__package__) +# Context vars only act as a global within the same asyncio task +# When a new asyncio task is started the current context is copied over. +# Modifications to it in one task are not visible to others though. +# This allows us to track what job is currently in progress in each task. +_CURRENT_JOB: ContextVar[UUID] = ContextVar("current_job") + +_LOGGER: logging.Logger = logging.getLogger(__name__) -class SupervisorJob(CoreSysAttributes): - """Supervisor running job class.""" +@define +class SupervisorJob: + """Representation of a job running in supervisor.""" - def __init__(self, coresys: CoreSys, name: str): - """Initialize the JobManager class.""" - self.coresys: CoreSys = coresys - self.name: str = name - self._progress: int = 0 - self._stage: str | None = None + name: str = field(on_setattr=frozen) + progress: int = field(default=0, validator=[ge(0), le(100)]) + stage: str | None = None + uuid: UUID = field(init=False, factory=lambda: uuid4().hex, on_setattr=frozen) + parent_id: UUID = field( + init=False, factory=lambda: _CURRENT_JOB.get(None), on_setattr=frozen + ) + done: bool = field(init=False, default=False) - @property - def progress(self) -> int: - """Return the current progress.""" - return self._progress + @contextmanager + def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None): + """Start the job in the current task. - @property - def stage(self) -> str | None: - """Return the current stage.""" - return self._stage + This can only be called if the parent ID matches the job running in the current task. + This is to ensure that each asyncio task can only be doing one job at a time as that + determines what resources it can and cannot access. + """ + if self.done: + raise JobStartException("Job is already complete") + if _CURRENT_JOB.get(None) != self.parent_id: + raise JobStartException("Job has a different parent from current job") - def update(self, progress: int | None = None, stage: str | None = None) -> None: - """Update the job object.""" - if progress is not None: - if progress >= round(100): - self.sys_jobs.remove_job(self) - return - self._progress = round(progress) - if stage is not None: - self._stage = stage - _LOGGER.debug( - "Job updated; name: %s, progress: %s, stage: %s", - self.name, - self.progress, - self.stage, - ) + token: Token[UUID] | None = None + try: + token = _CURRENT_JOB.set(self.uuid) + yield self + finally: + self.done = True + if token: + _CURRENT_JOB.reset(token) + if on_done: + on_done(self) class JobManager(FileConfiguration, CoreSysAttributes): - """Job class.""" + """Job Manager class.""" def __init__(self, coresys: CoreSys): """Initialize the JobManager class.""" @@ -58,7 +74,7 @@ class JobManager(FileConfiguration, CoreSysAttributes): @property def jobs(self) -> list[SupervisorJob]: """Return a list of current jobs.""" - return self._jobs + return list(self._jobs.values()) @property def ignore_conditions(self) -> list[JobCondition]: @@ -70,18 +86,28 @@ class JobManager(FileConfiguration, CoreSysAttributes): """Set a list of ignored condition.""" self._data[ATTR_IGNORE_CONDITIONS] = value - def get_job(self, name: str) -> SupervisorJob: - """Return a job, create one if it does not exist.""" - if name not in self._jobs: - self._jobs[name] = SupervisorJob(self.coresys, name) + def new_job(self, name: str, initial_stage: str | None = None) -> SupervisorJob: + """Create a new job.""" + job = SupervisorJob(name, stage=initial_stage) + self._jobs[job.uuid] = job + return job - return self._jobs[name] + def get_job(self, uuid: UUID | None = None) -> SupervisorJob | None: + """Return a job by uuid if it exists. Returns the current job of the asyncio task if uuid omitted.""" + if uuid: + return self._jobs.get(uuid) + + if uuid := _CURRENT_JOB.get(None): + return self._jobs.get(uuid) + + return None def remove_job(self, job: SupervisorJob) -> None: - """Remove a job.""" - if job.name in self._jobs: - del self._jobs[job.name] + """Remove a job by UUID.""" + if job.uuid not in self._jobs: + raise JobNotFound(f"Could not find job {job.name}", _LOGGER.error) - def clear(self) -> None: - """Clear all jobs.""" - self._jobs.clear() + if not job.done: + _LOGGER.warning("Removing incomplete job %s from job manager", job.name) + + del self._jobs[job.uuid] diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index 091e46c67..90b4549dd 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -34,3 +34,5 @@ class JobExecutionLimit(str, Enum): THROTTLE = "throttle" THROTTLE_WAIT = "throttle_wait" THROTTLE_RATE_LIMIT = "throttle_rate_limit" + GROUP_ONCE = "group_once" + GROUP_WAIT = "group_wait" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index ebdda844c..a77392cd1 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -8,11 +8,17 @@ from typing import Any from ..const import CoreState from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import HassioError, JobConditionException, JobException +from ..exceptions import ( + HassioError, + JobConditionException, + JobException, + JobGroupExecutionLimitExceeded, +) from ..host.const import HostFeature from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType from ..utils.sentry import capture_exception from .const import JobCondition, JobExecutionLimit +from .job_group import JobGroup _LOGGER: logging.Logger = logging.getLogger(__package__) @@ -44,6 +50,10 @@ class Job(CoreSysAttributes): self._method = None self._last_call = datetime.min self._rate_limited_calls: list[datetime] | None = None + self._job_group_limit = self.limit in ( + JobExecutionLimit.GROUP_ONCE, + JobExecutionLimit.GROUP_WAIT, + ) # Validate Options if ( @@ -76,19 +86,28 @@ class Job(CoreSysAttributes): self.coresys, self._last_call, self._rate_limited_calls ) - def _post_init(self, args: tuple[Any]) -> None: + def _post_init(self, obj: JobGroup | CoreSysAttributes) -> None: """Runtime init.""" if self.name is None: self.name = str(self._method.__qualname__).lower().replace(".", "_") # Coresys try: - self.coresys = args[0].coresys + self.coresys = obj.coresys except AttributeError: pass if not self.coresys: raise RuntimeError(f"Job on {self.name} need to be an coresys object!") + # Job groups + if self._job_group_limit: + try: + _ = obj.acquire and obj.release + except AttributeError: + raise RuntimeError( + f"Job on {self.name} need to be a JobGroup to use group based limits!" + ) from None + # Others if self._lock is None: self._lock = asyncio.Semaphore() @@ -98,11 +117,15 @@ class Job(CoreSysAttributes): self._method = method @wraps(method) - async def wrapper(*args, **kwargs) -> Any: - """Wrap the method.""" - self._post_init(args) + async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any: + """Wrap the method. - job = self.sys_jobs.get_job(self.name) + This method must be on an instance of CoreSysAttributes. If a JOB_GROUP limit + is used, then it must be on an instance of JobGroup. + """ + self._post_init(obj) + + job = self.sys_jobs.new_job(self.name) # Handle condition if self.conditions: @@ -118,6 +141,13 @@ class Job(CoreSysAttributes): # Handle exection limits if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE): await self._acquire_exection_limit() + elif self._job_group_limit: + try: + await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT) + except JobGroupExecutionLimitExceeded as err: + if self.on_condition: + raise self.on_condition(str(err)) from err + raise err elif self.limit == JobExecutionLimit.THROTTLE: time_since_last_call = datetime.now() - self._last_call if time_since_last_call < self.throttle_period: @@ -146,22 +176,23 @@ class Job(CoreSysAttributes): ) # Execute Job - try: - self._last_call = datetime.now() - if self._rate_limited_calls is not None: - self._rate_limited_calls.append(self._last_call) + with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None): + try: + self._last_call = datetime.now() + if self._rate_limited_calls is not None: + self._rate_limited_calls.append(self._last_call) - return await self._method(*args, **kwargs) - except HassioError as err: - raise err - except Exception as err: - _LOGGER.exception("Unhandled exception: %s", err) - capture_exception(err) - raise JobException() from err - finally: - if self.cleanup: - self.sys_jobs.remove_job(job) - self._release_exception_limits() + return await self._method(obj, *args, **kwargs) + except HassioError as err: + raise err + except Exception as err: + _LOGGER.exception("Unhandled exception: %s", err) + capture_exception(err) + raise JobException() from err + finally: + self._release_exception_limits() + if self._job_group_limit: + obj.release() return wrapper diff --git a/supervisor/jobs/job_group.py b/supervisor/jobs/job_group.py new file mode 100644 index 000000000..c50f719a7 --- /dev/null +++ b/supervisor/jobs/job_group.py @@ -0,0 +1,73 @@ +"""Job group object.""" + +from asyncio import Lock + +from . import SupervisorJob +from ..coresys import CoreSys, CoreSysAttributes +from ..exceptions import JobException, JobGroupExecutionLimitExceeded + + +class JobGroup(CoreSysAttributes): + """Object with methods that require a common lock. + + This is used in classes like our DockerInterface class. Where each method + requires a lock as it involves some extensive I/O with Docker. But some + methods may need to call others as a part of processing to complete a + higher-level task and should not need to relinquish the lock in between. + """ + + def __init__(self, coresys: CoreSys, group_name: str) -> None: + """Initialize object.""" + self.coresys: CoreSys = coresys + self._group_name: str = group_name + self._lock: Lock = Lock() + self._active_job: SupervisorJob | None = None + self._parent_jobs: list[SupervisorJob] = [] + + @property + def active_job(self) -> SupervisorJob | None: + """Get active job ID.""" + return self._active_job + + @property + def group_name(self) -> str: + """Return group name.""" + return self._group_name + + @property + def has_lock(self) -> bool: + """Return true if current task has the lock on this job group.""" + return ( + self.active_job + and (task_job := self.sys_jobs.get_job()) + and self.active_job == task_job + ) + + async def acquire(self, job: SupervisorJob, wait: bool = False) -> None: + """Acquire the lock for the group for the specified job.""" + # If there's another job running and we're not waiting, raise + if self.active_job and not self.has_lock and not wait: + raise JobGroupExecutionLimitExceeded( + f"Another job is running for job group {self.group_name}" + ) + + # Else if we don't have the lock, acquire it + if not self.has_lock: + await self._lock.acquire() + + # Store the job ID we acquired the lock for + if self.active_job: + self._parent_jobs.append(self.active_job) + + self._active_job = job + + def release(self) -> None: + """Release the lock for the group or return it to parent.""" + if not self.has_lock: + raise JobException("Cannot release as caller does not own lock") + + if self._parent_jobs: + self._active_job = self._parent_jobs.pop() + else: + self._active_job = None + self._lock.release() diff --git a/tests/addons/test_manager.py b/tests/addons/test_manager.py index 98c348d7c..dc5f30dcd 100644 --- a/tests/addons/test_manager.py +++ b/tests/addons/test_manager.py @@ -57,7 +57,7 @@ async def test_image_added_removed_on_update( assert install_addon_ssh.image == "local/amd64-addon-ssh" assert coresys.addons.store.get(TEST_ADDON_SLUG).image == "test/amd64-my-ssh-addon" - with patch.object(DockerInterface, "_install") as install, patch.object( + with patch.object(DockerInterface, "install") as install, patch.object( DockerAddon, "_build" ) as build: await install_addon_ssh.update() @@ -77,7 +77,7 @@ async def test_image_added_removed_on_update( assert install_addon_ssh.image == "test/amd64-my-ssh-addon" assert coresys.addons.store.get(TEST_ADDON_SLUG).image == "local/amd64-addon-ssh" - with patch.object(DockerInterface, "_install") as install, patch.object( + with patch.object(DockerInterface, "install") as install, patch.object( DockerAddon, "_build" ) as build: await install_addon_ssh.update() @@ -249,7 +249,7 @@ async def test_update( assert install_addon_ssh.need_update is True - with patch.object(DockerInterface, "_install"), patch.object( + with patch.object(DockerInterface, "install"), patch.object( DockerAddon, "is_running", return_value=False ): start_task = await coresys.addons.update(TEST_ADDON_SLUG) diff --git a/tests/api/test_store.py b/tests/api/test_store.py index 29f844496..ceb08f069 100644 --- a/tests/api/test_store.py +++ b/tests/api/test_store.py @@ -177,7 +177,7 @@ async def test_api_store_update_healthcheck( asyncio.create_task(container_events()) with patch.object(DockerAddon, "run", new=container_events_task), patch.object( - DockerInterface, "_install" + DockerInterface, "install" ), patch.object(DockerAddon, "is_running", return_value=False), patch.object( CpuArch, "supported", new=PropertyMock(return_value=["amd64"]) ): diff --git a/tests/conftest.py b/tests/conftest.py index e922b6902..52a40aad1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -70,6 +70,13 @@ async def path_extern() -> None: yield +@pytest.fixture +async def supervisor_name() -> None: + """Set env for supervisor name.""" + os.environ["SUPERVISOR_NAME"] = "hassio_supervisor" + yield + + @pytest.fixture async def docker() -> DockerAPI: """Mock DockerAPI.""" @@ -286,7 +293,13 @@ async def fixture_all_dbus_services( @pytest.fixture async def coresys( - event_loop, docker, dbus_session_bus, all_dbus_services, aiohttp_client, run_dir + event_loop, + docker, + dbus_session_bus, + all_dbus_services, + aiohttp_client, + run_dir, + supervisor_name, ) -> CoreSys: """Create a CoreSys Mock.""" with patch("supervisor.bootstrap.initialize_system"), patch( @@ -409,7 +422,9 @@ def sys_supervisor(): @pytest.fixture async def api_client( - aiohttp_client, coresys: CoreSys, request: pytest.FixtureRequest + aiohttp_client, + coresys: CoreSys, + request: pytest.FixtureRequest, ) -> TestClient: """Fixture for RestAPI client.""" @@ -428,9 +443,7 @@ async def api_client( api = RestAPI(coresys) api.webapp = web.Application(middlewares=[_security_middleware]) api.start = AsyncMock() - with patch("supervisor.docker.supervisor.os") as os: - os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"} - await api.load() + await api.load() yield await aiohttp_client(api.webapp) @@ -593,16 +606,12 @@ async def journald_logs(coresys: CoreSys) -> MagicMock: @pytest.fixture -async def docker_logs(docker: DockerAPI) -> MagicMock: +async def docker_logs(docker: DockerAPI, supervisor_name) -> MagicMock: """Mock log output for a container from docker.""" container_mock = MagicMock() container_mock.logs.return_value = load_binary_fixture("logs_docker_container.txt") docker.containers.get.return_value = container_mock - - with patch("supervisor.docker.supervisor.os") as os: - os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"} - - yield container_mock.logs + yield container_mock.logs @pytest.fixture @@ -640,7 +649,6 @@ async def os_available(request: pytest.FixtureRequest) -> None: @pytest.fixture async def mount_propagation(docker: DockerAPI, coresys: CoreSys) -> None: """Mock supervisor connected to container with propagation set.""" - os.environ["SUPERVISOR_NAME"] = "hassio_supervisor" docker.containers.get.return_value = supervisor = MagicMock() supervisor.attrs = { "Mounts": [ diff --git a/tests/docker/test_addon.py b/tests/docker/test_addon.py index 70f52836f..bd284f49d 100644 --- a/tests/docker/test_addon.py +++ b/tests/docker/test_addon.py @@ -194,7 +194,7 @@ async def test_addon_run_docker_error( coresys, addonsdata_system, "basic-addon-config.json" ) - with patch.object(DockerAddon, "_stop"), patch.object( + with patch.object(DockerAddon, "stop"), patch.object( AddonOptions, "validate", new=PropertyMock(return_value=lambda _: None) ), pytest.raises(DockerNotFound): await docker_addon.run() @@ -218,7 +218,7 @@ async def test_addon_run_add_host_error( coresys, addonsdata_system, "basic-addon-config.json" ) - with patch.object(DockerAddon, "_stop"), patch.object( + with patch.object(DockerAddon, "stop"), patch.object( AddonOptions, "validate", new=PropertyMock(return_value=lambda _: None) ), patch.object(PluginDns, "add_host", side_effect=(err := CoreDNSError())): await docker_addon.run() diff --git a/tests/homeassistant/test_core.py b/tests/homeassistant/test_core.py index 5a1576b7d..671fc758e 100644 --- a/tests/homeassistant/test_core.py +++ b/tests/homeassistant/test_core.py @@ -92,7 +92,7 @@ async def test_install_docker_error( ): """Test install fails due to docker error.""" coresys.security.force = True - with patch.object(HomeAssistantCore, "_start"), patch.object( + with patch.object(HomeAssistantCore, "start"), patch.object( DockerHomeAssistant, "cleanup" ), patch.object( Updater, "image_homeassistant", new=PropertyMock(return_value="homeassistant") @@ -119,7 +119,7 @@ async def test_install_other_error( """Test install fails due to other error.""" coresys.docker.images.pull.side_effect = [(err := OSError()), MagicMock()] - with patch.object(HomeAssistantCore, "_start"), patch.object( + with patch.object(HomeAssistantCore, "start"), patch.object( DockerHomeAssistant, "cleanup" ), patch.object( Updater, "image_homeassistant", new=PropertyMock(return_value="homeassistant") diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index 22903050e..57bf09f97 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -18,8 +18,10 @@ from supervisor.exceptions import ( ) from supervisor.host.const import HostFeature from supervisor.host.manager import HostManager +from supervisor.jobs import SupervisorJob from supervisor.jobs.const import JobExecutionLimit from supervisor.jobs.decorator import Job, JobCondition +from supervisor.jobs.job_group import JobGroup from supervisor.plugins.audio import PluginAudio from supervisor.resolution.const import UnhealthyReason from supervisor.utils.dt import utcnow @@ -552,3 +554,171 @@ async def test_host_network(coresys: CoreSys): coresys.jobs.ignore_conditions = [JobCondition.HOST_NETWORK] assert await test.execute() + + +async def test_job_group_once(coresys: CoreSys, loop: asyncio.BaseEventLoop): + """Test job group once execution limitation.""" + + class TestClass(JobGroup): + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + super().__init__(coresys, "TestClass") + self.event = asyncio.Event() + + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException) + async def inner_execute(self) -> bool: + """Inner class method called by execute, group level lock allows this.""" + await self.event.wait() + return True + + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException) + async def execute(self) -> bool: + """Execute the class method.""" + return await self.inner_execute() + + @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException) + async def separate_execute(self) -> bool: + """Alternate execute method that shares group lock.""" + return True + + @Job(limit=JobExecutionLimit.ONCE, on_condition=JobException) + async def unrelated_method(self) -> bool: + """Unrelated method, sparate job with separate lock.""" + return True + + test = TestClass(coresys) + run_task = loop.create_task(test.execute()) + await asyncio.sleep(0) + + # All methods with group limits should be locked + with pytest.raises(JobException): + await test.execute() + + with pytest.raises(JobException): + await test.inner_execute() + + with pytest.raises(JobException): + await test.separate_execute() + + # The once method is still callable + assert await test.unrelated_method() + + test.event.set() + assert await run_task + + +async def test_job_group_wait(coresys: CoreSys, loop: asyncio.BaseEventLoop): + """Test job group wait execution limitation.""" + + class TestClass(JobGroup): + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + super().__init__(coresys, "TestClass") + self.execute_count = 0 + self.other_count = 0 + self.event = asyncio.Event() + + @Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException) + async def inner_execute(self) -> None: + """Inner class method called by execute, group level lock allows this.""" + self.execute_count += 1 + await self.event.wait() + + @Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException) + async def execute(self) -> None: + """Execute the class method.""" + await self.inner_execute() + + @Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException) + async def separate_execute(self) -> None: + """Alternate execute method that shares group lock.""" + self.other_count += 1 + + test = TestClass(coresys) + run_task = loop.create_task(test.execute()) + await asyncio.sleep(0) + + repeat_task = loop.create_task(test.execute()) + other_task = loop.create_task(test.separate_execute()) + await asyncio.sleep(0) + + assert test.execute_count == 1 + assert test.other_count == 0 + + test.event.set() + await run_task + await repeat_task + await other_task + + assert test.execute_count == 2 + assert test.other_count == 1 + + +async def test_job_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop): + """Test job is cleaned up.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + self.coresys = coresys + self.event = asyncio.Event() + self.job: SupervisorJob | None = None + + @Job(limit=JobExecutionLimit.ONCE) + async def execute(self): + """Execute the class method.""" + self.job = coresys.jobs.get_job() + await self.event.wait() + return True + + test = TestClass(coresys) + run_task = loop.create_task(test.execute()) + await asyncio.sleep(0) + + assert coresys.jobs.jobs == [test.job] + assert not test.job.done + + test.event.set() + assert await run_task + + assert coresys.jobs.jobs == [] + assert test.job.done + + +async def test_job_skip_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop): + """Test job is left in job manager when cleanup is false.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + self.coresys = coresys + self.event = asyncio.Event() + self.job: SupervisorJob | None = None + + @Job(limit=JobExecutionLimit.ONCE, cleanup=False) + async def execute(self): + """Execute the class method.""" + self.job = coresys.jobs.get_job() + await self.event.wait() + return True + + test = TestClass(coresys) + run_task = loop.create_task(test.execute()) + await asyncio.sleep(0) + + assert coresys.jobs.jobs == [test.job] + assert not test.job.done + + test.event.set() + assert await run_task + + assert coresys.jobs.jobs == [test.job] + assert test.job.done diff --git a/tests/jobs/test_job_manager.py b/tests/jobs/test_job_manager.py index 0a0e3c77c..7fc7164bd 100644 --- a/tests/jobs/test_job_manager.py +++ b/tests/jobs/test_job_manager.py @@ -1,39 +1,76 @@ """Test the condition decorators.""" + +import pytest + # pylint: disable=protected-access,import-error from supervisor.coresys import CoreSys +from supervisor.exceptions import JobStartException TEST_JOB = "test" async def test_add_job(coresys: CoreSys): """Test adding jobs.""" - job = coresys.jobs.get_job(TEST_JOB) + job = coresys.jobs.new_job(TEST_JOB) - assert job.name in coresys.jobs.jobs + assert job in coresys.jobs.jobs -async def test_remove_job_directly(coresys: CoreSys): +async def test_remove_job_directly(coresys: CoreSys, caplog: pytest.LogCaptureFixture): """Test removing jobs from manager.""" - job = coresys.jobs.get_job(TEST_JOB) + job = coresys.jobs.new_job(TEST_JOB) + + assert job in coresys.jobs.jobs - assert job.name in coresys.jobs.jobs coresys.jobs.remove_job(job) - assert job.name not in coresys.jobs.jobs + assert job not in coresys.jobs.jobs + assert f"Removing incomplete job {job.name}" in caplog.text -async def test_remove_job_with_progress(coresys: CoreSys): - """Test removing jobs by setting progress to 100.""" - job = coresys.jobs.get_job(TEST_JOB) +async def test_job_done(coresys: CoreSys): + """Test done set correctly with jobs.""" + job = coresys.jobs.new_job(TEST_JOB) + assert not job.done + assert coresys.jobs.get_job() != job - assert job.name in coresys.jobs.jobs - job.update(progress=100) - assert job.name not in coresys.jobs.jobs + with job.start(): + assert coresys.jobs.get_job() == job + assert not job.done + + assert coresys.jobs.get_job() != job + assert job.done + + with pytest.raises(JobStartException): + with job.start(): + pass + + +async def test_job_start_bad_parent(coresys: CoreSys): + """Test job cannot be started outside of parent.""" + job = coresys.jobs.new_job(TEST_JOB) + job2 = coresys.jobs.new_job(f"{TEST_JOB}_2") + + with job.start(): + with pytest.raises(JobStartException): + with job2.start(): + pass + + with job2.start(): + assert coresys.jobs.get_job() == job2 async def test_update_job(coresys: CoreSys): """Test updating jobs.""" - job = coresys.jobs.get_job(TEST_JOB) + job = coresys.jobs.new_job(TEST_JOB) - job.update(progress=50, stage="stage") + job.progress = 50 assert job.progress == 50 + + job.stage = "stage" assert job.stage == "stage" + + with pytest.raises(ValueError): + job.progress = 110 + + with pytest.raises(ValueError): + job.progress = -10 diff --git a/tests/plugins/test_audio.py b/tests/plugins/test_audio.py index 21a41ab2d..29150feed 100644 --- a/tests/plugins/test_audio.py +++ b/tests/plugins/test_audio.py @@ -6,8 +6,16 @@ import pytest from supervisor.const import LogLevel from supervisor.coresys import CoreSys +from supervisor.docker.audio import DockerAudio -from tests.plugins.test_dns import fixture_docker_interface # noqa: F401 + +@pytest.fixture(name="docker_interface") +async def fixture_docker_interface() -> tuple[AsyncMock, AsyncMock]: + """Mock docker interface methods.""" + with patch.object(DockerAudio, "run") as run, patch.object( + DockerAudio, "restart" + ) as restart: + yield (run, restart) @pytest.fixture(name="write_json") diff --git a/tests/plugins/test_dns.py b/tests/plugins/test_dns.py index b408c747c..6d4fcbf59 100644 --- a/tests/plugins/test_dns.py +++ b/tests/plugins/test_dns.py @@ -9,7 +9,7 @@ import pytest from supervisor.const import BusEvent, LogLevel from supervisor.coresys import CoreSys from supervisor.docker.const import ContainerState -from supervisor.docker.interface import DockerInterface +from supervisor.docker.dns import DockerDNS from supervisor.docker.monitor import DockerContainerStateEvent from supervisor.plugins.dns import HostEntry from supervisor.resolution.const import ContextType, IssueType, SuggestionType @@ -19,8 +19,8 @@ from supervisor.resolution.data import Issue, Suggestion @pytest.fixture(name="docker_interface") async def fixture_docker_interface() -> tuple[AsyncMock, AsyncMock]: """Mock docker interface methods.""" - with patch.object(DockerInterface, "run") as run, patch.object( - DockerInterface, "restart" + with patch.object(DockerDNS, "run") as run, patch.object( + DockerDNS, "restart" ) as restart: yield (run, restart)