Reduce connectivity checks (#3836)

* Reduce connectivity checks

* Fix/remove connectivity tests

* Remove throttle from prior connectivity tests

* Use dbus_property wrapper

* Allow variable throttle period with lambda

* Add evaluation for connectivity check disabled
This commit is contained in:
Mike Degatano 2022-09-03 03:48:30 -04:00 committed by GitHub
parent 0769af9383
commit fc646db95f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 420 additions and 182 deletions

View File

@ -220,7 +220,9 @@ class APINetwork(CoreSysAttributes):
@api_process
def reload(self, request: web.Request) -> Awaitable[None]:
"""Reload network data."""
return asyncio.shield(self.sys_host.network.update())
return asyncio.shield(
self.sys_host.network.update(force_connectivity_check=True)
)
@api_process
async def scan_accesspoints(self, request: web.Request) -> dict[str, Any]:

View File

@ -62,6 +62,7 @@ DBUS_ATTR_COMPATIBLE = "Compatible"
DBUS_ATTR_CONFIGURATION = "Configuration"
DBUS_ATTR_CONNECTION = "Connection"
DBUS_ATTR_CONNECTION_ENABLED = "ConnectivityCheckEnabled"
DBUS_ATTR_CONNECTIVITY = "Connectivity"
DBUS_ATTR_CURRENT_DEVICE = "CurrentDevice"
DBUS_ATTR_CURRENT_DNS_SERVER = "CurrentDNSServer"
DBUS_ATTR_CURRENT_DNS_SERVER_EX = "CurrentDNSServerEx"

View File

@ -1,7 +1,7 @@
"""Network Manager implementation for DBUS."""
import asyncio
import logging
from typing import Any, Awaitable
from typing import Any
from awesomeversion import AwesomeVersion, AwesomeVersionException
import sentry_sdk
@ -16,6 +16,7 @@ from ...exceptions import (
from ...utils.dbus import DBus
from ..const import (
DBUS_ATTR_CONNECTION_ENABLED,
DBUS_ATTR_CONNECTIVITY,
DBUS_ATTR_DEVICES,
DBUS_ATTR_PRIMARY_CONNECTION,
DBUS_ATTR_VERSION,
@ -25,7 +26,7 @@ from ..const import (
DBUS_OBJECT_NM,
DeviceType,
)
from ..interface import DBusInterface
from ..interface import DBusInterface, dbus_property
from ..utils import dbus_connected
from .connection import NetworkConnection
from .dns import NetworkManagerDNS
@ -70,13 +71,15 @@ class NetworkManager(DBusInterface):
return self._interfaces
@property
@dbus_property
def connectivity_enabled(self) -> bool:
"""Return if connectivity check is enabled."""
return self.properties[DBUS_ATTR_CONNECTION_ENABLED]
@property
@dbus_property
def version(self) -> AwesomeVersion:
"""Return if connectivity check is enabled."""
"""Return Network Manager version."""
return AwesomeVersion(self.properties[DBUS_ATTR_VERSION])
@dbus_connected
@ -107,9 +110,12 @@ class NetworkManager(DBusInterface):
return con_setting, active_con
@dbus_connected
async def check_connectivity(self) -> Awaitable[Any]:
async def check_connectivity(self, *, force: bool = False) -> int:
"""Check the connectivity of the host."""
return await self.dbus.CheckConnectivity()
if force:
return (await self.dbus.CheckConnectivity())[0]
else:
return await self.dbus.get_property(DBUS_IFACE_NM, DBUS_ATTR_CONNECTIVITY)
async def connect(self) -> None:
"""Connect to system's D-Bus."""

View File

@ -82,15 +82,16 @@ class NetworkManager(CoreSysAttributes):
return list(dict.fromkeys(servers))
async def check_connectivity(self):
async def check_connectivity(self, *, force: bool = False):
"""Check the internet connection."""
if not self.sys_dbus.network.connectivity_enabled:
self.connectivity = None
return
# Check connectivity
try:
state = await self.sys_dbus.network.check_connectivity()
self.connectivity = state[0] == ConnectivityState.CONNECTIVITY_FULL
state = await self.sys_dbus.network.check_connectivity(force=force)
self.connectivity = state == ConnectivityState.CONNECTIVITY_FULL
except DBusError as err:
_LOGGER.warning("Can't update connectivity information: %s", err)
self.connectivity = False
@ -128,7 +129,7 @@ class NetworkManager(CoreSysAttributes):
]
)
async def update(self):
async def update(self, *, force_connectivity_check: bool = False):
"""Update properties over dbus."""
_LOGGER.info("Updating local network information")
try:
@ -140,7 +141,7 @@ class NetworkManager(CoreSysAttributes):
"No network D-Bus connection available", _LOGGER.error
) from err
await self.check_connectivity()
await self.check_connectivity(force=force_connectivity_check)
async def apply_changes(
self, interface: Interface, *, update_only: bool = False
@ -247,7 +248,8 @@ class NetworkManager(CoreSysAttributes):
state = msg[0]
_LOGGER.debug("Active connection state changed to %s", state)
await self.update()
# update_only means not done by user so don't force a check afterwards
await self.update(force_connectivity_check=not update_only)
async def scan_wifi(self, interface: Interface) -> list[AccessPoint]:
"""Scan on Interface for AccessPoint."""

View File

@ -3,12 +3,12 @@ import asyncio
from datetime import datetime, timedelta
from functools import wraps
import logging
from typing import Any
from typing import Any, Callable
import sentry_sdk
from ..const import CoreState
from ..coresys import CoreSysAttributes
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HassioError, JobConditionException, JobException
from ..host.const import HostFeature
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
@ -27,7 +27,9 @@ class Job(CoreSysAttributes):
cleanup: bool = True,
on_condition: JobException | None = None,
limit: JobExecutionLimit | None = None,
throttle_period: timedelta | None = None,
throttle_period: timedelta
| Callable[[CoreSys, datetime, list[datetime] | None], timedelta]
| None = None,
throttle_max_calls: int | None = None,
):
"""Initialize the Job class."""
@ -36,7 +38,7 @@ class Job(CoreSysAttributes):
self.cleanup = cleanup
self.on_condition = on_condition
self.limit = limit
self.throttle_period = throttle_period
self._throttle_period = throttle_period
self.throttle_max_calls = throttle_max_calls
self._lock: asyncio.Semaphore | None = None
self._method = None
@ -51,7 +53,7 @@ class Job(CoreSysAttributes):
JobExecutionLimit.THROTTLE_WAIT,
JobExecutionLimit.THROTTLE_RATE_LIMIT,
)
and self.throttle_period is None
and self._throttle_period is None
):
raise RuntimeError("Using Job without a Throttle period!")
@ -61,6 +63,19 @@ class Job(CoreSysAttributes):
self._rate_limited_calls = []
@property
def throttle_period(self) -> timedelta | None:
"""Return throttle period."""
if self._throttle_period is None:
return None
if isinstance(self._throttle_period, timedelta):
return self._throttle_period
return self._throttle_period(
self.coresys, self._last_call, self._rate_limited_calls
)
def _post_init(self, args: tuple[Any]) -> None:
"""Runtime init."""
if self.name is None:
@ -184,17 +199,17 @@ class Job(CoreSysAttributes):
f"'{self._method.__qualname__}' blocked from execution, not enough free space ({self.sys_host.info.free_space}GB) left on the device"
)
if (
JobCondition.INTERNET_SYSTEM in self.conditions
and not self.sys_supervisor.connectivity
):
if JobCondition.INTERNET_SYSTEM in self.conditions:
await self.sys_supervisor.check_connectivity()
if not self.sys_supervisor.connectivity:
raise JobConditionException(
f"'{self._method.__qualname__}' blocked from execution, no supervisor internet connection"
)
if JobCondition.INTERNET_HOST in self.conditions:
await self.sys_host.network.check_connectivity()
if (
JobCondition.INTERNET_HOST in self.conditions
and self.sys_host.network.connectivity is not None
self.sys_host.network.connectivity is not None
and not self.sys_host.network.connectivity
):
raise JobConditionException(

View File

@ -5,7 +5,6 @@ from ..addons.const import ADDON_UPDATE_CONDITIONS
from ..const import AddonState
from ..coresys import CoreSysAttributes
from ..exceptions import AddonsError, HomeAssistantError, ObserverError
from ..host.const import HostFeature
from ..jobs.decorator import Job, JobCondition
from ..plugins.const import PLUGIN_UPDATE_CONDITIONS
@ -34,8 +33,6 @@ RUN_WATCHDOG_OBSERVER_APPLICATION = 180
RUN_REFRESH_ADDON = 15
RUN_CHECK_CONNECTIVITY = 30
PLUGIN_AUTO_UPDATE_CONDITIONS = PLUGIN_UPDATE_CONDITIONS + [JobCondition.RUNNING]
@ -79,11 +76,6 @@ class Tasks(CoreSysAttributes):
# Refresh
self.sys_scheduler.register_task(self._refresh_addon, RUN_REFRESH_ADDON)
# Connectivity
self.sys_scheduler.register_task(
self._check_connectivity, RUN_CHECK_CONNECTIVITY
)
_LOGGER.info("All core tasks are scheduled")
@Job(conditions=ADDON_UPDATE_CONDITIONS + [JobCondition.RUNNING])
@ -291,32 +283,6 @@ class Tasks(CoreSysAttributes):
# Adjust state
addon.state = AddonState.STOPPED
async def _check_connectivity(self) -> None:
"""Check system connectivity."""
value = self._cache.get("connectivity", 0)
# Need only full check if not connected or each 10min
if value >= 600:
pass
elif (
self.sys_supervisor.connectivity
and self.sys_host.network.connectivity is None
) or (
self.sys_supervisor.connectivity
and self.sys_host.network.connectivity is not None
and self.sys_host.network.connectivity
):
self._cache["connectivity"] = value + RUN_CHECK_CONNECTIVITY
return
# Check connectivity
try:
await self.sys_supervisor.check_connectivity()
if HostFeature.NETWORK in self.sys_host.features:
await self.sys_host.network.check_connectivity()
finally:
self._cache["connectivity"] = 0
@Job(conditions=[JobCondition.SUPERVISOR_UPDATED])
async def _reload_store(self) -> None:
"""Reload store and check for addon updates."""

View File

@ -32,6 +32,7 @@ class UnsupportedReason(str, Enum):
"""Reasons for unsupported status."""
APPARMOR = "apparmor"
CONNECTIVITY_CHECK = "connectivity_check"
CONTENT_TRUST = "content_trust"
DBUS = "dbus"
DNS_SERVER = "dns_server"

View File

@ -34,8 +34,8 @@ class EvaluateBase(ABC, CoreSysAttributes):
self.sys_resolution.dismiss_unsupported(self.reason)
@abstractmethod
async def evaluate(self):
"""Run evaluation."""
async def evaluate(self) -> bool:
"""Run evaluation, return true if system fails."""
@property
@abstractmethod
@ -50,7 +50,7 @@ class EvaluateBase(ABC, CoreSysAttributes):
@property
@abstractmethod
def on_failure(self) -> str:
"""Return a string that is printed when self.evaluate is False."""
"""Return a string that is printed when system fails this evaluation."""
@property
def states(self) -> list[CoreState]:

View File

@ -0,0 +1,34 @@
"""Evaluation class for connectivity check."""
from ...const import CoreState
from ...coresys import CoreSys
from ..const import UnsupportedReason
from .base import EvaluateBase
def setup(coresys: CoreSys) -> EvaluateBase:
"""Initialize evaluation-setup function."""
return EvaluateConnectivityCheck(coresys)
class EvaluateConnectivityCheck(EvaluateBase):
"""Evaluate connectivity check."""
@property
def reason(self) -> UnsupportedReason:
"""Return a UnsupportedReason enum."""
return UnsupportedReason.CONNECTIVITY_CHECK
@property
def on_failure(self) -> str:
"""Return a string that is printed when system fails this evaluation."""
return "Connectivity checks are required for Home Assistant."
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.RUNNING]
async def evaluate(self) -> bool:
"""Run evaluation, return true if system fails."""
return self.sys_dbus.network.connectivity_enabled is False

