Unstarted jobs should always be cleaned up (#4604)

This commit is contained in:
Mike Degatano 2023-10-09 05:57:04 -04:00 committed by GitHub
parent f9840306a0
commit ace58ba735
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 164 additions and 88 deletions

View File

@@ -86,7 +86,7 @@ class SupervisorJob:
}
@contextmanager
def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None):
def start(self):
"""Start the job in the current task.
This can only be called if the parent ID matches the job running in the current task.
@@ -107,8 +107,6 @@ class SupervisorJob:
self.done = True
if token:
_CURRENT_JOB.reset(token)
if on_done:
on_done(self)
class JobManager(FileConfiguration, CoreSysAttributes):

View File

@@ -201,95 +201,115 @@ class Job(CoreSysAttributes):
internal=self._internal,
)
# Handle condition
if self.conditions:
try:
await Job.check_conditions(
self, set(self.conditions), self._method.__qualname__
)
except JobConditionException as err:
return self._handle_job_condition_exception(err)
try:
# Handle condition
if self.conditions:
try:
await Job.check_conditions(
self, set(self.conditions), self._method.__qualname__
)
except JobConditionException as err:
return self._handle_job_condition_exception(err)
# Handle exection limits
if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE):
await self._acquire_exection_limit()
elif self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
try:
await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.limit in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.GROUP_THROTTLE,
):
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
return
elif self.limit in (
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
await self._acquire_exection_limit()
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
self._release_exception_limits()
return
elif self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Only reprocess array when necessary (at limit)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call > datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
# Execute Job
with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None):
try:
self.set_last_call(datetime.now(), group_name)
if self.rate_limited_calls(group_name) is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
# Handle exection limits
if self.limit in (
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
):
await self._acquire_exection_limit()
elif self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
try:
await obj.acquire(
job, self.limit == JobExecutionLimit.GROUP_WAIT
)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.limit in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.GROUP_THROTTLE,
):
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
return
elif self.limit in (
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
await self._acquire_exection_limit()
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
self._release_exception_limits()
return
elif self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Only reprocess array when necessary (at limit)
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call
> datetime.now() - self.throttle_period(group_name)
],
group_name,
)
return await self._method(obj, *args, **kwargs)
# If a method has a conditional JobCondition, they must check it in the method
# These should be handled like normal JobConditions as much as possible
except JobConditionException as err:
return self._handle_job_condition_exception(err)
except HassioError as err:
raise err
except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err)
capture_exception(err)
raise JobException() from err
finally:
self._release_exception_limits()
if self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
if (
len(self.rate_limited_calls(group_name))
>= self.throttle_max_calls
):
obj.release()
on_condition = (
JobException
if self.on_condition is None
else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
# Execute Job
with job.start():
try:
self.set_last_call(datetime.now(), group_name)
if self.rate_limited_calls(group_name) is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
)
return await self._method(obj, *args, **kwargs)
# If a method has a conditional JobCondition, they must check it in the method
# These should be handled like normal JobConditions as much as possible
except JobConditionException as err:
return self._handle_job_condition_exception(err)
except HassioError as err:
raise err
except Exception as err:
_LOGGER.exception("Unhandled exception: %s", err)
capture_exception(err)
raise JobException() from err
finally:
self._release_exception_limits()
if self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
obj.release()
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
finally:
if job.done is None or self.cleanup:
self.sys_jobs.remove_job(job)
return wrapper

View File

@@ -1044,3 +1044,61 @@ async def test_job_starting_separate_task(coresys: CoreSys):
await test.job_await()
await test.job_release()
await task
async def test_job_always_removed_on_check_failure(coresys: CoreSys):
"""Test that the job instance is always removed if the condition or limit check fails."""
class TestClass:
"""Test class."""
event = asyncio.Event()
limit_job: Job | None = None
def __init__(self, coresys: CoreSys) -> None:
"""Initialize object."""
self.coresys = coresys
@Job(
name="test_job_always_removed_on_check_failure_condition",
conditions=[JobCondition.HAOS],
on_condition=JobException,
cleanup=False,
)
async def condition_check(self):
"""Job that will fail a condition check."""
raise AssertionError("should not run")
@Job(
name="test_job_always_removed_on_check_failure_limit",
limit=JobExecutionLimit.ONCE,
cleanup=False,
)
async def limit_check(self):
"""Job that can fail a limit check."""
self.limit_job = self.coresys.jobs.current
await self.event.wait()
def release_limit_check(self):
"""Release the limit check job."""
self.event.set()
test = TestClass(coresys)
with pytest.raises(JobException):
await test.condition_check()
assert coresys.jobs.jobs == []
task = coresys.create_task(test.limit_check())
await asyncio.sleep(0)
assert (job := test.limit_job)
with pytest.raises(JobException):
await test.limit_check()
assert test.limit_job == job
assert coresys.jobs.jobs == [job]
test.release_limit_check()
await task
assert job.done
assert coresys.jobs.jobs == [job]