diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index f7b398910..98fe09f01 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -34,8 +34,26 @@ class JobCondition(StrEnum): SUPERVISOR_UPDATED = "supervisor_updated" +class JobConcurrency(StrEnum): + """Job concurrency control.""" + + REJECT = "reject" # Fail if already running (was ONCE) + QUEUE = "queue" # Wait if already running (was SINGLE_WAIT) + GROUP_REJECT = "group_reject" # Was GROUP_ONCE + GROUP_QUEUE = "group_queue" # Was GROUP_WAIT + + +class JobThrottle(StrEnum): + """Job throttling control.""" + + THROTTLE = "throttle" # Skip if called too frequently + RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period + GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE + GROUP_RATE_LIMIT = "group_rate_limit" # Group version of THROTTLE_RATE_LIMIT + + class JobExecutionLimit(StrEnum): - """Job Execution limits.""" + """Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead.""" ONCE = "once" SINGLE_WAIT = "single_wait" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index 0747a50b8..d18b2cf51 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -20,7 +20,7 @@ from ..host.const import HostFeature from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType from ..utils.sentry import async_capture_exception from . import SupervisorJob -from .const import JobCondition, JobExecutionLimit +from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle from .job_group import JobGroup _LOGGER: logging.Logger = logging.getLogger(__package__) @@ -36,13 +36,16 @@ class Job(CoreSysAttributes): conditions: list[JobCondition] | None = None, cleanup: bool = True, on_condition: type[JobException] | None = None, - limit: JobExecutionLimit | None = None, + concurrency: JobConcurrency | None = None, + throttle: JobThrottle | None = None, throttle_period: timedelta | Callable[[CoreSys, datetime, list[datetime] | None], timedelta] | None = None, throttle_max_calls: int | None = None, internal: bool = False, - ): + # Backward compatibility - DEPRECATED + limit: JobExecutionLimit | None = None, + ): # pylint: disable=too-many-positional-arguments """Initialize the Job decorator. Args: @@ -50,13 +53,15 @@ class Job(CoreSysAttributes): conditions (list[JobCondition] | None): List of conditions that must be met before the job runs. cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart. on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure. - limit (JobExecutionLimit | None): Execution limit policy for the job (e.g., throttle, once, group-based). - throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for rate-limited jobs). + concurrency (JobConcurrency | None): Concurrency control policy (e.g., reject, queue, group-based). + throttle (JobThrottle | None): Throttling policy (e.g., throttle, rate_limit, group-based). + throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs). throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs). internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False. + limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead. Raises: - RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected limit. + RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy. """ if name in _JOB_NAMES: @@ -67,7 +72,6 @@ class Job(CoreSysAttributes): self.conditions = conditions self.cleanup = cleanup self.on_condition = on_condition - self.limit = limit self._throttle_period = throttle_period self._throttle_max_calls = throttle_max_calls self._lock: asyncio.Semaphore | None = None @@ -75,32 +79,73 @@ class Job(CoreSysAttributes): self._rate_limited_calls: dict[str | None, list[datetime]] | None = None self._internal = internal + # Handle backward compatibility with limit parameter + if limit is not None: + if concurrency is not None or throttle is not None: + raise RuntimeError( + f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!" + ) + # Map old limit values to new parameters + concurrency, throttle = self._map_limit_to_new_params(limit) + + self.concurrency = concurrency + self.throttle = throttle + # Validate Options + self._validate_parameters() + + def _map_limit_to_new_params( + self, limit: JobExecutionLimit + ) -> tuple[JobConcurrency | None, JobThrottle | None]: + """Map old limit parameter to new concurrency and throttle parameters.""" + mapping = { + JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None), + JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None), + JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE), + JobExecutionLimit.THROTTLE_WAIT: ( + JobConcurrency.QUEUE, + JobThrottle.THROTTLE, + ), + JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT), + JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None), + JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None), + JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE), + JobExecutionLimit.GROUP_THROTTLE_WAIT: ( + JobConcurrency.GROUP_QUEUE, + JobThrottle.GROUP_THROTTLE, + ), + JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: ( + None, + JobThrottle.GROUP_RATE_LIMIT, + ), + } + return mapping.get(limit, (None, None)) + + def _validate_parameters(self) -> None: + """Validate job parameters.""" + # Validate throttle parameters if ( - self.limit + self.throttle in ( - JobExecutionLimit.THROTTLE, - JobExecutionLimit.THROTTLE_WAIT, - JobExecutionLimit.THROTTLE_RATE_LIMIT, - JobExecutionLimit.GROUP_THROTTLE, - JobExecutionLimit.GROUP_THROTTLE_WAIT, - JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + JobThrottle.THROTTLE, + JobThrottle.GROUP_THROTTLE, + JobThrottle.RATE_LIMIT, + JobThrottle.GROUP_RATE_LIMIT, ) and self._throttle_period is None ): raise RuntimeError( - f"Job {name} is using execution limit {limit} without a throttle period!" + f"Job {self.name} is using throttle {self.throttle} without a throttle period!" ) - if self.limit in ( - JobExecutionLimit.THROTTLE_RATE_LIMIT, - JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + if self.throttle in ( + JobThrottle.RATE_LIMIT, + JobThrottle.GROUP_RATE_LIMIT, ): if self._throttle_max_calls is None: raise RuntimeError( - f"Job {name} is using execution limit {limit} without throttle max calls!" + f"Job {self.name} is using throttle {self.throttle} without throttle max calls!" ) - self._rate_limited_calls = {} @property @@ -131,7 +176,7 @@ class Job(CoreSysAttributes): """Return rate limited calls if used.""" if self._rate_limited_calls is None: raise RuntimeError( - f"Rate limited calls not available for limit type {self.limit}" + "Rate limited calls not available for this throttle type" ) return self._rate_limited_calls.get(group_name, []) @@ -142,7 +187,7 @@ class Job(CoreSysAttributes): """Add a rate limited call to list if used.""" if self._rate_limited_calls is None: raise RuntimeError( - f"Rate limited calls not available for limit type {self.limit}" + "Rate limited calls not available for this throttle type" ) if group_name in self._rate_limited_calls: @@ -156,7 +201,7 @@ class Job(CoreSysAttributes): """Set rate limited calls if used.""" if self._rate_limited_calls is None: raise RuntimeError( - f"Rate limited calls not available for limit type {self.limit}" + "Rate limited calls not available for this throttle type" ) self._rate_limited_calls[group_name] = value @@ -193,13 +238,14 @@ class Job(CoreSysAttributes): if obj.acquire and obj.release: # type: ignore job_group = cast(JobGroup, obj) - if not job_group and self.limit in ( - JobExecutionLimit.GROUP_ONCE, - JobExecutionLimit.GROUP_WAIT, - JobExecutionLimit.GROUP_THROTTLE, - JobExecutionLimit.GROUP_THROTTLE_WAIT, - JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, - ): + # Check for group-based parameters + group_based_params = [ + self.concurrency + in (JobConcurrency.GROUP_REJECT, JobConcurrency.GROUP_QUEUE), + self.throttle in (JobThrottle.GROUP_THROTTLE, JobThrottle.GROUP_RATE_LIMIT), + ] + + if not job_group and any(group_based_params): raise RuntimeError( f"Job on {self.name} need to be a JobGroup to use group based limits!" ) from None @@ -255,71 +301,10 @@ class Job(CoreSysAttributes): except JobConditionException as err: return self._handle_job_condition_exception(err) - # Handle exection limits - if self.limit in ( - JobExecutionLimit.SINGLE_WAIT, - JobExecutionLimit.ONCE, - ): - await self._acquire_exection_limit() - elif self.limit in ( - JobExecutionLimit.GROUP_ONCE, - JobExecutionLimit.GROUP_WAIT, - ): - try: - await cast(JobGroup, job_group).acquire( - job, self.limit == JobExecutionLimit.GROUP_WAIT - ) - except JobGroupExecutionLimitExceeded as err: - if self.on_condition: - raise self.on_condition(str(err)) from err - raise err - elif self.limit in ( - JobExecutionLimit.THROTTLE, - JobExecutionLimit.GROUP_THROTTLE, - ): - time_since_last_call = datetime.now() - self.last_call(group_name) - if time_since_last_call < self.throttle_period(group_name): - return - elif self.limit in ( - JobExecutionLimit.THROTTLE_WAIT, - JobExecutionLimit.GROUP_THROTTLE_WAIT, - ): - await self._acquire_exection_limit() - time_since_last_call = datetime.now() - self.last_call(group_name) - if time_since_last_call < self.throttle_period(group_name): - self._release_exception_limits() - return - elif self.limit in ( - JobExecutionLimit.THROTTLE_RATE_LIMIT, - JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, - ): - # Only reprocess array when necessary (at limit) - if ( - len(self.rate_limited_calls(group_name)) - >= self.throttle_max_calls - ): - self.set_rate_limited_calls( - [ - call - for call in self.rate_limited_calls(group_name) - if call - > datetime.now() - self.throttle_period(group_name) - ], - group_name, - ) - - if ( - len(self.rate_limited_calls(group_name)) - >= self.throttle_max_calls - ): - on_condition = ( - JobException - if self.on_condition is None - else self.on_condition - ) - raise on_condition( - f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}", - ) + # Handle execution limits + await self._handle_concurrency_control(job_group, job) + if not await self._handle_throttling(group_name): + return # Job was throttled, exit early # Execute Job with job.start(): @@ -346,9 +331,10 @@ class Job(CoreSysAttributes): raise JobException() from err finally: self._release_exception_limits() - if job_group and self.limit in ( - JobExecutionLimit.GROUP_ONCE, - JobExecutionLimit.GROUP_WAIT, + # Handle concurrency parameters + if job_group and self.concurrency in ( + JobConcurrency.GROUP_REJECT, + JobConcurrency.GROUP_QUEUE, ): job_group.release() @@ -492,31 +478,80 @@ class Job(CoreSysAttributes): f"'{method_name}' blocked from execution, mounting not supported on system" ) - async def _acquire_exection_limit(self) -> None: - """Process exection limits.""" - if self.limit not in ( - JobExecutionLimit.SINGLE_WAIT, - JobExecutionLimit.ONCE, - JobExecutionLimit.THROTTLE_WAIT, - JobExecutionLimit.GROUP_THROTTLE_WAIT, - ): - return - - if self.limit == JobExecutionLimit.ONCE and self.lock.locked(): - on_condition = ( - JobException if self.on_condition is None else self.on_condition - ) - raise on_condition("Another job is running") - - await self.lock.acquire() - def _release_exception_limits(self) -> None: """Release possible exception limits.""" - if self.limit not in ( - JobExecutionLimit.SINGLE_WAIT, - JobExecutionLimit.ONCE, - JobExecutionLimit.THROTTLE_WAIT, - JobExecutionLimit.GROUP_THROTTLE_WAIT, - ): - return - self.lock.release() + if self.concurrency in (JobConcurrency.REJECT, JobConcurrency.QUEUE): + self.lock.release() + + async def _handle_concurrency_control( + self, job_group: JobGroup | None, job: SupervisorJob + ) -> None: + """Handle concurrency control limits.""" + if self.concurrency == JobConcurrency.REJECT: + if self.lock.locked(): + on_condition = ( + JobException if self.on_condition is None else self.on_condition + ) + raise on_condition("Another job is running") + await self.lock.acquire() + elif self.concurrency == JobConcurrency.QUEUE: + await self.lock.acquire() + elif self.concurrency == JobConcurrency.GROUP_REJECT: + try: + await cast(JobGroup, job_group).acquire(job, wait=False) + except JobGroupExecutionLimitExceeded as err: + if self.on_condition: + raise self.on_condition(str(err)) from err + raise err + elif self.concurrency == JobConcurrency.GROUP_QUEUE: + try: + await cast(JobGroup, job_group).acquire(job, wait=True) + except JobGroupExecutionLimitExceeded as err: + if self.on_condition: + raise self.on_condition(str(err)) from err + raise err + + async def _handle_throttling(self, group_name: str | None) -> bool: + """Handle throttling limits. Returns True if job should continue, False if throttled.""" + if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE): + time_since_last_call = datetime.now() - self.last_call(group_name) + throttle_period = self.throttle_period(group_name) + if time_since_last_call < throttle_period: + # If we have queue concurrency (WAIT behavior), sleep until throttle period passes + if self.concurrency in ( + JobConcurrency.QUEUE, + JobConcurrency.GROUP_QUEUE, + ): + sleep_time = ( + throttle_period - time_since_last_call + ).total_seconds() + await asyncio.sleep(sleep_time) + else: + # For non-queue concurrency (REJECT behavior), just return False + if self.concurrency in ( + JobConcurrency.REJECT, + JobConcurrency.QUEUE, + ): + self.lock.release() + return False + elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT): + # Only reprocess array when necessary (at limit) + if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls: + self.set_rate_limited_calls( + [ + call + for call in self.rate_limited_calls(group_name) + if call > datetime.now() - self.throttle_period(group_name) + ], + group_name, + ) + + if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls: + on_condition = ( + JobException if self.on_condition is None else self.on_condition + ) + raise on_condition( + f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}", + ) + + return True