Fix upnp device not being reinitialized after device changes location (#63133)

This commit is contained in:
Steven Looman 2022-01-07 14:10:11 +01:00 committed by GitHub
parent 6d42af1b12
commit 9bf1397c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 215 additions and 124 deletions

View File

@ -16,7 +16,7 @@ from homeassistant.components.ssdp import SsdpChange
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
import homeassistant.util.dt as dt_util
from homeassistant.util.dt import utcnow
from .const import (
BYTES_RECEIVED,
@ -31,6 +31,17 @@ from .const import (
)
async def async_create_upnp_device(
hass: HomeAssistant, ssdp_location: str
) -> UpnpDevice:
"""Create UPnP device."""
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, with_sleep=True, timeout=20)
factory = UpnpFactory(requester, disable_state_variable_validation=True)
return await factory.async_create_device(ssdp_location)
class Device:
"""Home Assistant representation of a UPnP/IGD device."""
@ -40,25 +51,12 @@ class Device:
self._igd_device = igd_device
self.coordinator: DataUpdateCoordinator = None
@classmethod
async def async_create_upnp_device(
cls, hass: HomeAssistant, ssdp_location: str
) -> UpnpDevice:
"""Create UPnP device."""
# Build async_upnp_client requester.
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True, 20)
# Create async_upnp_client device.
factory = UpnpFactory(requester, disable_state_variable_validation=True)
return await factory.async_create_device(ssdp_location)
@classmethod
async def async_create_device(
cls, hass: HomeAssistant, ssdp_location: str
) -> Device:
"""Create UPnP/IGD device."""
upnp_device = await Device.async_create_upnp_device(hass, ssdp_location)
upnp_device = await async_create_upnp_device(hass, ssdp_location)
# Create profile wrapper.
igd_device = IgdDevice(upnp_device, None)
@ -67,7 +65,7 @@ class Device:
# Register SSDP callback for updates.
usn = f"{upnp_device.udn}::{upnp_device.device_type}"
await ssdp.async_register_callback(
hass, device.async_ssdp_callback, {ssdp.ATTR_SSDP_USN: usn}
hass, device.async_ssdp_callback, {"usn": usn}
)
return device
@ -76,7 +74,8 @@ class Device:
self, headers: Mapping[str, Any], change: SsdpChange
) -> None:
"""SSDP callback, update if needed."""
if change != SsdpChange.UPDATE or ssdp.ATTR_SSDP_LOCATION not in headers:
_LOGGER.debug("SSDP Callback, change: %s, headers: %s", change, headers)
if ssdp.ATTR_SSDP_LOCATION not in headers:
return
location = headers[ssdp.ATTR_SSDP_LOCATION]
@ -84,7 +83,7 @@ class Device:
if location == device.device_url:
return
new_upnp_device = Device.async_create_upnp_device(self.hass, location)
new_upnp_device = await async_create_upnp_device(self.hass, location)
device.reinit(new_upnp_device)
@property
@ -155,7 +154,7 @@ class Device:
)
return {
TIMESTAMP: dt_util.utcnow(),
TIMESTAMP: utcnow(),
BYTES_RECEIVED: values[0],
BYTES_SENT: values[1],
PACKETS_RECEIVED: values[2],

View File

@ -1,8 +1,11 @@
"""Configuration for SSDP tests."""
from typing import Any, Mapping
from typing import Optional, Sequence
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import urlparse
from async_upnp_client.client import UpnpDevice
from async_upnp_client.event_handler import UpnpEventHandler
from async_upnp_client.profiles.igd import StatusInfo
import pytest
from homeassistant.components import ssdp
@ -16,7 +19,6 @@ from homeassistant.components.upnp.const import (
PACKETS_SENT,
ROUTER_IP,
ROUTER_UPTIME,
TIMESTAMP,
WAN_STATUS,
)
from homeassistant.core import HomeAssistant
@ -29,17 +31,20 @@ TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
TEST_USN = f"{TEST_UDN}::{TEST_ST}"
TEST_LOCATION = "http://192.168.1.1/desc.xml"
TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
TEST_FRIENDLY_NAME = "friendly name"
TEST_FRIENDLY_NAME = "mock-name"
TEST_DISCOVERY = ssdp.SsdpServiceInfo(
ssdp_usn=TEST_USN,
ssdp_st=TEST_ST,
ssdp_location=TEST_LOCATION,
upnp={
ssdp.ATTR_UPNP_UDN: TEST_UDN,
"usn": TEST_USN,
"location": TEST_LOCATION,
"_udn": TEST_UDN,
"friendlyName": TEST_FRIENDLY_NAME,
"location": TEST_LOCATION,
"usn": TEST_USN,
ssdp.ATTR_UPNP_DEVICE_TYPE: TEST_ST,
ssdp.ATTR_UPNP_FRIENDLY_NAME: TEST_FRIENDLY_NAME,
ssdp.ATTR_UPNP_MANUFACTURER: "mock-manufacturer",
ssdp.ATTR_UPNP_MODEL_NAME: "mock-model-name",
ssdp.ATTR_UPNP_UDN: TEST_UDN,
},
ssdp_headers={
"_host": TEST_HOSTNAME,
@ -47,52 +52,37 @@ TEST_DISCOVERY = ssdp.SsdpServiceInfo(
)
class MockDevice:
"""Mock device for Device."""
class MockUpnpDevice:
"""Mock async_upnp_client UpnpDevice."""
def __init__(self, hass: HomeAssistant, udn: str) -> None:
"""Initialize mock device."""
self.hass = hass
self._udn = udn
self.traffic_times_polled = 0
self.status_times_polled = 0
self._timestamp = dt.utcnow()
@classmethod
async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
"""Return self."""
return cls(hass, TEST_UDN)
async def async_ssdp_callback(
self, headers: Mapping[str, Any], change: ssdp.SsdpChange
) -> None:
"""SSDP callback, update if needed."""
pass
@property
def udn(self) -> str:
"""Get the UDN."""
return self._udn
def __init__(self, location: str) -> None:
"""Initialize."""
self.device_url = location
@property
def manufacturer(self) -> str:
"""Get manufacturer."""
return "mock-manufacturer"
return TEST_DISCOVERY.upnp[ssdp.ATTR_UPNP_MANUFACTURER]
@property
def name(self) -> str:
"""Get name."""
return "mock-name"
return TEST_DISCOVERY.upnp[ssdp.ATTR_UPNP_FRIENDLY_NAME]
@property
def model_name(self) -> str:
"""Get the model name."""
return "mock-model-name"
return TEST_DISCOVERY.upnp[ssdp.ATTR_UPNP_MODEL_NAME]
@property
def device_type(self) -> str:
"""Get the device type."""
return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
return TEST_DISCOVERY.upnp[ssdp.ATTR_UPNP_DEVICE_TYPE]
@property
def udn(self) -> str:
"""Get the UDN."""
return TEST_DISCOVERY.upnp[ssdp.ATTR_UPNP_UDN]
@property
def usn(self) -> str:
@ -104,39 +94,118 @@ class MockDevice:
"""Get the unique id."""
return self.usn
@property
def hostname(self) -> str:
"""Get the hostname."""
return "mock-hostname"
def reinit(self, new_upnp_device: UpnpDevice) -> None:
"""Reinitialize."""
self.device_url = new_upnp_device.device_url
async def async_get_traffic_data(self) -> Mapping[str, Any]:
"""Get traffic data."""
self.traffic_times_polled += 1
return {
TIMESTAMP: self._timestamp,
class MockIgdDevice:
"""Mock async_upnp_client IgdDevice."""
def __init__(self, device: MockUpnpDevice, event_handler: UpnpEventHandler) -> None:
"""Initialize mock device."""
self.device = device
self.profile_device = device
self._timestamp = dt.utcnow()
self.traffic_times_polled = 0
self.status_times_polled = 0
self.traffic_data = {
BYTES_RECEIVED: 0,
BYTES_SENT: 0,
PACKETS_RECEIVED: 0,
PACKETS_SENT: 0,
}
async def async_get_status(self) -> Mapping[str, Any]:
"""Get connection status, uptime, and external IP."""
self.status_times_polled += 1
return {
self.status_data = {
WAN_STATUS: "Connected",
ROUTER_UPTIME: 10,
ROUTER_IP: "8.9.10.11",
}
@property
def name(self) -> str:
"""Get the name of the device."""
return self.profile_device.name
@property
def manufacturer(self) -> str:
"""Get the manufacturer of this device."""
return self.profile_device.manufacturer
@property
def model_name(self) -> str:
"""Get the model name of this device."""
return self.profile_device.model_name
@property
def udn(self) -> str:
"""Get the UDN of the device."""
return self.profile_device.udn
@property
def device_type(self) -> str:
"""Get the device type of this device."""
return self.profile_device.device_type
async def async_get_total_bytes_received(self) -> Optional[int]:
"""Get total bytes received."""
self.traffic_times_polled += 1
return self.traffic_data[BYTES_RECEIVED]
async def async_get_total_bytes_sent(self) -> Optional[int]:
"""Get total bytes sent."""
return self.traffic_data[BYTES_SENT]
async def async_get_total_packets_received(self) -> Optional[int]:
"""Get total packets received."""
return self.traffic_data[PACKETS_RECEIVED]
async def async_get_total_packets_sent(self) -> Optional[int]:
"""Get total packets sent."""
return self.traffic_data[PACKETS_SENT]
async def async_get_external_ip_address(
self, services: Optional[Sequence[str]] = None
) -> Optional[str]:
"""
Get the external IP address.
:param services List of service names to try to get action from, defaults to [WANIPC,WANPPP]
"""
return self.status_data[ROUTER_IP]
async def async_get_status_info(
self, services: Optional[Sequence[str]] = None
) -> Optional[StatusInfo]:
"""
Get status info.
:param services List of service names to try to get action from, defaults to [WANIPC,WANPPP]
"""
self.status_times_polled += 1
return StatusInfo(
self.status_data[WAN_STATUS], "", self.status_data[ROUTER_UPTIME]
)
@pytest.fixture(autouse=True)
def mock_upnp_device():
"""Mock homeassistant.components.upnp.Device."""
async def mock_async_create_upnp_device(
hass: HomeAssistant, location: str
) -> UpnpDevice:
"""Create UPnP device."""
return MockUpnpDevice(location)
with patch(
"homeassistant.components.upnp.Device", new=MockDevice
) as mock_async_create_device:
yield mock_async_create_device
"homeassistant.components.upnp.device.async_create_upnp_device",
side_effect=mock_async_create_upnp_device,
) as mock_async_create_upnp_device, patch(
"homeassistant.components.upnp.device.IgdDevice", new=MockIgdDevice
) as mock_igd_device:
yield mock_async_create_upnp_device, mock_igd_device
@pytest.fixture

View File

@ -1,7 +1,6 @@
"""Tests for UPnP/IGD binary_sensor."""
from datetime import timedelta
from unittest.mock import AsyncMock
from homeassistant.components.upnp.const import (
DOMAIN,
@ -12,7 +11,7 @@ from homeassistant.components.upnp.const import (
from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from .conftest import MockDevice
from .conftest import MockIgdDevice
from tests.common import MockConfigEntry, async_fire_time_changed
@ -21,20 +20,19 @@ async def test_upnp_binary_sensors(
hass: HomeAssistant, setup_integration: MockConfigEntry
):
"""Test normal sensors."""
mock_device: MockDevice = hass.data[DOMAIN][setup_integration.entry_id].device
# First poll.
wan_status_state = hass.states.get("binary_sensor.mock_name_wan_status")
assert wan_status_state.state == "on"
# Second poll.
mock_device.async_get_status = AsyncMock(
return_value={
WAN_STATUS: "Disconnected",
ROUTER_UPTIME: 100,
ROUTER_IP: "",
}
)
mock_device: MockIgdDevice = hass.data[DOMAIN][
setup_integration.entry_id
].device._igd_device
mock_device.status_data = {
WAN_STATUS: "Disconnected",
ROUTER_UPTIME: 100,
ROUTER_IP: "",
}
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()

View File

@ -25,7 +25,7 @@ from .conftest import (
TEST_ST,
TEST_UDN,
TEST_USN,
MockDevice,
MockIgdDevice,
)
from tests.common import MockConfigEntry, async_fire_time_changed
@ -199,9 +199,11 @@ async def test_options_flow(hass: HomeAssistant):
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id) is True
await hass.async_block_till_done()
mock_device: MockDevice = hass.data[DOMAIN][config_entry.entry_id].device
# Reset.
mock_device: MockIgdDevice = hass.data[DOMAIN][
config_entry.entry_id
].device._igd_device
mock_device.traffic_times_polled = 0
mock_device.status_times_polled = 0

View File

@ -3,14 +3,17 @@ from __future__ import annotations
import pytest
from homeassistant.components import ssdp
from homeassistant.components.upnp import UpnpDataUpdateCoordinator
from homeassistant.components.upnp.const import (
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DOMAIN,
)
from homeassistant.components.upnp.device import Device
from homeassistant.core import HomeAssistant
from .conftest import TEST_ST, TEST_UDN
from .conftest import TEST_DISCOVERY, TEST_ST, TEST_UDN
from tests.common import MockConfigEntry
@ -18,7 +21,6 @@ from tests.common import MockConfigEntry
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_async_setup_entry_default(hass: HomeAssistant):
"""Test async_setup_entry."""
entry = MockConfigEntry(
domain=DOMAIN,
data={
@ -30,3 +32,21 @@ async def test_async_setup_entry_default(hass: HomeAssistant):
# Load config_entry.
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id) is True
async def test_reinitialize_device(
hass: HomeAssistant, setup_integration: MockConfigEntry
):
"""Test device is reinitialized when device changes location."""
config_entry = setup_integration
coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
device: Device = coordinator.device
assert device._igd_device.device.device_url == TEST_DISCOVERY.ssdp_location
# Reinit.
new_location = "http://192.168.1.1:12345/desc.xml"
headers = {
ssdp.ATTR_SSDP_LOCATION: new_location,
}
await device.async_ssdp_callback(headers, ...)
assert device._igd_device.device.device_url == new_location

View File

@ -1,8 +1,8 @@
"""Tests for UPnP/IGD sensor."""
from datetime import timedelta
from unittest.mock import AsyncMock
from unittest.mock import patch
from homeassistant.components.upnp import UpnpDataUpdateCoordinator
from homeassistant.components.upnp.const import (
BYTES_RECEIVED,
BYTES_SENT,
@ -18,15 +18,13 @@ from homeassistant.components.upnp.const import (
from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from .conftest import MockDevice
from .conftest import MockIgdDevice
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEntry):
"""Test normal sensors."""
mock_device: MockDevice = hass.data[DOMAIN][setup_integration.entry_id].device
# First poll.
b_received_state = hass.states.get("sensor.mock_name_b_received")
b_sent_state = hass.states.get("sensor.mock_name_b_sent")
@ -42,23 +40,21 @@ async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEn
assert wan_status_state.state == "Connected"
# Second poll.
mock_device.async_get_traffic_data = AsyncMock(
return_value={
TIMESTAMP: mock_device._timestamp + UPDATE_INTERVAL,
BYTES_RECEIVED: 10240,
BYTES_SENT: 20480,
PACKETS_RECEIVED: 30,
PACKETS_SENT: 40,
}
)
mock_device.async_get_status = AsyncMock(
return_value={
WAN_STATUS: "Disconnected",
ROUTER_UPTIME: 100,
ROUTER_IP: "",
}
)
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=31))
mock_device: MockIgdDevice = hass.data[DOMAIN][
setup_integration.entry_id
].device._igd_device
mock_device.traffic_data = {
BYTES_RECEIVED: 10240,
BYTES_SENT: 20480,
PACKETS_RECEIVED: 30,
PACKETS_SENT: 40,
}
mock_device.status_data = {
WAN_STATUS: "Disconnected",
ROUTER_UPTIME: 100,
ROUTER_IP: "",
}
async_fire_time_changed(hass, dt_util.utcnow() + UPDATE_INTERVAL)
await hass.async_block_till_done()
b_received_state = hass.states.get("sensor.mock_name_b_received")
@ -79,7 +75,9 @@ async def test_derived_upnp_sensors(
hass: HomeAssistant, setup_integration: MockConfigEntry
):
"""Test derived sensors."""
mock_device: MockDevice = hass.data[DOMAIN][setup_integration.entry_id].device
coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][
setup_integration.entry_id
]
# First poll.
kib_s_received_state = hass.states.get("sensor.mock_name_kib_s_received")
@ -92,23 +90,28 @@ async def test_derived_upnp_sensors(
assert packets_s_sent_state.state == "unknown"
# Second poll.
mock_device.async_get_traffic_data = AsyncMock(
return_value={
TIMESTAMP: mock_device._timestamp + UPDATE_INTERVAL,
now = coordinator.data[TIMESTAMP]
with patch(
"homeassistant.components.upnp.device.utcnow",
return_value=now + UPDATE_INTERVAL,
):
mock_device: MockIgdDevice = coordinator.device._igd_device
mock_device.traffic_data = {
BYTES_RECEIVED: int(10240 * UPDATE_INTERVAL.total_seconds()),
BYTES_SENT: int(20480 * UPDATE_INTERVAL.total_seconds()),
PACKETS_RECEIVED: int(30 * UPDATE_INTERVAL.total_seconds()),
PACKETS_SENT: int(40 * UPDATE_INTERVAL.total_seconds()),
}
)
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()
async_fire_time_changed(hass, now + UPDATE_INTERVAL)
await hass.async_block_till_done()
kib_s_received_state = hass.states.get("sensor.mock_name_kib_s_received")
kib_s_sent_state = hass.states.get("sensor.mock_name_kib_s_sent")
packets_s_received_state = hass.states.get("sensor.mock_name_packets_s_received")
packets_s_sent_state = hass.states.get("sensor.mock_name_packets_s_sent")
assert kib_s_received_state.state == "10.0"
assert kib_s_sent_state.state == "20.0"
assert packets_s_received_state.state == "30.0"
assert packets_s_sent_state.state == "40.0"
kib_s_received_state = hass.states.get("sensor.mock_name_kib_s_received")
kib_s_sent_state = hass.states.get("sensor.mock_name_kib_s_sent")
packets_s_received_state = hass.states.get(
"sensor.mock_name_packets_s_received"
)
packets_s_sent_state = hass.states.get("sensor.mock_name_packets_s_sent")
assert kib_s_received_state.state == "10.0"
assert kib_s_sent_state.state == "20.0"
assert packets_s_received_state.state == "30.0"
assert packets_s_sent_state.state == "40.0"