Split execution limit in concurrency and throttle parameters (#6013)

* Split execution limit in concurrency and throttle parameters

Currently the execution limit combines two ortogonal features: Limit
concurrency and throttle execution. This change separates the two
features, allowing for more flexible configuration of job execution.

Ultimately I want to get rid of the old limit parameter. But for ease
of review and migration, I'd like to do this in two steps: First
introduce the new parameters, and map the old limit parameters to the
new parameters. Then, in a second step, remove the old limit parameter
and migrate all users to the new concurrency and throttle parameters
as needed.

* Introduce common lock release method

* Fix THROTTLE_WAIT behavior

The concurrency QUEUE does not really QUEUE throttle limits.

* Add documentation for new concurrency/throttle Job options

* Handle group options for concurrency and throttle separately

* Fix GROUP_THROTTLE_WAIT concurrency setting

We need to use the QUEUE concurrency setting instead of GROUP_QUEUE
for the GROUP_THROTTLE_WAIT execution limit. Otherwise the
test_jobs_decorator.py::test_execution_limit_group_throttle_wait
test deadlocks.

The reason this deadlocks is because GROUP_QUEUE concurrency doesn't
really work because we only can release a group lock if the job is
actually running.

Or put differently, throttling isn't supported with GROUP_*
concurrency options.

* Prevent using any throttling with group concurrency

The group concurrency modes (reject and queue) are not compatible with
any throttling, since we currently can't unlock the group lock when
a job doesn't get started (which is the case when throttling is
applied).

* Fix commit in group rate limit

* Explain the deadlock issue with group locks in code

* Handle locking correctly on throttle limit exceptions

* Introduce pytest for new job decorator combinations
This commit is contained in:
Stefan Agner 2025-07-30 22:12:14 +02:00 committed by GitHub
parent cf77ab2290
commit 6871ea4b81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 324 additions and 130 deletions

View File

@ -34,8 +34,60 @@ class JobCondition(StrEnum):
SUPERVISOR_UPDATED = "supervisor_updated" SUPERVISOR_UPDATED = "supervisor_updated"
class JobConcurrency(StrEnum):
"""Job concurrency control.
Controls how many instances of a job can run simultaneously.
Individual Concurrency (applies to each method separately):
- REJECT: Fail immediately if another instance is already running
- QUEUE: Wait for the current instance to finish, then run
Group Concurrency (applies across all methods on a JobGroup):
- GROUP_REJECT: Fail if ANY job is running on the JobGroup
- GROUP_QUEUE: Wait for ANY running job on the JobGroup to finish
JobGroup Behavior:
- All methods on the same JobGroup instance share a single lock
- Methods can call other methods on the same group without deadlock
- Uses the JobGroup.group_name for coordination
- Requires the class to inherit from JobGroup
"""
REJECT = "reject" # Fail if already running (was ONCE)
QUEUE = "queue" # Wait if already running (was SINGLE_WAIT)
GROUP_REJECT = "group_reject" # Was GROUP_ONCE
GROUP_QUEUE = "group_queue" # Was GROUP_WAIT
class JobThrottle(StrEnum):
"""Job throttling control.
Controls how frequently jobs can be executed.
Individual Throttling (each method has its own throttle state):
- THROTTLE: Skip execution if called within throttle_period
- RATE_LIMIT: Allow up to throttle_max_calls within throttle_period, then fail
Group Throttling (all methods on a JobGroup share throttle state):
- GROUP_THROTTLE: Skip if ANY method was called within throttle_period
- GROUP_RATE_LIMIT: Allow up to throttle_max_calls total across ALL methods
JobGroup Behavior:
- All methods on the same JobGroup instance share throttle counters/timers
- Uses the JobGroup.group_name as the key for tracking state
- If one method is throttled, other methods may also be throttled
- Requires the class to inherit from JobGroup
"""
THROTTLE = "throttle" # Skip if called too frequently
RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period
GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE
GROUP_RATE_LIMIT = "group_rate_limit" # Group version of RATE_LIMIT
class JobExecutionLimit(StrEnum): class JobExecutionLimit(StrEnum):
"""Job Execution limits.""" """Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead."""
ONCE = "once" ONCE = "once"
SINGLE_WAIT = "single_wait" SINGLE_WAIT = "single_wait"

