From 9bee58a8b17bd991cb9b23107a737294823bfb19 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 5 Aug 2025 13:24:44 +0200 Subject: [PATCH] Migrate to JobConcurrency and JobThrottle parameters (#6065) --- supervisor/addons/addon.py | 30 ++--- supervisor/backups/manager.py | 14 +-- supervisor/docker/addon.py | 18 +-- supervisor/docker/audio.py | 4 +- supervisor/docker/cli.py | 4 +- supervisor/docker/dns.py | 4 +- supervisor/docker/homeassistant.py | 6 +- supervisor/docker/interface.py | 28 ++--- supervisor/docker/multicast.py | 4 +- supervisor/docker/observer.py | 4 +- supervisor/docker/supervisor.py | 11 +- supervisor/homeassistant/api.py | 4 +- supervisor/homeassistant/core.py | 18 +-- supervisor/homeassistant/module.py | 6 +- supervisor/homeassistant/secrets.py | 5 +- supervisor/host/sound.py | 5 +- supervisor/jobs/const.py | 15 --- supervisor/jobs/decorator.py | 47 +------ supervisor/misc/tasks.py | 5 +- supervisor/os/data_disk.py | 6 +- supervisor/os/manager.py | 4 +- supervisor/plugins/audio.py | 4 +- supervisor/plugins/cli.py | 4 +- supervisor/plugins/dns.py | 4 +- supervisor/plugins/multicast.py | 4 +- supervisor/plugins/observer.py | 4 +- supervisor/resolution/checks/addon_pwned.py | 4 +- supervisor/resolution/checks/dns_server.py | 4 +- .../resolution/checks/dns_server_ipv6.py | 4 +- .../fixups/system_execute_integrity.py | 4 +- supervisor/security/module.py | 5 +- supervisor/supervisor.py | 4 +- supervisor/updater.py | 7 +- tests/jobs/test_job_decorator.py | 118 +++++++++--------- 34 files changed, 181 insertions(+), 231 deletions(-) diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index c43db0c98..64e2494a7 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -77,7 +77,7 @@ from ..exceptions import ( ) from ..hardware.data import Device from ..homeassistant.const import WSEvent -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency, JobThrottle from ..jobs.decorator import Job from ..resolution.const import ContextType, IssueType, UnhealthyReason from ..resolution.data import Issue @@ -733,8 +733,8 @@ class Addon(AddonModel): @Job( name="addon_unload", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def unload(self) -> None: """Unload add-on and remove data.""" @@ -766,8 +766,8 @@ class Addon(AddonModel): @Job( name="addon_install", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def install(self) -> None: """Install and setup this addon.""" @@ -807,8 +807,8 @@ class Addon(AddonModel): @Job( name="addon_uninstall", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def uninstall( self, *, remove_config: bool, remove_image: bool = True @@ -873,8 +873,8 @@ class Addon(AddonModel): @Job( name="addon_update", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def update(self) -> asyncio.Task | None: """Update this addon to latest version. @@ -923,8 +923,8 @@ class Addon(AddonModel): @Job( name="addon_rebuild", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def rebuild(self) -> asyncio.Task | None: """Rebuild this addons container and image. 
@@ -1068,8 +1068,8 @@ class Addon(AddonModel): @Job( name="addon_start", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def start(self) -> asyncio.Task: """Set options and start add-on. @@ -1117,8 +1117,8 @@ class Addon(AddonModel): @Job( name="addon_stop", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def stop(self) -> None: """Stop add-on.""" @@ -1131,8 +1131,8 @@ class Addon(AddonModel): @Job( name="addon_restart", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def restart(self) -> asyncio.Task: """Restart add-on. @@ -1166,8 +1166,8 @@ class Addon(AddonModel): @Job( name="addon_write_stdin", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def write_stdin(self, data) -> None: """Write data to add-on stdin.""" @@ -1200,8 +1200,8 @@ class Addon(AddonModel): @Job( name="addon_begin_backup", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def begin_backup(self) -> bool: """Execute pre commands or stop addon if necessary. @@ -1222,8 +1222,8 @@ class Addon(AddonModel): @Job( name="addon_end_backup", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def end_backup(self) -> asyncio.Task | None: """Execute post commands or restart addon if necessary. @@ -1260,8 +1260,8 @@ class Addon(AddonModel): @Job( name="addon_backup", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def backup(self, tar_file: tarfile.TarFile) -> asyncio.Task | None: """Backup state of an add-on. @@ -1368,8 +1368,8 @@ class Addon(AddonModel): @Job( name="addon_restore", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=AddonsJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def restore(self, tar_file: tarfile.TarFile) -> asyncio.Task | None: """Restore state of an add-on. 
@@ -1521,10 +1521,10 @@ class Addon(AddonModel): @Job( name="addon_restart_after_problem", - limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, on_condition=AddonsJobError, + throttle=JobThrottle.GROUP_RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed addon.""" diff --git a/supervisor/backups/manager.py b/supervisor/backups/manager.py index ac95fe5f7..c37e4aea3 100644 --- a/supervisor/backups/manager.py +++ b/supervisor/backups/manager.py @@ -27,7 +27,7 @@ from ..exceptions import ( BackupJobError, BackupMountDownError, ) -from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobCondition, JobExecutionLimit +from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobConcurrency, JobCondition from ..jobs.decorator import Job from ..jobs.job_group import JobGroup from ..mounts.mount import Mount @@ -583,9 +583,9 @@ class BackupManager(FileConfiguration, JobGroup): @Job( name="backup_manager_full_backup", conditions=[JobCondition.RUNNING], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, cleanup=False, + concurrency=JobConcurrency.GROUP_REJECT, ) async def do_backup_full( self, @@ -630,9 +630,9 @@ class BackupManager(FileConfiguration, JobGroup): @Job( name="backup_manager_partial_backup", conditions=[JobCondition.RUNNING], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, cleanup=False, + concurrency=JobConcurrency.GROUP_REJECT, ) async def do_backup_partial( self, @@ -810,9 +810,9 @@ class BackupManager(FileConfiguration, JobGroup): JobCondition.INTERNET_SYSTEM, JobCondition.RUNNING, ], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, cleanup=False, + concurrency=JobConcurrency.GROUP_REJECT, ) async def do_restore_full( self, @@ -869,9 +869,9 @@ class BackupManager(FileConfiguration, JobGroup): JobCondition.INTERNET_SYSTEM, JobCondition.RUNNING, ], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, cleanup=False, + concurrency=JobConcurrency.GROUP_REJECT, ) async def do_restore_partial( self, @@ -930,8 +930,8 @@ class BackupManager(FileConfiguration, JobGroup): @Job( name="backup_manager_freeze_all", conditions=[JobCondition.RUNNING], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None: """Freeze system to prepare for an external backup such as an image snapshot.""" @@ -999,9 +999,9 @@ class BackupManager(FileConfiguration, JobGroup): @Job( name="backup_manager_signal_thaw", conditions=[JobCondition.FROZEN], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, internal=True, + concurrency=JobConcurrency.GROUP_REJECT, ) async def thaw_all(self) -> None: """Signal thaw task to begin unfreezing the system.""" diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 422925a15..200a8618b 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -39,7 +39,7 @@ from ..exceptions import ( ) from ..hardware.const import PolicyGroup from ..hardware.data import Device -from ..jobs.const import JobCondition, JobExecutionLimit +from ..jobs.const import JobConcurrency, JobCondition from ..jobs.decorator import Job from ..resolution.const import CGROUP_V2_VERSION, ContextType, IssueType, SuggestionType from ..utils.sentry import async_capture_exception @@ -553,8 +553,8 @@ class DockerAddon(DockerInterface): @Job( 
name="docker_addon_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" @@ -619,8 +619,8 @@ class DockerAddon(DockerInterface): @Job( name="docker_addon_update", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def update( self, @@ -647,8 +647,8 @@ class DockerAddon(DockerInterface): @Job( name="docker_addon_install", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def install( self, @@ -735,8 +735,8 @@ class DockerAddon(DockerInterface): @Job( name="docker_addon_import_image", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def import_image(self, tar_file: Path) -> None: """Import a tar file as image.""" @@ -750,7 +750,7 @@ class DockerAddon(DockerInterface): with suppress(DockerError): await self.cleanup() - @Job(name="docker_addon_cleanup", limit=JobExecutionLimit.GROUP_WAIT) + @Job(name="docker_addon_cleanup", concurrency=JobConcurrency.GROUP_QUEUE) async def cleanup( self, old_image: str | None = None, @@ -774,8 +774,8 @@ class DockerAddon(DockerInterface): @Job( name="docker_addon_write_stdin", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def write_stdin(self, data: bytes) -> None: """Write to add-on stdin.""" @@ -808,8 +808,8 @@ class DockerAddon(DockerInterface): @Job( name="docker_addon_stop", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def stop(self, remove_container: bool = True) -> None: """Stop/remove Docker container.""" @@ -848,8 +848,8 @@ class DockerAddon(DockerInterface): @Job( name="docker_addon_hardware_events", conditions=[JobCondition.OS_AGENT], - limit=JobExecutionLimit.SINGLE_WAIT, internal=True, + concurrency=JobConcurrency.QUEUE, ) async def _hardware_events(self, device: Device) -> None: """Process Hardware events for adjust device access.""" diff --git a/supervisor/docker/audio.py b/supervisor/docker/audio.py index 15a4e2026..570ab18c0 100644 --- a/supervisor/docker/audio.py +++ b/supervisor/docker/audio.py @@ -9,7 +9,7 @@ from ..const import DOCKER_CPU_RUNTIME_ALLOCATION from ..coresys import CoreSysAttributes from ..exceptions import DockerJobError from ..hardware.const import PolicyGroup -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import ( ENV_TIME, @@ -89,8 +89,8 @@ class DockerAudio(DockerInterface, CoreSysAttributes): @Job( name="docker_audio_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" diff --git a/supervisor/docker/cli.py b/supervisor/docker/cli.py index e259ac19a..87c6a4fa5 100644 --- a/supervisor/docker/cli.py +++ b/supervisor/docker/cli.py @@ -4,7 +4,7 @@ import logging from ..coresys import CoreSysAttributes from ..exceptions import DockerJobError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import ENV_TIME, ENV_TOKEN from .interface import DockerInterface @@ -29,8 +29,8 @@ class DockerCli(DockerInterface, CoreSysAttributes): @Job( name="docker_cli_run", - limit=JobExecutionLimit.GROUP_ONCE, 
on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" diff --git a/supervisor/docker/dns.py b/supervisor/docker/dns.py index fbee13f28..b9b83ab97 100644 --- a/supervisor/docker/dns.py +++ b/supervisor/docker/dns.py @@ -6,7 +6,7 @@ from docker.types import Mount from ..coresys import CoreSysAttributes from ..exceptions import DockerJobError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import ENV_TIME, MOUNT_DBUS, MountType from .interface import DockerInterface @@ -31,8 +31,8 @@ class DockerDNS(DockerInterface, CoreSysAttributes): @Job( name="docker_dns_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" diff --git a/supervisor/docker/homeassistant.py b/supervisor/docker/homeassistant.py index e9085de95..b92559b55 100644 --- a/supervisor/docker/homeassistant.py +++ b/supervisor/docker/homeassistant.py @@ -12,7 +12,7 @@ from ..const import LABEL_MACHINE from ..exceptions import DockerJobError from ..hardware.const import PolicyGroup from ..homeassistant.const import LANDINGPAGE -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import ( ENV_TIME, @@ -161,8 +161,8 @@ class DockerHomeAssistant(DockerInterface): @Job( name="docker_home_assistant_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self, *, restore_job_id: str | None = None) -> None: """Run Docker image.""" @@ -200,8 +200,8 @@ class DockerHomeAssistant(DockerInterface): @Job( name="docker_home_assistant_execute_command", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def execute_command(self, command: str) -> CommandReturn: """Create a temporary container and run command.""" diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index 9c608669f..838a86c0f 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -39,7 +39,7 @@ from ..exceptions import ( DockerRequestError, DockerTrustError, ) -from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobExecutionLimit +from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency from ..jobs.decorator import Job from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType, SuggestionType @@ -219,8 +219,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_install", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def install( self, @@ -338,7 +338,7 @@ class DockerInterface(JobGroup, ABC): return _container_state_from_model(docker_container) - @Job(name="docker_interface_attach", limit=JobExecutionLimit.GROUP_WAIT) + @Job(name="docker_interface_attach", concurrency=JobConcurrency.GROUP_QUEUE) async def attach( self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False ) -> None: @@ -376,8 +376,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" @@ -406,8 +406,8 @@ class DockerInterface(JobGroup, ABC): @Job( 
name="docker_interface_stop", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def stop(self, remove_container: bool = True) -> None: """Stop/remove Docker container.""" @@ -421,8 +421,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_start", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) def start(self) -> Awaitable[None]: """Start Docker container.""" @@ -430,8 +430,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_remove", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def remove(self, *, remove_image: bool = True) -> None: """Remove Docker images.""" @@ -448,8 +448,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_check_image", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def check_image( self, @@ -497,8 +497,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_update", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def update( self, version: AwesomeVersion, image: str | None = None, latest: bool = False @@ -526,7 +526,7 @@ class DockerInterface(JobGroup, ABC): return b"" - @Job(name="docker_interface_cleanup", limit=JobExecutionLimit.GROUP_WAIT) + @Job(name="docker_interface_cleanup", concurrency=JobConcurrency.GROUP_QUEUE) async def cleanup( self, old_image: str | None = None, @@ -543,8 +543,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_restart", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) def restart(self) -> Awaitable[None]: """Restart docker container.""" @@ -554,8 +554,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_execute_command", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def execute_command(self, command: str) -> CommandReturn: """Create a temporary container and run command.""" @@ -619,8 +619,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_run_inside", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) def run_inside(self, command: str) -> Awaitable[CommandReturn]: """Execute a command inside Docker container.""" @@ -635,8 +635,8 @@ class DockerInterface(JobGroup, ABC): @Job( name="docker_interface_check_trust", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def check_trust(self) -> None: """Check trust of exists Docker image.""" diff --git a/supervisor/docker/multicast.py b/supervisor/docker/multicast.py index 9935f7ee3..7bf0e2514 100644 --- a/supervisor/docker/multicast.py +++ b/supervisor/docker/multicast.py @@ -4,7 +4,7 @@ import logging from ..coresys import CoreSysAttributes from ..exceptions import DockerJobError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import ENV_TIME, Capabilities from .interface import DockerInterface @@ -34,8 +34,8 @@ class DockerMulticast(DockerInterface, CoreSysAttributes): @Job( name="docker_multicast_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + 
concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" diff --git a/supervisor/docker/observer.py b/supervisor/docker/observer.py index 4bcaf24d1..9a3c5aed5 100644 --- a/supervisor/docker/observer.py +++ b/supervisor/docker/observer.py @@ -5,7 +5,7 @@ import logging from ..const import DOCKER_IPV4_NETWORK_MASK, OBSERVER_DOCKER_NAME from ..coresys import CoreSysAttributes from ..exceptions import DockerJobError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import ENV_TIME, ENV_TOKEN, MOUNT_DOCKER, RestartPolicy from .interface import DockerInterface @@ -30,8 +30,8 @@ class DockerObserver(DockerInterface, CoreSysAttributes): @Job( name="docker_observer_run", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def run(self) -> None: """Run Docker image.""" diff --git a/supervisor/docker/supervisor.py b/supervisor/docker/supervisor.py index 785b0cf34..710710d6f 100644 --- a/supervisor/docker/supervisor.py +++ b/supervisor/docker/supervisor.py @@ -10,7 +10,7 @@ import docker import requests from ..exceptions import DockerError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from .const import PropagationMode from .interface import DockerInterface @@ -45,7 +45,7 @@ class DockerSupervisor(DockerInterface): if mount.get("Destination") == "/data" ) - @Job(name="docker_supervisor_attach", limit=JobExecutionLimit.GROUP_WAIT) + @Job(name="docker_supervisor_attach", concurrency=JobConcurrency.GROUP_QUEUE) async def attach( self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False ) -> None: @@ -77,7 +77,7 @@ class DockerSupervisor(DockerInterface): ipv4=self.sys_docker.network.supervisor, ) - @Job(name="docker_supervisor_retag", limit=JobExecutionLimit.GROUP_WAIT) + @Job(name="docker_supervisor_retag", concurrency=JobConcurrency.GROUP_QUEUE) def retag(self) -> Awaitable[None]: """Retag latest image to version.""" return self.sys_run_in_executor(self._retag) @@ -108,7 +108,10 @@ class DockerSupervisor(DockerInterface): f"Can't retag Supervisor version: {err}", _LOGGER.error ) from err - @Job(name="docker_supervisor_update_start_tag", limit=JobExecutionLimit.GROUP_WAIT) + @Job( + name="docker_supervisor_update_start_tag", + concurrency=JobConcurrency.GROUP_QUEUE, + ) def update_start_tag(self, image: str, version: AwesomeVersion) -> Awaitable[None]: """Update start tag to new version.""" return self.sys_run_in_executor(self._update_start_tag, image, version) diff --git a/supervisor/homeassistant/api.py b/supervisor/homeassistant/api.py index b85f8dce3..8dc0004d0 100644 --- a/supervisor/homeassistant/api.py +++ b/supervisor/homeassistant/api.py @@ -15,7 +15,7 @@ from multidict import MultiMapping from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import HomeAssistantAPIError, HomeAssistantAuthError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency from ..jobs.decorator import Job from ..utils import check_port, version_is_new_enough from .const import LANDINGPAGE @@ -46,8 +46,8 @@ class HomeAssistantAPI(CoreSysAttributes): @Job( name="home_assistant_api_ensure_access_token", - limit=JobExecutionLimit.SINGLE_WAIT, internal=True, + concurrency=JobConcurrency.QUEUE, ) async def ensure_access_token(self) -> None: """Ensure there is an access token.""" diff --git 
a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index d525b37f9..3106c2622 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -28,7 +28,7 @@ from ..exceptions import ( HomeAssistantUpdateError, JobException, ) -from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobExecutionLimit +from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobConcurrency, JobThrottle from ..jobs.decorator import Job, JobCondition from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType @@ -123,8 +123,8 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_install_landing_page", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def install_landingpage(self) -> None: """Install a landing page.""" @@ -171,8 +171,8 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_install", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def install(self) -> None: """Install a landing page.""" @@ -222,8 +222,8 @@ class HomeAssistantCore(JobGroup): JobCondition.PLUGINS_UPDATED, JobCondition.SUPERVISOR_UPDATED, ], - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def update( self, @@ -324,8 +324,8 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_start", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def start(self) -> None: """Run Home Assistant docker.""" @@ -359,8 +359,8 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_stop", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def stop(self, *, remove_container: bool = False) -> None: """Stop Home Assistant Docker.""" @@ -371,8 +371,8 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_restart", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def restart(self, *, safe_mode: bool = False) -> None: """Restart Home Assistant Docker.""" @@ -392,8 +392,8 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_rebuild", - limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError, + concurrency=JobConcurrency.GROUP_REJECT, ) async def rebuild(self, *, safe_mode: bool = False) -> None: """Rebuild Home Assistant Docker container.""" @@ -546,9 +546,9 @@ class HomeAssistantCore(JobGroup): @Job( name="home_assistant_core_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, + throttle=JobThrottle.RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed Home Assistant.""" diff --git a/supervisor/homeassistant/module.py b/supervisor/homeassistant/module.py index bf88c7064..9f2cba6b2 100644 --- a/supervisor/homeassistant/module.py +++ b/supervisor/homeassistant/module.py @@ -46,7 +46,8 @@ from ..exceptions import ( ) from ..hardware.const import PolicyGroup from ..hardware.data import Device -from ..jobs.decorator import Job, JobExecutionLimit +from ..jobs.const import JobConcurrency, JobThrottle +from ..jobs.decorator import Job from ..resolution.const import UnhealthyReason 
from ..utils import remove_folder, remove_folder_with_excludes from ..utils.common import FileConfiguration @@ -551,9 +552,10 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes): @Job( name="home_assistant_get_users", - limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(minutes=5), internal=True, + concurrency=JobConcurrency.QUEUE, + throttle=JobThrottle.THROTTLE, ) async def get_users(self) -> list[IngressSessionDataUser]: """Get list of all configured users.""" diff --git a/supervisor/homeassistant/secrets.py b/supervisor/homeassistant/secrets.py index ab02400a8..ce4fcac03 100644 --- a/supervisor/homeassistant/secrets.py +++ b/supervisor/homeassistant/secrets.py @@ -6,7 +6,7 @@ from pathlib import Path from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import YamlFileError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency, JobThrottle from ..jobs.decorator import Job from ..utils.yaml import read_yaml_file @@ -43,9 +43,10 @@ class HomeAssistantSecrets(CoreSysAttributes): @Job( name="home_assistant_secrets_read", - limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=60), internal=True, + concurrency=JobConcurrency.QUEUE, + throttle=JobThrottle.THROTTLE, ) async def _read_secrets(self): """Read secrets.yaml into memory.""" diff --git a/supervisor/host/sound.py b/supervisor/host/sound.py index 39583c22c..fc0d3fdc8 100644 --- a/supervisor/host/sound.py +++ b/supervisor/host/sound.py @@ -9,7 +9,7 @@ from pulsectl import Pulse, PulseError, PulseIndexError, PulseOperationFailed from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import PulseAudioError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobConcurrency, JobThrottle from ..jobs.decorator import Job _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -236,8 +236,9 @@ class SoundControl(CoreSysAttributes): @Job( name="sound_control_update", - limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=2), + concurrency=JobConcurrency.QUEUE, + throttle=JobThrottle.THROTTLE, ) async def update(self, reload_pulse: bool = False): """Update properties over dbus.""" diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index a92dfbeeb..88d061a45 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -84,18 +84,3 @@ class JobThrottle(StrEnum): RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE GROUP_RATE_LIMIT = "group_rate_limit" # Group version of RATE_LIMIT - - -class JobExecutionLimit(StrEnum): - """Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead.""" - - ONCE = "once" - SINGLE_WAIT = "single_wait" - THROTTLE = "throttle" - THROTTLE_WAIT = "throttle_wait" - THROTTLE_RATE_LIMIT = "throttle_rate_limit" - GROUP_ONCE = "group_once" - GROUP_WAIT = "group_wait" - GROUP_THROTTLE = "group_throttle" - GROUP_THROTTLE_WAIT = "group_throttle_wait" - GROUP_THROTTLE_RATE_LIMIT = "group_throttle_rate_limit" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index 8f8b1064c..13f0ed4bb 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -20,7 +20,7 @@ from ..host.const import HostFeature from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType from ..utils.sentry import async_capture_exception from . 
import SupervisorJob -from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle +from .const import JobConcurrency, JobCondition, JobThrottle from .job_group import JobGroup _LOGGER: logging.Logger = logging.getLogger(__package__) @@ -43,8 +43,6 @@ class Job(CoreSysAttributes): | None = None, throttle_max_calls: int | None = None, internal: bool = False, - # Backward compatibility - DEPRECATED - limit: JobExecutionLimit | None = None, ): # pylint: disable=too-many-positional-arguments """Initialize the Job decorator. @@ -58,7 +56,6 @@ class Job(CoreSysAttributes): throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs). throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs). internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False. - limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead. Raises: RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy. @@ -79,54 +76,12 @@ class Job(CoreSysAttributes): self._rate_limited_calls: dict[str | None, list[datetime]] | None = None self._internal = internal - # Handle backward compatibility with limit parameter - if limit is not None: - if concurrency is not None or throttle is not None: - raise RuntimeError( - f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!" - ) - # Map old limit values to new parameters - concurrency, throttle = self._map_limit_to_new_params(limit) - self.concurrency = concurrency self.throttle = throttle # Validate Options self._validate_parameters() - def _map_limit_to_new_params( - self, limit: JobExecutionLimit - ) -> tuple[JobConcurrency | None, JobThrottle | None]: - """Map old limit parameter to new concurrency and throttle parameters.""" - mapping = { - JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None), - JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None), - JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE), - JobExecutionLimit.THROTTLE_WAIT: ( - JobConcurrency.QUEUE, - JobThrottle.THROTTLE, - ), - JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT), - JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None), - JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None), - JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE), - JobExecutionLimit.GROUP_THROTTLE_WAIT: ( - # Seems a bit counter intuitive, but GROUP_QUEUE deadlocks - # tests/jobs/test_job_decorator.py::test_execution_limit_group_throttle_wait - # The reason this deadlocks is because when using GROUP_QUEUE and the - # throttle limit is hit, the group lock is trying to be unlocked outside - # of the job context. The current implementation doesn't allow to unlock - # the group lock when the job is not running. 
- JobConcurrency.QUEUE, - JobThrottle.GROUP_THROTTLE, - ), - JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: ( - None, - JobThrottle.GROUP_RATE_LIMIT, - ), - } - return mapping.get(limit, (None, None)) - def _validate_parameters(self) -> None: """Validate job parameters.""" # Validate throttle parameters diff --git a/supervisor/misc/tasks.py b/supervisor/misc/tasks.py index 971ebc9f5..4d05443cc 100644 --- a/supervisor/misc/tasks.py +++ b/supervisor/misc/tasks.py @@ -15,7 +15,8 @@ from ..exceptions import ( ObserverError, ) from ..homeassistant.const import LANDINGPAGE, WSType -from ..jobs.decorator import Job, JobCondition, JobExecutionLimit +from ..jobs.const import JobConcurrency +from ..jobs.decorator import Job, JobCondition from ..plugins.const import PLUGIN_UPDATE_CONDITIONS from ..utils.dt import utcnow from ..utils.sentry import async_capture_exception @@ -160,7 +161,7 @@ class Tasks(CoreSysAttributes): JobCondition.INTERNET_HOST, JobCondition.RUNNING, ], - limit=JobExecutionLimit.ONCE, + concurrency=JobConcurrency.REJECT, ) async def _update_supervisor(self): """Check and run update of Supervisor Supervisor.""" diff --git a/supervisor/os/data_disk.py b/supervisor/os/data_disk.py index 17a2f4116..78dd99f91 100644 --- a/supervisor/os/data_disk.py +++ b/supervisor/os/data_disk.py @@ -22,7 +22,7 @@ from ..exceptions import ( HassOSJobError, HostError, ) -from ..jobs.const import JobCondition, JobExecutionLimit +from ..jobs.const import JobConcurrency, JobCondition from ..jobs.decorator import Job from ..resolution.checks.base import CheckBase from ..resolution.checks.disabled_data_disk import CheckDisabledDataDisk @@ -205,8 +205,8 @@ class DataDisk(CoreSysAttributes): @Job( name="data_disk_migrate", conditions=[JobCondition.HAOS, JobCondition.OS_AGENT, JobCondition.HEALTHY], - limit=JobExecutionLimit.ONCE, on_condition=HassOSJobError, + concurrency=JobConcurrency.REJECT, ) async def migrate_disk(self, new_disk: str) -> None: """Move data partition to a new disk.""" @@ -305,8 +305,8 @@ class DataDisk(CoreSysAttributes): @Job( name="data_disk_wipe", conditions=[JobCondition.HAOS, JobCondition.OS_AGENT, JobCondition.HEALTHY], - limit=JobExecutionLimit.ONCE, on_condition=HassOSJobError, + concurrency=JobConcurrency.REJECT, ) async def wipe_disk(self) -> None: """Wipe the current data disk.""" diff --git a/supervisor/os/manager.py b/supervisor/os/manager.py index f99673925..dcf82665c 100644 --- a/supervisor/os/manager.py +++ b/supervisor/os/manager.py @@ -21,7 +21,7 @@ from ..exceptions import ( HassOSSlotUpdateError, HassOSUpdateError, ) -from ..jobs.const import JobCondition, JobExecutionLimit +from ..jobs.const import JobConcurrency, JobCondition from ..jobs.decorator import Job from ..resolution.const import UnhealthyReason from ..utils.sentry import async_capture_exception @@ -277,8 +277,8 @@ class OSManager(CoreSysAttributes): JobCondition.RUNNING, JobCondition.SUPERVISOR_UPDATED, ], - limit=JobExecutionLimit.ONCE, on_condition=HassOSJobError, + concurrency=JobConcurrency.REJECT, ) async def update(self, version: AwesomeVersion | None = None) -> None: """Update HassOS system.""" diff --git a/supervisor/plugins/audio.py b/supervisor/plugins/audio.py index db5ae02bd..32fda4157 100644 --- a/supervisor/plugins/audio.py +++ b/supervisor/plugins/audio.py @@ -24,7 +24,7 @@ from ..exceptions import ( DockerError, PluginError, ) -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobThrottle from ..jobs.decorator import Job from ..resolution.const import UnhealthyReason from 
..utils.json import write_json_file @@ -205,10 +205,10 @@ class PluginAudio(PluginBase): @Job( name="plugin_audio_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, on_condition=AudioJobError, + throttle=JobThrottle.RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed plugin.""" diff --git a/supervisor/plugins/cli.py b/supervisor/plugins/cli.py index 954089920..23e89a8a0 100644 --- a/supervisor/plugins/cli.py +++ b/supervisor/plugins/cli.py @@ -15,7 +15,7 @@ from ..docker.cli import DockerCli from ..docker.const import ContainerState from ..docker.stats import DockerStats from ..exceptions import CliError, CliJobError, CliUpdateError, DockerError, PluginError -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobThrottle from ..jobs.decorator import Job from ..utils.sentry import async_capture_exception from .base import PluginBase @@ -118,10 +118,10 @@ class PluginCli(PluginBase): @Job( name="plugin_cli_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, on_condition=CliJobError, + throttle=JobThrottle.RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed plugin.""" diff --git a/supervisor/plugins/dns.py b/supervisor/plugins/dns.py index 3480598c6..9ea98f0d9 100644 --- a/supervisor/plugins/dns.py +++ b/supervisor/plugins/dns.py @@ -31,7 +31,7 @@ from ..exceptions import ( DockerError, PluginError, ) -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobThrottle from ..jobs.decorator import Job from ..resolution.const import ContextType, IssueType, SuggestionType, UnhealthyReason from ..utils.json import write_json_file @@ -351,10 +351,10 @@ class PluginDns(PluginBase): @Job( name="plugin_dns_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, on_condition=CoreDNSJobError, + throttle=JobThrottle.RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed plugin.""" diff --git a/supervisor/plugins/multicast.py b/supervisor/plugins/multicast.py index 04d550e89..86e60f489 100644 --- a/supervisor/plugins/multicast.py +++ b/supervisor/plugins/multicast.py @@ -18,7 +18,7 @@ from ..exceptions import ( MulticastUpdateError, PluginError, ) -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobThrottle from ..jobs.decorator import Job from ..utils.sentry import async_capture_exception from .base import PluginBase @@ -114,10 +114,10 @@ class PluginMulticast(PluginBase): @Job( name="plugin_multicast_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, on_condition=MulticastJobError, + throttle=JobThrottle.RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed plugin.""" diff --git a/supervisor/plugins/observer.py b/supervisor/plugins/observer.py index 6c5ab6355..e599d7f34 100644 --- a/supervisor/plugins/observer.py +++ b/supervisor/plugins/observer.py @@ -21,7 +21,7 @@ from ..exceptions import ( ObserverUpdateError, PluginError, ) -from ..jobs.const import JobExecutionLimit +from ..jobs.const import JobThrottle 
from ..jobs.decorator import Job from ..utils.sentry import async_capture_exception from .base import PluginBase @@ -130,10 +130,10 @@ class PluginObserver(PluginBase): @Job( name="plugin_observer_restart_after_problem", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, throttle_period=WATCHDOG_THROTTLE_PERIOD, throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS, on_condition=ObserverJobError, + throttle=JobThrottle.RATE_LIMIT, ) async def _restart_after_problem(self, state: ContainerState): """Restart unhealthy or failed plugin.""" diff --git a/supervisor/resolution/checks/addon_pwned.py b/supervisor/resolution/checks/addon_pwned.py index d7f1427fc..9a0dea230 100644 --- a/supervisor/resolution/checks/addon_pwned.py +++ b/supervisor/resolution/checks/addon_pwned.py @@ -6,7 +6,7 @@ import logging from ...const import AddonState, CoreState from ...coresys import CoreSys from ...exceptions import PwnedConnectivityError, PwnedError, PwnedSecret -from ...jobs.const import JobCondition, JobExecutionLimit +from ...jobs.const import JobCondition, JobThrottle from ...jobs.decorator import Job from ..const import ContextType, IssueType, SuggestionType from .base import CheckBase @@ -25,8 +25,8 @@ class CheckAddonPwned(CheckBase): @Job( name="check_addon_pwned_run", conditions=[JobCondition.INTERNET_SYSTEM], - limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=24), + throttle=JobThrottle.THROTTLE, ) async def run_check(self) -> None: """Run check if not affected by issue.""" diff --git a/supervisor/resolution/checks/dns_server.py b/supervisor/resolution/checks/dns_server.py index 0bbb8b0ce..efd701124 100644 --- a/supervisor/resolution/checks/dns_server.py +++ b/supervisor/resolution/checks/dns_server.py @@ -9,7 +9,7 @@ from aiodns.error import DNSError from ...const import CoreState from ...coresys import CoreSys -from ...jobs.const import JobCondition, JobExecutionLimit +from ...jobs.const import JobCondition, JobThrottle from ...jobs.decorator import Job from ...utils.sentry import async_capture_exception from ..const import DNS_CHECK_HOST, ContextType, IssueType @@ -36,8 +36,8 @@ class CheckDNSServer(CheckBase): @Job( name="check_dns_server_run", conditions=[JobCondition.INTERNET_SYSTEM], - limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=24), + throttle=JobThrottle.THROTTLE, ) async def run_check(self) -> None: """Run check if not affected by issue.""" diff --git a/supervisor/resolution/checks/dns_server_ipv6.py b/supervisor/resolution/checks/dns_server_ipv6.py index 8b6f9f8a9..d6c512539 100644 --- a/supervisor/resolution/checks/dns_server_ipv6.py +++ b/supervisor/resolution/checks/dns_server_ipv6.py @@ -7,7 +7,7 @@ from aiodns.error import DNSError from ...const import CoreState from ...coresys import CoreSys -from ...jobs.const import JobCondition, JobExecutionLimit +from ...jobs.const import JobCondition, JobThrottle from ...jobs.decorator import Job from ...utils.sentry import async_capture_exception from ..const import DNS_ERROR_NO_DATA, ContextType, IssueType @@ -26,8 +26,8 @@ class CheckDNSServerIPv6(CheckBase): @Job( name="check_dns_server_ipv6_run", conditions=[JobCondition.INTERNET_SYSTEM], - limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=24), + throttle=JobThrottle.THROTTLE, ) async def run_check(self) -> None: """Run check if not affected by issue.""" diff --git a/supervisor/resolution/fixups/system_execute_integrity.py b/supervisor/resolution/fixups/system_execute_integrity.py index 58711b467..a908b09b0 100644 --- 
a/supervisor/resolution/fixups/system_execute_integrity.py +++ b/supervisor/resolution/fixups/system_execute_integrity.py @@ -5,7 +5,7 @@ import logging from ...coresys import CoreSys from ...exceptions import ResolutionFixupError, ResolutionFixupJobError -from ...jobs.const import JobCondition, JobExecutionLimit +from ...jobs.const import JobCondition, JobThrottle from ...jobs.decorator import Job from ...security.const import ContentTrustResult from ..const import ContextType, IssueType, SuggestionType @@ -26,8 +26,8 @@ class FixupSystemExecuteIntegrity(FixupBase): name="fixup_system_execute_integrity_process", conditions=[JobCondition.INTERNET_SYSTEM], on_condition=ResolutionFixupJobError, - limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=8), + throttle=JobThrottle.THROTTLE, ) async def process_fixup(self, reference: str | None = None) -> None: """Initialize the fixup class.""" diff --git a/supervisor/security/module.py b/supervisor/security/module.py index d15be69de..f09a52dc0 100644 --- a/supervisor/security/module.py +++ b/supervisor/security/module.py @@ -17,7 +17,8 @@ from ..exceptions import ( PwnedError, SecurityJobError, ) -from ..jobs.decorator import Job, JobCondition, JobExecutionLimit +from ..jobs.const import JobConcurrency +from ..jobs.decorator import Job, JobCondition from ..resolution.const import ContextType, IssueType, SuggestionType from ..utils.codenotary import cas_validate from ..utils.common import FileConfiguration @@ -107,7 +108,7 @@ class Security(FileConfiguration, CoreSysAttributes): name="security_manager_integrity_check", conditions=[JobCondition.INTERNET_SYSTEM], on_condition=SecurityJobError, - limit=JobExecutionLimit.ONCE, + concurrency=JobConcurrency.REJECT, ) async def integrity_check(self) -> IntegrityResult: """Run a full system integrity check of the platform. 
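For reference, a minimal sketch (not part of this patch) of how a non-group job that previously declared limit=JobExecutionLimit.ONCE reads after this migration. The class and job name below are hypothetical, modeled on the test classes in tests/jobs/test_job_decorator.py; the imports are the real ones used throughout the diff.

from supervisor.jobs.const import JobConcurrency
from supervisor.jobs.decorator import Job


class ExampleTask:
    """Hypothetical holder class, following the pattern used in the tests."""

    def __init__(self, coresys):
        """Initialize with coresys, as the Job decorator expects."""
        self.coresys = coresys

    # Previously: @Job(name="example_task_execute", limit=JobExecutionLimit.ONCE)
    @Job(name="example_task_execute", concurrency=JobConcurrency.REJECT)
    async def execute(self) -> None:
        """Reject a second call while one is already running, instead of queuing it."""
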
diff --git a/supervisor/supervisor.py b/supervisor/supervisor.py index 943323967..12a7b63e8 100644 --- a/supervisor/supervisor.py +++ b/supervisor/supervisor.py @@ -32,7 +32,7 @@ from .exceptions import ( SupervisorJobError, SupervisorUpdateError, ) -from .jobs.const import JobCondition, JobExecutionLimit +from .jobs.const import JobCondition, JobThrottle from .jobs.decorator import Job from .resolution.const import ContextType, IssueType, UnhealthyReason from .utils.codenotary import calc_checksum @@ -288,8 +288,8 @@ class Supervisor(CoreSysAttributes): @Job( name="supervisor_check_connectivity", - limit=JobExecutionLimit.THROTTLE, throttle_period=_check_connectivity_throttle_period, + throttle=JobThrottle.THROTTLE, ) async def check_connectivity(self) -> None: """Check the Internet connectivity from Supervisor's point of view.""" diff --git a/supervisor/updater.py b/supervisor/updater.py index d02a697c3..cb9389390 100644 --- a/supervisor/updater.py +++ b/supervisor/updater.py @@ -8,6 +8,8 @@ import logging import aiohttp from awesomeversion import AwesomeVersion +from supervisor.jobs.const import JobConcurrency, JobThrottle + from .bus import EventListener from .const import ( ATTR_AUDIO, @@ -34,7 +36,7 @@ from .exceptions import ( UpdaterError, UpdaterJobError, ) -from .jobs.decorator import Job, JobCondition, JobExecutionLimit +from .jobs.decorator import Job, JobCondition from .utils.codenotary import calc_checksum from .utils.common import FileConfiguration from .validate import SCHEMA_UPDATER_CONFIG @@ -198,8 +200,9 @@ class Updater(FileConfiguration, CoreSysAttributes): name="updater_fetch_data", conditions=[JobCondition.INTERNET_SYSTEM], on_condition=UpdaterJobError, - limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=30), + concurrency=JobConcurrency.QUEUE, + throttle=JobThrottle.THROTTLE, ) async def fetch_data(self): """Fetch current versions from Github. 
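Similarly, a hedged sketch (not part of the patch) of the combined form that replaces JobExecutionLimit.THROTTLE_WAIT: an explicit concurrency policy plus an explicit throttle, mirroring the updater_fetch_data change directly above. The class, method, and job name here are illustrative only.

from datetime import timedelta

from supervisor.jobs.const import JobConcurrency, JobThrottle
from supervisor.jobs.decorator import Job


class ExampleFetcher:
    """Hypothetical holder class, following the pattern used in the tests."""

    def __init__(self, coresys):
        """Initialize with coresys, as the Job decorator expects."""
        self.coresys = coresys

    # Previously: limit=JobExecutionLimit.THROTTLE_WAIT
    @Job(
        name="example_fetcher_refresh",
        concurrency=JobConcurrency.QUEUE,
        throttle=JobThrottle.THROTTLE,
        throttle_period=timedelta(seconds=30),
    )
    async def refresh(self) -> None:
        """Queue concurrent callers, but skip runs that fall inside the throttle period."""
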
diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index 91b634af3..eacc8a893 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -20,7 +20,7 @@ from supervisor.exceptions import ( from supervisor.host.const import HostFeature from supervisor.host.manager import HostManager from supervisor.jobs import JobSchedulerOptions, SupervisorJob -from supervisor.jobs.const import JobConcurrency, JobExecutionLimit, JobThrottle +from supervisor.jobs.const import JobConcurrency, JobThrottle from supervisor.jobs.decorator import Job, JobCondition from supervisor.jobs.job_group import JobGroup from supervisor.os.manager import OSManager @@ -280,8 +280,8 @@ async def test_exception_conditions(coresys: CoreSys): await test.execute() -async def test_execution_limit_single_wait(coresys: CoreSys): - """Test the single wait job execution limit.""" +async def test_concurrency_queue(coresys: CoreSys): + """Test the queue job concurrency.""" class TestClass: """Test class.""" @@ -292,8 +292,8 @@ async def test_execution_limit_single_wait(coresys: CoreSys): self.run = asyncio.Lock() @Job( - name="test_execution_limit_single_wait_execute", - limit=JobExecutionLimit.SINGLE_WAIT, + name="test_concurrency_queue_execute", + concurrency=JobConcurrency.QUEUE, ) async def execute(self, sleep: float): """Execute the class method.""" @@ -306,8 +306,8 @@ async def test_execution_limit_single_wait(coresys: CoreSys): await asyncio.gather(*[test.execute(0.1), test.execute(0.1), test.execute(0.1)]) -async def test_execution_limit_throttle_wait(coresys: CoreSys): - """Test the throttle wait job execution limit.""" +async def test_concurrency_queue_with_throttle(coresys: CoreSys): + """Test the queue concurrency with throttle.""" class TestClass: """Test class.""" @@ -319,8 +319,9 @@ async def test_execution_limit_throttle_wait(coresys: CoreSys): self.call = 0 @Job( - name="test_execution_limit_throttle_wait_execute", - limit=JobExecutionLimit.THROTTLE_WAIT, + name="test_concurrency_queue_with_throttle_execute", + concurrency=JobConcurrency.QUEUE, + throttle=JobThrottle.THROTTLE, throttle_period=timedelta(hours=1), ) async def execute(self, sleep: float): @@ -340,10 +341,8 @@ async def test_execution_limit_throttle_wait(coresys: CoreSys): @pytest.mark.parametrize("error", [None, PluginJobError]) -async def test_execution_limit_throttle_rate_limit( - coresys: CoreSys, error: JobException | None -): - """Test the throttle wait job execution limit.""" +async def test_throttle_rate_limit(coresys: CoreSys, error: JobException | None): + """Test the throttle rate limit.""" class TestClass: """Test class.""" @@ -355,8 +354,8 @@ async def test_execution_limit_throttle_rate_limit( self.call = 0 @Job( - name=f"test_execution_limit_throttle_rate_limit_execute_{uuid4().hex}", - limit=JobExecutionLimit.THROTTLE_RATE_LIMIT, + name=f"test_throttle_rate_limit_execute_{uuid4().hex}", + throttle=JobThrottle.RATE_LIMIT, throttle_period=timedelta(hours=1), throttle_max_calls=2, on_condition=error, @@ -381,8 +380,8 @@ async def test_execution_limit_throttle_rate_limit( assert test.call == 3 -async def test_execution_limit_throttle(coresys: CoreSys): - """Test the ignore conditions decorator.""" +async def test_throttle_basic(coresys: CoreSys): + """Test the basic throttle functionality.""" class TestClass: """Test class.""" @@ -394,8 +393,8 @@ async def test_execution_limit_throttle(coresys: CoreSys): self.call = 0 @Job( - name="test_execution_limit_throttle_execute", - 
limit=JobExecutionLimit.THROTTLE, + name="test_throttle_basic_execute", + throttle=JobThrottle.THROTTLE, throttle_period=timedelta(hours=1), ) async def execute(self, sleep: float): @@ -414,8 +413,8 @@ async def test_execution_limit_throttle(coresys: CoreSys): assert test.call == 1 -async def test_execution_limit_once(coresys: CoreSys): - """Test the ignore conditions decorator.""" +async def test_concurrency_reject(coresys: CoreSys): + """Test the reject concurrency.""" class TestClass: """Test class.""" @@ -426,8 +425,8 @@ async def test_execution_limit_once(coresys: CoreSys): self.run = asyncio.Lock() @Job( - name="test_execution_limit_once_execute", - limit=JobExecutionLimit.ONCE, + name="test_concurrency_reject_execute", + concurrency=JobConcurrency.REJECT, on_condition=JobException, ) async def execute(self, sleep: float): @@ -603,8 +602,8 @@ async def test_host_network(coresys: CoreSys): assert await test.execute() -async def test_job_group_once(coresys: CoreSys): - """Test job group once execution limitation.""" +async def test_job_group_reject(coresys: CoreSys): + """Test job group reject concurrency limitation.""" class TestClass(JobGroup): """Test class.""" @@ -615,8 +614,8 @@ async def test_job_group_once(coresys: CoreSys): self.event = asyncio.Event() @Job( - name="test_job_group_once_inner_execute", - limit=JobExecutionLimit.GROUP_ONCE, + name="test_job_group_reject_inner_execute", + concurrency=JobConcurrency.GROUP_REJECT, on_condition=JobException, ) async def inner_execute(self) -> bool: @@ -625,8 +624,8 @@ async def test_job_group_once(coresys: CoreSys): return True @Job( - name="test_job_group_once_execute", - limit=JobExecutionLimit.GROUP_ONCE, + name="test_job_group_reject_execute", + concurrency=JobConcurrency.GROUP_REJECT, on_condition=JobException, ) async def execute(self) -> bool: @@ -634,8 +633,8 @@ async def test_job_group_once(coresys: CoreSys): return await self.inner_execute() @Job( - name="test_job_group_once_separate_execute", - limit=JobExecutionLimit.GROUP_ONCE, + name="test_job_group_reject_separate_execute", + concurrency=JobConcurrency.GROUP_REJECT, on_condition=JobException, ) async def separate_execute(self) -> bool: @@ -643,8 +642,8 @@ async def test_job_group_once(coresys: CoreSys): return True @Job( - name="test_job_group_once_unrelated", - limit=JobExecutionLimit.ONCE, + name="test_job_group_reject_unrelated", + concurrency=JobConcurrency.REJECT, on_condition=JobException, ) async def unrelated_method(self) -> bool: @@ -672,8 +671,8 @@ async def test_job_group_once(coresys: CoreSys): assert await run_task -async def test_job_group_wait(coresys: CoreSys): - """Test job group wait execution limitation.""" +async def test_job_group_queue(coresys: CoreSys): + """Test job group queue concurrency limitation.""" class TestClass(JobGroup): """Test class.""" @@ -686,8 +685,8 @@ async def test_job_group_wait(coresys: CoreSys): self.event = asyncio.Event() @Job( - name="test_job_group_wait_inner_execute", - limit=JobExecutionLimit.GROUP_WAIT, + name="test_job_group_queue_inner_execute", + concurrency=JobConcurrency.GROUP_QUEUE, on_condition=JobException, ) async def inner_execute(self) -> None: @@ -696,8 +695,8 @@ async def test_job_group_wait(coresys: CoreSys): await self.event.wait() @Job( - name="test_job_group_wait_execute", - limit=JobExecutionLimit.GROUP_WAIT, + name="test_job_group_queue_execute", + concurrency=JobConcurrency.GROUP_QUEUE, on_condition=JobException, ) async def execute(self) -> None: @@ -705,8 +704,8 @@ async def 
test_job_group_wait(coresys: CoreSys): await self.inner_execute() @Job( - name="test_job_group_wait_separate_execute", - limit=JobExecutionLimit.GROUP_WAIT, + name="test_job_group_queue_separate_execute", + concurrency=JobConcurrency.GROUP_QUEUE, on_condition=JobException, ) async def separate_execute(self) -> None: @@ -746,7 +745,7 @@ async def test_job_cleanup(coresys: CoreSys): self.event = asyncio.Event() self.job: SupervisorJob | None = None - @Job(name="test_job_cleanup_execute", limit=JobExecutionLimit.ONCE) + @Job(name="test_job_cleanup_execute", concurrency=JobConcurrency.REJECT) async def execute(self): """Execute the class method.""" self.job = coresys.jobs.current @@ -781,7 +780,7 @@ async def test_job_skip_cleanup(coresys: CoreSys): @Job( name="test_job_skip_cleanup_execute", - limit=JobExecutionLimit.ONCE, + concurrency=JobConcurrency.REJECT, cleanup=False, ) async def execute(self): @@ -804,8 +803,8 @@ async def test_job_skip_cleanup(coresys: CoreSys): assert test.job.done -async def test_execution_limit_group_throttle(coresys: CoreSys): - """Test the group throttle execution limit.""" +async def test_group_throttle(coresys: CoreSys): + """Test the group throttle.""" class TestClass(JobGroup): """Test class.""" @@ -817,8 +816,8 @@ async def test_execution_limit_group_throttle(coresys: CoreSys): self.call = 0 @Job( - name="test_execution_limit_group_throttle_execute", - limit=JobExecutionLimit.GROUP_THROTTLE, + name="test_group_throttle_execute", + throttle=JobThrottle.GROUP_THROTTLE, throttle_period=timedelta(milliseconds=95), ) async def execute(self, sleep: float): @@ -851,8 +850,8 @@ async def test_execution_limit_group_throttle(coresys: CoreSys): assert test2.call == 2 -async def test_execution_limit_group_throttle_wait(coresys: CoreSys): - """Test the group throttle wait job execution limit.""" +async def test_group_throttle_with_queue(coresys: CoreSys): + """Test the group throttle with queue concurrency.""" class TestClass(JobGroup): """Test class.""" @@ -864,8 +863,9 @@ async def test_execution_limit_group_throttle_wait(coresys: CoreSys): self.call = 0 @Job( - name="test_execution_limit_group_throttle_wait_execute", - limit=JobExecutionLimit.GROUP_THROTTLE_WAIT, + name="test_group_throttle_with_queue_execute", + concurrency=JobConcurrency.QUEUE, + throttle=JobThrottle.GROUP_THROTTLE, throttle_period=timedelta(milliseconds=95), ) async def execute(self, sleep: float): @@ -901,10 +901,8 @@ async def test_execution_limit_group_throttle_wait(coresys: CoreSys): @pytest.mark.parametrize("error", [None, PluginJobError]) -async def test_execution_limit_group_throttle_rate_limit( - coresys: CoreSys, error: JobException | None -): - """Test the group throttle rate limit job execution limit.""" +async def test_group_throttle_rate_limit(coresys: CoreSys, error: JobException | None): + """Test the group throttle rate limit.""" class TestClass(JobGroup): """Test class.""" @@ -916,8 +914,8 @@ async def test_execution_limit_group_throttle_rate_limit( self.call = 0 @Job( - name=f"test_execution_limit_group_throttle_rate_limit_execute_{uuid4().hex}", - limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + name=f"test_group_throttle_rate_limit_execute_{uuid4().hex}", + throttle=JobThrottle.GROUP_RATE_LIMIT, throttle_period=timedelta(hours=1), throttle_max_calls=2, on_condition=error, @@ -1013,7 +1011,7 @@ async def test_job_starting_separate_task(coresys: CoreSys): @Job( name="test_job_starting_separate_task_job_task", - limit=JobExecutionLimit.GROUP_ONCE, + 
concurrency=JobConcurrency.GROUP_REJECT, ) async def job_task(self): """Create a separate long running job task.""" @@ -1035,7 +1033,7 @@ async def test_job_starting_separate_task(coresys: CoreSys): @Job( name="test_job_starting_separate_task_job_await", - limit=JobExecutionLimit.GROUP_ONCE, + concurrency=JobConcurrency.GROUP_REJECT, ) async def job_await(self): """Await a simple job in same group to confirm lock released.""" @@ -1080,7 +1078,7 @@ async def test_job_always_removed_on_check_failure(coresys: CoreSys): @Job( name="test_job_always_removed_on_check_failure_limit", - limit=JobExecutionLimit.ONCE, + concurrency=JobConcurrency.REJECT, cleanup=False, ) async def limit_check(self):