Compare commits


4 Commits

Author SHA1 Message Date
Stefan Agner
3644280ab7 Add debug logs to Supervisor connectivity probe paths
The original stuck-offline bug was hard to spot in logs because the
silent throttle-drop and the cached state had no audit trail. With
debug-level logging at each decision point, a future investigation can
reconstruct from a single log file:

- who requested a check (force flag distinguishes signal-driven probes
  from precondition / opportunistic-error-path requests)
- why a probe did not actually run (in-flight coalesce, cached within
  min-interval, owner cancellation)
- when a forced rerun was queued and when it ran (the precise failure
  mode that stranded the supervisor in the original incident)
- when the cached state actually flipped (with the previous value in
  the message so transitions are visible)

All new lines are debug-level. The existing _do_connectivity_check
"failed" / "succeeded" lines are kept unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 19:26:50 +02:00
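The decision points listed in the message can be illustrated with a generic sketch (hypothetical helper, not the actual Supervisor code): every early-return path logs why it short-circuited, so the sequence of decisions is reconstructable from a debug log alone.

```python
import logging

_LOGGER = logging.getLogger(__name__)


def probe_decision(
    in_flight: bool, elapsed: float, min_interval: float, *, force: bool
) -> str:
    """Hypothetical decision helper: every branch leaves a debug trail."""
    if in_flight:
        if force:
            _LOGGER.debug("probe in flight; queued forced rerun on completion")
            return "rerun-queued"
        _LOGGER.debug("probe in flight; awaiting its result")
        return "coalesced"
    if not force and elapsed < min_interval:
        _LOGGER.debug(
            "within min-interval (%.1fs of %.1fs); using cached state",
            elapsed,
            min_interval,
        )
        return "cached"
    _LOGGER.debug("running probe (force=%s)", force)
    return "probe"
```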
Stefan Agner
e5f1022f9d Clarify why post-NTP-sync forces a connectivity probe
The previous comment claimed the last-check timestamp may be unreliable
after a time jump, but _connectivity_last_check uses loop.time() which
is monotonic and unaffected by wall-clock corrections. The real reason
to force a fresh probe is TLS validation: certificates that appeared
expired or not-yet-valid before the system clock was corrected may now
verify, so a probe that just failed with an SSL error can succeed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:57:01 +02:00
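The monotonic-clock claim is easy to verify in isolation. A minimal sketch (generic asyncio, not Supervisor code):

```python
import asyncio


async def measure() -> float:
    loop = asyncio.get_running_loop()
    # loop.time() is backed by a monotonic clock: the difference between
    # two readings measures real elapsed time and cannot jump when the
    # wall clock is corrected (e.g. by an NTP sync).
    start = loop.time()
    await asyncio.sleep(0.1)
    return loop.time() - start


elapsed = asyncio.run(measure())
assert elapsed >= 0.1
# time.time() is the wall clock; NTP can move it in either direction,
# which is why it would be the wrong base for a last-check timestamp.
print(f"monotonic elapsed: {elapsed:.3f}s")
```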
Stefan Agner
b2dbdf711d Propagate connectivity-probe cancellation and skip last-check on cancel
Awaiting an asyncio.Task does not propagate cancellation INTO the task,
so the previous owner-doesn't-shield comment was misleading: a cancelled
owner left the spawned probe running orphaned, and the next caller could
start a second probe alongside it. The owner now explicitly cancels and
awaits the probe on CancelledError before re-raising.

The last-check timestamp is also moved out of the finally block so a
cancelled probe does not leave a "fresh result just ran" cache behind
that would short-circuit the next non-forced caller.

A regression test exercises both: that owner cancellation clears the
in-flight reference and leaves the timestamp untouched, and that a
subsequent non-forced check therefore still actually probes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:56:19 +02:00
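The cancellation behavior the message relies on can be demonstrated standalone (generic asyncio sketch, not the Supervisor implementation):

```python
import asyncio


async def probe() -> None:
    await asyncio.sleep(30)  # stands in for the HTTP connectivity probe


async def owner(task: asyncio.Task) -> None:
    # Awaiting a Task does NOT propagate cancellation into it: if this
    # coroutine is cancelled, `task` keeps running unless we cancel it.
    try:
        await task
    except asyncio.CancelledError:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        raise  # re-raise for our own caller, as the commit describes


async def main() -> bool:
    task = asyncio.create_task(probe())
    owning = asyncio.create_task(owner(task))
    await asyncio.sleep(0)  # let the owner reach its await on the probe
    owning.cancel()
    try:
        await owning
    except asyncio.CancelledError:
        pass
    # With the explicit cancel above, the probe is cancelled, not orphaned.
    return task.cancelled()


assert asyncio.run(main()) is True
```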
Stefan Agner
9966a25f6c Rework Supervisor connectivity check with coalescing and force flag
Previously, a failed connectivity probe could strand Supervisor in a
"no connectivity" state indefinitely. After an Ethernet reconnect, a
probe kicked by NetworkManager's connectivity transition could race
with CoreDNS being restarted (due to DNS locals changing), time out on
DNS, and leave supervisor.connectivity = False. The retry that
_on_dns_container_running was meant to fire landed inside the 5 s
JobThrottle window from the just-failed probe and was silently dropped,
since JobThrottle.THROTTLE drops rather than waits.

The rework replaces the @Job(throttle=THROTTLE) decorator and the
public connectivity setter with a single authoritative state-updating
method:

- check_and_update_connectivity(force=False) is the only path that
  runs the HTTP probe and updates the cached state. Concurrent callers
  coalesce onto a single in-flight probe. A min-interval throttle
  lives inside the method and reuses the cached result within the
  window instead of dropping calls.
- request_connectivity_check(force=False) is a fire-and-forget wrapper
  for signal handlers (D-Bus, plugin callbacks) that must return
  quickly without blocking signal dispatch on the HTTP round-trip.
- force=True bypasses the min-interval and, when a probe is in flight,
  sets a trailing-rerun flag so the owning task runs one more probe
  after the current one completes. Used for signals that carry fresh
  state-change information (NM connectivity transition to FULL, DNS
  container RUNNING, startup, post-NTP sync).
- _update_connectivity is the sole writer of the cached flag and
  emits SUPERVISOR_CONNECTIVITY_CHANGE only on actual transitions.

Call sites migrate accordingly. The opportunistic
supervisor.connectivity = False writes in update_apparmor,
updater.fetch_data, os.manager, and addon_pwned error paths are
replaced with request_connectivity_check() calls so the probe remains
authoritative - an endpoint-specific failure no longer lies about the
overall connectivity state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 13:19:51 +02:00
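The coalesce-plus-trailing-rerun shape described in the bullets can be sketched generically (hypothetical class, not the Supervisor implementation; min-interval caching and owner-cancellation handling are omitted for brevity):

```python
import asyncio
from collections.abc import Awaitable, Callable


class CoalescedCheck:
    """One probe in flight at a time: followers await it, and a forced
    caller arriving mid-probe queues exactly one trailing rerun."""

    def __init__(self, probe: Callable[[], Awaitable[None]]) -> None:
        self._probe = probe
        self._task: asyncio.Task | None = None
        self._rerun_forced = False

    async def check(self, *, force: bool = False) -> None:
        if self._task is not None:
            if force:
                # The in-flight result may be stale; run once more after.
                self._rerun_forced = True
            # Shield so a cancelled follower can't kill the shared probe.
            await asyncio.shield(self._task)
            return
        task = asyncio.ensure_future(self._probe())
        self._task = task
        try:
            await task
        finally:
            if self._task is task:
                self._task = None
        if self._rerun_forced:
            self._rerun_forced = False
            await self.check(force=True)


async def demo() -> int:
    calls = 0

    async def probe() -> None:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)

    checker = CoalescedCheck(probe)
    # Five concurrent callers coalesce onto a single probe.
    await asyncio.gather(*(checker.check() for _ in range(5)))
    return calls


assert asyncio.run(demo()) == 1
```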
15 changed files with 326 additions and 83 deletions

View File

@@ -139,7 +139,7 @@ class Core(CoreSysAttributes):
await self.coresys.init_websession()
# Check internet on startup
await self.sys_supervisor.check_connectivity()
await self.sys_supervisor.check_and_update_connectivity(force=True)
# Order can be important!
setup_loads: list[Awaitable[None]] = [
@@ -460,7 +460,10 @@ class Core(CoreSysAttributes):
)
await self.sys_host.control.set_datetime(data.dt_utc)
await self.sys_supervisor.check_connectivity()
# System time was just corrected. TLS certificates that previously
# appeared expired/not-yet-valid may now verify, so a connectivity
# probe that just failed for that reason can succeed now.
await self.sys_supervisor.check_and_update_connectivity(force=True)
async def repair(self) -> None:
"""Repair system integrity."""

View File

@@ -69,8 +69,11 @@ class NetworkManager(CoreSysAttributes):
self.sys_homeassistant.websocket.supervisor_update_event(
"network", {ATTR_HOST_INTERNET: state}
)
if state and not self.sys_supervisor.connectivity:
self.sys_create_task(self.sys_supervisor.check_connectivity())
if state:
# Host just regained connectivity; kick a fresh Supervisor probe.
# Coalescing in request_connectivity_check means redundant calls
# are safe, so no "only if supervisor is False" guard is needed.
self.sys_supervisor.request_connectivity_check(force=True)
@property
def interfaces(self) -> list[Interface]:

View File

@@ -372,7 +372,10 @@ class Job(CoreSysAttributes):
)
if JobCondition.INTERNET_SYSTEM in used_conditions:
await coresys.sys_supervisor.check_connectivity()
# Precondition wants a recent result, not necessarily a fresh one;
# the min-interval short-circuit inside check_and_update_connectivity
# reuses the cached state when it's still within window.
await coresys.sys_supervisor.check_and_update_connectivity()
if not coresys.sys_supervisor.connectivity:
raise JobConditionException(
f"'{method_name}' blocked from execution, no supervisor internet connection"

View File

@@ -121,7 +121,7 @@ class OSManager(CoreSysAttributes):
and self.latest_version is not None
and self.version < self.latest_version
)
except (AwesomeVersionException, TypeError):
except (AwesomeVersionException, TypeError):
return False
@property
@@ -205,7 +205,9 @@ class OSManager(CoreSysAttributes):
_LOGGER.info("Completed download of OTA update file %s", raucb)
except (aiohttp.ClientError, TimeoutError) as err:
self.sys_supervisor.connectivity = False
# Nudge a fresh connectivity check; the probe is authoritative,
# this error path only hints that something may be wrong.
self.sys_supervisor.request_connectivity_check()
raise HassOSUpdateError(
f"Can't fetch OTA update from {url}: {err!s}", _LOGGER.error
) from err

View File

@@ -122,7 +122,10 @@ class PluginDns(PluginBase):
await asyncio.sleep(5)
_LOGGER.debug("CoreDNS started, checking connectivity")
await self.sys_supervisor.check_connectivity()
# DNS resolution has just changed; force a fresh probe so a check
# in flight while DNS was restarting doesn't leave us with a
# stale failure cached.
self.sys_supervisor.request_connectivity_check(force=True)
async def _restart_dns_after_locals_change(self) -> None:
"""Restart DNS after a debounced delay for local changes."""

View File

@@ -45,7 +45,10 @@ class CheckAppPwned(CheckBase):
try:
await self.sys_security.verify_secret(secret)
except PwnedConnectivityError:
self.sys_supervisor.connectivity = False
# Nudge a fresh connectivity check; the probe is
# authoritative, this error path only hints that
# something may be wrong.
self.sys_supervisor.request_connectivity_check()
return
except PwnedSecret:
# Check possible suggestion

View File

@@ -1,8 +1,8 @@
"""Home Assistant control object."""
import asyncio
from collections.abc import Awaitable
from contextlib import suppress
from datetime import timedelta
from ipaddress import IPv4Address
import logging
from pathlib import Path
@@ -30,20 +30,18 @@ from .exceptions import (
SupervisorUpdateError,
)
from .jobs import ChildJobSyncFilter
from .jobs.const import JobCondition, JobThrottle
from .jobs.const import JobCondition
from .jobs.decorator import Job
from .resolution.const import ContextType, IssueType
from .utils.sentry import async_capture_exception
_LOGGER: logging.Logger = logging.getLogger(__name__)
def _check_connectivity_throttle_period(coresys: CoreSys, *_) -> timedelta:
"""Throttle period for connectivity check."""
if coresys.supervisor.connectivity:
return timedelta(minutes=10)
return timedelta(seconds=5)
# Minimum time between two actual connectivity probes. Callers within this
# window get the cached result instead of hitting checkonline.home-assistant.io
# again. The interval shrinks while we're offline so recovery is quick.
_CONNECTIVITY_MIN_INTERVAL_CONNECTED_SEC: float = 600.0
_CONNECTIVITY_MIN_INTERVAL_DISCONNECTED_SEC: float = 5.0
class Supervisor(CoreSysAttributes):
@@ -54,6 +52,11 @@ class Supervisor(CoreSysAttributes):
self.coresys: CoreSys = coresys
self.instance: DockerSupervisor = DockerSupervisor(coresys)
self._connectivity: bool = True
self._connectivity_check: asyncio.Task[None] | None = None
self._connectivity_rerun_forced: bool = False
# -inf means "never probed" so the first non-forced call always runs;
# 0.0 would wrongly short-circuit while loop.time() < min_interval.
self._connectivity_last_check: float = float("-inf")
async def load(self) -> None:
"""Prepare Supervisor object."""
@@ -70,11 +73,19 @@ class Supervisor(CoreSysAttributes):
"""Return true if we are connected to the internet."""
return self._connectivity
@connectivity.setter
def connectivity(self, state: bool) -> None:
"""Set supervisor connectivity state."""
def _update_connectivity(self, state: bool) -> None:
"""Update the cached connectivity state and notify listeners if it changed.
Only :meth:`check_and_update_connectivity` (or the task it spawns)
should ever call this. Bypass paths that set the state from outside
an actual probe make it impossible to reason about what the flag
means.
"""
if self._connectivity == state:
return
_LOGGER.debug(
"Supervisor connectivity changed: %s -> %s", self._connectivity, state
)
self._connectivity = state
self.sys_bus.fire_event(BusEvent.SUPERVISOR_CONNECTIVITY_CHANGE, state)
self.sys_homeassistant.websocket.supervisor_update_event(
@@ -94,7 +105,7 @@ class Supervisor(CoreSysAttributes):
try:
return self.version < self.latest_version
except (AwesomeVersionException, TypeError):
except (AwesomeVersionException, TypeError):
return False
@property
@@ -139,7 +150,9 @@ class Supervisor(CoreSysAttributes):
data = await request.text()
except (aiohttp.ClientError, TimeoutError) as err:
self.sys_supervisor.connectivity = False
# Nudge a fresh connectivity check; the probe is authoritative,
# this error path only hints that something may be wrong.
self.request_connectivity_check()
raise SupervisorAppArmorError(
f"Can't fetch AppArmor profile {url}: {str(err) or 'Timeout'}",
_LOGGER.error,
@@ -269,13 +282,92 @@ class Supervisor(CoreSysAttributes):
except DockerError:
_LOGGER.error("Repair of Supervisor failed")
@Job(
name="supervisor_check_connectivity",
throttle_period=_check_connectivity_throttle_period,
throttle=JobThrottle.THROTTLE,
)
async def check_connectivity(self) -> None:
"""Check the Internet connectivity from Supervisor's point of view."""
def request_connectivity_check(self, *, force: bool = False) -> None:
"""Schedule a connectivity check without awaiting the result.
Intended for signal handlers (D-Bus, plugin callbacks) that must
return quickly. Concurrent calls coalesce onto a single in-flight
check. ``force`` is forwarded to :meth:`check_and_update_connectivity`
for signals that carry fresh state-change information.
"""
_LOGGER.debug("Connectivity check requested (force=%s)", force)
self.sys_create_task(self.check_and_update_connectivity(force=force))
async def check_and_update_connectivity(self, *, force: bool = False) -> None:
"""Probe Supervisor internet connectivity and update cached state.
Concurrent callers coalesce onto a single HTTP probe: callers that
arrive while one is in flight await its completion rather than
starting a second.
Without ``force``, a probe that ran within the minimum interval
short-circuits and the cached state is returned. With ``force``, the
interval is bypassed and (if a probe is already in flight) a fresh
one is guaranteed to run once the current probe completes.
"""
if self._connectivity_check is not None:
# Probe already in flight - coalesce with it. If the caller
# needs a fresh result, mark a trailing rerun so the task that
# owns the in-flight probe runs once more after it completes.
# Shield so a follower being cancelled cannot bring down the
# probe that other callers are also waiting on.
if force:
_LOGGER.debug(
"Connectivity probe in flight; queued forced rerun on completion"
)
self._connectivity_rerun_forced = True
else:
_LOGGER.debug("Connectivity probe in flight; awaiting its result")
await asyncio.shield(self._connectivity_check)
return
if not force:
min_interval = (
_CONNECTIVITY_MIN_INTERVAL_CONNECTED_SEC
if self._connectivity
else _CONNECTIVITY_MIN_INTERVAL_DISCONNECTED_SEC
)
elapsed = self.sys_loop.time() - self._connectivity_last_check
if elapsed < min_interval:
_LOGGER.debug(
"Connectivity check within min-interval (%.1fs of %.1fs); "
"using cached state %s",
elapsed,
min_interval,
self._connectivity,
)
return
# Awaiting a Task does not propagate cancellation INTO the task, so
# the owner explicitly cancels the probe on its own cancellation.
# That keeps the probe from being orphaned (with the next caller
# starting a second probe alongside the unfinished first).
probe = self.sys_create_task(self._do_connectivity_check())
self._connectivity_check = probe
try:
await probe
except asyncio.CancelledError:
_LOGGER.debug("Connectivity probe owner cancelled; cancelling probe")
probe.cancel()
with suppress(asyncio.CancelledError):
await probe
raise
finally:
if self._connectivity_check is probe:
self._connectivity_check = None
# Only count as a recent probe on actual completion. On cancellation
# the timestamp must stay untouched so the next caller doesn't see a
# "fresh" cached result that never actually ran.
self._connectivity_last_check = self.sys_loop.time()
if self._connectivity_rerun_forced:
_LOGGER.debug("Running queued forced rerun after probe completion")
self._connectivity_rerun_forced = False
await self.check_and_update_connectivity(force=True)
async def _do_connectivity_check(self) -> None:
"""Run a single HTTP probe and update cached connectivity state."""
timeout = aiohttp.ClientTimeout(total=10)
try:
await self.sys_websession.head(
@@ -283,7 +375,7 @@ class Supervisor(CoreSysAttributes):
)
except (ClientError, TimeoutError) as err:
_LOGGER.debug("Supervisor Connectivity check failed: %s", err)
self.connectivity = False
self._update_connectivity(False)
else:
_LOGGER.debug("Supervisor Connectivity check succeeded")
self.connectivity = True
self._update_connectivity(True)

View File

@@ -272,7 +272,9 @@ class Updater(FileConfiguration, CoreSysAttributes):
data = await request.read()
except (aiohttp.ClientError, TimeoutError) as err:
self.sys_supervisor.connectivity = False
# Nudge a fresh connectivity check; the probe is authoritative,
# this error path only hints that something may be wrong.
self.sys_supervisor.request_connectivity_check()
raise UpdaterError(
f"Can't fetch versions from {url}: {str(err) or 'Timeout'}",
_LOGGER.warning,

View File

@@ -708,7 +708,7 @@ async def api_client(
def supervisor_internet(coresys: CoreSys) -> Generator[AsyncMock]:
"""Fixture which simluate Supervsior internet connection."""
connectivity_check = AsyncMock(return_value=True)
coresys.supervisor.check_connectivity = connectivity_check
coresys.supervisor.check_and_update_connectivity = connectivity_check
yield connectivity_check

View File

@@ -26,11 +26,8 @@ from supervisor.jobs.job_group import JobGroup
from supervisor.os.manager import OSManager
from supervisor.plugins.audio import PluginAudio
from supervisor.resolution.const import UnhealthyReason, UnsupportedReason
from supervisor.supervisor import Supervisor
from supervisor.utils.dt import utcnow
from tests.common import reset_last_call
async def test_healthy(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Test the healty decorator."""
@@ -76,7 +73,6 @@ async def test_internet(
):
"""Test the internet decorator."""
await coresys.core.set_state(CoreState.RUNNING)
reset_last_call(Supervisor.check_connectivity)
class TestClass:
"""Test class."""
@@ -105,7 +101,9 @@ async def test_internet(
mock_websession = AsyncMock()
mock_websession.head.side_effect = head_side_effect
coresys.supervisor.connectivity = None
# Reset cached state so the precondition path actually runs a probe.
coresys.supervisor._connectivity = True # pylint: disable=protected-access
coresys.supervisor._connectivity_last_check = float("-inf") # pylint: disable=protected-access
with (
patch("supervisor.utils.dbus.DBus.call_dbus", return_value=connectivity),
patch.object(

View File

@@ -417,18 +417,18 @@ async def test_dns_restart_triggers_connectivity_check(coresys: CoreSys):
# Verify listener was registered (connectivity check listener should be stored)
assert dns_plugin._connectivity_check_listener is not None
# Create event to signal when connectivity check is called
# Create event to signal when connectivity check is requested
connectivity_check_event = asyncio.Event()
# Mock connectivity check to set the event when called
async def mock_check_connectivity():
# Mock the fire-and-forget request to set the event when called
def mock_request_connectivity_check(*, force: bool = False):
connectivity_check_event.set()
with (
patch.object(
coresys.supervisor,
"check_connectivity",
side_effect=mock_check_connectivity,
"request_connectivity_check",
side_effect=mock_request_connectivity_check,
),
patch("supervisor.plugins.dns.asyncio.sleep") as mock_sleep,
):

View File

@@ -43,7 +43,7 @@ async def test_fixup(coresys: CoreSys, supervisor_internet):
async def test_store_execute_reload_runs_on_connectivity_true(coresys: CoreSys):
"""Test fixup runs when connectivity goes from false to true."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.supervisor.connectivity = False
coresys.supervisor._update_connectivity(False) # pylint: disable=protected-access
await asyncio.sleep(0)
mock_repository = AsyncMock()
@@ -59,7 +59,7 @@ async def test_store_execute_reload_runs_on_connectivity_true(coresys: CoreSys):
with patch.object(coresys.store, "reload") as mock_reload:
# Fire event with connectivity True
coresys.supervisor.connectivity = True
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
await asyncio.sleep(0.1)
mock_repository.load.assert_called_once()
@@ -72,7 +72,7 @@ async def test_store_execute_reload_does_not_run_on_connectivity_false(
):
"""Test fixup does not run when connectivity goes from true to false."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.supervisor.connectivity = True
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
await asyncio.sleep(0)
mock_repository = AsyncMock()
@@ -87,7 +87,7 @@ async def test_store_execute_reload_does_not_run_on_connectivity_false(
)
# Fire event with connectivity True
coresys.supervisor.connectivity = False
coresys.supervisor._update_connectivity(False) # pylint: disable=protected-access
await asyncio.sleep(0.1)
mock_repository.load.assert_not_called()
@@ -99,7 +99,7 @@ async def test_store_execute_reload_dismiss_suggestion_removes_listener(
):
"""Test fixup does not run on event if suggestion has been dismissed."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.supervisor.connectivity = True
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
await asyncio.sleep(0)
mock_repository = AsyncMock()

View File

@@ -101,7 +101,9 @@ async def test_adjust_system_datetime_if_time_behind(
InfoCenter, "dt_synchronized", new=PropertyMock(return_value=False)
),
patch.object(InfoCenter, "use_ntp", new=PropertyMock(return_value=True)),
patch.object(Supervisor, "check_connectivity") as mock_check_connectivity,
patch.object(
Supervisor, "check_and_update_connectivity"
) as mock_check_connectivity,
):
# Start the time adjustment which will wait for timesyncd to stop
task = asyncio.create_task(coresys.core._adjust_system_datetime())

View File

@@ -1,6 +1,6 @@
"""Test supervisor object."""
from datetime import datetime, timedelta
import asyncio
import errno
from unittest.mock import AsyncMock, MagicMock, Mock, patch
@@ -8,9 +8,8 @@ from aiohttp import ClientTimeout
from aiohttp.client_exceptions import ClientError
from awesomeversion import AwesomeVersion
import pytest
from time_machine import travel
from supervisor.const import UpdateChannel
from supervisor.const import BusEvent, UpdateChannel
from supervisor.coresys import CoreSys
from supervisor.docker.supervisor import DockerSupervisor
from supervisor.exceptions import (
@@ -21,57 +20,190 @@ from supervisor.exceptions import (
from supervisor.host.apparmor import AppArmorControl
from supervisor.resolution.const import ContextType, IssueType
from supervisor.resolution.data import Issue
from supervisor.supervisor import Supervisor
from tests.common import MockResponse, reset_last_call
from tests.common import MockResponse
@pytest.mark.parametrize(
"side_effect,connectivity", [(ClientError(), False), (None, True)]
)
@pytest.mark.usefixtures("no_job_throttle")
async def test_connectivity_check(
coresys: CoreSys,
websession: MagicMock,
side_effect: Exception | None,
connectivity: bool,
):
"""Test connectivity check."""
"""Test connectivity check updates state based on probe outcome."""
assert coresys.supervisor.connectivity is True
websession.head = AsyncMock(side_effect=side_effect)
await coresys.supervisor.check_connectivity()
await coresys.supervisor.check_and_update_connectivity(force=True)
assert coresys.supervisor.connectivity is connectivity
@pytest.mark.parametrize(
"side_effect,call_interval,throttled",
[
(None, timedelta(minutes=5), True),
(None, timedelta(minutes=15), False),
(ClientError(), timedelta(seconds=3), True),
(ClientError(), timedelta(seconds=10), False),
],
)
async def test_connectivity_check_throttling(
coresys: CoreSys,
websession: MagicMock,
side_effect: Exception | None,
call_interval: timedelta,
throttled: bool,
async def test_connectivity_check_min_interval_when_connected(
coresys: CoreSys, websession: MagicMock
):
"""Test connectivity check throttled when checks succeed."""
coresys.supervisor.connectivity = None
websession.head = AsyncMock(side_effect=side_effect)
"""Non-forced checks within the min-interval use the cached state."""
websession.head = AsyncMock()
reset_last_call(Supervisor.check_connectivity)
with travel(datetime.now(), tick=False) as traveller:
await coresys.supervisor.check_connectivity()
traveller.shift(call_interval)
await coresys.supervisor.check_connectivity()
# First call runs the probe.
await coresys.supervisor.check_and_update_connectivity()
assert websession.head.call_count == 1
assert websession.head.call_count == (1 if throttled else 2)
# Second call within the (10 min) window should not hit the network.
await coresys.supervisor.check_and_update_connectivity()
assert websession.head.call_count == 1
async def test_connectivity_check_force_bypasses_min_interval(
coresys: CoreSys, websession: MagicMock
):
"""force=True skips the min-interval short-circuit."""
websession.head = AsyncMock()
await coresys.supervisor.check_and_update_connectivity()
assert websession.head.call_count == 1
await coresys.supervisor.check_and_update_connectivity(force=True)
assert websession.head.call_count == 2
async def test_connectivity_check_coalesces_concurrent_callers(
coresys: CoreSys, websession: MagicMock
):
"""Concurrent callers await the same in-flight probe instead of each firing one."""
probe_started = asyncio.Event()
probe_release = asyncio.Event()
async def slow_head(*args, **kwargs):
probe_started.set()
await probe_release.wait()
websession.head = AsyncMock(side_effect=slow_head)
first = asyncio.create_task(
coresys.supervisor.check_and_update_connectivity(force=True)
)
await probe_started.wait()
# Kick off a pile of additional callers while the first probe is in flight.
concurrent = [
asyncio.create_task(coresys.supervisor.check_and_update_connectivity())
for _ in range(5)
]
# Let them all reach the in-flight await.
await asyncio.sleep(0)
probe_release.set()
await asyncio.gather(first, *concurrent)
assert websession.head.call_count == 1
async def test_connectivity_check_force_during_in_flight_triggers_rerun(
coresys: CoreSys, websession: MagicMock
):
"""A force signal arriving while a probe is in flight queues exactly one rerun."""
probe_started = asyncio.Event()
probe_release = asyncio.Event()
async def first_then_fast(*args, **kwargs):
if websession.head.call_count == 1:
probe_started.set()
await probe_release.wait()
websession.head = AsyncMock(side_effect=first_then_fast)
first = asyncio.create_task(
coresys.supervisor.check_and_update_connectivity(force=True)
)
await probe_started.wait()
# Forced call while a probe is in flight should set the rerun flag.
forced = asyncio.create_task(
coresys.supervisor.check_and_update_connectivity(force=True)
)
# Non-forced calls must NOT queue a rerun.
cheap = asyncio.create_task(coresys.supervisor.check_and_update_connectivity())
await asyncio.sleep(0)
probe_release.set()
await asyncio.gather(first, forced, cheap)
assert websession.head.call_count == 2
async def test_connectivity_check_owner_cancellation_cancels_probe(
coresys: CoreSys, websession: MagicMock
):
"""Owner cancellation propagates to the probe and skips updating last-check."""
probe_started = asyncio.Event()
probe_release = asyncio.Event()
async def slow_head(*args, **kwargs):
probe_started.set()
await probe_release.wait()
websession.head = AsyncMock(side_effect=slow_head)
last_check_before = coresys.supervisor._connectivity_last_check # pylint: disable=protected-access
owner = asyncio.create_task(
coresys.supervisor.check_and_update_connectivity(force=True)
)
await probe_started.wait()
owner.cancel()
with pytest.raises(asyncio.CancelledError):
await owner
# Owner cancellation must cancel the spawned probe, not orphan it,
# and the cached last-check timestamp must NOT advance.
assert coresys.supervisor._connectivity_check is None # pylint: disable=protected-access
assert coresys.supervisor._connectivity_last_check == last_check_before # pylint: disable=protected-access
# A subsequent non-forced call must therefore still run a probe.
websession.head = AsyncMock()
await coresys.supervisor.check_and_update_connectivity()
assert websession.head.call_count == 1
async def test_update_connectivity_fires_event_on_change(coresys: CoreSys):
"""SUPERVISOR_CONNECTIVITY_CHANGE fires only when the cached value changes."""
events: list[bool] = []
async def listener(state: bool) -> None:
events.append(state)
coresys.bus.register_event(BusEvent.SUPERVISOR_CONNECTIVITY_CHANGE, listener)
# Same value: no event.
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
# Change to False: one event.
coresys.supervisor._update_connectivity(False) # pylint: disable=protected-access
# Change back to True: another event.
coresys.supervisor._update_connectivity(True) # pylint: disable=protected-access
await asyncio.sleep(0)
assert events == [False, True]
async def test_request_connectivity_check_is_fire_and_forget(
coresys: CoreSys, websession: MagicMock
):
"""request_connectivity_check schedules a check that runs asynchronously."""
websession.head = AsyncMock()
# Synchronous call must return without awaiting the HTTP probe.
result = coresys.supervisor.request_connectivity_check(force=True)
assert result is None
# Yield until the scheduled task has had a chance to complete.
for _ in range(5):
await asyncio.sleep(0)
assert websession.head.call_count == 1
async def test_update_failed(coresys: CoreSys, capture_exception: Mock):

View File

@@ -116,7 +116,7 @@ async def test_delayed_fetch_for_connectivity(
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, find_fetch_data_job_start)
# Start with no connectivity and confirm there is no version fetch on load
coresys.supervisor.connectivity = False
coresys.supervisor._connectivity = False # pylint: disable=protected-access
network_manager_service.connectivity = ConnectivityState.CONNECTIVITY_NONE.value
await coresys.host.network.load()
await coresys.host.network.check_connectivity()