Introduce common lock release method

This commit is contained in:
Stefan Agner 2025-07-10 14:01:15 +02:00
parent 274415fb87
commit 01b5003849
No known key found for this signature in database
GPG Key ID: AE01353D1E44747D

View File

@ -304,6 +304,7 @@ class Job(CoreSysAttributes):
# Handle execution limits # Handle execution limits
await self._handle_concurrency_control(job_group, job) await self._handle_concurrency_control(job_group, job)
if not await self._handle_throttling(group_name): if not await self._handle_throttling(group_name):
self._release_concurrency_control(job_group)
return # Job was throttled, exit early return # Job was throttled, exit early
# Execute Job # Execute Job
@ -330,13 +331,7 @@ class Job(CoreSysAttributes):
await async_capture_exception(err) await async_capture_exception(err)
raise JobException() from err raise JobException() from err
finally: finally:
self._release_exception_limits() self._release_concurrency_control(job_group)
# Handle concurrency parameters
if job_group and self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
job_group.release()
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required # Jobs that weren't started are always cleaned up. Also clean up done jobs if required
finally: finally:
@ -478,10 +473,20 @@ class Job(CoreSysAttributes):
f"'{method_name}' blocked from execution, mounting not supported on system" f"'{method_name}' blocked from execution, mounting not supported on system"
) )
def _release_exception_limits(self) -> None: def _release_concurrency_control(self, job_group: JobGroup | None) -> None:
"""Release possible exception limits.""" """Release concurrency control locks."""
if self.concurrency in (JobConcurrency.REJECT, JobConcurrency.QUEUE): if self.concurrency == JobConcurrency.REJECT:
if self.lock.locked():
self.lock.release() self.lock.release()
elif self.concurrency == JobConcurrency.QUEUE:
if self.lock.locked():
self.lock.release()
elif self.concurrency in (
JobConcurrency.GROUP_REJECT,
JobConcurrency.GROUP_QUEUE,
):
if job_group and job_group.has_lock:
job_group.release()
async def _handle_concurrency_control( async def _handle_concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob self, job_group: JobGroup | None, job: SupervisorJob
@ -528,11 +533,6 @@ class Job(CoreSysAttributes):
await asyncio.sleep(sleep_time) await asyncio.sleep(sleep_time)
else: else:
# For non-queue concurrency (REJECT behavior), just return False # For non-queue concurrency (REJECT behavior), just return False
if self.concurrency in (
JobConcurrency.REJECT,
JobConcurrency.QUEUE,
):
self.lock.release()
return False return False
elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT): elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
# Only reprocess array when necessary (at limit) # Only reprocess array when necessary (at limit)