View File

@ -20,7 +20,7 @@ from ..host.const import HostFeature
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from ..utils.sentry import async_capture_exception from ..utils.sentry import async_capture_exception
from . import SupervisorJob from . import SupervisorJob
from .const import JobCondition, JobExecutionLimit from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle
from .job_group import JobGroup from .job_group import JobGroup
_LOGGER: logging.Logger = logging.getLogger(__package__) _LOGGER: logging.Logger = logging.getLogger(__package__)
@ -36,13 +36,16 @@ class Job(CoreSysAttributes):
conditions: list[JobCondition] | None = None, conditions: list[JobCondition] | None = None,
cleanup: bool = True, cleanup: bool = True,
on_condition: type[JobException] | None = None, on_condition: type[JobException] | None = None,
limit: JobExecutionLimit | None = None, concurrency: JobConcurrency | None = None,
throttle: JobThrottle | None = None,
throttle_period: timedelta throttle_period: timedelta
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta] | Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
| None = None, | None = None,
throttle_max_calls: int | None = None, throttle_max_calls: int | None = None,
internal: bool = False, internal: bool = False,
): # Backward compatibility - DEPRECATED
limit: JobExecutionLimit | None = None,
): # pylint: disable=too-many-positional-arguments
"""Initialize the Job decorator. """Initialize the Job decorator.
Args: Args:
@ -50,13 +53,15 @@ class Job(CoreSysAttributes):
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs. conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart. cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure. on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
limit (JobExecutionLimit | None): Execution limit policy for the job (e.g., throttle, once, group-based). concurrency (JobConcurrency | None): Concurrency control policy (e.g., reject, queue, group-based).
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for rate-limited jobs). throttle (JobThrottle | None): Throttling policy (e.g., throttle, rate_limit, group-based).
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs). throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False. internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead.
Raises: Raises:
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected limit. RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
""" """
if name in _JOB_NAMES: if name in _JOB_NAMES:
@ -67,7 +72,6 @@ class Job(CoreSysAttributes):
self.conditions = conditions self.conditions = conditions
self.cleanup = cleanup self.cleanup = cleanup
self.on_condition = on_condition self.on_condition = on_condition
self.limit = limit
self._throttle_period = throttle_period self._throttle_period = throttle_period
self._throttle_max_calls = throttle_max_calls self._throttle_max_calls = throttle_max_calls
self._lock: asyncio.Semaphore | None = None self._lock: asyncio.Semaphore | None = None
@ -75,34 +79,91 @@ class Job(CoreSysAttributes):
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
self._internal = internal self._internal = internal
# Handle backward compatibility with limit parameter
if limit is not None:
if concurrency is not None or throttle is not None:
raise RuntimeError(
f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!"
)
# Map old limit values to new parameters
concurrency, throttle = self._map_limit_to_new_params(limit)
self.concurrency = concurrency
self.throttle = throttle
# Validate Options # Validate Options
self._validate_parameters()
def _map_limit_to_new_params(
self, limit: JobExecutionLimit
) -> tuple[JobConcurrency | None, JobThrottle | None]:
"""Map old limit parameter to new concurrency and throttle parameters."""
mapping = {
JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None),
JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None),
JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE),
JobExecutionLimit.THROTTLE_WAIT: (
JobConcurrency.QUEUE,
JobThrottle.THROTTLE,
),
JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT),
JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None),
JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None),
JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE),
JobExecutionLimit.GROUP_THROTTLE_WAIT: (
# Seems a bit counter intuitive, but GROUP_QUEUE deadlocks
# tests/jobs/test_job_decorator.py::test_execution_limit_group_throttle_wait
# The reason this deadlocks is because when using GROUP_QUEUE and the
# throttle limit is hit, the group lock is trying to be unlocked outside
# of the job context. The current implementation doesn't allow to unlock
# the group lock when the job is not running.
JobConcurrency.QUEUE,
JobThrottle.GROUP_THROTTLE,
),
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: (
None,
JobThrottle.GROUP_RATE_LIMIT,
),
}
return mapping.get(limit, (None, None))
def _validate_parameters(self) -> None:
"""Validate job parameters."""
# Validate throttle parameters
if ( if (
self.limit self.throttle
in ( in (
JobExecutionLimit.THROTTLE, JobThrottle.THROTTLE,
JobExecutionLimit.THROTTLE_WAIT, JobThrottle.GROUP_THROTTLE,
JobExecutionLimit.THROTTLE_RATE_LIMIT, JobThrottle.RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE, JobThrottle.GROUP_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
) )
and self._throttle_period is None and self._throttle_period is None
): ):
raise RuntimeError( raise RuntimeError(
f"Job {name} is using execution limit {limit} without a throttle period!" f"Job {self.name} is using throttle {self.throttle} without a throttle period!"
) )
if self.limit in ( if self.throttle in (
JobExecutionLimit.THROTTLE_RATE_LIMIT, JobThrottle.RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT,
): ):
if self._throttle_max_calls is None: if self._throttle_max_calls is None:
raise RuntimeError( raise RuntimeError(
f"Job {name} is using execution limit {limit} without throttle max calls!" f"Job {self.name} is using throttle {self.throttle} without throttle max calls!"
) )
self._rate_limited_calls = {} self._rate_limited_calls = {}
if self.throttle is not None and self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
# We cannot release group locks when Job is not running (e.g. throttled)
# which makes these combinations impossible to use currently.
raise RuntimeError(
f"Job {self.name} is using throttling ({self.throttle}) with group concurrency ({self.concurrency}), which is not allowed!"
)
@property @property
def throttle_max_calls(self) -> int: def throttle_max_calls(self) -> int:
"""Return max calls for throttle.""" """Return max calls for throttle."""
@ -131,7 +192,7 @@ class Job(CoreSysAttributes):
"""Return rate limited calls if used.""" """Return rate limited calls if used."""
if self._rate_limited_calls is None: if self._rate_limited_calls is None:
raise RuntimeError( raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}" "Rate limited calls not available for this throttle type"
) )
return self._rate_limited_calls.get(group_name, []) return self._rate_limited_calls.get(group_name, [])
@ -142,7 +203,7 @@ class Job(CoreSysAttributes):
"""Add a rate limited call to list if used.""" """Add a rate limited call to list if used."""
if self._rate_limited_calls is None: if self._rate_limited_calls is None:
raise RuntimeError( raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}" "Rate limited calls not available for this throttle type"
) )
if group_name in self._rate_limited_calls: if group_name in self._rate_limited_calls:
@ -156,7 +217,7 @@ class Job(CoreSysAttributes):
"""Set rate limited calls if used.""" """Set rate limited calls if used."""
if self._rate_limited_calls is None: if self._rate_limited_calls is None:
raise RuntimeError( raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}" "Rate limited calls not available for this throttle type"
) )
self._rate_limited_calls[group_name] = value self._rate_limited_calls[group_name] = value
@ -193,15 +254,23 @@ class Job(CoreSysAttributes):
if obj.acquire and obj.release: # type: ignore if obj.acquire and obj.release: # type: ignore
job_group = cast(JobGroup, obj) job_group = cast(JobGroup, obj)
if not job_group and self.limit in ( # Check for group-based parameters
JobExecutionLimit.GROUP_ONCE, if not job_group:
JobExecutionLimit.GROUP_WAIT, if self.concurrency in (
JobExecutionLimit.GROUP_THROTTLE, JobConcurrency.GROUP_REJECT,
JobExecutionLimit.GROUP_THROTTLE_WAIT, JobConcurrency.GROUP_QUEUE,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
): ):
raise RuntimeError( raise RuntimeError(
f"Job on {self.name} need to be a JobGroup to use group based limits!" f"Job {self.name} uses group concurrency ({self.concurrency}) but is not on a JobGroup! "
f"The class must inherit from JobGroup to use GROUP_REJECT or GROUP_QUEUE."
) from None
if self.throttle in (
JobThrottle.GROUP_THROTTLE,
JobThrottle.GROUP_RATE_LIMIT,
):
raise RuntimeError(
f"Job {self.name} uses group throttling ({self.throttle}) but is not on a JobGroup! "
f"The class must inherit from JobGroup to use GROUP_THROTTLE or GROUP_RATE_LIMIT."
) from None ) from None
return job_group return job_group
@ -255,71 +324,15 @@ class Job(CoreSysAttributes):
except JobConditionException as err: except JobConditionException as err:
return self._handle_job_condition_exception(err) return self._handle_job_condition_exception(err)
# Handle exection limits # Handle execution limits
if self.limit in ( await self._handle_concurrency_control(job_group, job)
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
):
await self._acquire_exection_limit()
elif self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
try: try:
await cast(JobGroup, job_group).acquire( if not await self._handle_throttling(group_name):
job, self.limit == JobExecutionLimit.GROUP_WAIT self._release_concurrency_control(job_group)
) return # Job was throttled, exit early
except JobGroupExecutionLimitExceeded as err: except Exception:
if self.on_condition: self._release_concurrency_control(job_group)
raise self.on_condition(str(err)) from err raise
raise err
elif self.limit in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.GROUP_THROTTLE,
):
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
return
elif self.limit in (
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
await self._acquire_exection_limit()
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
self._release_exception_limits()
return
elif self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Only reprocess array when necessary (at limit)
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call
> datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
on_condition = (
JobException
if self.on_condition is None
else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
# Execute Job # Execute Job
with job.start(): with job.start():
@ -345,12 +358,7 @@ class Job(CoreSysAttributes):
await async_capture_exception(err) await async_capture_exception(err)
raise JobException() from err raise JobException() from err
finally: finally:
self._release_exception_limits() self._release_concurrency_control(job_group)
if job_group and self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
job_group.release()
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required # Jobs that weren't started are always cleaned up. Also clean up done jobs if required
finally: finally:
@ -492,31 +500,75 @@ class Job(CoreSysAttributes):
f"'{method_name}' blocked from execution, mounting not supported on system" f"'{method_name}' blocked from execution, mounting not supported on system"
) )
async def _acquire_exection_limit(self) -> None: def _release_concurrency_control(self, job_group: JobGroup | None) -> None:
"""Process exection limits.""" """Release concurrency control locks."""
if self.limit not in ( if self.concurrency == JobConcurrency.REJECT:
JobExecutionLimit.SINGLE_WAIT, if self.lock.locked():
JobExecutionLimit.ONCE, self.lock.release()
JobExecutionLimit.THROTTLE_WAIT, elif self.concurrency == JobConcurrency.QUEUE:
JobExecutionLimit.GROUP_THROTTLE_WAIT, if self.lock.locked():
self.lock.release()
elif self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
): ):
return if job_group and job_group.has_lock:
job_group.release()
if self.limit == JobExecutionLimit.ONCE and self.lock.locked(): async def _handle_concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob
) -> None:
"""Handle concurrency control limits."""
if self.concurrency == JobConcurrency.REJECT:
if self.lock.locked():
on_condition = ( on_condition = (
JobException if self.on_condition is None else self.on_condition JobException if self.on_condition is None else self.on_condition
) )
raise on_condition("Another job is running") raise on_condition("Another job is running")
await self.lock.acquire() await self.lock.acquire()
elif self.concurrency == JobConcurrency.QUEUE:
await self.lock.acquire()
elif self.concurrency == JobConcurrency.GROUP_REJECT:
try:
await cast(JobGroup, job_group).acquire(job, wait=False)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.concurrency == JobConcurrency.GROUP_QUEUE:
try:
await cast(JobGroup, job_group).acquire(job, wait=True)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
def _release_exception_limits(self) -> None: async def _handle_throttling(self, group_name: str | None) -> bool:
"""Release possible exception limits.""" """Handle throttling limits. Returns True if job should continue, False if throttled."""
if self.limit not in ( if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):
JobExecutionLimit.SINGLE_WAIT, time_since_last_call = datetime.now() - self.last_call(group_name)
JobExecutionLimit.ONCE, throttle_period = self.throttle_period(group_name)
JobExecutionLimit.THROTTLE_WAIT, if time_since_last_call < throttle_period:
JobExecutionLimit.GROUP_THROTTLE_WAIT, # Always return False when throttled (skip execution)
): return False
return elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
self.lock.release() # Only reprocess array when necessary (at limit)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call > datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
return True

