Identify network interfaces by mac over name (#4416)

* Identify network interfaces by mac over name

* Refactor long if statement into method
This commit is contained in:
Mike Degatano 2023-07-06 16:26:19 -04:00 committed by GitHub
parent 96d5fc244e
commit abbf8b9b65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 434 additions and 334 deletions

View File

@ -1,11 +1,11 @@
"""REST API for network."""
import asyncio
from collections.abc import Awaitable
from dataclasses import replace
from ipaddress import ip_address, ip_interface
from typing import Any
from aiohttp import web
import attr
import voluptuous as vol
from ..const import (
@ -43,8 +43,7 @@ from ..const import (
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, HostNetworkNotFound
from ..host.const import AuthMethod, InterfaceType, WifiMode
from ..host.network import (
from ..host.configuration import (
AccessPoint,
Interface,
InterfaceMethod,
@ -52,6 +51,7 @@ from ..host.network import (
VlanConfig,
WifiConfig,
)
from ..host.const import AuthMethod, InterfaceType, WifiMode
from .utils import api_process, api_validate
_SCHEMA_IP_CONFIG = vol.Schema(
@ -121,6 +121,7 @@ def interface_struct(interface: Interface) -> dict[str, Any]:
ATTR_ENABLED: interface.enabled,
ATTR_CONNECTED: interface.connected,
ATTR_PRIMARY: interface.primary,
ATTR_MAC: interface.mac,
ATTR_IPV4: ipconfig_struct(interface.ipv4) if interface.ipv4 else None,
ATTR_IPV6: ipconfig_struct(interface.ipv6) if interface.ipv6 else None,
ATTR_WIFI: wifi_struct(interface.wifi) if interface.wifi else None,
@ -196,19 +197,19 @@ class APINetwork(CoreSysAttributes):
# Apply config
for key, config in body.items():
if key == ATTR_IPV4:
interface.ipv4 = attr.evolve(
interface.ipv4 = replace(
interface.ipv4
or IpConfig(InterfaceMethod.STATIC, [], None, [], None),
**config,
)
elif key == ATTR_IPV6:
interface.ipv6 = attr.evolve(
interface.ipv6 = replace(
interface.ipv6
or IpConfig(InterfaceMethod.STATIC, [], None, [], None),
**config,
)
elif key == ATTR_WIFI:
interface.wifi = attr.evolve(
interface.wifi = replace(
interface.wifi
or WifiConfig(
WifiMode.INFRASTRUCTURE, "", AuthMethod.OPEN, None, None
@ -276,6 +277,7 @@ class APINetwork(CoreSysAttributes):
)
vlan_interface = Interface(
"",
"",
True,
True,

View File

@ -10,6 +10,7 @@ from ...exceptions import (
DBusFatalError,
DBusInterfaceError,
HostNotSupportedError,
NetworkInterfaceNotFound,
)
from ...utils.sentry import capture_exception
from ..const import (
@ -67,9 +68,9 @@ class NetworkManager(DBusInterfaceProxy):
return self._settings
@property
def interfaces(self) -> dict[str, NetworkInterface]:
def interfaces(self) -> set[NetworkInterface]:
"""Return a dictionary of active interfaces."""
return self._interfaces
return set(self._interfaces.values())
@property
@dbus_property
@ -83,6 +84,20 @@ class NetworkManager(DBusInterfaceProxy):
"""Return Network Manager version."""
return AwesomeVersion(self.properties[DBUS_ATTR_VERSION])
def get(self, name_or_mac: str) -> NetworkInterface:
"""Get an interface by name or mac address."""
if name_or_mac not in self._interfaces:
raise NetworkInterfaceNotFound(
f"No interface exists with name or mac address '{name_or_mac}'"
)
return self._interfaces[name_or_mac]
def __contains__(self, item: NetworkInterface | str) -> bool:
"""Return true if specified network interface exists."""
if isinstance(item, str):
return item in self._interfaces
return item in self.interfaces
@dbus_connected
async def activate_connection(
self, connection_object: str, device_object: str
@ -167,9 +182,9 @@ class NetworkManager(DBusInterfaceProxy):
if changed and (
DBUS_ATTR_DEVICES not in changed
or {
intr.object_path for intr in self.interfaces.values() if intr.managed
}.issubset(set(changed[DBUS_ATTR_DEVICES]))
or {intr.object_path for intr in self.interfaces if intr.managed}.issubset(
set(changed[DBUS_ATTR_DEVICES])
)
):
# If none of our managed devices were removed then most likely this is just veths changing.
# We don't care about veths and reprocessing all their changes can swamp a system when
@ -177,8 +192,8 @@ class NetworkManager(DBusInterfaceProxy):
# in rare occaisions but we'll catch it on the next host update scheduled task.
return
interfaces = {}
curr_devices = {intr.object_path: intr for intr in self.interfaces.values()}
interfaces: dict[str, NetworkInterface] = {}
curr_devices = {intr.object_path: intr for intr in self.interfaces}
for device in self.properties[DBUS_ATTR_DEVICES]:
if device in curr_devices and curr_devices[device].is_connected:
interface = curr_devices[device]
@ -222,6 +237,7 @@ class NetworkManager(DBusInterfaceProxy):
interface.primary = False
interfaces[interface.name] = interface
interfaces[interface.hw_address] = interface
# Disconnect removed devices
for device in set(curr_devices.keys()) - set(
@ -242,7 +258,7 @@ class NetworkManager(DBusInterfaceProxy):
def disconnect(self) -> None:
"""Disconnect from D-Bus."""
for intr in self.interfaces.values():
for intr in self.interfaces:
intr.shutdown()
super().disconnect()

View File

@ -1,66 +1,72 @@
"""NetworkConnection object4s for Network Manager."""
"""NetworkConnection objects for Network Manager."""
from dataclasses import dataclass
from ipaddress import IPv4Address, IPv6Address
import attr
@attr.s(slots=True)
@dataclass(slots=True)
class DNSConfiguration:
"""DNS configuration Object."""
nameservers: list[IPv4Address | IPv6Address] = attr.ib()
domains: list[str] = attr.ib()
interface: str = attr.ib()
priority: int = attr.ib()
vpn: bool = attr.ib()
nameservers: list[IPv4Address | IPv6Address]
domains: list[str]
interface: str
priority: int
vpn: bool
@attr.s(slots=True)
@dataclass(slots=True)
class ConnectionProperties:
"""Connection Properties object for Network Manager."""
id: str | None = attr.ib()
uuid: str | None = attr.ib()
type: str | None = attr.ib()
interface_name: str | None = attr.ib()
id: str | None
uuid: str | None
type: str | None
interface_name: str | None
@attr.s(slots=True)
@dataclass(slots=True)
class WirelessProperties:
"""Wireless Properties object for Network Manager."""
ssid: str | None = attr.ib()
assigned_mac: str | None = attr.ib()
mode: str | None = attr.ib()
powersave: int | None = attr.ib()
ssid: str | None
assigned_mac: str | None
mode: str | None
powersave: int | None
@attr.s(slots=True)
@dataclass(slots=True)
class WirelessSecurityProperties:
"""Wireless Security Properties object for Network Manager."""
auth_alg: str | None = attr.ib()
key_mgmt: str | None = attr.ib()
psk: str | None = attr.ib()
auth_alg: str | None
key_mgmt: str | None
psk: str | None
@attr.s(slots=True)
@dataclass(slots=True)
class EthernetProperties:
"""Ethernet properties object for Network Manager."""
assigned_mac: str | None = attr.ib()
assigned_mac: str | None
@attr.s(slots=True)
@dataclass(slots=True)
class VlanProperties:
"""Ethernet properties object for Network Manager."""
id: int | None = attr.ib()
parent: str | None = attr.ib()
id: int | None
parent: str | None
@attr.s(slots=True)
@dataclass(slots=True)
class IpProperties:
"""IP properties object for Network Manager."""
method: str | None = attr.ib()
method: str | None
@dataclass(slots=True)
class DeviceProperties:
"""Device properties object for Network Manager."""
match_device: str | None

View File

@ -9,6 +9,7 @@ from ..const import (
DBUS_ATTR_DEVICE_INTERFACE,
DBUS_ATTR_DEVICE_TYPE,
DBUS_ATTR_DRIVER,
DBUS_ATTR_HWADDRESS,
DBUS_ATTR_MANAGED,
DBUS_IFACE_DEVICE,
DBUS_NAME_NM,
@ -67,6 +68,12 @@ class NetworkInterface(DBusInterfaceProxy):
"""Return interface driver."""
return self.properties[DBUS_ATTR_MANAGED]
@property
@dbus_property
def hw_address(self) -> str:
"""Return hardware address (i.e. mac address) of device."""
return self.properties[DBUS_ATTR_HWADDRESS]
@property
def connection(self) -> NetworkConnection | None:
"""Return the connection used for this interface."""
@ -98,6 +105,18 @@ class NetworkInterface(DBusInterfaceProxy):
self._wireless = wireless
def __eq__(self, other: object) -> bool:
"""Is object equal to another."""
return (
isinstance(other, type(self))
and other.bus_name == self.bus_name
and other.object_path == self.object_path
)
def __hash__(self) -> int:
"""Hash of object."""
return hash((self.bus_name, self.object_path))
async def connect(self, bus: MessageBus) -> None:
"""Connect to D-Bus."""
await super().connect(bus)

View File

@ -2,6 +2,7 @@
import logging
from typing import Any
from dbus_fast import Variant
from dbus_fast.aio.message_bus import MessageBus
from ....const import ATTR_METHOD, ATTR_MODE, ATTR_PSK, ATTR_SSID
@ -10,6 +11,7 @@ from ...interface import DBusInterface
from ...utils import dbus_connected
from ..configuration import (
ConnectionProperties,
DeviceProperties,
EthernetProperties,
IpProperties,
VlanProperties,
@ -24,6 +26,7 @@ CONF_ATTR_802_WIRELESS_SECURITY = "802-11-wireless-security"
CONF_ATTR_VLAN = "vlan"
CONF_ATTR_IPV4 = "ipv4"
CONF_ATTR_IPV6 = "ipv6"
CONF_ATTR_DEVICE = "device"
ATTR_ID = "id"
ATTR_UUID = "uuid"
@ -34,6 +37,7 @@ ATTR_POWERSAVE = "powersave"
ATTR_AUTH_ALG = "auth-alg"
ATTR_KEY_MGMT = "key-mgmt"
ATTR_INTERFACE_NAME = "interface-name"
ATTR_MATCH_DEVICE = "match-device"
IPV4_6_IGNORE_FIELDS = [
"addresses",
@ -47,8 +51,8 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
def _merge_settings_attribute(
base_settings: Any,
new_settings: Any,
base_settings: dict[str, dict[str, Variant]],
new_settings: dict[str, dict[str, Variant]],
attribute: str,
*,
ignore_current_value: list[str] = None,
@ -58,8 +62,7 @@ def _merge_settings_attribute(
if attribute in base_settings:
if ignore_current_value:
for field in ignore_current_value:
if field in base_settings[attribute]:
del base_settings[attribute][field]
base_settings[attribute].pop(field, None)
base_settings[attribute].update(new_settings[attribute])
else:
@ -85,6 +88,7 @@ class NetworkSetting(DBusInterface):
self._vlan: VlanProperties | None = None
self._ipv4: IpProperties | None = None
self._ipv6: IpProperties | None = None
self._device: DeviceProperties | None = None
@property
def connection(self) -> ConnectionProperties | None:
@ -121,19 +125,29 @@ class NetworkSetting(DBusInterface):
"""Return ipv6 properties if any."""
return self._ipv6
@property
def device(self) -> DeviceProperties | None:
"""Return device properties if any."""
return self._device
@dbus_connected
async def get_settings(self) -> dict[str, Any]:
"""Return connection settings."""
return await self.dbus.Settings.Connection.call_get_settings()
@dbus_connected
async def update(self, settings: Any) -> None:
async def update(self, settings: dict[str, dict[str, Variant]]) -> None:
"""Update connection settings."""
new_settings = await self.dbus.Settings.Connection.call_get_settings(
unpack_variants=False
)
new_settings: dict[
str, dict[str, Variant]
] = await self.dbus.Settings.Connection.call_get_settings(unpack_variants=False)
_merge_settings_attribute(new_settings, settings, CONF_ATTR_CONNECTION)
_merge_settings_attribute(
new_settings,
settings,
CONF_ATTR_CONNECTION,
ignore_current_value=[ATTR_INTERFACE_NAME],
)
_merge_settings_attribute(new_settings, settings, CONF_ATTR_802_ETHERNET)
_merge_settings_attribute(new_settings, settings, CONF_ATTR_802_WIRELESS)
_merge_settings_attribute(
@ -152,6 +166,7 @@ class NetworkSetting(DBusInterface):
CONF_ATTR_IPV6,
ignore_current_value=IPV4_6_IGNORE_FIELDS,
)
_merge_settings_attribute(new_settings, settings, CONF_ATTR_DEVICE)
await self.dbus.Settings.Connection.call_update(new_settings)
@ -217,3 +232,8 @@ class NetworkSetting(DBusInterface):
self._ipv6 = IpProperties(
data[CONF_ATTR_IPV6].get(ATTR_METHOD),
)
if CONF_ATTR_DEVICE in data:
self._device = DeviceProperties(
data[CONF_ATTR_DEVICE].get(ATTR_MATCH_DEVICE)
)

View File

@ -9,10 +9,12 @@ from dbus_fast import Variant
from . import (
ATTR_ASSIGNED_MAC,
ATTR_MATCH_DEVICE,
CONF_ATTR_802_ETHERNET,
CONF_ATTR_802_WIRELESS,
CONF_ATTR_802_WIRELESS_SECURITY,
CONF_ATTR_CONNECTION,
CONF_ATTR_DEVICE,
CONF_ATTR_IPV4,
CONF_ATTR_IPV6,
CONF_ATTR_VLAN,
@ -20,7 +22,7 @@ from . import (
from ....host.const import InterfaceMethod, InterfaceType
if TYPE_CHECKING:
from ....host.network import Interface
from ....host.configuration import Interface
def get_connection_from_interface(
@ -45,20 +47,21 @@ def get_connection_from_interface(
if not uuid:
uuid = str(uuid4())
connection = {
"id": Variant("s", name),
"type": Variant("s", iftype),
"uuid": Variant("s", uuid),
"llmnr": Variant("i", 2),
"mdns": Variant("i", 2),
"autoconnect": Variant("b", True),
conn: dict[str, dict[str, Variant]] = {
CONF_ATTR_CONNECTION: {
"id": Variant("s", name),
"type": Variant("s", iftype),
"uuid": Variant("s", uuid),
"llmnr": Variant("i", 2),
"mdns": Variant("i", 2),
"autoconnect": Variant("b", True),
},
}
if interface.type != InterfaceType.VLAN:
connection["interface-name"] = Variant("s", interface.name)
conn = {}
conn[CONF_ATTR_CONNECTION] = connection
conn[CONF_ATTR_DEVICE] = {
ATTR_MATCH_DEVICE: Variant("s", f"mac:{interface.mac}")
}
ipv4 = {}
if not interface.ipv4 or interface.ipv4.method == InterfaceMethod.AUTO:

View File

@ -593,3 +593,10 @@ class MountNotFound(MountError):
class MountJobError(MountError, JobException):
"""Raise on Mount job error."""
# Network
class NetworkInterfaceNotFound(HassioError):
"""Raise on network interface not found."""

View File

@ -0,0 +1,211 @@
"""Network objects for host manager."""
from dataclasses import dataclass
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
from ..dbus.const import (
ConnectionStateFlags,
ConnectionStateType,
DeviceType,
InterfaceMethod as NMInterfaceMethod,
)
from ..dbus.network.connection import NetworkConnection
from ..dbus.network.interface import NetworkInterface
from .const import AuthMethod, InterfaceMethod, InterfaceType, WifiMode
@dataclass(slots=True)
class AccessPoint:
"""Represent a wifi configuration."""
mode: WifiMode
ssid: str
mac: str
frequency: int
signal: int
@dataclass(slots=True)
class IpConfig:
"""Represent a IP configuration."""
method: InterfaceMethod
address: list[IPv4Interface | IPv6Interface]
gateway: IPv4Address | IPv6Address | None
nameservers: list[IPv4Address | IPv6Address]
ready: bool | None
@dataclass(slots=True)
class WifiConfig:
"""Represent a wifi configuration."""
mode: WifiMode
ssid: str
auth: AuthMethod
psk: str | None
signal: int | None
@dataclass(slots=True)
class VlanConfig:
"""Represent a vlan configuration."""
id: int
interface: str
@dataclass(slots=True)
class Interface:
"""Represent a host network interface."""
name: str
mac: str
enabled: bool
connected: bool
primary: bool
type: InterfaceType
ipv4: IpConfig | None
ipv6: IpConfig | None
wifi: WifiConfig | None
vlan: VlanConfig | None
def equals_dbus_interface(self, inet: NetworkInterface) -> bool:
"""Return true if this represents the dbus interface."""
if not inet.settings:
return False
if inet.settings.device:
return inet.settings.device.match_device == f"mac:{self.mac}"
return inet.settings.connection.interface_name == self.name
@staticmethod
def from_dbus_interface(inet: NetworkInterface) -> "Interface":
"""Coerce a dbus interface into normal Interface."""
ipv4_method = (
Interface._map_nm_method(inet.settings.ipv4.method)
if inet.settings and inet.settings.ipv4
else InterfaceMethod.DISABLED
)
ipv6_method = (
Interface._map_nm_method(inet.settings.ipv6.method)
if inet.settings and inet.settings.ipv6
else InterfaceMethod.DISABLED
)
ipv4_ready = (
bool(inet.connection)
and ConnectionStateFlags.IP4_READY in inet.connection.state_flags
)
ipv6_ready = (
bool(inet.connection)
and ConnectionStateFlags.IP6_READY in inet.connection.state_flags
)
return Interface(
inet.name,
inet.hw_address,
inet.settings is not None,
Interface._map_nm_connected(inet.connection),
inet.primary,
Interface._map_nm_type(inet.type),
IpConfig(
ipv4_method,
inet.connection.ipv4.address if inet.connection.ipv4.address else [],
inet.connection.ipv4.gateway,
inet.connection.ipv4.nameservers
if inet.connection.ipv4.nameservers
else [],
ipv4_ready,
)
if inet.connection and inet.connection.ipv4
else IpConfig(ipv4_method, [], None, [], ipv4_ready),
IpConfig(
ipv6_method,
inet.connection.ipv6.address if inet.connection.ipv6.address else [],
inet.connection.ipv6.gateway,
inet.connection.ipv6.nameservers
if inet.connection.ipv6.nameservers
else [],
ipv6_ready,
)
if inet.connection and inet.connection.ipv6
else IpConfig(ipv6_method, [], None, [], ipv6_ready),
Interface._map_nm_wifi(inet),
Interface._map_nm_vlan(inet),
)
@staticmethod
def _map_nm_method(method: str) -> InterfaceMethod:
"""Map IP interface method."""
mapping = {
NMInterfaceMethod.AUTO: InterfaceMethod.AUTO,
NMInterfaceMethod.DISABLED: InterfaceMethod.DISABLED,
NMInterfaceMethod.MANUAL: InterfaceMethod.STATIC,
NMInterfaceMethod.LINK_LOCAL: InterfaceMethod.DISABLED,
}
return mapping.get(method, InterfaceMethod.DISABLED)
@staticmethod
def _map_nm_connected(connection: NetworkConnection | None) -> bool:
"""Map connectivity state."""
if not connection:
return False
return connection.state in (
ConnectionStateType.ACTIVATED,
ConnectionStateType.ACTIVATING,
)
@staticmethod
def _map_nm_type(device_type: int) -> InterfaceType:
mapping = {
DeviceType.ETHERNET: InterfaceType.ETHERNET,
DeviceType.WIRELESS: InterfaceType.WIRELESS,
DeviceType.VLAN: InterfaceType.VLAN,
}
return mapping[device_type]
@staticmethod
def _map_nm_wifi(inet: NetworkInterface) -> WifiConfig | None:
"""Create mapping to nm wifi property."""
if inet.type != DeviceType.WIRELESS or not inet.settings:
return None
# Authentication and PSK
auth = None
psk = None
if not inet.settings.wireless_security:
auth = AuthMethod.OPEN
elif inet.settings.wireless_security.key_mgmt == "none":
auth = AuthMethod.WEP
elif inet.settings.wireless_security.key_mgmt == "wpa-psk":
auth = AuthMethod.WPA_PSK
psk = inet.settings.wireless_security.psk
# WifiMode
mode = WifiMode.INFRASTRUCTURE
if inet.settings.wireless.mode:
mode = WifiMode(inet.settings.wireless.mode)
# Signal
if inet.wireless:
signal = inet.wireless.active.strength
else:
signal = None
return WifiConfig(
mode,
inet.settings.wireless.ssid,
auth,
psk,
signal,
)
@staticmethod
def _map_nm_vlan(inet: NetworkInterface) -> WifiConfig | None:
"""Create mapping to nm vlan property."""
if inet.type != DeviceType.VLAN or not inet.settings:
return None
return VlanConfig(inet.settings.vlan.id, inet.settings.vlan.parent)

View File

@ -1,14 +1,9 @@
"""Info control for host."""
from __future__ import annotations
import asyncio
from contextlib import suppress
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
import logging
from typing import Any
import attr
from ..const import ATTR_HOST_INTERNET
from ..coresys import CoreSys, CoreSysAttributes
from ..dbus.const import (
@ -16,11 +11,9 @@ from ..dbus.const import (
DBUS_ATTR_CONNECTIVITY,
DBUS_IFACE_NM,
DBUS_SIGNAL_NM_CONNECTION_ACTIVE_CHANGED,
ConnectionStateFlags,
ConnectionStateType,
ConnectivityState,
DeviceType,
InterfaceMethod as NMInterfaceMethod,
WirelessMethodType,
)
from ..dbus.network.connection import NetworkConnection
@ -32,11 +25,13 @@ from ..exceptions import (
HostNetworkError,
HostNetworkNotFound,
HostNotSupportedError,
NetworkInterfaceNotFound,
)
from ..jobs.const import JobCondition
from ..jobs.decorator import Job
from ..resolution.checks.network_interface_ipv4 import CheckNetworkInterfaceIPV4
from .const import AuthMethod, InterfaceMethod, InterfaceType, WifiMode
from .configuration import AccessPoint, Interface
from .const import InterfaceMethod, WifiMode
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -74,7 +69,7 @@ class NetworkManager(CoreSysAttributes):
def interfaces(self) -> list[Interface]:
"""Return a dictionary of active interfaces."""
interfaces: list[Interface] = []
for inet in self.sys_dbus.network.interfaces.values():
for inet in self.sys_dbus.network.interfaces:
interfaces.append(Interface.from_dbus_interface(inet))
return interfaces
@ -107,12 +102,10 @@ class NetworkManager(CoreSysAttributes):
def get(self, inet_name: str) -> Interface:
"""Return interface from interface name."""
if inet_name not in self.sys_dbus.network.interfaces:
if inet_name not in self.sys_dbus.network:
raise HostNetworkNotFound()
return Interface.from_dbus_interface(
self.sys_dbus.network.interfaces[inet_name]
)
return Interface.from_dbus_interface(self.sys_dbus.network.get(inet_name))
@Job(conditions=[JobCondition.HOST_NETWORK])
async def load(self):
@ -120,7 +113,7 @@ class NetworkManager(CoreSysAttributes):
# Apply current settings on each interface so OS can update any out of date defaults
interfaces = [
Interface.from_dbus_interface(interface)
for interface in self.sys_dbus.network.interfaces.values()
for interface in self.sys_dbus.network.interfaces
if not CheckNetworkInterfaceIPV4.check_interface(interface)
]
with suppress(HostNetworkNotFound):
@ -181,16 +174,14 @@ class NetworkManager(CoreSysAttributes):
self, interface: Interface, *, update_only: bool = False
) -> None:
"""Apply Interface changes to host."""
inet = self.sys_dbus.network.interfaces.get(interface.name)
inet: NetworkInterface | None = None
with suppress(NetworkInterfaceNotFound):
inet = self.sys_dbus.network.get(interface.name)
con: NetworkConnection = None
# Update exist configuration
if (
inet
and inet.settings
and inet.settings.connection.interface_name == interface.name
and interface.enabled
):
if inet and interface.equals_dbus_interface(inet) and interface.enabled:
_LOGGER.debug("Updating existing configuration for %s", interface.name)
settings = get_connection_from_interface(
interface,
@ -287,7 +278,7 @@ class NetworkManager(CoreSysAttributes):
async def scan_wifi(self, interface: Interface) -> list[AccessPoint]:
"""Scan on Interface for AccessPoint."""
inet = self.sys_dbus.network.interfaces.get(interface.name)
inet = self.sys_dbus.network.get(interface.name)
if inet.type != DeviceType.WIRELESS:
raise HostNotSupportedError(
@ -315,188 +306,3 @@ class NetworkManager(CoreSysAttributes):
for accesspoint in await inet.wireless.get_all_accesspoints()
if accesspoint.dbus
]
@attr.s(slots=True)
class AccessPoint:
"""Represent a wifi configuration."""
mode: WifiMode = attr.ib()
ssid: str = attr.ib()
mac: str = attr.ib()
frequency: int = attr.ib()
signal: int = attr.ib()
@attr.s(slots=True)
class IpConfig:
"""Represent a IP configuration."""
method: InterfaceMethod = attr.ib()
address: list[IPv4Interface | IPv6Interface] = attr.ib()
gateway: IPv4Address | IPv6Address | None = attr.ib()
nameservers: list[IPv4Address | IPv6Address] = attr.ib()
ready: bool | None = attr.ib()
@attr.s(slots=True)
class WifiConfig:
"""Represent a wifi configuration."""
mode: WifiMode = attr.ib()
ssid: str = attr.ib()
auth: AuthMethod = attr.ib()
psk: str | None = attr.ib()
signal: int | None = attr.ib()
@attr.s(slots=True)
class VlanConfig:
"""Represent a vlan configuration."""
id: int = attr.ib()
interface: str = attr.ib()
@attr.s(slots=True)
class Interface:
"""Represent a host network interface."""
name: str = attr.ib()
enabled: bool = attr.ib()
connected: bool = attr.ib()
primary: bool = attr.ib()
type: InterfaceType = attr.ib()
ipv4: IpConfig | None = attr.ib()
ipv6: IpConfig | None = attr.ib()
wifi: WifiConfig | None = attr.ib()
vlan: VlanConfig | None = attr.ib()
@staticmethod
def from_dbus_interface(inet: NetworkInterface) -> Interface:
"""Concert a dbus interface into normal Interface."""
ipv4_method = (
Interface._map_nm_method(inet.settings.ipv4.method)
if inet.settings and inet.settings.ipv4
else InterfaceMethod.DISABLED
)
ipv6_method = (
Interface._map_nm_method(inet.settings.ipv6.method)
if inet.settings and inet.settings.ipv6
else InterfaceMethod.DISABLED
)
ipv4_ready = (
bool(inet.connection)
and ConnectionStateFlags.IP4_READY in inet.connection.state_flags
)
ipv6_ready = (
bool(inet.connection)
and ConnectionStateFlags.IP6_READY in inet.connection.state_flags
)
return Interface(
inet.name,
inet.settings is not None,
Interface._map_nm_connected(inet.connection),
inet.primary,
Interface._map_nm_type(inet.type),
IpConfig(
ipv4_method,
inet.connection.ipv4.address if inet.connection.ipv4.address else [],
inet.connection.ipv4.gateway,
inet.connection.ipv4.nameservers
if inet.connection.ipv4.nameservers
else [],
ipv4_ready,
)
if inet.connection and inet.connection.ipv4
else IpConfig(ipv4_method, [], None, [], ipv4_ready),
IpConfig(
ipv6_method,
inet.connection.ipv6.address if inet.connection.ipv6.address else [],
inet.connection.ipv6.gateway,
inet.connection.ipv6.nameservers
if inet.connection.ipv6.nameservers
else [],
ipv6_ready,
)
if inet.connection and inet.connection.ipv6
else IpConfig(ipv6_method, [], None, [], ipv6_ready),
Interface._map_nm_wifi(inet),
Interface._map_nm_vlan(inet),
)
@staticmethod
def _map_nm_method(method: str) -> InterfaceMethod:
"""Map IP interface method."""
mapping = {
NMInterfaceMethod.AUTO: InterfaceMethod.AUTO,
NMInterfaceMethod.DISABLED: InterfaceMethod.DISABLED,
NMInterfaceMethod.MANUAL: InterfaceMethod.STATIC,
NMInterfaceMethod.LINK_LOCAL: InterfaceMethod.DISABLED,
}
return mapping.get(method, InterfaceMethod.DISABLED)
@staticmethod
def _map_nm_connected(connection: NetworkConnection | None) -> bool:
"""Map connectivity state."""
if not connection:
return False
return connection.state in (
ConnectionStateType.ACTIVATED,
ConnectionStateType.ACTIVATING,
)
@staticmethod
def _map_nm_type(device_type: int) -> InterfaceType:
mapping = {
DeviceType.ETHERNET: InterfaceType.ETHERNET,
DeviceType.WIRELESS: InterfaceType.WIRELESS,
DeviceType.VLAN: InterfaceType.VLAN,
}
return mapping[device_type]
@staticmethod
def _map_nm_wifi(inet: NetworkInterface) -> WifiConfig | None:
"""Create mapping to nm wifi property."""
if inet.type != DeviceType.WIRELESS or not inet.settings:
return None
# Authentication and PSK
auth = None
psk = None
if not inet.settings.wireless_security:
auth = AuthMethod.OPEN
elif inet.settings.wireless_security.key_mgmt == "none":
auth = AuthMethod.WEP
elif inet.settings.wireless_security.key_mgmt == "wpa-psk":
auth = AuthMethod.WPA_PSK
psk = inet.settings.wireless_security.psk
# WifiMode
mode = WifiMode.INFRASTRUCTURE
if inet.settings.wireless.mode:
mode = WifiMode(inet.settings.wireless.mode)
# Signal
if inet.wireless:
signal = inet.wireless.active.strength
else:
signal = None
return WifiConfig(
mode,
inet.settings.wireless.ssid,
auth,
psk,
signal,
)
@staticmethod
def _map_nm_vlan(inet: NetworkInterface) -> WifiConfig | None:
"""Create mapping to nm vlan property."""
if inet.type != DeviceType.VLAN or not inet.settings:
return None
return VlanConfig(inet.settings.vlan.id, inet.settings.vlan.parent)

View File

@ -3,6 +3,7 @@ from ...const import CoreState
from ...coresys import CoreSys
from ...dbus.const import ConnectionStateFlags, ConnectionStateType
from ...dbus.network.interface import NetworkInterface
from ...exceptions import NetworkInterfaceNotFound
from ..const import ContextType, IssueType
from .base import CheckBase
@ -17,7 +18,7 @@ class CheckNetworkInterfaceIPV4(CheckBase):
async def run_check(self) -> None:
"""Run check if not affected by issue."""
for interface in self.sys_dbus.network.interfaces.values():
for interface in self.sys_dbus.network.interfaces:
if CheckNetworkInterfaceIPV4.check_interface(interface):
self.sys_resolution.create_issue(
IssueType.IPV4_CONNECTION_PROBLEM,
@ -30,9 +31,12 @@ class CheckNetworkInterfaceIPV4(CheckBase):
if not reference:
return False
interface = self.sys_dbus.network.interfaces.get(reference)
return interface and CheckNetworkInterfaceIPV4.check_interface(interface)
try:
return CheckNetworkInterfaceIPV4.check_interface(
self.sys_dbus.network.get(reference)
)
except NetworkInterfaceNotFound:
return False
@staticmethod
def check_interface(interface: NetworkInterface) -> bool:

View File

@ -1,7 +1,9 @@
"""Test NetwrokInterface API."""
from unittest.mock import AsyncMock, patch
from aiohttp.test_utils import TestClient
from dbus_fast import Variant
import pytest
from supervisor.const import DOCKER_NETWORK, DOCKER_NETWORK_MASK
from supervisor.coresys import CoreSys
@ -17,7 +19,7 @@ from tests.dbus_service_mocks.network_manager import (
from tests.dbus_service_mocks.network_settings import Settings as SettingsService
async def test_api_network_info(api_client, coresys: CoreSys):
async def test_api_network_info(api_client: TestClient, coresys: CoreSys):
"""Test network manager api."""
resp = await api_client.get("/network/info")
result = await resp.json()
@ -32,8 +34,10 @@ async def test_api_network_info(api_client, coresys: CoreSys):
if interface["interface"] == TEST_INTERFACE:
assert interface["primary"]
assert interface["ipv4"]["gateway"] == "192.168.2.1"
assert interface["mac"] == "AA:BB:CC:DD:EE:FF"
if interface["interface"] == TEST_INTERFACE_WLAN:
assert not interface["primary"]
assert interface["mac"] == "FF:EE:DD:CC:BB:AA"
assert interface["ipv4"] == {
"address": [],
"gateway": None,
@ -55,9 +59,10 @@ async def test_api_network_info(api_client, coresys: CoreSys):
assert result["data"]["docker"]["gateway"] == str(coresys.docker.network.gateway)
async def test_api_network_interface_info(api_client):
@pytest.mark.parametrize("intr_id", [TEST_INTERFACE, "AA:BB:CC:DD:EE:FF"])
async def test_api_network_interface_info(api_client: TestClient, intr_id: str):
"""Test network manager api."""
resp = await api_client.get(f"/network/interface/{TEST_INTERFACE}/info")
resp = await api_client.get(f"/network/interface/{intr_id}/info")
result = await resp.json()
assert result["data"]["ipv4"]["address"][-1] == "192.168.2.148/24"
assert result["data"]["ipv4"]["gateway"] == "192.168.2.1"
@ -76,7 +81,7 @@ async def test_api_network_interface_info(api_client):
assert result["data"]["interface"] == TEST_INTERFACE
async def test_api_network_interface_info_default(api_client):
async def test_api_network_interface_info_default(api_client: TestClient):
"""Test network manager default api."""
resp = await api_client.get("/network/interface/default/info")
result = await resp.json()
@ -97,21 +102,21 @@ async def test_api_network_interface_info_default(api_client):
assert result["data"]["interface"] == TEST_INTERFACE
@pytest.mark.parametrize("intr_id", [TEST_INTERFACE, "AA:BB:CC:DD:EE:FF"])
async def test_api_network_interface_update(
api_client,
api_client: TestClient,
coresys: CoreSys,
network_manager_service: NetworkManagerService,
connection_settings_service: ConnectionSettingsService,
intr_id: str,
):
"""Test network manager api."""
network_manager_service.CheckConnectivity.calls.clear()
connection_settings_service.Update.calls.clear()
assert (
coresys.dbus.network.interfaces[TEST_INTERFACE].settings.ipv4.method == "auto"
)
assert coresys.dbus.network.get(TEST_INTERFACE).settings.ipv4.method == "auto"
resp = await api_client.post(
f"/network/interface/{TEST_INTERFACE}/update",
f"/network/interface/{intr_id}/update",
json={
"ipv4": {
"method": "static",
@ -128,12 +133,10 @@ async def test_api_network_interface_update(
await connection_settings_service.ping()
await connection_settings_service.ping()
assert (
coresys.dbus.network.interfaces[TEST_INTERFACE].settings.ipv4.method == "manual"
)
assert coresys.dbus.network.get(TEST_INTERFACE).settings.ipv4.method == "manual"
async def test_api_network_interface_update_wifi(api_client):
async def test_api_network_interface_update_wifi(api_client: TestClient):
"""Test network manager api."""
resp = await api_client.post(
f"/network/interface/{TEST_INTERFACE_WLAN}/update",
@ -152,7 +155,7 @@ async def test_api_network_interface_update_wifi(api_client):
assert result["result"] == "ok"
async def test_api_network_interface_update_remove(api_client):
async def test_api_network_interface_update_remove(api_client: TestClient):
"""Test network manager api."""
resp = await api_client.post(
f"/network/interface/{TEST_INTERFACE}/update",
@ -162,7 +165,7 @@ async def test_api_network_interface_update_remove(api_client):
assert result["result"] == "ok"
async def test_api_network_interface_info_invalid(api_client):
async def test_api_network_interface_info_invalid(api_client: TestClient):
"""Test network manager api."""
resp = await api_client.get("/network/interface/invalid/info")
result = await resp.json()
@ -171,7 +174,7 @@ async def test_api_network_interface_info_invalid(api_client):
assert result["result"] == "error"
async def test_api_network_interface_update_invalid(api_client):
async def test_api_network_interface_update_invalid(api_client: TestClient):
"""Test network manager api."""
resp = await api_client.post("/network/interface/invalid/update", json={})
result = await resp.json()
@ -192,7 +195,7 @@ async def test_api_network_interface_update_invalid(api_client):
)
async def test_api_network_wireless_scan(api_client):
async def test_api_network_wireless_scan(api_client: TestClient):
"""Test network manager api."""
with patch("asyncio.sleep", return_value=AsyncMock()):
resp = await api_client.get(
@ -207,7 +210,9 @@ async def test_api_network_wireless_scan(api_client):
async def test_api_network_reload(
api_client, coresys, network_manager_service: NetworkManagerService
api_client: TestClient,
coresys: CoreSys,
network_manager_service: NetworkManagerService,
):
"""Test network manager reload api."""
network_manager_service.CheckConnectivity.calls.clear()
@ -220,7 +225,7 @@ async def test_api_network_reload(
async def test_api_network_vlan(
api_client,
api_client: TestClient,
coresys: CoreSys,
network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):

View File

@ -12,14 +12,15 @@ from tests.const import TEST_INTERFACE
@pytest.mark.asyncio
async def test_get_connection_from_interface(network_manager: NetworkManager):
"""Test network interface."""
dbus_interface = network_manager.interfaces[TEST_INTERFACE]
dbus_interface = network_manager.get(TEST_INTERFACE)
interface = Interface.from_dbus_interface(dbus_interface)
connection_payload = get_connection_from_interface(interface)
assert "connection" in connection_payload
assert connection_payload["connection"]["interface-name"].value == TEST_INTERFACE
assert "interface-name" not in connection_payload["connection"]
assert connection_payload["connection"]["type"].value == "802-3-ethernet"
assert connection_payload["device"]["match-device"].value == "mac:AA:BB:CC:DD:EE:FF"
assert connection_payload["ipv4"]["method"].value == "auto"
assert "address-data" not in connection_payload["ipv4"]

View File

@ -52,12 +52,14 @@ async def test_update(
settings = connection_settings_service.Update.calls[0][0]
assert settings["connection"]["id"] == Variant("s", "Supervisor eth0")
assert settings["connection"]["interface-name"] == Variant("s", "eth0")
assert "interface-name" not in settings["connection"]
assert settings["connection"]["uuid"] == Variant(
"s", "0c23631e-2118-355c-bbb0-8943229cb0d6"
)
assert settings["connection"]["autoconnect"] == Variant("b", True)
assert settings["device"] == {"match-device": Variant("s", "mac:AA:BB:CC:DD:EE:FF")}
assert "ipv4" in settings
assert settings["ipv4"]["method"] == Variant("s", "auto")
assert "gateway" not in settings["ipv4"]

View File

@ -57,7 +57,7 @@ async def test_old_ipv4_disconnect(
network_manager: NetworkManager, active_connection_service: ActiveConnectionService
):
"""Test old ipv4 disconnects on ipv4 change."""
connection = network_manager.interfaces[TEST_INTERFACE].connection
connection = network_manager.get(TEST_INTERFACE).connection
ipv4 = connection.ipv4
assert ipv4.is_connected is True
@ -72,7 +72,7 @@ async def test_old_ipv6_disconnect(
network_manager: NetworkManager, active_connection_service: ActiveConnectionService
):
"""Test old ipv6 disconnects on ipv6 change."""
connection = network_manager.interfaces[TEST_INTERFACE].connection
connection = network_manager.get(TEST_INTERFACE).connection
ipv6 = connection.ipv6
assert ipv6.is_connected is True
@ -87,7 +87,7 @@ async def test_old_settings_disconnect(
network_manager: NetworkManager, active_connection_service: ActiveConnectionService
):
"""Test old settings disconnects on settings change."""
connection = network_manager.interfaces[TEST_INTERFACE].connection
connection = network_manager.get(TEST_INTERFACE).connection
settings = connection.settings
assert settings.is_connected is True

View File

@ -118,7 +118,7 @@ async def test_old_connection_disconnect(
network_manager: NetworkManager, device_eth0_service: DeviceService
):
"""Test old connection disconnects on connection change."""
interface = network_manager.interfaces[TEST_INTERFACE]
interface = network_manager.get(TEST_INTERFACE)
connection = interface.connection
assert connection.is_connected is True
@ -133,7 +133,7 @@ async def test_old_wireless_disconnect(
network_manager: NetworkManager, device_wlan0_service: DeviceService
):
"""Test old wireless disconnects on type change."""
interface = network_manager.interfaces[TEST_INTERFACE_WLAN]
interface = network_manager.get(TEST_INTERFACE_WLAN)
wireless = interface.wireless
assert wireless.is_connected is True
@ -167,9 +167,9 @@ async def test_interface_becomes_unmanaged(
device_wlan0_service: DeviceService,
):
"""Test managed objects disconnect when interface becomes unmanaged."""
eth0 = network_manager.interfaces[TEST_INTERFACE]
eth0 = network_manager.get(TEST_INTERFACE)
connection = eth0.connection
wlan0 = network_manager.interfaces[TEST_INTERFACE_WLAN]
wlan0 = network_manager.get(TEST_INTERFACE_WLAN)
wireless = wlan0.wireless
assert connection.is_connected is True

View File

@ -38,7 +38,7 @@ async def test_network_manager(
await network_manager.connect(dbus_session_bus)
assert TEST_INTERFACE in network_manager.interfaces
assert TEST_INTERFACE in network_manager
assert network_manager.connectivity_enabled is True
network_manager_service.emit_properties_changed({"ConnectivityCheckEnabled": False})
@ -123,13 +123,13 @@ async def test_removed_devices_disconnect(
network_manager_service: NetworkManagerService, network_manager: NetworkManager
):
"""Test removed devices are disconnected."""
wlan = network_manager.interfaces[TEST_INTERFACE_WLAN]
wlan = network_manager.get(TEST_INTERFACE_WLAN)
assert wlan.is_connected is True
network_manager_service.emit_properties_changed({"Devices": []})
await network_manager_service.ping()
assert TEST_INTERFACE_WLAN not in network_manager.interfaces
assert TEST_INTERFACE_WLAN not in network_manager
assert wlan.is_connected is False

View File

@ -56,17 +56,16 @@ async def test_request_scan(
"""Test request scan."""
device_wireless_service.RequestScan.calls.clear()
assert (
await network_manager.interfaces[TEST_INTERFACE_WLAN].wireless.request_scan()
is None
await network_manager.get(TEST_INTERFACE_WLAN).wireless.request_scan() is None
)
assert device_wireless_service.RequestScan.calls == [({},)]
async def test_get_all_access_points(network_manager: NetworkManager):
"""Test get all access points."""
accesspoints = await network_manager.interfaces[
accesspoints = await network_manager.get(
TEST_INTERFACE_WLAN
].wireless.get_all_accesspoints()
).wireless.get_all_accesspoints()
assert len(accesspoints) == 2
assert accesspoints[0].mac == "E4:57:40:A9:D7:DE"
assert accesspoints[0].mode == 2
@ -76,7 +75,7 @@ async def test_get_all_access_points(network_manager: NetworkManager):
async def test_old_active_ap_disconnects(network_manager: NetworkManager):
"""Test old access point disconnects on active ap change."""
wireless = network_manager.interfaces[TEST_INTERFACE_WLAN].wireless
wireless = network_manager.get(TEST_INTERFACE_WLAN).wireless
await wireless.update(
{"ActiveAccessPoint": "/org/freedesktop/NetworkManager/AccessPoint/43099"}

View File

@ -58,18 +58,18 @@ async def test_load(coresys: CoreSys, network_manager_service: NetworkManagerSer
assert str(coresys.host.network.dns_servers[0]) == "192.168.30.1"
assert len(coresys.host.network.interfaces) == 2
assert coresys.host.network.interfaces[0].name == "eth0"
assert coresys.host.network.interfaces[0].enabled is True
assert coresys.host.network.interfaces[0].ipv4.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv4.gateway == IPv4Address("192.168.2.1")
assert coresys.host.network.interfaces[0].ipv4.ready is True
assert coresys.host.network.interfaces[0].ipv6.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv6.gateway == IPv6Address(
"fe80::da58:d7ff:fe00:9c69"
)
assert coresys.host.network.interfaces[0].ipv6.ready is True
assert coresys.host.network.interfaces[1].name == "wlan0"
assert coresys.host.network.interfaces[1].enabled is False
name_dict = {intr.name: intr for intr in coresys.host.network.interfaces}
assert "eth0" in name_dict
assert name_dict["eth0"].mac == "AA:BB:CC:DD:EE:FF"
assert name_dict["eth0"].enabled is True
assert name_dict["eth0"].ipv4.method == InterfaceMethod.AUTO
assert name_dict["eth0"].ipv4.gateway == IPv4Address("192.168.2.1")
assert name_dict["eth0"].ipv4.ready is True
assert name_dict["eth0"].ipv6.method == InterfaceMethod.AUTO
assert name_dict["eth0"].ipv6.gateway == IPv6Address("fe80::da58:d7ff:fe00:9c69")
assert name_dict["eth0"].ipv6.ready is True
assert "wlan0" in name_dict
assert name_dict["wlan0"].enabled is False
assert network_manager_service.ActivateConnection.calls == [
(
@ -94,7 +94,7 @@ async def test_load_with_disabled_methods(
"ipv4": disabled,
"ipv6": disabled,
}
await coresys.dbus.network.interfaces["eth0"].settings.reload()
await coresys.dbus.network.get("eth0").settings.reload()
await coresys.host.network.load()
assert network_manager_service.ActivateConnection.calls == []
@ -117,14 +117,13 @@ async def test_load_with_network_connection_issues(
assert network_manager_service.ActivateConnection.calls == []
assert len(coresys.host.network.interfaces) == 2
assert coresys.host.network.interfaces[0].name == "eth0"
assert coresys.host.network.interfaces[0].enabled is True
assert coresys.host.network.interfaces[0].ipv4.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv4.gateway is None
assert coresys.host.network.interfaces[0].ipv6.method == InterfaceMethod.AUTO
assert coresys.host.network.interfaces[0].ipv6.gateway == IPv6Address(
"fe80::da58:d7ff:fe00:9c69"
)
name_dict = {intr.name: intr for intr in coresys.host.network.interfaces}
assert "eth0" in name_dict
assert name_dict["eth0"].enabled is True
assert name_dict["eth0"].ipv4.method == InterfaceMethod.AUTO
assert name_dict["eth0"].ipv4.gateway is None
assert name_dict["eth0"].ipv6.method == InterfaceMethod.AUTO
assert name_dict["eth0"].ipv6.gateway == IPv6Address("fe80::da58:d7ff:fe00:9c69")
async def test_scan_wifi(coresys: CoreSys):