Split execution limit in concurrency and throttle parameters

Currently the execution limit combines two ortogonal features: Limit
concurrency and throttle execution. This change separates the two
features, allowing for more flexible configuration of job execution.

Ultimately I want to get rid of the old limit parameter. But for ease
of review and migration, I'd like to do this in two steps: First
introduce the new parameters, and map the old limit parameters to the
new parameters. Then, in a second step, remove the old limit parameter
and migrate all users to the new concurrency and throttle parameters
as needed.
This commit is contained in:
Stefan Agner 2025-07-10 11:27:13 +02:00
parent 9a0f530a2f
commit 274415fb87
No known key found for this signature in database
GPG Key ID: AE01353D1E44747D
2 changed files with 178 additions and 125 deletions

View File

@ -34,8 +34,26 @@ class JobCondition(StrEnum):
SUPERVISOR_UPDATED = "supervisor_updated"
class JobConcurrency(StrEnum):
"""Job concurrency control."""
REJECT = "reject" # Fail if already running (was ONCE)
QUEUE = "queue" # Wait if already running (was SINGLE_WAIT)
GROUP_REJECT = "group_reject" # Was GROUP_ONCE
GROUP_QUEUE = "group_queue" # Was GROUP_WAIT
class JobThrottle(StrEnum):
"""Job throttling control."""
THROTTLE = "throttle" # Skip if called too frequently
RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period
GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE
GROUP_RATE_LIMIT = "group_rate_limit" # Group version of THROTTLE_RATE_LIMIT
class JobExecutionLimit(StrEnum):
"""Job Execution limits."""
"""Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead."""
ONCE = "once"
SINGLE_WAIT = "single_wait"

View File

