From 7f584e386d04e6178ece50b6de3e0f14bc3c88ea Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 14 Jul 2025 22:49:04 +0200 Subject: [PATCH] Use context manager for concurrency control Instead of manually managing concurrency cleanup use a context manager to ensure that resources are properly released even if an error occurs. --- supervisor/jobs/decorator.py | 68 ++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index 44a97a5c6..d04e42eac 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -1,8 +1,8 @@ """Job decorator.""" import asyncio -from collections.abc import Awaitable, Callable -from contextlib import suppress +from collections.abc import AsyncIterator, Awaitable, Callable +from contextlib import asynccontextmanager, suppress from datetime import datetime, timedelta from functools import wraps import logging @@ -318,37 +318,34 @@ class Job(CoreSysAttributes): except JobConditionException as err: return self._handle_job_condition_exception(err) - # Handle execution limits - await self._handle_concurrency_control(job_group, job) - if not await self._handle_throttling(group_name): - self._release_concurrency_control(job_group) - return # Job was throttled, exit early + # Handle execution limits using context manager + async with self._concurrency_control(job_group, job): + if not await self._handle_throttling(group_name): + return # Job was throttled, exit early - # Execute Job - with job.start(): - try: - self.set_last_call(datetime.now(), group_name) - if self._rate_limited_calls is not None: - self.add_rate_limited_call( - self.last_call(group_name), group_name - ) + # Execute Job + with job.start(): + try: + self.set_last_call(datetime.now(), group_name) + if self._rate_limited_calls is not None: + self.add_rate_limited_call( + self.last_call(group_name), group_name + ) - return await method(obj, *args, **kwargs) + return await method(obj, *args, **kwargs) - # If a method has a conditional JobCondition, they must check it in the method - # These should be handled like normal JobConditions as much as possible - except JobConditionException as err: - return self._handle_job_condition_exception(err) - except HassioError as err: - job.capture_error(err) - raise err - except Exception as err: - _LOGGER.exception("Unhandled exception: %s", err) - job.capture_error() - await async_capture_exception(err) - raise JobException() from err - finally: - self._release_concurrency_control(job_group) + # If a method has a conditional JobCondition, they must check it in the method + # These should be handled like normal JobConditions as much as possible + except JobConditionException as err: + return self._handle_job_condition_exception(err) + except HassioError as err: + job.capture_error(err) + raise err + except Exception as err: + _LOGGER.exception("Unhandled exception: %s", err) + job.capture_error() + await async_capture_exception(err) + raise JobException() from err # Jobs that weren't started are always cleaned up. Also clean up done jobs if required finally: @@ -533,6 +530,17 @@ class Job(CoreSysAttributes): raise self.on_condition(str(err)) from err raise err + @asynccontextmanager + async def _concurrency_control( + self, job_group: JobGroup | None, job: SupervisorJob + ) -> AsyncIterator[None]: + """Context manager for concurrency control that ensures locks are always released.""" + await self._handle_concurrency_control(job_group, job) + try: + yield + finally: + self._release_concurrency_control(job_group) + async def _handle_throttling(self, group_name: str | None) -> bool: """Handle throttling limits. Returns True if job should continue, False if throttled.""" if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):