mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-27 19:02:43 +00:00
Compare commits
4 Commits
refactor/v
...
refactor-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3644280ab7 | ||
|
|
e5f1022f9d | ||
|
|
b2dbdf711d | ||
|
|
9966a25f6c |
@@ -139,7 +139,7 @@ class Core(CoreSysAttributes):
|
||||
await self.coresys.init_websession()
|
||||
|
||||
# Check internet on startup
|
||||
await self.sys_supervisor.check_connectivity()
|
||||
await self.sys_supervisor.check_and_update_connectivity(force=True)
|
||||
|
||||
# Order can be important!
|
||||
setup_loads: list[Awaitable[None]] = [
|
||||
@@ -460,7 +460,10 @@ class Core(CoreSysAttributes):
|
||||
)
|
||||
|
||||
await self.sys_host.control.set_datetime(data.dt_utc)
|
||||
await self.sys_supervisor.check_connectivity()
|
||||
# System time was just corrected. TLS certificates that previously
|
||||
# appeared expired/not-yet-valid may now verify, so a connectivity
|
||||
# probe that just failed for that reason can succeed now.
|
||||
await self.sys_supervisor.check_and_update_connectivity(force=True)
|
||||
|
||||
async def repair(self) -> None:
|
||||
"""Repair system integrity."""
|
||||
|
||||
@@ -69,8 +69,11 @@ class NetworkManager(CoreSysAttributes):
|
||||
self.sys_homeassistant.websocket.supervisor_update_event(
|
||||
"network", {ATTR_HOST_INTERNET: state}
|
||||
)
|
||||
if state and not self.sys_supervisor.connectivity:
|
||||
self.sys_create_task(self.sys_supervisor.check_connectivity())
|
||||
if state:
|
||||
# Host just regained connectivity; kick a fresh Supervisor probe.
|
||||
# Coalescing in request_connectivity_check means redundant calls
|
||||
# are safe, so no "only if supervisor is False" guard is needed.
|
||||
self.sys_supervisor.request_connectivity_check(force=True)
|
||||
|
||||
@property
|
||||
def interfaces(self) -> list[Interface]:
|
||||
|
||||
@@ -372,7 +372,10 @@ class Job(CoreSysAttributes):
|
||||
)
|
||||
|
||||
if JobCondition.INTERNET_SYSTEM in used_conditions:
|
||||
await coresys.sys_supervisor.check_connectivity()
|
||||
# Precondition wants a recent result, not necessarily a fresh one;
|
||||
# the min-interval short-circuit inside check_and_update_connectivity
|
||||
# reuses the cached state when it's still within window.
|
||||
await coresys.sys_supervisor.check_and_update_connectivity()
|
||||
if not coresys.sys_supervisor.connectivity:
|
||||
raise JobConditionException(
|
||||
f"'{method_name}' blocked from execution, no supervisor internet connection"
|
||||
|
||||
@@ -121,7 +121,7 @@ class OSManager(CoreSysAttributes):
|
||||
and self.latest_version is not None
|
||||
and self.version < self.latest_version
|
||||
)
|
||||
except (AwesomeVersionException, TypeError):
|
||||
except AwesomeVersionException, TypeError:
|
||||
return False
|
||||
|
||||
@property
|
||||
@@ -205,7 +205,9 @@ class OSManager(CoreSysAttributes):
|
||||
_LOGGER.info("Completed download of OTA update file %s", raucb)
|
||||
|
||||
except (aiohttp.ClientError, TimeoutError) as err:
|
||||
self.sys_supervisor.connectivity = False
|
||||
# Nudge a fresh connectivity check; the probe is authoritative,
|
||||
# this error path only hints that something may be wrong.
|
||||
self.sys_supervisor.request_connectivity_check()
|
||||
raise HassOSUpdateError(
|
||||
f"Can't fetch OTA update from {url}: {err!s}", _LOGGER.error
|
||||
) from err
|
||||
|
||||
@@ -122,7 +122,10 @@ class PluginDns(PluginBase):
|
||||
await asyncio.sleep(5)
|
||||
|
||||
_LOGGER.debug("CoreDNS started, checking connectivity")
|
||||
await self.sys_supervisor.check_connectivity()
|
||||
# DNS resolution has just changed; force a fresh probe so a check
|
||||
# in flight while DNS was restarting doesn't leave us with a
|
||||
# stale failure cached.
|
||||
self.sys_supervisor.request_connectivity_check(force=True)
|
||||
|
||||
async def _restart_dns_after_locals_change(self) -> None:
|
||||
"""Restart DNS after a debounced delay for local changes."""
|
||||
|
||||
@@ -45,7 +45,10 @@ class CheckAppPwned(CheckBase):
|
||||
try:
|
||||
await self.sys_security.verify_secret(secret)
|
||||
except PwnedConnectivityError:
|
||||
self.sys_supervisor.connectivity = False
|
||||
# Nudge a fresh connectivity check; the probe is
|
||||
# authoritative, this error path only hints that
|
||||
# something may be wrong.
|
||||
self.sys_supervisor.request_connectivity_check()
|
||||
return
|
||||
except PwnedSecret:
|
||||
# Check possible suggestion
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
"""Home Assistant control object."""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable
|
||||
from contextlib import suppress
|
||||
from datetime import timedelta
|
||||
from ipaddress import IPv4Address
|
||||
import logging
|
||||
from pathlib import Path
|
||||
@@ -30,20 +30,18 @@ from .exceptions import (
|
||||
SupervisorUpdateError,
|
||||
)
|
||||
from .jobs import ChildJobSyncFilter
|
||||
from .jobs.const import JobCondition, JobThrottle
|
||||
from .jobs.const import JobCondition
|
||||
from .jobs.decorator import Job
|
||||
from .resolution.const import ContextType, IssueType
|
||||
from .utils.sentry import async_capture_exception
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_connectivity_throttle_period(coresys: CoreSys, *_) -> timedelta:
|
||||
"""Throttle period for connectivity check."""
|
||||
if coresys.supervisor.connectivity:
|
||||
return timedelta(minutes=10)
|
||||
|
||||
return timedelta(seconds=5)
|
||||
# Minimum time between two actual connectivity probes. Callers within this
|
||||
# window get the cached result instead of hitting checkonline.home-assistant.io
|
||||
# again. The interval shrinks while we're offline so recovery is quick.
|
||||
_CONNECTIVITY_MIN_INTERVAL_CONNECTED_SEC: float = 600.0
|
||||
_CONNECTIVITY_MIN_INTERVAL_DISCONNECTED_SEC: float = 5.0
|
||||
|
||||
|
||||
class Supervisor(CoreSysAttributes):
|
||||
@@ -54,6 +52,11 @@ class Supervisor(CoreSysAttributes):
|
||||
self.coresys: CoreSys = coresys
|
||||
self.instance: DockerSupervisor = DockerSupervisor(coresys)
|
||||
self._connectivity: bool = True
|
||||
self._connectivity_check: asyncio.Task[None] | None = None
|
||||
self._connectivity_rerun_forced: bool = False
|
||||
# -inf means "never probed" so the first non-forced call always runs;
|
||||
# 0.0 would wrongly short-circuit while loop.time() < min_interval.
|
||||
self._connectivity_last_check: float = float("-inf")
|
||||
|
||||
async def load(self) -> None:
|
||||
"""Prepare Supervisor object."""
|
||||
@@ -70,11 +73,19 @@ class Supervisor(CoreSysAttributes):
|
||||
"""Return true if we are connected to the internet."""
|
||||
return self._connectivity
|
||||
|
||||
@connectivity.setter
|
||||
def connectivity(self, state: bool) -> None:
|
||||
"""Set supervisor connectivity state."""
|
||||
def _update_connectivity(self, state: bool) -> None:
|
||||
"""Update the cached connectivity state and notify listeners if it changed.
|
||||
|
||||
Only :meth:`check_and_update_connectivity` (or the task it spawns)
|
||||
should ever call this. Bypass paths that set the state from outside
|
||||
an actual probe make it impossible to reason about what the flag
|
||||
means.
|
||||
"""
|
||||
if self._connectivity == state:
|
||||
return
|
||||
_LOGGER.debug(
|
||||
"Supervisor connectivity changed: %s -> %s", self._connectivity, state
|
||||
)
|
||||
self._connectivity = state
|
||||
self.sys_bus.fire_event(BusEvent.SUPERVISOR_CONNECTIVITY_CHANGE, state)
|
||||
self.sys_homeassistant.websocket.supervisor_update_event(
|
||||
@@ -94,7 +105,7 @@ class Supervisor(CoreSysAttributes):
|
||||
|
||||
try:
|
||||
return self.version < self.latest_version
|
||||
except (AwesomeVersionException, TypeError):
|
||||
except AwesomeVersionException, TypeError:
|
||||
return False
|
||||
|
||||
@property
|
||||
@@ -139,7 +150,9 @@ class Supervisor(CoreSysAttributes):
|
||||
data = await request.text()
|
||||
|
||||
except (aiohttp.ClientError, TimeoutError) as err:
|
||||
self.sys_supervisor.connectivity = False
|
||||
# Nudge a fresh connectivity check; the probe is authoritative,
|
||||
# this error path only hints that something may be wrong.
|
||||
self.request_connectivity_check()
|
||||
raise SupervisorAppArmorError(
|
||||
f"Can't fetch AppArmor profile {url}: {str(err) or 'Timeout'}",
|
||||
_LOGGER.error,
|
||||
@@ -269,13 +282,92 @@ class Supervisor(CoreSysAttributes):
|
||||
except DockerError:
|
||||
_LOGGER.error("Repair of Supervisor failed")
|
||||
|
||||
@Job(
|
||||
name="supervisor_check_connectivity",
|
||||
throttle_period=_check_connectivity_throttle_period,
|
||||
throttle=JobThrottle.THROTTLE,
|
||||
)
|
||||
async def check_connectivity(self) -> None:
|
||||
"""Check the Internet connectivity from Supervisor's point of view."""
|
||||
def request_connectivity_check(self, *, force: bool = False) -> None:
|
||||
"""Schedule a connectivity check without awaiting the result.
|
||||
|
||||
Intended for signal handlers (D-Bus, plugin callbacks) that must
|
||||
return quickly. Concurrent calls coalesce onto a single in-flight
|
||||
check. ``force`` is forwarded to :meth:`check_and_update_connectivity`
|
||||
for signals that carry fresh state-change information.
|
||||
"""
|
||||
_LOGGER.debug("Connectivity check requested (force=%s)", force)
|
||||
self.sys_create_task(self.check_and_update_connectivity(force=force))
|
||||
|
||||
async def check_and_update_connectivity(self, *, force: bool = False) -> None:
|
||||
"""Probe Supervisor internet connectivity and update cached state.
|
||||
|
||||
Concurrent callers coalesce onto a single HTTP probe: callers that
|
||||
arrive while one is in flight await its completion rather than
|
||||
starting a second.
|
||||
|
||||
Without ``force``, a probe that ran within the minimum interval
|
||||
short-circuits and the cached state is returned. With ``force``, the
|
||||
interval is bypassed and (if a probe is already in flight) a fresh
|
||||
one is guaranteed to run once the current probe completes.
|
||||
"""
|
||||
if self._connectivity_check is not None:
|
||||
# Probe already in flight - coalesce with it. If the caller
|
||||
# needs a fresh result, mark a trailing rerun so the task that
|
||||
# owns the in-flight probe runs once more after it completes.
|
||||
# Shield so a follower being cancelled cannot bring down the
|
||||
# probe that other callers are also waiting on.
|
||||
if force:
|
||||
_LOGGER.debug(
|
||||
"Connectivity probe in flight; queued forced rerun on completion"
|
||||
)
|
||||
self._connectivity_rerun_forced = True
|
||||
else:
|
||||
_LOGGER.debug("Connectivity probe in flight; awaiting its result")
|
||||
await asyncio.shield(self._connectivity_check)
|
||||
return
|
||||
|
||||
if not force:
|
||||
min_interval = (
|
||||
_CONNECTIVITY_MIN_INTERVAL_CONNECTED_SEC
|
||||
if self._connectivity
|
||||
else _CONNECTIVITY_MIN_INTERVAL_DISCONNECTED_SEC
|
||||
)
|
||||
elapsed = self.sys_loop.time() - self._connectivity_last_check
|
||||
if elapsed < min_interval:
|
||||
_LOGGER.debug(
|
||||
"Connectivity check within min-interval (%.1fs of %.1fs); "
|
||||
"using cached state %s",
|
||||
elapsed,
|
||||
min_interval,
|
||||
self._connectivity,
|
||||
)
|
||||
return
|
||||
|
||||
# Awaiting a Task does not propagate cancellation INTO the task, so
|
||||
# the owner explicitly cancels the probe on its own cancellation.
|
||||
# That keeps the probe from being orphaned (with the next caller
|
||||
# starting a second probe alongside the unfinished first).
|
||||
probe = self.sys_create_task(self._do_connectivity_check())
|
||||
self._connectivity_check = probe
|
||||
try:
|
||||
await probe
|
||||
except asyncio.CancelledError:
|
||||
_LOGGER.debug("Connectivity probe owner cancelled; cancelling probe")
|
||||
probe.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await probe
|
||||
raise
|
||||
finally:
|
||||
if self._connectivity_check is probe:
|
||||
self._connectivity_check = None
|
||||
|
||||
# Only count as a recent probe on actual completion. On cancellation
|
||||
# the timestamp must stay so the next caller doesn't see a "fresh"
|
||||
# cached result that never actually ran.
|
||||
self._connectivity_last_check = self.sys_loop.time()
|
||||
|
||||
if self._connectivity_rerun_forced:
|
||||
_LOGGER.debug("Running queued forced rerun after probe completion")
|
||||
self._connectivity_rerun_forced = False
|
||||
await self.check_and_update_connectivity(force=True)
|
||||
|
||||
async def _do_connectivity_check(self) -> None:
|
||||
"""Run a single HTTP probe and update cached connectivity state."""
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
try:
|
||||
await self.sys_websession.head(
|
||||
@@ -283,7 +375,7 @@ class Supervisor(CoreSysAttributes):
|
||||
)
|
||||
except (ClientError, TimeoutError) as err:
|
||||
_LOGGER.debug("Supervisor Connectivity check failed: %s", err)
|
||||
self.connectivity = False
|
||||
self._update_connectivity(False)
|
||||
else:
|
||||
_LOGGER.debug("Supervisor Connectivity check succeeded")
|
||||
self.connectivity = True
|
||||
self._update_connectivity(True)
|
||||
|
||||
@@ -272,7 +272,9 @@ class Updater(FileConfiguration, CoreSysAttributes):
|
||||
data = await request.read()
|
||||
|
||||
except (aiohttp.ClientError, TimeoutError) as err:
|
||||
self.sys_supervisor.connectivity = False
|
||||
# Nudge a fresh connectivity check; the probe is authoritative,
|
||||
# this error path only hints that something may be wrong.
|
||||
self.sys_supervisor.request_connectivity_check()
|
||||
raise UpdaterError(
|
||||
f"Can't fetch versions from {url}: {str(err) or 'Timeout'}",
|
||||
_LOGGER.warning,
|
||||
|
||||
@@ -708,7 +708,7 @@ async def api_client(
|
||||
def supervisor_internet(coresys: CoreSys) -> Generator[AsyncMock]:
|
||||
"""Fixture which simluate Supervsior internet connection."""
|
||||
connectivity_check = AsyncMock(return_value=True)
|
||||
coresys.supervisor.check_connectivity = connectivity_check
|
||||
coresys.supervisor.check_and_update_connectivity = connectivity_check
|
||||
yield connectivity_check
|
||||
|
||||
|
||||
|
||||
@@ -26,11 +26,8 @@ from supervisor.jobs.job_group import JobGroup
|
||||
from supervisor.os.manager import OSManager
|
||||
from supervisor.plugins.audio import PluginAudio
|
||||
from supervisor.resolution.const import UnhealthyReason, UnsupportedReason
|
||||
from supervisor.supervisor import Supervisor
|
||||
from supervisor.utils.dt import utcnow
|
||||
|
||||
from tests.common import reset_last_call
|
||||
|
||||
|
||||
async def test_healthy(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
|
||||
"""Test the healty decorator."""
|
||||
@@ -76,7 +73,6 @@ async def test_internet(
|
||||
):
|
||||
"""Test the internet decorator."""
|
||||
await coresys.core.set_state(CoreState.RUNNING)
|
||||
reset_last_call(Supervisor.check_connectivity)
|
||||
|
||||
class TestClass:
|
||||
"""Test class."""
|
||||
@@ -105,7 +101,9 @@ async def test_internet(
|
||||
|
||||
mock_websession = AsyncMock()
|
||||
mock_websession.head.side_effect = head_side_effect
|
||||
coresys.supervisor.connectivity = None
|
||||
# Reset cached state so the precondition path actually runs a probe.
|
||||
coresys.supervisor._connectivity = True # pylint: disable=protected-access
|
||||
coresys.supervisor._connectivity_last_check = float("-inf") # pylint: disable=protected-access
|
||||
with (
|
||||
patch("supervisor.utils.dbus.DBus.call_dbus", return_value=connectivity),
|
||||
patch.object(
|
||||
|
||||
@@ -417,18 +417,18 @@ async def test_dns_restart_triggers_connectivity_check(coresys: CoreSys):
|
||||
# Verify listener was registered (connectivity check listener should be stored)
|
||||
assert dns_plugin._connectivity_check_listener is not None
|
||||
|
||||
# Create event to signal when connectivity check is called
|
||||
# Create event to signal when connectivity check is requested
|
||||
connectivity_check_event = asyncio.Event()
|
||||
|
||||
# Mock connectivity check to set the event when called
|
||||
async def mock_check_connectivity():
|
||||
# Mock the fire-and-forget request to set the event when called
|
||||
def mock_request_connectivity_check(*, force: bool = False):
|
||||
connectivity_check_event.set()
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
coresys.supervisor,
|
||||
"check_connectivity",
|
||||
side_effect=mock_check_connectivity,
|
||||
"request_connectivity_check",
|
||||
side_effect=mock_request_connectivity_check,
|
||||
),
|
||||
patch("supervisor.plugins.dns.asyncio.sleep") as mock_sleep,
|
||||
):
|
||||
|
||||
@@ -43,7 +43,7 @@ async def test_fixup(coresys: CoreSys, supervisor_internet):
|
||||
async def test_store_execute_reload_runs_on_connectivity_true(coresys: CoreSys):
|
||||
"""Test fixup runs when connectivity goes from false to true."""
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
coresys.supervisor.connectivity = False
|
||||
coresys.supervisor._update_connectivity(False) # pylint: disable=protected-access
|
||||
await asyncio.sleep(0)
|
||||
|
||||
mock_repository = AsyncMock()
|
||||
@@ -59,7 +59,7 @@ async def test_store_execute_reload_runs_on_connectivity_true(coresys: CoreSys):
|
||||
|
||||
with patch.object(coresys.store, "reload") as mock_reload:
|
||||
# Fire event with connectivity True
|
||||
coresys.supervisor.connectivity = True
|
||||
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
mock_repository.load.assert_called_once()
|
||||
@@ -72,7 +72,7 @@ async def test_store_execute_reload_does_not_run_on_connectivity_false(
|
||||
):
|
||||
"""Test fixup does not run when connectivity goes from true to false."""
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
coresys.supervisor.connectivity = True
|
||||
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
|
||||
await asyncio.sleep(0)
|
||||
|
||||
mock_repository = AsyncMock()
|
||||
@@ -87,7 +87,7 @@ async def test_store_execute_reload_does_not_run_on_connectivity_false(
|
||||
)
|
||||
|
||||
# Fire event with connectivity True
|
||||
coresys.supervisor.connectivity = False
|
||||
coresys.supervisor._update_connectivity(False) # pylint: disable=protected-access
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
mock_repository.load.assert_not_called()
|
||||
@@ -99,7 +99,7 @@ async def test_store_execute_reload_dismiss_suggestion_removes_listener(
|
||||
):
|
||||
"""Test fixup does not run on event if suggestion has been dismissed."""
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
coresys.supervisor.connectivity = True
|
||||
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
|
||||
await asyncio.sleep(0)
|
||||
|
||||
mock_repository = AsyncMock()
|
||||
|
||||
@@ -101,7 +101,9 @@ async def test_adjust_system_datetime_if_time_behind(
|
||||
InfoCenter, "dt_synchronized", new=PropertyMock(return_value=False)
|
||||
),
|
||||
patch.object(InfoCenter, "use_ntp", new=PropertyMock(return_value=True)),
|
||||
patch.object(Supervisor, "check_connectivity") as mock_check_connectivity,
|
||||
patch.object(
|
||||
Supervisor, "check_and_update_connectivity"
|
||||
) as mock_check_connectivity,
|
||||
):
|
||||
# Start the time adjustment which will wait for timesyncd to stop
|
||||
task = asyncio.create_task(coresys.core._adjust_system_datetime())
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Test supervisor object."""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
import asyncio
|
||||
import errno
|
||||
from unittest.mock import AsyncMock, MagicMock, Mock, patch
|
||||
|
||||
@@ -8,9 +8,8 @@ from aiohttp import ClientTimeout
|
||||
from aiohttp.client_exceptions import ClientError
|
||||
from awesomeversion import AwesomeVersion
|
||||
import pytest
|
||||
from time_machine import travel
|
||||
|
||||
from supervisor.const import UpdateChannel
|
||||
from supervisor.const import BusEvent, UpdateChannel
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.docker.supervisor import DockerSupervisor
|
||||
from supervisor.exceptions import (
|
||||
@@ -21,57 +20,190 @@ from supervisor.exceptions import (
|
||||
from supervisor.host.apparmor import AppArmorControl
|
||||
from supervisor.resolution.const import ContextType, IssueType
|
||||
from supervisor.resolution.data import Issue
|
||||
from supervisor.supervisor import Supervisor
|
||||
|
||||
from tests.common import MockResponse, reset_last_call
|
||||
from tests.common import MockResponse
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"side_effect,connectivity", [(ClientError(), False), (None, True)]
|
||||
)
|
||||
@pytest.mark.usefixtures("no_job_throttle")
|
||||
async def test_connectivity_check(
|
||||
coresys: CoreSys,
|
||||
websession: MagicMock,
|
||||
side_effect: Exception | None,
|
||||
connectivity: bool,
|
||||
):
|
||||
"""Test connectivity check."""
|
||||
"""Test connectivity check updates state based on probe outcome."""
|
||||
assert coresys.supervisor.connectivity is True
|
||||
|
||||
websession.head = AsyncMock(side_effect=side_effect)
|
||||
await coresys.supervisor.check_connectivity()
|
||||
await coresys.supervisor.check_and_update_connectivity(force=True)
|
||||
|
||||
assert coresys.supervisor.connectivity is connectivity
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"side_effect,call_interval,throttled",
|
||||
[
|
||||
(None, timedelta(minutes=5), True),
|
||||
(None, timedelta(minutes=15), False),
|
||||
(ClientError(), timedelta(seconds=3), True),
|
||||
(ClientError(), timedelta(seconds=10), False),
|
||||
],
|
||||
)
|
||||
async def test_connectivity_check_throttling(
|
||||
coresys: CoreSys,
|
||||
websession: MagicMock,
|
||||
side_effect: Exception | None,
|
||||
call_interval: timedelta,
|
||||
throttled: bool,
|
||||
async def test_connectivity_check_min_interval_when_connected(
|
||||
coresys: CoreSys, websession: MagicMock
|
||||
):
|
||||
"""Test connectivity check throttled when checks succeed."""
|
||||
coresys.supervisor.connectivity = None
|
||||
websession.head = AsyncMock(side_effect=side_effect)
|
||||
"""Non-forced checks within the min-interval use the cached state."""
|
||||
websession.head = AsyncMock()
|
||||
|
||||
reset_last_call(Supervisor.check_connectivity)
|
||||
with travel(datetime.now(), tick=False) as traveller:
|
||||
await coresys.supervisor.check_connectivity()
|
||||
traveller.shift(call_interval)
|
||||
await coresys.supervisor.check_connectivity()
|
||||
# First call runs the probe.
|
||||
await coresys.supervisor.check_and_update_connectivity()
|
||||
assert websession.head.call_count == 1
|
||||
|
||||
assert websession.head.call_count == (1 if throttled else 2)
|
||||
# Second call within the (10 min) window should not hit the network.
|
||||
await coresys.supervisor.check_and_update_connectivity()
|
||||
assert websession.head.call_count == 1
|
||||
|
||||
|
||||
async def test_connectivity_check_force_bypasses_min_interval(
|
||||
coresys: CoreSys, websession: MagicMock
|
||||
):
|
||||
"""force=True skips the min-interval short-circuit."""
|
||||
websession.head = AsyncMock()
|
||||
|
||||
await coresys.supervisor.check_and_update_connectivity()
|
||||
assert websession.head.call_count == 1
|
||||
|
||||
await coresys.supervisor.check_and_update_connectivity(force=True)
|
||||
assert websession.head.call_count == 2
|
||||
|
||||
|
||||
async def test_connectivity_check_coalesces_concurrent_callers(
|
||||
coresys: CoreSys, websession: MagicMock
|
||||
):
|
||||
"""Concurrent callers await the same in-flight probe instead of each firing one."""
|
||||
probe_started = asyncio.Event()
|
||||
probe_release = asyncio.Event()
|
||||
|
||||
async def slow_head(*args, **kwargs):
|
||||
probe_started.set()
|
||||
await probe_release.wait()
|
||||
|
||||
websession.head = AsyncMock(side_effect=slow_head)
|
||||
|
||||
first = asyncio.create_task(
|
||||
coresys.supervisor.check_and_update_connectivity(force=True)
|
||||
)
|
||||
await probe_started.wait()
|
||||
|
||||
# Kick off a pile of additional callers while the first probe is in flight.
|
||||
concurrent = [
|
||||
asyncio.create_task(coresys.supervisor.check_and_update_connectivity())
|
||||
for _ in range(5)
|
||||
]
|
||||
# Let them all reach the in-flight await.
|
||||
await asyncio.sleep(0)
|
||||
|
||||
probe_release.set()
|
||||
await asyncio.gather(first, *concurrent)
|
||||
|
||||
assert websession.head.call_count == 1
|
||||
|
||||
|
||||
async def test_connectivity_check_force_during_in_flight_triggers_rerun(
|
||||
coresys: CoreSys, websession: MagicMock
|
||||
):
|
||||
"""A force signal arriving while a probe is in flight queues exactly one rerun."""
|
||||
probe_started = asyncio.Event()
|
||||
probe_release = asyncio.Event()
|
||||
|
||||
async def first_then_fast(*args, **kwargs):
|
||||
if websession.head.call_count == 1:
|
||||
probe_started.set()
|
||||
await probe_release.wait()
|
||||
|
||||
websession.head = AsyncMock(side_effect=first_then_fast)
|
||||
|
||||
first = asyncio.create_task(
|
||||
coresys.supervisor.check_and_update_connectivity(force=True)
|
||||
)
|
||||
await probe_started.wait()
|
||||
|
||||
# Forced call while a probe is in flight should set the rerun flag.
|
||||
forced = asyncio.create_task(
|
||||
coresys.supervisor.check_and_update_connectivity(force=True)
|
||||
)
|
||||
# Non-forced calls must NOT queue a rerun.
|
||||
cheap = asyncio.create_task(coresys.supervisor.check_and_update_connectivity())
|
||||
await asyncio.sleep(0)
|
||||
|
||||
probe_release.set()
|
||||
await asyncio.gather(first, forced, cheap)
|
||||
|
||||
assert websession.head.call_count == 2
|
||||
|
||||
|
||||
async def test_connectivity_check_owner_cancellation_cancels_probe(
|
||||
coresys: CoreSys, websession: MagicMock
|
||||
):
|
||||
"""Owner cancellation propagates to the probe and skips updating last-check."""
|
||||
probe_started = asyncio.Event()
|
||||
probe_release = asyncio.Event()
|
||||
|
||||
async def slow_head(*args, **kwargs):
|
||||
probe_started.set()
|
||||
await probe_release.wait()
|
||||
|
||||
websession.head = AsyncMock(side_effect=slow_head)
|
||||
last_check_before = coresys.supervisor._connectivity_last_check # pylint: disable=protected-access
|
||||
|
||||
owner = asyncio.create_task(
|
||||
coresys.supervisor.check_and_update_connectivity(force=True)
|
||||
)
|
||||
await probe_started.wait()
|
||||
|
||||
owner.cancel()
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await owner
|
||||
|
||||
# Owner cancellation must cancel the spawned probe, not orphan it,
|
||||
# and the cached last-check timestamp must NOT advance.
|
||||
assert coresys.supervisor._connectivity_check is None # pylint: disable=protected-access
|
||||
assert coresys.supervisor._connectivity_last_check == last_check_before # pylint: disable=protected-access
|
||||
|
||||
# A subsequent non-forced call must therefore still run a probe.
|
||||
websession.head = AsyncMock()
|
||||
await coresys.supervisor.check_and_update_connectivity()
|
||||
assert websession.head.call_count == 1
|
||||
|
||||
|
||||
async def test_update_connectivity_fires_event_on_change(coresys: CoreSys):
|
||||
"""SUPERVISOR_CONNECTIVITY_CHANGE fires only when the cached value changes."""
|
||||
events: list[bool] = []
|
||||
|
||||
async def listener(state: bool) -> None:
|
||||
events.append(state)
|
||||
|
||||
coresys.bus.register_event(BusEvent.SUPERVISOR_CONNECTIVITY_CHANGE, listener)
|
||||
|
||||
# Same value: no event.
|
||||
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
|
||||
# Change to False: one event.
|
||||
coresys.supervisor._update_connectivity(False) # pylint: disable=protected-access
|
||||
# Change back to True: another event.
|
||||
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert events == [False, True]
|
||||
|
||||
|
||||
async def test_request_connectivity_check_is_fire_and_forget(
|
||||
coresys: CoreSys, websession: MagicMock
|
||||
):
|
||||
"""request_connectivity_check schedules a check that runs asynchronously."""
|
||||
websession.head = AsyncMock()
|
||||
|
||||
# Synchronous call must return without awaiting the HTTP probe.
|
||||
result = coresys.supervisor.request_connectivity_check(force=True)
|
||||
assert result is None
|
||||
|
||||
# Yield until the scheduled task has had a chance to complete.
|
||||
for _ in range(5):
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert websession.head.call_count == 1
|
||||
|
||||
|
||||
async def test_update_failed(coresys: CoreSys, capture_exception: Mock):
|
||||
|
||||
@@ -116,7 +116,7 @@ async def test_delayed_fetch_for_connectivity(
|
||||
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, find_fetch_data_job_start)
|
||||
|
||||
# Start with no connectivity and confirm there is no version fetch on load
|
||||
coresys.supervisor.connectivity = False
|
||||
coresys.supervisor._connectivity = False # pylint: disable=protected-access
|
||||
network_manager_service.connectivity = ConnectivityState.CONNECTIVITY_NONE.value
|
||||
await coresys.host.network.load()
|
||||
await coresys.host.network.check_connectivity()
|
||||
|
||||
Reference in New Issue
Block a user