View File

@ -21,7 +21,7 @@ class EvaluateSupervisorVersion(EvaluateBase):
@property
def on_failure(self) -> str:
"""Return a string that is printed when self.evaluate is False."""
"""Return a string that is printed when self.evaluate is True."""
return "Not using latest version of Supervisor and auto update is disabled."
@property

View File

@ -1,6 +1,7 @@
"""Home Assistant control object."""
import asyncio
from contextlib import suppress
from datetime import timedelta
from ipaddress import IPv4Address
import logging
from pathlib import Path
@ -25,15 +26,24 @@ from .exceptions import (
SupervisorJobError,
SupervisorUpdateError,
)
from .jobs.decorator import Job, JobCondition
from .jobs.const import JobCondition, JobExecutionLimit
from .jobs.decorator import Job
from .resolution.const import ContextType, IssueType
from .utils.codenotary import calc_checksum
_LOGGER: logging.Logger = logging.getLogger(__name__)
def _check_connectivity_throttle_period(coresys: CoreSys, *_) -> timedelta:
"""Throttle period for connectivity check."""
if coresys.supervisor.connectivity:
return timedelta(minutes=10)
return timedelta()
class Supervisor(CoreSysAttributes):
"""Home Assistant core object for handle it."""
"""Supervisor object."""
def __init__(self, coresys: CoreSys):
"""Initialize hass object."""
@ -42,7 +52,7 @@ class Supervisor(CoreSysAttributes):
self._connectivity: bool = True
async def load(self) -> None:
"""Prepare Home Assistant object."""
"""Prepare Supervisor object."""
try:
await self.instance.attach(version=self.version)
except DockerError:
@ -243,6 +253,10 @@ class Supervisor(CoreSysAttributes):
except DockerError:
_LOGGER.error("Repair of Supervisor failed")
@Job(
limit=JobExecutionLimit.THROTTLE,
throttle_period=_check_connectivity_throttle_period,
)
async def check_connectivity(self):
"""Check the connection."""
timeout = aiohttp.ClientTimeout(total=10)

