diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index d18b2cf51..9e9911a91 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -304,6 +304,7 @@ class Job(CoreSysAttributes): # Handle execution limits await self._handle_concurrency_control(job_group, job) if not await self._handle_throttling(group_name): + self._release_concurrency_control(job_group) return # Job was throttled, exit early # Execute Job @@ -330,13 +331,7 @@ class Job(CoreSysAttributes): await async_capture_exception(err) raise JobException() from err finally: - self._release_exception_limits() - # Handle concurrency parameters - if job_group and self.concurrency in ( - JobConcurrency.GROUP_REJECT, - JobConcurrency.GROUP_QUEUE, - ): - job_group.release() + self._release_concurrency_control(job_group) # Jobs that weren't started are always cleaned up. Also clean up done jobs if required finally: @@ -478,10 +473,20 @@ class Job(CoreSysAttributes): f"'{method_name}' blocked from execution, mounting not supported on system" ) - def _release_exception_limits(self) -> None: - """Release possible exception limits.""" - if self.concurrency in (JobConcurrency.REJECT, JobConcurrency.QUEUE): - self.lock.release() + def _release_concurrency_control(self, job_group: JobGroup | None) -> None: + """Release concurrency control locks.""" + if self.concurrency == JobConcurrency.REJECT: + if self.lock.locked(): + self.lock.release() + elif self.concurrency == JobConcurrency.QUEUE: + if self.lock.locked(): + self.lock.release() + elif self.concurrency in ( + JobConcurrency.GROUP_REJECT, + JobConcurrency.GROUP_QUEUE, + ): + if job_group and job_group.has_lock: + job_group.release() async def _handle_concurrency_control( self, job_group: JobGroup | None, job: SupervisorJob @@ -528,11 +533,6 @@ class Job(CoreSysAttributes): await asyncio.sleep(sleep_time) else: # For non-queue concurrency (REJECT behavior), just return False - if self.concurrency in ( - JobConcurrency.REJECT, - JobConcurrency.QUEUE, - ): - self.lock.release() return False elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT): # Only reprocess array when necessary (at limit)