From 93ba8a35749f73d9f4ca60a4cffa6f36a96bbe19 Mon Sep 17 00:00:00 2001 From: Mike Degatano Date: Mon, 21 Aug 2023 03:15:37 -0400 Subject: [PATCH] Add job names and references everywhere (#4495) * Add job names and references everywhere * Remove group names check and switch to const * Ensure unique job names in decorator tests --- supervisor/addons/__init__.py | 21 +- supervisor/addons/addon.py | 94 +++--- supervisor/addons/model.py | 11 +- supervisor/backups/manager.py | 28 +- supervisor/docker/addon.py | 48 +++- supervisor/docker/audio.py | 6 +- supervisor/docker/cli.py | 6 +- supervisor/docker/dns.py | 6 +- supervisor/docker/homeassistant.py | 12 +- supervisor/docker/interface.py | 83 +++++- supervisor/docker/multicast.py | 6 +- supervisor/docker/observer.py | 6 +- supervisor/docker/supervisor.py | 6 +- supervisor/homeassistant/api.py | 5 +- supervisor/homeassistant/core.py | 45 ++- supervisor/homeassistant/module.py | 2 +- supervisor/homeassistant/secrets.py | 6 +- supervisor/host/network.py | 2 +- supervisor/host/sound.py | 6 +- supervisor/jobs/__init__.py | 7 +- supervisor/jobs/const.py | 7 + supervisor/jobs/decorator.py | 174 ++++++++--- supervisor/jobs/job_group.py | 10 +- supervisor/misc/tasks.py | 20 +- supervisor/mounts/manager.py | 32 ++- supervisor/os/data_disk.py | 3 +- supervisor/os/manager.py | 4 +- supervisor/plugins/audio.py | 2 + supervisor/plugins/cli.py | 2 + supervisor/plugins/dns.py | 2 + supervisor/plugins/multicast.py | 2 + supervisor/plugins/observer.py | 2 + supervisor/resolution/checks/addon_pwned.py | 3 +- supervisor/resolution/checks/dns_server.py | 3 +- .../resolution/checks/dns_server_ipv6.py | 5 +- supervisor/resolution/fixup.py | 5 +- .../resolution/fixups/store_execute_reload.py | 1 + .../resolution/fixups/store_execute_reset.py | 1 + .../fixups/system_execute_integrity.py | 1 + supervisor/security/module.py | 1 + supervisor/store/__init__.py | 7 +- supervisor/store/git.py | 2 + supervisor/supervisor.py | 7 +- supervisor/updater.py 
| 1 + tests/jobs/test_job_decorator.py | 272 ++++++++++++++++-- 45 files changed, 779 insertions(+), 196 deletions(-) diff --git a/supervisor/addons/__init__.py b/supervisor/addons/__init__.py index efb6d670d..310d520a6 100644 --- a/supervisor/addons/__init__.py +++ b/supervisor/addons/__init__.py @@ -152,11 +152,15 @@ class AddonManager(CoreSysAttributes): capture_exception(err) @Job( + name="addon_manager_install", conditions=ADDON_UPDATE_CONDITIONS, on_condition=AddonsJobError, ) async def install(self, slug: str) -> None: """Install an add-on.""" + if job := self.sys_jobs.get_job(): + job.reference = slug + if slug in self.local: raise AddonsError(f"Add-on {slug} is already installed", _LOGGER.warning) store = self.store.get(slug) @@ -247,6 +251,7 @@ class AddonManager(CoreSysAttributes): _LOGGER.info("Add-on '%s' successfully removed", slug) @Job( + name="addon_manager_update", conditions=ADDON_UPDATE_CONDITIONS, on_condition=AddonsJobError, ) @@ -258,6 +263,9 @@ class AddonManager(CoreSysAttributes): Returns a coroutine that completes when addon has state 'started' (see addon.start) if addon is started after update. Else nothing is returned. """ + if job := self.sys_jobs.get_job(): + job.reference = slug + if slug not in self.local: raise AddonsError(f"Add-on {slug} is not installed", _LOGGER.error) addon = self.local[slug] @@ -307,6 +315,7 @@ class AddonManager(CoreSysAttributes): ) @Job( + name="addon_manager_rebuild", conditions=[ JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST, @@ -320,6 +329,9 @@ class AddonManager(CoreSysAttributes): Returns a coroutine that completes when addon has state 'started' (see addon.start) if addon is started after rebuild. Else nothing is returned. 
""" + if job := self.sys_jobs.get_job(): + job.reference = slug + if slug not in self.local: raise AddonsError(f"Add-on {slug} is not installed", _LOGGER.error) addon = self.local[slug] @@ -359,6 +371,7 @@ class AddonManager(CoreSysAttributes): ) @Job( + name="addon_manager_restore", conditions=[ JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST, @@ -374,6 +387,9 @@ class AddonManager(CoreSysAttributes): Returns a coroutine that completes when addon has state 'started' (see addon.start) if addon is started after restore. Else nothing is returned. """ + if job := self.sys_jobs.get_job(): + job.reference = slug + if slug not in self.local: _LOGGER.debug("Add-on %s is not local available for restore", slug) addon = Addon(self.coresys, slug) @@ -396,7 +412,10 @@ class AddonManager(CoreSysAttributes): return wait_for_start - @Job(conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST]) + @Job( + name="addon_manager_repair", + conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST], + ) async def repair(self) -> None: """Repair local add-ons.""" needs_repair: list[Addon] = [] diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index 8b8ce6ac2..2d722e633 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -131,54 +131,6 @@ class Addon(AddonModel): self._startup_event = asyncio.Event() self._startup_task: asyncio.Task | None = None - @Job( - name=f"addon_{slug}_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, - throttle_period=WATCHDOG_THROTTLE_PERIOD, - throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, - on_condition=AddonsJobError, - ) - async def restart_after_problem(addon: Addon, state: ContainerState): - """Restart unhealthy or failed addon.""" - attempts = 0 - while await addon.instance.current_state() == state: - if not addon.in_progress: - _LOGGER.warning( - "Watchdog found addon %s is %s, restarting...", - addon.name, - state.value, - ) - try: - if state == ContainerState.FAILED: - # 
Ensure failed container is removed before attempting reanimation - if attempts == 0: - with suppress(DockerError): - await addon.instance.stop(remove_container=True) - - await (await addon.start()) - else: - await (await addon.restart()) - except AddonsError as err: - attempts = attempts + 1 - _LOGGER.error( - "Watchdog restart of addon %s failed!", addon.name - ) - capture_exception(err) - else: - break - - if attempts >= WATCHDOG_MAX_ATTEMPTS: - _LOGGER.critical( - "Watchdog cannot restart addon %s, failed all %s attempts", - addon.name, - attempts, - ) - break - - await asyncio.sleep(WATCHDOG_RETRY_SECONDS) - - self._restart_after_problem = restart_after_problem - def __repr__(self) -> str: """Return internal representation.""" return f"" @@ -1033,6 +985,50 @@ class Addon(AddonModel): """ return self.instance.check_trust() + @Job( + name="addon_restart_after_problem", + limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + throttle_period=WATCHDOG_THROTTLE_PERIOD, + throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, + on_condition=AddonsJobError, + ) + async def _restart_after_problem(self, state: ContainerState): + """Restart unhealthy or failed addon.""" + attempts = 0 + while await self.instance.current_state() == state: + if not self.in_progress: + _LOGGER.warning( + "Watchdog found addon %s is %s, restarting...", + self.name, + state.value, + ) + try: + if state == ContainerState.FAILED: + # Ensure failed container is removed before attempting reanimation + if attempts == 0: + with suppress(DockerError): + await self.instance.stop(remove_container=True) + + await (await self.start()) + else: + await (await self.restart()) + except AddonsError as err: + attempts = attempts + 1 + _LOGGER.error("Watchdog restart of addon %s failed!", self.name) + capture_exception(err) + else: + break + + if attempts >= WATCHDOG_MAX_ATTEMPTS: + _LOGGER.critical( + "Watchdog cannot restart addon %s, failed all %s attempts", + self.name, + attempts, + ) + break + + await 
asyncio.sleep(WATCHDOG_RETRY_SECONDS) + async def container_state_changed(self, event: DockerContainerStateEvent) -> None: """Set addon state from container state.""" if event.name != self.instance.name: @@ -1067,4 +1063,4 @@ class Addon(AddonModel): ContainerState.STOPPED, ContainerState.UNHEALTHY, ]: - await self._restart_after_problem(self, event.state) + await self._restart_after_problem(event.state) diff --git a/supervisor/addons/model.py b/supervisor/addons/model.py index 2f65600a4..a0f347c8f 100644 --- a/supervisor/addons/model.py +++ b/supervisor/addons/model.py @@ -1,5 +1,6 @@ """Init file for Supervisor add-ons.""" from abc import ABC, abstractmethod +from collections import defaultdict from collections.abc import Awaitable, Callable from contextlib import suppress import logging @@ -79,9 +80,11 @@ from ..const import ( AddonStage, AddonStartup, ) -from ..coresys import CoreSys, CoreSysAttributes +from ..coresys import CoreSys from ..docker.const import Capabilities from ..exceptions import AddonsNotSupportedError +from ..jobs.const import JOB_GROUP_ADDON +from ..jobs.job_group import JobGroup from .const import ATTR_BACKUP, ATTR_CODENOTARY, AddonBackupMode from .options import AddonOptions, UiOptions from .validate import RE_SERVICE, RE_VOLUME @@ -91,12 +94,14 @@ _LOGGER: logging.Logger = logging.getLogger(__name__) Data = dict[str, Any] -class AddonModel(CoreSysAttributes, ABC): +class AddonModel(JobGroup, ABC): """Add-on Data layout.""" def __init__(self, coresys: CoreSys, slug: str): """Initialize data holder.""" - self.coresys: CoreSys = coresys + super().__init__( + coresys, JOB_GROUP_ADDON.format_map(defaultdict(str, slug=slug)), slug + ) self.slug: str = slug @property diff --git a/supervisor/backups/manager.py b/supervisor/backups/manager.py index 0f810e185..6f303f14e 100644 --- a/supervisor/backups/manager.py +++ b/supervisor/backups/manager.py @@ -110,6 +110,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes): 
backup.store_repositories() backup.store_dockerconfig() + # Add backup ID to job + if job := self.sys_jobs.get_job(): + job.reference = backup.slug + return backup def load(self): @@ -224,7 +228,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes): finally: self.sys_core.state = CoreState.RUNNING - @Job(conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING]) + @Job( + name="backup_manager_full_backup", + conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING], + ) async def do_backup_full( self, name="", @@ -250,7 +257,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes): _LOGGER.info("Creating full backup with slug %s completed", backup.slug) return backup - @Job(conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING]) + @Job( + name="backup_manager_partial_backup", + conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING], + ) async def do_backup_partial( self, name: str = "", @@ -371,16 +381,21 @@ class BackupManager(FileConfiguration, CoreSysAttributes): await self.sys_homeassistant.core.restart() @Job( + name="backup_manager_full_restore", conditions=[ JobCondition.FREE_SPACE, JobCondition.HEALTHY, JobCondition.INTERNET_HOST, JobCondition.INTERNET_SYSTEM, JobCondition.RUNNING, - ] + ], ) async def do_restore_full(self, backup: Backup, password=None): """Restore a backup.""" + # Add backup ID to job + if job := self.sys_jobs.get_job(): + job.reference = backup.slug + if self.lock.locked(): _LOGGER.error("A backup/restore process is already running") return False @@ -418,13 +433,14 @@ class BackupManager(FileConfiguration, CoreSysAttributes): _LOGGER.info("Full-Restore %s done", backup.slug) @Job( + name="backup_manager_partial_restore", conditions=[ JobCondition.FREE_SPACE, JobCondition.HEALTHY, JobCondition.INTERNET_HOST, JobCondition.INTERNET_SYSTEM, JobCondition.RUNNING, - ] + ], ) async def do_restore_partial( self, @@ -435,6 +451,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes): password: str | None = None, 
): """Restore a backup.""" + # Add backup ID to job + if job := self.sys_jobs.get_job(): + job.reference = backup.slug + if self.lock.locked(): _LOGGER.error("A backup/restore process is already running") return False diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 2c7ebdfe4..41bfbdb10 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -494,7 +494,11 @@ class DockerAddon(DockerInterface): return mounts - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): @@ -565,7 +569,11 @@ class DockerAddon(DockerInterface): BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events ) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_update", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def update( self, version: AwesomeVersion, image: str | None = None, latest: bool = False ) -> None: @@ -585,7 +593,11 @@ class DockerAddon(DockerInterface): with suppress(DockerError): await self.stop() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_install", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def install( self, version: AwesomeVersion, @@ -636,14 +648,22 @@ class DockerAddon(DockerInterface): _LOGGER.info("Build %s:%s done", self.image, version) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_export_image", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) def export_image(self, tar_file: Path) -> Awaitable[None]: """Export current images into a tar file.""" return self.sys_run_in_executor( self.sys_docker.export_image, self.image, self.version, tar_file ) - 
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_import_image", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def import_image(self, tar_file: Path) -> None: """Import a tar file as image.""" docker_image = await self.sys_run_in_executor( @@ -656,7 +676,11 @@ class DockerAddon(DockerInterface): with suppress(DockerError): await self.cleanup() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_write_stdin", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def write_stdin(self, data: bytes) -> None: """Write to add-on stdin.""" if not await self.is_running(): @@ -686,7 +710,11 @@ class DockerAddon(DockerInterface): _LOGGER.error("Can't write to %s stdin: %s", self.name, err) raise DockerError() from err - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_addon_stop", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def stop(self, remove_container: bool = True) -> None: """Stop/remove Docker container.""" # DNS @@ -714,7 +742,11 @@ class DockerAddon(DockerInterface): checksum = image_id.partition(":")[2] return await self.sys_security.verify_content(self.addon.codenotary, checksum) - @Job(conditions=[JobCondition.OS_AGENT], limit=JobExecutionLimit.SINGLE_WAIT) + @Job( + name="docker_addon_hardware_events", + conditions=[JobCondition.OS_AGENT], + limit=JobExecutionLimit.SINGLE_WAIT, + ) async def _hardware_events(self, device: Device) -> None: """Process Hardware events for adjust device access.""" if not any( diff --git a/supervisor/docker/audio.py b/supervisor/docker/audio.py index d828bec67..f9c58ed17 100644 --- a/supervisor/docker/audio.py +++ b/supervisor/docker/audio.py @@ -85,7 +85,11 @@ class DockerAudio(DockerInterface, CoreSysAttributes): return None return DOCKER_CPU_RUNTIME_ALLOCATION - 
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_audio_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): diff --git a/supervisor/docker/cli.py b/supervisor/docker/cli.py index e83b46763..7f0fe9a7a 100644 --- a/supervisor/docker/cli.py +++ b/supervisor/docker/cli.py @@ -26,7 +26,11 @@ class DockerCli(DockerInterface, CoreSysAttributes): """Return name of Docker container.""" return CLI_DOCKER_NAME - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_cli_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): diff --git a/supervisor/docker/dns.py b/supervisor/docker/dns.py index bdfe98d50..e4e620e8a 100644 --- a/supervisor/docker/dns.py +++ b/supervisor/docker/dns.py @@ -28,7 +28,11 @@ class DockerDNS(DockerInterface, CoreSysAttributes): """Return name of Docker container.""" return DNS_DOCKER_NAME - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_dns_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): diff --git a/supervisor/docker/homeassistant.py b/supervisor/docker/homeassistant.py index 7369f3a3b..8f59e3a7d 100644 --- a/supervisor/docker/homeassistant.py +++ b/supervisor/docker/homeassistant.py @@ -133,7 +133,11 @@ class DockerHomeAssistant(DockerInterface): return mounts - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_home_assistant_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): @@ -176,7 +180,11 @@ class DockerHomeAssistant(DockerInterface): "Starting Home 
Assistant %s with version %s", self.image, self.version ) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_home_assistant_execute_command", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def execute_command(self, command: str) -> CommandReturn: """Create a temporary container and run command.""" return await self.sys_run_in_executor( diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index 98cb38664..20fd8c298 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio +from collections import defaultdict from collections.abc import Awaitable from contextlib import suppress import logging @@ -36,7 +37,7 @@ from ..exceptions import ( DockerRequestError, DockerTrustError, ) -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobExecutionLimit from ..jobs.decorator import Job from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType, SuggestionType @@ -82,7 +83,13 @@ class DockerInterface(JobGroup): def __init__(self, coresys: CoreSys): """Initialize Docker base wrapper.""" - super().__init__(coresys, f"container_{self.name or uuid4().hex}") + super().__init__( + coresys, + JOB_GROUP_DOCKER_INTERFACE.format_map( + defaultdict(str, name=self.name or uuid4().hex) + ), + self.name, + ) self.coresys: CoreSys = coresys self._meta: dict[str, Any] | None = None self.lock: asyncio.Lock = asyncio.Lock() @@ -209,7 +216,11 @@ class DockerInterface(JobGroup): await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_install", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def install( self, version: AwesomeVersion, @@ -323,7 +334,11 @@ class 
DockerInterface(JobGroup): return _container_state_from_model(docker_container) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_attach", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def attach( self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False ) -> None: @@ -359,12 +374,20 @@ class DockerInterface(JobGroup): raise DockerError() _LOGGER.info("Attaching to %s with version %s", self.image, self.version) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" raise NotImplementedError() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_stop", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def stop(self, remove_container: bool = True) -> None: """Stop/remove Docker container.""" with suppress(DockerNotFound): @@ -375,12 +398,20 @@ class DockerInterface(JobGroup): remove_container, ) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_start", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) def start(self) -> Awaitable[None]: """Start Docker container.""" return self.sys_run_in_executor(self.sys_docker.start_container, self.name) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_remove", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def remove(self) -> None: """Remove Docker images.""" # Cleanup container @@ -392,7 +423,11 @@ class DockerInterface(JobGroup): ) self._meta = None - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_update", + limit=JobExecutionLimit.GROUP_ONCE, + 
on_condition=DockerJobError, + ) async def update( self, version: AwesomeVersion, image: str | None = None, latest: bool = False ) -> None: @@ -419,7 +454,11 @@ class DockerInterface(JobGroup): return b"" - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_cleanup", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) def cleanup(self, old_image: str | None = None) -> Awaitable[None]: """Check if old version exists and cleanup.""" return self.sys_run_in_executor( @@ -429,14 +468,22 @@ class DockerInterface(JobGroup): {old_image} if old_image else None, ) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_restart", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) def restart(self) -> Awaitable[None]: """Restart docker container.""" return self.sys_run_in_executor( self.sys_docker.restart_container, self.name, self.timeout ) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_execute_command", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def execute_command(self, command: str) -> CommandReturn: """Create a temporary container and run command.""" raise NotImplementedError() @@ -497,7 +544,11 @@ class DockerInterface(JobGroup): available_version.sort(reverse=True) return available_version[0] - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_interface_run_inside", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) def run_inside(self, command: str) -> Awaitable[CommandReturn]: """Execute a command inside Docker container.""" return self.sys_run_in_executor( @@ -511,7 +562,11 @@ class DockerInterface(JobGroup): checksum = image_id.partition(":")[2] return await self.sys_security.verify_own_content(checksum) - @Job(limit=JobExecutionLimit.GROUP_ONCE, 
on_condition=DockerJobError) + @Job( + name="docker_interface_check_trust", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def check_trust(self) -> None: """Check trust of exists Docker image.""" try: diff --git a/supervisor/docker/multicast.py b/supervisor/docker/multicast.py index f6dfab974..d24686c12 100644 --- a/supervisor/docker/multicast.py +++ b/supervisor/docker/multicast.py @@ -31,7 +31,11 @@ class DockerMulticast(DockerInterface, CoreSysAttributes): """Generate needed capabilities.""" return [Capabilities.NET_ADMIN.value] - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_multicast_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): diff --git a/supervisor/docker/observer.py b/supervisor/docker/observer.py index a72d2492c..a1d5c66c6 100644 --- a/supervisor/docker/observer.py +++ b/supervisor/docker/observer.py @@ -28,7 +28,11 @@ class DockerObserver(DockerInterface, CoreSysAttributes): """Return name of Docker container.""" return OBSERVER_DOCKER_NAME - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_observer_run", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def run(self) -> None: """Run Docker image.""" if await self.is_running(): diff --git a/supervisor/docker/supervisor.py b/supervisor/docker/supervisor.py index fbbdae8c2..c7f346c78 100644 --- a/supervisor/docker/supervisor.py +++ b/supervisor/docker/supervisor.py @@ -45,7 +45,11 @@ class DockerSupervisor(DockerInterface, CoreSysAttributes): if mount.get("Destination") == "/data" ) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError) + @Job( + name="docker_supervisor_attach", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=DockerJobError, + ) async def attach( self, version: AwesomeVersion, *, skip_state_event_if_down: 
bool = False ) -> None: diff --git a/supervisor/homeassistant/api.py b/supervisor/homeassistant/api.py index ed3a29ce7..e0f0a23e5 100644 --- a/supervisor/homeassistant/api.py +++ b/supervisor/homeassistant/api.py @@ -32,7 +32,10 @@ class HomeAssistantAPI(CoreSysAttributes): self.access_token: str | None = None self._access_token_expires: datetime | None = None - @Job(limit=JobExecutionLimit.SINGLE_WAIT) + @Job( + name="home_assistant_api_ensure_access_token", + limit=JobExecutionLimit.SINGLE_WAIT, + ) async def ensure_access_token(self) -> None: """Ensure there is an access token.""" if ( diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index 9c514c290..55e799ac9 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -24,7 +24,7 @@ from ..exceptions import ( HomeAssistantUpdateError, JobException, ) -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobExecutionLimit from ..jobs.decorator import Job, JobCondition from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType @@ -56,7 +56,7 @@ class HomeAssistantCore(JobGroup): def __init__(self, coresys: CoreSys): """Initialize Home Assistant object.""" - super().__init__(coresys, "home_assistant_core") + super().__init__(coresys, JOB_GROUP_HOME_ASSISTANT_CORE) self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys) self.lock: asyncio.Lock = asyncio.Lock() self._error_state: bool = False @@ -99,7 +99,11 @@ class HomeAssistantCore(JobGroup): with suppress(HomeAssistantError): await self.start() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) + @Job( + name="home_assistant_core_install_landing_page", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=HomeAssistantJobError, + ) async def install_landingpage(self) -> None: """Install a landing page.""" # Try to use a preinstalled landingpage @@ -141,7 +145,11 @@ class 
HomeAssistantCore(JobGroup): self.sys_homeassistant.image = self.sys_updater.image_homeassistant self.sys_homeassistant.save_data() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) + @Job( + name="home_assistant_core_install", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=HomeAssistantJobError, + ) async def install(self) -> None: """Install a landing page.""" _LOGGER.info("Home Assistant setup") @@ -182,6 +190,7 @@ class HomeAssistantCore(JobGroup): await self.instance.cleanup() @Job( + name="home_assistant_core_update", conditions=[ JobCondition.FREE_SPACE, JobCondition.HEALTHY, @@ -283,7 +292,11 @@ class HomeAssistantCore(JobGroup): self.sys_resolution.create_issue(IssueType.UPDATE_FAILED, ContextType.CORE) raise HomeAssistantUpdateError() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) + @Job( + name="home_assistant_core_start", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=HomeAssistantJobError, + ) async def start(self) -> None: """Run Home Assistant docker.""" if await self.instance.is_running(): @@ -314,7 +327,11 @@ class HomeAssistantCore(JobGroup): await self._block_till_run(self.sys_homeassistant.version) - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) + @Job( + name="home_assistant_core_stop", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=HomeAssistantJobError, + ) async def stop(self) -> None: """Stop Home Assistant Docker.""" try: @@ -322,7 +339,11 @@ class HomeAssistantCore(JobGroup): except DockerError as err: raise HomeAssistantError() from err - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) + @Job( + name="home_assistant_core_restart", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=HomeAssistantJobError, + ) async def restart(self) -> None: """Restart Home Assistant Docker.""" try: @@ -332,7 +353,11 @@ class HomeAssistantCore(JobGroup): await self._block_till_run(self.sys_homeassistant.version) - 
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError) + @Job( + name="home_assistant_core_rebuild", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=HomeAssistantJobError, + ) async def rebuild(self) -> None: """Rebuild Home Assistant Docker container.""" with suppress(DockerError): @@ -429,10 +454,11 @@ class HomeAssistantCore(JobGroup): raise HomeAssistantCrashError() @Job( + name="home_assistant_core_repair", conditions=[ JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST, - ] + ], ) async def repair(self): """Repair local Home Assistant data.""" @@ -454,6 +480,7 @@ class HomeAssistantCore(JobGroup): await self._restart_after_problem(event.state) @Job( + name="home_assistant_core_restart_after_problem", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, diff --git a/supervisor/homeassistant/module.py b/supervisor/homeassistant/module.py index f85321f0d..a12d7c1d3 100644 --- a/supervisor/homeassistant/module.py +++ b/supervisor/homeassistant/module.py @@ -304,7 +304,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes): self.sys_homeassistant.websocket.send_message({ATTR_TYPE: "usb/scan"}) - @Job() + @Job(name="home_assistant_module_backup") async def backup(self, tar_file: tarfile.TarFile) -> None: """Backup Home Assistant Core config/ directory.""" diff --git a/supervisor/homeassistant/secrets.py b/supervisor/homeassistant/secrets.py index 250840df9..4d0d901bb 100644 --- a/supervisor/homeassistant/secrets.py +++ b/supervisor/homeassistant/secrets.py @@ -40,7 +40,11 @@ class HomeAssistantSecrets(CoreSysAttributes): """Reload secrets.""" await self._read_secrets() - @Job(limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=60)) + @Job( + name="home_assistant_secrets_read", + limit=JobExecutionLimit.THROTTLE_WAIT, + throttle_period=timedelta(seconds=60), + ) async def _read_secrets(self): """Read secrets.yaml into memory.""" 
if not self.path_secrets.exists(): diff --git a/supervisor/host/network.py b/supervisor/host/network.py index 08f223651..f8ec73b41 100644 --- a/supervisor/host/network.py +++ b/supervisor/host/network.py @@ -107,7 +107,7 @@ class NetworkManager(CoreSysAttributes): return Interface.from_dbus_interface(self.sys_dbus.network.get(inet_name)) - @Job(conditions=[JobCondition.HOST_NETWORK]) + @Job(name="network_manager_load", conditions=[JobCondition.HOST_NETWORK]) async def load(self): """Load network information and reapply defaults over dbus.""" # Apply current settings on each interface so OS can update any out of date defaults diff --git a/supervisor/host/sound.py b/supervisor/host/sound.py index 07f512ed4..8413c349c 100644 --- a/supervisor/host/sound.py +++ b/supervisor/host/sound.py @@ -232,7 +232,11 @@ class SoundControl(CoreSysAttributes): await self.sys_run_in_executor(_activate_profile) await self.update() - @Job(limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=10)) + @Job( + name="sound_control_update", + limit=JobExecutionLimit.THROTTLE_WAIT, + throttle_period=timedelta(seconds=10), + ) async def update(self): """Update properties over dbus.""" _LOGGER.info("Updating PulseAudio information") diff --git a/supervisor/jobs/__init__.py b/supervisor/jobs/__init__.py index 08b8793e9..36445168f 100644 --- a/supervisor/jobs/__init__.py +++ b/supervisor/jobs/__init__.py @@ -29,6 +29,7 @@ class SupervisorJob: """Representation of a job running in supervisor.""" name: str = field(on_setattr=frozen) + reference: str | None = None progress: int = field(default=0, validator=[ge(0), le(100)]) stage: str | None = None uuid: UUID = field(init=False, factory=lambda: uuid4().hex, on_setattr=frozen) @@ -86,9 +87,11 @@ class JobManager(FileConfiguration, CoreSysAttributes): """Set a list of ignored condition.""" self._data[ATTR_IGNORE_CONDITIONS] = value - def new_job(self, name: str, initial_stage: str | None = None) -> SupervisorJob: + def new_job( + 
self, name: str, reference: str | None = None, initial_stage: str | None = None + ) -> SupervisorJob: """Create a new job.""" - job = SupervisorJob(name, stage=initial_stage) + job = SupervisorJob(name, reference=reference, stage=initial_stage) self._jobs[job.uuid] = job return job diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index 90b4549dd..5ef27ba95 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -8,6 +8,10 @@ FILE_CONFIG_JOBS = Path(SUPERVISOR_DATA, "jobs.json") ATTR_IGNORE_CONDITIONS = "ignore_conditions" +JOB_GROUP_ADDON = "addon_{slug}" +JOB_GROUP_DOCKER_INTERFACE = "container_{name}" +JOB_GROUP_HOME_ASSISTANT_CORE = "home_assistant_core" + class JobCondition(str, Enum): """Job condition enum.""" @@ -36,3 +40,6 @@ class JobExecutionLimit(str, Enum): THROTTLE_RATE_LIMIT = "throttle_rate_limit" GROUP_ONCE = "group_once" GROUP_WAIT = "group_wait" + GROUP_THROTTLE = "group_throttle" + GROUP_THROTTLE_WAIT = "group_throttle_wait" + GROUP_THROTTLE_RATE_LIMIT = "group_throttle_rate_limit" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index a77392cd1..a96d0799b 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -21,6 +21,7 @@ from .const import JobCondition, JobExecutionLimit from .job_group import JobGroup _LOGGER: logging.Logger = logging.getLogger(__package__) +_JOB_NAMES: set[str] = set() class Job(CoreSysAttributes): @@ -28,7 +29,7 @@ class Job(CoreSysAttributes): def __init__( self, - name: str | None = None, + name: str, conditions: list[JobCondition] | None = None, cleanup: bool = True, on_condition: JobException | None = None, @@ -39,6 +40,10 @@ class Job(CoreSysAttributes): throttle_max_calls: int | None = None, ): """Initialize the Job class.""" + if name in _JOB_NAMES: + raise RuntimeError(f"A job already exists with name {name}!") + + _JOB_NAMES.add(name) self.name = name self.conditions = conditions self.cleanup = cleanup @@ -48,12 +53,8 @@ class 
Job(CoreSysAttributes): self.throttle_max_calls = throttle_max_calls self._lock: asyncio.Semaphore | None = None self._method = None - self._last_call = datetime.min - self._rate_limited_calls: list[datetime] | None = None - self._job_group_limit = self.limit in ( - JobExecutionLimit.GROUP_ONCE, - JobExecutionLimit.GROUP_WAIT, - ) + self._last_call: dict[str | None, datetime] = {} + self._rate_limited_calls: dict[str, list[datetime]] | None = None # Validate Options if ( @@ -62,19 +63,70 @@ class Job(CoreSysAttributes): JobExecutionLimit.THROTTLE, JobExecutionLimit.THROTTLE_WAIT, JobExecutionLimit.THROTTLE_RATE_LIMIT, + JobExecutionLimit.GROUP_THROTTLE, + JobExecutionLimit.GROUP_THROTTLE_WAIT, + JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, ) and self._throttle_period is None ): - raise RuntimeError("Using Job without a Throttle period!") + raise RuntimeError( + f"Job {name} is using execution limit {limit.value} without a throttle period!" + ) - if self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT: + if self.limit in ( + JobExecutionLimit.THROTTLE_RATE_LIMIT, + JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + ): if self.throttle_max_calls is None: - raise RuntimeError("Using rate limit without throttle max calls!") + raise RuntimeError( + f"Job {name} is using execution limit {limit.value} without throttle max calls!" 
+ ) - self._rate_limited_calls = [] + self._rate_limited_calls = {} - @property - def throttle_period(self) -> timedelta | None: + def last_call(self, group_name: str | None = None) -> datetime: + """Return last call datetime.""" + return self._last_call.get(group_name, datetime.min) + + def set_last_call(self, value: datetime, group_name: str | None = None) -> None: + """Set last call datetime.""" + self._last_call[group_name] = value + + def rate_limited_calls( + self, group_name: str | None = None + ) -> list[datetime] | None: + """Return rate limited calls if used.""" + if self._rate_limited_calls is None: + return None + + return self._rate_limited_calls.get(group_name, []) + + def add_rate_limited_call( + self, value: datetime, group_name: str | None = None + ) -> None: + """Add a rate limited call to list if used.""" + if self._rate_limited_calls is None: + raise RuntimeError( + f"Rate limited calls not available for limit type {self.limit}" + ) + + if group_name in self._rate_limited_calls: + self._rate_limited_calls[group_name].append(value) + else: + self._rate_limited_calls[group_name] = [value] + + def set_rate_limited_calls( + self, value: list[datetime], group_name: str | None = None + ) -> None: + """Set rate limited calls if used.""" + if self._rate_limited_calls is None: + raise RuntimeError( + f"Rate limited calls not available for limit type {self.limit}" + ) + + self._rate_limited_calls[group_name] = value + + def throttle_period(self, group_name: str | None = None) -> timedelta | None: """Return throttle period.""" if self._throttle_period is None: return None @@ -83,14 +135,13 @@ class Job(CoreSysAttributes): return self._throttle_period return self._throttle_period( - self.coresys, self._last_call, self._rate_limited_calls + self.coresys, + self.last_call(group_name), + self.rate_limited_calls(group_name), ) - def _post_init(self, obj: JobGroup | CoreSysAttributes) -> None: + def _post_init(self, obj: JobGroup | CoreSysAttributes) -> JobGroup 
| None: """Runtime init.""" - if self.name is None: - self.name = str(self._method.__qualname__).lower().replace(".", "_") - # Coresys try: self.coresys = obj.coresys @@ -99,8 +150,18 @@ class Job(CoreSysAttributes): if not self.coresys: raise RuntimeError(f"Job on {self.name} need to be an coresys object!") + # Setup lock for limits + if self._lock is None: + self._lock = asyncio.Semaphore() + # Job groups - if self._job_group_limit: + if self.limit in ( + JobExecutionLimit.GROUP_ONCE, + JobExecutionLimit.GROUP_WAIT, + JobExecutionLimit.GROUP_THROTTLE, + JobExecutionLimit.GROUP_THROTTLE_WAIT, + JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + ): try: _ = obj.acquire and obj.release except AttributeError: @@ -108,9 +169,8 @@ class Job(CoreSysAttributes): f"Job on {self.name} need to be a JobGroup to use group based limits!" ) from None - # Others - if self._lock is None: - self._lock = asyncio.Semaphore() + return obj + return None def __call__(self, method): """Call the wrapper logic.""" @@ -123,9 +183,11 @@ class Job(CoreSysAttributes): This method must be on an instance of CoreSysAttributes. If a JOB_GROUP limit is used, then it must be on an instance of JobGroup. 
""" - self._post_init(obj) - - job = self.sys_jobs.new_job(self.name) + job_group = self._post_init(obj) + group_name: str | None = job_group.group_name if job_group else None + job = self.sys_jobs.new_job( + self.name, job_group.job_reference if job_group else None + ) # Handle condition if self.conditions: @@ -141,46 +203,63 @@ class Job(CoreSysAttributes): # Handle exection limits if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE): await self._acquire_exection_limit() - elif self._job_group_limit: + elif self.limit in ( + JobExecutionLimit.GROUP_ONCE, + JobExecutionLimit.GROUP_WAIT, + ): try: await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT) except JobGroupExecutionLimitExceeded as err: if self.on_condition: raise self.on_condition(str(err)) from err raise err - elif self.limit == JobExecutionLimit.THROTTLE: - time_since_last_call = datetime.now() - self._last_call - if time_since_last_call < self.throttle_period: + elif self.limit in ( + JobExecutionLimit.THROTTLE, + JobExecutionLimit.GROUP_THROTTLE, + ): + time_since_last_call = datetime.now() - self.last_call(group_name) + if time_since_last_call < self.throttle_period(group_name): return - elif self.limit == JobExecutionLimit.THROTTLE_WAIT: + elif self.limit in ( + JobExecutionLimit.THROTTLE_WAIT, + JobExecutionLimit.GROUP_THROTTLE_WAIT, + ): await self._acquire_exection_limit() - time_since_last_call = datetime.now() - self._last_call - if time_since_last_call < self.throttle_period: + time_since_last_call = datetime.now() - self.last_call(group_name) + if time_since_last_call < self.throttle_period(group_name): self._release_exception_limits() return - elif self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT: + elif self.limit in ( + JobExecutionLimit.THROTTLE_RATE_LIMIT, + JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + ): # Only reprocess array when necessary (at limit) - if len(self._rate_limited_calls) >= self.throttle_max_calls: - self._rate_limited_calls = [ - 
call - for call in self._rate_limited_calls - if call > datetime.now() - self.throttle_period - ] + if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls: + self.set_rate_limited_calls( + [ + call + for call in self.rate_limited_calls(group_name) + if call > datetime.now() - self.throttle_period(group_name) + ], + group_name, + ) - if len(self._rate_limited_calls) >= self.throttle_max_calls: + if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls: on_condition = ( JobException if self.on_condition is None else self.on_condition ) raise on_condition( - f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period}", + f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}", ) # Execute Job with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None): try: - self._last_call = datetime.now() - if self._rate_limited_calls is not None: - self._rate_limited_calls.append(self._last_call) + self.set_last_call(datetime.now(), group_name) + if self.rate_limited_calls(group_name) is not None: + self.add_rate_limited_call( + self.last_call(group_name), group_name + ) return await self._method(obj, *args, **kwargs) except HassioError as err: @@ -191,7 +270,10 @@ class Job(CoreSysAttributes): raise JobException() from err finally: self._release_exception_limits() - if self._job_group_limit: + if self.limit in ( + JobExecutionLimit.GROUP_ONCE, + JobExecutionLimit.GROUP_WAIT, + ): obj.release() return wrapper @@ -314,6 +396,7 @@ class Job(CoreSysAttributes): JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE, JobExecutionLimit.THROTTLE_WAIT, + JobExecutionLimit.GROUP_THROTTLE_WAIT, ): return @@ -331,6 +414,7 @@ class Job(CoreSysAttributes): JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE, JobExecutionLimit.THROTTLE_WAIT, + JobExecutionLimit.GROUP_THROTTLE_WAIT, ): return self._lock.release() diff --git a/supervisor/jobs/job_group.py 
b/supervisor/jobs/job_group.py index c50f719a7..4e698cebd 100644 --- a/supervisor/jobs/job_group.py +++ b/supervisor/jobs/job_group.py @@ -16,13 +16,16 @@ class JobGroup(CoreSysAttributes): higher-level task and should not need to relinquish the lock in between. """ - def __init__(self, coresys: CoreSys, group_name: str) -> None: + def __init__( + self, coresys: CoreSys, group_name: str, job_reference: str | None = None + ) -> None: """Initialize object.""" self.coresys: CoreSys = coresys self._group_name: str = group_name self._lock: Lock = Lock() self._active_job: SupervisorJob | None = None self._parent_jobs: list[SupervisorJob] = [] + self._job_reference: str | None = job_reference @property def active_job(self) -> SupervisorJob | None: @@ -43,6 +46,11 @@ class JobGroup(CoreSysAttributes): and self.active_job == task_job ) + @property + def job_reference(self) -> str | None: + """Return value to use as reference for all jobs created for this job group.""" + return self._job_reference + async def acquire(self, job: SupervisorJob, wait: bool = False) -> None: """Acquire the lock for the group for the specified job.""" # If there's another job running and we're not waiting, raise diff --git a/supervisor/misc/tasks.py b/supervisor/misc/tasks.py index ea148fa6a..89d60a400 100644 --- a/supervisor/misc/tasks.py +++ b/supervisor/misc/tasks.py @@ -83,7 +83,10 @@ class Tasks(CoreSysAttributes): _LOGGER.info("All core tasks are scheduled") - @Job(conditions=ADDON_UPDATE_CONDITIONS + [JobCondition.RUNNING]) + @Job( + name="tasks_update_addons", + conditions=ADDON_UPDATE_CONDITIONS + [JobCondition.RUNNING], + ) async def _update_addons(self): """Check if an update is available for an Add-on and update it.""" start_tasks: list[Awaitable[None]] = [] @@ -112,13 +115,14 @@ class Tasks(CoreSysAttributes): await asyncio.gather(*start_tasks) @Job( + name="tasks_update_supervisor", conditions=[ JobCondition.AUTO_UPDATE, JobCondition.FREE_SPACE, JobCondition.HEALTHY, 
JobCondition.INTERNET_HOST, JobCondition.RUNNING, - ] + ], ) async def _update_supervisor(self): """Check and run update of Supervisor Supervisor.""" @@ -172,7 +176,7 @@ class Tasks(CoreSysAttributes): finally: self._cache[HASS_WATCHDOG_API] = 0 - @Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) + @Job(name="tasks_update_cli", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) async def _update_cli(self): """Check and run update of cli.""" if not self.sys_plugins.cli.need_update: @@ -183,7 +187,7 @@ class Tasks(CoreSysAttributes): ) await self.sys_plugins.cli.update() - @Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) + @Job(name="tasks_update_dns", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) async def _update_dns(self): """Check and run update of CoreDNS plugin.""" if not self.sys_plugins.dns.need_update: @@ -195,7 +199,7 @@ class Tasks(CoreSysAttributes): ) await self.sys_plugins.dns.update() - @Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) + @Job(name="tasks_update_audio", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) async def _update_audio(self): """Check and run update of PulseAudio plugin.""" if not self.sys_plugins.audio.need_update: @@ -207,7 +211,7 @@ class Tasks(CoreSysAttributes): ) await self.sys_plugins.audio.update() - @Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) + @Job(name="tasks_update_observer", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) async def _update_observer(self): """Check and run update of Observer plugin.""" if not self.sys_plugins.observer.need_update: @@ -219,7 +223,7 @@ class Tasks(CoreSysAttributes): ) await self.sys_plugins.observer.update() - @Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) + @Job(name="tasks_update_multicast", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS) async def _update_multicast(self): """Check and run update of multicast.""" if not self.sys_plugins.multicast.need_update: @@ -292,7 +296,7 @@ class Tasks(CoreSysAttributes): # Adjust state addon.state = AddonState.STOPPED - @Job(conditions=[JobCondition.SUPERVISOR_UPDATED]) + 
@Job(name="tasks_reload_store", conditions=[JobCondition.SUPERVISOR_UPDATED]) async def _reload_store(self) -> None: """Reload store and check for addon updates.""" await self.sys_store.reload() diff --git a/supervisor/mounts/manager.py b/supervisor/mounts/manager.py index f6865dafa..01a1f1c94 100644 --- a/supervisor/mounts/manager.py +++ b/supervisor/mounts/manager.py @@ -139,7 +139,7 @@ class MountManager(FileConfiguration, CoreSysAttributes): ] ) - @Job(conditions=[JobCondition.MOUNT_AVAILABLE]) + @Job(name="mount_manager_reload", conditions=[JobCondition.MOUNT_AVAILABLE]) async def reload(self) -> None: """Update mounts info via dbus and reload failed mounts.""" if not self.mounts: @@ -180,9 +180,17 @@ class MountManager(FileConfiguration, CoreSysAttributes): ], ) - @Job(conditions=[JobCondition.MOUNT_AVAILABLE], on_condition=MountJobError) + @Job( + name="mount_manager_create_mount", + conditions=[JobCondition.MOUNT_AVAILABLE], + on_condition=MountJobError, + ) async def create_mount(self, mount: Mount) -> None: """Add/update a mount.""" + # Add mount name to job + if job := self.sys_jobs.get_job(): + job.reference = mount.name + if mount.name in self._mounts: _LOGGER.debug("Mount '%s' exists, unmounting then mounting from new config") await self.remove_mount(mount.name, retain_entry=True) @@ -200,9 +208,17 @@ class MountManager(FileConfiguration, CoreSysAttributes): elif mount.usage == MountUsage.SHARE: await self._bind_share(mount) - @Job(conditions=[JobCondition.MOUNT_AVAILABLE], on_condition=MountJobError) + @Job( + name="mount_manager_remove_mount", + conditions=[JobCondition.MOUNT_AVAILABLE], + on_condition=MountJobError, + ) async def remove_mount(self, name: str, *, retain_entry: bool = False) -> None: """Remove a mount.""" + # Add mount name to job + if job := self.sys_jobs.get_job(): + job.reference = name + if name not in self._mounts: raise MountNotFound( f"Cannot remove '{name}', no mount exists with that name" @@ -223,9 +239,17 @@ class 
MountManager(FileConfiguration, CoreSysAttributes): return mount - @Job(conditions=[JobCondition.MOUNT_AVAILABLE], on_condition=MountJobError) + @Job( + name="mount_manager_reload_mount", + conditions=[JobCondition.MOUNT_AVAILABLE], + on_condition=MountJobError, + ) async def reload_mount(self, name: str) -> None: """Reload a mount to retry mounting with same config.""" + # Add mount name to job + if job := self.sys_jobs.get_job(): + job.reference = name + if name not in self._mounts: raise MountNotFound( f"Cannot reload '{name}', no mount exists with that name" diff --git a/supervisor/os/data_disk.py b/supervisor/os/data_disk.py index 561c4c88c..94b07dcfa 100644 --- a/supervisor/os/data_disk.py +++ b/supervisor/os/data_disk.py @@ -165,7 +165,7 @@ class DataDisk(CoreSysAttributes): if block.drive == drive.object_path ] - @Job(conditions=[JobCondition.OS_AGENT]) + @Job(name="data_disk_load", conditions=[JobCondition.OS_AGENT]) async def load(self) -> None: """Load DataDisk feature.""" # Update datadisk details on OS-Agent @@ -173,6 +173,7 @@ class DataDisk(CoreSysAttributes): await self.sys_dbus.agent.datadisk.reload_device() @Job( + name="data_disk_migrate", conditions=[JobCondition.HAOS, JobCondition.OS_AGENT, JobCondition.HEALTHY], limit=JobExecutionLimit.ONCE, on_condition=HassOSJobError, diff --git a/supervisor/os/manager.py b/supervisor/os/manager.py index 1a25896c1..c58bc2ce4 100644 --- a/supervisor/os/manager.py +++ b/supervisor/os/manager.py @@ -156,6 +156,7 @@ class OSManager(CoreSysAttributes): ) @Job( + name="os_manager_config_sync", conditions=[JobCondition.HAOS], on_condition=HassOSJobError, ) @@ -170,6 +171,7 @@ class OSManager(CoreSysAttributes): await self.sys_host.services.restart("hassos-config.service") @Job( + name="os_manager_update", conditions=[ JobCondition.HAOS, JobCondition.INTERNET_SYSTEM, @@ -225,7 +227,7 @@ class OSManager(CoreSysAttributes): ) raise HassOSUpdateError() - @Job(conditions=[JobCondition.HAOS]) + 
@Job(name="os_manager_mark_healthy", conditions=[JobCondition.HAOS]) async def mark_healthy(self) -> None: """Set booted partition as good for rauc.""" try: diff --git a/supervisor/plugins/audio.py b/supervisor/plugins/audio.py index 5e4760657..558d5fa93 100644 --- a/supervisor/plugins/audio.py +++ b/supervisor/plugins/audio.py @@ -118,6 +118,7 @@ class PluginAudio(PluginBase): self.save_data() @Job( + name="plugin_audio_update", conditions=PLUGIN_UPDATE_CONDITIONS, on_condition=AudioJobError, ) @@ -218,6 +219,7 @@ class PluginAudio(PluginBase): ) from err @Job( + name="plugin_audio_restart_after_problem", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, diff --git a/supervisor/plugins/cli.py b/supervisor/plugins/cli.py index 73ef84e05..46c54877f 100644 --- a/supervisor/plugins/cli.py +++ b/supervisor/plugins/cli.py @@ -75,6 +75,7 @@ class PluginCli(PluginBase): self.save_data() @Job( + name="plugin_cli_update", conditions=PLUGIN_UPDATE_CONDITIONS, on_condition=CliJobError, ) @@ -151,6 +152,7 @@ class PluginCli(PluginBase): capture_exception(err) @Job( + name="plugin_cli_restart_after_problem", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, diff --git a/supervisor/plugins/dns.py b/supervisor/plugins/dns.py index db4d09da0..a9cdaa288 100644 --- a/supervisor/plugins/dns.py +++ b/supervisor/plugins/dns.py @@ -187,6 +187,7 @@ class PluginDns(PluginBase): await self.write_hosts() @Job( + name="plugin_dns_update", conditions=PLUGIN_UPDATE_CONDITIONS, on_condition=CoreDNSJobError, ) @@ -269,6 +270,7 @@ class PluginDns(PluginBase): return await super().watchdog_container(event) @Job( + name="plugin_dns_restart_after_problem", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, diff --git a/supervisor/plugins/multicast.py 
b/supervisor/plugins/multicast.py index 28523fa0a..7cd3b79d4 100644 --- a/supervisor/plugins/multicast.py +++ b/supervisor/plugins/multicast.py @@ -71,6 +71,7 @@ class PluginMulticast(PluginBase): self.save_data() @Job( + name="plugin_multicast_update", conditions=PLUGIN_UPDATE_CONDITIONS, on_condition=MulticastJobError, ) @@ -146,6 +147,7 @@ class PluginMulticast(PluginBase): capture_exception(err) @Job( + name="plugin_multicast_restart_after_problem", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, diff --git a/supervisor/plugins/observer.py b/supervisor/plugins/observer.py index 0981eca7d..6d64ede65 100644 --- a/supervisor/plugins/observer.py +++ b/supervisor/plugins/observer.py @@ -79,6 +79,7 @@ class PluginObserver(PluginBase): self.save_data() @Job( + name="plugin_observer_update", conditions=PLUGIN_UPDATE_CONDITIONS, on_condition=ObserverJobError, ) @@ -156,6 +157,7 @@ class PluginObserver(PluginBase): capture_exception(err) @Job( + name="plugin_observer_restart_after_problem", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, diff --git a/supervisor/resolution/checks/addon_pwned.py b/supervisor/resolution/checks/addon_pwned.py index fbd48b7b4..b9a7c5a05 100644 --- a/supervisor/resolution/checks/addon_pwned.py +++ b/supervisor/resolution/checks/addon_pwned.py @@ -22,6 +22,7 @@ class CheckAddonPwned(CheckBase): """CheckAddonPwned class for check.""" @Job( + name="check_addon_pwned_run", conditions=[JobCondition.INTERNET_SYSTEM], limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=24), @@ -62,7 +63,7 @@ class CheckAddonPwned(CheckBase): except PwnedError: pass - @Job(conditions=[JobCondition.INTERNET_SYSTEM]) + @Job(name="check_addon_pwned_approve", conditions=[JobCondition.INTERNET_SYSTEM]) async def approve_check(self, reference: str | None = None) -> bool: """Approve check 
if it is affected by issue.""" addon = self.sys_addons.get(reference) diff --git a/supervisor/resolution/checks/dns_server.py b/supervisor/resolution/checks/dns_server.py index c88e4b2e9..f74643197 100644 --- a/supervisor/resolution/checks/dns_server.py +++ b/supervisor/resolution/checks/dns_server.py @@ -23,6 +23,7 @@ class CheckDNSServer(CheckBase): """CheckDNSServer class for check.""" @Job( + name="check_dns_server_run", conditions=[JobCondition.INTERNET_SYSTEM], limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=24), @@ -42,7 +43,7 @@ class CheckDNSServer(CheckBase): ) capture_exception(results[i]) - @Job(conditions=[JobCondition.INTERNET_SYSTEM]) + @Job(name="check_dns_server_approve", conditions=[JobCondition.INTERNET_SYSTEM]) async def approve_check(self, reference: str | None = None) -> bool: """Approve check if it is affected by issue.""" if reference not in self.dns_servers: diff --git a/supervisor/resolution/checks/dns_server_ipv6.py b/supervisor/resolution/checks/dns_server_ipv6.py index 13657021d..8ece866bc 100644 --- a/supervisor/resolution/checks/dns_server_ipv6.py +++ b/supervisor/resolution/checks/dns_server_ipv6.py @@ -23,6 +23,7 @@ class CheckDNSServerIPv6(CheckBase): """CheckDNSServerIPv6 class for check.""" @Job( + name="check_dns_server_ipv6_run", conditions=[JobCondition.INTERNET_SYSTEM], limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=24), @@ -47,7 +48,9 @@ class CheckDNSServerIPv6(CheckBase): ) capture_exception(results[i]) - @Job(conditions=[JobCondition.INTERNET_SYSTEM]) + @Job( + name="check_dns_server_ipv6_approve", conditions=[JobCondition.INTERNET_SYSTEM] + ) async def approve_check(self, reference: str | None = None) -> bool: """Approve check if it is affected by issue.""" if reference not in self.dns_servers: diff --git a/supervisor/resolution/fixup.py b/supervisor/resolution/fixup.py index 1d34703e4..da56f7d18 100644 --- a/supervisor/resolution/fixup.py +++ b/supervisor/resolution/fixup.py @@ -36,7 
+36,10 @@ class ResolutionFixup(CoreSysAttributes): """Return a list of all fixups.""" return list(self._fixups.values()) - @Job(conditions=[JobCondition.HEALTHY, JobCondition.RUNNING]) + @Job( + name="resolution_fixup_run_autofix", + conditions=[JobCondition.HEALTHY, JobCondition.RUNNING], + ) async def run_autofix(self) -> None: """Run all startup fixes.""" _LOGGER.info("Starting system autofix at state %s", self.sys_core.state) diff --git a/supervisor/resolution/fixups/store_execute_reload.py b/supervisor/resolution/fixups/store_execute_reload.py index a01b91335..569703532 100644 --- a/supervisor/resolution/fixups/store_execute_reload.py +++ b/supervisor/resolution/fixups/store_execute_reload.py @@ -25,6 +25,7 @@ class FixupStoreExecuteReload(FixupBase): """Storage class for fixup.""" @Job( + name="fixup_store_execute_reload_process", conditions=[JobCondition.INTERNET_SYSTEM, JobCondition.FREE_SPACE], on_condition=ResolutionFixupJobError, ) diff --git a/supervisor/resolution/fixups/store_execute_reset.py b/supervisor/resolution/fixups/store_execute_reset.py index 6bc1a1ab3..048dfda7c 100644 --- a/supervisor/resolution/fixups/store_execute_reset.py +++ b/supervisor/resolution/fixups/store_execute_reset.py @@ -26,6 +26,7 @@ class FixupStoreExecuteReset(FixupBase): """Storage class for fixup.""" @Job( + name="fixup_store_execute_reset_process", conditions=[JobCondition.INTERNET_SYSTEM, JobCondition.FREE_SPACE], on_condition=ResolutionFixupJobError, ) diff --git a/supervisor/resolution/fixups/system_execute_integrity.py b/supervisor/resolution/fixups/system_execute_integrity.py index 46c282c64..13e65aeae 100644 --- a/supervisor/resolution/fixups/system_execute_integrity.py +++ b/supervisor/resolution/fixups/system_execute_integrity.py @@ -22,6 +22,7 @@ class FixupSystemExecuteIntegrity(FixupBase): """Storage class for fixup.""" @Job( + name="fixup_system_execute_integrity_process", conditions=[JobCondition.INTERNET_SYSTEM], on_condition=ResolutionFixupJobError, 
limit=JobExecutionLimit.THROTTLE, diff --git a/supervisor/security/module.py b/supervisor/security/module.py index caeafc3cf..f7e03d59b 100644 --- a/supervisor/security/module.py +++ b/supervisor/security/module.py @@ -103,6 +103,7 @@ class Security(FileConfiguration, CoreSysAttributes): return @Job( + name="security_manager_integrity_check", conditions=[JobCondition.INTERNET_SYSTEM], on_condition=SecurityJobError, limit=JobExecutionLimit.ONCE, diff --git a/supervisor/store/__init__.py b/supervisor/store/__init__.py index 18a1e8e8b..0b91baaa0 100644 --- a/supervisor/store/__init__.py +++ b/supervisor/store/__init__.py @@ -80,7 +80,11 @@ class StoreManager(CoreSysAttributes, FileConfiguration): self._data[ATTR_REPOSITORIES], add_with_errors=True ) - @Job(conditions=[JobCondition.SUPERVISOR_UPDATED], on_condition=StoreJobError) + @Job( + name="store_manager_reload", + conditions=[JobCondition.SUPERVISOR_UPDATED], + on_condition=StoreJobError, + ) async def reload(self) -> None: """Update add-ons from repository and reload list.""" tasks = [self.sys_create_task(repository.update()) for repository in self.all] @@ -92,6 +96,7 @@ class StoreManager(CoreSysAttributes, FileConfiguration): self._read_addons() @Job( + name="store_manager_add_repository", conditions=[JobCondition.INTERNET_SYSTEM, JobCondition.SUPERVISOR_UPDATED], on_condition=StoreJobError, ) diff --git a/supervisor/store/git.py b/supervisor/store/git.py index 671e82dbf..690f68297 100644 --- a/supervisor/store/git.py +++ b/supervisor/store/git.py @@ -77,6 +77,7 @@ class GitRepo(CoreSysAttributes): raise StoreGitError() from err @Job( + name="git_repo_clone", conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_SYSTEM], on_condition=StoreJobError, ) @@ -112,6 +113,7 @@ class GitRepo(CoreSysAttributes): raise StoreGitCloneError() from err @Job( + name="git_repo_pull", conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_SYSTEM], on_condition=StoreJobError, ) diff --git a/supervisor/supervisor.py 
b/supervisor/supervisor.py index f17e43e44..56fe9668f 100644 --- a/supervisor/supervisor.py +++ b/supervisor/supervisor.py @@ -211,7 +211,11 @@ class Supervisor(CoreSysAttributes): self.sys_create_task(self.sys_core.stop()) - @Job(conditions=[JobCondition.RUNNING], on_condition=SupervisorJobError) + @Job( + name="supervisor_restart", + conditions=[JobCondition.RUNNING], + on_condition=SupervisorJobError, + ) async def restart(self) -> None: """Restart Supervisor soft.""" self.sys_core.exit_code = 100 @@ -255,6 +259,7 @@ class Supervisor(CoreSysAttributes): _LOGGER.error("Repair of Supervisor failed") @Job( + name="supervisor_check_connectivity", limit=JobExecutionLimit.THROTTLE, throttle_period=_check_connectivity_throttle_period, ) diff --git a/supervisor/updater.py b/supervisor/updater.py index 3959a88e0..e32f8d895 100644 --- a/supervisor/updater.py +++ b/supervisor/updater.py @@ -181,6 +181,7 @@ class Updater(FileConfiguration, CoreSysAttributes): self._data[ATTR_AUTO_UPDATE] = value @Job( + name="updater_fetch_data", conditions=[JobCondition.INTERNET_SYSTEM], on_condition=UpdaterJobError, limit=JobExecutionLimit.THROTTLE_WAIT, diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index 57bf09f97..177545e1a 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -3,6 +3,7 @@ import asyncio from datetime import timedelta from unittest.mock import AsyncMock, Mock, PropertyMock, patch +from uuid import uuid4 from aiohttp.client_exceptions import ClientError import pytest @@ -37,7 +38,7 @@ async def test_healthy(coresys: CoreSys, caplog: pytest.LogCaptureFixture): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.HEALTHY]) + @Job(name="test_healthy_execute", conditions=[JobCondition.HEALTHY]) async def execute(self): """Execute the class method.""" return True @@ -79,12 +80,18 @@ async def test_internet( """Initialize the test class.""" self.coresys = coresys - 
@Job(conditions=[JobCondition.INTERNET_HOST]) + @Job( + name=f"test_internet_execute_host_{uuid4().hex}", + conditions=[JobCondition.INTERNET_HOST], + ) async def execute_host(self): """Execute the class method.""" return True - @Job(conditions=[JobCondition.INTERNET_SYSTEM]) + @Job( + name=f"test_internet_execute_system_{uuid4().hex}", + conditions=[JobCondition.INTERNET_SYSTEM], + ) async def execute_system(self): """Execute the class method.""" return True @@ -120,7 +127,7 @@ async def test_free_space(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.FREE_SPACE]) + @Job(name="test_free_space_execute", conditions=[JobCondition.FREE_SPACE]) async def execute(self): """Execute the class method.""" return True @@ -146,7 +153,7 @@ async def test_haos(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.HAOS]) + @Job(name="test_haos_execute", conditions=[JobCondition.HAOS]) async def execute(self): """Execute the class method.""" return True @@ -172,7 +179,7 @@ async def test_exception(coresys: CoreSys, capture_exception: Mock): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.HEALTHY]) + @Job(name="test_exception_execute", conditions=[JobCondition.HEALTHY]) async def execute(self): """Execute the class method.""" raise HassioError() @@ -196,7 +203,9 @@ async def test_exception_not_handle(coresys: CoreSys, capture_exception: Mock): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.HEALTHY]) + @Job( + name="test_exception_not_handle_execute", conditions=[JobCondition.HEALTHY] + ) async def execute(self): """Execute the class method.""" raise err @@ -219,7 +228,7 @@ async def test_running(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.RUNNING]) + @Job(name="test_running_execute", conditions=[JobCondition.RUNNING]) async def 
execute(self): """Execute the class method.""" return True @@ -246,7 +255,11 @@ async def test_exception_conditions(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.RUNNING], on_condition=HassioError) + @Job( + name="test_exception_conditions_execute", + conditions=[JobCondition.RUNNING], + on_condition=HassioError, + ) async def execute(self): """Execute the class method.""" return True @@ -274,7 +287,10 @@ async def test_execution_limit_single_wait( self.coresys = coresys self.run = asyncio.Lock() - @Job(limit=JobExecutionLimit.SINGLE_WAIT) + @Job( + name="test_execution_limit_single_wait_execute", + limit=JobExecutionLimit.SINGLE_WAIT, + ) async def execute(self, sleep: float): """Execute the class method.""" assert not self.run.locked() @@ -300,7 +316,11 @@ async def test_execution_limit_throttle_wait( self.run = asyncio.Lock() self.call = 0 - @Job(limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(hours=1)) + @Job( + name="test_execution_limit_throttle_wait_execute", + limit=JobExecutionLimit.THROTTLE_WAIT, + throttle_period=timedelta(hours=1), + ) async def execute(self, sleep: float): """Execute the class method.""" assert not self.run.locked() @@ -333,6 +353,7 @@ async def test_execution_limit_throttle_rate_limit( self.call = 0 @Job( + name=f"test_execution_limit_throttle_rate_limit_execute_{uuid4().hex}", limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=timedelta(hours=1), throttle_max_calls=2, @@ -370,7 +391,11 @@ async def test_execution_limit_throttle(coresys: CoreSys, loop: asyncio.BaseEven self.run = asyncio.Lock() self.call = 0 - @Job(limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=1)) + @Job( + name="test_execution_limit_throttle_execute", + limit=JobExecutionLimit.THROTTLE, + throttle_period=timedelta(hours=1), + ) async def execute(self, sleep: float): """Execute the class method.""" assert not self.run.locked() @@ -398,7 +423,11 @@ async def 
test_execution_limit_once(coresys: CoreSys, loop: asyncio.BaseEventLoo self.coresys = coresys self.run = asyncio.Lock() - @Job(limit=JobExecutionLimit.ONCE, on_condition=JobException) + @Job( + name="test_execution_limit_once_execute", + limit=JobExecutionLimit.ONCE, + on_condition=JobException, + ) async def execute(self, sleep: float): """Execute the class method.""" assert not self.run.locked() @@ -425,7 +454,10 @@ async def test_supervisor_updated(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.SUPERVISOR_UPDATED]) + @Job( + name="test_supervisor_updated_execute", + conditions=[JobCondition.SUPERVISOR_UPDATED], + ) async def execute(self) -> bool: """Execute the class method.""" return True @@ -453,7 +485,10 @@ async def test_plugins_updated(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.PLUGINS_UPDATED]) + @Job( + name="test_plugins_updated_execute", + conditions=[JobCondition.PLUGINS_UPDATED], + ) async def execute(self) -> bool: """Execute the class method.""" return True @@ -486,7 +521,7 @@ async def test_auto_update(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.AUTO_UPDATE]) + @Job(name="test_auto_update_execute", conditions=[JobCondition.AUTO_UPDATE]) async def execute(self) -> bool: """Execute the class method.""" return True @@ -512,7 +547,7 @@ async def test_os_agent(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.OS_AGENT]) + @Job(name="test_os_agent_execute", conditions=[JobCondition.OS_AGENT]) async def execute(self) -> bool: """Execute the class method.""" return True @@ -541,7 +576,7 @@ async def test_host_network(coresys: CoreSys): """Initialize the test class.""" self.coresys = coresys - @Job(conditions=[JobCondition.HOST_NETWORK]) + @Job(name="test_host_network_execute", conditions=[JobCondition.HOST_NETWORK]) 
async def execute(self) -> bool: """Execute the class method.""" return True @@ -567,23 +602,39 @@ async def test_job_group_once(coresys: CoreSys, loop: asyncio.BaseEventLoop): super().__init__(coresys, "TestClass") self.event = asyncio.Event() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException) + @Job( + name="test_job_group_once_inner_execute", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=JobException, + ) async def inner_execute(self) -> bool: """Inner class method called by execute, group level lock allows this.""" await self.event.wait() return True - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException) + @Job( + name="test_job_group_once_execute", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=JobException, + ) async def execute(self) -> bool: """Execute the class method.""" return await self.inner_execute() - @Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException) + @Job( + name="test_job_group_once_separate_execute", + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=JobException, + ) async def separate_execute(self) -> bool: """Alternate execute method that shares group lock.""" return True - @Job(limit=JobExecutionLimit.ONCE, on_condition=JobException) + @Job( + name="test_job_group_once_unrelated", + limit=JobExecutionLimit.ONCE, + on_condition=JobException, + ) async def unrelated_method(self) -> bool: """Unrelated method, sparate job with separate lock.""" return True @@ -622,18 +673,30 @@ async def test_job_group_wait(coresys: CoreSys, loop: asyncio.BaseEventLoop): self.other_count = 0 self.event = asyncio.Event() - @Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException) + @Job( + name="test_job_group_wait_inner_execute", + limit=JobExecutionLimit.GROUP_WAIT, + on_condition=JobException, + ) async def inner_execute(self) -> None: """Inner class method called by execute, group level lock allows this.""" self.execute_count += 1 await self.event.wait() - 
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException) + @Job( + name="test_job_group_wait_execute", + limit=JobExecutionLimit.GROUP_WAIT, + on_condition=JobException, + ) async def execute(self) -> None: """Execute the class method.""" await self.inner_execute() - @Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException) + @Job( + name="test_job_group_wait_separate_execute", + limit=JobExecutionLimit.GROUP_WAIT, + on_condition=JobException, + ) async def separate_execute(self) -> None: """Alternate execute method that shares group lock.""" self.other_count += 1 @@ -670,7 +733,7 @@ async def test_job_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop): self.event = asyncio.Event() self.job: SupervisorJob | None = None - @Job(limit=JobExecutionLimit.ONCE) + @Job(name="test_job_cleanup_execute", limit=JobExecutionLimit.ONCE) async def execute(self): """Execute the class method.""" self.job = coresys.jobs.get_job() @@ -703,7 +766,11 @@ async def test_job_skip_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop): self.event = asyncio.Event() self.job: SupervisorJob | None = None - @Job(limit=JobExecutionLimit.ONCE, cleanup=False) + @Job( + name="test_job_skip_cleanup_execute", + limit=JobExecutionLimit.ONCE, + cleanup=False, + ) async def execute(self): """Execute the class method.""" self.job = coresys.jobs.get_job() @@ -722,3 +789,154 @@ async def test_job_skip_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop): assert coresys.jobs.jobs == [test.job] assert test.job.done + + +async def test_execution_limit_group_throttle( + coresys: CoreSys, loop: asyncio.BaseEventLoop +): + """Test the group throttle execution limit.""" + + class TestClass(JobGroup): + """Test class.""" + + def __init__(self, coresys: CoreSys, reference: str): + """Initialize the test class.""" + super().__init__(coresys, f"test_class_{reference}", reference) + self.run = asyncio.Lock() + self.call = 0 + + @Job( + name="test_execution_limit_group_throttle_execute", + 
limit=JobExecutionLimit.GROUP_THROTTLE, + throttle_period=timedelta(milliseconds=95), + ) + async def execute(self, sleep: float): + """Execute the class method.""" + assert not self.run.locked() + async with self.run: + await asyncio.sleep(sleep) + self.call += 1 + + test1 = TestClass(coresys, "test1") + test2 = TestClass(coresys, "test2") + + # One call of each should work. The subsequent calls will be silently throttled due to period + await asyncio.gather( + test1.execute(0), test1.execute(0), test2.execute(0), test2.execute(0) + ) + assert test1.call == 1 + assert test2.call == 1 + + # First call to each will work again since period cleared. Second throttled once more as they don't wait + with time_machine.travel(utcnow() + timedelta(milliseconds=100)): + await asyncio.gather( + test1.execute(0.1), + test1.execute(0.1), + test2.execute(0.1), + test2.execute(0.1), + ) + + assert test1.call == 2 + assert test2.call == 2 + + +async def test_execution_limit_group_throttle_wait( + coresys: CoreSys, loop: asyncio.BaseEventLoop +): + """Test the group throttle wait job execution limit.""" + + class TestClass(JobGroup): + """Test class.""" + + def __init__(self, coresys: CoreSys, reference: str): + """Initialize the test class.""" + super().__init__(coresys, f"test_class_{reference}", reference) + self.run = asyncio.Lock() + self.call = 0 + + @Job( + name="test_execution_limit_group_throttle_wait_execute", + limit=JobExecutionLimit.GROUP_THROTTLE_WAIT, + throttle_period=timedelta(milliseconds=95), + ) + async def execute(self, sleep: float): + """Execute the class method.""" + assert not self.run.locked() + async with self.run: + await asyncio.sleep(sleep) + self.call += 1 + + test1 = TestClass(coresys, "test1") + test2 = TestClass(coresys, "test2") + + # One call of each should work. 
The subsequent calls will be silently throttled after waiting due to period + await asyncio.gather( + *[test1.execute(0), test1.execute(0), test2.execute(0), test2.execute(0)] + ) + assert test1.call == 1 + assert test2.call == 1 + + # All calls should work as we cleared the period. And tasks take longer than the period and are queued + with time_machine.travel(utcnow() + timedelta(milliseconds=100)): + await asyncio.gather( + *[ + test1.execute(0.1), + test1.execute(0.1), + test2.execute(0.1), + test2.execute(0.1), + ] + ) + + assert test1.call == 3 + assert test2.call == 3 + + +@pytest.mark.parametrize("error", [None, PluginJobError]) +async def test_execution_limit_group_throttle_rate_limit( + coresys: CoreSys, loop: asyncio.BaseEventLoop, error: JobException | None +): + """Test the group throttle rate limit job execution limit.""" + + class TestClass(JobGroup): + """Test class.""" + + def __init__(self, coresys: CoreSys, reference: str): + """Initialize the test class.""" + super().__init__(coresys, f"test_class_{reference}", reference) + self.run = asyncio.Lock() + self.call = 0 + + @Job( + name=f"test_execution_limit_group_throttle_rate_limit_execute_{uuid4().hex}", + limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + throttle_period=timedelta(hours=1), + throttle_max_calls=2, + on_condition=error, + ) + async def execute(self): + """Execute the class method.""" + self.call += 1 + + test1 = TestClass(coresys, "test1") + test2 = TestClass(coresys, "test2") + + await asyncio.gather( + *[test1.execute(), test1.execute(), test2.execute(), test2.execute()] + ) + assert test1.call == 2 + assert test2.call == 2 + + with pytest.raises(JobException if error is None else error): + await test1.execute() + with pytest.raises(JobException if error is None else error): + await test2.execute() + + assert test1.call == 2 + assert test2.call == 2 + + with time_machine.travel(utcnow() + timedelta(hours=1)): + await test1.execute() + await test2.execute() + + assert test1.call 
== 3 + assert test2.call == 3