Identify and handle dhcp issues (#3806)

* Identify and handle dhcp issues

* Change test from DHCP to Connection Problem
This commit is contained in:
Mike Degatano 2022-08-23 07:57:16 -04:00 committed by GitHub
parent c7f7fbd41a
commit b4e1e3e853
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 251 additions and 11 deletions

View File

@ -112,6 +112,7 @@ DBUS_ATTR_RESOLV_CONF_MODE = "ResolvConfMode"
DBUS_ATTR_RCMANAGER = "RcManager"
DBUS_ATTR_SSID = "Ssid"
DBUS_ATTR_STATE = "State"
DBUS_ATTR_STATE_FLAGS = "StateFlags"
DBUS_ATTR_STATIC_HOSTNAME = "StaticHostname"
DBUS_ATTR_STATIC_OPERATING_SYSTEM_CPE_NAME = "OperatingSystemCPEName"
DBUS_ATTR_STRENGTH = "Strength"
@ -161,6 +162,23 @@ class ConnectionStateType(int, Enum):
DEACTIVATED = 4
class ConnectionStateFlags(int, Enum):
"""Connection state flags.
https://developer-old.gnome.org/NetworkManager/stable/nm-dbus-types.html#NMActivationStateFlags
"""
NONE = 0
IS_MASTER = 0x1
IS_SLAVE = 0x2
LAYER2_READY = 0x4
IP4_READY = 0x8
IP6_READY = 0x10
MASTER_HAS_SLAVES = 0x20
LIFETIME_BOUND_TO_PROFILE_VISIBILITY = 0x40
EXTERNAL = 0x80
class ConnectivityState(int, Enum):
"""Network connectvity.

View File

@ -8,8 +8,8 @@ import attr
class IpConfiguration:
"""NetworkSettingsIPConfig object for Network Manager."""
gateway: IPv6Address | IPv6Address | None = attr.ib()
nameservers: list[IPv6Address | IPv6Address] = attr.ib()
gateway: IPv4Address | IPv6Address | None = attr.ib()
nameservers: list[IPv4Address | IPv6Address] = attr.ib()
address: list[IPv4Interface | IPv6Interface] = attr.ib()

View File

@ -13,6 +13,7 @@ from ..const import (
DBUS_ATTR_NAMESERVER_DATA,
DBUS_ATTR_NAMESERVERS,
DBUS_ATTR_STATE,
DBUS_ATTR_STATE_FLAGS,
DBUS_ATTR_TYPE,
DBUS_ATTR_UUID,
DBUS_IFACE_CONNECTION_ACTIVE,
@ -20,6 +21,7 @@ from ..const import (
DBUS_IFACE_IP6CONFIG,
DBUS_NAME_NM,
DBUS_OBJECT_BASE,
ConnectionStateFlags,
ConnectionStateType,
)
from ..interface import DBusInterfaceProxy
@ -40,6 +42,7 @@ class NetworkConnection(DBusInterfaceProxy):
self._ipv4: IpConfiguration | None = None
self._ipv6: IpConfiguration | None = None
self._state_flags: set[ConnectionStateFlags] = {ConnectionStateFlags.NONE}
@property
def id(self) -> str:
@ -61,6 +64,11 @@ class NetworkConnection(DBusInterfaceProxy):
"""Return the state of the connection."""
return self.properties[DBUS_ATTR_STATE]
@property
def state_flags(self) -> set[ConnectionStateFlags]:
"""Return state flags of the connection."""
return self._state_flags
@property
def setting_object(self) -> int:
"""Return the connection object path."""
@ -86,6 +94,13 @@ class NetworkConnection(DBusInterfaceProxy):
"""Update connection information."""
self.properties = await self.dbus.get_properties(DBUS_IFACE_CONNECTION_ACTIVE)
# State Flags
self._state_flags = {
flag
for flag in ConnectionStateFlags
if flag.value & self.properties[DBUS_ATTR_STATE_FLAGS]
} or {ConnectionStateFlags.NONE}
# IPv4
if self.properties[DBUS_ATTR_IP4CONFIG] != DBUS_OBJECT_BASE:
ip4 = await DBus.connect(DBUS_NAME_NM, self.properties[DBUS_ATTR_IP4CONFIG])

View File

@ -31,6 +31,7 @@ from ..exceptions import (
)
from ..jobs.const import JobCondition
from ..jobs.decorator import Job
from ..resolution.checks.network_interface import CheckNetworkInterface
from .const import AuthMethod, InterfaceMethod, InterfaceType, WifiMode
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -109,8 +110,9 @@ class NetworkManager(CoreSysAttributes):
# Apply current settings on each interface so OS can update any out of date defaults
interfaces = [
Interface.from_dbus_interface(self.sys_dbus.network.interfaces[i])
for i in self.sys_dbus.network.interfaces
Interface.from_dbus_interface(interface)
for interface in self.sys_dbus.network.interfaces.values()
if not CheckNetworkInterface.check_interface(interface)
]
with suppress(HostNetworkNotFound):
await asyncio.gather(
@ -345,6 +347,16 @@ class Interface:
@staticmethod
def from_dbus_interface(inet: NetworkInterface) -> Interface:
"""Concert a dbus interface into normal Interface."""
ipv4_method = (
Interface._map_nm_method(inet.settings.ipv4.method)
if inet.settings and inet.settings.ipv4
else InterfaceMethod.DISABLED
)
ipv6_method = (
Interface._map_nm_method(inet.settings.ipv6.method)
if inet.settings and inet.settings.ipv6
else InterfaceMethod.DISABLED
)
return Interface(
inet.name,
inet.settings is not None,
@ -352,21 +364,21 @@ class Interface:
inet.primary,
Interface._map_nm_type(inet.type),
IpConfig(
Interface._map_nm_method(inet.settings.ipv4.method),
ipv4_method,
inet.connection.ipv4.address,
inet.connection.ipv4.gateway,
inet.connection.ipv4.nameservers,
)
if inet.connection and inet.connection.ipv4
else IpConfig(InterfaceMethod.DISABLED, [], None, []),
else IpConfig(ipv4_method, [], None, []),
IpConfig(
Interface._map_nm_method(inet.settings.ipv6.method),
ipv6_method,
inet.connection.ipv6.address,
inet.connection.ipv6.gateway,
inet.connection.ipv6.nameservers,
)
if inet.connection and inet.connection.ipv6
else IpConfig(InterfaceMethod.DISABLED, [], None, []),
else IpConfig(ipv6_method, [], None, []),
Interface._map_nm_wifi(inet),
Interface._map_nm_vlan(inet),
)

View File

@ -0,0 +1,63 @@
"""Helpers to check core security."""
from ...const import CoreState
from ...coresys import CoreSys
from ...dbus.const import ConnectionStateFlags, ConnectionStateType
from ...dbus.network.interface import NetworkInterface
from ..const import ContextType, IssueType
from .base import CheckBase
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckNetworkInterface(coresys)
class CheckNetworkInterface(CheckBase):
"""CheckNetworkInterface class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
for interface in self.sys_dbus.network.interfaces.values():
if CheckNetworkInterface.check_interface(interface):
self.sys_resolution.create_issue(
IssueType.NETWORK_CONNECTION_PROBLEM,
ContextType.SYSTEM,
interface.name,
)
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
if not reference:
return False
interface = self.sys_dbus.network.interfaces.get(reference)
return interface and CheckNetworkInterface.check_interface(interface)
@staticmethod
def check_interface(interface: NetworkInterface) -> bool:
"""Return true if a managed, connected interface has an issue."""
if not (interface.managed and interface.connection):
return False
return not (
interface.connection.state
in [ConnectionStateType.ACTIVATED, ConnectionStateType.ACTIVATING]
and ConnectionStateFlags.IP4_READY in interface.connection.state_flags
and ConnectionStateFlags.IP6_READY in interface.connection.state_flags
)
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.NETWORK_CONNECTION_PROBLEM
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -73,6 +73,7 @@ class IssueType(str, Enum):
FATAL_ERROR = "fatal_error"
FREE_SPACE = "free_space"
MISSING_IMAGE = "missing_image"
NETWORK_CONNECTION_PROBLEM = "network_connection_problem"
PWNED = "pwned"
SECURITY = "security"
TRUST = "trust"

View File

@ -6,7 +6,7 @@
"Type": "802-3-ethernet",
"Devices": ["/org/freedesktop/NetworkManager/Devices/1"],
"State": 2,
"StateFlags": 12,
"StateFlags": 92,
"Default": true,
"Ip4Config": "/org/freedesktop/NetworkManager/IP4Config/1",
"Dhcp4Config": "/org/freedesktop/NetworkManager/DHCP4Config/1",

View File

@ -1,8 +1,9 @@
"""Test network manager."""
from unittest.mock import Mock, patch
from ipaddress import IPv4Address, IPv6Address
from unittest.mock import Mock, PropertyMock, patch
from supervisor.coresys import CoreSys
from supervisor.dbus.const import InterfaceMethod
from supervisor.dbus.const import ConnectionStateFlags, InterfaceMethod
from supervisor.host.const import InterfaceType
from supervisor.host.network import Interface, IpConfig
@ -24,6 +25,14 @@ async def test_load(coresys: CoreSys):
assert len(coresys.host.network.interfaces) == 2
assert coresys.host.network.interfaces[0].name == "eth0"
assert coresys.host.network.interfaces[0].enabled is True
assert coresys.host.network.interfaces[0].ipv4.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv4.gateway == IPv4Address(
"192.168.2.1"
)
assert coresys.host.network.interfaces[0].ipv6.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv6.gateway == IPv6Address(
"fe80::da58:d7ff:fe00:9c69"
)
assert coresys.host.network.interfaces[1].name == "wlan0"
assert coresys.host.network.interfaces[1].enabled is False
@ -56,3 +65,31 @@ async def test_load_with_disabled_methods(coresys: CoreSys):
await coresys.host.network.load()
activate_connection.assert_not_called()
async def test_load_with_network_connection_issues(coresys: CoreSys):
"""Test load does not update interfaces with network connection issues."""
with patch(
"supervisor.dbus.network.connection.NetworkConnection.state_flags",
new=PropertyMock(return_value={ConnectionStateFlags.IP6_READY}),
), patch(
"supervisor.dbus.network.connection.NetworkConnection.ipv4",
new=PropertyMock(return_value=None),
), patch.object(
coresys.host.sys_dbus.network,
"activate_connection",
new=Mock(wraps=coresys.host.sys_dbus.network.activate_connection),
) as activate_connection:
await coresys.host.network.load()
activate_connection.assert_not_called()
assert len(coresys.host.network.interfaces) == 2
assert coresys.host.network.interfaces[0].name == "eth0"
assert coresys.host.network.interfaces[0].enabled is True
assert coresys.host.network.interfaces[0].ipv4.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv4.gateway is None
assert coresys.host.network.interfaces[0].ipv6.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv6.gateway == IPv6Address(
"fe80::da58:d7ff:fe00:9c69"
)

View File

@ -0,0 +1,94 @@
"""Test Check Network Interface."""
from unittest.mock import PropertyMock, patch
import pytest
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.dbus.const import ConnectionStateFlags
from supervisor.resolution.checks.network_interface import CheckNetworkInterface
from supervisor.resolution.const import ContextType, IssueType
from supervisor.resolution.data import Issue
async def test_base(coresys: CoreSys):
"""Test check basics."""
network_interface = CheckNetworkInterface(coresys)
assert network_interface.slug == "network_interface"
assert network_interface.enabled
@pytest.mark.parametrize(
"state_flags",
[
{ConnectionStateFlags.IP4_READY},
{ConnectionStateFlags.IP6_READY},
{ConnectionStateFlags.NONE},
],
)
async def test_check(coresys: CoreSys, state_flags: set[ConnectionStateFlags]):
"""Test check."""
network_interface = CheckNetworkInterface(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
await network_interface.run_check()
assert len(coresys.resolution.issues) == 0
with patch(
"supervisor.dbus.network.connection.NetworkConnection.state_flags",
new=PropertyMock(return_value=state_flags),
):
await network_interface.run_check()
assert coresys.resolution.issues == [
Issue(IssueType.NETWORK_CONNECTION_PROBLEM, ContextType.SYSTEM, "eth0")
]
@pytest.mark.parametrize(
"state_flags",
[
{ConnectionStateFlags.IP4_READY},
{ConnectionStateFlags.IP6_READY},
{ConnectionStateFlags.NONE},
],
)
async def test_approve(coresys: CoreSys, state_flags: set[ConnectionStateFlags]):
"""Test check."""
network_interface = CheckNetworkInterface(coresys)
coresys.core.state = CoreState.RUNNING
assert not await network_interface.approve_check("eth0")
with patch(
"supervisor.dbus.network.connection.NetworkConnection.state_flags",
new=PropertyMock(return_value=state_flags),
):
assert await network_interface.approve_check("eth0")
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
network_interface = CheckNetworkInterface(coresys)
should_run = network_interface.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.network_interface.CheckNetworkInterface.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await network_interface()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await network_interface()
check.assert_not_called()
check.reset_mock()