Set limits on watchdog retries (#3779)

* Set limits on watchdog retries

* Use relative import
Mike Degatano 2022-08-09 11:44:35 -04:00 committed by GitHub
parent a92058e6fc
commit e62324e43f
15 changed files with 371 additions and 129 deletions
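
A note on how the new limits combine: within a single watchdog run, the restart loop now gives up after WATCHDOG_MAX_ATTEMPTS failed tries (waiting WATCHDOG_RETRY_SECONDS between them), and the new THROTTLE_RATE_LIMIT job limit additionally caps how often that watchdog run itself can be re-entered within WATCHDOG_THROTTLE_PERIOD. A minimal self-contained sketch of that two-level limit follows; the constant names and values come from the diff below, while restart() and the module-level call list are simplified stand-ins for the real Docker calls and the Job decorator's per-method bookkeeping.

    import asyncio
    from datetime import datetime, timedelta

    # Values as added in this commit (addons/const.py, plugins/const.py, homeassistant/const.py).
    WATCHDOG_RETRY_SECONDS = 10
    WATCHDOG_MAX_ATTEMPTS = 5
    WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30)
    WATCHDOG_THROTTLE_MAX_CALLS = 10

    _watchdog_calls: list[datetime] = []  # sliding window of watchdog invocations


    async def restart() -> None:
        """Stand-in for the real addon/plugin restart; always fails here."""
        raise RuntimeError("still broken")


    async def restart_after_problem() -> None:
        """One watchdog invocation: rate-limited entry, then bounded retries."""
        # Outer limit: refuse to run more than WATCHDOG_THROTTLE_MAX_CALLS times per period.
        now = datetime.now()
        _watchdog_calls[:] = [c for c in _watchdog_calls if c > now - WATCHDOG_THROTTLE_PERIOD]
        if len(_watchdog_calls) >= WATCHDOG_THROTTLE_MAX_CALLS:
            raise RuntimeError("Rate limit exceeded")
        _watchdog_calls.append(now)

        # Inner limit: stop retrying after WATCHDOG_MAX_ATTEMPTS failures.
        attempts = 0
        while attempts < WATCHDOG_MAX_ATTEMPTS:
            try:
                await restart()
            except RuntimeError:
                attempts += 1
                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
            else:
                return
        print(f"Watchdog giving up after {attempts} attempts")

    # asyncio.run(restart_after_problem())  # would give up after ~50s of failed retries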

View File

@@ -58,6 +58,7 @@ from ..docker.stats import DockerStats
from ..exceptions import (
AddonConfigurationError,
AddonsError,
AddonsJobError,
AddonsNotSupportedError,
ConfigurationFileError,
DockerError,
@@ -65,10 +66,18 @@ from ..exceptions import (
)
from ..hardware.data import Device
from ..homeassistant.const import WSEvent, WSType
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from ..utils import check_port
from ..utils.apparmor import adjust_profile
from ..utils.json import read_json_file, write_json_file
from .const import WATCHDOG_RETRY_SECONDS, AddonBackupMode
from .const import (
WATCHDOG_MAX_ATTEMPTS,
WATCHDOG_RETRY_SECONDS,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
AddonBackupMode,
)
from .model import AddonModel, Data
from .options import AddonOptions
from .utils import remove_data
@@ -104,6 +113,54 @@ class Addon(AddonModel):
self.instance: DockerAddon = DockerAddon(coresys, self)
self._state: AddonState = AddonState.UNKNOWN
@Job(
name=f"addon_{slug}_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=AddonsJobError,
)
async def restart_after_problem(addon: Addon, state: ContainerState):
"""Restart unhealthy or failed addon."""
attempts = 0
while await addon.instance.current_state() == state:
if not addon.in_progress:
_LOGGER.warning(
"Watchdog found addon %s is %s, restarting...",
addon.name,
state.value,
)
try:
if state == ContainerState.FAILED:
# Ensure failed container is removed before attempting reanimation
if attempts == 0:
with suppress(DockerError):
await addon.instance.stop(remove_container=True)
await addon.start()
else:
await addon.restart()
except AddonsError as err:
attempts = attempts + 1
_LOGGER.error(
"Watchdog restart of addon %s failed!", addon.name
)
addon.sys_capture_exception(err)
else:
break
if attempts >= WATCHDOG_MAX_ATTEMPTS:
_LOGGER.critical(
"Watchdog cannot restart addon %s, failed all %s attempts",
addon.name,
attempts,
)
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
self._restart_after_problem = restart_after_problem
def __repr__(self) -> str:
"""Return internal representation."""
return f"<Addon: {self.slug}>"
@@ -904,40 +961,5 @@ class Addon(AddonModel):
if not (event.name == self.instance.name and self.watchdog):
return
if event.state == ContainerState.UNHEALTHY:
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found addon %s is unhealthy, restarting...", self.name
)
try:
await self.restart()
except AddonsError as err:
_LOGGER.error("Watchdog restart of addon %s failed!", self.name)
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
elif event.state == ContainerState.FAILED:
# Ensure failed container is removed before attempting reanimation
with suppress(DockerError):
await self.instance.stop(remove_container=True)
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found addon %s failed, restarting...", self.name
)
try:
await self.start()
except AddonsError as err:
_LOGGER.error(
"Watchdog reanimation of addon %s failed!", self.name
)
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
if event.state in [ContainerState.FAILED, ContainerState.UNHEALTHY]:
await self._restart_after_problem(self, event.state)

View File

@@ -1,4 +1,5 @@
"""Add-on static data."""
from datetime import timedelta
from enum import Enum
@@ -12,3 +13,6 @@ class AddonBackupMode(str, Enum):
ATTR_BACKUP = "backup"
ATTR_CODENOTARY = "codenotary"
WATCHDOG_RETRY_SECONDS = 10
WATCHDOG_MAX_ATTEMPTS = 5
WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30)
WATCHDOG_THROTTLE_MAX_CALLS = 10

View File

@@ -122,6 +122,10 @@ class PluginError(HassioError):
"""Plugin error."""
class PluginJobError(PluginError, JobException):
"""Raise on job error with plugin."""
# HaCli
@@ -133,6 +137,10 @@ class CliUpdateError(CliError):
"""Error on update of a HA cli."""
class CliJobError(CliError, PluginJobError):
"""Raise on job error with cli plugin."""
# Observer
@@ -144,6 +152,10 @@ class ObserverUpdateError(ObserverError):
"""Error on update of a Observer."""
class ObserverJobError(ObserverError, PluginJobError):
"""Raise on job error with observer plugin."""
# Multicast
@@ -155,6 +167,10 @@ class MulticastUpdateError(MulticastError):
"""Error on update of a multicast."""
class MulticastJobError(MulticastError, PluginJobError):
"""Raise on job error with multicast plugin."""
# DNS
@@ -166,6 +182,10 @@ class CoreDNSUpdateError(CoreDNSError):
"""Error on update of a CoreDNS."""
class CoreDNSJobError(CoreDNSError, PluginJobError):
"""Raise on job error with dns plugin."""
# Audio
@@ -177,6 +197,10 @@ class AudioUpdateError(AudioError):
"""Error on update of a Audio."""
class AudioJobError(AudioError, PluginJobError):
"""Raise on job error with audio plugin."""
# Addons

View File

@@ -1,4 +1,5 @@
"""Constants for homeassistant."""
from datetime import timedelta
from enum import Enum
from awesomeversion import AwesomeVersion
@@ -7,6 +8,9 @@ from ..const import CoreState
LANDINGPAGE: AwesomeVersion = AwesomeVersion("landingpage")
WATCHDOG_RETRY_SECONDS = 10
WATCHDOG_MAX_ATTEMPTS = 5
WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30)
WATCHDOG_THROTTLE_MAX_CALLS = 10
CLOSING_STATES = [
CoreState.SHUTDOWN,

View File

@@ -13,6 +13,7 @@ from awesomeversion import AwesomeVersion
from supervisor.const import ATTR_HOMEASSISTANT, BusEvent
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.jobs.const import JobExecutionLimit
from ..coresys import CoreSys, CoreSysAttributes
from ..docker.homeassistant import DockerHomeAssistant
@@ -27,7 +28,13 @@ from ..exceptions import (
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType
from ..utils import convert_to_ascii, process_lock
from .const import LANDINGPAGE, WATCHDOG_RETRY_SECONDS
from .const import (
LANDINGPAGE,
WATCHDOG_MAX_ATTEMPTS,
WATCHDOG_RETRY_SECONDS,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
)
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -444,46 +451,48 @@ class HomeAssistantCore(CoreSysAttributes):
if not (event.name == self.instance.name and self.sys_homeassistant.watchdog):
return
if event.state == ContainerState.UNHEALTHY:
while await self.instance.current_state() == event.state:
# Don't interrupt a task in progress or if rollback is handling it
if not (self.in_progress or self.error_state):
_LOGGER.warning(
"Watchdog found Home Assistant is unhealthy, restarting..."
)
if event.state in [ContainerState.FAILED, ContainerState.UNHEALTHY]:
await self._restart_after_problem(event.state)
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed Home Assistant."""
attempts = 0
while await self.instance.current_state() == state:
# Don't interrupt a task in progress or if rollback is handling it
if not (self.in_progress or self.error_state):
_LOGGER.warning(
"Watchdog found Home Assistant %s, restarting...", state.value
)
if state == ContainerState.FAILED and attempts == 0:
try:
await self.restart()
await self.start()
except HomeAssistantError as err:
_LOGGER.error("Watchdog restart of Home Assistant failed!")
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
elif event.state == ContainerState.FAILED:
rebuild = False
while await self.instance.current_state() == event.state:
# Don't interrupt a task in progress or if rollback is handling it
if not (self.in_progress or self.error_state):
_LOGGER.warning(
"Watchdog found Home Assistant failed, restarting..."
)
if not rebuild:
try:
await self.start()
except HomeAssistantError as err:
self.sys_capture_exception(err)
rebuild = True
else:
break
try:
try:
if state == ContainerState.FAILED:
await self.rebuild()
except HomeAssistantError as err:
_LOGGER.error("Watchdog reanimation of Home Assistant failed!")
self.sys_capture_exception(err)
else:
break
await self.restart()
except HomeAssistantError as err:
attempts = attempts + 1
_LOGGER.error("Watchdog restart of Home Assistant failed!")
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
if attempts >= WATCHDOG_MAX_ATTEMPTS:
_LOGGER.critical(
"Watchdog cannot restart Home Assistant, failed all %s attempts",
attempts,
)
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)

View File

@@ -30,3 +30,4 @@ class JobExecutionLimit(str, Enum):
ONCE = "once"
THROTTLE = "throttle"
THROTTLE_WAIT = "throttle_wait"
THROTTLE_RATE_LIMIT = "throttle_rate_limit"

View File

@@ -28,6 +28,7 @@ class Job(CoreSysAttributes):
on_condition: Optional[JobException] = None,
limit: Optional[JobExecutionLimit] = None,
throttle_period: Optional[timedelta] = None,
throttle_max_calls: Optional[int] = None,
):
"""Initialize the Job class."""
self.name = name
@@ -36,17 +37,30 @@
self.on_condition = on_condition
self.limit = limit
self.throttle_period = throttle_period
self.throttle_max_calls = throttle_max_calls
self._lock: Optional[asyncio.Semaphore] = None
self._method = None
self._last_call = datetime.min
self._rate_limited_calls: Optional[list[datetime]] = None
# Validate Options
if (
self.limit in (JobExecutionLimit.THROTTLE, JobExecutionLimit.THROTTLE_WAIT)
self.limit
in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.THROTTLE_RATE_LIMIT,
)
and self.throttle_period is None
):
raise RuntimeError("Using Job without a Throttle period!")
if self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT:
if self.throttle_max_calls is None:
raise RuntimeError("Using rate limit without throttle max calls!")
self._rate_limited_calls = []
def _post_init(self, args: tuple[Any]) -> None:
"""Runtime init."""
if self.name is None:
@@ -99,10 +113,29 @@
if time_since_last_call < self.throttle_period:
self._release_exception_limits()
return
elif self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT:
# Only reprocess array when necessary (at limit)
if len(self._rate_limited_calls) >= self.throttle_max_calls:
self._rate_limited_calls = [
call
for call in self._rate_limited_calls
if call > datetime.now() - self.throttle_period
]
if len(self._rate_limited_calls) >= self.throttle_max_calls:
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period}",
)
# Execute Job
try:
self._last_call = datetime.now()
if self._rate_limited_calls is not None:
self._rate_limited_calls.append(self._last_call)
return await self._method(*args, **kwargs)
except HassioError as err:
raise err
@@ -207,7 +240,10 @@
return
if self.limit == JobExecutionLimit.ONCE and self._lock.locked():
raise self.on_condition("Another job is running")
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition("Another job is running")
await self._lock.acquire()
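
The core of the new THROTTLE_RATE_LIMIT handling above is a lazily pruned sliding window over call timestamps: the list is only filtered once it has reached throttle_max_calls, and the call is rejected if the window is still full afterwards. A standalone sketch of just that check follows; the function and variable names here are illustrative, not part of the Supervisor API.

    from datetime import datetime, timedelta


    def allow_call(calls: list[datetime], max_calls: int, period: timedelta) -> bool:
        """Return True if another call fits in the window, recording it if so."""
        if len(calls) >= max_calls:
            # Prune lazily: only rescan the list once it has hit the limit.
            calls[:] = [call for call in calls if call > datetime.now() - period]
        if len(calls) >= max_calls:
            return False  # the decorator raises on_condition (or JobException) here
        calls.append(datetime.now())
        return True


    # Example: at most 2 calls per hour, matching the new test added below.
    window: list[datetime] = []
    assert allow_call(window, 2, timedelta(hours=1))
    assert allow_call(window, 2, timedelta(hours=1))
    assert not allow_call(window, 2, timedelta(hours=1))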

View File

@@ -12,19 +12,28 @@ from typing import Optional
from awesomeversion import AwesomeVersion
import jinja2
from supervisor.docker.const import ContainerState
from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.decorator import Job
from ..const import LogLevel
from ..coresys import CoreSys
from ..docker.audio import DockerAudio
from ..docker.stats import DockerStats
from ..exceptions import (
AudioError,
AudioJobError,
AudioUpdateError,
ConfigurationFileError,
DockerError,
)
from ..utils.json import write_json_file
from .base import PluginBase
from .const import FILE_HASSIO_AUDIO
from .const import (
FILE_HASSIO_AUDIO,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
)
from .validate import SCHEMA_AUDIO_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -199,3 +208,13 @@ class PluginAudio(PluginBase):
raise AudioError(
f"Can't update pulse audio config: {err}", _LOGGER.error
) from err
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=AudioJobError,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""
return await super()._restart_after_problem(state)

View File

@@ -15,7 +15,7 @@ from ..coresys import CoreSysAttributes
from ..docker.interface import DockerInterface
from ..exceptions import DockerError, PluginError
from ..utils.common import FileConfiguration
from .const import WATCHDOG_RETRY_SECONDS
from .const import WATCHDOG_MAX_ATTEMPTS, WATCHDOG_RETRY_SECONDS
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -105,49 +105,44 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
if not (event.name == self.instance.name):
return
if event.state == ContainerState.UNHEALTHY:
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found %s plugin is unhealthy, restarting...",
self.slug,
)
try:
if event.state in [
ContainerState.FAILED,
ContainerState.STOPPED,
ContainerState.UNHEALTHY,
]:
await self._restart_after_problem(event.state)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""
attempts = 0
while await self.instance.current_state() == state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found %s plugin %s, restarting...",
self.slug,
state.value,
)
try:
if state == ContainerState.STOPPED and attempts == 0:
await self.start()
else:
await self.rebuild()
except PluginError as err:
_LOGGER.error(
"Watchdog restart of %s plugin failed!", self.slug
)
self.sys_capture_exception(err)
else:
break
except PluginError as err:
attempts = attempts + 1
_LOGGER.error("Watchdog restart of %s plugin failed!", self.slug)
self.sys_capture_exception(err)
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
if attempts >= WATCHDOG_MAX_ATTEMPTS:
_LOGGER.critical(
"Watchdog cannot restart %s plugin, failed all %s attempts",
self.slug,
attempts,
)
break
elif event.state in [ContainerState.FAILED, ContainerState.STOPPED]:
rebuild = event.state == ContainerState.FAILED
while await self.instance.current_state() == event.state:
if not self.in_progress:
_LOGGER.warning(
"Watchdog found %s plugin %s, restarting...",
self.slug,
event.state.value,
)
try:
if rebuild:
await self.rebuild()
else:
await self.start()
except PluginError as err:
_LOGGER.error(
"Watchdog reanimation of %s plugin failed!", self.slug
)
self.sys_capture_exception(err)
rebuild = True
else:
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
async def rebuild(self) -> None:
"""Rebuild system plugin."""

View File

@@ -13,10 +13,17 @@ from awesomeversion import AwesomeVersion
from ..const import ATTR_ACCESS_TOKEN
from ..coresys import CoreSys
from ..docker.cli import DockerCli
from ..docker.const import ContainerState
from ..docker.stats import DockerStats
from ..exceptions import CliError, CliUpdateError, DockerError
from ..exceptions import CliError, CliJobError, CliUpdateError, DockerError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .base import PluginBase
from .const import FILE_HASSIO_CLI
from .const import (
FILE_HASSIO_CLI,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
)
from .validate import SCHEMA_CLI_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -136,3 +143,13 @@ class PluginCli(PluginBase):
except DockerError as err:
_LOGGER.error("Repair of HA cli failed")
self.sys_capture_exception(err)
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=CliJobError,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""
return await super()._restart_after_problem(state)

View File

@@ -1,4 +1,5 @@
"""Const for plugins."""
from datetime import timedelta
from pathlib import Path
from ..const import SUPERVISOR_DATA
@@ -11,3 +12,6 @@ FILE_HASSIO_MULTICAST = Path(SUPERVISOR_DATA, "multicast.json")
ATTR_FALLBACK = "fallback"
WATCHDOG_RETRY_SECONDS = 10
WATCHDOG_MAX_ATTEMPTS = 5
WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30)
WATCHDOG_THROTTLE_MAX_CALLS = 10

View File

@@ -17,6 +17,8 @@ import voluptuous as vol
from supervisor.dbus.const import MulticastProtocolEnabled
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.decorator import Job
from ..const import ATTR_SERVERS, DNS_SUFFIX, LogLevel
from ..coresys import CoreSys
@@ -25,6 +27,7 @@ from ..docker.stats import DockerStats
from ..exceptions import (
ConfigurationFileError,
CoreDNSError,
CoreDNSJobError,
CoreDNSUpdateError,
DockerError,
)
@@ -32,7 +35,12 @@ from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.json import write_json_file
from ..validate import dns_url
from .base import PluginBase
from .const import ATTR_FALLBACK, FILE_HASSIO_DNS
from .const import (
ATTR_FALLBACK,
FILE_HASSIO_DNS,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
)
from .validate import SCHEMA_DNS_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -250,6 +258,16 @@ class PluginDns(PluginBase):
return await super().watchdog_container(event)
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=CoreDNSJobError,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""
return await super()._restart_after_problem(state)
async def loop_detection(self) -> None:
"""Check if there was a loop found."""
log = await self.instance.logs()

View File

@@ -9,12 +9,25 @@ from typing import Optional
from awesomeversion import AwesomeVersion
from supervisor.docker.const import ContainerState
from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.decorator import Job
from ..coresys import CoreSys
from ..docker.multicast import DockerMulticast
from ..docker.stats import DockerStats
from ..exceptions import DockerError, MulticastError, MulticastUpdateError
from ..exceptions import (
DockerError,
MulticastError,
MulticastJobError,
MulticastUpdateError,
)
from .base import PluginBase
from .const import FILE_HASSIO_MULTICAST
from .const import (
FILE_HASSIO_MULTICAST,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
)
from .validate import SCHEMA_MULTICAST_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -127,3 +140,13 @@ class PluginMulticast(PluginBase):
except DockerError as err:
_LOGGER.error("Repair of Multicast failed")
self.sys_capture_exception(err)
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=MulticastJobError,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""
return await super()._restart_after_problem(state)

View File

@@ -13,11 +13,23 @@ from awesomeversion import AwesomeVersion
from ..const import ATTR_ACCESS_TOKEN
from ..coresys import CoreSys
from ..docker.const import ContainerState
from ..docker.observer import DockerObserver
from ..docker.stats import DockerStats
from ..exceptions import DockerError, ObserverError, ObserverUpdateError
from ..exceptions import (
DockerError,
ObserverError,
ObserverJobError,
ObserverUpdateError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .base import PluginBase
from .const import FILE_HASSIO_OBSERVER
from .const import (
FILE_HASSIO_OBSERVER,
WATCHDOG_THROTTLE_MAX_CALLS,
WATCHDOG_THROTTLE_PERIOD,
)
from .validate import SCHEMA_OBSERVER_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -137,3 +149,13 @@ class PluginObserver(PluginBase):
except DockerError as err:
_LOGGER.error("Repair of HA observer failed")
self.sys_capture_exception(err)
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=ObserverJobError,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""
return await super()._restart_after_problem(state)

View File

@@ -2,16 +2,19 @@
# pylint: disable=protected-access,import-error
import asyncio
from datetime import timedelta
from typing import Optional
from unittest.mock import PropertyMock, patch
import pytest
import time_machine
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import HassioError, JobException
from supervisor.exceptions import HassioError, JobException, PluginJobError
from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.decorator import Job, JobCondition
from supervisor.resolution.const import UnhealthyReason
from supervisor.utils.dt import utcnow
async def test_healthy(coresys: CoreSys):
@@ -220,7 +223,7 @@ async def test_ignore_conditions(coresys: CoreSys):
async def test_exception_conditions(coresys: CoreSys):
"""Test the ignore conditions decorator."""
"""Test the on condition decorator."""
class TestClass:
"""Test class."""
@@ -247,7 +250,7 @@ async def test_exception_conditions(coresys: CoreSys):
async def test_exectution_limit_single_wait(
coresys: CoreSys, loop: asyncio.BaseEventLoop
):
"""Test the ignore conditions decorator."""
"""Test the single wait job execution limit."""
class TestClass:
"""Test class."""
@@ -269,10 +272,10 @@ async def test_exectution_limit_single_wait(
await asyncio.gather(*[test.execute(0.1), test.execute(0.1), test.execute(0.1)])
async def test_exectution_limit_throttle_wait(
async def test_execution_limit_throttle_wait(
coresys: CoreSys, loop: asyncio.BaseEventLoop
):
"""Test the ignore conditions decorator."""
"""Test the throttle wait job execution limit."""
class TestClass:
"""Test class."""
@@ -300,6 +303,47 @@ async def test_exectution_limit_throttle_wait(
assert test.call == 1
@pytest.mark.parametrize("error", [None, PluginJobError])
async def test_execution_limit_throttle_rate_limit(
coresys: CoreSys, loop: asyncio.BaseEventLoop, error: Optional[JobException]
):
"""Test the throttle wait job execution limit."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
self.call = 0
@Job(
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=timedelta(hours=1),
throttle_max_calls=2,
on_condition=error,
)
async def execute(self):
"""Execute the class method."""
self.call += 1
test = TestClass(coresys)
await asyncio.gather(*[test.execute(), test.execute()])
assert test.call == 2
with pytest.raises(JobException if error is None else error):
await test.execute()
assert test.call == 2
with time_machine.travel(utcnow() + timedelta(hours=1)):
await test.execute()
assert test.call == 3
async def test_exectution_limit_throttle(coresys: CoreSys, loop: asyncio.BaseEventLoop):
"""Test the ignore conditions decorator."""