Introduce common lock release method

This commit is contained in:
Stefan Agner 2025-07-10 14:01:15 +02:00
parent 546f2d9e6c
commit 7fcbc87db9
No known key found for this signature in database
GPG Key ID: AE01353D1E44747D

View File

@ -304,6 +304,7 @@ class Job(CoreSysAttributes):
# Handle execution limits
await self._handle_concurrency_control(job_group, job)
if not await self._handle_throttling(group_name):
self._release_concurrency_control(job_group)
return # Job was throttled, exit early
# Execute Job
@ -330,13 +331,7 @@ class Job(CoreSysAttributes):
await async_capture_exception(err)
raise JobException() from err
finally:
self._release_exception_limits()
# Handle concurrency parameters
if job_group and self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
job_group.release()
self._release_concurrency_control(job_group)
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
finally:
@ -478,10 +473,20 @@ class Job(CoreSysAttributes):
f"'{method_name}' blocked from execution, mounting not supported on system"
)
def _release_exception_limits(self) -> None:
"""Release possible exception limits."""
if self.concurrency in (JobConcurrency.REJECT, JobConcurrency.QUEUE):
self.lock.release()
def _release_concurrency_control(self, job_group: JobGroup | None) -> None:
"""Release concurrency control locks."""
if self.concurrency == JobConcurrency.REJECT:
if self.lock.locked():
self.lock.release()
elif self.concurrency == JobConcurrency.QUEUE:
if self.lock.locked():
self.lock.release()
elif self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
if job_group and job_group.has_lock:
job_group.release()
async def _handle_concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob
@ -528,11 +533,6 @@ class Job(CoreSysAttributes):
await asyncio.sleep(sleep_time)
else:
# For non-queue concurrency (REJECT behavior), just return False
if self.concurrency in (
JobConcurrency.REJECT,
JobConcurrency.QUEUE,
):
self.lock.release()
return False
elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
# Only reprocess array when necessary (at limit)