Use context manager for concurrency control

Instead of manually managing concurrency cleanup use a context manager
to ensure that resources are properly released even if an error occurs.
This commit is contained in:
Stefan Agner 2025-07-14 22:49:04 +02:00
parent 79964fd405
commit 7f584e386d
No known key found for this signature in database
GPG Key ID: AE01353D1E44747D

View File

@ -1,8 +1,8 @@
"""Job decorator."""
import asyncio
from collections.abc import Awaitable, Callable
from contextlib import suppress
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager, suppress
from datetime import datetime, timedelta
from functools import wraps
import logging
@ -318,37 +318,34 @@ class Job(CoreSysAttributes):
except JobConditionException as err:
return self._handle_job_condition_exception(err)
# Handle execution limits
await self._handle_concurrency_control(job_group, job)
if not await self._handle_throttling(group_name):
self._release_concurrency_control(job_group)
return # Job was throttled, exit early
# Handle execution limits using context manager
async with self._concurrency_control(job_group, job):
if not await self._handle_throttling(group_name):
return # Job was throttled, exit early
# Execute Job
with job.start():
try:
self.set_last_call(datetime.now(), group_name)
if self._rate_limited_calls is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
)
# Execute Job
with job.start():
try:
self.set_last_call(datetime.now(), group_name)
if self._rate_limited_calls is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
)
return await method(obj, *args, **kwargs)
return await method(obj, *args, **kwargs)
# If a method has a conditional JobCondition, they must check it in the method
# These should be handled like normal JobConditions as much as possible
except JobConditionException as err:
return self._handle_job_condition_exception(err)
except HassioError as err:
job.capture_error(err)
raise err
except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err)
job.capture_error()
await async_capture_exception(err)
raise JobException() from err
finally:
self._release_concurrency_control(job_group)
# If a method has a conditional JobCondition, they must check it in the method
# These should be handled like normal JobConditions as much as possible
except JobConditionException as err:
return self._handle_job_condition_exception(err)
except HassioError as err:
job.capture_error(err)
raise err
except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err)
job.capture_error()
await async_capture_exception(err)
raise JobException() from err
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
finally:
@ -533,6 +530,17 @@ class Job(CoreSysAttributes):
raise self.on_condition(str(err)) from err
raise err
@asynccontextmanager
async def _concurrency_control(
self, job_group: JobGroup | None, job: SupervisorJob
) -> AsyncIterator[None]:
"""Context manager for concurrency control that ensures locks are always released."""
await self._handle_concurrency_control(job_group, job)
try:
yield
finally:
self._release_concurrency_control(job_group)
async def _handle_throttling(self, group_name: str | None) -> bool:
"""Handle throttling limits. Returns True if job should continue, False if throttled."""
if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):