Migrate to JobConcurrency and JobThrottle parameters (#6065)

This commit is contained in:
Stefan Agner 2025-08-05 13:24:44 +02:00 committed by GitHub
parent 8a1e6b0895
commit 9bee58a8b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 181 additions and 231 deletions

View File

@ -77,7 +77,7 @@ from ..exceptions import (
)
from ..hardware.data import Device
from ..homeassistant.const import WSEvent
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency, JobThrottle
from ..jobs.decorator import Job
from ..resolution.const import ContextType, IssueType, UnhealthyReason
from ..resolution.data import Issue
@ -733,8 +733,8 @@ class Addon(AddonModel):
@Job(
name="addon_unload",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def unload(self) -> None:
"""Unload add-on and remove data."""
@ -766,8 +766,8 @@ class Addon(AddonModel):
@Job(
name="addon_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install(self) -> None:
"""Install and setup this addon."""
@ -807,8 +807,8 @@ class Addon(AddonModel):
@Job(
name="addon_uninstall",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def uninstall(
self, *, remove_config: bool, remove_image: bool = True
@ -873,8 +873,8 @@ class Addon(AddonModel):
@Job(
name="addon_update",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(self) -> asyncio.Task | None:
"""Update this addon to latest version.
@ -923,8 +923,8 @@ class Addon(AddonModel):
@Job(
name="addon_rebuild",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def rebuild(self) -> asyncio.Task | None:
"""Rebuild this addons container and image.
@ -1068,8 +1068,8 @@ class Addon(AddonModel):
@Job(
name="addon_start",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def start(self) -> asyncio.Task:
"""Set options and start add-on.
@ -1117,8 +1117,8 @@ class Addon(AddonModel):
@Job(
name="addon_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def stop(self) -> None:
"""Stop add-on."""
@ -1131,8 +1131,8 @@ class Addon(AddonModel):
@Job(
name="addon_restart",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def restart(self) -> asyncio.Task:
"""Restart add-on.
@ -1166,8 +1166,8 @@ class Addon(AddonModel):
@Job(
name="addon_write_stdin",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def write_stdin(self, data) -> None:
"""Write data to add-on stdin."""
@ -1200,8 +1200,8 @@ class Addon(AddonModel):
@Job(
name="addon_begin_backup",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def begin_backup(self) -> bool:
"""Execute pre commands or stop addon if necessary.
@ -1222,8 +1222,8 @@ class Addon(AddonModel):
@Job(
name="addon_end_backup",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def end_backup(self) -> asyncio.Task | None:
"""Execute post commands or restart addon if necessary.
@ -1260,8 +1260,8 @@ class Addon(AddonModel):
@Job(
name="addon_backup",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def backup(self, tar_file: tarfile.TarFile) -> asyncio.Task | None:
"""Backup state of an add-on.
@ -1368,8 +1368,8 @@ class Addon(AddonModel):
@Job(
name="addon_restore",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def restore(self, tar_file: tarfile.TarFile) -> asyncio.Task | None:
"""Restore state of an add-on.
@ -1521,10 +1521,10 @@ class Addon(AddonModel):
@Job(
name="addon_restart_after_problem",
limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=AddonsJobError,
throttle=JobThrottle.GROUP_RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed addon."""

View File

@ -27,7 +27,7 @@ from ..exceptions import (
BackupJobError,
BackupMountDownError,
)
from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobCondition, JobExecutionLimit
from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobConcurrency, JobCondition
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..mounts.mount import Mount
@ -583,9 +583,9 @@ class BackupManager(FileConfiguration, JobGroup):
@Job(
name="backup_manager_full_backup",
conditions=[JobCondition.RUNNING],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
cleanup=False,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def do_backup_full(
self,
@ -630,9 +630,9 @@ class BackupManager(FileConfiguration, JobGroup):
@Job(
name="backup_manager_partial_backup",
conditions=[JobCondition.RUNNING],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
cleanup=False,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def do_backup_partial(
self,
@ -810,9 +810,9 @@ class BackupManager(FileConfiguration, JobGroup):
JobCondition.INTERNET_SYSTEM,
JobCondition.RUNNING,
],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
cleanup=False,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def do_restore_full(
self,
@ -869,9 +869,9 @@ class BackupManager(FileConfiguration, JobGroup):
JobCondition.INTERNET_SYSTEM,
JobCondition.RUNNING,
],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
cleanup=False,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def do_restore_partial(
self,
@ -930,8 +930,8 @@ class BackupManager(FileConfiguration, JobGroup):
@Job(
name="backup_manager_freeze_all",
conditions=[JobCondition.RUNNING],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None:
"""Freeze system to prepare for an external backup such as an image snapshot."""
@ -999,9 +999,9 @@ class BackupManager(FileConfiguration, JobGroup):
@Job(
name="backup_manager_signal_thaw",
conditions=[JobCondition.FROZEN],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
internal=True,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def thaw_all(self) -> None:
"""Signal thaw task to begin unfreezing the system."""

View File

@ -39,7 +39,7 @@ from ..exceptions import (
)
from ..hardware.const import PolicyGroup
from ..hardware.data import Device
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.const import JobConcurrency, JobCondition
from ..jobs.decorator import Job
from ..resolution.const import CGROUP_V2_VERSION, ContextType, IssueType, SuggestionType
from ..utils.sentry import async_capture_exception
@ -553,8 +553,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""
@ -619,8 +619,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_update",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(
self,
@ -647,8 +647,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install(
self,
@ -735,8 +735,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_import_image",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image."""
@ -750,7 +750,7 @@ class DockerAddon(DockerInterface):
with suppress(DockerError):
await self.cleanup()
@Job(name="docker_addon_cleanup", limit=JobExecutionLimit.GROUP_WAIT)
@Job(name="docker_addon_cleanup", concurrency=JobConcurrency.GROUP_QUEUE)
async def cleanup(
self,
old_image: str | None = None,
@ -774,8 +774,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_write_stdin",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin."""
@ -808,8 +808,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
@ -848,8 +848,8 @@ class DockerAddon(DockerInterface):
@Job(
name="docker_addon_hardware_events",
conditions=[JobCondition.OS_AGENT],
limit=JobExecutionLimit.SINGLE_WAIT,
internal=True,
concurrency=JobConcurrency.QUEUE,
)
async def _hardware_events(self, device: Device) -> None:
"""Process Hardware events for adjust device access."""

View File

@ -9,7 +9,7 @@ from ..const import DOCKER_CPU_RUNTIME_ALLOCATION
from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..hardware.const import PolicyGroup
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import (
ENV_TIME,
@ -89,8 +89,8 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
@Job(
name="docker_audio_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""

View File

@ -4,7 +4,7 @@ import logging
from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import ENV_TIME, ENV_TOKEN
from .interface import DockerInterface
@ -29,8 +29,8 @@ class DockerCli(DockerInterface, CoreSysAttributes):
@Job(
name="docker_cli_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""

View File

@ -6,7 +6,7 @@ from docker.types import Mount
from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import ENV_TIME, MOUNT_DBUS, MountType
from .interface import DockerInterface
@ -31,8 +31,8 @@ class DockerDNS(DockerInterface, CoreSysAttributes):
@Job(
name="docker_dns_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""

View File

@ -12,7 +12,7 @@ from ..const import LABEL_MACHINE
from ..exceptions import DockerJobError
from ..hardware.const import PolicyGroup
from ..homeassistant.const import LANDINGPAGE
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import (
ENV_TIME,
@ -161,8 +161,8 @@ class DockerHomeAssistant(DockerInterface):
@Job(
name="docker_home_assistant_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self, *, restore_job_id: str | None = None) -> None:
"""Run Docker image."""
@ -200,8 +200,8 @@ class DockerHomeAssistant(DockerInterface):
@Job(
name="docker_home_assistant_execute_command",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command."""

View File

@ -39,7 +39,7 @@ from ..exceptions import (
DockerRequestError,
DockerTrustError,
)
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobExecutionLimit
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType, SuggestionType
@ -219,8 +219,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install(
self,
@ -338,7 +338,7 @@ class DockerInterface(JobGroup, ABC):
return _container_state_from_model(docker_container)
@Job(name="docker_interface_attach", limit=JobExecutionLimit.GROUP_WAIT)
@Job(name="docker_interface_attach", concurrency=JobConcurrency.GROUP_QUEUE)
async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> None:
@ -376,8 +376,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""
@ -406,8 +406,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
@ -421,8 +421,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_start",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def start(self) -> Awaitable[None]:
"""Start Docker container."""
@ -430,8 +430,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_remove",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def remove(self, *, remove_image: bool = True) -> None:
"""Remove Docker images."""
@ -448,8 +448,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_check_image",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def check_image(
self,
@ -497,8 +497,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_update",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
@ -526,7 +526,7 @@ class DockerInterface(JobGroup, ABC):
return b""
@Job(name="docker_interface_cleanup", limit=JobExecutionLimit.GROUP_WAIT)
@Job(name="docker_interface_cleanup", concurrency=JobConcurrency.GROUP_QUEUE)
async def cleanup(
self,
old_image: str | None = None,
@ -543,8 +543,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_restart",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def restart(self) -> Awaitable[None]:
"""Restart docker container."""
@ -554,8 +554,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_execute_command",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command."""
@ -619,8 +619,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_run_inside",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def run_inside(self, command: str) -> Awaitable[CommandReturn]:
"""Execute a command inside Docker container."""
@ -635,8 +635,8 @@ class DockerInterface(JobGroup, ABC):
@Job(
name="docker_interface_check_trust",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def check_trust(self) -> None:
"""Check trust of exists Docker image."""

View File

@ -4,7 +4,7 @@ import logging
from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import ENV_TIME, Capabilities
from .interface import DockerInterface
@ -34,8 +34,8 @@ class DockerMulticast(DockerInterface, CoreSysAttributes):
@Job(
name="docker_multicast_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""

View File

@ -5,7 +5,7 @@ import logging
from ..const import DOCKER_IPV4_NETWORK_MASK, OBSERVER_DOCKER_NAME
from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import ENV_TIME, ENV_TOKEN, MOUNT_DOCKER, RestartPolicy
from .interface import DockerInterface
@ -30,8 +30,8 @@ class DockerObserver(DockerInterface, CoreSysAttributes):
@Job(
name="docker_observer_run",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def run(self) -> None:
"""Run Docker image."""

View File

@ -10,7 +10,7 @@ import docker
import requests
from ..exceptions import DockerError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from .const import PropagationMode
from .interface import DockerInterface
@ -45,7 +45,7 @@ class DockerSupervisor(DockerInterface):
if mount.get("Destination") == "/data"
)
@Job(name="docker_supervisor_attach", limit=JobExecutionLimit.GROUP_WAIT)
@Job(name="docker_supervisor_attach", concurrency=JobConcurrency.GROUP_QUEUE)
async def attach(
self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
) -> None:
@ -77,7 +77,7 @@ class DockerSupervisor(DockerInterface):
ipv4=self.sys_docker.network.supervisor,
)
@Job(name="docker_supervisor_retag", limit=JobExecutionLimit.GROUP_WAIT)
@Job(name="docker_supervisor_retag", concurrency=JobConcurrency.GROUP_QUEUE)
def retag(self) -> Awaitable[None]:
"""Retag latest image to version."""
return self.sys_run_in_executor(self._retag)
@ -108,7 +108,10 @@ class DockerSupervisor(DockerInterface):
f"Can't retag Supervisor version: {err}", _LOGGER.error
) from err
@Job(name="docker_supervisor_update_start_tag", limit=JobExecutionLimit.GROUP_WAIT)
@Job(
name="docker_supervisor_update_start_tag",
concurrency=JobConcurrency.GROUP_QUEUE,
)
def update_start_tag(self, image: str, version: AwesomeVersion) -> Awaitable[None]:
"""Update start tag to new version."""
return self.sys_run_in_executor(self._update_start_tag, image, version)

View File

@ -15,7 +15,7 @@ from multidict import MultiMapping
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HomeAssistantAPIError, HomeAssistantAuthError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job
from ..utils import check_port, version_is_new_enough
from .const import LANDINGPAGE
@ -46,8 +46,8 @@ class HomeAssistantAPI(CoreSysAttributes):
@Job(
name="home_assistant_api_ensure_access_token",
limit=JobExecutionLimit.SINGLE_WAIT,
internal=True,
concurrency=JobConcurrency.QUEUE,
)
async def ensure_access_token(self) -> None:
"""Ensure there is an access token."""

View File

@ -28,7 +28,7 @@ from ..exceptions import (
HomeAssistantUpdateError,
JobException,
)
from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobExecutionLimit
from ..jobs.const import JOB_GROUP_HOME_ASSISTANT_CORE, JobConcurrency, JobThrottle
from ..jobs.decorator import Job, JobCondition
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType
@ -123,8 +123,8 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_install_landing_page",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install_landingpage(self) -> None:
"""Install a landing page."""
@ -171,8 +171,8 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_install",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install(self) -> None:
"""Install a landing page."""
@ -222,8 +222,8 @@ class HomeAssistantCore(JobGroup):
JobCondition.PLUGINS_UPDATED,
JobCondition.SUPERVISOR_UPDATED,
],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(
self,
@ -324,8 +324,8 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_start",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def start(self) -> None:
"""Run Home Assistant docker."""
@ -359,8 +359,8 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_stop",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def stop(self, *, remove_container: bool = False) -> None:
"""Stop Home Assistant Docker."""
@ -371,8 +371,8 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_restart",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def restart(self, *, safe_mode: bool = False) -> None:
"""Restart Home Assistant Docker."""
@ -392,8 +392,8 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_rebuild",
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=HomeAssistantJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def rebuild(self, *, safe_mode: bool = False) -> None:
"""Rebuild Home Assistant Docker container."""
@ -546,9 +546,9 @@ class HomeAssistantCore(JobGroup):
@Job(
name="home_assistant_core_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
throttle=JobThrottle.RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed Home Assistant."""

View File

@ -46,7 +46,8 @@ from ..exceptions import (
)
from ..hardware.const import PolicyGroup
from ..hardware.data import Device
from ..jobs.decorator import Job, JobExecutionLimit
from ..jobs.const import JobConcurrency, JobThrottle
from ..jobs.decorator import Job
from ..resolution.const import UnhealthyReason
from ..utils import remove_folder, remove_folder_with_excludes
from ..utils.common import FileConfiguration
@ -551,9 +552,10 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
@Job(
name="home_assistant_get_users",
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(minutes=5),
internal=True,
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.THROTTLE,
)
async def get_users(self) -> list[IngressSessionDataUser]:
"""Get list of all configured users."""

View File

@ -6,7 +6,7 @@ from pathlib import Path
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import YamlFileError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency, JobThrottle
from ..jobs.decorator import Job
from ..utils.yaml import read_yaml_file
@ -43,9 +43,10 @@ class HomeAssistantSecrets(CoreSysAttributes):
@Job(
name="home_assistant_secrets_read",
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(seconds=60),
internal=True,
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.THROTTLE,
)
async def _read_secrets(self):
"""Read secrets.yaml into memory."""

View File

@ -9,7 +9,7 @@ from pulsectl import Pulse, PulseError, PulseIndexError, PulseOperationFailed
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import PulseAudioError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobConcurrency, JobThrottle
from ..jobs.decorator import Job
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -236,8 +236,9 @@ class SoundControl(CoreSysAttributes):
@Job(
name="sound_control_update",
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(seconds=2),
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.THROTTLE,
)
async def update(self, reload_pulse: bool = False):
"""Update properties over dbus."""

View File

@ -84,18 +84,3 @@ class JobThrottle(StrEnum):
RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period
GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE
GROUP_RATE_LIMIT = "group_rate_limit" # Group version of RATE_LIMIT
class JobExecutionLimit(StrEnum):
"""Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead."""
ONCE = "once"
SINGLE_WAIT = "single_wait"
THROTTLE = "throttle"
THROTTLE_WAIT = "throttle_wait"
THROTTLE_RATE_LIMIT = "throttle_rate_limit"
GROUP_ONCE = "group_once"
GROUP_WAIT = "group_wait"
GROUP_THROTTLE = "group_throttle"
GROUP_THROTTLE_WAIT = "group_throttle_wait"
GROUP_THROTTLE_RATE_LIMIT = "group_throttle_rate_limit"

View File

@ -20,7 +20,7 @@ from ..host.const import HostFeature
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from ..utils.sentry import async_capture_exception
from . import SupervisorJob
from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle
from .const import JobConcurrency, JobCondition, JobThrottle
from .job_group import JobGroup
_LOGGER: logging.Logger = logging.getLogger(__package__)
@ -43,8 +43,6 @@ class Job(CoreSysAttributes):
| None = None,
throttle_max_calls: int | None = None,
internal: bool = False,
# Backward compatibility - DEPRECATED
limit: JobExecutionLimit | None = None,
): # pylint: disable=too-many-positional-arguments
"""Initialize the Job decorator.
@ -58,7 +56,6 @@ class Job(CoreSysAttributes):
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead.
Raises:
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
@ -79,54 +76,12 @@ class Job(CoreSysAttributes):
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
self._internal = internal
# Handle backward compatibility with limit parameter
if limit is not None:
if concurrency is not None or throttle is not None:
raise RuntimeError(
f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!"
)
# Map old limit values to new parameters
concurrency, throttle = self._map_limit_to_new_params(limit)
self.concurrency = concurrency
self.throttle = throttle
# Validate Options
self._validate_parameters()
def _map_limit_to_new_params(
self, limit: JobExecutionLimit
) -> tuple[JobConcurrency | None, JobThrottle | None]:
"""Map old limit parameter to new concurrency and throttle parameters."""
mapping = {
JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None),
JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None),
JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE),
JobExecutionLimit.THROTTLE_WAIT: (
JobConcurrency.QUEUE,
JobThrottle.THROTTLE,
),
JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT),
JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None),
JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None),
JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE),
JobExecutionLimit.GROUP_THROTTLE_WAIT: (
# Seems a bit counter intuitive, but GROUP_QUEUE deadlocks
# tests/jobs/test_job_decorator.py::test_execution_limit_group_throttle_wait
# The reason this deadlocks is because when using GROUP_QUEUE and the
# throttle limit is hit, the group lock is trying to be unlocked outside
# of the job context. The current implementation doesn't allow to unlock
# the group lock when the job is not running.
JobConcurrency.QUEUE,
JobThrottle.GROUP_THROTTLE,
),
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: (
None,
JobThrottle.GROUP_RATE_LIMIT,
),
}
return mapping.get(limit, (None, None))
def _validate_parameters(self) -> None:
"""Validate job parameters."""
# Validate throttle parameters

View File

@ -15,7 +15,8 @@ from ..exceptions import (
ObserverError,
)
from ..homeassistant.const import LANDINGPAGE, WSType
from ..jobs.decorator import Job, JobCondition, JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..plugins.const import PLUGIN_UPDATE_CONDITIONS
from ..utils.dt import utcnow
from ..utils.sentry import async_capture_exception
@ -160,7 +161,7 @@ class Tasks(CoreSysAttributes):
JobCondition.INTERNET_HOST,
JobCondition.RUNNING,
],
limit=JobExecutionLimit.ONCE,
concurrency=JobConcurrency.REJECT,
)
async def _update_supervisor(self):
"""Check and run update of Supervisor Supervisor."""

View File

@ -22,7 +22,7 @@ from ..exceptions import (
HassOSJobError,
HostError,
)
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.const import JobConcurrency, JobCondition
from ..jobs.decorator import Job
from ..resolution.checks.base import CheckBase
from ..resolution.checks.disabled_data_disk import CheckDisabledDataDisk
@ -205,8 +205,8 @@ class DataDisk(CoreSysAttributes):
@Job(
name="data_disk_migrate",
conditions=[JobCondition.HAOS, JobCondition.OS_AGENT, JobCondition.HEALTHY],
limit=JobExecutionLimit.ONCE,
on_condition=HassOSJobError,
concurrency=JobConcurrency.REJECT,
)
async def migrate_disk(self, new_disk: str) -> None:
"""Move data partition to a new disk."""
@ -305,8 +305,8 @@ class DataDisk(CoreSysAttributes):
@Job(
name="data_disk_wipe",
conditions=[JobCondition.HAOS, JobCondition.OS_AGENT, JobCondition.HEALTHY],
limit=JobExecutionLimit.ONCE,
on_condition=HassOSJobError,
concurrency=JobConcurrency.REJECT,
)
async def wipe_disk(self) -> None:
"""Wipe the current data disk."""

View File

@ -21,7 +21,7 @@ from ..exceptions import (
HassOSSlotUpdateError,
HassOSUpdateError,
)
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.const import JobConcurrency, JobCondition
from ..jobs.decorator import Job
from ..resolution.const import UnhealthyReason
from ..utils.sentry import async_capture_exception
@ -277,8 +277,8 @@ class OSManager(CoreSysAttributes):
JobCondition.RUNNING,
JobCondition.SUPERVISOR_UPDATED,
],
limit=JobExecutionLimit.ONCE,
on_condition=HassOSJobError,
concurrency=JobConcurrency.REJECT,
)
async def update(self, version: AwesomeVersion | None = None) -> None:
"""Update HassOS system."""

View File

@ -24,7 +24,7 @@ from ..exceptions import (
DockerError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobThrottle
from ..jobs.decorator import Job
from ..resolution.const import UnhealthyReason
from ..utils.json import write_json_file
@ -205,10 +205,10 @@ class PluginAudio(PluginBase):
@Job(
name="plugin_audio_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=AudioJobError,
throttle=JobThrottle.RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""

View File

@ -15,7 +15,7 @@ from ..docker.cli import DockerCli
from ..docker.const import ContainerState
from ..docker.stats import DockerStats
from ..exceptions import CliError, CliJobError, CliUpdateError, DockerError, PluginError
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobThrottle
from ..jobs.decorator import Job
from ..utils.sentry import async_capture_exception
from .base import PluginBase
@ -118,10 +118,10 @@ class PluginCli(PluginBase):
@Job(
name="plugin_cli_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=CliJobError,
throttle=JobThrottle.RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""

View File

@ -31,7 +31,7 @@ from ..exceptions import (
DockerError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobThrottle
from ..jobs.decorator import Job
from ..resolution.const import ContextType, IssueType, SuggestionType, UnhealthyReason
from ..utils.json import write_json_file
@ -351,10 +351,10 @@ class PluginDns(PluginBase):
@Job(
name="plugin_dns_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=CoreDNSJobError,
throttle=JobThrottle.RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""

View File

@ -18,7 +18,7 @@ from ..exceptions import (
MulticastUpdateError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobThrottle
from ..jobs.decorator import Job
from ..utils.sentry import async_capture_exception
from .base import PluginBase
@ -114,10 +114,10 @@ class PluginMulticast(PluginBase):
@Job(
name="plugin_multicast_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=MulticastJobError,
throttle=JobThrottle.RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""

View File

@ -21,7 +21,7 @@ from ..exceptions import (
ObserverUpdateError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.const import JobThrottle
from ..jobs.decorator import Job
from ..utils.sentry import async_capture_exception
from .base import PluginBase
@ -130,10 +130,10 @@ class PluginObserver(PluginBase):
@Job(
name="plugin_observer_restart_after_problem",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
throttle_period=WATCHDOG_THROTTLE_PERIOD,
throttle_max_calls=WATCHDOG_THROTTLE_MAX_CALLS,
on_condition=ObserverJobError,
throttle=JobThrottle.RATE_LIMIT,
)
async def _restart_after_problem(self, state: ContainerState):
"""Restart unhealthy or failed plugin."""

View File

@ -6,7 +6,7 @@ import logging
from ...const import AddonState, CoreState
from ...coresys import CoreSys
from ...exceptions import PwnedConnectivityError, PwnedError, PwnedSecret
from ...jobs.const import JobCondition, JobExecutionLimit
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ..const import ContextType, IssueType, SuggestionType
from .base import CheckBase
@ -25,8 +25,8 @@ class CheckAddonPwned(CheckBase):
@Job(
name="check_addon_pwned_run",
conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=24),
throttle=JobThrottle.THROTTLE,
)
async def run_check(self) -> None:
"""Run check if not affected by issue."""

View File

@ -9,7 +9,7 @@ from aiodns.error import DNSError
from ...const import CoreState
from ...coresys import CoreSys
from ...jobs.const import JobCondition, JobExecutionLimit
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ...utils.sentry import async_capture_exception
from ..const import DNS_CHECK_HOST, ContextType, IssueType
@ -36,8 +36,8 @@ class CheckDNSServer(CheckBase):
@Job(
name="check_dns_server_run",
conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=24),
throttle=JobThrottle.THROTTLE,
)
async def run_check(self) -> None:
"""Run check if not affected by issue."""

View File

@ -7,7 +7,7 @@ from aiodns.error import DNSError
from ...const import CoreState
from ...coresys import CoreSys
from ...jobs.const import JobCondition, JobExecutionLimit
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ...utils.sentry import async_capture_exception
from ..const import DNS_ERROR_NO_DATA, ContextType, IssueType
@ -26,8 +26,8 @@ class CheckDNSServerIPv6(CheckBase):
@Job(
name="check_dns_server_ipv6_run",
conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=24),
throttle=JobThrottle.THROTTLE,
)
async def run_check(self) -> None:
"""Run check if not affected by issue."""

View File

@ -5,7 +5,7 @@ import logging
from ...coresys import CoreSys
from ...exceptions import ResolutionFixupError, ResolutionFixupJobError
from ...jobs.const import JobCondition, JobExecutionLimit
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ...security.const import ContentTrustResult
from ..const import ContextType, IssueType, SuggestionType
@ -26,8 +26,8 @@ class FixupSystemExecuteIntegrity(FixupBase):
name="fixup_system_execute_integrity_process",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=ResolutionFixupJobError,
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=8),
throttle=JobThrottle.THROTTLE,
)
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""

View File

@ -17,7 +17,8 @@ from ..exceptions import (
PwnedError,
SecurityJobError,
)
from ..jobs.decorator import Job, JobCondition, JobExecutionLimit
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.codenotary import cas_validate
from ..utils.common import FileConfiguration
@ -107,7 +108,7 @@ class Security(FileConfiguration, CoreSysAttributes):
name="security_manager_integrity_check",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=SecurityJobError,
limit=JobExecutionLimit.ONCE,
concurrency=JobConcurrency.REJECT,
)
async def integrity_check(self) -> IntegrityResult:
"""Run a full system integrity check of the platform.

View File

@ -32,7 +32,7 @@ from .exceptions import (
SupervisorJobError,
SupervisorUpdateError,
)
from .jobs.const import JobCondition, JobExecutionLimit
from .jobs.const import JobCondition, JobThrottle
from .jobs.decorator import Job
from .resolution.const import ContextType, IssueType, UnhealthyReason
from .utils.codenotary import calc_checksum
@ -288,8 +288,8 @@ class Supervisor(CoreSysAttributes):
@Job(
name="supervisor_check_connectivity",
limit=JobExecutionLimit.THROTTLE,
throttle_period=_check_connectivity_throttle_period,
throttle=JobThrottle.THROTTLE,
)
async def check_connectivity(self) -> None:
"""Check the Internet connectivity from Supervisor's point of view."""

View File

@ -8,6 +8,8 @@ import logging
import aiohttp
from awesomeversion import AwesomeVersion
from supervisor.jobs.const import JobConcurrency, JobThrottle
from .bus import EventListener
from .const import (
ATTR_AUDIO,
@ -34,7 +36,7 @@ from .exceptions import (
UpdaterError,
UpdaterJobError,
)
from .jobs.decorator import Job, JobCondition, JobExecutionLimit
from .jobs.decorator import Job, JobCondition
from .utils.codenotary import calc_checksum
from .utils.common import FileConfiguration
from .validate import SCHEMA_UPDATER_CONFIG
@ -198,8 +200,9 @@ class Updater(FileConfiguration, CoreSysAttributes):
name="updater_fetch_data",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=UpdaterJobError,
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(seconds=30),
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.THROTTLE,
)
async def fetch_data(self):
"""Fetch current versions from Github.

View File

@ -20,7 +20,7 @@ from supervisor.exceptions import (
from supervisor.host.const import HostFeature
from supervisor.host.manager import HostManager
from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from supervisor.jobs.const import JobConcurrency, JobExecutionLimit, JobThrottle
from supervisor.jobs.const import JobConcurrency, JobThrottle
from supervisor.jobs.decorator import Job, JobCondition
from supervisor.jobs.job_group import JobGroup
from supervisor.os.manager import OSManager
@ -280,8 +280,8 @@ async def test_exception_conditions(coresys: CoreSys):
await test.execute()
async def test_execution_limit_single_wait(coresys: CoreSys):
"""Test the single wait job execution limit."""
async def test_concurrency_queue(coresys: CoreSys):
"""Test the queue job concurrency."""
class TestClass:
"""Test class."""
@ -292,8 +292,8 @@ async def test_execution_limit_single_wait(coresys: CoreSys):
self.run = asyncio.Lock()
@Job(
name="test_execution_limit_single_wait_execute",
limit=JobExecutionLimit.SINGLE_WAIT,
name="test_concurrency_queue_execute",
concurrency=JobConcurrency.QUEUE,
)
async def execute(self, sleep: float):
"""Execute the class method."""
@ -306,8 +306,8 @@ async def test_execution_limit_single_wait(coresys: CoreSys):
await asyncio.gather(*[test.execute(0.1), test.execute(0.1), test.execute(0.1)])
async def test_execution_limit_throttle_wait(coresys: CoreSys):
"""Test the throttle wait job execution limit."""
async def test_concurrency_queue_with_throttle(coresys: CoreSys):
"""Test the queue concurrency with throttle."""
class TestClass:
"""Test class."""
@ -319,8 +319,9 @@ async def test_execution_limit_throttle_wait(coresys: CoreSys):
self.call = 0
@Job(
name="test_execution_limit_throttle_wait_execute",
limit=JobExecutionLimit.THROTTLE_WAIT,
name="test_concurrency_queue_with_throttle_execute",
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.THROTTLE,
throttle_period=timedelta(hours=1),
)
async def execute(self, sleep: float):
@ -340,10 +341,8 @@ async def test_execution_limit_throttle_wait(coresys: CoreSys):
@pytest.mark.parametrize("error", [None, PluginJobError])
async def test_execution_limit_throttle_rate_limit(
coresys: CoreSys, error: JobException | None
):
"""Test the throttle wait job execution limit."""
async def test_throttle_rate_limit(coresys: CoreSys, error: JobException | None):
"""Test the throttle rate limit."""
class TestClass:
"""Test class."""
@ -355,8 +354,8 @@ async def test_execution_limit_throttle_rate_limit(
self.call = 0
@Job(
name=f"test_execution_limit_throttle_rate_limit_execute_{uuid4().hex}",
limit=JobExecutionLimit.THROTTLE_RATE_LIMIT,
name=f"test_throttle_rate_limit_execute_{uuid4().hex}",
throttle=JobThrottle.RATE_LIMIT,
throttle_period=timedelta(hours=1),
throttle_max_calls=2,
on_condition=error,
@ -381,8 +380,8 @@ async def test_execution_limit_throttle_rate_limit(
assert test.call == 3
async def test_execution_limit_throttle(coresys: CoreSys):
"""Test the ignore conditions decorator."""
async def test_throttle_basic(coresys: CoreSys):
"""Test the basic throttle functionality."""
class TestClass:
"""Test class."""
@ -394,8 +393,8 @@ async def test_execution_limit_throttle(coresys: CoreSys):
self.call = 0
@Job(
name="test_execution_limit_throttle_execute",
limit=JobExecutionLimit.THROTTLE,
name="test_throttle_basic_execute",
throttle=JobThrottle.THROTTLE,
throttle_period=timedelta(hours=1),
)
async def execute(self, sleep: float):
@ -414,8 +413,8 @@ async def test_execution_limit_throttle(coresys: CoreSys):
assert test.call == 1
async def test_execution_limit_once(coresys: CoreSys):
"""Test the ignore conditions decorator."""
async def test_concurrency_reject(coresys: CoreSys):
"""Test the reject concurrency."""
class TestClass:
"""Test class."""
@ -426,8 +425,8 @@ async def test_execution_limit_once(coresys: CoreSys):
self.run = asyncio.Lock()
@Job(
name="test_execution_limit_once_execute",
limit=JobExecutionLimit.ONCE,
name="test_concurrency_reject_execute",
concurrency=JobConcurrency.REJECT,
on_condition=JobException,
)
async def execute(self, sleep: float):
@ -603,8 +602,8 @@ async def test_host_network(coresys: CoreSys):
assert await test.execute()
async def test_job_group_once(coresys: CoreSys):
"""Test job group once execution limitation."""
async def test_job_group_reject(coresys: CoreSys):
"""Test job group reject concurrency limitation."""
class TestClass(JobGroup):
"""Test class."""
@ -615,8 +614,8 @@ async def test_job_group_once(coresys: CoreSys):
self.event = asyncio.Event()
@Job(
name="test_job_group_once_inner_execute",
limit=JobExecutionLimit.GROUP_ONCE,
name="test_job_group_reject_inner_execute",
concurrency=JobConcurrency.GROUP_REJECT,
on_condition=JobException,
)
async def inner_execute(self) -> bool:
@ -625,8 +624,8 @@ async def test_job_group_once(coresys: CoreSys):
return True
@Job(
name="test_job_group_once_execute",
limit=JobExecutionLimit.GROUP_ONCE,
name="test_job_group_reject_execute",
concurrency=JobConcurrency.GROUP_REJECT,
on_condition=JobException,
)
async def execute(self) -> bool:
@ -634,8 +633,8 @@ async def test_job_group_once(coresys: CoreSys):
return await self.inner_execute()
@Job(
name="test_job_group_once_separate_execute",
limit=JobExecutionLimit.GROUP_ONCE,
name="test_job_group_reject_separate_execute",
concurrency=JobConcurrency.GROUP_REJECT,
on_condition=JobException,
)
async def separate_execute(self) -> bool:
@ -643,8 +642,8 @@ async def test_job_group_once(coresys: CoreSys):
return True
@Job(
name="test_job_group_once_unrelated",
limit=JobExecutionLimit.ONCE,
name="test_job_group_reject_unrelated",
concurrency=JobConcurrency.REJECT,
on_condition=JobException,
)
async def unrelated_method(self) -> bool:
@ -672,8 +671,8 @@ async def test_job_group_once(coresys: CoreSys):
assert await run_task
async def test_job_group_wait(coresys: CoreSys):
"""Test job group wait execution limitation."""
async def test_job_group_queue(coresys: CoreSys):
"""Test job group queue concurrency limitation."""
class TestClass(JobGroup):
"""Test class."""
@ -686,8 +685,8 @@ async def test_job_group_wait(coresys: CoreSys):
self.event = asyncio.Event()
@Job(
name="test_job_group_wait_inner_execute",
limit=JobExecutionLimit.GROUP_WAIT,
name="test_job_group_queue_inner_execute",
concurrency=JobConcurrency.GROUP_QUEUE,
on_condition=JobException,
)
async def inner_execute(self) -> None:
@ -696,8 +695,8 @@ async def test_job_group_wait(coresys: CoreSys):
await self.event.wait()
@Job(
name="test_job_group_wait_execute",
limit=JobExecutionLimit.GROUP_WAIT,
name="test_job_group_queue_execute",
concurrency=JobConcurrency.GROUP_QUEUE,
on_condition=JobException,
)
async def execute(self) -> None:
@ -705,8 +704,8 @@ async def test_job_group_wait(coresys: CoreSys):
await self.inner_execute()
@Job(
name="test_job_group_wait_separate_execute",
limit=JobExecutionLimit.GROUP_WAIT,
name="test_job_group_queue_separate_execute",
concurrency=JobConcurrency.GROUP_QUEUE,
on_condition=JobException,
)
async def separate_execute(self) -> None:
@ -746,7 +745,7 @@ async def test_job_cleanup(coresys: CoreSys):
self.event = asyncio.Event()
self.job: SupervisorJob | None = None
@Job(name="test_job_cleanup_execute", limit=JobExecutionLimit.ONCE)
@Job(name="test_job_cleanup_execute", concurrency=JobConcurrency.REJECT)
async def execute(self):
"""Execute the class method."""
self.job = coresys.jobs.current
@ -781,7 +780,7 @@ async def test_job_skip_cleanup(coresys: CoreSys):
@Job(
name="test_job_skip_cleanup_execute",
limit=JobExecutionLimit.ONCE,
concurrency=JobConcurrency.REJECT,
cleanup=False,
)
async def execute(self):
@ -804,8 +803,8 @@ async def test_job_skip_cleanup(coresys: CoreSys):
assert test.job.done
async def test_execution_limit_group_throttle(coresys: CoreSys):
"""Test the group throttle execution limit."""
async def test_group_throttle(coresys: CoreSys):
"""Test the group throttle."""
class TestClass(JobGroup):
"""Test class."""
@ -817,8 +816,8 @@ async def test_execution_limit_group_throttle(coresys: CoreSys):
self.call = 0
@Job(
name="test_execution_limit_group_throttle_execute",
limit=JobExecutionLimit.GROUP_THROTTLE,
name="test_group_throttle_execute",
throttle=JobThrottle.GROUP_THROTTLE,
throttle_period=timedelta(milliseconds=95),
)
async def execute(self, sleep: float):
@ -851,8 +850,8 @@ async def test_execution_limit_group_throttle(coresys: CoreSys):
assert test2.call == 2
async def test_execution_limit_group_throttle_wait(coresys: CoreSys):
"""Test the group throttle wait job execution limit."""
async def test_group_throttle_with_queue(coresys: CoreSys):
"""Test the group throttle with queue concurrency."""
class TestClass(JobGroup):
"""Test class."""
@ -864,8 +863,9 @@ async def test_execution_limit_group_throttle_wait(coresys: CoreSys):
self.call = 0
@Job(
name="test_execution_limit_group_throttle_wait_execute",
limit=JobExecutionLimit.GROUP_THROTTLE_WAIT,
name="test_group_throttle_with_queue_execute",
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.GROUP_THROTTLE,
throttle_period=timedelta(milliseconds=95),
)
async def execute(self, sleep: float):
@ -901,10 +901,8 @@ async def test_execution_limit_group_throttle_wait(coresys: CoreSys):
@pytest.mark.parametrize("error", [None, PluginJobError])
async def test_execution_limit_group_throttle_rate_limit(
coresys: CoreSys, error: JobException | None
):
"""Test the group throttle rate limit job execution limit."""
async def test_group_throttle_rate_limit(coresys: CoreSys, error: JobException | None):
"""Test the group throttle rate limit."""
class TestClass(JobGroup):
"""Test class."""
@ -916,8 +914,8 @@ async def test_execution_limit_group_throttle_rate_limit(
self.call = 0
@Job(
name=f"test_execution_limit_group_throttle_rate_limit_execute_{uuid4().hex}",
limit=JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
name=f"test_group_throttle_rate_limit_execute_{uuid4().hex}",
throttle=JobThrottle.GROUP_RATE_LIMIT,
throttle_period=timedelta(hours=1),
throttle_max_calls=2,
on_condition=error,
@ -1013,7 +1011,7 @@ async def test_job_starting_separate_task(coresys: CoreSys):
@Job(
name="test_job_starting_separate_task_job_task",
limit=JobExecutionLimit.GROUP_ONCE,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def job_task(self):
"""Create a separate long running job task."""
@ -1035,7 +1033,7 @@ async def test_job_starting_separate_task(coresys: CoreSys):
@Job(
name="test_job_starting_separate_task_job_await",
limit=JobExecutionLimit.GROUP_ONCE,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def job_await(self):
"""Await a simple job in same group to confirm lock released."""
@ -1080,7 +1078,7 @@ async def test_job_always_removed_on_check_failure(coresys: CoreSys):
@Job(
name="test_job_always_removed_on_check_failure_limit",
limit=JobExecutionLimit.ONCE,
concurrency=JobConcurrency.REJECT,
cleanup=False,
)
async def limit_check(self):