Add job names and references everywhere (#4495)

* Add job names and references everywhere

* Remove group names check and switch to const

* Ensure unique job names in decorator tests
This commit is contained in:
Mike Degatano 2023-08-21 03:15:37 -04:00 committed by GitHub
parent f2f9e3b514
commit 93ba8a3574
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 779 additions and 196 deletions

View File

@ -152,11 +152,15 @@ class AddonManager(CoreSysAttributes):
capture_exception(err)
@Job(
name="addon_manager_install",
conditions=ADDON_UPDATE_CONDITIONS,
on_condition=AddonsJobError,
)
async def install(self, slug: str) -> None:
"""Install an add-on."""
if job := self.sys_jobs.get_job():
job.reference = slug
if slug in self.local:
raise AddonsError(f"Add-on {slug} is already installed", _LOGGER.warning)
store = self.store.get(slug)
@ -247,6 +251,7 @@ class AddonManager(CoreSysAttributes):
_LOGGER.info("Add-on '%s' successfully removed", slug)
@Job(
name="addon_manager_update",
conditions=ADDON_UPDATE_CONDITIONS,
on_condition=AddonsJobError,
)
@ -258,6 +263,9 @@ class AddonManager(CoreSysAttributes):
Returns a coroutine that completes when addon has state 'started' (see addon.start)
if addon is started after update. Else nothing is returned.
"""
if job := self.sys_jobs.get_job():
job.reference = slug
if slug not in self.local:
raise AddonsError(f"Add-on {slug} is not installed", _LOGGER.error)
addon = self.local[slug]
@ -307,6 +315,7 @@ class AddonManager(CoreSysAttributes):
)
@Job(
name="addon_manager_rebuild",
conditions=[
JobCondition.FREE_SPACE,
JobCondition.INTERNET_HOST,
@ -320,6 +329,9 @@ class AddonManager(CoreSysAttributes):
Returns a coroutine that completes when addon has state 'started' (see addon.start)
if addon is started after rebuild. Else nothing is returned.
"""
if job := self.sys_jobs.get_job():
job.reference = slug
if slug not in self.local:
raise AddonsError(f"Add-on {slug} is not installed", _LOGGER.error)
addon = self.local[slug]
@ -359,6 +371,7 @@ class AddonManager(CoreSysAttributes):
)
@Job(
name="addon_manager_restore",
conditions=[
JobCondition.FREE_SPACE,
JobCondition.INTERNET_HOST,
@ -374,6 +387,9 @@ class AddonManager(CoreSysAttributes):
Returns a coroutine that completes when addon has state 'started' (see addon.start)
if addon is started after restore. Else nothing is returned.
"""
if job := self.sys_jobs.get_job():
job.reference = slug
if slug not in self.local:
_LOGGER.debug("Add-on %s is not local available for restore", slug)
addon = Addon(self.coresys, slug)
@ -396,7 +412,10 @@ class AddonManager(CoreSysAttributes):
return wait_for_start
@Job(conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST])
@Job(
name="addon_manager_repair",
conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST],
)
async def repair(self) -> None:
"""Repair local add-ons."""
needs_repair: list[Addon] = []

View File

@ -131,54 +131,6 @@ class Addon(AddonModel):
self._startup_event = asyncio.Event()
self._startup_task: asyncio.Task | None = None
# NOTE(review): legacy per-instance watchdog handler (removed in this commit in
# favor of the Addon._restart_after_problem method). It was defined as a closure
# and bound to the instance below, which forced a unique per-slug job name.
@Job(
    name=f"addon_{slug}_restart_after_problem",
    limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
    throttle_period=WATCHDOG_THROTTLE_PERIOD,
    throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
    on_condition=AddonsJobError,
)
async def restart_after_problem(addon: Addon, state: ContainerState) -> None:
    """Restart unhealthy or failed addon.

    Retries while the container remains in *state*, giving up after
    WATCHDOG_MAX_ATTEMPTS consecutive restart failures.
    """
    attempts = 0
    # Keep retrying while the container is still in the problematic state.
    while await addon.instance.current_state() == state:
        # Skip a cycle if another operation on this addon is already running.
        if not addon.in_progress:
            _LOGGER.warning(
                "Watchdog found addon %s is %s, restarting...",
                addon.name,
                state.value,
            )
            try:
                if state == ContainerState.FAILED:
                    # Ensure failed container is removed before attempting reanimation
                    if attempts == 0:
                        with suppress(DockerError):
                            await addon.instance.stop(remove_container=True)
                    # start()/restart() return an awaitable that completes when
                    # the addon reaches state 'started' (see addon.start).
                    await (await addon.start())
                else:
                    await (await addon.restart())
            except AddonsError as err:
                attempts = attempts + 1
                _LOGGER.error(
                    "Watchdog restart of addon %s failed!", addon.name
                )
                capture_exception(err)
            else:
                # Restart succeeded; stop watching.
                break

            if attempts >= WATCHDOG_MAX_ATTEMPTS:
                _LOGGER.critical(
                    "Watchdog cannot restart addon %s, failed all %s attempts",
                    addon.name,
                    attempts,
                )
                break

        # Back off before probing the container state again.
        await asyncio.sleep(WATCHDOG_RETRY_SECONDS)

# Bind the watchdog handler to this instance.
self._restart_after_problem = restart_after_problem
def __repr__(self) -> str:
    """Return internal representation, e.g. "<Addon: core_ssh>"."""
    return f"<Addon: {self.slug}>"
@ -1033,6 +985,50 @@ class Addon(AddonModel):
"""
return self.instance.check_trust()
# Replacement for the old per-instance closure: as a method on a JobGroup
# subclass it can use a single job name with a group-based throttle limit.
@Job(
    name="addon_restart_after_problem",
    limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
    throttle_period=WATCHDOG_THROTTLE_PERIOD,
    throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
    on_condition=AddonsJobError,
)
async def _restart_after_problem(self, state: ContainerState) -> None:
    """Restart unhealthy or failed addon.

    Retries while the container remains in *state*, giving up after
    WATCHDOG_MAX_ATTEMPTS consecutive restart failures.
    """
    attempts = 0
    # Keep retrying while the container is still in the problematic state.
    while await self.instance.current_state() == state:
        # Skip a cycle if another operation on this addon is already running.
        if not self.in_progress:
            _LOGGER.warning(
                "Watchdog found addon %s is %s, restarting...",
                self.name,
                state.value,
            )
            try:
                if state == ContainerState.FAILED:
                    # Ensure failed container is removed before attempting reanimation
                    if attempts == 0:
                        with suppress(DockerError):
                            await self.instance.stop(remove_container=True)
                    # start()/restart() return an awaitable that completes when
                    # the addon reaches state 'started' (see addon.start).
                    await (await self.start())
                else:
                    await (await self.restart())
            except AddonsError as err:
                attempts = attempts + 1
                _LOGGER.error("Watchdog restart of addon %s failed!", self.name)
                capture_exception(err)
            else:
                # Restart succeeded; stop watching.
                break

            if attempts >= WATCHDOG_MAX_ATTEMPTS:
                _LOGGER.critical(
                    "Watchdog cannot restart addon %s, failed all %s attempts",
                    self.name,
                    attempts,
                )
                break

        # Back off before probing the container state again.
        await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
async def container_state_changed(self, event: DockerContainerStateEvent) -> None:
"""Set addon state from container state."""
if event.name != self.instance.name:
@ -1067,4 +1063,4 @@ class Addon(AddonModel):
ContainerState.STOPPED,
ContainerState.UNHEALTHY,
]:
await self._restart_after_problem(self, event.state)
await self._restart_after_problem(event.state)

View File

@ -1,5 +1,6 @@
"""Init file for Supervisor add-ons."""
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Awaitable, Callable
from contextlib import suppress
import logging
@ -79,9 +80,11 @@ from ..const import (
AddonStage,
AddonStartup,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..coresys import CoreSys
from ..docker.const import Capabilities
from ..exceptions import AddonsNotSupportedError
from ..jobs.const import JOB_GROUP_ADDON
from ..jobs.job_group import JobGroup
from .const import ATTR_BACKUP, ATTR_CODENOTARY, AddonBackupMode
from .options import AddonOptions, UiOptions
from .validate import RE_SERVICE, RE_VOLUME
@ -91,12 +94,14 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
Data = dict[str, Any]
class AddonModel(CoreSysAttributes, ABC):
class AddonModel(JobGroup, ABC):
"""Add-on Data layout."""
def __init__(self, coresys: CoreSys, slug: str):
"""Initialize data holder."""
self.coresys: CoreSys = coresys
super().__init__(
coresys, JOB_GROUP_ADDON.format_map(defaultdict(str, slug=slug)), slug
)
self.slug: str = slug
@property

View File

@ -110,6 +110,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
backup.store_repositories()
backup.store_dockerconfig()
# Add backup ID to job
if job := self.sys_jobs.get_job():
job.reference = backup.slug
return backup
def load(self):
@ -224,7 +228,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
finally:
self.sys_core.state = CoreState.RUNNING
@Job(conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING])
@Job(
name="backup_manager_full_backup",
conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING],
)
async def do_backup_full(
self,
name="",
@ -250,7 +257,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
_LOGGER.info("Creating full backup with slug %s completed", backup.slug)
return backup
@Job(conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING])
@Job(
name="backup_manager_partial_backup",
conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING],
)
async def do_backup_partial(
self,
name: str = "",
@ -371,16 +381,21 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
await self.sys_homeassistant.core.restart()
@Job(
name="backup_manager_full_restore",
conditions=[
JobCondition.FREE_SPACE,
JobCondition.HEALTHY,
JobCondition.INTERNET_HOST,
JobCondition.INTERNET_SYSTEM,
JobCondition.RUNNING,
]
],
)
async def do_restore_full(self, backup: Backup, password=None):
"""Restore a backup."""
# Add backup ID to job
if job := self.sys_jobs.get_job():
job.reference = backup.slug
if self.lock.locked():
_LOGGER.error("A backup/restore process is already running")
return False
@ -418,13 +433,14 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
_LOGGER.info("Full-Restore %s done", backup.slug)
@Job(
name="backup_manager_partial_restore",
conditions=[
JobCondition.FREE_SPACE,
JobCondition.HEALTHY,
JobCondition.INTERNET_HOST,
JobCondition.INTERNET_SYSTEM,
JobCondition.RUNNING,
]
],
)
async def do_restore_partial(
self,
@ -435,6 +451,10 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
password: str | None = None,
):
"""Restore a backup."""
# Add backup ID to job
if job := self.sys_jobs.get_job():
job.reference = backup.slug
if self.lock.locked():
_LOGGER.error("A backup/restore process is already running")
return False

