mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-09-09 21:19:26 +00:00
Compare commits
8 Commits
2025.08.2
...
separate-c
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7f584e386d | ||
![]() |
79964fd405 | ||
![]() |
351de4d9a3 | ||
![]() |
24a65dccd5 | ||
![]() |
d95048ed60 | ||
![]() |
b0fe60a5c9 | ||
![]() |
01b5003849 | ||
![]() |
274415fb87 |
@@ -34,8 +34,60 @@ class JobCondition(StrEnum):
|
|||||||
SUPERVISOR_UPDATED = "supervisor_updated"
|
SUPERVISOR_UPDATED = "supervisor_updated"
|
||||||
|
|
||||||
|
|
||||||
|
class JobConcurrency(StrEnum):
|
||||||
|
"""Job concurrency control.
|
||||||
|
|
||||||
|
Controls how many instances of a job can run simultaneously.
|
||||||
|
|
||||||
|
Individual Concurrency (applies to each method separately):
|
||||||
|
- REJECT: Fail immediately if another instance is already running
|
||||||
|
- QUEUE: Wait for the current instance to finish, then run
|
||||||
|
|
||||||
|
Group Concurrency (applies across all methods on a JobGroup):
|
||||||
|
- GROUP_REJECT: Fail if ANY job is running on the JobGroup
|
||||||
|
- GROUP_QUEUE: Wait for ANY running job on the JobGroup to finish
|
||||||
|
|
||||||
|
JobGroup Behavior:
|
||||||
|
- All methods on the same JobGroup instance share a single lock
|
||||||
|
- Methods can call other methods on the same group without deadlock
|
||||||
|
- Uses the JobGroup.group_name for coordination
|
||||||
|
- Requires the class to inherit from JobGroup
|
||||||
|
"""
|
||||||
|
|
||||||
|
REJECT = "reject" # Fail if already running (was ONCE)
|
||||||
|
QUEUE = "queue" # Wait if already running (was SINGLE_WAIT)
|
||||||
|
GROUP_REJECT = "group_reject" # Was GROUP_ONCE
|
||||||
|
GROUP_QUEUE = "group_queue" # Was GROUP_WAIT
|
||||||
|
|
||||||
|
|
||||||
|
class JobThrottle(StrEnum):
|
||||||
|
"""Job throttling control.
|
||||||
|
|
||||||
|
Controls how frequently jobs can be executed.
|
||||||
|
|
||||||
|
Individual Throttling (each method has its own throttle state):
|
||||||
|
- THROTTLE: Skip execution if called within throttle_period
|
||||||
|
- RATE_LIMIT: Allow up to throttle_max_calls within throttle_period, then fail
|
||||||
|
|
||||||
|
Group Throttling (all methods on a JobGroup share throttle state):
|
||||||
|
- GROUP_THROTTLE: Skip if ANY method was called within throttle_period
|
||||||
|
- GROUP_RATE_LIMIT: Allow up to throttle_max_calls total across ALL methods
|
||||||
|
|
||||||
|
JobGroup Behavior:
|
||||||
|
- All methods on the same JobGroup instance share throttle counters/timers
|
||||||
|
- Uses the JobGroup.group_name as the key for tracking state
|
||||||
|
- If one method is throttled, other methods may also be throttled
|
||||||
|
- Requires the class to inherit from JobGroup
|
||||||
|
"""
|
||||||
|
|
||||||
|
THROTTLE = "throttle" # Skip if called too frequently
|
||||||
|
RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period
|
||||||
|
GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE
|
||||||
|
GROUP_RATE_LIMIT = "group_rate_limit" # Group version of THROTTLE_RATE_LIMIT
|
||||||
|
|
||||||
|
|
||||||
class JobExecutionLimit(StrEnum):
|
class JobExecutionLimit(StrEnum):
|
||||||
"""Job Execution limits."""
|
"""Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead."""
|
||||||
|
|
||||||
ONCE = "once"
|
ONCE = "once"
|
||||||
SINGLE_WAIT = "single_wait"
|
SINGLE_WAIT = "single_wait"
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
"""Job decorator."""
|
"""Job decorator."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import AsyncIterator, Awaitable, Callable
|
||||||
from contextlib import suppress
|
from contextlib import asynccontextmanager, suppress
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
import logging
|
import logging
|
||||||
@@ -20,7 +20,7 @@ from ..host.const import HostFeature
|
|||||||
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
|
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
|
||||||
from ..utils.sentry import async_capture_exception
|
from ..utils.sentry import async_capture_exception
|
||||||
from . import SupervisorJob
|
from . import SupervisorJob
|
||||||
from .const import JobCondition, JobExecutionLimit
|
from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle
|
||||||
from .job_group import JobGroup
|
from .job_group import JobGroup
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__package__)
|
_LOGGER: logging.Logger = logging.getLogger(__package__)
|
||||||
@@ -36,13 +36,16 @@ class Job(CoreSysAttributes):
|
|||||||
conditions: list[JobCondition] | None = None,
|
conditions: list[JobCondition] | None = None,
|
||||||
cleanup: bool = True,
|
cleanup: bool = True,
|
||||||
on_condition: type[JobException] | None = None,
|
on_condition: type[JobException] | None = None,
|
||||||
limit: JobExecutionLimit | None = None,
|
concurrency: JobConcurrency | None = None,
|
||||||
|
throttle: JobThrottle | None = None,
|
||||||
throttle_period: timedelta
|
throttle_period: timedelta
|
||||||
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
|
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
|
||||||
| None = None,
|
| None = None,
|
||||||
throttle_max_calls: int | None = None,
|
throttle_max_calls: int | None = None,
|
||||||
internal: bool = False,
|
internal: bool = False,
|
||||||
):
|
# Backward compatibility - DEPRECATED
|
||||||
|
limit: JobExecutionLimit | None = None,
|
||||||
|
): # pylint: disable=too-many-positional-arguments
|
||||||
"""Initialize the Job decorator.
|
"""Initialize the Job decorator.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -50,13 +53,15 @@ class Job(CoreSysAttributes):
|
|||||||
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
|
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
|
||||||
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
|
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
|
||||||
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
|
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
|
||||||
limit (JobExecutionLimit | None): Execution limit policy for the job (e.g., throttle, once, group-based).
|
concurrency (JobConcurrency | None): Concurrency control policy (e.g., reject, queue, group-based).
|
||||||
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for rate-limited jobs).
|
throttle (JobThrottle | None): Throttling policy (e.g., throttle, rate_limit, group-based).
|
||||||
|
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
|
||||||
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
|
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
|
||||||
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
|
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
|
||||||
|
limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected limit.
|
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if name in _JOB_NAMES:
|
if name in _JOB_NAMES:
|
||||||
@@ -67,7 +72,6 @@ class Job(CoreSysAttributes):
|
|||||||
self.conditions = conditions
|
self.conditions = conditions
|
||||||
self.cleanup = cleanup
|
self.cleanup = cleanup
|
||||||
self.on_condition = on_condition
|
self.on_condition = on_condition
|
||||||
self.limit = limit
|
|
||||||
self._throttle_period = throttle_period
|
self._throttle_period = throttle_period
|
||||||
self._throttle_max_calls = throttle_max_calls
|
self._throttle_max_calls = throttle_max_calls
|
||||||
self._lock: asyncio.Semaphore | None = None
|
self._lock: asyncio.Semaphore | None = None
|
||||||
@@ -75,34 +79,85 @@ class Job(CoreSysAttributes):
|
|||||||
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
|
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
|
||||||
self._internal = internal
|
self._internal = internal
|
||||||
|
|
||||||
|
# Handle backward compatibility with limit parameter
|
||||||
|
if limit is not None:
|
||||||
|
if concurrency is not None or throttle is not None:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!"
|
||||||
|
)
|
||||||
|
# Map old limit values to new parameters
|
||||||
|
concurrency, throttle = self._map_limit_to_new_params(limit)
|
||||||
|
|
||||||
|
self.concurrency = concurrency
|
||||||
|
self.throttle = throttle
|
||||||
|
|
||||||
# Validate Options
|
# Validate Options
|
||||||
|
self._validate_parameters()
|
||||||
|
|
||||||
|
def _map_limit_to_new_params(
|
||||||
|
self, limit: JobExecutionLimit
|
||||||
|
) -> tuple[JobConcurrency | None, JobThrottle | None]:
|
||||||
|
"""Map old limit parameter to new concurrency and throttle parameters."""
|
||||||
|
mapping = {
|
||||||
|
JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None),
|
||||||
|
JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None),
|
||||||
|
JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE),
|
||||||
|
JobExecutionLimit.THROTTLE_WAIT: (
|
||||||
|
JobConcurrency.QUEUE,
|
||||||
|
JobThrottle.THROTTLE,
|
||||||
|
),
|
||||||
|
JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT),
|
||||||
|
JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None),
|
||||||
|
JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None),
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE),
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE_WAIT: (
|
||||||
|
JobConcurrency.QUEUE, # Seems a bit counter intuitive, but GROUP_QUEUE deadlocks tests/jobs/test_job_decorator.py::test_execution_limit_group_throttle_wait
|
||||||
|
JobThrottle.GROUP_THROTTLE,
|
||||||
|
),
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: (
|
||||||
|
None,
|
||||||
|
JobThrottle.GROUP_RATE_LIMIT,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
return mapping.get(limit, (None, None))
|
||||||
|
|
||||||
|
def _validate_parameters(self) -> None:
|
||||||
|
"""Validate job parameters."""
|
||||||
|
# Validate throttle parameters
|
||||||
if (
|
if (
|
||||||
self.limit
|
self.throttle
|
||||||
in (
|
in (
|
||||||
JobExecutionLimit.THROTTLE,
|
JobThrottle.THROTTLE,
|
||||||
JobExecutionLimit.THROTTLE_WAIT,
|
JobThrottle.GROUP_THROTTLE,
|
||||||
JobExecutionLimit.THROTTLE_RATE_LIMIT,
|
JobThrottle.RATE_LIMIT,
|
||||||
JobExecutionLimit.GROUP_THROTTLE,
|
JobThrottle.GROUP_RATE_LIMIT,
|
||||||
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
|
||||||
)
|
)
|
||||||
and self._throttle_period is None
|
and self._throttle_period is None
|
||||||
):
|
):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Job {name} is using execution limit {limit} without a throttle period!"
|
f"Job {self.name} is using throttle {self.throttle} without a throttle period!"
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.limit in (
|
if self.throttle in (
|
||||||
JobExecutionLimit.THROTTLE_RATE_LIMIT,
|
JobThrottle.RATE_LIMIT,
|
||||||
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
JobThrottle.GROUP_RATE_LIMIT,
|
||||||
):
|
):
|
||||||
if self._throttle_max_calls is None:
|
if self._throttle_max_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Job {name} is using execution limit {limit} without throttle max calls!"
|
f"Job {self.name} is using throttle {self.throttle} without throttle max calls!"
|
||||||
)
|
)
|
||||||
|
|
||||||
self._rate_limited_calls = {}
|
self._rate_limited_calls = {}
|
||||||
|
|
||||||
|
if self.throttle is not None and self.concurrency in (
|
||||||
|
JobConcurrency.GROUP_REJECT,
|
||||||
|
JobConcurrency.GROUP_QUEUE,
|
||||||
|
):
|
||||||
|
# We cannot release group locks when Job is not running (e.g. throttled)
|
||||||
|
# which makes these combinations impossible to use currently.
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Job {self.name} is using throttling ({self.throttle}) with group concurrency ({self.concurrency}), which is not allowed!"
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def throttle_max_calls(self) -> int:
|
def throttle_max_calls(self) -> int:
|
||||||
"""Return max calls for throttle."""
|
"""Return max calls for throttle."""
|
||||||
@@ -131,7 +186,7 @@ class Job(CoreSysAttributes):
|
|||||||
"""Return rate limited calls if used."""
|
"""Return rate limited calls if used."""
|
||||||
if self._rate_limited_calls is None:
|
if self._rate_limited_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Rate limited calls not available for limit type {self.limit}"
|
"Rate limited calls not available for this throttle type"
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._rate_limited_calls.get(group_name, [])
|
return self._rate_limited_calls.get(group_name, [])
|
||||||
@@ -142,7 +197,7 @@ class Job(CoreSysAttributes):
|
|||||||
"""Add a rate limited call to list if used."""
|
"""Add a rate limited call to list if used."""
|
||||||
if self._rate_limited_calls is None:
|
if self._rate_limited_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Rate limited calls not available for limit type {self.limit}"
|
"Rate limited calls not available for this throttle type"
|
||||||
)
|
)
|
||||||
|
|
||||||
if group_name in self._rate_limited_calls:
|
if group_name in self._rate_limited_calls:
|
||||||
@@ -156,7 +211,7 @@ class Job(CoreSysAttributes):
|
|||||||
"""Set rate limited calls if used."""
|
"""Set rate limited calls if used."""
|
||||||
if self._rate_limited_calls is None:
|
if self._rate_limited_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Rate limited calls not available for limit type {self.limit}"
|
"Rate limited calls not available for this throttle type"
|
||||||
)
|
)
|
||||||
|
|
||||||
self._rate_limited_calls[group_name] = value
|
self._rate_limited_calls[group_name] = value
|
||||||
@@ -193,16 +248,24 @@ class Job(CoreSysAttributes):
|
|||||||
if obj.acquire and obj.release: # type: ignore
|
if obj.acquire and obj.release: # type: ignore
|
||||||
job_group = cast(JobGroup, obj)
|
job_group = cast(JobGroup, obj)
|
||||||
|
|
||||||
if not job_group and self.limit in (
|
# Check for group-based parameters
|
||||||
JobExecutionLimit.GROUP_ONCE,
|
if not job_group:
|
||||||
JobExecutionLimit.GROUP_WAIT,
|
if self.concurrency in (
|
||||||
JobExecutionLimit.GROUP_THROTTLE,
|
JobConcurrency.GROUP_REJECT,
|
||||||
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
JobConcurrency.GROUP_QUEUE,
|
||||||
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
):
|
||||||
):
|
raise RuntimeError(
|
||||||
raise RuntimeError(
|
f"Job {self.name} uses group concurrency ({self.concurrency}) but is not on a JobGroup! "
|
||||||
f"Job on {self.name} need to be a JobGroup to use group based limits!"
|
f"The class must inherit from JobGroup to use GROUP_REJECT or GROUP_QUEUE."
|
||||||
) from None
|
) from None
|
||||||
|
if self.throttle in (
|
||||||
|
JobThrottle.GROUP_THROTTLE,
|
||||||
|
JobThrottle.GROUP_RATE_LIMIT,
|
||||||
|
):
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Job {self.name} uses group throttling ({self.throttle}) but is not on a JobGroup! "
|
||||||
|
f"The class must inherit from JobGroup to use GROUP_THROTTLE or GROUP_RATE_LIMIT."
|
||||||
|
) from None
|
||||||
|
|
||||||
return job_group
|
return job_group
|
||||||
|
|
||||||
@@ -255,102 +318,34 @@ class Job(CoreSysAttributes):
|
|||||||
except JobConditionException as err:
|
except JobConditionException as err:
|
||||||
return self._handle_job_condition_exception(err)
|
return self._handle_job_condition_exception(err)
|
||||||
|
|
||||||
# Handle exection limits
|
# Handle execution limits using context manager
|
||||||
if self.limit in (
|
async with self._concurrency_control(job_group, job):
|
||||||
JobExecutionLimit.SINGLE_WAIT,
|
if not await self._handle_throttling(group_name):
|
||||||
JobExecutionLimit.ONCE,
|
return # Job was throttled, exit early
|
||||||
):
|
|
||||||
await self._acquire_exection_limit()
|
|
||||||
elif self.limit in (
|
|
||||||
JobExecutionLimit.GROUP_ONCE,
|
|
||||||
JobExecutionLimit.GROUP_WAIT,
|
|
||||||
):
|
|
||||||
try:
|
|
||||||
await cast(JobGroup, job_group).acquire(
|
|
||||||
job, self.limit == JobExecutionLimit.GROUP_WAIT
|
|
||||||
)
|
|
||||||
except JobGroupExecutionLimitExceeded as err:
|
|
||||||
if self.on_condition:
|
|
||||||
raise self.on_condition(str(err)) from err
|
|
||||||
raise err
|
|
||||||
elif self.limit in (
|
|
||||||
JobExecutionLimit.THROTTLE,
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE,
|
|
||||||
):
|
|
||||||
time_since_last_call = datetime.now() - self.last_call(group_name)
|
|
||||||
if time_since_last_call < self.throttle_period(group_name):
|
|
||||||
return
|
|
||||||
elif self.limit in (
|
|
||||||
JobExecutionLimit.THROTTLE_WAIT,
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
|
||||||
):
|
|
||||||
await self._acquire_exection_limit()
|
|
||||||
time_since_last_call = datetime.now() - self.last_call(group_name)
|
|
||||||
if time_since_last_call < self.throttle_period(group_name):
|
|
||||||
self._release_exception_limits()
|
|
||||||
return
|
|
||||||
elif self.limit in (
|
|
||||||
JobExecutionLimit.THROTTLE_RATE_LIMIT,
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
|
||||||
):
|
|
||||||
# Only reprocess array when necessary (at limit)
|
|
||||||
if (
|
|
||||||
len(self.rate_limited_calls(group_name))
|
|
||||||
>= self.throttle_max_calls
|
|
||||||
):
|
|
||||||
self.set_rate_limited_calls(
|
|
||||||
[
|
|
||||||
call
|
|
||||||
for call in self.rate_limited_calls(group_name)
|
|
||||||
if call
|
|
||||||
> datetime.now() - self.throttle_period(group_name)
|
|
||||||
],
|
|
||||||
group_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
if (
|
# Execute Job
|
||||||
len(self.rate_limited_calls(group_name))
|
with job.start():
|
||||||
>= self.throttle_max_calls
|
try:
|
||||||
):
|
self.set_last_call(datetime.now(), group_name)
|
||||||
on_condition = (
|
if self._rate_limited_calls is not None:
|
||||||
JobException
|
self.add_rate_limited_call(
|
||||||
if self.on_condition is None
|
self.last_call(group_name), group_name
|
||||||
else self.on_condition
|
)
|
||||||
)
|
|
||||||
raise on_condition(
|
|
||||||
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Execute Job
|
return await method(obj, *args, **kwargs)
|
||||||
with job.start():
|
|
||||||
try:
|
|
||||||
self.set_last_call(datetime.now(), group_name)
|
|
||||||
if self._rate_limited_calls is not None:
|
|
||||||
self.add_rate_limited_call(
|
|
||||||
self.last_call(group_name), group_name
|
|
||||||
)
|
|
||||||
|
|
||||||
return await method(obj, *args, **kwargs)
|
# If a method has a conditional JobCondition, they must check it in the method
|
||||||
|
# These should be handled like normal JobConditions as much as possible
|
||||||
# If a method has a conditional JobCondition, they must check it in the method
|
except JobConditionException as err:
|
||||||
# These should be handled like normal JobConditions as much as possible
|
return self._handle_job_condition_exception(err)
|
||||||
except JobConditionException as err:
|
except HassioError as err:
|
||||||
return self._handle_job_condition_exception(err)
|
job.capture_error(err)
|
||||||
except HassioError as err:
|
raise err
|
||||||
job.capture_error(err)
|
except Exception as err:
|
||||||
raise err
|
_LOGGER.exception("Unhandled exception: %s", err)
|
||||||
except Exception as err:
|
job.capture_error()
|
||||||
_LOGGER.exception("Unhandled exception: %s", err)
|
await async_capture_exception(err)
|
||||||
job.capture_error()
|
raise JobException() from err
|
||||||
await async_capture_exception(err)
|
|
||||||
raise JobException() from err
|
|
||||||
finally:
|
|
||||||
self._release_exception_limits()
|
|
||||||
if job_group and self.limit in (
|
|
||||||
JobExecutionLimit.GROUP_ONCE,
|
|
||||||
JobExecutionLimit.GROUP_WAIT,
|
|
||||||
):
|
|
||||||
job_group.release()
|
|
||||||
|
|
||||||
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
|
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
|
||||||
finally:
|
finally:
|
||||||
@@ -492,31 +487,86 @@ class Job(CoreSysAttributes):
|
|||||||
f"'{method_name}' blocked from execution, mounting not supported on system"
|
f"'{method_name}' blocked from execution, mounting not supported on system"
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _acquire_exection_limit(self) -> None:
|
def _release_concurrency_control(self, job_group: JobGroup | None) -> None:
|
||||||
"""Process exection limits."""
|
"""Release concurrency control locks."""
|
||||||
if self.limit not in (
|
if self.concurrency == JobConcurrency.REJECT:
|
||||||
JobExecutionLimit.SINGLE_WAIT,
|
if self.lock.locked():
|
||||||
JobExecutionLimit.ONCE,
|
self.lock.release()
|
||||||
JobExecutionLimit.THROTTLE_WAIT,
|
elif self.concurrency == JobConcurrency.QUEUE:
|
||||||
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
if self.lock.locked():
|
||||||
|
self.lock.release()
|
||||||
|
elif self.concurrency in (
|
||||||
|
JobConcurrency.GROUP_REJECT,
|
||||||
|
JobConcurrency.GROUP_QUEUE,
|
||||||
):
|
):
|
||||||
return
|
if job_group and job_group.has_lock:
|
||||||
|
job_group.release()
|
||||||
|
|
||||||
if self.limit == JobExecutionLimit.ONCE and self.lock.locked():
|
async def _handle_concurrency_control(
|
||||||
on_condition = (
|
self, job_group: JobGroup | None, job: SupervisorJob
|
||||||
JobException if self.on_condition is None else self.on_condition
|
) -> None:
|
||||||
)
|
"""Handle concurrency control limits."""
|
||||||
raise on_condition("Another job is running")
|
if self.concurrency == JobConcurrency.REJECT:
|
||||||
|
if self.lock.locked():
|
||||||
|
on_condition = (
|
||||||
|
JobException if self.on_condition is None else self.on_condition
|
||||||
|
)
|
||||||
|
raise on_condition("Another job is running")
|
||||||
|
await self.lock.acquire()
|
||||||
|
elif self.concurrency == JobConcurrency.QUEUE:
|
||||||
|
await self.lock.acquire()
|
||||||
|
elif self.concurrency == JobConcurrency.GROUP_REJECT:
|
||||||
|
try:
|
||||||
|
await cast(JobGroup, job_group).acquire(job, wait=False)
|
||||||
|
except JobGroupExecutionLimitExceeded as err:
|
||||||
|
if self.on_condition:
|
||||||
|
raise self.on_condition(str(err)) from err
|
||||||
|
raise err
|
||||||
|
elif self.concurrency == JobConcurrency.GROUP_QUEUE:
|
||||||
|
try:
|
||||||
|
await cast(JobGroup, job_group).acquire(job, wait=True)
|
||||||
|
except JobGroupExecutionLimitExceeded as err:
|
||||||
|
if self.on_condition:
|
||||||
|
raise self.on_condition(str(err)) from err
|
||||||
|
raise err
|
||||||
|
|
||||||
await self.lock.acquire()
|
@asynccontextmanager
|
||||||
|
async def _concurrency_control(
|
||||||
|
self, job_group: JobGroup | None, job: SupervisorJob
|
||||||
|
) -> AsyncIterator[None]:
|
||||||
|
"""Context manager for concurrency control that ensures locks are always released."""
|
||||||
|
await self._handle_concurrency_control(job_group, job)
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
self._release_concurrency_control(job_group)
|
||||||
|
|
||||||
def _release_exception_limits(self) -> None:
|
async def _handle_throttling(self, group_name: str | None) -> bool:
|
||||||
"""Release possible exception limits."""
|
"""Handle throttling limits. Returns True if job should continue, False if throttled."""
|
||||||
if self.limit not in (
|
if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):
|
||||||
JobExecutionLimit.SINGLE_WAIT,
|
time_since_last_call = datetime.now() - self.last_call(group_name)
|
||||||
JobExecutionLimit.ONCE,
|
throttle_period = self.throttle_period(group_name)
|
||||||
JobExecutionLimit.THROTTLE_WAIT,
|
if time_since_last_call < throttle_period:
|
||||||
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
# Always return False when throttled (skip execution)
|
||||||
):
|
return False
|
||||||
return
|
elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
|
||||||
self.lock.release()
|
# Only reprocess array when necessary (at limit)
|
||||||
|
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
|
||||||
|
self.set_rate_limited_calls(
|
||||||
|
[
|
||||||
|
call
|
||||||
|
for call in self.rate_limited_calls(group_name)
|
||||||
|
if call > datetime.now() - self.throttle_period(group_name)
|
||||||
|
],
|
||||||
|
group_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
|
||||||
|
on_condition = (
|
||||||
|
JobException if self.on_condition is None else self.on_condition
|
||||||
|
)
|
||||||
|
raise on_condition(
|
||||||
|
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
Reference in New Issue
Block a user