View File

@ -38,6 +38,7 @@ def _remove_dbus_signature(data: Any) -> Any:
_LOGGER: logging.Logger = logging.getLogger(__name__)
DBUS_METHOD_GETALL: str = "org.freedesktop.DBus.Properties.GetAll"
DBUS_METHOD_GET: str = "org.freedesktop.DBus.Properties.Get"
DBUS_METHOD_SET: str = "org.freedesktop.DBus.Properties.Set"
@ -186,6 +187,14 @@ class DBus:
_LOGGER.error("No attributes returned for %s", interface)
raise DBusFatalError() from err
async def get_property(self, interface: str, name: str) -> Any:
"""Read value of single property from interface."""
try:
return (await self.call_dbus(DBUS_METHOD_GET, interface, name))[0]
except IndexError as err:
_LOGGER.error("No attribute returned for %s on %s", name, interface)
raise DBusFatalError() from err
async def set_property(
self,
interface: str,
@ -275,7 +284,6 @@ class DBusSignalWrapper:
async def __aenter__(self):
"""Install match for signals and start collecting signal messages."""
_LOGGER.debug("Install match for signal %s.%s", self._interface, self._member)
await self._dbus._bus.call(
Message(
@ -298,7 +306,6 @@ class DBusSignalWrapper:
async def __aexit__(self, exc_t, exc_v, exc_tb):
"""Stop collecting signal messages and remove match for signals."""
self._dbus._bus.remove_message_handler(self._message_handler)
await self._dbus._bus.call(

View File

@ -1,15 +1,16 @@
"""Test NetwrokInterface API."""
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, Mock, patch
import pytest
from supervisor.const import DOCKER_NETWORK, DOCKER_NETWORK_MASK
from supervisor.coresys import CoreSys
from tests.const import TEST_INTERFACE, TEST_INTERFACE_WLAN
@pytest.mark.asyncio
async def test_api_network_info(api_client, coresys):
async def test_api_network_info(api_client, coresys: CoreSys):
"""Test network manager api."""
resp = await api_client.get("/network/info")
result = await resp.json()
@ -98,8 +99,13 @@ async def test_api_network_interface_info_default(api_client):
@pytest.mark.asyncio
async def test_api_network_interface_update(api_client):
async def test_api_network_interface_update(api_client, coresys: CoreSys):
"""Test network manager api."""
with patch.object(
type(coresys.host.sys_dbus.network),
"check_connectivity",
new=Mock(wraps=coresys.host.sys_dbus.network.check_connectivity),
) as check_connectivity:
resp = await api_client.post(
f"/network/interface/{TEST_INTERFACE}/update",
json={
@ -113,6 +119,7 @@ async def test_api_network_interface_update(api_client):
)
result = await resp.json()
assert result["result"] == "ok"
check_connectivity.assert_called_once_with(force=True)
@pytest.mark.asyncio
@ -196,7 +203,17 @@ async def test_api_network_wireless_scan(api_client):
@pytest.mark.asyncio
async def test_api_network_reload(api_client, coresys):
"""Test network manager reload api."""
with patch.object(type(coresys.dbus.network.dbus), "call_dbus") as call_dbus:
resp = await api_client.post("/network/reload")
result = await resp.json()
assert result["result"] == "ok"
assert (
call_dbus.call_args_list[0][0][0]
== "org.freedesktop.NetworkManager.Settings.Connection.GetSettings"
)
# Check that we forced NM to do an immediate connectivity check
assert (
call_dbus.call_args_list[1][0][0]
== "org.freedesktop.NetworkManager.CheckConnectivity"
)

View File

@ -94,6 +94,10 @@ def dbus() -> DBus:
return load_json_fixture(f"{fixture}.json")
async def mock_get_property(dbus_obj, interface, name):
properties = await mock_get_properties(dbus_obj, interface)
return properties[name]
async def mock_wait_for_signal(self):
if (
self._interface + "." + self._method
@ -146,6 +150,8 @@ def dbus() -> DBus:
), patch(
"supervisor.utils.dbus.DBusSignalWrapper.wait_for_signal",
new=mock_wait_for_signal,
), patch(
"supervisor.utils.dbus.DBus.get_property", new=mock_get_property
):
yield dbus_commands

View File

@ -1,5 +1,5 @@
"""Test NetwrokInterface."""
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, patch
import pytest
@ -27,3 +27,28 @@ async def test_network_manager_version(network_manager: NetworkManager):
with pytest.raises(HostNotSupportedError):
await network_manager._validate_version()
assert network_manager.version == "1.13.9"
async def test_check_connectivity(network_manager: NetworkManager):
"""Test connectivity check."""
assert await network_manager.check_connectivity() == 4
assert await network_manager.check_connectivity(force=True) == 4
with patch.object(
type(network_manager.dbus), "call_dbus"
) as call_dbus, patch.object(
type(network_manager.dbus), "get_property"
) as get_property:
await network_manager.check_connectivity()
call_dbus.assert_not_called()
get_property.assert_called_once_with(
"org.freedesktop.NetworkManager", "Connectivity"
)
get_property.reset_mock()
await network_manager.check_connectivity(force=True)
call_dbus.assert_called_once_with(
"org.freedesktop.NetworkManager.CheckConnectivity", remove_signature=True
)
get_property.assert_not_called()

View File

@ -1,19 +1,85 @@
"""Test supported features."""
# pylint: disable=protected-access
from unittest.mock import patch
import asyncio
from unittest.mock import PropertyMock, patch
import pytest
from supervisor.coresys import CoreSys
async def test_connectivity_not_connected(coresys: CoreSys):
"""Test host unknown connectivity."""
with patch("supervisor.utils.dbus.DBus.call_dbus", return_value=[0]):
with patch("supervisor.utils.dbus.DBus.get_property", return_value=0):
await coresys.host.network.check_connectivity()
assert not coresys.host.network.connectivity
with patch("supervisor.utils.dbus.DBus.call_dbus", return_value=[0]):
await coresys.host.network.check_connectivity(force=True)
assert not coresys.host.network.connectivity
async def test_connectivity_connected(coresys: CoreSys):
"""Test host full connectivity."""
with patch("supervisor.utils.dbus.DBus.call_dbus", return_value=[4]):
# Variation on above since our default fixture for each of these returns 4
with patch(
"supervisor.utils.dbus.DBus.get_property", return_value=4
) as get_property, patch(
"supervisor.utils.dbus.DBus.call_dbus", return_value=[4]
) as call_dbus:
await coresys.host.network.check_connectivity()
assert coresys.host.network.connectivity
get_property.assert_called_once()
call_dbus.assert_not_called()
get_property.reset_mock()
await coresys.host.network.check_connectivity(force=True)
assert coresys.host.network.connectivity
get_property.assert_not_called()
call_dbus.assert_called_once()
@pytest.mark.parametrize("force", [True, False])
async def test_connectivity_events(coresys: CoreSys, force: bool):
"""Test connectivity events."""
coresys.host.network.connectivity = None
await asyncio.sleep(0)
with patch.object(
type(coresys.homeassistant.websocket), "async_send_message"
) as send_message:
await coresys.host.network.check_connectivity(force=force)
await asyncio.sleep(0)
assert coresys.host.network.connectivity is True
send_message.assert_called_once_with(
{
"type": "supervisor/event",
"data": {
"event": "supervisor_update",
"update_key": "network",
"data": {"host_internet": True},
},
}
)
send_message.reset_mock()
with patch.object(
type(coresys.dbus.network),
"connectivity_enabled",
new=PropertyMock(return_value=False),
):
await coresys.host.network.check_connectivity(force=force)
await asyncio.sleep(0)
assert coresys.host.network.connectivity is None
send_message.assert_called_once_with(
{
"type": "supervisor/event",
"data": {
"event": "supervisor_update",
"update_key": "network",
"data": {"host_internet": None},
},
}
)

View File

@ -11,10 +11,14 @@ from supervisor.host.network import Interface, IpConfig
async def test_load(coresys: CoreSys):
"""Test network manager load."""
with patch.object(
coresys.host.sys_dbus.network,
type(coresys.host.sys_dbus.network),
"activate_connection",
new=Mock(wraps=coresys.host.sys_dbus.network.activate_connection),
) as activate_connection:
) as activate_connection, patch.object(
type(coresys.host.sys_dbus.network),
"check_connectivity",
new=Mock(wraps=coresys.host.sys_dbus.network.check_connectivity),
) as check_connectivity:
await coresys.host.network.load()
assert coresys.host.network.connectivity is True
@ -43,6 +47,10 @@ async def test_load(coresys: CoreSys):
"/org/freedesktop/NetworkManager/Devices/1",
)
assert check_connectivity.call_count == 2
assert check_connectivity.call_args_list[0][1] == {"force": False}
assert check_connectivity.call_args_list[1][1] == {"force": False}
async def test_load_with_disabled_methods(coresys: CoreSys):
"""Test load does not disable methods of interfaces."""

View File

@ -2,8 +2,9 @@
# pylint: disable=protected-access,import-error
import asyncio
from datetime import timedelta
from unittest.mock import PropertyMock, patch
from unittest.mock import AsyncMock, PropertyMock, patch
from aiohttp.client_exceptions import ClientError
import pytest
import time_machine
@ -43,7 +44,22 @@ async def test_healthy(coresys: CoreSys):
assert not await test.execute()
async def test_internet(coresys: CoreSys):
@pytest.mark.parametrize(
"connectivity,head_side_effect,host_result,system_result",
[
(4, None, True, True),
(4, ClientError(), True, None),
(0, None, None, True),
(0, ClientError(), None, None),
],
)
async def test_internet(
coresys: CoreSys,
connectivity: int,
head_side_effect: Exception | None,
host_result: bool | None,
system_result: bool | None,
):
"""Test the internet decorator."""
coresys.core.state = CoreState.RUNNING
@ -66,25 +82,16 @@ async def test_internet(coresys: CoreSys):
test = TestClass(coresys)
coresys.host.network._connectivity = True
coresys.supervisor._connectivity = True
assert await test.execute_host()
assert await test.execute_system()
coresys.host.network._connectivity = True
coresys.supervisor._connectivity = False
assert await test.execute_host()
assert not await test.execute_system()
coresys.host.network._connectivity = None
coresys.supervisor._connectivity = True
assert await test.execute_host()
assert await test.execute_system()
coresys.host.network._connectivity = False
coresys.supervisor._connectivity = True
assert not await test.execute_host()
assert await test.execute_system()
mock_websession = AsyncMock()
mock_websession.head.side_effect = head_side_effect
coresys.supervisor.connectivity = None
with patch.object(
type(coresys.dbus.network.dbus), "get_property", return_value=connectivity
), patch.object(
CoreSys, "websession", new=PropertyMock(return_value=mock_websession)
):
assert await test.execute_host() is host_result
assert await test.execute_system() is system_result
async def test_free_space(coresys: CoreSys):

View File

@ -1,61 +0,0 @@
"""Test periodic connectivity task."""
# pylint: disable=protected-access,import-error
from unittest.mock import AsyncMock
from supervisor.coresys import CoreSys
async def test_no_connectivity(coresys: CoreSys):
"""Test periodic connectivity task."""
coresys.host.network.check_connectivity = AsyncMock()
coresys.supervisor.check_connectivity = AsyncMock()
coresys.tasks._cache["connectivity"] = 0
coresys.host.network._connectivity = False
coresys.supervisor._connectivity = False
await coresys.tasks._check_connectivity()
coresys.host.network.check_connectivity.assert_called_once()
coresys.supervisor.check_connectivity.assert_called_once()
assert coresys.tasks._cache["connectivity"] == 0
coresys.host.network.check_connectivity.reset_mock()
coresys.supervisor.check_connectivity.reset_mock()
await coresys.tasks._check_connectivity()
coresys.host.network.check_connectivity.assert_called_once()
coresys.supervisor.check_connectivity.assert_called_once()
assert coresys.tasks._cache["connectivity"] == 0
async def test_connectivity(coresys: CoreSys):
"""Test periodic connectivity task."""
coresys.host.network.check_connectivity = AsyncMock()
coresys.supervisor.check_connectivity = AsyncMock()
coresys.tasks._cache["connectivity"] = 0
coresys.host.network._connectivity = True
coresys.supervisor._connectivity = True
await coresys.tasks._check_connectivity()
coresys.host.network.check_connectivity.assert_not_called()
coresys.supervisor.check_connectivity.assert_not_called()
assert coresys.tasks._cache["connectivity"] == 30
async def test_connectivity_cache_reached(coresys: CoreSys):
"""Test periodic connectivity task."""
coresys.host.network.check_connectivity = AsyncMock()
coresys.supervisor.check_connectivity = AsyncMock()
coresys.tasks._cache["connectivity"] = 600
coresys.host.network._connectivity = True
coresys.supervisor._connectivity = True
await coresys.tasks._check_connectivity()
coresys.host.network.check_connectivity.assert_called_once()
coresys.supervisor.check_connectivity.assert_called_once()
assert coresys.tasks._cache["connectivity"] == 0

View File

@ -0,0 +1,57 @@
"""Test connectivity check evaluation."""
from unittest.mock import PropertyMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.evaluations.connectivity_check import (
EvaluateConnectivityCheck,
)
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
connectivity_check = EvaluateConnectivityCheck(coresys)
coresys.core.state = CoreState.RUNNING
assert connectivity_check.reason not in coresys.resolution.unsupported
with patch.object(
type(coresys.dbus.network),
"connectivity_enabled",
new=PropertyMock(return_value=False),
) as connectivity_enabled:
await connectivity_check()
assert connectivity_check.reason in coresys.resolution.unsupported
connectivity_enabled.return_value = True
await connectivity_check()
assert connectivity_check.reason not in coresys.resolution.unsupported
connectivity_enabled.return_value = None
await connectivity_check()
assert connectivity_check.reason not in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys):
"""Test that the evaluation ran as expected."""
connectivity_check = EvaluateConnectivityCheck(coresys)
should_run = connectivity_check.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.evaluations.connectivity_check.EvaluateConnectivityCheck.evaluate",
return_value=False,
) as evaluate:
for state in should_run:
coresys.core.state = state
await connectivity_check()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
coresys.core.state = state
await connectivity_check()
evaluate.assert_not_called()
evaluate.reset_mock()

65
tests/test_supervisor.py Normal file
View File

@ -0,0 +1,65 @@
"""Test supervisor object."""
from datetime import timedelta
from unittest.mock import AsyncMock, PropertyMock, patch
from aiohttp.client_exceptions import ClientError
import pytest
from supervisor.coresys import CoreSys
from supervisor.supervisor import Supervisor
@pytest.fixture(name="websession")
async def fixture_webession(coresys: CoreSys) -> AsyncMock:
"""Mock of websession."""
mock_websession = AsyncMock()
with patch.object(
type(coresys), "websession", new=PropertyMock(return_value=mock_websession)
):
yield mock_websession
@pytest.fixture(name="supervisor_unthrottled")
async def fixture_supervisor_unthrottled(coresys: CoreSys) -> Supervisor:
"""Get supervisor object with connectivity check throttle removed."""
with patch(
"supervisor.supervisor._check_connectivity_throttle_period",
return_value=timedelta(),
):
yield coresys.supervisor
@pytest.mark.parametrize(
"side_effect,connectivity", [(ClientError(), False), (None, True)]
)
async def test_connectivity_check(
supervisor_unthrottled: Supervisor,
websession: AsyncMock,
side_effect: Exception | None,
connectivity: bool,
):
"""Test connectivity check."""
assert supervisor_unthrottled.connectivity is True
websession.head.side_effect = side_effect
await supervisor_unthrottled.check_connectivity()
assert supervisor_unthrottled.connectivity is connectivity
@pytest.mark.parametrize("side_effect,call_count", [(ClientError(), 3), (None, 1)])
async def test_connectivity_check_throttling(
coresys: CoreSys,
websession: AsyncMock,
side_effect: Exception | None,
call_count: int,
):
"""Test connectivity check throttled when checks succeed."""
coresys.supervisor.connectivity = None
websession.head.side_effect = side_effect
for _ in range(3):
await coresys.supervisor.check_connectivity()
assert websession.head.call_count == call_count