Add execution limit for jobs (#2612)

* Add execution limit for jobs

* Add test for execution police

* Use better test

* Apply suggestions from code review

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

* Rename JobExecutionLimit

* fix typing

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2021-02-24 17:15:13 +01:00 committed by GitHub
parent 90d8832cd2
commit 8630adc54a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 36 deletions

View File

@ -17,3 +17,9 @@ class JobCondition(str, Enum):
INTERNET_SYSTEM = "internet_system"
INTERNET_HOST = "internet_host"
RUNNING = "running"
class JobExecutionLimit(str, Enum):
"""Job Execution limits."""
SINGLE_WAIT = "single_wait"

View File

@ -1,19 +1,20 @@
"""Job decorator."""
import asyncio
import logging
from typing import Any, List, Optional
from typing import Any, List, Optional, Tuple
import sentry_sdk
from ..const import CoreState
from ..coresys import CoreSys
from ..coresys import CoreSysAttributes
from ..exceptions import HassioError, JobException
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from .const import JobCondition
from .const import JobCondition, JobExecutionLimit
_LOGGER: logging.Logger = logging.getLogger(__package__)
class Job:
class Job(CoreSysAttributes):
"""Supervisor job decorator."""
def __init__(
@ -22,33 +23,42 @@ class Job:
conditions: Optional[List[JobCondition]] = None,
cleanup: bool = True,
on_condition: Optional[JobException] = None,
limit: Optional[JobExecutionLimit] = None,
):
"""Initialize the Job class."""
self.name = name
self.conditions = conditions
self.cleanup = cleanup
self.on_condition = on_condition
self._coresys: Optional[CoreSys] = None
self.limit = limit
self._lock: Optional[asyncio.Semaphore] = None
self._method = None
def _post_init(self, args: Tuple[Any]) -> None:
"""Runtime init."""
if self.name is None:
self.name = str(self._method.__qualname__).lower().replace(".", "_")
# Coresys
try:
self.coresys = args[0].coresys
except AttributeError:
pass
if not self.coresys:
raise JobException(f"coresys is missing on {self.name}")
if self._lock is None:
self._lock = asyncio.Semaphore()
def __call__(self, method):
"""Call the wrapper logic."""
self._method = method
async def wrapper(*args, **kwargs) -> Any:
"""Wrap the method."""
if self.name is None:
self.name = str(self._method.__qualname__).lower().replace(".", "_")
self._post_init(args)
# Evaluate coresys
try:
self._coresys = args[0].coresys
except AttributeError:
pass
if not self._coresys:
raise JobException(f"coresys is missing on {self.name}")
job = self._coresys.jobs.get_job(self.name)
job = self.sys_jobs.get_job(self.name)
# Handle condition
if self.conditions and not self._check_conditions():
@ -56,6 +66,10 @@ class Job:
return
raise self.on_condition()
# Handle exection limits
if self.limit:
await self._acquire_exection_limit()
# Execute Job
try:
return await self._method(*args, **kwargs)
@ -67,18 +81,15 @@ class Job:
raise JobException() from err
finally:
if self.cleanup:
self._coresys.jobs.remove_job(job)
self.sys_jobs.remove_job(job)
self._release_exception_limits()
return wrapper
def _check_conditions(self):
"""Check conditions."""
used_conditions = set(self.conditions) - set(
self._coresys.jobs.ignore_conditions
)
ignored_conditions = set(self.conditions) & set(
self._coresys.jobs.ignore_conditions
)
used_conditions = set(self.conditions) - set(self.sys_jobs.ignore_conditions)
ignored_conditions = set(self.conditions) & set(self.sys_jobs.ignore_conditions)
# Check if somethings is ignored
if ignored_conditions:
@ -87,7 +98,7 @@ class Job:
ignored_conditions,
)
if JobCondition.HEALTHY in used_conditions and not self._coresys.core.healthy:
if JobCondition.HEALTHY in used_conditions and not self.sys_core.healthy:
_LOGGER.warning(
"'%s' blocked from execution, system is not healthy",
self._method.__qualname__,
@ -96,33 +107,31 @@ class Job:
if (
JobCondition.RUNNING in used_conditions
and self._coresys.core.state != CoreState.RUNNING
and self.sys_core.state != CoreState.RUNNING
):
_LOGGER.warning(
"'%s' blocked from execution, system is not running - %s",
self._method.__qualname__,
self._coresys.core.state,
self.sys_core.state,
)
return False
if (
JobCondition.FREE_SPACE in used_conditions
and self._coresys.host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD
and self.sys_host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD
):
_LOGGER.warning(
"'%s' blocked from execution, not enough free space (%sGB) left on the device",
self._method.__qualname__,
self._coresys.host.info.free_space,
)
self._coresys.resolution.create_issue(
IssueType.FREE_SPACE, ContextType.SYSTEM
self.sys_host.info.free_space,
)
self.sys_resolution.create_issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
return False
if (
JobCondition.INTERNET_SYSTEM in self.conditions
and not self._coresys.supervisor.connectivity
and self._coresys.core.state in (CoreState.SETUP, CoreState.RUNNING)
and not self.sys_supervisor.connectivity
and self.sys_core.state in (CoreState.SETUP, CoreState.RUNNING)
):
_LOGGER.warning(
"'%s' blocked from execution, no supervisor internet connection",
@ -132,9 +141,9 @@ class Job:
if (
JobCondition.INTERNET_HOST in self.conditions
and self._coresys.host.network.connectivity is not None
and not self._coresys.host.network.connectivity
and self._coresys.core.state in (CoreState.SETUP, CoreState.RUNNING)
and self.sys_host.network.connectivity is not None
and not self.sys_host.network.connectivity
and self.sys_core.state in (CoreState.SETUP, CoreState.RUNNING)
):
_LOGGER.warning(
"'%s' blocked from execution, no host internet connection",
@ -143,3 +152,15 @@ class Job:
return False
return True
async def _acquire_exection_limit(self) -> None:
"""Process exection limits."""
if self.limit == JobExecutionLimit.SINGLE_WAIT:
await self._lock.acquire()
def _release_exception_limits(self) -> None:
"""Release possible exception limits."""
if self.limit == JobExecutionLimit.SINGLE_WAIT:
self._lock.release()

View File

@ -1,5 +1,6 @@
"""Test the condition decorators."""
# pylint: disable=protected-access,import-error
import asyncio
from unittest.mock import patch
import pytest
@ -7,6 +8,7 @@ import pytest
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import HassioError, JobException
from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.decorator import Job, JobCondition
from supervisor.resolution.const import UnhealthyReason
@ -257,3 +259,28 @@ async def test_exception_conditions(coresys: CoreSys):
coresys.core.state = CoreState.FREEZE
with pytest.raises(HassioError):
await test.execute()
async def test_exectution_limit_single_wait(
coresys: CoreSys, loop: asyncio.BaseEventLoop
):
"""Test the ignore conditions decorator."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
@Job(limit=JobExecutionLimit.SINGLE_WAIT)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
async with self.run:
await asyncio.sleep(sleep)
test = TestClass(coresys)
await asyncio.gather(*[test.execute(0.1), test.execute(0.1), test.execute(0.1)])