mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-08-02 13:57:42 +00:00
Compare commits
No commits in common. "main" and "2025.07.2" have entirely different histories.
2
.github/workflows/builder.yml
vendored
2
.github/workflows/builder.yml
vendored
@ -131,7 +131,7 @@ jobs:
|
|||||||
|
|
||||||
- name: Install Cosign
|
- name: Install Cosign
|
||||||
if: needs.init.outputs.publish == 'true'
|
if: needs.init.outputs.publish == 'true'
|
||||||
uses: sigstore/cosign-installer@v3.9.2
|
uses: sigstore/cosign-installer@v3.9.1
|
||||||
with:
|
with:
|
||||||
cosign-release: "v2.4.3"
|
cosign-release: "v2.4.3"
|
||||||
|
|
||||||
|
2
.github/workflows/ci.yaml
vendored
2
.github/workflows/ci.yaml
vendored
@ -346,7 +346,7 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
python-version: ${{ needs.prepare.outputs.python-version }}
|
python-version: ${{ needs.prepare.outputs.python-version }}
|
||||||
- name: Install Cosign
|
- name: Install Cosign
|
||||||
uses: sigstore/cosign-installer@v3.9.2
|
uses: sigstore/cosign-installer@v3.9.1
|
||||||
with:
|
with:
|
||||||
cosign-release: "v2.4.3"
|
cosign-release: "v2.4.3"
|
||||||
- name: Restore Python virtual environment
|
- name: Restore Python virtual environment
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
aiodns==3.5.0
|
aiodns==3.5.0
|
||||||
aiohttp==3.12.15
|
aiohttp==3.12.14
|
||||||
atomicwrites-homeassistant==1.4.1
|
atomicwrites-homeassistant==1.4.1
|
||||||
attrs==25.3.0
|
attrs==25.3.0
|
||||||
awesomeversion==25.5.0
|
awesomeversion==25.5.0
|
||||||
@ -14,17 +14,17 @@ deepmerge==2.0
|
|||||||
dirhash==0.5.0
|
dirhash==0.5.0
|
||||||
docker==7.1.0
|
docker==7.1.0
|
||||||
faust-cchardet==2.1.19
|
faust-cchardet==2.1.19
|
||||||
gitpython==3.1.45
|
gitpython==3.1.44
|
||||||
jinja2==3.1.6
|
jinja2==3.1.6
|
||||||
log-rate-limit==1.4.2
|
log-rate-limit==1.4.2
|
||||||
orjson==3.11.1
|
orjson==3.11.0
|
||||||
pulsectl==24.12.0
|
pulsectl==24.12.0
|
||||||
pyudev==0.24.3
|
pyudev==0.24.3
|
||||||
PyYAML==6.0.2
|
PyYAML==6.0.2
|
||||||
requests==2.32.4
|
requests==2.32.4
|
||||||
securetar==2025.2.1
|
securetar==2025.2.1
|
||||||
sentry-sdk==2.34.1
|
sentry-sdk==2.33.0
|
||||||
setuptools==80.9.0
|
setuptools==80.9.0
|
||||||
voluptuous==0.15.2
|
voluptuous==0.15.2
|
||||||
dbus-fast==2.44.2
|
dbus-fast==2.44.1
|
||||||
zlib-fast==0.2.1
|
zlib-fast==0.2.1
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
astroid==3.3.11
|
astroid==3.3.11
|
||||||
coverage==7.10.1
|
coverage==7.9.2
|
||||||
mypy==1.17.0
|
mypy==1.17.0
|
||||||
pre-commit==4.2.0
|
pre-commit==4.2.0
|
||||||
pylint==3.3.7
|
pylint==3.3.7
|
||||||
@ -8,7 +8,7 @@ pytest-asyncio==0.25.2
|
|||||||
pytest-cov==6.2.1
|
pytest-cov==6.2.1
|
||||||
pytest-timeout==2.4.0
|
pytest-timeout==2.4.0
|
||||||
pytest==8.4.1
|
pytest==8.4.1
|
||||||
ruff==0.12.7
|
ruff==0.12.3
|
||||||
time-machine==2.16.0
|
time-machine==2.16.0
|
||||||
types-docker==7.1.0.20250705
|
types-docker==7.1.0.20250705
|
||||||
types-pyyaml==6.0.12.20250516
|
types-pyyaml==6.0.12.20250516
|
||||||
|
@ -6,8 +6,6 @@ from typing import Any
|
|||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
|
|
||||||
|
|
||||||
from ..const import (
|
from ..const import (
|
||||||
ATTR_ENABLE_IPV6,
|
ATTR_ENABLE_IPV6,
|
||||||
ATTR_HOSTNAME,
|
ATTR_HOSTNAME,
|
||||||
@ -34,7 +32,7 @@ SCHEMA_DOCKER_REGISTRY = vol.Schema(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# pylint: disable=no-value-for-parameter
|
# pylint: disable=no-value-for-parameter
|
||||||
SCHEMA_OPTIONS = vol.Schema({vol.Optional(ATTR_ENABLE_IPV6): vol.Maybe(vol.Boolean())})
|
SCHEMA_OPTIONS = vol.Schema({vol.Optional(ATTR_ENABLE_IPV6): vol.Boolean()})
|
||||||
|
|
||||||
|
|
||||||
class APIDocker(CoreSysAttributes):
|
class APIDocker(CoreSysAttributes):
|
||||||
@ -61,17 +59,8 @@ class APIDocker(CoreSysAttributes):
|
|||||||
"""Set docker options."""
|
"""Set docker options."""
|
||||||
body = await api_validate(SCHEMA_OPTIONS, request)
|
body = await api_validate(SCHEMA_OPTIONS, request)
|
||||||
|
|
||||||
if (
|
if ATTR_ENABLE_IPV6 in body:
|
||||||
ATTR_ENABLE_IPV6 in body
|
|
||||||
and self.sys_docker.config.enable_ipv6 != body[ATTR_ENABLE_IPV6]
|
|
||||||
):
|
|
||||||
self.sys_docker.config.enable_ipv6 = body[ATTR_ENABLE_IPV6]
|
self.sys_docker.config.enable_ipv6 = body[ATTR_ENABLE_IPV6]
|
||||||
_LOGGER.info("Host system reboot required to apply new IPv6 configuration")
|
|
||||||
self.sys_resolution.create_issue(
|
|
||||||
IssueType.REBOOT_REQUIRED,
|
|
||||||
ContextType.SYSTEM,
|
|
||||||
suggestions=[SuggestionType.EXECUTE_REBOOT],
|
|
||||||
)
|
|
||||||
|
|
||||||
await self.sys_docker.config.save_data()
|
await self.sys_docker.config.save_data()
|
||||||
|
|
||||||
|
@ -95,12 +95,12 @@ class DockerConfig(FileConfiguration):
|
|||||||
super().__init__(FILE_HASSIO_DOCKER, SCHEMA_DOCKER_CONFIG)
|
super().__init__(FILE_HASSIO_DOCKER, SCHEMA_DOCKER_CONFIG)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def enable_ipv6(self) -> bool | None:
|
def enable_ipv6(self) -> bool:
|
||||||
"""Return IPv6 configuration for docker network."""
|
"""Return IPv6 configuration for docker network."""
|
||||||
return self._data.get(ATTR_ENABLE_IPV6, None)
|
return self._data.get(ATTR_ENABLE_IPV6, False)
|
||||||
|
|
||||||
@enable_ipv6.setter
|
@enable_ipv6.setter
|
||||||
def enable_ipv6(self, value: bool | None) -> None:
|
def enable_ipv6(self, value: bool) -> None:
|
||||||
"""Set IPv6 configuration for docker network."""
|
"""Set IPv6 configuration for docker network."""
|
||||||
self._data[ATTR_ENABLE_IPV6] = value
|
self._data[ATTR_ENABLE_IPV6] = value
|
||||||
|
|
||||||
|
@ -47,8 +47,6 @@ DOCKER_NETWORK_PARAMS = {
|
|||||||
"options": {"com.docker.network.bridge.name": DOCKER_NETWORK},
|
"options": {"com.docker.network.bridge.name": DOCKER_NETWORK},
|
||||||
}
|
}
|
||||||
|
|
||||||
DOCKER_ENABLE_IPV6_DEFAULT = True
|
|
||||||
|
|
||||||
|
|
||||||
class DockerNetwork:
|
class DockerNetwork:
|
||||||
"""Internal Supervisor Network.
|
"""Internal Supervisor Network.
|
||||||
@ -61,7 +59,7 @@ class DockerNetwork:
|
|||||||
self.docker: docker.DockerClient = docker_client
|
self.docker: docker.DockerClient = docker_client
|
||||||
self._network: docker.models.networks.Network
|
self._network: docker.models.networks.Network
|
||||||
|
|
||||||
async def post_init(self, enable_ipv6: bool | None = None) -> Self:
|
async def post_init(self, enable_ipv6: bool = False) -> Self:
|
||||||
"""Post init actions that must be done in event loop."""
|
"""Post init actions that must be done in event loop."""
|
||||||
self._network = await asyncio.get_running_loop().run_in_executor(
|
self._network = await asyncio.get_running_loop().run_in_executor(
|
||||||
None, self._get_network, enable_ipv6
|
None, self._get_network, enable_ipv6
|
||||||
@ -113,24 +111,16 @@ class DockerNetwork:
|
|||||||
"""Return observer of the network."""
|
"""Return observer of the network."""
|
||||||
return DOCKER_IPV4_NETWORK_MASK[6]
|
return DOCKER_IPV4_NETWORK_MASK[6]
|
||||||
|
|
||||||
def _get_network(
|
def _get_network(self, enable_ipv6: bool = False) -> docker.models.networks.Network:
|
||||||
self, enable_ipv6: bool | None = None
|
|
||||||
) -> docker.models.networks.Network:
|
|
||||||
"""Get supervisor network."""
|
"""Get supervisor network."""
|
||||||
try:
|
try:
|
||||||
if network := self.docker.networks.get(DOCKER_NETWORK):
|
if network := self.docker.networks.get(DOCKER_NETWORK):
|
||||||
current_ipv6 = network.attrs.get(DOCKER_ENABLEIPV6, False)
|
if network.attrs.get(DOCKER_ENABLEIPV6) == enable_ipv6:
|
||||||
# If the network exists and we don't have an explicit setting,
|
|
||||||
# simply stick with what we have.
|
|
||||||
if enable_ipv6 is None or current_ipv6 == enable_ipv6:
|
|
||||||
return network
|
return network
|
||||||
|
|
||||||
# We have an explicit setting which differs from the current state.
|
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
"Migrating Supervisor network to %s",
|
"Migrating Supervisor network to %s",
|
||||||
"IPv4/IPv6 Dual-Stack" if enable_ipv6 else "IPv4-Only",
|
"IPv4/IPv6 Dual-Stack" if enable_ipv6 else "IPv4-Only",
|
||||||
)
|
)
|
||||||
|
|
||||||
if (containers := network.containers) and (
|
if (containers := network.containers) and (
|
||||||
containers_all := all(
|
containers_all := all(
|
||||||
container.name in (OBSERVER_DOCKER_NAME, SUPERVISOR_DOCKER_NAME)
|
container.name in (OBSERVER_DOCKER_NAME, SUPERVISOR_DOCKER_NAME)
|
||||||
@ -144,7 +134,6 @@ class DockerNetwork:
|
|||||||
requests.RequestException,
|
requests.RequestException,
|
||||||
):
|
):
|
||||||
network.disconnect(container, force=True)
|
network.disconnect(container, force=True)
|
||||||
|
|
||||||
if not containers or containers_all:
|
if not containers or containers_all:
|
||||||
try:
|
try:
|
||||||
network.remove()
|
network.remove()
|
||||||
@ -162,9 +151,7 @@ class DockerNetwork:
|
|||||||
_LOGGER.info("Can't find Supervisor network, creating a new network")
|
_LOGGER.info("Can't find Supervisor network, creating a new network")
|
||||||
|
|
||||||
network_params = DOCKER_NETWORK_PARAMS.copy()
|
network_params = DOCKER_NETWORK_PARAMS.copy()
|
||||||
network_params[ATTR_ENABLE_IPV6] = (
|
network_params[ATTR_ENABLE_IPV6] = enable_ipv6
|
||||||
DOCKER_ENABLE_IPV6_DEFAULT if enable_ipv6 is None else enable_ipv6
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._network = self.docker.networks.create(**network_params) # type: ignore
|
self._network = self.docker.networks.create(**network_params) # type: ignore
|
||||||
|
@ -34,60 +34,8 @@ class JobCondition(StrEnum):
|
|||||||
SUPERVISOR_UPDATED = "supervisor_updated"
|
SUPERVISOR_UPDATED = "supervisor_updated"
|
||||||
|
|
||||||
|
|
||||||
class JobConcurrency(StrEnum):
|
|
||||||
"""Job concurrency control.
|
|
||||||
|
|
||||||
Controls how many instances of a job can run simultaneously.
|
|
||||||
|
|
||||||
Individual Concurrency (applies to each method separately):
|
|
||||||
- REJECT: Fail immediately if another instance is already running
|
|
||||||
- QUEUE: Wait for the current instance to finish, then run
|
|
||||||
|
|
||||||
Group Concurrency (applies across all methods on a JobGroup):
|
|
||||||
- GROUP_REJECT: Fail if ANY job is running on the JobGroup
|
|
||||||
- GROUP_QUEUE: Wait for ANY running job on the JobGroup to finish
|
|
||||||
|
|
||||||
JobGroup Behavior:
|
|
||||||
- All methods on the same JobGroup instance share a single lock
|
|
||||||
- Methods can call other methods on the same group without deadlock
|
|
||||||
- Uses the JobGroup.group_name for coordination
|
|
||||||
- Requires the class to inherit from JobGroup
|
|
||||||
"""
|
|
||||||
|
|
||||||
REJECT = "reject" # Fail if already running (was ONCE)
|
|
||||||
QUEUE = "queue" # Wait if already running (was SINGLE_WAIT)
|
|
||||||
GROUP_REJECT = "group_reject" # Was GROUP_ONCE
|
|
||||||
GROUP_QUEUE = "group_queue" # Was GROUP_WAIT
|
|
||||||
|
|
||||||
|
|
||||||
class JobThrottle(StrEnum):
|
|
||||||
"""Job throttling control.
|
|
||||||
|
|
||||||
Controls how frequently jobs can be executed.
|
|
||||||
|
|
||||||
Individual Throttling (each method has its own throttle state):
|
|
||||||
- THROTTLE: Skip execution if called within throttle_period
|
|
||||||
- RATE_LIMIT: Allow up to throttle_max_calls within throttle_period, then fail
|
|
||||||
|
|
||||||
Group Throttling (all methods on a JobGroup share throttle state):
|
|
||||||
- GROUP_THROTTLE: Skip if ANY method was called within throttle_period
|
|
||||||
- GROUP_RATE_LIMIT: Allow up to throttle_max_calls total across ALL methods
|
|
||||||
|
|
||||||
JobGroup Behavior:
|
|
||||||
- All methods on the same JobGroup instance share throttle counters/timers
|
|
||||||
- Uses the JobGroup.group_name as the key for tracking state
|
|
||||||
- If one method is throttled, other methods may also be throttled
|
|
||||||
- Requires the class to inherit from JobGroup
|
|
||||||
"""
|
|
||||||
|
|
||||||
THROTTLE = "throttle" # Skip if called too frequently
|
|
||||||
RATE_LIMIT = "rate_limit" # Rate limiting with max calls per period
|
|
||||||
GROUP_THROTTLE = "group_throttle" # Group version of THROTTLE
|
|
||||||
GROUP_RATE_LIMIT = "group_rate_limit" # Group version of RATE_LIMIT
|
|
||||||
|
|
||||||
|
|
||||||
class JobExecutionLimit(StrEnum):
|
class JobExecutionLimit(StrEnum):
|
||||||
"""Job Execution limits - DEPRECATED: Use JobConcurrency and JobThrottle instead."""
|
"""Job Execution limits."""
|
||||||
|
|
||||||
ONCE = "once"
|
ONCE = "once"
|
||||||
SINGLE_WAIT = "single_wait"
|
SINGLE_WAIT = "single_wait"
|
||||||
|
@ -20,7 +20,7 @@ from ..host.const import HostFeature
|
|||||||
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
|
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
|
||||||
from ..utils.sentry import async_capture_exception
|
from ..utils.sentry import async_capture_exception
|
||||||
from . import SupervisorJob
|
from . import SupervisorJob
|
||||||
from .const import JobConcurrency, JobCondition, JobExecutionLimit, JobThrottle
|
from .const import JobCondition, JobExecutionLimit
|
||||||
from .job_group import JobGroup
|
from .job_group import JobGroup
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__package__)
|
_LOGGER: logging.Logger = logging.getLogger(__package__)
|
||||||
@ -36,16 +36,13 @@ class Job(CoreSysAttributes):
|
|||||||
conditions: list[JobCondition] | None = None,
|
conditions: list[JobCondition] | None = None,
|
||||||
cleanup: bool = True,
|
cleanup: bool = True,
|
||||||
on_condition: type[JobException] | None = None,
|
on_condition: type[JobException] | None = None,
|
||||||
concurrency: JobConcurrency | None = None,
|
limit: JobExecutionLimit | None = None,
|
||||||
throttle: JobThrottle | None = None,
|
|
||||||
throttle_period: timedelta
|
throttle_period: timedelta
|
||||||
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
|
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
|
||||||
| None = None,
|
| None = None,
|
||||||
throttle_max_calls: int | None = None,
|
throttle_max_calls: int | None = None,
|
||||||
internal: bool = False,
|
internal: bool = False,
|
||||||
# Backward compatibility - DEPRECATED
|
):
|
||||||
limit: JobExecutionLimit | None = None,
|
|
||||||
): # pylint: disable=too-many-positional-arguments
|
|
||||||
"""Initialize the Job decorator.
|
"""Initialize the Job decorator.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -53,15 +50,13 @@ class Job(CoreSysAttributes):
|
|||||||
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
|
conditions (list[JobCondition] | None): List of conditions that must be met before the job runs.
|
||||||
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
|
cleanup (bool): Whether to clean up the job after execution. Defaults to True. If set to False, the job will remain accessible through the Supervisor API until the next restart.
|
||||||
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
|
on_condition (type[JobException] | None): Exception type to raise if a job condition fails. If None, logs the failure.
|
||||||
concurrency (JobConcurrency | None): Concurrency control policy (e.g., reject, queue, group-based).
|
limit (JobExecutionLimit | None): Execution limit policy for the job (e.g., throttle, once, group-based).
|
||||||
throttle (JobThrottle | None): Throttling policy (e.g., throttle, rate_limit, group-based).
|
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for rate-limited jobs).
|
||||||
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
|
|
||||||
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
|
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
|
||||||
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
|
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
|
||||||
limit (JobExecutionLimit | None): DEPRECATED - Use concurrency and throttle instead.
|
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
|
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected limit.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if name in _JOB_NAMES:
|
if name in _JOB_NAMES:
|
||||||
@ -72,6 +67,7 @@ class Job(CoreSysAttributes):
|
|||||||
self.conditions = conditions
|
self.conditions = conditions
|
||||||
self.cleanup = cleanup
|
self.cleanup = cleanup
|
||||||
self.on_condition = on_condition
|
self.on_condition = on_condition
|
||||||
|
self.limit = limit
|
||||||
self._throttle_period = throttle_period
|
self._throttle_period = throttle_period
|
||||||
self._throttle_max_calls = throttle_max_calls
|
self._throttle_max_calls = throttle_max_calls
|
||||||
self._lock: asyncio.Semaphore | None = None
|
self._lock: asyncio.Semaphore | None = None
|
||||||
@ -79,90 +75,33 @@ class Job(CoreSysAttributes):
|
|||||||
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
|
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
|
||||||
self._internal = internal
|
self._internal = internal
|
||||||
|
|
||||||
# Handle backward compatibility with limit parameter
|
|
||||||
if limit is not None:
|
|
||||||
if concurrency is not None or throttle is not None:
|
|
||||||
raise RuntimeError(
|
|
||||||
f"Job {name} cannot specify both 'limit' (deprecated) and 'concurrency'/'throttle' parameters!"
|
|
||||||
)
|
|
||||||
# Map old limit values to new parameters
|
|
||||||
concurrency, throttle = self._map_limit_to_new_params(limit)
|
|
||||||
|
|
||||||
self.concurrency = concurrency
|
|
||||||
self.throttle = throttle
|
|
||||||
|
|
||||||
# Validate Options
|
# Validate Options
|
||||||
self._validate_parameters()
|
|
||||||
|
|
||||||
def _map_limit_to_new_params(
|
|
||||||
self, limit: JobExecutionLimit
|
|
||||||
) -> tuple[JobConcurrency | None, JobThrottle | None]:
|
|
||||||
"""Map old limit parameter to new concurrency and throttle parameters."""
|
|
||||||
mapping = {
|
|
||||||
JobExecutionLimit.ONCE: (JobConcurrency.REJECT, None),
|
|
||||||
JobExecutionLimit.SINGLE_WAIT: (JobConcurrency.QUEUE, None),
|
|
||||||
JobExecutionLimit.THROTTLE: (None, JobThrottle.THROTTLE),
|
|
||||||
JobExecutionLimit.THROTTLE_WAIT: (
|
|
||||||
JobConcurrency.QUEUE,
|
|
||||||
JobThrottle.THROTTLE,
|
|
||||||
),
|
|
||||||
JobExecutionLimit.THROTTLE_RATE_LIMIT: (None, JobThrottle.RATE_LIMIT),
|
|
||||||
JobExecutionLimit.GROUP_ONCE: (JobConcurrency.GROUP_REJECT, None),
|
|
||||||
JobExecutionLimit.GROUP_WAIT: (JobConcurrency.GROUP_QUEUE, None),
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE: (None, JobThrottle.GROUP_THROTTLE),
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE_WAIT: (
|
|
||||||
# Seems a bit counter intuitive, but GROUP_QUEUE deadlocks
|
|
||||||
# tests/jobs/test_job_decorator.py::test_execution_limit_group_throttle_wait
|
|
||||||
# The reason this deadlocks is because when using GROUP_QUEUE and the
|
|
||||||
# throttle limit is hit, the group lock is trying to be unlocked outside
|
|
||||||
# of the job context. The current implementation doesn't allow to unlock
|
|
||||||
# the group lock when the job is not running.
|
|
||||||
JobConcurrency.QUEUE,
|
|
||||||
JobThrottle.GROUP_THROTTLE,
|
|
||||||
),
|
|
||||||
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT: (
|
|
||||||
None,
|
|
||||||
JobThrottle.GROUP_RATE_LIMIT,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
return mapping.get(limit, (None, None))
|
|
||||||
|
|
||||||
def _validate_parameters(self) -> None:
|
|
||||||
"""Validate job parameters."""
|
|
||||||
# Validate throttle parameters
|
|
||||||
if (
|
if (
|
||||||
self.throttle
|
self.limit
|
||||||
in (
|
in (
|
||||||
JobThrottle.THROTTLE,
|
JobExecutionLimit.THROTTLE,
|
||||||
JobThrottle.GROUP_THROTTLE,
|
JobExecutionLimit.THROTTLE_WAIT,
|
||||||
JobThrottle.RATE_LIMIT,
|
JobExecutionLimit.THROTTLE_RATE_LIMIT,
|
||||||
JobThrottle.GROUP_RATE_LIMIT,
|
JobExecutionLimit.GROUP_THROTTLE,
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
||||||
)
|
)
|
||||||
and self._throttle_period is None
|
and self._throttle_period is None
|
||||||
):
|
):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Job {self.name} is using throttle {self.throttle} without a throttle period!"
|
f"Job {name} is using execution limit {limit} without a throttle period!"
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.throttle in (
|
if self.limit in (
|
||||||
JobThrottle.RATE_LIMIT,
|
JobExecutionLimit.THROTTLE_RATE_LIMIT,
|
||||||
JobThrottle.GROUP_RATE_LIMIT,
|
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
||||||
):
|
):
|
||||||
if self._throttle_max_calls is None:
|
if self._throttle_max_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Job {self.name} is using throttle {self.throttle} without throttle max calls!"
|
f"Job {name} is using execution limit {limit} without throttle max calls!"
|
||||||
)
|
)
|
||||||
self._rate_limited_calls = {}
|
|
||||||
|
|
||||||
if self.throttle is not None and self.concurrency in (
|
self._rate_limited_calls = {}
|
||||||
JobConcurrency.GROUP_REJECT,
|
|
||||||
JobConcurrency.GROUP_QUEUE,
|
|
||||||
):
|
|
||||||
# We cannot release group locks when Job is not running (e.g. throttled)
|
|
||||||
# which makes these combinations impossible to use currently.
|
|
||||||
raise RuntimeError(
|
|
||||||
f"Job {self.name} is using throttling ({self.throttle}) with group concurrency ({self.concurrency}), which is not allowed!"
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def throttle_max_calls(self) -> int:
|
def throttle_max_calls(self) -> int:
|
||||||
@ -192,7 +131,7 @@ class Job(CoreSysAttributes):
|
|||||||
"""Return rate limited calls if used."""
|
"""Return rate limited calls if used."""
|
||||||
if self._rate_limited_calls is None:
|
if self._rate_limited_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Rate limited calls not available for this throttle type"
|
f"Rate limited calls not available for limit type {self.limit}"
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._rate_limited_calls.get(group_name, [])
|
return self._rate_limited_calls.get(group_name, [])
|
||||||
@ -203,7 +142,7 @@ class Job(CoreSysAttributes):
|
|||||||
"""Add a rate limited call to list if used."""
|
"""Add a rate limited call to list if used."""
|
||||||
if self._rate_limited_calls is None:
|
if self._rate_limited_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Rate limited calls not available for this throttle type"
|
f"Rate limited calls not available for limit type {self.limit}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if group_name in self._rate_limited_calls:
|
if group_name in self._rate_limited_calls:
|
||||||
@ -217,7 +156,7 @@ class Job(CoreSysAttributes):
|
|||||||
"""Set rate limited calls if used."""
|
"""Set rate limited calls if used."""
|
||||||
if self._rate_limited_calls is None:
|
if self._rate_limited_calls is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Rate limited calls not available for this throttle type"
|
f"Rate limited calls not available for limit type {self.limit}"
|
||||||
)
|
)
|
||||||
|
|
||||||
self._rate_limited_calls[group_name] = value
|
self._rate_limited_calls[group_name] = value
|
||||||
@ -254,24 +193,16 @@ class Job(CoreSysAttributes):
|
|||||||
if obj.acquire and obj.release: # type: ignore
|
if obj.acquire and obj.release: # type: ignore
|
||||||
job_group = cast(JobGroup, obj)
|
job_group = cast(JobGroup, obj)
|
||||||
|
|
||||||
# Check for group-based parameters
|
if not job_group and self.limit in (
|
||||||
if not job_group:
|
JobExecutionLimit.GROUP_ONCE,
|
||||||
if self.concurrency in (
|
JobExecutionLimit.GROUP_WAIT,
|
||||||
JobConcurrency.GROUP_REJECT,
|
JobExecutionLimit.GROUP_THROTTLE,
|
||||||
JobConcurrency.GROUP_QUEUE,
|
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
||||||
):
|
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
||||||
raise RuntimeError(
|
):
|
||||||
f"Job {self.name} uses group concurrency ({self.concurrency}) but is not on a JobGroup! "
|
raise RuntimeError(
|
||||||
f"The class must inherit from JobGroup to use GROUP_REJECT or GROUP_QUEUE."
|
f"Job on {self.name} need to be a JobGroup to use group based limits!"
|
||||||
) from None
|
) from None
|
||||||
if self.throttle in (
|
|
||||||
JobThrottle.GROUP_THROTTLE,
|
|
||||||
JobThrottle.GROUP_RATE_LIMIT,
|
|
||||||
):
|
|
||||||
raise RuntimeError(
|
|
||||||
f"Job {self.name} uses group throttling ({self.throttle}) but is not on a JobGroup! "
|
|
||||||
f"The class must inherit from JobGroup to use GROUP_THROTTLE or GROUP_RATE_LIMIT."
|
|
||||||
) from None
|
|
||||||
|
|
||||||
return job_group
|
return job_group
|
||||||
|
|
||||||
@ -324,15 +255,71 @@ class Job(CoreSysAttributes):
|
|||||||
except JobConditionException as err:
|
except JobConditionException as err:
|
||||||
return self._handle_job_condition_exception(err)
|
return self._handle_job_condition_exception(err)
|
||||||
|
|
||||||
# Handle execution limits
|
# Handle exection limits
|
||||||
await self._handle_concurrency_control(job_group, job)
|
if self.limit in (
|
||||||
try:
|
JobExecutionLimit.SINGLE_WAIT,
|
||||||
if not await self._handle_throttling(group_name):
|
JobExecutionLimit.ONCE,
|
||||||
self._release_concurrency_control(job_group)
|
):
|
||||||
return # Job was throttled, exit early
|
await self._acquire_exection_limit()
|
||||||
except Exception:
|
elif self.limit in (
|
||||||
self._release_concurrency_control(job_group)
|
JobExecutionLimit.GROUP_ONCE,
|
||||||
raise
|
JobExecutionLimit.GROUP_WAIT,
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
await cast(JobGroup, job_group).acquire(
|
||||||
|
job, self.limit == JobExecutionLimit.GROUP_WAIT
|
||||||
|
)
|
||||||
|
except JobGroupExecutionLimitExceeded as err:
|
||||||
|
if self.on_condition:
|
||||||
|
raise self.on_condition(str(err)) from err
|
||||||
|
raise err
|
||||||
|
elif self.limit in (
|
||||||
|
JobExecutionLimit.THROTTLE,
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE,
|
||||||
|
):
|
||||||
|
time_since_last_call = datetime.now() - self.last_call(group_name)
|
||||||
|
if time_since_last_call < self.throttle_period(group_name):
|
||||||
|
return
|
||||||
|
elif self.limit in (
|
||||||
|
JobExecutionLimit.THROTTLE_WAIT,
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
||||||
|
):
|
||||||
|
await self._acquire_exection_limit()
|
||||||
|
time_since_last_call = datetime.now() - self.last_call(group_name)
|
||||||
|
if time_since_last_call < self.throttle_period(group_name):
|
||||||
|
self._release_exception_limits()
|
||||||
|
return
|
||||||
|
elif self.limit in (
|
||||||
|
JobExecutionLimit.THROTTLE_RATE_LIMIT,
|
||||||
|
JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
|
||||||
|
):
|
||||||
|
# Only reprocess array when necessary (at limit)
|
||||||
|
if (
|
||||||
|
len(self.rate_limited_calls(group_name))
|
||||||
|
>= self.throttle_max_calls
|
||||||
|
):
|
||||||
|
self.set_rate_limited_calls(
|
||||||
|
[
|
||||||
|
call
|
||||||
|
for call in self.rate_limited_calls(group_name)
|
||||||
|
if call
|
||||||
|
> datetime.now() - self.throttle_period(group_name)
|
||||||
|
],
|
||||||
|
group_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
len(self.rate_limited_calls(group_name))
|
||||||
|
>= self.throttle_max_calls
|
||||||
|
):
|
||||||
|
on_condition = (
|
||||||
|
JobException
|
||||||
|
if self.on_condition is None
|
||||||
|
else self.on_condition
|
||||||
|
)
|
||||||
|
raise on_condition(
|
||||||
|
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
|
||||||
|
)
|
||||||
|
|
||||||
# Execute Job
|
# Execute Job
|
||||||
with job.start():
|
with job.start():
|
||||||
@ -358,7 +345,12 @@ class Job(CoreSysAttributes):
|
|||||||
await async_capture_exception(err)
|
await async_capture_exception(err)
|
||||||
raise JobException() from err
|
raise JobException() from err
|
||||||
finally:
|
finally:
|
||||||
self._release_concurrency_control(job_group)
|
self._release_exception_limits()
|
||||||
|
if job_group and self.limit in (
|
||||||
|
JobExecutionLimit.GROUP_ONCE,
|
||||||
|
JobExecutionLimit.GROUP_WAIT,
|
||||||
|
):
|
||||||
|
job_group.release()
|
||||||
|
|
||||||
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
|
# Jobs that weren't started are always cleaned up. Also clean up done jobs if required
|
||||||
finally:
|
finally:
|
||||||
@ -500,75 +492,31 @@ class Job(CoreSysAttributes):
|
|||||||
f"'{method_name}' blocked from execution, mounting not supported on system"
|
f"'{method_name}' blocked from execution, mounting not supported on system"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _release_concurrency_control(self, job_group: JobGroup | None) -> None:
|
async def _acquire_exection_limit(self) -> None:
|
||||||
"""Release concurrency control locks."""
|
"""Process exection limits."""
|
||||||
if self.concurrency == JobConcurrency.REJECT:
|
if self.limit not in (
|
||||||
if self.lock.locked():
|
JobExecutionLimit.SINGLE_WAIT,
|
||||||
self.lock.release()
|
JobExecutionLimit.ONCE,
|
||||||
elif self.concurrency == JobConcurrency.QUEUE:
|
JobExecutionLimit.THROTTLE_WAIT,
|
||||||
if self.lock.locked():
|
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
||||||
self.lock.release()
|
|
||||||
elif self.concurrency in (
|
|
||||||
JobConcurrency.GROUP_REJECT,
|
|
||||||
JobConcurrency.GROUP_QUEUE,
|
|
||||||
):
|
):
|
||||||
if job_group and job_group.has_lock:
|
return
|
||||||
job_group.release()
|
|
||||||
|
|
||||||
async def _handle_concurrency_control(
|
if self.limit == JobExecutionLimit.ONCE and self.lock.locked():
|
||||||
self, job_group: JobGroup | None, job: SupervisorJob
|
on_condition = (
|
||||||
) -> None:
|
JobException if self.on_condition is None else self.on_condition
|
||||||
"""Handle concurrency control limits."""
|
)
|
||||||
if self.concurrency == JobConcurrency.REJECT:
|
raise on_condition("Another job is running")
|
||||||
if self.lock.locked():
|
|
||||||
on_condition = (
|
|
||||||
JobException if self.on_condition is None else self.on_condition
|
|
||||||
)
|
|
||||||
raise on_condition("Another job is running")
|
|
||||||
await self.lock.acquire()
|
|
||||||
elif self.concurrency == JobConcurrency.QUEUE:
|
|
||||||
await self.lock.acquire()
|
|
||||||
elif self.concurrency == JobConcurrency.GROUP_REJECT:
|
|
||||||
try:
|
|
||||||
await cast(JobGroup, job_group).acquire(job, wait=False)
|
|
||||||
except JobGroupExecutionLimitExceeded as err:
|
|
||||||
if self.on_condition:
|
|
||||||
raise self.on_condition(str(err)) from err
|
|
||||||
raise err
|
|
||||||
elif self.concurrency == JobConcurrency.GROUP_QUEUE:
|
|
||||||
try:
|
|
||||||
await cast(JobGroup, job_group).acquire(job, wait=True)
|
|
||||||
except JobGroupExecutionLimitExceeded as err:
|
|
||||||
if self.on_condition:
|
|
||||||
raise self.on_condition(str(err)) from err
|
|
||||||
raise err
|
|
||||||
|
|
||||||
async def _handle_throttling(self, group_name: str | None) -> bool:
|
await self.lock.acquire()
|
||||||
"""Handle throttling limits. Returns True if job should continue, False if throttled."""
|
|
||||||
if self.throttle in (JobThrottle.THROTTLE, JobThrottle.GROUP_THROTTLE):
|
|
||||||
time_since_last_call = datetime.now() - self.last_call(group_name)
|
|
||||||
throttle_period = self.throttle_period(group_name)
|
|
||||||
if time_since_last_call < throttle_period:
|
|
||||||
# Always return False when throttled (skip execution)
|
|
||||||
return False
|
|
||||||
elif self.throttle in (JobThrottle.RATE_LIMIT, JobThrottle.GROUP_RATE_LIMIT):
|
|
||||||
# Only reprocess array when necessary (at limit)
|
|
||||||
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
|
|
||||||
self.set_rate_limited_calls(
|
|
||||||
[
|
|
||||||
call
|
|
||||||
for call in self.rate_limited_calls(group_name)
|
|
||||||
if call > datetime.now() - self.throttle_period(group_name)
|
|
||||||
],
|
|
||||||
group_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
|
def _release_exception_limits(self) -> None:
|
||||||
on_condition = (
|
"""Release possible exception limits."""
|
||||||
JobException if self.on_condition is None else self.on_condition
|
if self.limit not in (
|
||||||
)
|
JobExecutionLimit.SINGLE_WAIT,
|
||||||
raise on_condition(
|
JobExecutionLimit.ONCE,
|
||||||
f"Rate limit exceeded, more than {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
|
JobExecutionLimit.THROTTLE_WAIT,
|
||||||
)
|
JobExecutionLimit.GROUP_THROTTLE_WAIT,
|
||||||
|
):
|
||||||
return True
|
return
|
||||||
|
self.lock.release()
|
||||||
|
@ -272,7 +272,6 @@ class OSManager(CoreSysAttributes):
|
|||||||
name="os_manager_update",
|
name="os_manager_update",
|
||||||
conditions=[
|
conditions=[
|
||||||
JobCondition.HAOS,
|
JobCondition.HAOS,
|
||||||
JobCondition.HEALTHY,
|
|
||||||
JobCondition.INTERNET_SYSTEM,
|
JobCondition.INTERNET_SYSTEM,
|
||||||
JobCondition.RUNNING,
|
JobCondition.RUNNING,
|
||||||
JobCondition.SUPERVISOR_UPDATED,
|
JobCondition.SUPERVISOR_UPDATED,
|
||||||
|
@ -12,7 +12,6 @@ from sentry_sdk.integrations.dedupe import DedupeIntegration
|
|||||||
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
|
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
|
||||||
from sentry_sdk.integrations.logging import LoggingIntegration
|
from sentry_sdk.integrations.logging import LoggingIntegration
|
||||||
from sentry_sdk.integrations.threading import ThreadingIntegration
|
from sentry_sdk.integrations.threading import ThreadingIntegration
|
||||||
from sentry_sdk.scrubber import DEFAULT_DENYLIST, EventScrubber
|
|
||||||
|
|
||||||
from ..const import SUPERVISOR_VERSION
|
from ..const import SUPERVISOR_VERSION
|
||||||
from ..coresys import CoreSys
|
from ..coresys import CoreSys
|
||||||
@ -27,7 +26,6 @@ def init_sentry(coresys: CoreSys) -> None:
|
|||||||
"""Initialize sentry client."""
|
"""Initialize sentry client."""
|
||||||
if not sentry_sdk.is_initialized():
|
if not sentry_sdk.is_initialized():
|
||||||
_LOGGER.info("Initializing Supervisor Sentry")
|
_LOGGER.info("Initializing Supervisor Sentry")
|
||||||
denylist = DEFAULT_DENYLIST + ["psk", "ssid"]
|
|
||||||
# Don't use AsyncioIntegration(). We commonly handle task exceptions
|
# Don't use AsyncioIntegration(). We commonly handle task exceptions
|
||||||
# outside of tasks. This would cause exception we gracefully handle to
|
# outside of tasks. This would cause exception we gracefully handle to
|
||||||
# be captured by sentry.
|
# be captured by sentry.
|
||||||
@ -36,7 +34,6 @@ def init_sentry(coresys: CoreSys) -> None:
|
|||||||
before_send=partial(filter_data, coresys),
|
before_send=partial(filter_data, coresys),
|
||||||
auto_enabling_integrations=False,
|
auto_enabling_integrations=False,
|
||||||
default_integrations=False,
|
default_integrations=False,
|
||||||
event_scrubber=EventScrubber(denylist=denylist),
|
|
||||||
integrations=[
|
integrations=[
|
||||||
AioHttpIntegration(
|
AioHttpIntegration(
|
||||||
failed_request_status_codes=frozenset(range(500, 600))
|
failed_request_status_codes=frozenset(range(500, 600))
|
||||||
|
@ -182,7 +182,7 @@ SCHEMA_DOCKER_CONFIG = vol.Schema(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
vol.Optional(ATTR_ENABLE_IPV6, default=None): vol.Maybe(vol.Boolean()),
|
vol.Optional(ATTR_ENABLE_IPV6): vol.Boolean(),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ async def test_api_docker_info(api_client: TestClient):
|
|||||||
|
|
||||||
async def test_api_network_enable_ipv6(coresys: CoreSys, api_client: TestClient):
|
async def test_api_network_enable_ipv6(coresys: CoreSys, api_client: TestClient):
|
||||||
"""Test setting docker network for enabled IPv6."""
|
"""Test setting docker network for enabled IPv6."""
|
||||||
assert coresys.docker.config.enable_ipv6 is None
|
assert coresys.docker.config.enable_ipv6 is False
|
||||||
|
|
||||||
resp = await api_client.post("/docker/options", json={"enable_ipv6": True})
|
resp = await api_client.post("/docker/options", json={"enable_ipv6": True})
|
||||||
assert resp.status == 200
|
assert resp.status == 200
|
||||||
|
@ -111,39 +111,3 @@ async def test_network_recreation(
|
|||||||
network_params[ATTR_ENABLE_IPV6] = new_enable_ipv6
|
network_params[ATTR_ENABLE_IPV6] = new_enable_ipv6
|
||||||
|
|
||||||
mock_create.assert_called_with(**network_params)
|
mock_create.assert_called_with(**network_params)
|
||||||
|
|
||||||
|
|
||||||
async def test_network_default_ipv6_for_new_installations():
|
|
||||||
"""Test that IPv6 is enabled by default when no user setting is provided (None)."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"supervisor.docker.network.DockerNetwork.docker",
|
|
||||||
new_callable=PropertyMock,
|
|
||||||
return_value=MagicMock(),
|
|
||||||
create=True,
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"supervisor.docker.network.DockerNetwork.docker.networks",
|
|
||||||
new_callable=PropertyMock,
|
|
||||||
return_value=MagicMock(),
|
|
||||||
create=True,
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"supervisor.docker.network.DockerNetwork.docker.networks.get",
|
|
||||||
side_effect=docker.errors.NotFound("Network not found"),
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"supervisor.docker.network.DockerNetwork.docker.networks.create",
|
|
||||||
return_value=MockNetwork(False, None, True),
|
|
||||||
) as mock_create,
|
|
||||||
):
|
|
||||||
# Pass None as enable_ipv6 to simulate no user setting
|
|
||||||
network = (await DockerNetwork(MagicMock()).post_init(None)).network
|
|
||||||
|
|
||||||
assert network is not None
|
|
||||||
assert network.attrs.get(DOCKER_ENABLEIPV6) is True
|
|
||||||
|
|
||||||
# Verify that create was called with IPv6 enabled by default
|
|
||||||
expected_params = DOCKER_NETWORK_PARAMS.copy()
|
|
||||||
expected_params[ATTR_ENABLE_IPV6] = True
|
|
||||||
mock_create.assert_called_with(**expected_params)
|
|
||||||
|
@ -20,7 +20,7 @@ from supervisor.exceptions import (
|
|||||||
from supervisor.host.const import HostFeature
|
from supervisor.host.const import HostFeature
|
||||||
from supervisor.host.manager import HostManager
|
from supervisor.host.manager import HostManager
|
||||||
from supervisor.jobs import JobSchedulerOptions, SupervisorJob
|
from supervisor.jobs import JobSchedulerOptions, SupervisorJob
|
||||||
from supervisor.jobs.const import JobConcurrency, JobExecutionLimit, JobThrottle
|
from supervisor.jobs.const import JobExecutionLimit
|
||||||
from supervisor.jobs.decorator import Job, JobCondition
|
from supervisor.jobs.decorator import Job, JobCondition
|
||||||
from supervisor.jobs.job_group import JobGroup
|
from supervisor.jobs.job_group import JobGroup
|
||||||
from supervisor.os.manager import OSManager
|
from supervisor.os.manager import OSManager
|
||||||
@ -1212,93 +1212,3 @@ async def test_job_scheduled_at(coresys: CoreSys):
|
|||||||
assert job.name == "test_job_scheduled_at_job_task"
|
assert job.name == "test_job_scheduled_at_job_task"
|
||||||
assert job.stage == "work"
|
assert job.stage == "work"
|
||||||
assert job.parent_id is None
|
assert job.parent_id is None
|
||||||
|
|
||||||
|
|
||||||
async def test_concurency_reject_and_throttle(coresys: CoreSys):
|
|
||||||
"""Test the concurrency rejct and throttle job execution limit."""
|
|
||||||
|
|
||||||
class TestClass:
|
|
||||||
"""Test class."""
|
|
||||||
|
|
||||||
def __init__(self, coresys: CoreSys):
|
|
||||||
"""Initialize the test class."""
|
|
||||||
self.coresys = coresys
|
|
||||||
self.run = asyncio.Lock()
|
|
||||||
self.call = 0
|
|
||||||
|
|
||||||
@Job(
|
|
||||||
name="test_concurency_reject_and_throttle_execute",
|
|
||||||
concurrency=JobConcurrency.REJECT,
|
|
||||||
throttle=JobThrottle.THROTTLE,
|
|
||||||
throttle_period=timedelta(hours=1),
|
|
||||||
)
|
|
||||||
async def execute(self, sleep: float):
|
|
||||||
"""Execute the class method."""
|
|
||||||
assert not self.run.locked()
|
|
||||||
async with self.run:
|
|
||||||
await asyncio.sleep(sleep)
|
|
||||||
self.call += 1
|
|
||||||
|
|
||||||
test = TestClass(coresys)
|
|
||||||
|
|
||||||
results = await asyncio.gather(
|
|
||||||
*[test.execute(0.1), test.execute(0.1), test.execute(0.1)],
|
|
||||||
return_exceptions=True,
|
|
||||||
)
|
|
||||||
assert results[0] is None
|
|
||||||
assert isinstance(results[1], JobException)
|
|
||||||
assert isinstance(results[2], JobException)
|
|
||||||
assert test.call == 1
|
|
||||||
|
|
||||||
await asyncio.gather(*[test.execute(0.1)])
|
|
||||||
assert test.call == 1
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("error", [None, PluginJobError])
|
|
||||||
async def test_concurency_reject_and_rate_limit(
|
|
||||||
coresys: CoreSys, error: JobException | None
|
|
||||||
):
|
|
||||||
"""Test the concurrency rejct and rate limit job execution limit."""
|
|
||||||
|
|
||||||
class TestClass:
|
|
||||||
"""Test class."""
|
|
||||||
|
|
||||||
def __init__(self, coresys: CoreSys):
|
|
||||||
"""Initialize the test class."""
|
|
||||||
self.coresys = coresys
|
|
||||||
self.run = asyncio.Lock()
|
|
||||||
self.call = 0
|
|
||||||
|
|
||||||
@Job(
|
|
||||||
name=f"test_concurency_reject_and_rate_limit_execute_{uuid4().hex}",
|
|
||||||
concurrency=JobConcurrency.REJECT,
|
|
||||||
throttle=JobThrottle.RATE_LIMIT,
|
|
||||||
throttle_period=timedelta(hours=1),
|
|
||||||
throttle_max_calls=1,
|
|
||||||
on_condition=error,
|
|
||||||
)
|
|
||||||
async def execute(self, sleep: float = 0):
|
|
||||||
"""Execute the class method."""
|
|
||||||
async with self.run:
|
|
||||||
await asyncio.sleep(sleep)
|
|
||||||
self.call += 1
|
|
||||||
|
|
||||||
test = TestClass(coresys)
|
|
||||||
|
|
||||||
results = await asyncio.gather(
|
|
||||||
*[test.execute(0.1), test.execute(), test.execute()], return_exceptions=True
|
|
||||||
)
|
|
||||||
assert results[0] is None
|
|
||||||
assert isinstance(results[1], JobException)
|
|
||||||
assert isinstance(results[2], JobException)
|
|
||||||
assert test.call == 1
|
|
||||||
|
|
||||||
with pytest.raises(JobException if error is None else error):
|
|
||||||
await test.execute()
|
|
||||||
|
|
||||||
assert test.call == 1
|
|
||||||
|
|
||||||
with time_machine.travel(utcnow() + timedelta(hours=1)):
|
|
||||||
await test.execute()
|
|
||||||
|
|
||||||
assert test.call == 2
|
|
||||||
|
@ -9,7 +9,6 @@ import pytest
|
|||||||
from supervisor.const import CoreState
|
from supervisor.const import CoreState
|
||||||
from supervisor.coresys import CoreSys
|
from supervisor.coresys import CoreSys
|
||||||
from supervisor.exceptions import HassOSJobError
|
from supervisor.exceptions import HassOSJobError
|
||||||
from supervisor.resolution.const import UnhealthyReason
|
|
||||||
|
|
||||||
from tests.common import MockResponse
|
from tests.common import MockResponse
|
||||||
from tests.dbus_service_mocks.base import DBusServiceMock
|
from tests.dbus_service_mocks.base import DBusServiceMock
|
||||||
@ -86,21 +85,6 @@ async def test_update_fails_if_out_of_date(
|
|||||||
await coresys.os.update()
|
await coresys.os.update()
|
||||||
|
|
||||||
|
|
||||||
async def test_update_fails_if_unhealthy(
|
|
||||||
coresys: CoreSys,
|
|
||||||
) -> None:
|
|
||||||
"""Test update of OS fails if Supervisor is unhealthy."""
|
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
|
||||||
coresys.resolution.add_unhealthy_reason(UnhealthyReason.DUPLICATE_OS_INSTALLATION)
|
|
||||||
with (
|
|
||||||
patch.object(
|
|
||||||
type(coresys.os), "available", new=PropertyMock(return_value=True)
|
|
||||||
),
|
|
||||||
pytest.raises(HassOSJobError),
|
|
||||||
):
|
|
||||||
await coresys.os.update()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_board_name_supervised(coresys: CoreSys) -> None:
|
async def test_board_name_supervised(coresys: CoreSys) -> None:
|
||||||
"""Test board name is supervised when not on haos."""
|
"""Test board name is supervised when not on haos."""
|
||||||
with patch("supervisor.os.manager.CPE.get_product", return_value=["not-hassos"]):
|
with patch("supervisor.os.manager.CPE.get_product", return_value=["not-hassos"]):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user