View File

@ -20,7 +20,7 @@ from supervisor.exceptions import (
from supervisor.host.const import HostFeature from supervisor.host.const import HostFeature
from supervisor.host.manager import HostManager from supervisor.host.manager import HostManager
from supervisor.jobs import JobSchedulerOptions, SupervisorJob from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from supervisor.jobs.const import JobExecutionLimit from supervisor.jobs.const import JobConcurrency, JobExecutionLimit, JobThrottle
from supervisor.jobs.decorator import Job, JobCondition from supervisor.jobs.decorator import Job, JobCondition
from supervisor.jobs.job_group import JobGroup from supervisor.jobs.job_group import JobGroup
from supervisor.os.manager import OSManager from supervisor.os.manager import OSManager
@ -1212,3 +1212,93 @@ async def test_job_scheduled_at(coresys: CoreSys):
assert job.name == "test_job_scheduled_at_job_task" assert job.name == "test_job_scheduled_at_job_task"
assert job.stage == "work" assert job.stage == "work"
assert job.parent_id is None assert job.parent_id is None
async def test_concurency_reject_and_throttle(coresys: CoreSys):
"""Test the concurrency rejct and throttle job execution limit."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
self.call = 0
@Job(
name="test_concurency_reject_and_throttle_execute",
concurrency=JobConcurrency.REJECT,
throttle=JobThrottle.THROTTLE,
throttle_period=timedelta(hours=1),
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
async with self.run:
await asyncio.sleep(sleep)
self.call += 1
test = TestClass(coresys)
results = await asyncio.gather(
*[test.execute(0.1), test.execute(0.1), test.execute(0.1)],
return_exceptions=True,
)
assert results[0] is None
assert isinstance(results[1], JobException)
assert isinstance(results[2], JobException)
assert test.call == 1
await asyncio.gather(*[test.execute(0.1)])
assert test.call == 1
@pytest.mark.parametrize("error", [None, PluginJobError])
async def test_concurency_reject_and_rate_limit(
coresys: CoreSys, error: JobException | None
):
"""Test the concurrency rejct and rate limit job execution limit."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
self.call = 0
@Job(
name=f"test_concurency_reject_and_rate_limit_execute_{uuid4().hex}",
concurrency=JobConcurrency.REJECT,
throttle=JobThrottle.RATE_LIMIT,
throttle_period=timedelta(hours=1),
throttle_max_calls=1,
on_condition=error,
)
async def execute(self, sleep: float = 0):
"""Execute the class method."""
async with self.run:
await asyncio.sleep(sleep)
self.call += 1
test = TestClass(coresys)
results = await asyncio.gather(
*[test.execute(0.1), test.execute(), test.execute()], return_exceptions=True
)
assert results[0] is None
assert isinstance(results[1], JobException)
assert isinstance(results[2], JobException)
assert test.call == 1
with pytest.raises(JobException if error is None else error):
await test.execute()
assert test.call == 1
with time_machine.travel(utcnow() + timedelta(hours=1)):
await test.execute()
assert test.call == 2