@ -20,7 +20,7 @@ from ..host.const import HostFeature
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from ..utils.sentry import async_capture_exception
from . import SupervisorJob
from .const import JobCondition, JobExecutionLimit
from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle
from .job_group import JobGroup
_LOGGER: logging.Logger = logging.getLogger(__package__)
@ -36,13 +36,16 @@ class Job(CoreSysAttributes):
conditions: list[JobCondition] | None = None,
cleanup: bool = True,
on_condition: type[JobException] | None = None,
limit: JobExecutionLimit | None = None,
concurrency: JobConcurrency | None = None,
throttle: JobThrottle | None = None,
throttle_period: timedelta
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
| None = None,
throttle_max_calls: int | None = None,
internal: bool = False,
):
# Backward compatibility - DEPRECATED
limit: JobExecutionLimit | None = None,
): # pylint: disable=too-many-positional-arguments
"""Initialize the Job decorator.
Args:
@ -50,13 +53,15 @@ class Job(CoreSysAttributes):
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
limit (JobExecutionLimit | None): Execution limit policy for the job (e.g., throttle, once, group-based).
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for rate-limited jobs).
concurrency (JobConcurrency | None): Concurrency control policy (e.g., reject, queue, group-based).
throttle (JobThrottle | None): Throttling policy (e.g., throttle, rate_limit, group-based).
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead.
Raises:
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected limit.
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
"""
if name in _JOB_NAMES:
@ -67,7 +72,6 @@ class Job(CoreSysAttributes):
self.conditions = conditions
self.cleanup = cleanup
self.on_condition = on_condition
self.limit = limit
self._throttle_period = throttle_period
self._throttle_max_calls = throttle_max_calls
self._lock: asyncio.Semaphore | None = None
@ -75,32 +79,73 @@ class Job(CoreSysAttributes):
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
self._internal = internal
# Handle backward compatibility with limit parameter
if limit is not None:
if concurrency is not None or throttle is not None:
raise RuntimeError(
f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!"
)
# Map old limit values to new parameters
concurrency, throttle = self._map_limit_to_new_params(limit)
self.concurrency = concurrency
self.throttle = throttle
# Validate Options
self._validate_parameters()
def _map_limit_to_new_params(
self, limit: JobExecutionLimit
) -> tuple[JobConcurrency | None, JobThrottle | None]:
"""Map old limit parameter to new concurrency and throttle parameters."""
mapping = {
JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None),
JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None),
JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE),
JobExecutionLimit.THROTTLE_WAIT: (
JobConcurrency.QUEUE,
JobThrottle.THROTTLE,
),
JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT),
JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None),
JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None),
JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE),
JobExecutionLimit.GROUP_THROTTLE_WAIT: (
JobConcurrency.GROUP_QUEUE,
JobThrottle.GROUP_THROTTLE,
),
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: (
None,
JobThrottle.GROUP_RATE_LIMIT,
),
}
return mapping.get(limit, (None, None))
def _validate_parameters(self) -> None:
"""Validate job parameters."""
# Validate throttle parameters
if (
self.limit
self.throttle
in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
JobThrottle.THROTTLE,
JobThrottle.GROUP_THROTTLE,
JobThrottle.RATE_LIMIT,
JobThrottle.GROUP_RATE_LIMIT,
)
and self._throttle_period is None
):
raise RuntimeError(
f"Job {name} is using execution limit {limit} without a throttle period!"
f"Job {self.name} is using throttle {self.throttle} without a throttle period!"
)
if self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
if self.throttle in (
JobThrottle.RATE_LIMIT,
JobThrottle.GROUP_RATE_LIMIT,
):
if self._throttle_max_calls is None:
raise RuntimeError(
f"Job {name} is using execution limit {limit} without throttle max calls!"
f"Job {self.name} is using throttle {self.throttle} without throttle max calls!"
)
self._rate_limited_calls = {}
@property
@ -131,7 +176,7 @@ class Job(CoreSysAttributes):
"""Return rate limited calls if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
"Rate limited calls not available for this throttle type"
)
return self._rate_limited_calls.get(group_name, [])
@ -142,7 +187,7 @@ class Job(CoreSysAttributes):
"""Add a rate limited call to list if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
"Rate limited calls not available for this throttle type"
)
if group_name in self._rate_limited_calls:
@ -156,7 +201,7 @@ class Job(CoreSysAttributes):
"""Set rate limited calls if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
"Rate limited calls not available for this throttle type"
)
self._rate_limited_calls[group_name] = value
@ -193,13 +238,14 @@ class Job(CoreSysAttributes):
if obj.acquire and obj.release: # type: ignore
job_group = cast(JobGroup, obj)
if not job_group and self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
JobExecutionLimit.GROUP_THROTTLE,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Check for group-based parameters
group_based_params = [
self.concurrency
in (JobConcurrency.GROUP_REJECT, JobConcurrency.GROUP_QUEUE),
self.throttle in (JobThrottle.GROUP_THROTTLE, JobThrottle.GROUP_RATE_LIMIT),
]
if not job_group and any(group_based_params):
raise RuntimeError(
f"Job on {self.name} need to be a JobGroup to use group based limits!"
) from None
@ -255,71 +301,10 @@ class Job(CoreSysAttributes):
except JobConditionException as err:
return self._handle_job_condition_exception(err)
# Handle exection limits
if self.limit in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
):
await self._acquire_exection_limit()
elif self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
try:
await cast(JobGroup, job_group).acquire(
job, self.limit == JobExecutionLimit.GROUP_WAIT
)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.limit in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.GROUP_THROTTLE,
):
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
return
elif self.limit in (
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
await self._acquire_exection_limit()
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
self._release_exception_limits()
return
elif self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Only reprocess array when necessary (at limit)
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call
> datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
on_condition = (
JobException
if self.on_condition is None
else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
# Handle execution limits
await self._handle_concurrency_control(job_group, job)
if not await self._handle_throttling(group_name):
return # Job was throttled, exit early
# Execute Job
with job.start():
@ -346,9 +331,10 @@ class Job(CoreSysAttributes):
raise JobException() from err
finally:
self._release_exception_limits()
if job_group and self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
# Handle concurrency parameters
if job_group and self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
job_group.release()
@ -492,31 +478,80 @@ class Job(CoreSysAttributes):
f"'{method_name}' blocked from execution, mounting not supported on system"
)
async def _acquire_exection_limit(self) -> None:
"""Process exection limits."""
if self.limit not in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
return
if self.limit == JobExecutionLimit.ONCE and self.lock.locked():
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition("Another job is running")
await self.lock.acquire()
def _release_exception_limits(self) -> None:
"""Release possible exception limits."""
if self.limit not in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
return
self.lock.release()
if self.concurrency in (JobConcurrency.REJECT, JobConcurrency.QUEUE):
self.lock.release()
async def _handle_concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob
) -> None:
"""Handle concurrency control limits."""
if self.concurrency == JobConcurrency.REJECT:
if self.lock.locked():
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition("Another job is running")
await self.lock.acquire()
elif self.concurrency == JobConcurrency.QUEUE:
await self.lock.acquire()
elif self.concurrency == JobConcurrency.GROUP_REJECT:
try:
await cast(JobGroup, job_group).acquire(job, wait=False)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.concurrency == JobConcurrency.GROUP_QUEUE:
try:
await cast(JobGroup, job_group).acquire(job, wait=True)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
async def _handle_throttling(self, group_name: str | None) -> bool:
"""Handle throttling limits. Returns True if job should continue, False if throttled."""
if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):
time_since_last_call = datetime.now() - self.last_call(group_name)
throttle_period = self.throttle_period(group_name)
if time_since_last_call < throttle_period:
# If we have queue concurrency (WAIT behavior), sleep until throttle period passes
if self.concurrency in (
JobConcurrency.QUEUE,
JobConcurrency.GROUP_QUEUE,
):
sleep_time = (
throttle_period - time_since_last_call
).total_seconds()
await asyncio.sleep(sleep_time)
else:
# For non-queue concurrency (REJECT behavior), just return False
if self.concurrency in (
JobConcurrency.REJECT,
JobConcurrency.QUEUE,
):
self.lock.release()
return False
elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
# Only reprocess array when necessary (at limit)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call > datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
return True