Compare commits

...

8 Commits

Author SHA1 Message Date
Stefan Agner
7f584e386d Use context manager for concurrency control
Instead of manually managing concurrency cleanup use a context manager
to ensure that resources are properly released even if an error occurs.
2025-07-14 22:49:04 +02:00
Stefan Agner
79964fd405 Prevent using any throttling with group concurrency
The group concurrency modes (reject and queue) are not compatible with
any throttling, since we currently can't unlock the group lock when
a job doesn't get started (which is the case when throttling is
applied).
2025-07-14 19:21:03 +02:00
Stefan Agner
351de4d9a3 Fix GROUP_THROTTLE_WAIT concurrency setting
We need to use the QUEUE concurrency setting instead of GROUP_QUEUE
for the GROUP_THROTTLE_WAIT execution limit. Otherwise the
test_jobs_decorator.py::test_execution_limit_group_throttle_wait
test deadlocks.

The reason this deadlocks is because GROUP_QUEUE concurrency doesn't
really work because we only can release a group lock if the job is
actually running.

Or put differently, throttling isn't supported with GROUP_*
concurrency options.
2025-07-10 15:23:53 +02:00
Stefan Agner
24a65dccd5 Handle group options for concurrency and throttle separately 2025-07-10 15:23:52 +02:00
Stefan Agner
d95048ed60 Add documentation for new concurrency/throttle Job options 2025-07-10 15:23:52 +02:00
Stefan Agner
b0fe60a5c9 Fix THROTTLE_WAIT behavior
The concurrency QUEUE does not really QUEUE throttle limits.
2025-07-10 15:23:52 +02:00
Stefan Agner
01b5003849 Introduce common lock release method 2025-07-10 15:23:52 +02:00
Stefan Agner
274415fb87 Split execution limit in concurrency and throttle parameters
Currently the execution limit combines two ortogonal features: Limit
concurrency and throttle execution. This change separates the two
features, allowing for more flexible configuration of job execution.

Ultimately I want to get rid of the old limit parameter. But for ease
of review and migration, I'd like to do this in two steps: First
introduce the new parameters, and map the old limit parameters to the
new parameters. Then, in a second step, remove the old limit parameter
and migrate all users to the new concurrency and throttle parameters
as needed.
2025-07-10 15:23:46 +02:00
2 changed files with 255 additions and 153 deletions

View File

@@ -34,8 +34,60 @@ class JobCondition(StrEnum):
SUPERVISOR_UPDATED = "supervisor_updated"
class JobConcurrency(StrEnum):
"""Job concurrency control.
Controls how many instances of a job can run simultaneously.
Individual Concurrency (applies to each method separately):
- REJECT: Fail immediately if another instance is already running
- QUEUE: Wait for the current instance to finish, then run
Group Concurrency (applies across all methods on a JobGroup):
- GROUP_REJECT: Fail if ANY job is running on the JobGroup
- GROUP_QUEUE: Wait for ANY running job on the JobGroup to finish
JobGroup Behavior:
- All methods on the same JobGroup instance share a single lock
- Methods can call other methods on the same group without deadlock
- Uses the JobGroup.group_name for coordination
- Requires the class to inherit from JobGroup
"""
REJECT = "reject" # Fail if already running (was ONCE)
QUEUE = "queue" # Wait if already running (was SINGLE_WAIT)
GROUP_REJECT = "group_reject" # Was GROUP_ONCE
GROUP_QUEUE = "group_queue" # Was GROUP_WAIT
class JobThrottle(StrEnum):
"""Job throttling control.
Controls how frequently jobs can be executed.
Individual Throttling (each method has its own throttle state):
- THROTTLE: Skip execution if called within throttle_period
- RATE_LIMIT: Allow up to throttle_max_calls within throttle_period, then fail
Group Throttling (all methods on a JobGroup share throttle state):
- GROUP_THROTTLE: Skip if ANY method was called within throttle_period
- GROUP_RATE_LIMIT: Allow up to throttle_max_calls total across ALL methods
JobGroup Behavior:
- All methods on the same JobGroup instance share throttle counters/timers
- Uses the JobGroup.group_name as the key for tracking state
- If one method is throttled, other methods may also be throttled
- Requires the class to inherit from JobGroup
"""
THROTTLE = "throttle" # Skip if called too frequently
RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period
GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE
GROUP_RATE_LIMIT = "group_rate_limit" # Group version of THROTTLE_RATE_LIMIT
class JobExecutionLimit(StrEnum):
"""Job Execution limits."""
"""Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead."""
ONCE = "once"
SINGLE_WAIT = "single_wait"

View File

