diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py
index 41f46513a..53259924a 100644
--- a/supervisor/addons/addon.py
+++ b/supervisor/addons/addon.py
@@ -58,6 +58,7 @@ from ..docker.stats import DockerStats
 from ..exceptions import (
     AddonConfigurationError,
     AddonsError,
+    AddonsJobError,
     AddonsNotSupportedError,
     ConfigurationFileError,
     DockerError,
@@ -65,10 +66,18 @@ from ..exceptions import (
 )
 from ..hardware.data import Device
 from ..homeassistant.const import WSEvent, WSType
+from ..jobs.const import JobExecutionLimit
+from ..jobs.decorator import Job
 from ..utils import check_port
 from ..utils.apparmor import adjust_profile
 from ..utils.json import read_json_file, write_json_file
-from .const import WATCHDOG_RETRY_SECONDS, AddonBackupMode
+from .const import (
+    WATCHDOG_MAX_ATTEMPTS,
+    WATCHDOG_RETRY_SECONDS,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+    AddonBackupMode,
+)
 from .model import AddonModel, Data
 from .options import AddonOptions
 from .utils import remove_data
@@ -104,6 +113,54 @@ class Addon(AddonModel):
         self.instance: DockerAddon = DockerAddon(coresys, self)
         self._state: AddonState = AddonState.UNKNOWN
 
+        @Job(
+            name=f"addon_{slug}_restart_after_problem",
+            limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+            throttle_period=WATCHDOG_THROTTLE_PERIOD,
+            throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+            on_condition=AddonsJobError,
+        )
+        async def restart_after_problem(addon: Addon, state: ContainerState):
+            """Restart unhealthy or failed addon."""
+            attempts = 0
+            while await addon.instance.current_state() == state:
+                if not addon.in_progress:
+                    _LOGGER.warning(
+                        "Watchdog found addon %s is %s, restarting...",
+                        addon.name,
+                        state.value,
+                    )
+                    try:
+                        if state == ContainerState.FAILED:
+                            # Ensure failed container is removed before attempting reanimation
+                            if attempts == 0:
+                                with suppress(DockerError):
+                                    await addon.instance.stop(remove_container=True)
+
+                            await addon.start()
+                        else:
+                            await addon.restart()
+                    except AddonsError as err:
+                        attempts = attempts + 1
+                        _LOGGER.error(
+                            "Watchdog restart of addon %s failed!", addon.name
+                        )
+                        addon.sys_capture_exception(err)
+                    else:
+                        break
+
+                if attempts >= WATCHDOG_MAX_ATTEMPTS:
+                    _LOGGER.critical(
+                        "Watchdog cannot restart addon %s, failed all %s attempts",
+                        addon.name,
+                        attempts,
+                    )
+                    break
+
+                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
+
+        self._restart_after_problem = restart_after_problem
+
     def __repr__(self) -> str:
         """Return internal representation."""
         return f"<Addon: {self.slug}>"
@@ -904,40 +961,5 @@ class Addon(AddonModel):
         if not (event.name == self.instance.name and self.watchdog):
             return
 
-        if event.state == ContainerState.UNHEALTHY:
-            while await self.instance.current_state() == event.state:
-                if not self.in_progress:
-                    _LOGGER.warning(
-                        "Watchdog found addon %s is unhealthy, restarting...", self.name
-                    )
-                    try:
-                        await self.restart()
-                    except AddonsError as err:
-                        _LOGGER.error("Watchdog restart of addon %s failed!", self.name)
-                        self.sys_capture_exception(err)
-                    else:
-                        break
-
-                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
-
-        elif event.state == ContainerState.FAILED:
-            # Ensure failed container is removed before attempting reanimation
-            with suppress(DockerError):
-                await self.instance.stop(remove_container=True)
-
-            while await self.instance.current_state() == event.state:
-                if not self.in_progress:
-                    _LOGGER.warning(
-                        "Watchdog found addon %s failed, restarting...", self.name
-                    )
-                    try:
-                        await self.start()
-                    except AddonsError as err:
-                        _LOGGER.error(
-                            "Watchdog reanimation of addon %s failed!", self.name
-                        )
-                        self.sys_capture_exception(err)
-                    else:
-                        break
-
-                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
+        if event.state in [ContainerState.FAILED, ContainerState.UNHEALTHY]:
+            await self._restart_after_problem(self, event.state)
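The add-on watchdog now funnels both FAILED and UNHEALTHY events into a single rate-limited handler. The handler is deliberately built as a closure inside `__init__` rather than a plain method: the `Job` decorator keeps its throttle state on the decorator object created at decoration time, so decorating per instance gives every add-on its own call history (and its own job `name`). A minimal, self-contained sketch of that pitfall and the closure workaround, using a toy `throttled` decorator in place of the real `Job`:

```python
import asyncio
import time


def throttled(max_calls: int):
    """Toy stand-in for Job: `calls` lives in this closure, created once per decoration."""
    calls: list[float] = []

    def wrap(func):
        async def wrapper(*args, **kwargs):
            if len(calls) >= max_calls:
                raise RuntimeError("rate limit hit")
            calls.append(time.monotonic())
            return await func(*args, **kwargs)

        return wrapper

    return wrap


class Addon:
    def __init__(self, slug: str) -> None:
        self.slug = slug

        # Decorating inside __init__ creates a fresh `calls` list per add-on.
        # Decorating a method at class level would create it once, sharing a
        # single rate-limit window across every add-on on the system.
        @throttled(max_calls=2)
        async def restart_after_problem(addon: "Addon") -> str:
            return f"restarted {addon.slug}"

        self._restart_after_problem = restart_after_problem


async def main() -> None:
    a, b = Addon("a"), Addon("b")
    print(await a._restart_after_problem(a))  # counts against a's window only
    print(await b._restart_after_problem(b))


asyncio.run(main())
```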
%s failed!", self.name - ) - self.sys_capture_exception(err) - else: - break - - await asyncio.sleep(WATCHDOG_RETRY_SECONDS) + if event.state in [ContainerState.FAILED, ContainerState.UNHEALTHY]: + await self._restart_after_problem(self, event.state) diff --git a/supervisor/addons/const.py b/supervisor/addons/const.py index 65ba146e9..9819479d7 100644 --- a/supervisor/addons/const.py +++ b/supervisor/addons/const.py @@ -1,4 +1,5 @@ """Add-on static data.""" +from datetime import timedelta from enum import Enum @@ -12,3 +13,6 @@ class AddonBackupMode(str, Enum): ATTR_BACKUP = "backup" ATTR_CODENOTARY = "codenotary" WATCHDOG_RETRY_SECONDS = 10 +WATCHDOG_MAX_ATTEMPTS = 5 +WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30) +WATCHDOG_THROTTLE_MAX_CALLS = 10 diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 5d3730026..8b516d680 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -122,6 +122,10 @@ class PluginError(HassioError): """Plugin error.""" +class PluginJobError(PluginError, JobException): + """Raise on job error with plugin.""" + + # HaCli @@ -133,6 +137,10 @@ class CliUpdateError(CliError): """Error on update of a HA cli.""" +class CliJobError(CliError, PluginJobError): + """Raise on job error with cli plugin.""" + + # Observer @@ -144,6 +152,10 @@ class ObserverUpdateError(ObserverError): """Error on update of a Observer.""" +class ObserverJobError(ObserverError, PluginJobError): + """Raise on job error with observer plugin.""" + + # Multicast @@ -155,6 +167,10 @@ class MulticastUpdateError(MulticastError): """Error on update of a multicast.""" +class MulticastJobError(MulticastError, PluginJobError): + """Raise on job error with multicast plugin.""" + + # DNS @@ -166,6 +182,10 @@ class CoreDNSUpdateError(CoreDNSError): """Error on update of a CoreDNS.""" +class CoreDNSJobError(CoreDNSError, PluginJobError): + """Raise on job error with dns plugin.""" + + # Audio @@ -177,6 +197,10 @@ class AudioUpdateError(AudioError): """Error on update of a Audio.""" +class AudioJobError(AudioError, PluginJobError): + """Raise on job error with audio plugin.""" + + # Addons diff --git a/supervisor/homeassistant/const.py b/supervisor/homeassistant/const.py index 00068a4b4..5e6f33079 100644 --- a/supervisor/homeassistant/const.py +++ b/supervisor/homeassistant/const.py @@ -1,4 +1,5 @@ """Constants for homeassistant.""" +from datetime import timedelta from enum import Enum from awesomeversion import AwesomeVersion @@ -7,6 +8,9 @@ from ..const import CoreState LANDINGPAGE: AwesomeVersion = AwesomeVersion("landingpage") WATCHDOG_RETRY_SECONDS = 10 +WATCHDOG_MAX_ATTEMPTS = 5 +WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30) +WATCHDOG_THROTTLE_MAX_CALLS = 10 CLOSING_STATES = [ CoreState.SHUTDOWN, diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index 46d94839c..38eaef5b7 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -13,6 +13,7 @@ from awesomeversion import AwesomeVersion from supervisor.const import ATTR_HOMEASSISTANT, BusEvent from supervisor.docker.const import ContainerState from supervisor.docker.monitor import DockerContainerStateEvent +from supervisor.jobs.const import JobExecutionLimit from ..coresys import CoreSys, CoreSysAttributes from ..docker.homeassistant import DockerHomeAssistant @@ -27,7 +28,13 @@ from ..exceptions import ( from ..jobs.decorator import Job, JobCondition from ..resolution.const import ContextType, IssueType from ..utils import convert_to_ascii, process_lock -from 
diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py
index 46d94839c..38eaef5b7 100644
--- a/supervisor/homeassistant/core.py
+++ b/supervisor/homeassistant/core.py
@@ -13,6 +13,7 @@ from awesomeversion import AwesomeVersion
 from supervisor.const import ATTR_HOMEASSISTANT, BusEvent
 from supervisor.docker.const import ContainerState
 from supervisor.docker.monitor import DockerContainerStateEvent
+from supervisor.jobs.const import JobExecutionLimit
 
 from ..coresys import CoreSys, CoreSysAttributes
 from ..docker.homeassistant import DockerHomeAssistant
@@ -27,7 +28,13 @@ from ..exceptions import (
 from ..jobs.decorator import Job, JobCondition
 from ..resolution.const import ContextType, IssueType
 from ..utils import convert_to_ascii, process_lock
-from .const import LANDINGPAGE, WATCHDOG_RETRY_SECONDS
+from .const import (
+    LANDINGPAGE,
+    WATCHDOG_MAX_ATTEMPTS,
+    WATCHDOG_RETRY_SECONDS,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+)
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
 
@@ -444,46 +451,48 @@ class HomeAssistantCore(CoreSysAttributes):
         if not (event.name == self.instance.name and self.sys_homeassistant.watchdog):
             return
 
-        if event.state == ContainerState.UNHEALTHY:
-            while await self.instance.current_state() == event.state:
-                # Don't interrupt a task in progress or if rollback is handling it
-                if not (self.in_progress or self.error_state):
-                    _LOGGER.warning(
-                        "Watchdog found Home Assistant is unhealthy, restarting..."
-                    )
+        if event.state in [ContainerState.FAILED, ContainerState.UNHEALTHY]:
+            await self._restart_after_problem(event.state)
+
+    @Job(
+        limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+        throttle_period=WATCHDOG_THROTTLE_PERIOD,
+        throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+    )
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed Home Assistant."""
+        attempts = 0
+        while await self.instance.current_state() == state:
+            # Don't interrupt a task in progress or if rollback is handling it
+            if not (self.in_progress or self.error_state):
+                _LOGGER.warning(
+                    "Watchdog found Home Assistant %s, restarting...", state.value
+                )
+                if state == ContainerState.FAILED and attempts == 0:
                     try:
-                        await self.restart()
+                        await self.start()
                     except HomeAssistantError as err:
-                        _LOGGER.error("Watchdog restart of Home Assistant failed!")
                         self.sys_capture_exception(err)
                     else:
                         break
 
-                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
-
-        elif event.state == ContainerState.FAILED:
-            rebuild = False
-            while await self.instance.current_state() == event.state:
-                # Don't interrupt a task in progress or if rollback is handling it
-                if not (self.in_progress or self.error_state):
-                    _LOGGER.warning(
-                        "Watchdog found Home Assistant failed, restarting..."
-                    )
-                    if not rebuild:
-                        try:
-                            await self.start()
-                        except HomeAssistantError as err:
-                            self.sys_capture_exception(err)
-                            rebuild = True
-                        else:
-                            break
-
-                    try:
+                try:
+                    if state == ContainerState.FAILED:
                         await self.rebuild()
-                    except HomeAssistantError as err:
-                        _LOGGER.error("Watchdog reanimation of Home Assistant failed!")
-                        self.sys_capture_exception(err)
                     else:
-                        break
+                        await self.restart()
+                except HomeAssistantError as err:
+                    attempts = attempts + 1
+                    _LOGGER.error("Watchdog restart of Home Assistant failed!")
+                    self.sys_capture_exception(err)
+                else:
+                    break
 
-                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
+            if attempts >= WATCHDOG_MAX_ATTEMPTS:
+                _LOGGER.critical(
+                    "Watchdog cannot restart Home Assistant, failed all %s attempts",
+                    attempts,
+                )
+                break
+
+            await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
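The Home Assistant rewrite folds the old UNHEALTHY and FAILED branches into one attempt-counting loop: an unhealthy container is restarted in place, while a failed one gets a plain `start()` on the first pass and falls back to `rebuild()` from then on. Expressed as a pure decision function (illustrative names, not supervisor API):

```python
from enum import Enum


class ContainerState(str, Enum):  # trimmed copy of the states handled here
    FAILED = "failed"
    UNHEALTHY = "unhealthy"


def recovery_actions(state: ContainerState, attempts: int) -> list[str]:
    """Actions one loop iteration tries, in order, until one succeeds."""
    if state is ContainerState.FAILED:
        # The first iteration tries a plain start and only falls through to
        # a rebuild if the start raises; later iterations rebuild directly.
        return ["start", "rebuild"] if attempts == 0 else ["rebuild"]
    return ["restart"]


assert recovery_actions(ContainerState.FAILED, 0) == ["start", "rebuild"]
assert recovery_actions(ContainerState.FAILED, 2) == ["rebuild"]
assert recovery_actions(ContainerState.UNHEALTHY, 0) == ["restart"]
```

Note that this method passes no `on_condition`, so exceeding the rate limit raises the generic `JobException` rather than a Home Assistant-specific error.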
diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py
index bb67c6575..f8cde0c97 100644
--- a/supervisor/jobs/const.py
+++ b/supervisor/jobs/const.py
@@ -30,3 +30,4 @@ class JobExecutionLimit(str, Enum):
     ONCE = "once"
     THROTTLE = "throttle"
     THROTTLE_WAIT = "throttle_wait"
+    THROTTLE_RATE_LIMIT = "throttle_rate_limit"
diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py
index 9d3b49bbe..9e020fda9 100644
--- a/supervisor/jobs/decorator.py
+++ b/supervisor/jobs/decorator.py
@@ -28,6 +28,7 @@ class Job(CoreSysAttributes):
         on_condition: Optional[JobException] = None,
         limit: Optional[JobExecutionLimit] = None,
         throttle_period: Optional[timedelta] = None,
+        throttle_max_calls: Optional[int] = None,
     ):
         """Initialize the Job class."""
         self.name = name
@@ -36,17 +37,30 @@ class Job(CoreSysAttributes):
         self.on_condition = on_condition
         self.limit = limit
         self.throttle_period = throttle_period
+        self.throttle_max_calls = throttle_max_calls
         self._lock: Optional[asyncio.Semaphore] = None
         self._method = None
         self._last_call = datetime.min
+        self._rate_limited_calls: Optional[list[datetime]] = None
 
         # Validate Options
         if (
-            self.limit in (JobExecutionLimit.THROTTLE, JobExecutionLimit.THROTTLE_WAIT)
+            self.limit
+            in (
+                JobExecutionLimit.THROTTLE,
+                JobExecutionLimit.THROTTLE_WAIT,
+                JobExecutionLimit.THROTTLE_RATE_LIMIT,
+            )
             and self.throttle_period is None
         ):
             raise RuntimeError("Using Job without a Throttle period!")
 
+        if self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT:
+            if self.throttle_max_calls is None:
+                raise RuntimeError("Using rate limit without throttle max calls!")
+
+            self._rate_limited_calls = []
+
     def _post_init(self, args: tuple[Any]) -> None:
         """Runtime init."""
         if self.name is None:
@@ -99,10 +113,29 @@ class Job(CoreSysAttributes):
                 if time_since_last_call < self.throttle_period:
                     self._release_exception_limits()
                     return
+            elif self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT:
+                # Only reprocess array when necessary (at limit)
+                if len(self._rate_limited_calls) >= self.throttle_max_calls:
+                    self._rate_limited_calls = [
+                        call
+                        for call in self._rate_limited_calls
+                        if call > datetime.now() - self.throttle_period
+                    ]
+
+                if len(self._rate_limited_calls) >= self.throttle_max_calls:
+                    on_condition = (
+                        JobException if self.on_condition is None else self.on_condition
+                    )
+                    raise on_condition(
+                        f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period}",
+                    )
 
             # Execute Job
             try:
                 self._last_call = datetime.now()
+                if self._rate_limited_calls is not None:
+                    self._rate_limited_calls.append(self._last_call)
+
                 return await self._method(*args, **kwargs)
             except HassioError as err:
                 raise err
@@ -207,7 +240,10 @@ class Job(CoreSysAttributes):
             return
 
         if self.limit == JobExecutionLimit.ONCE and self._lock.locked():
-            raise self.on_condition("Another job is running")
+            on_condition = (
+                JobException if self.on_condition is None else self.on_condition
+            )
+            raise on_condition("Another job is running")
 
         await self._lock.acquire()
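The new `THROTTLE_RATE_LIMIT` strategy is a sliding-window limiter: every successful dispatch appends a timestamp, and the list is pruned lazily, only once it has reached `throttle_max_calls`, which keeps the common case to a cheap length check. A minimal, self-contained sketch of the same strategy outside the `Job` machinery:

```python
import asyncio
from datetime import datetime, timedelta


class RateLimitExceeded(Exception):
    """Raised in place of Job's configurable on_condition exception."""


def throttle_rate_limit(max_calls: int, period: timedelta):
    """Sliding-window limiter mirroring JobExecutionLimit.THROTTLE_RATE_LIMIT."""

    def decorator(func):
        calls: list[datetime] = []

        async def wrapper(*args, **kwargs):
            # Prune lazily: only rescan once the list has reached the cap,
            # mirroring the "only reprocess array when necessary" comment.
            if len(calls) >= max_calls:
                cutoff = datetime.now() - period
                calls[:] = [call for call in calls if call > cutoff]
            if len(calls) >= max_calls:
                raise RateLimitExceeded(
                    f"Rate limit exceeded, more than {max_calls} calls in {period}"
                )
            calls.append(datetime.now())
            return await func(*args, **kwargs)

        return wrapper

    return decorator


@throttle_rate_limit(max_calls=2, period=timedelta(hours=1))
async def job() -> None:
    print("ran")


async def main() -> None:
    await job()
    await job()
    try:
        await job()  # third call inside the window raises
    except RateLimitExceeded as err:
        print(err)


asyncio.run(main())
```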
diff --git a/supervisor/plugins/audio.py b/supervisor/plugins/audio.py
index fb0338af7..1b1502c11 100644
--- a/supervisor/plugins/audio.py
+++ b/supervisor/plugins/audio.py
@@ -12,19 +12,28 @@ from typing import Optional
 from awesomeversion import AwesomeVersion
 import jinja2
 
+from supervisor.docker.const import ContainerState
+from supervisor.jobs.const import JobExecutionLimit
+from supervisor.jobs.decorator import Job
+
 from ..const import LogLevel
 from ..coresys import CoreSys
 from ..docker.audio import DockerAudio
 from ..docker.stats import DockerStats
 from ..exceptions import (
     AudioError,
+    AudioJobError,
     AudioUpdateError,
     ConfigurationFileError,
     DockerError,
 )
 from ..utils.json import write_json_file
 from .base import PluginBase
-from .const import FILE_HASSIO_AUDIO
+from .const import (
+    FILE_HASSIO_AUDIO,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+)
 from .validate import SCHEMA_AUDIO_CONFIG
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -199,3 +208,13 @@ class PluginAudio(PluginBase):
             raise AudioError(
                 f"Can't update pulse audio config: {err}", _LOGGER.error
             ) from err
+
+    @Job(
+        limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+        throttle_period=WATCHDOG_THROTTLE_PERIOD,
+        throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+        on_condition=AudioJobError,
+    )
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed plugin."""
+        return await super()._restart_after_problem(state)
diff --git a/supervisor/plugins/base.py b/supervisor/plugins/base.py
index 31f7e46c2..f279069c3 100644
--- a/supervisor/plugins/base.py
+++ b/supervisor/plugins/base.py
@@ -15,7 +15,7 @@ from ..coresys import CoreSysAttributes
 from ..docker.interface import DockerInterface
 from ..exceptions import DockerError, PluginError
 from ..utils.common import FileConfiguration
-from .const import WATCHDOG_RETRY_SECONDS
+from .const import WATCHDOG_MAX_ATTEMPTS, WATCHDOG_RETRY_SECONDS
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
 
@@ -105,49 +105,44 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
         if not (event.name == self.instance.name):
             return
 
-        if event.state == ContainerState.UNHEALTHY:
-            while await self.instance.current_state() == event.state:
-                if not self.in_progress:
-                    _LOGGER.warning(
-                        "Watchdog found %s plugin is unhealthy, restarting...",
-                        self.slug,
-                    )
-                    try:
+        if event.state in [
+            ContainerState.FAILED,
+            ContainerState.STOPPED,
+            ContainerState.UNHEALTHY,
+        ]:
+            await self._restart_after_problem(event.state)
+
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed plugin."""
+        attempts = 0
+        while await self.instance.current_state() == state:
+            if not self.in_progress:
+                _LOGGER.warning(
+                    "Watchdog found %s plugin %s, restarting...",
+                    self.slug,
+                    state.value,
+                )
+                try:
+                    if state == ContainerState.STOPPED and attempts == 0:
+                        await self.start()
+                    else:
                         await self.rebuild()
-                    except PluginError as err:
-                        _LOGGER.error(
-                            "Watchdog restart of %s plugin failed!", self.slug
-                        )
-                        self.sys_capture_exception(err)
-                    else:
-                        break
+                except PluginError as err:
+                    attempts = attempts + 1
+                    _LOGGER.error("Watchdog restart of %s plugin failed!", self.slug)
+                    self.sys_capture_exception(err)
+                else:
+                    break
 
-                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
+            if attempts >= WATCHDOG_MAX_ATTEMPTS:
+                _LOGGER.critical(
+                    "Watchdog cannot restart %s plugin, failed all %s attempts",
+                    self.slug,
+                    attempts,
+                )
+                break
 
-        elif event.state in [ContainerState.FAILED, ContainerState.STOPPED]:
-            rebuild = event.state == ContainerState.FAILED
-            while await self.instance.current_state() == event.state:
-                if not self.in_progress:
-                    _LOGGER.warning(
-                        "Watchdog found %s plugin %s, restarting...",
-                        self.slug,
-                        event.state.value,
-                    )
-                    try:
-                        if rebuild:
-                            await self.rebuild()
-                        else:
-                            await self.start()
-                    except PluginError as err:
-                        _LOGGER.error(
-                            "Watchdog reanimation of %s plugin failed!", self.slug
-                        )
-                        self.sys_capture_exception(err)
-                        rebuild = True
-                    else:
-                        break
-
-                await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
+            await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
 
     async def rebuild(self) -> None:
         """Rebuild system plugin."""
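`PluginBase._restart_after_problem` itself carries no `@Job` decorator; each concrete plugin (audio above, and cli/dns/multicast/observer below) overrides it purely to re-decorate the inherited body. Since the decorator object holding the call history is created per decoration, every plugin class gets an independent throttle budget and its own `on_condition` error. A toy model of that pattern with a hypothetical `counted` decorator:

```python
import asyncio


def counted(func):
    async def wrapper(self, *args, **kwargs):
        wrapper.calls += 1  # state lives on this wrapper, one per subclass
        return await func(self, *args, **kwargs)

    wrapper.calls = 0
    return wrapper


class PluginBase:
    async def _restart_after_problem(self, state: str) -> str:
        return f"recovering from {state}"


class PluginAudio(PluginBase):
    @counted
    async def _restart_after_problem(self, state: str) -> str:
        return await super()._restart_after_problem(state)


class PluginCli(PluginBase):
    @counted
    async def _restart_after_problem(self, state: str) -> str:
        return await super()._restart_after_problem(state)


async def main() -> None:
    audio, cli = PluginAudio(), PluginCli()
    await audio._restart_after_problem("failed")
    assert PluginAudio._restart_after_problem.calls == 1
    assert PluginCli._restart_after_problem.calls == 0  # budgets are separate


asyncio.run(main())
```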
diff --git a/supervisor/plugins/cli.py b/supervisor/plugins/cli.py
index 12a9818c0..a3dabcc41 100644
--- a/supervisor/plugins/cli.py
+++ b/supervisor/plugins/cli.py
@@ -13,10 +13,17 @@ from awesomeversion import AwesomeVersion
 from ..const import ATTR_ACCESS_TOKEN
 from ..coresys import CoreSys
 from ..docker.cli import DockerCli
+from ..docker.const import ContainerState
 from ..docker.stats import DockerStats
-from ..exceptions import CliError, CliUpdateError, DockerError
+from ..exceptions import CliError, CliJobError, CliUpdateError, DockerError
+from ..jobs.const import JobExecutionLimit
+from ..jobs.decorator import Job
 from .base import PluginBase
-from .const import FILE_HASSIO_CLI
+from .const import (
+    FILE_HASSIO_CLI,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+)
 from .validate import SCHEMA_CLI_CONFIG
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -136,3 +143,13 @@ class PluginCli(PluginBase):
         except DockerError as err:
             _LOGGER.error("Repair of HA cli failed")
             self.sys_capture_exception(err)
+
+    @Job(
+        limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+        throttle_period=WATCHDOG_THROTTLE_PERIOD,
+        throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+        on_condition=CliJobError,
+    )
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed plugin."""
+        return await super()._restart_after_problem(state)
diff --git a/supervisor/plugins/const.py b/supervisor/plugins/const.py
index ca64f613e..0f3086368 100644
--- a/supervisor/plugins/const.py
+++ b/supervisor/plugins/const.py
@@ -1,4 +1,5 @@
 """Const for plugins."""
+from datetime import timedelta
 from pathlib import Path
 
 from ..const import SUPERVISOR_DATA
@@ -11,3 +12,6 @@ FILE_HASSIO_MULTICAST = Path(SUPERVISOR_DATA, "multicast.json")
 ATTR_FALLBACK = "fallback"
 
 WATCHDOG_RETRY_SECONDS = 10
+WATCHDOG_MAX_ATTEMPTS = 5
+WATCHDOG_THROTTLE_PERIOD = timedelta(minutes=30)
+WATCHDOG_THROTTLE_MAX_CALLS = 10
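Taken together, the constants bound how much work one flapping container can trigger. The arithmetic below is a worked reading of the values, not code from the diff:

```python
from datetime import timedelta

retry_seconds, max_attempts = 10, 5
throttle_period, throttle_max_calls = timedelta(minutes=30), 10

# One watchdog pass gives up after 5 consecutive failed restarts; with a 10s
# sleep after each of the first 4 failures, a pass spends roughly 40 seconds
# sleeping plus the time taken by the restart attempts themselves.
sleep_per_pass = (max_attempts - 1) * retry_seconds  # 40 seconds

# Independently of attempts, the Job throttle allows at most 10 such passes
# per rolling 30-minute window before raising the configured on_condition.
print(sleep_per_pass, throttle_max_calls, throttle_period)
```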
diff --git a/supervisor/plugins/dns.py b/supervisor/plugins/dns.py
index efe0554dc..adc18a13e 100644
--- a/supervisor/plugins/dns.py
+++ b/supervisor/plugins/dns.py
@@ -17,6 +17,8 @@ import voluptuous as vol
 from supervisor.dbus.const import MulticastProtocolEnabled
 from supervisor.docker.const import ContainerState
 from supervisor.docker.monitor import DockerContainerStateEvent
+from supervisor.jobs.const import JobExecutionLimit
+from supervisor.jobs.decorator import Job
 
 from ..const import ATTR_SERVERS, DNS_SUFFIX, LogLevel
 from ..coresys import CoreSys
@@ -25,6 +27,7 @@ from ..docker.stats import DockerStats
 from ..exceptions import (
     ConfigurationFileError,
     CoreDNSError,
+    CoreDNSJobError,
     CoreDNSUpdateError,
     DockerError,
 )
@@ -32,7 +35,12 @@ from ..resolution.const import ContextType, IssueType, SuggestionType
 from ..utils.json import write_json_file
 from ..validate import dns_url
 from .base import PluginBase
-from .const import ATTR_FALLBACK, FILE_HASSIO_DNS
+from .const import (
+    ATTR_FALLBACK,
+    FILE_HASSIO_DNS,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+)
 from .validate import SCHEMA_DNS_CONFIG
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -250,6 +258,16 @@ class PluginDns(PluginBase):
             return
         await super().watchdog_container(event)
 
+    @Job(
+        limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+        throttle_period=WATCHDOG_THROTTLE_PERIOD,
+        throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+        on_condition=CoreDNSJobError,
+    )
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed plugin."""
+        return await super()._restart_after_problem(state)
+
     async def loop_detection(self) -> None:
         """Check if there was a loop found."""
         log = await self.instance.logs()
diff --git a/supervisor/plugins/multicast.py b/supervisor/plugins/multicast.py
index 911bbb3ae..40f7e1f5a 100644
--- a/supervisor/plugins/multicast.py
+++ b/supervisor/plugins/multicast.py
@@ -9,12 +9,25 @@ from typing import Optional
 
 from awesomeversion import AwesomeVersion
 
+from supervisor.docker.const import ContainerState
+from supervisor.jobs.const import JobExecutionLimit
+from supervisor.jobs.decorator import Job
+
 from ..coresys import CoreSys
 from ..docker.multicast import DockerMulticast
 from ..docker.stats import DockerStats
-from ..exceptions import DockerError, MulticastError, MulticastUpdateError
+from ..exceptions import (
+    DockerError,
+    MulticastError,
+    MulticastJobError,
+    MulticastUpdateError,
+)
 from .base import PluginBase
-from .const import FILE_HASSIO_MULTICAST
+from .const import (
+    FILE_HASSIO_MULTICAST,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+)
 from .validate import SCHEMA_MULTICAST_CONFIG
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -127,3 +140,13 @@ class PluginMulticast(PluginBase):
         except DockerError as err:
             _LOGGER.error("Repair of Multicast failed")
             self.sys_capture_exception(err)
+
+    @Job(
+        limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+        throttle_period=WATCHDOG_THROTTLE_PERIOD,
+        throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+        on_condition=MulticastJobError,
+    )
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed plugin."""
+        return await super()._restart_after_problem(state)
diff --git a/supervisor/plugins/observer.py b/supervisor/plugins/observer.py
index 810a70ac6..ff91c3ad8 100644
--- a/supervisor/plugins/observer.py
+++ b/supervisor/plugins/observer.py
@@ -13,11 +13,23 @@ from awesomeversion import AwesomeVersion
 
 from ..const import ATTR_ACCESS_TOKEN
 from ..coresys import CoreSys
+from ..docker.const import ContainerState
 from ..docker.observer import DockerObserver
 from ..docker.stats import DockerStats
-from ..exceptions import DockerError, ObserverError, ObserverUpdateError
+from ..exceptions import (
+    DockerError,
+    ObserverError,
+    ObserverJobError,
+    ObserverUpdateError,
+)
+from ..jobs.const import JobExecutionLimit
+from ..jobs.decorator import Job
 from .base import PluginBase
-from .const import FILE_HASSIO_OBSERVER
+from .const import (
+    FILE_HASSIO_OBSERVER,
+    WATCHDOG_THROTTLE_MAX_CALLS,
+    WATCHDOG_THROTTLE_PERIOD,
+)
 from .validate import SCHEMA_OBSERVER_CONFIG
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
 
@@ -137,3 +149,13 @@ class PluginObserver(PluginBase):
         except DockerError as err:
             _LOGGER.error("Repair of HA observer failed")
             self.sys_capture_exception(err)
+
+    @Job(
+        limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+        throttle_period=WATCHDOG_THROTTLE_PERIOD,
+        throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
+        on_condition=ObserverJobError,
+    )
+    async def _restart_after_problem(self, state: ContainerState):
+        """Restart unhealthy or failed plugin."""
+        return await super()._restart_after_problem(state)
diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py
index e3a94cc5d..9c66ddacf 100644
--- a/tests/jobs/test_job_decorator.py
+++ b/tests/jobs/test_job_decorator.py
@@ -2,16 +2,19 @@
 # pylint: disable=protected-access,import-error
 import asyncio
 from datetime import timedelta
+from typing import Optional
 from unittest.mock import PropertyMock, patch
 
 import pytest
+import time_machine
 
 from supervisor.const import CoreState
 from supervisor.coresys import CoreSys
-from supervisor.exceptions import HassioError, JobException
+from supervisor.exceptions import HassioError, JobException, PluginJobError
 from supervisor.jobs.const import JobExecutionLimit
 from supervisor.jobs.decorator import Job, JobCondition
 from supervisor.resolution.const import UnhealthyReason
+from supervisor.utils.dt import utcnow
 
 
 async def test_healthy(coresys: CoreSys):
@@ -220,7 +223,7 @@ async def test_ignore_conditions(coresys: CoreSys):
 
 
 async def test_exception_conditions(coresys: CoreSys):
-    """Test the ignore conditions decorator."""
+    """Test the on condition decorator."""
 
     class TestClass:
         """Test class."""
@@ -247,7 +250,7 @@ async def test_exception_conditions(coresys: CoreSys):
 async def test_exectution_limit_single_wait(
     coresys: CoreSys, loop: asyncio.BaseEventLoop
 ):
-    """Test the ignore conditions decorator."""
+    """Test the single wait job execution limit."""
 
     class TestClass:
         """Test class."""
@@ -269,10 +272,10 @@ async def test_exectution_limit_single_wait(
     await asyncio.gather(*[test.execute(0.1), test.execute(0.1), test.execute(0.1)])
 
 
-async def test_exectution_limit_throttle_wait(
+async def test_execution_limit_throttle_wait(
     coresys: CoreSys, loop: asyncio.BaseEventLoop
 ):
-    """Test the ignore conditions decorator."""
+    """Test the throttle wait job execution limit."""
 
     class TestClass:
         """Test class."""
@@ -300,6 +303,47 @@
     assert test.call == 1
 
 
+@pytest.mark.parametrize("error", [None, PluginJobError])
+async def test_execution_limit_throttle_rate_limit(
+    coresys: CoreSys, loop: asyncio.BaseEventLoop, error: Optional[JobException]
+):
+    """Test the throttle rate limit job execution limit."""
+
+    class TestClass:
+        """Test class."""
+
+        def __init__(self, coresys: CoreSys):
+            """Initialize the test class."""
+            self.coresys = coresys
+            self.run = asyncio.Lock()
+            self.call = 0
+
+        @Job(
+            limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
+            throttle_period=timedelta(hours=1),
+            throttle_max_calls=2,
+            on_condition=error,
+        )
+        async def execute(self):
+            """Execute the class method."""
+            self.call += 1
+
+    test = TestClass(coresys)
+
+    await asyncio.gather(*[test.execute(), test.execute()])
+    assert test.call == 2
+
+    with pytest.raises(JobException if error is None else error):
+        await test.execute()
+
+    assert test.call == 2
+
+    with time_machine.travel(utcnow() + timedelta(hours=1)):
+        await test.execute()
+
+    assert test.call == 3
+
+
 async def test_exectution_limit_throttle(coresys: CoreSys, loop: asyncio.BaseEventLoop):
     """Test the ignore conditions decorator."""
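The new test frees the window with `time_machine` rather than sleeping: shifting "now" forward makes the previously recorded timestamps fall outside the throttle period, so the pruning branch releases the budget. A minimal sketch of the mechanism, assuming only the `time_machine` package:

```python
from datetime import datetime, timedelta, timezone

import time_machine

real = datetime.now(timezone.utc)
with time_machine.travel(real + timedelta(hours=1)):
    # Inside the block datetime.now() is shifted, so timestamps recorded
    # before traveling now lie outside a one-hour throttle window.
    shifted = datetime.now(timezone.utc)

assert shifted - real >= timedelta(hours=1)
```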