View File

@ -494,7 +494,11 @@ class DockerAddon(DockerInterface):
return mounts
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
@ -565,7 +569,11 @@ class DockerAddon(DockerInterface):
BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_update",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
@ -585,7 +593,11 @@ class DockerAddon(DockerInterface):
with suppress(DockerError):
await self.stop()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def install(
self,
version: AwesomeVersion,
@ -636,14 +648,22 @@ class DockerAddon(DockerInterface):
_LOGGER.info("Build %s:%s done", self.image, version)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_export_image",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
def export_image(self, tar_file: Path) -> Awaitable[None]:
"""Export current images into a tar file."""
return self.sys_run_in_executor(
self.sys_docker.export_image, self.image, self.version, tar_file
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_import_image",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image."""
docker_image = await self.sys_run_in_executor(
@ -656,7 +676,11 @@ class DockerAddon(DockerInterface):
with suppress(DockerError):
await self.cleanup()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_write_stdin",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin."""
if not await self.is_running():
@ -686,7 +710,11 @@ class DockerAddon(DockerInterface):
_LOGGER.error("Can't write to %s stdin: %s", self.name, err)
raise DockerError() from err
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_addon_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
# DNS
@ -714,7 +742,11 @@ class DockerAddon(DockerInterface):
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_content(self.addon.codenotary, checksum)
@Job(conditions=[JobCondition.OS_AGENT], limit=JobExecutionLimit.SINGLE_WAIT)
@Job(
name="docker_addon_hardware_events",
conditions=[JobCondition.OS_AGENT],
limit=JobExecutionLimit.SINGLE_WAIT,
)
async def _hardware_events(self, device: Device) -> None:
"""Process Hardware events for adjust device access."""
if not any(

View File

@ -85,7 +85,11 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
return None
return DOCKER_CPU_RUNTIME_ALLOCATION
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_audio_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():

View File

@ -26,7 +26,11 @@ class DockerCli(DockerInterface, CoreSysAttributes):
"""Return name of Docker container."""
return CLI_DOCKER_NAME
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_cli_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():

View File

@ -28,7 +28,11 @@ class DockerDNS(DockerInterface, CoreSysAttributes):
"""Return name of Docker container."""
return DNS_DOCKER_NAME
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_dns_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():

View File

@ -133,7 +133,11 @@ class DockerHomeAssistant(DockerInterface):
return mounts
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_home_assistant_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
@ -176,7 +180,11 @@ class DockerHomeAssistant(DockerInterface):
"Starting Home Assistant %s with version %s", self.image, self.version
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_home_assistant_execute_command",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command."""
return await self.sys_run_in_executor(

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from collections import defaultdict
from collections.abc import Awaitable
from contextlib import suppress
import logging
@ -36,7 +37,7 @@ from ..exceptions import (
DockerRequestError,
DockerTrustError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobExecutionLimit
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType, SuggestionType
@ -82,7 +83,13 @@ class DockerInterface(JobGroup):
def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper."""
super().__init__(coresys, f"container_{self.name or uuid4().hex}")
super().__init__(
coresys,
JOB_GROUP_DOCKER_INTERFACE.format_map(
defaultdict(str, name=self.name or uuid4().hex)
),
self.name,
)
self.coresys: CoreSys = coresys
self._meta: dict[str, Any] | None = None
self.lock: asyncio.Lock = asyncio.Lock()
@ -209,7 +216,11 @@ class DockerInterface(JobGroup):
await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def install(
self,
version: AwesomeVersion,
@ -323,7 +334,11 @@ class DockerInterface(JobGroup):
return _container_state_from_model(docker_container)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_attach",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> None:
@ -359,12 +374,20 @@ class DockerInterface(JobGroup):
raise DockerError()
_LOGGER.info("Attaching to %s with version %s", self.image, self.version)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
raise NotImplementedError()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
with suppress(DockerNotFound):
@ -375,12 +398,20 @@ class DockerInterface(JobGroup):
remove_container,
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_start",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
def start(self) -> Awaitable[None]:
"""Start Docker container."""
return self.sys_run_in_executor(self.sys_docker.start_container, self.name)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_remove",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def remove(self) -> None:
"""Remove Docker images."""
# Cleanup container
@ -392,7 +423,11 @@ class DockerInterface(JobGroup):
)
self._meta = None
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_update",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
@ -419,7 +454,11 @@ class DockerInterface(JobGroup):
return b""
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_cleanup",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
def cleanup(self, old_image: str | None = None) -> Awaitable[None]:
"""Check if old version exists and cleanup."""
return self.sys_run_in_executor(
@ -429,14 +468,22 @@ class DockerInterface(JobGroup):
{old_image} if old_image else None,
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_restart",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
def restart(self) -> Awaitable[None]:
"""Restart docker container."""
return self.sys_run_in_executor(
self.sys_docker.restart_container, self.name, self.timeout
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_execute_command",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command."""
raise NotImplementedError()
@ -497,7 +544,11 @@ class DockerInterface(JobGroup):
available_version.sort(reverse=True)
return available_version[0]
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_run_inside",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
def run_inside(self, command: str) -> Awaitable[CommandReturn]:
"""Execute a command inside Docker container."""
return self.sys_run_in_executor(
@ -511,7 +562,11 @@ class DockerInterface(JobGroup):
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_own_content(checksum)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_interface_check_trust",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def check_trust(self) -> None:
"""Check trust of exists Docker image."""
try:

View File

@ -31,7 +31,11 @@ class DockerMulticast(DockerInterface, CoreSysAttributes):
"""Generate needed capabilities."""
return [Capabilities.NET_ADMIN.value]
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_multicast_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():

View File

@ -28,7 +28,11 @@ class DockerObserver(DockerInterface, CoreSysAttributes):
"""Return name of Docker container."""
return OBSERVER_DOCKER_NAME
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_observer_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():

View File

@ -45,7 +45,11 @@ class DockerSupervisor(DockerInterface, CoreSysAttributes):
if mount.get("Destination") == "/data"
)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
@Job(
name="docker_supervisor_attach",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
)
async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> None:

View File

@ -32,7 +32,10 @@ class HomeAssistantAPI(CoreSysAttributes):
self.access_token: str | None = None
self._access_token_expires: datetime | None = None
@Job(limit=JobExecutionLimit.SINGLE_WAIT)
@Job(
name="home_assistant_api_ensure_access_token",
limit=JobExecutionLimit.SINGLE_WAIT,
)
async def ensure_access_token(self) -> None:
"""Ensure there is an access token."""
if (

View File

@ -24,7 +24,7 @@ from ..exceptions import (
HomeAssistantUpdateError,
JobException,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobExecutionLimit
from ..jobs.decorator import Job, JobCondition
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType
@ -56,7 +56,7 @@ class HomeAssistantCore(JobGroup):
def __init__(self, coresys: CoreSys):
"""Initialize Home Assistant object."""
super().__init__(coresys, "home_assistant_core")
super().__init__(coresys, JOB_GROUP_HOME_ASSISTANT_CORE)
self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys)
self.lock: asyncio.Lock = asyncio.Lock()
self._error_state: bool = False
@ -99,7 +99,11 @@ class HomeAssistantCore(JobGroup):
with suppress(HomeAssistantError):
await self.start()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
@Job(
name="home_assistant_core_install_landing_page",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
)
async def install_landingpage(self) -> None:
"""Install a landing page."""
# Try to use a preinstalled landingpage
@ -141,7 +145,11 @@ class HomeAssistantCore(JobGroup):
self.sys_homeassistant.image = self.sys_updater.image_homeassistant
self.sys_homeassistant.save_data()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
@Job(
name="home_assistant_core_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
)
async def install(self) -> None:
"""Install a landing page."""
_LOGGER.info("Home Assistant setup")
@ -182,6 +190,7 @@ class HomeAssistantCore(JobGroup):
await self.instance.cleanup()
@Job(
name="home_assistant_core_update",
conditions=[
JobCondition.FREE_SPACE,
JobCondition.HEALTHY,
@ -283,7 +292,11 @@ class HomeAssistantCore(JobGroup):
self.sys_resolution.create_issue(IssueType.UPDATE_FAILED, ContextType.CORE)
raise HomeAssistantUpdateError()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
@Job(
name="home_assistant_core_start",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
)
async def start(self) -> None:
"""Run Home Assistant docker."""
if await self.instance.is_running():
@ -314,7 +327,11 @@ class HomeAssistantCore(JobGroup):
await self._block_till_run(self.sys_homeassistant.version)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
@Job(
name="home_assistant_core_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
)
async def stop(self) -> None:
"""Stop Home Assistant Docker."""
try:
@ -322,7 +339,11 @@ class HomeAssistantCore(JobGroup):
except DockerError as err:
raise HomeAssistantError() from err
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
@Job(
name="home_assistant_core_restart",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
)
async def restart(self) -> None:
"""Restart Home Assistant Docker."""
try:
@ -332,7 +353,11 @@ class HomeAssistantCore(JobGroup):
await self._block_till_run(self.sys_homeassistant.version)
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=HomeAssistantJobError)
@Job(
name="home_assistant_core_rebuild",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
)
async def rebuild(self) -> None:
"""Rebuild Home Assistant Docker container."""
with suppress(DockerError):
@ -429,10 +454,11 @@ class HomeAssistantCore(JobGroup):
raise HomeAssistantCrashError()
@Job(
name="home_assistant_core_repair",
conditions=[
JobCondition.FREE_SPACE,
JobCondition.INTERNET_HOST,
]
],
)
async def repair(self):
"""Repair local Home Assistant data."""
@ -454,6 +480,7 @@ class HomeAssistantCore(JobGroup):
await self._restart_after_problem(event.state)
@Job(
name="home_assistant_core_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,

View File

@ -304,7 +304,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
self.sys_homeassistant.websocket.send_message({ATTR_TYPE: "usb/scan"})
@Job()
@Job(name="home_assistant_module_backup")
async def backup(self, tar_file: tarfile.TarFile) -> None:
"""Backup Home Assistant Core config/ directory."""

View File

@ -40,7 +40,11 @@ class HomeAssistantSecrets(CoreSysAttributes):
"""Reload secrets."""
await self._read_secrets()
@Job(limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=60))
@Job(
name="home_assistant_secrets_read",
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(seconds=60),
)
async def _read_secrets(self):
"""Read secrets.yaml into memory."""
if not self.path_secrets.exists():

View File

@ -107,7 +107,7 @@ class NetworkManager(CoreSysAttributes):
return Interface.from_dbus_interface(self.sys_dbus.network.get(inet_name))
@Job(conditions=[JobCondition.HOST_NETWORK])
@Job(name="network_manager_load", conditions=[JobCondition.HOST_NETWORK])
async def load(self):
"""Load network information and reapply defaults over dbus."""
# Apply current settings on each interface so OS can update any out of date defaults

View File

@ -232,7 +232,11 @@ class SoundControl(CoreSysAttributes):
await self.sys_run_in_executor(_activate_profile)
await self.update()
@Job(limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(seconds=10))
@Job(
name="sound_control_update",
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(seconds=10),
)
async def update(self):
"""Update properties over dbus."""
_LOGGER.info("Updating PulseAudio information")

View File

@ -29,6 +29,7 @@ class SupervisorJob:
"""Representation of a job running in supervisor."""
name: str = field(on_setattr=frozen)
reference: str | None = None
progress: int = field(default=0, validator=[ge(0), le(100)])
stage: str | None = None
uuid: UUID = field(init=False, factory=lambda: uuid4().hex, on_setattr=frozen)
@ -86,9 +87,11 @@ class JobManager(FileConfiguration, CoreSysAttributes):
"""Set a list of ignored condition."""
self._data[ATTR_IGNORE_CONDITIONS] = value
def new_job(self, name: str, initial_stage: str | None = None) -> SupervisorJob:
def new_job(
self, name: str, reference: str | None = None, initial_stage: str | None = None
) -> SupervisorJob:
"""Create a new job."""
job = SupervisorJob(name, stage=initial_stage)
job = SupervisorJob(name, reference=reference, stage=initial_stage)
self._jobs[job.uuid] = job
return job

View File

@ -8,6 +8,10 @@ FILE_CONFIG_JOBS = Path(SUPERVISOR_DATA, "jobs.json")
ATTR_IGNORE_CONDITIONS = "ignore_conditions"
JOB_GROUP_ADDON = "addon_{slug}"
JOB_GROUP_DOCKER_INTERFACE = "container_{name}"
JOB_GROUP_HOME_ASSISTANT_CORE = "home_assistant_core"
class JobCondition(str, Enum):
"""Job condition enum."""
@ -36,3 +40,6 @@ class JobExecutionLimit(str, Enum):
THROTTLE_RATE_LIMIT = "throttle_rate_limit"
GROUP_ONCE = "group_once"
GROUP_WAIT = "group_wait"
GROUP_THROTTLE = "group_throttle"
GROUP_THROTTLE_WAIT = "group_throttle_wait"
GROUP_THROTTLE_RATE_LIMIT = "group_throttle_rate_limit"

View File

@ -21,6 +21,7 @@ from .const import JobCondition, JobExecutionLimit
from .job_group import JobGroup
_LOGGER: logging.Logger = logging.getLogger(__package__)
_JOB_NAMES: set[str] = set()
class Job(CoreSysAttributes):
@ -28,7 +29,7 @@ class Job(CoreSysAttributes):
def __init__(
self,
name: str | None = None,
name: str,
conditions: list[JobCondition] | None = None,
cleanup: bool = True,
on_condition: JobException | None = None,
@ -39,6 +40,10 @@ class Job(CoreSysAttributes):
throttle_max_calls: int | None = None,
):
"""Initialize the Job class."""
if name in _JOB_NAMES:
raise RuntimeError(f"A job already exists with name {name}!")
_JOB_NAMES.add(name)
self.name = name
self.conditions = conditions
self.cleanup = cleanup
@ -48,12 +53,8 @@ class Job(CoreSysAttributes):
self.throttle_max_calls = throttle_max_calls
self._lock: asyncio.Semaphore | None = None
self._method = None
self._last_call = datetime.min
self._rate_limited_calls: list[datetime] | None = None
self._job_group_limit = self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
)
self._last_call: dict[str | None, datetime] = {}
self._rate_limited_calls: dict[str, list[datetime]] | None = None
# Validate Options
if (
@ -62,19 +63,70 @@ class Job(CoreSysAttributes):
JobExecutionLimit.THROTTLE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
)
and self._throttle_period is None
):
raise RuntimeError("Using Job without a Throttle period!")
raise RuntimeError(
f"Job {name} is using execution limit {limit.value} without a throttle period!"
)
if self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT:
if self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
if self.throttle_max_calls is None:
raise RuntimeError("Using rate limit without throttle max calls!")
raise RuntimeError(
f"Job {name} is using execution limit {limit.value} without throttle max calls!"
)
self._rate_limited_calls = []
self._rate_limited_calls = {}
@property
def throttle_period(self) -> timedelta | None:
def last_call(self, group_name: str | None = None) -> datetime:
"""Return last call datetime."""
return self._last_call.get(group_name, datetime.min)
def set_last_call(self, value: datetime, group_name: str | None = None) -> None:
"""Set last call datetime."""
self._last_call[group_name] = value
def rate_limited_calls(
self, group_name: str | None = None
) -> list[datetime] | None:
"""Return rate limited calls if used."""
if self._rate_limited_calls is None:
return None
return self._rate_limited_calls.get(group_name, [])
def add_rate_limited_call(
self, value: datetime, group_name: str | None = None
) -> None:
"""Add a rate limited call to list if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
)
if group_name in self._rate_limited_calls:
self._rate_limited_calls[group_name].append(value)
else:
self._rate_limited_calls[group_name] = [value]
def set_rate_limited_calls(
self, value: list[datetime], group_name: str | None = None
) -> None:
"""Set rate limited calls if used."""
if self._rate_limited_calls is None:
raise RuntimeError(
f"Rate limited calls not available for limit type {self.limit}"
)
self._rate_limited_calls[group_name] = value
def throttle_period(self, group_name: str | None = None) -> timedelta | None:
"""Return throttle period."""
if self._throttle_period is None:
return None
@ -83,14 +135,13 @@ class Job(CoreSysAttributes):
return self._throttle_period
return self._throttle_period(
self.coresys, self._last_call, self._rate_limited_calls
self.coresys,
self.last_call(group_name),
self.rate_limited_calls(group_name),
)
def _post_init(self, obj: JobGroup | CoreSysAttributes) -> None:
def _post_init(self, obj: JobGroup | CoreSysAttributes) -> JobGroup | None:
"""Runtime init."""
if self.name is None:
self.name = str(self._method.__qualname__).lower().replace(".", "_")
# Coresys
try:
self.coresys = obj.coresys
@ -99,8 +150,18 @@ class Job(CoreSysAttributes):
if not self.coresys:
raise RuntimeError(f"Job on {self.name} need to be an coresys object!")
# Setup lock for limits
if self._lock is None:
self._lock = asyncio.Semaphore()
# Job groups
if self._job_group_limit:
if self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
JobExecutionLimit.GROUP_THROTTLE,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
try:
_ = obj.acquire and obj.release
except AttributeError:
@ -108,9 +169,8 @@ class Job(CoreSysAttributes):
f"Job on {self.name} need to be a JobGroup to use group based limits!"
) from None
# Others
if self._lock is None:
self._lock = asyncio.Semaphore()
return obj
return None
def __call__(self, method):
"""Call the wrapper logic."""
@ -123,9 +183,11 @@ class Job(CoreSysAttributes):
This method must be on an instance of CoreSysAttributes. If a JOB_GROUP limit
is used, then it must be on an instance of JobGroup.
"""
self._post_init(obj)
job = self.sys_jobs.new_job(self.name)
job_group = self._post_init(obj)
group_name: str | None = job_group.group_name if job_group else None
job = self.sys_jobs.new_job(
self.name, job_group.job_reference if job_group else None
)
# Handle condition
if self.conditions:
@ -141,46 +203,63 @@ class Job(CoreSysAttributes):
# Handle exection limits
if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE):
await self._acquire_exection_limit()
elif self._job_group_limit:
elif self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
try:
await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT)
except JobGroupExecutionLimitExceeded as err:
if self.on_condition:
raise self.on_condition(str(err)) from err
raise err
elif self.limit == JobExecutionLimit.THROTTLE:
time_since_last_call = datetime.now() - self._last_call
if time_since_last_call < self.throttle_period:
elif self.limit in (
JobExecutionLimit.THROTTLE,
JobExecutionLimit.GROUP_THROTTLE,
):
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
return
elif self.limit == JobExecutionLimit.THROTTLE_WAIT:
elif self.limit in (
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
await self._acquire_exection_limit()
time_since_last_call = datetime.now() - self._last_call
if time_since_last_call < self.throttle_period:
time_since_last_call = datetime.now() - self.last_call(group_name)
if time_since_last_call < self.throttle_period(group_name):
self._release_exception_limits()
return
elif self.limit == JobExecutionLimit.THROTTLE_RATE_LIMIT:
elif self.limit in (
JobExecutionLimit.THROTTLE_RATE_LIMIT,
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
):
# Only reprocess array when necessary (at limit)
if len(self._rate_limited_calls) >= self.throttle_max_calls:
self._rate_limited_calls = [
call
for call in self._rate_limited_calls
if call > datetime.now() - self.throttle_period
]
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
self.set_rate_limited_calls(
[
call
for call in self.rate_limited_calls(group_name)
if call > datetime.now() - self.throttle_period(group_name)
],
group_name,
)
if len(self._rate_limited_calls) >= self.throttle_max_calls:
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
on_condition = (
JobException if self.on_condition is None else self.on_condition
)
raise on_condition(
f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period}",
f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
)
# Execute Job
with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None):
try:
self._last_call = datetime.now()
if self._rate_limited_calls is not None:
self._rate_limited_calls.append(self._last_call)
self.set_last_call(datetime.now(), group_name)
if self.rate_limited_calls(group_name) is not None:
self.add_rate_limited_call(
self.last_call(group_name), group_name
)
return await self._method(obj, *args, **kwargs)
except HassioError as err:
@ -191,7 +270,10 @@ class Job(CoreSysAttributes):
raise JobException() from err
finally:
self._release_exception_limits()
if self._job_group_limit:
if self.limit in (
JobExecutionLimit.GROUP_ONCE,
JobExecutionLimit.GROUP_WAIT,
):
obj.release()
return wrapper
@ -314,6 +396,7 @@ class Job(CoreSysAttributes):
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
return
@ -331,6 +414,7 @@ class Job(CoreSysAttributes):
JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.GROUP_THROTTLE_WAIT,
):
return
self._lock.release()

View File

@ -16,13 +16,16 @@ class JobGroup(CoreSysAttributes):
higher-level task and should not need to relinquish the lock in between.
"""
def __init__(self, coresys: CoreSys, group_name: str) -> None:
def __init__(
self, coresys: CoreSys, group_name: str, job_reference: str | None = None
) -> None:
"""Initialize object."""
self.coresys: CoreSys = coresys
self._group_name: str = group_name
self._lock: Lock = Lock()
self._active_job: SupervisorJob | None = None
self._parent_jobs: list[SupervisorJob] = []
self._job_reference: str | None = job_reference
@property
def active_job(self) -> SupervisorJob | None:
@ -43,6 +46,11 @@ class JobGroup(CoreSysAttributes):
and self.active_job == task_job
)
@property
def job_reference(self) -> str | None:
"""Return value to use as reference for all jobs created for this job group."""
return self._job_reference
async def acquire(self, job: SupervisorJob, wait: bool = False) -> None:
"""Acquire the lock for the group for the specified job."""
# If there's another job running and we're not waiting, raise

View File

@ -83,7 +83,10 @@ class Tasks(CoreSysAttributes):
_LOGGER.info("All core tasks are scheduled")
@Job(conditions=ADDON_UPDATE_CONDITIONS + [JobCondition.RUNNING])
@Job(
name="tasks_update_addons",
conditions=ADDON_UPDATE_CONDITIONS + [JobCondition.RUNNING],
)
async def _update_addons(self):
"""Check if an update is available for an Add-on and update it."""
start_tasks: list[Awaitable[None]] = []
@ -112,13 +115,14 @@ class Tasks(CoreSysAttributes):
await asyncio.gather(*start_tasks)
@Job(
name="tasks_update_supervisor",
conditions=[
JobCondition.AUTO_UPDATE,
JobCondition.FREE_SPACE,
JobCondition.HEALTHY,
JobCondition.INTERNET_HOST,
JobCondition.RUNNING,
]
],
)
async def _update_supervisor(self):
"""Check and run update of Supervisor Supervisor."""
@ -172,7 +176,7 @@ class Tasks(CoreSysAttributes):
finally:
self._cache[HASS_WATCHDOG_API] = 0
@Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
@Job(name="tasks_update_cli", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
async def _update_cli(self):
"""Check and run update of cli."""
if not self.sys_plugins.cli.need_update:
@ -183,7 +187,7 @@ class Tasks(CoreSysAttributes):
)
await self.sys_plugins.cli.update()
@Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
@Job(name="tasks_update_dns", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
async def _update_dns(self):
"""Check and run update of CoreDNS plugin."""
if not self.sys_plugins.dns.need_update:
@ -195,7 +199,7 @@ class Tasks(CoreSysAttributes):
)
await self.sys_plugins.dns.update()
@Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
@Job(name="tasks_update_audio", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
async def _update_audio(self):
"""Check and run update of PulseAudio plugin."""
if not self.sys_plugins.audio.need_update:
@ -207,7 +211,7 @@ class Tasks(CoreSysAttributes):
)
await self.sys_plugins.audio.update()
@Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
@Job(name="tasks_update_observer", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
async def _update_observer(self):
"""Check and run update of Observer plugin."""
if not self.sys_plugins.observer.need_update:
@ -219,7 +223,7 @@ class Tasks(CoreSysAttributes):
)
await self.sys_plugins.observer.update()
@Job(conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
@Job(name="tasks_update_multicast", conditions=PLUGIN_AUTO_UPDATE_CONDITIONS)
async def _update_multicast(self):
"""Check and run update of multicast."""
if not self.sys_plugins.multicast.need_update:
@ -292,7 +296,7 @@ class Tasks(CoreSysAttributes):
# Adjust state
addon.state = AddonState.STOPPED
@Job(conditions=[JobCondition.SUPERVISOR_UPDATED])
@Job(name="tasks_reload_store", conditions=[JobCondition.SUPERVISOR_UPDATED])
async def _reload_store(self) -> None:
"""Reload store and check for addon updates."""
await self.sys_store.reload()

View File

@ -139,7 +139,7 @@ class MountManager(FileConfiguration, CoreSysAttributes):
]
)
@Job(conditions=[JobCondition.MOUNT_AVAILABLE])
@Job(name="mount_manager_reload", conditions=[JobCondition.MOUNT_AVAILABLE])
async def reload(self) -> None:
"""Update mounts info via dbus and reload failed mounts."""
if not self.mounts:
@ -180,9 +180,17 @@ class MountManager(FileConfiguration, CoreSysAttributes):
],
)
@Job(conditions=[JobCondition.MOUNT_AVAILABLE], on_condition=MountJobError)
@Job(
name="mount_manager_create_mount",
conditions=[JobCondition.MOUNT_AVAILABLE],
on_condition=MountJobError,
)
async def create_mount(self, mount: Mount) -> None:
"""Add/update a mount."""
# Add mount name to job
if job := self.sys_jobs.get_job():
job.reference = mount.name
if mount.name in self._mounts:
_LOGGER.debug("Mount '%s' exists, unmounting then mounting from new config")
await self.remove_mount(mount.name, retain_entry=True)
@ -200,9 +208,17 @@ class MountManager(FileConfiguration, CoreSysAttributes):
elif mount.usage == MountUsage.SHARE:
await self._bind_share(mount)
@Job(conditions=[JobCondition.MOUNT_AVAILABLE], on_condition=MountJobError)
@Job(
name="mount_manager_remove_mount",
conditions=[JobCondition.MOUNT_AVAILABLE],
on_condition=MountJobError,
)
async def remove_mount(self, name: str, *, retain_entry: bool = False) -> None:
"""Remove a mount."""
# Add mount name to job
if job := self.sys_jobs.get_job():
job.reference = name
if name not in self._mounts:
raise MountNotFound(
f"Cannot remove '{name}', no mount exists with that name"
@ -223,9 +239,17 @@ class MountManager(FileConfiguration, CoreSysAttributes):
return mount
@Job(conditions=[JobCondition.MOUNT_AVAILABLE], on_condition=MountJobError)
@Job(
name="mount_manager_reload_mount",
conditions=[JobCondition.MOUNT_AVAILABLE],
on_condition=MountJobError,
)
async def reload_mount(self, name: str) -> None:
"""Reload a mount to retry mounting with same config."""
# Add mount name to job
if job := self.sys_jobs.get_job():
job.reference = name
if name not in self._mounts:
raise MountNotFound(
f"Cannot reload '{name}', no mount exists with that name"

View File

@ -165,7 +165,7 @@ class DataDisk(CoreSysAttributes):
if block.drive == drive.object_path
]
@Job(conditions=[JobCondition.OS_AGENT])
@Job(name="data_disk_load", conditions=[JobCondition.OS_AGENT])
async def load(self) -> None:
"""Load DataDisk feature."""
# Update datadisk details on OS-Agent
@ -173,6 +173,7 @@ class DataDisk(CoreSysAttributes):
await self.sys_dbus.agent.datadisk.reload_device()
@Job(
name="data_disk_migrate",
conditions=[JobCondition.HAOS, JobCondition.OS_AGENT, JobCondition.HEALTHY],
limit=JobExecutionLimit.ONCE,
on_condition=HassOSJobError,

View File

@ -156,6 +156,7 @@ class OSManager(CoreSysAttributes):
)
@Job(
name="os_manager_config_sync",
conditions=[JobCondition.HAOS],
on_condition=HassOSJobError,
)
@ -170,6 +171,7 @@ class OSManager(CoreSysAttributes):
await self.sys_host.services.restart("hassos-config.service")
@Job(
name="os_manager_update",
conditions=[
JobCondition.HAOS,
JobCondition.INTERNET_SYSTEM,
@ -225,7 +227,7 @@ class OSManager(CoreSysAttributes):
)
raise HassOSUpdateError()
@Job(conditions=[JobCondition.HAOS])
@Job(name="os_manager_mark_healthy", conditions=[JobCondition.HAOS])
async def mark_healthy(self) -> None:
"""Set booted partition as good for rauc."""
try:

View File

@ -118,6 +118,7 @@ class PluginAudio(PluginBase):
self.save_data()
@Job(
name="plugin_audio_update",
conditions=PLUGIN_UPDATE_CONDITIONS,
on_condition=AudioJobError,
)
@ -218,6 +219,7 @@ class PluginAudio(PluginBase):
) from err
@Job(
name="plugin_audio_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,

View File

@ -75,6 +75,7 @@ class PluginCli(PluginBase):
self.save_data()
@Job(
name="plugin_cli_update",
conditions=PLUGIN_UPDATE_CONDITIONS,
on_condition=CliJobError,
)
@ -151,6 +152,7 @@ class PluginCli(PluginBase):
capture_exception(err)
@Job(
name="plugin_cli_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,

View File

@ -187,6 +187,7 @@ class PluginDns(PluginBase):
await self.write_hosts()
@Job(
name="plugin_dns_update",
conditions=PLUGIN_UPDATE_CONDITIONS,
on_condition=CoreDNSJobError,
)
@ -269,6 +270,7 @@ class PluginDns(PluginBase):
return await super().watchdog_container(event)
@Job(
name="plugin_dns_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,

View File

@ -71,6 +71,7 @@ class PluginMulticast(PluginBase):
self.save_data()
@Job(
name="plugin_multicast_update",
conditions=PLUGIN_UPDATE_CONDITIONS,
on_condition=MulticastJobError,
)
@ -146,6 +147,7 @@ class PluginMulticast(PluginBase):
capture_exception(err)
@Job(
name="plugin_multicast_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,

View File

@ -79,6 +79,7 @@ class PluginObserver(PluginBase):
self.save_data()
@Job(
name="plugin_observer_update",
conditions=PLUGIN_UPDATE_CONDITIONS,
on_condition=ObserverJobError,
)
@ -156,6 +157,7 @@ class PluginObserver(PluginBase):
capture_exception(err)
@Job(
name="plugin_observer_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,

View File

@ -22,6 +22,7 @@ class CheckAddonPwned(CheckBase):
"""CheckAddonPwned class for check."""
@Job(
name="check_addon_pwned_run",
conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=24),
@ -62,7 +63,7 @@ class CheckAddonPwned(CheckBase):
except PwnedError:
pass
@Job(conditions=[JobCondition.INTERNET_SYSTEM])
@Job(name="check_addon_pwned_approve", conditions=[JobCondition.INTERNET_SYSTEM])
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
addon = self.sys_addons.get(reference)

View File

@ -23,6 +23,7 @@ class CheckDNSServer(CheckBase):
"""CheckDNSServer class for check."""
@Job(
name="check_dns_server_run",
conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=24),
@ -42,7 +43,7 @@ class CheckDNSServer(CheckBase):
)
capture_exception(results[i])
@Job(conditions=[JobCondition.INTERNET_SYSTEM])
@Job(name="check_dns_server_approve", conditions=[JobCondition.INTERNET_SYSTEM])
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
if reference not in self.dns_servers:

View File

@ -23,6 +23,7 @@ class CheckDNSServerIPv6(CheckBase):
"""CheckDNSServerIPv6 class for check."""
@Job(
name="check_dns_server_ipv6_run",
conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=24),
@ -47,7 +48,9 @@ class CheckDNSServerIPv6(CheckBase):
)
capture_exception(results[i])
@Job(conditions=[JobCondition.INTERNET_SYSTEM])
@Job(
name="check_dns_server_ipv6_approve", conditions=[JobCondition.INTERNET_SYSTEM]
)
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
if reference not in self.dns_servers:

View File

@ -36,7 +36,10 @@ class ResolutionFixup(CoreSysAttributes):
"""Return a list of all fixups."""
return list(self._fixups.values())
@Job(conditions=[JobCondition.HEALTHY, JobCondition.RUNNING])
@Job(
name="resolution_fixup_run_autofix",
conditions=[JobCondition.HEALTHY, JobCondition.RUNNING],
)
async def run_autofix(self) -> None:
"""Run all startup fixes."""
_LOGGER.info("Starting system autofix at state %s", self.sys_core.state)

View File

@ -25,6 +25,7 @@ class FixupStoreExecuteReload(FixupBase):
"""Storage class for fixup."""
@Job(
name="fixup_store_execute_reload_process",
conditions=[JobCondition.INTERNET_SYSTEM, JobCondition.FREE_SPACE],
on_condition=ResolutionFixupJobError,
)

View File

@ -26,6 +26,7 @@ class FixupStoreExecuteReset(FixupBase):
"""Storage class for fixup."""
@Job(
name="fixup_store_execute_reset_process",
conditions=[JobCondition.INTERNET_SYSTEM, JobCondition.FREE_SPACE],
on_condition=ResolutionFixupJobError,
)

View File

@ -22,6 +22,7 @@ class FixupSystemExecuteIntegrity(FixupBase):
"""Storage class for fixup."""
@Job(
name="fixup_system_execute_integrity_process",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=ResolutionFixupJobError,
limit=JobExecutionLimit.THROTTLE,

View File

@ -103,6 +103,7 @@ class Security(FileConfiguration, CoreSysAttributes):
return
@Job(
name="security_manager_integrity_check",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=SecurityJobError,
limit=JobExecutionLimit.ONCE,

View File

@ -80,7 +80,11 @@ class StoreManager(CoreSysAttributes, FileConfiguration):
self._data[ATTR_REPOSITORIES], add_with_errors=True
)
@Job(conditions=[JobCondition.SUPERVISOR_UPDATED], on_condition=StoreJobError)
@Job(
name="store_manager_reload",
conditions=[JobCondition.SUPERVISOR_UPDATED],
on_condition=StoreJobError,
)
async def reload(self) -> None:
"""Update add-ons from repository and reload list."""
tasks = [self.sys_create_task(repository.update()) for repository in self.all]
@ -92,6 +96,7 @@ class StoreManager(CoreSysAttributes, FileConfiguration):
self._read_addons()
@Job(
name="store_manager_add_repository",
conditions=[JobCondition.INTERNET_SYSTEM, JobCondition.SUPERVISOR_UPDATED],
on_condition=StoreJobError,
)

View File

@ -77,6 +77,7 @@ class GitRepo(CoreSysAttributes):
raise StoreGitError() from err
@Job(
name="git_repo_clone",
conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_SYSTEM],
on_condition=StoreJobError,
)
@ -112,6 +113,7 @@ class GitRepo(CoreSysAttributes):
raise StoreGitCloneError() from err
@Job(
name="git_repo_pull",
conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_SYSTEM],
on_condition=StoreJobError,
)

View File

@ -211,7 +211,11 @@ class Supervisor(CoreSysAttributes):
self.sys_create_task(self.sys_core.stop())
@Job(conditions=[JobCondition.RUNNING], on_condition=SupervisorJobError)
@Job(
name="supervisor_restart",
conditions=[JobCondition.RUNNING],
on_condition=SupervisorJobError,
)
async def restart(self) -> None:
"""Restart Supervisor soft."""
self.sys_core.exit_code = 100
@ -255,6 +259,7 @@ class Supervisor(CoreSysAttributes):
_LOGGER.error("Repair of Supervisor failed")
@Job(
name="supervisor_check_connectivity",
limit=JobExecutionLimit.THROTTLE,
throttle_period=_check_connectivity_throttle_period,
)

View File

@ -181,6 +181,7 @@ class Updater(FileConfiguration, CoreSysAttributes):
self._data[ATTR_AUTO_UPDATE] = value
@Job(
name="updater_fetch_data",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=UpdaterJobError,
limit=JobExecutionLimit.THROTTLE_WAIT,

View File

@ -3,6 +3,7 @@
import asyncio
from datetime import timedelta
from unittest.mock import AsyncMock, Mock, PropertyMock, patch
from uuid import uuid4
from aiohttp.client_exceptions import ClientError
import pytest
@ -37,7 +38,7 @@ async def test_healthy(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HEALTHY])
@Job(name="test_healthy_execute", conditions=[JobCondition.HEALTHY])
async def execute(self):
"""Execute the class method."""
return True
@ -79,12 +80,18 @@ async def test_internet(
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.INTERNET_HOST])
@Job(
name=f"test_internet_execute_host_{uuid4().hex}",
conditions=[JobCondition.INTERNET_HOST],
)
async def execute_host(self):
"""Execute the class method."""
return True
@Job(conditions=[JobCondition.INTERNET_SYSTEM])
@Job(
name=f"test_internet_execute_system_{uuid4().hex}",
conditions=[JobCondition.INTERNET_SYSTEM],
)
async def execute_system(self):
"""Execute the class method."""
return True
@ -120,7 +127,7 @@ async def test_free_space(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.FREE_SPACE])
@Job(name="test_free_space_execute", conditions=[JobCondition.FREE_SPACE])
async def execute(self):
"""Execute the class method."""
return True
@ -146,7 +153,7 @@ async def test_haos(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HAOS])
@Job(name="test_haos_execute", conditions=[JobCondition.HAOS])
async def execute(self):
"""Execute the class method."""
return True
@ -172,7 +179,7 @@ async def test_exception(coresys: CoreSys, capture_exception: Mock):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HEALTHY])
@Job(name="test_exception_execute", conditions=[JobCondition.HEALTHY])
async def execute(self):
"""Execute the class method."""
raise HassioError()
@ -196,7 +203,9 @@ async def test_exception_not_handle(coresys: CoreSys, capture_exception: Mock):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HEALTHY])
@Job(
name="test_exception_not_handle_execute", conditions=[JobCondition.HEALTHY]
)
async def execute(self):
"""Execute the class method."""
raise err
@ -219,7 +228,7 @@ async def test_running(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.RUNNING])
@Job(name="test_running_execute", conditions=[JobCondition.RUNNING])
async def execute(self):
"""Execute the class method."""
return True
@ -246,7 +255,11 @@ async def test_exception_conditions(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.RUNNING], on_condition=HassioError)
@Job(
name="test_exception_conditions_execute",
conditions=[JobCondition.RUNNING],
on_condition=HassioError,
)
async def execute(self):
"""Execute the class method."""
return True
@ -274,7 +287,10 @@ async def test_execution_limit_single_wait(
self.coresys = coresys
self.run = asyncio.Lock()
@Job(limit=JobExecutionLimit.SINGLE_WAIT)
@Job(
name="test_execution_limit_single_wait_execute",
limit=JobExecutionLimit.SINGLE_WAIT,
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
@ -300,7 +316,11 @@ async def test_execution_limit_throttle_wait(
self.run = asyncio.Lock()
self.call = 0
@Job(limit=JobExecutionLimit.THROTTLE_WAIT, throttle_period=timedelta(hours=1))
@Job(
name="test_execution_limit_throttle_wait_execute",
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(hours=1),
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
@ -333,6 +353,7 @@ async def test_execution_limit_throttle_rate_limit(
self.call = 0
@Job(
name=f"test_execution_limit_throttle_rate_limit_execute_{uuid4().hex}",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=timedelta(hours=1),
throttle_max_calls=2,
@ -370,7 +391,11 @@ async def test_execution_limit_throttle(coresys: CoreSys, loop: asyncio.BaseEven
self.run = asyncio.Lock()
self.call = 0
@Job(limit=JobExecutionLimit.THROTTLE, throttle_period=timedelta(hours=1))
@Job(
name="test_execution_limit_throttle_execute",
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=1),
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
@ -398,7 +423,11 @@ async def test_execution_limit_once(coresys: CoreSys, loop: asyncio.BaseEventLoo
self.coresys = coresys
self.run = asyncio.Lock()
@Job(limit=JobExecutionLimit.ONCE, on_condition=JobException)
@Job(
name="test_execution_limit_once_execute",
limit=JobExecutionLimit.ONCE,
on_condition=JobException,
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
@ -425,7 +454,10 @@ async def test_supervisor_updated(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.SUPERVISOR_UPDATED])
@Job(
name="test_supervisor_updated_execute",
conditions=[JobCondition.SUPERVISOR_UPDATED],
)
async def execute(self) -> bool:
"""Execute the class method."""
return True
@ -453,7 +485,10 @@ async def test_plugins_updated(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.PLUGINS_UPDATED])
@Job(
name="test_plugins_updated_execute",
conditions=[JobCondition.PLUGINS_UPDATED],
)
async def execute(self) -> bool:
"""Execute the class method."""
return True
@ -486,7 +521,7 @@ async def test_auto_update(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.AUTO_UPDATE])
@Job(name="test_auto_update_execute", conditions=[JobCondition.AUTO_UPDATE])
async def execute(self) -> bool:
"""Execute the class method."""
return True
@ -512,7 +547,7 @@ async def test_os_agent(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.OS_AGENT])
@Job(name="test_os_agent_execute", conditions=[JobCondition.OS_AGENT])
async def execute(self) -> bool:
"""Execute the class method."""
return True
@ -541,7 +576,7 @@ async def test_host_network(coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HOST_NETWORK])
@Job(name="test_host_network_execute", conditions=[JobCondition.HOST_NETWORK])
async def execute(self) -> bool:
"""Execute the class method."""
return True
@ -567,23 +602,39 @@ async def test_job_group_once(coresys: CoreSys, loop: asyncio.BaseEventLoop):
super().__init__(coresys, "TestClass")
self.event = asyncio.Event()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException)
@Job(
name="test_job_group_once_inner_execute",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=JobException,
)
async def inner_execute(self) -> bool:
"""Inner class method called by execute, group level lock allows this."""
await self.event.wait()
return True
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException)
@Job(
name="test_job_group_once_execute",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=JobException,
)
async def execute(self) -> bool:
"""Execute the class method."""
return await self.inner_execute()
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=JobException)
@Job(
name="test_job_group_once_separate_execute",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=JobException,
)
async def separate_execute(self) -> bool:
"""Alternate execute method that shares group lock."""
return True
@Job(limit=JobExecutionLimit.ONCE, on_condition=JobException)
@Job(
name="test_job_group_once_unrelated",
limit=JobExecutionLimit.ONCE,
on_condition=JobException,
)
async def unrelated_method(self) -> bool:
"""Unrelated method, sparate job with separate lock."""
return True
@ -622,18 +673,30 @@ async def test_job_group_wait(coresys: CoreSys, loop: asyncio.BaseEventLoop):
self.other_count = 0
self.event = asyncio.Event()
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException)
@Job(
name="test_job_group_wait_inner_execute",
limit=JobExecutionLimit.GROUP_WAIT,
on_condition=JobException,
)
async def inner_execute(self) -> None:
"""Inner class method called by execute, group level lock allows this."""
self.execute_count += 1
await self.event.wait()
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException)
@Job(
name="test_job_group_wait_execute",
limit=JobExecutionLimit.GROUP_WAIT,
on_condition=JobException,
)
async def execute(self) -> None:
"""Execute the class method."""
await self.inner_execute()
@Job(limit=JobExecutionLimit.GROUP_WAIT, on_condition=JobException)
@Job(
name="test_job_group_wait_separate_execute",
limit=JobExecutionLimit.GROUP_WAIT,
on_condition=JobException,
)
async def separate_execute(self) -> None:
"""Alternate execute method that shares group lock."""
self.other_count += 1
@ -670,7 +733,7 @@ async def test_job_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop):
self.event = asyncio.Event()
self.job: SupervisorJob | None = None
@Job(limit=JobExecutionLimit.ONCE)
@Job(name="test_job_cleanup_execute", limit=JobExecutionLimit.ONCE)
async def execute(self):
"""Execute the class method."""
self.job = coresys.jobs.get_job()
@ -703,7 +766,11 @@ async def test_job_skip_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop):
self.event = asyncio.Event()
self.job: SupervisorJob | None = None
@Job(limit=JobExecutionLimit.ONCE, cleanup=False)
@Job(
name="test_job_skip_cleanup_execute",
limit=JobExecutionLimit.ONCE,
cleanup=False,
)
async def execute(self):
"""Execute the class method."""
self.job = coresys.jobs.get_job()
@ -722,3 +789,154 @@ async def test_job_skip_cleanup(coresys: CoreSys, loop: asyncio.BaseEventLoop):
assert coresys.jobs.jobs == [test.job]
assert test.job.done
async def test_execution_limit_group_throttle(
coresys: CoreSys, loop: asyncio.BaseEventLoop
):
"""Test the group throttle execution limit."""
class TestClass(JobGroup):
"""Test class."""
def __init__(self, coresys: CoreSys, reference: str):
"""Initialize the test class."""
super().__init__(coresys, f"test_class_{reference}", reference)
self.run = asyncio.Lock()
self.call = 0
@Job(
name="test_execution_limit_group_throttle_execute",
limit=JobExecutionLimit.GROUP_THROTTLE,
throttle_period=timedelta(milliseconds=95),
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
async with self.run:
await asyncio.sleep(sleep)
self.call += 1
test1 = TestClass(coresys, "test1")
test2 = TestClass(coresys, "test2")
# One call of each should work. The subsequent calls will be silently throttled due to period
await asyncio.gather(
test1.execute(0), test1.execute(0), test2.execute(0), test2.execute(0)
)
assert test1.call == 1
assert test2.call == 1
# First call to each will work again since period cleared. Second throttled once more as they don't wait
with time_machine.travel(utcnow() + timedelta(milliseconds=100)):
await asyncio.gather(
test1.execute(0.1),
test1.execute(0.1),
test2.execute(0.1),
test2.execute(0.1),
)
assert test1.call == 2
assert test2.call == 2
async def test_execution_limit_group_throttle_wait(
    coresys: CoreSys, loop: asyncio.BaseEventLoop
):
    """Test the group throttle wait job execution limit."""

    class TestClass(JobGroup):
        """Job group whose execute method queues behind the group throttle."""

        def __init__(self, coresys: CoreSys, reference: str):
            """Initialize the test class."""
            super().__init__(coresys, f"test_class_{reference}", reference)
            self.run = asyncio.Lock()
            self.call = 0

        @Job(
            name="test_execution_limit_group_throttle_wait_execute",
            limit=JobExecutionLimit.GROUP_THROTTLE_WAIT,
            throttle_period=timedelta(milliseconds=95),
        )
        async def execute(self, sleep: float):
            """Sleep for the given time while counting completed calls."""
            assert not self.run.locked()
            async with self.run:
                await asyncio.sleep(sleep)
            self.call += 1

    group_a = TestClass(coresys, "test1")
    group_b = TestClass(coresys, "test2")

    # Within the throttle period only the first call per group executes;
    # the duplicates wait their turn and are then dropped by the period check.
    await asyncio.gather(
        *[
            group_a.execute(0),
            group_a.execute(0),
            group_b.execute(0),
            group_b.execute(0),
        ]
    )
    assert group_a.call == 1
    assert group_b.call == 1

    # Past the period every call runs: each task sleeps longer than the
    # throttle period, so the queued duplicates are no longer throttled.
    with time_machine.travel(utcnow() + timedelta(milliseconds=100)):
        await asyncio.gather(
            *[
                group_a.execute(0.1),
                group_a.execute(0.1),
                group_b.execute(0.1),
                group_b.execute(0.1),
            ]
        )

    assert group_a.call == 3
    assert group_b.call == 3
@pytest.mark.parametrize("error", [None, PluginJobError])
async def test_execution_limit_group_throttle_rate_limit(
    coresys: CoreSys, loop: asyncio.BaseEventLoop, error: JobException | None
):
    """Test the group throttle rate limit job execution limit."""

    class TestClass(JobGroup):
        """Job group whose execute method is rate limited per group instance."""

        def __init__(self, coresys: CoreSys, reference: str):
            """Initialize the test class."""
            super().__init__(coresys, f"test_class_{reference}", reference)
            self.run = asyncio.Lock()
            self.call = 0

        @Job(
            name=f"test_execution_limit_group_throttle_rate_limit_execute_{uuid4().hex}",
            limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
            throttle_period=timedelta(hours=1),
            throttle_max_calls=2,
            on_condition=error,
        )
        async def execute(self):
            """Count completed calls."""
            self.call += 1

    group_a = TestClass(coresys, "test1")
    group_b = TestClass(coresys, "test2")

    # The first two calls per group fit inside the rate limit.
    await asyncio.gather(
        *[
            group_a.execute(),
            group_a.execute(),
            group_b.execute(),
            group_b.execute(),
        ]
    )
    assert group_a.call == 2
    assert group_b.call == 2

    # A third call within the period raises — either the generic JobException
    # or the configured on_condition error.
    expected_error = JobException if error is None else error
    for group in (group_a, group_b):
        with pytest.raises(expected_error):
            await group.execute()
    assert group_a.call == 2
    assert group_b.call == 2

    # After the period expires the rate limit resets and calls succeed again.
    with time_machine.travel(utcnow() + timedelta(hours=1)):
        await group_a.execute()
        await group_b.execute()

    assert group_a.call == 3
    assert group_b.call == 3