@@ -1,8 +1,8 @@
"""Job decorator."""
import asyncio
from collections.abc import Awaitable, Callable
from contextlib import suppress
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager, suppress
from datetime import datetime, timedelta
from functools import wraps
import logging
@@ -20,7 +20,7 @@ from ..host.const import HostFeature
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from ..utils.sentry import async_capture_exception
from . import SupervisorJob
from .const import JobCondition, JobExecutionLimit
from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle
from .job_group import JobGroup
_LOGGER: logging.Logger = logging.getLogger(__package__)
@@ -36,13 +36,16 @@ class Job(CoreSysAttributes):
conditions: list[JobCondition] | None = None,
cleanup: bool = True,
on_condition: type[JobException] | None = None,
limit: JobExecutionLimit | None = None,
concurrency: JobConcurrency | None = None,
throttle: JobThrottle | None = None,
throttle_period: timedelta
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
| None = None,
throttle_max_calls: int | None = None,
internal: bool = False,
):
# Backward compatibility - DEPRECATED
limit: JobExecutionLimit | None = None,
): # pylint: disable=too-many-positional-arguments
"""Initialize the Job decorator.
Args:
@@ -50,13 +53,15 @@ class Job(CoreSysAttributes):
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
limit (JobExecutionLimit | None): Execution limit policy for the job (e.g., throttle, once, group-based).
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for rate-limited jobs).
concurrency (JobConcurrency | None): Concurrency control policy (e.g., reject, queue, group-based).
throttle (JobThrottle | None): Throttling policy (e.g., throttle, rate_limit, group-based).
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead.
Raises:
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected limit.
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
"""
if name in _JOB_NAMES:
@@ -67,7 +72,6 @@ class Job(CoreSysAttributes):
self.conditions = conditions
self.cleanup = cleanup
self.on_condition = on_condition
self.limit = limit
self._throttle_period = throttle_period
self._throttle_max_calls = throttle_max_calls
self._lock: asyncio.Semaphore | None = None
@@ -75,34 +79,85 @@ class Job(CoreSysAttributes):
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
self._internal = internal
# Handle backward compatibility with limit parameter
if limit is not None:
if concurrency is not None or throttle is not None:
raise RuntimeError(
f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!"
)
# Map old limit values to new parameters
concurrency, throttle = self._map_limit_to_new_params(limit)
self.concurrency = concurrency
self.throttle = throttle
# Validate Options
self._validate_parameters()
def _map_limit_to_new_params(
self, limit: JobExecutionLimit
) -> tuple[JobConcurrency | None, JobThrottle | None]:
"""Map old limit parameter to new concurrency and throttle parameters."""
mapping = {
JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None),
JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None),
JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE),
JobExecutionLimit.THROTTLE_WAIT: (
JobConcurrency.QUEUE,
JobThrottle.THROTTLE,
),
JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT),
JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None),
JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None),
JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE),
JobExecutionLimit.GROUP_THROTTLE_WAIT: (
JobConcurrency.QUEUE, # Seems a bit counter intuitive, but GROUP_QUEUE deadlocks tests/jobs/test_job_decorator.py::test_execution_limit_group_throttle_wait
JobThrottle.GROUP_THROTTLE,
),
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: (
None,
JobThrottle.GROUP_RATE_LIMIT,
),
}
return mapping.get(limit, (None, None))
def _validate_parameters(self) -> None:
"""Validate job parameters."""
# Validate throttle parameters
if (
self.limit
self.throttle
in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
JobThrottle.THROTTLE,
JobThrottle.GROUP_THROTTLE,
JobThrottle.RATE_LIMIT,
JobThrottle.GROUP_RATE_LIMIT,
)
and self._throttle_period is None
):
raise RuntimeError(
f"Job {name} is using execution limit {limit} without a throttle period!"
f"Job {self.name} is using throttle {self.throttle} without a throttle period!"
)
if self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
if self.throttle in (
JobThrottle.RATE_LIMIT,
JobThrottle.GROUP_RATE_LIMIT,
):
if self._throttle_max_calls is None:
raise RuntimeError(
f"Job {name} is using execution limit {limit} without throttle max calls!"
f"Job {self.name} is using throttle {self.throttle} without throttle max calls!"
)
self._rate_limited_calls = {}
if self.throttle is not None and self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
# We cannot release group locks when Job is not running (e.g. throttled)
# which makes these combinations impossible to use currently.
raise RuntimeError(
f"Job {self.name} is using throttling ({self.throttle}) with group concurrency ({self.concurrency}), which is not allowed!"
)
@property
def throttle_max_calls(self) -> int:
"""Return max calls for throttle."""
@@ -131,7 +186,7 @@ class Job(CoreSysAttributes):
"""Return rate limited calls if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
"Rate limited calls not available for this throttle type"
)
return self._rate_limited_calls.get(group_name, [])
@@ -142,7 +197,7 @@ class Job(CoreSysAttributes):
"""Add a rate limited call to list if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
"Rate limited calls not available for this throttle type"
)
if group_name in self._rate_limited_calls:
@@ -156,7 +211,7 @@ class Job(CoreSysAttributes):
"""Set rate limited calls if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
"Rate limited calls not available for this throttle type"
)
self._rate_limited_calls[group_name] = value
@@ -193,16 +248,24 @@ class Job(CoreSysAttributes):
if obj.acquire and obj.release: # type: ignore
job_group = cast(JobGroup, obj)
if not job_group and self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
JobExecutionLimit.GROUP_THROTTLE,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
raise RuntimeError(
f"Job on {self.name} need to be a JobGroup to use group based limits!"
) from None
# Check for group-based parameters
if not job_group:
if self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
raise RuntimeError(
f"Job {self.name} uses group concurrency ({self.concurrency}) but is not on a JobGroup! "
f"The class must inherit from JobGroup to use GROUP_REJECT or GROUP_QUEUE."
) from None
if self.throttle in (
JobThrottle.GROUP_THROTTLE,
JobThrottle.GROUP_RATE_LIMIT,
):
raise RuntimeError(
f"Job {self.name} uses group throttling ({self.throttle}) but is not on a JobGroup! "
f"The class must inherit from JobGroup to use GROUP_THROTTLE or GROUP_RATE_LIMIT."
) from None
return job_group
@@ -255,102 +318,34 @@ class Job(CoreSysAttributes):
except JobConditionException as err:
return self._handle_job_condition_exception(err)
# Handle exection limits
if self.limit in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
):
await self._acquire_exection_limit()
elif self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
try:
await cast(JobGroup, job_group).acquire(
job, self.limit == JobExecutionLimit.GROUP_WAIT
)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.limit in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.GROUP_THROTTLE,
):
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
return
elif self.limit in (
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
await self._acquire_exection_limit()
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
self._release_exception_limits()
return
elif self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Only reprocess array when necessary (at limit)
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call
> datetime.now() - self.throttle_period(group_name)
],
group_name,
)
# Handle execution limits using context manager
async with self._concurrency_control(job_group, job):
if not await self._handle_throttling(group_name):
return # Job was throttled, exit early
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
on_condition = (
JobException
if self.on_condition is None
else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
# Execute Job
with job.start():
try:
self.set_last_call(datetime.now(), group_name)
if self._rate_limited_calls is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
)
# Execute Job
with job.start():
try:
self.set_last_call(datetime.now(), group_name)
if self._rate_limited_calls is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
)
return await method(obj, *args, **kwargs)
return await method(obj, *args, **kwargs)
# If a method has a conditional JobCondition, they must check it in the method
# These should be handled like normal JobConditions as much as possible
except JobConditionException as err:
return self._handle_job_condition_exception(err)
except HassioError as err:
job.capture_error(err)
raise err
except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err)
job.capture_error()
await async_capture_exception(err)
raise JobException() from err
finally:
self._release_exception_limits()
if job_group and self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
job_group.release()
# If a method has a conditional JobCondition, they must check it in the method
# These should be handled like normal JobConditions as much as possible
except JobConditionException as err:
return self._handle_job_condition_exception(err)
except HassioError as err:
job.capture_error(err)
raise err
except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err)
job.capture_error()
await async_capture_exception(err)
raise JobException() from err
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
finally:
@@ -492,31 +487,86 @@ class Job(CoreSysAttributes):
f"'{method_name}' blocked from execution, mounting not supported on system"
)
async def _acquire_exection_limit(self) -> None:
"""Process exection limits."""
if self.limit not in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
def _release_concurrency_control(self, job_group: JobGroup | None) -> None:
"""Release concurrency control locks."""
if self.concurrency == JobConcurrency.REJECT:
if self.lock.locked():
self.lock.release()
elif self.concurrency == JobConcurrency.QUEUE:
if self.lock.locked():
self.lock.release()
elif self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
return
if job_group and job_group.has_lock:
job_group.release()
if self.limit == JobExecutionLimit.ONCE and self.lock.locked():
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition("Another job is running")
async def _handle_concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob
) -> None:
"""Handle concurrency control limits."""
if self.concurrency == JobConcurrency.REJECT:
if self.lock.locked():
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition("Another job is running")
await self.lock.acquire()
elif self.concurrency == JobConcurrency.QUEUE:
await self.lock.acquire()
elif self.concurrency == JobConcurrency.GROUP_REJECT:
try:
await cast(JobGroup, job_group).acquire(job, wait=False)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.concurrency == JobConcurrency.GROUP_QUEUE:
try:
await cast(JobGroup, job_group).acquire(job, wait=True)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
await self.lock.acquire()
@asynccontextmanager
async def _concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob
) -> AsyncIterator[None]:
"""Context manager for concurrency control that ensures locks are always released."""
await self._handle_concurrency_control(job_group, job)
try:
yield
finally:
self._release_concurrency_control(job_group)
def _release_exception_limits(self) -> None:
"""Release possible exception limits."""
if self.limit not in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
return
self.lock.release()
async def _handle_throttling(self, group_name: str | None) -> bool:
"""Handle throttling limits. Returns True if job should continue, False if throttled."""
if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):
time_since_last_call = datetime.now() - self.last_call(group_name)
throttle_period = self.throttle_period(group_name)
if time_since_last_call < throttle_period:
# Always return False when throttled (skip execution)
return False
elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
# Only reprocess array when necessary (at limit)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call > datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
return True