From 8630adc54a821f1993b88bded5b6414bb78873e5 Mon Sep 17 00:00:00 2001 From: Pascal Vizeli Date: Wed, 24 Feb 2021 17:15:13 +0100 Subject: [PATCH] Add execution limit for jobs (#2612) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add execution limit for jobs * Add test for execution police * Use better test * Apply suggestions from code review Co-authored-by: Joakim Sørensen * Rename JobExecutionLimit * fix typing Co-authored-by: Joakim Sørensen --- supervisor/jobs/const.py | 6 +++ supervisor/jobs/decorator.py | 93 +++++++++++++++++++------------- tests/jobs/test_job_decorator.py | 27 ++++++++++ 3 files changed, 90 insertions(+), 36 deletions(-) diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index 6571e9d3c..946d9d538 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -17,3 +17,9 @@ class JobCondition(str, Enum): INTERNET_SYSTEM = "internet_system" INTERNET_HOST = "internet_host" RUNNING = "running" + + +class JobExecutionLimit(str, Enum): + """Job Execution limits.""" + + SINGLE_WAIT = "single_wait" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index ec0ebb023..d697c927a 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -1,19 +1,20 @@ """Job decorator.""" +import asyncio import logging -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple import sentry_sdk from ..const import CoreState -from ..coresys import CoreSys +from ..coresys import CoreSysAttributes from ..exceptions import HassioError, JobException from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType -from .const import JobCondition +from .const import JobCondition, JobExecutionLimit _LOGGER: logging.Logger = logging.getLogger(__package__) -class Job: +class Job(CoreSysAttributes): """Supervisor job decorator.""" def __init__( @@ -22,33 +23,42 @@ class Job: conditions: Optional[List[JobCondition]] = None, cleanup: bool = True, on_condition: Optional[JobException] = None, + limit: Optional[JobExecutionLimit] = None, ): """Initialize the Job class.""" self.name = name self.conditions = conditions self.cleanup = cleanup self.on_condition = on_condition - self._coresys: Optional[CoreSys] = None + self.limit = limit + self._lock: Optional[asyncio.Semaphore] = None self._method = None + def _post_init(self, args: Tuple[Any]) -> None: + """Runtime init.""" + if self.name is None: + self.name = str(self._method.__qualname__).lower().replace(".", "_") + + # Coresys + try: + self.coresys = args[0].coresys + except AttributeError: + pass + if not self.coresys: + raise JobException(f"coresys is missing on {self.name}") + + if self._lock is None: + self._lock = asyncio.Semaphore() + def __call__(self, method): """Call the wrapper logic.""" self._method = method async def wrapper(*args, **kwargs) -> Any: """Wrap the method.""" - if self.name is None: - self.name = str(self._method.__qualname__).lower().replace(".", "_") + self._post_init(args) - # Evaluate coresys - try: - self._coresys = args[0].coresys - except AttributeError: - pass - if not self._coresys: - raise JobException(f"coresys is missing on {self.name}") - - job = self._coresys.jobs.get_job(self.name) + job = self.sys_jobs.get_job(self.name) # Handle condition if self.conditions and not self._check_conditions(): @@ -56,6 +66,10 @@ class Job: return raise self.on_condition() + # Handle exection limits + if self.limit: + await self._acquire_exection_limit() + # Execute Job try: return await self._method(*args, **kwargs) @@ -67,18 +81,15 @@ class Job: raise JobException() from err finally: if self.cleanup: - self._coresys.jobs.remove_job(job) + self.sys_jobs.remove_job(job) + self._release_exception_limits() return wrapper def _check_conditions(self): """Check conditions.""" - used_conditions = set(self.conditions) - set( - self._coresys.jobs.ignore_conditions - ) - ignored_conditions = set(self.conditions) & set( - self._coresys.jobs.ignore_conditions - ) + used_conditions = set(self.conditions) - set(self.sys_jobs.ignore_conditions) + ignored_conditions = set(self.conditions) & set(self.sys_jobs.ignore_conditions) # Check if somethings is ignored if ignored_conditions: @@ -87,7 +98,7 @@ class Job: ignored_conditions, ) - if JobCondition.HEALTHY in used_conditions and not self._coresys.core.healthy: + if JobCondition.HEALTHY in used_conditions and not self.sys_core.healthy: _LOGGER.warning( "'%s' blocked from execution, system is not healthy", self._method.__qualname__, @@ -96,33 +107,31 @@ class Job: if ( JobCondition.RUNNING in used_conditions - and self._coresys.core.state != CoreState.RUNNING + and self.sys_core.state != CoreState.RUNNING ): _LOGGER.warning( "'%s' blocked from execution, system is not running - %s", self._method.__qualname__, - self._coresys.core.state, + self.sys_core.state, ) return False if ( JobCondition.FREE_SPACE in used_conditions - and self._coresys.host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD + and self.sys_host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD ): _LOGGER.warning( "'%s' blocked from execution, not enough free space (%sGB) left on the device", self._method.__qualname__, - self._coresys.host.info.free_space, - ) - self._coresys.resolution.create_issue( - IssueType.FREE_SPACE, ContextType.SYSTEM + self.sys_host.info.free_space, ) + self.sys_resolution.create_issue(IssueType.FREE_SPACE, ContextType.SYSTEM) return False if ( JobCondition.INTERNET_SYSTEM in self.conditions - and not self._coresys.supervisor.connectivity - and self._coresys.core.state in (CoreState.SETUP, CoreState.RUNNING) + and not self.sys_supervisor.connectivity + and self.sys_core.state in (CoreState.SETUP, CoreState.RUNNING) ): _LOGGER.warning( "'%s' blocked from execution, no supervisor internet connection", @@ -132,9 +141,9 @@ class Job: if ( JobCondition.INTERNET_HOST in self.conditions - and self._coresys.host.network.connectivity is not None - and not self._coresys.host.network.connectivity - and self._coresys.core.state in (CoreState.SETUP, CoreState.RUNNING) + and self.sys_host.network.connectivity is not None + and not self.sys_host.network.connectivity + and self.sys_core.state in (CoreState.SETUP, CoreState.RUNNING) ): _LOGGER.warning( "'%s' blocked from execution, no host internet connection", @@ -143,3 +152,15 @@ class Job: return False return True + + async def _acquire_exection_limit(self) -> None: + """Process exection limits.""" + + if self.limit == JobExecutionLimit.SINGLE_WAIT: + await self._lock.acquire() + + def _release_exception_limits(self) -> None: + """Release possible exception limits.""" + + if self.limit == JobExecutionLimit.SINGLE_WAIT: + self._lock.release() diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index d13a6f7a0..597c25377 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -1,5 +1,6 @@ """Test the condition decorators.""" # pylint: disable=protected-access,import-error +import asyncio from unittest.mock import patch import pytest @@ -7,6 +8,7 @@ import pytest from supervisor.const import CoreState from supervisor.coresys import CoreSys from supervisor.exceptions import HassioError, JobException +from supervisor.jobs.const import JobExecutionLimit from supervisor.jobs.decorator import Job, JobCondition from supervisor.resolution.const import UnhealthyReason @@ -257,3 +259,28 @@ async def test_exception_conditions(coresys: CoreSys): coresys.core.state = CoreState.FREEZE with pytest.raises(HassioError): await test.execute() + + +async def test_exectution_limit_single_wait( + coresys: CoreSys, loop: asyncio.BaseEventLoop +): + """Test the ignore conditions decorator.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + self.coresys = coresys + self.run = asyncio.Lock() + + @Job(limit=JobExecutionLimit.SINGLE_WAIT) + async def execute(self, sleep: float): + """Execute the class method.""" + assert not self.run.locked() + async with self.run: + await asyncio.sleep(sleep) + + test = TestClass(coresys) + + await asyncio.gather(*[test.execute(0.1), test.execute(0.1), test.execute(0.1)])