diff --git a/homeassistant/components/upnp/__init__.py b/homeassistant/components/upnp/__init__.py index f4aa6137fef..b571a2b447f 100644 --- a/homeassistant/components/upnp/__init__.py +++ b/homeassistant/components/upnp/__init__.py @@ -8,7 +8,7 @@ from datetime import timedelta from ipaddress import ip_address from typing import Any -from async_upnp_client.exceptions import UpnpConnectionError +from async_upnp_client.exceptions import UpnpCommunicationError, UpnpConnectionError import voluptuous as vol from homeassistant import config_entries @@ -26,19 +26,20 @@ from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.update_coordinator import ( CoordinatorEntity, DataUpdateCoordinator, + UpdateFailed, ) from .const import ( CONF_LOCAL_IP, - CONFIG_ENTRY_HOSTNAME, + CONFIG_ENTRY_MAC_ADDRESS, + CONFIG_ENTRY_ORIGINAL_UDN, CONFIG_ENTRY_ST, CONFIG_ENTRY_UDN, DEFAULT_SCAN_INTERVAL, DOMAIN, - DOMAIN_DEVICES, LOGGER, ) -from .device import Device +from .device import Device, async_get_mac_address_from_host NOTIFICATION_ID = "upnp_notification" NOTIFICATION_TITLE = "UPnP/IGD Setup" @@ -65,9 +66,7 @@ CONFIG_SCHEMA = vol.Schema( async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up UPnP component.""" - hass.data[DOMAIN] = { - DOMAIN_DEVICES: {}, - } + hass.data[DOMAIN] = {} # Only start if set up via configuration.yaml. if DOMAIN in config: @@ -82,7 +81,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up UPnP/IGD device from a config entry.""" - LOGGER.debug("Setting up config entry: %s", entry.unique_id) + LOGGER.debug("Setting up config entry: %s", entry.entry_id) udn = entry.data[CONFIG_ENTRY_UDN] st = entry.data[CONFIG_ENTRY_ST] # pylint: disable=invalid-name @@ -126,67 +125,99 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: try: device = await Device.async_create_device(hass, location) except UpnpConnectionError as err: - LOGGER.debug("Error connecting to device %s", location) + LOGGER.debug( + "Error connecting to device at location: %s, err: %s", location, err + ) raise ConfigEntryNotReady from err - # Ensure entry has a unique_id. - if not entry.unique_id: - LOGGER.debug( - "Setting unique_id: %s, for config_entry: %s", - device.unique_id, - entry, - ) + # Track the original UDN such that existing sensors do not change their unique_id. + if CONFIG_ENTRY_ORIGINAL_UDN not in entry.data: hass.config_entries.async_update_entry( entry=entry, - unique_id=device.unique_id, + data={ + **entry.data, + CONFIG_ENTRY_ORIGINAL_UDN: device.udn, + }, ) + device.original_udn = entry.data[CONFIG_ENTRY_ORIGINAL_UDN] - # Ensure entry has a hostname, for older entries. - if ( - CONFIG_ENTRY_HOSTNAME not in entry.data - or entry.data[CONFIG_ENTRY_HOSTNAME] != device.hostname - ): + # Store mac address for changed UDN matching. + if device.host: + device.mac_address = await async_get_mac_address_from_host(hass, device.host) + if device.mac_address and not entry.data.get("CONFIG_ENTRY_MAC_ADDRESS"): hass.config_entries.async_update_entry( entry=entry, - data={CONFIG_ENTRY_HOSTNAME: device.hostname, **entry.data}, + data={ + **entry.data, + CONFIG_ENTRY_MAC_ADDRESS: device.mac_address, + }, ) - # Create device registry entry. + connections = {(dr.CONNECTION_UPNP, device.udn)} + if device.mac_address: + connections.add((dr.CONNECTION_NETWORK_MAC, device.mac_address)) + device_registry = dr.async_get(hass) - device_registry.async_get_or_create( - config_entry_id=entry.entry_id, - connections={(dr.CONNECTION_UPNP, device.udn)}, - identifiers={(DOMAIN, device.udn)}, - name=device.name, - manufacturer=device.manufacturer, - model=device.model_name, + device_entry = device_registry.async_get_device( + identifiers=set(), connections=connections ) + if device_entry: + LOGGER.debug( + "Found device using connections: %s, device_entry: %s", + connections, + device_entry, + ) + if not device_entry: + # No device found, create new device entry. + device_entry = device_registry.async_get_or_create( + config_entry_id=entry.entry_id, + connections=connections, + identifiers={(DOMAIN, device.usn)}, + name=device.name, + manufacturer=device.manufacturer, + model=device.model_name, + ) + LOGGER.debug( + "Created device using UDN '%s', device_entry: %s", device.udn, device_entry + ) + else: + # Update identifier. + device_entry = device_registry.async_update_device( + device_entry.id, + new_identifiers={(DOMAIN, device.usn)}, + ) + assert device_entry update_interval = timedelta(seconds=DEFAULT_SCAN_INTERVAL) - LOGGER.debug("update_interval: %s", update_interval) coordinator = UpnpDataUpdateCoordinator( hass, device=device, + device_entry=device_entry, update_interval=update_interval, ) + # Try an initial refresh. + await coordinator.async_config_entry_first_refresh() + # Save coordinator. hass.data[DOMAIN][entry.entry_id] = coordinator - await coordinator.async_config_entry_first_refresh() - # Setup platforms, creating sensors/binary_sensors. hass.config_entries.async_setup_platforms(entry, PLATFORMS) return True -async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool: +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a UPnP/IGD device from a config entry.""" - LOGGER.debug("Unloading config entry: %s", config_entry.unique_id) + LOGGER.debug("Unloading config entry: %s", entry.entry_id) # Unload platforms. - return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS) + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + if unload_ok: + del hass.data[DOMAIN][entry.entry_id] + + return unload_ok @dataclass @@ -209,26 +240,45 @@ class UpnpDataUpdateCoordinator(DataUpdateCoordinator): """Define an object to update data from UPNP device.""" def __init__( - self, hass: HomeAssistant, device: Device, update_interval: timedelta + self, + hass: HomeAssistant, + device: Device, + device_entry: dr.DeviceEntry, + update_interval: timedelta, ) -> None: """Initialize.""" self.device = device + self.device_entry = device_entry super().__init__( - hass, LOGGER, name=device.name, update_interval=update_interval + hass, + LOGGER, + name=device.name, + update_interval=update_interval, + update_method=self._async_fetch_data, ) - async def _async_update_data(self) -> Mapping[str, Any]: + async def _async_fetch_data(self) -> Mapping[str, Any]: """Update data.""" - update_values = await asyncio.gather( - self.device.async_get_traffic_data(), - self.device.async_get_status(), - ) + try: + update_values = await asyncio.gather( + self.device.async_get_traffic_data(), + self.device.async_get_status(), + ) - return { - **update_values[0], - **update_values[1], - } + return { + **update_values[0], + **update_values[1], + } + except UpnpCommunicationError as exception: + LOGGER.debug( + "Caught exception when updating device: %s, exception: %s", + self.device, + exception, + ) + raise UpdateFailed( + f"Unable to communicate with IGD at: {self.device.device_url}" + ) from exception class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]): @@ -247,13 +297,13 @@ class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]): self._device = coordinator.device self.entity_description = entity_description self._attr_name = f"{coordinator.device.name} {entity_description.name}" - self._attr_unique_id = f"{coordinator.device.udn}_{entity_description.unique_id or entity_description.key}" + self._attr_unique_id = f"{coordinator.device.original_udn}_{entity_description.unique_id or entity_description.key}" self._attr_device_info = DeviceInfo( - connections={(dr.CONNECTION_UPNP, coordinator.device.udn)}, - name=coordinator.device.name, - manufacturer=coordinator.device.manufacturer, - model=coordinator.device.model_name, - configuration_url=f"http://{coordinator.device.hostname}", + connections=coordinator.device_entry.connections, + name=coordinator.device_entry.name, + manufacturer=coordinator.device_entry.manufacturer, + model=coordinator.device_entry.model, + configuration_url=coordinator.device_entry.configuration_url, ) @property diff --git a/homeassistant/components/upnp/config_flow.py b/homeassistant/components/upnp/config_flow.py index 169ee28ea00..b7208e6e6e2 100644 --- a/homeassistant/components/upnp/config_flow.py +++ b/homeassistant/components/upnp/config_flow.py @@ -14,7 +14,9 @@ from homeassistant.core import HomeAssistant from homeassistant.data_entry_flow import FlowResult from .const import ( - CONFIG_ENTRY_HOSTNAME, + CONFIG_ENTRY_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS, + CONFIG_ENTRY_ORIGINAL_UDN, CONFIG_ENTRY_ST, CONFIG_ENTRY_UDN, DOMAIN, @@ -23,6 +25,7 @@ from .const import ( ST_IGD_V1, ST_IGD_V2, ) +from .device import async_get_mac_address_from_host def _friendly_name_from_discovery(discovery_info: ssdp.SsdpServiceInfo) -> str: @@ -50,15 +53,13 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool: device_discovered_event = asyncio.Event() async def device_discovered(info: SsdpServiceInfo, change: SsdpChange) -> None: - if change == SsdpChange.BYEBYE: - return - - LOGGER.info( - "Device discovered: %s, at: %s", - info.ssdp_usn, - info.ssdp_location, - ) - device_discovered_event.set() + if change != SsdpChange.BYEBYE: + LOGGER.info( + "Device discovered: %s, at: %s", + info.ssdp_usn, + info.ssdp_location, + ) + device_discovered_event.set() cancel_discovered_callback_1 = await ssdp.async_register_callback( hass, @@ -97,6 +98,14 @@ async def _async_discover_igd_devices( ) + await ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2) +async def _async_mac_address_from_discovery( + hass: HomeAssistant, discovery: SsdpServiceInfo +) -> str | None: + """Get the mac address from a discovery.""" + host = discovery.ssdp_headers["_host"] + return await async_get_mac_address_from_host(hass, host) + + class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): """Handle a UPnP/IGD config flow.""" @@ -118,15 +127,13 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): if user_input is not None: # Ensure wanted device was discovered. assert self._discoveries - matching_discoveries = [ - discovery - for discovery in self._discoveries - if discovery.ssdp_usn == user_input["unique_id"] - ] - if not matching_discoveries: - return self.async_abort(reason="no_devices_found") - - discovery = matching_discoveries[0] + discovery = next( + iter( + discovery + for discovery in self._discoveries + if discovery.ssdp_usn == user_input["unique_id"] + ) + ) await self.async_set_unique_id(discovery.ssdp_usn, raise_on_progress=False) return await self._async_create_entry_from_discovery(discovery) @@ -217,21 +224,46 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): # Ensure not already configuring/configured. unique_id = discovery_info.ssdp_usn await self.async_set_unique_id(unique_id) - hostname = discovery_info.ssdp_headers["_host"] + mac_address = await _async_mac_address_from_discovery(self.hass, discovery_info) self._abort_if_unique_id_configured( - updates={CONFIG_ENTRY_HOSTNAME: hostname}, reload_on_update=False + # Store mac address for older entries. + # The location is stored in the config entry such that when the location changes, the entry is reloaded. + updates={ + CONFIG_ENTRY_MAC_ADDRESS: mac_address, + CONFIG_ENTRY_LOCATION: discovery_info.ssdp_location, + }, ) # Handle devices changing their UDN, only allow a single host. - existing_entries = self._async_current_entries() - for config_entry in existing_entries: - entry_hostname = config_entry.data.get(CONFIG_ENTRY_HOSTNAME) - if entry_hostname == hostname: - LOGGER.debug( - "Found existing config_entry with same hostname, discovery ignored" - ) + for entry in self._async_current_entries(include_ignore=True): + entry_mac_address = entry.data.get(CONFIG_ENTRY_MAC_ADDRESS) + entry_st = entry.data.get(CONFIG_ENTRY_ST) + if entry_mac_address != mac_address: + continue + + if discovery_info.ssdp_st != entry_st: + # Check ssdp_st to prevent swapping between IGDv1 and IGDv2. + continue + + if entry.source == config_entries.SOURCE_IGNORE: + # Host was already ignored. Don't update ignored entries. return self.async_abort(reason="discovery_ignored") + LOGGER.debug("Updating entry: %s", entry.entry_id) + self.hass.config_entries.async_update_entry( + entry, + unique_id=unique_id, + data={**entry.data, CONFIG_ENTRY_UDN: discovery_info.ssdp_udn}, + ) + if entry.state == config_entries.ConfigEntryState.LOADED: + # Only reload when entry has state LOADED; when entry has state SETUP_RETRY, + # another load is started, causing the entry to be loaded twice. + LOGGER.debug("Reloading entry: %s", entry.entry_id) + self.hass.async_create_task( + self.hass.config_entries.async_reload(entry.entry_id) + ) + return self.async_abort(reason="config_entry_updated") + # Store discovery. self._discoveries = [discovery_info] @@ -265,9 +297,12 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): ) title = _friendly_name_from_discovery(discovery) + mac_address = await _async_mac_address_from_discovery(self.hass, discovery) data = { CONFIG_ENTRY_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN], CONFIG_ENTRY_ST: discovery.ssdp_st, - CONFIG_ENTRY_HOSTNAME: discovery.ssdp_headers["_host"], + CONFIG_ENTRY_ORIGINAL_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN], + CONFIG_ENTRY_LOCATION: discovery.ssdp_location, + CONFIG_ENTRY_MAC_ADDRESS: mac_address, } return self.async_create_entry(title=title, data=data) diff --git a/homeassistant/components/upnp/const.py b/homeassistant/components/upnp/const.py index 1643b540914..e673922d1c2 100644 --- a/homeassistant/components/upnp/const.py +++ b/homeassistant/components/upnp/const.py @@ -8,7 +8,6 @@ LOGGER = logging.getLogger(__package__) CONF_LOCAL_IP = "local_ip" DOMAIN = "upnp" -DOMAIN_DEVICES = "devices" BYTES_RECEIVED = "bytes_received" BYTES_SENT = "bytes_sent" PACKETS_RECEIVED = "packets_received" @@ -22,7 +21,9 @@ ROUTER_UPTIME = "uptime" KIBIBYTE = 1024 CONFIG_ENTRY_ST = "st" CONFIG_ENTRY_UDN = "udn" -CONFIG_ENTRY_HOSTNAME = "hostname" +CONFIG_ENTRY_ORIGINAL_UDN = "original_udn" +CONFIG_ENTRY_MAC_ADDRESS = "mac_address" +CONFIG_ENTRY_LOCATION = "location" DEFAULT_SCAN_INTERVAL = timedelta(seconds=30).total_seconds() ST_IGD_V1 = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" ST_IGD_V2 = "urn:schemas-upnp-org:device:InternetGatewayDevice:2" diff --git a/homeassistant/components/upnp/device.py b/homeassistant/components/upnp/device.py index a3b5e63bf41..0e7c7902bd9 100644 --- a/homeassistant/components/upnp/device.py +++ b/homeassistant/components/upnp/device.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio from collections.abc import Mapping +from functools import partial +from ipaddress import ip_address from typing import Any from urllib.parse import urlparse @@ -11,9 +13,8 @@ from async_upnp_client.client import UpnpDevice from async_upnp_client.client_factory import UpnpFactory from async_upnp_client.exceptions import UpnpError from async_upnp_client.profiles.igd import IgdDevice, StatusInfo +from getmac import get_mac_address -from homeassistant.components import ssdp -from homeassistant.components.ssdp import SsdpChange, SsdpServiceInfo from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.update_coordinator import DataUpdateCoordinator @@ -32,6 +33,20 @@ from .const import ( ) +async def async_get_mac_address_from_host(hass: HomeAssistant, host: str) -> str | None: + """Get mac address from host.""" + ip_addr = ip_address(host) + if ip_addr.version == 4: + mac_address = await hass.async_add_executor_job( + partial(get_mac_address, ip=host) + ) + else: + mac_address = await hass.async_add_executor_job( + partial(get_mac_address, ip6=host) + ) + return mac_address + + async def async_create_upnp_device( hass: HomeAssistant, ssdp_location: str ) -> UpnpDevice: @@ -51,6 +66,7 @@ class Device: self.hass = hass self._igd_device = igd_device self.coordinator: DataUpdateCoordinator | None = None + self._mac_address: str | None = None @classmethod async def async_create_device( @@ -63,36 +79,27 @@ class Device: igd_device = IgdDevice(upnp_device, None) device = cls(hass, igd_device) - # Register SSDP callback for updates. - usn = f"{upnp_device.udn}::{upnp_device.device_type}" - await ssdp.async_register_callback( - hass, device.async_ssdp_callback, {"usn": usn} - ) - return device - async def async_ssdp_callback( - self, service_info: SsdpServiceInfo, change: SsdpChange - ) -> None: - """SSDP callback, update if needed.""" - _LOGGER.debug( - "SSDP Callback, change: %s, headers: %s", change, service_info.ssdp_headers - ) - if service_info.ssdp_location is None: - return + @property + def mac_address(self) -> str | None: + """Get the mac address.""" + return self._mac_address - if change == SsdpChange.ALIVE: - # We care only about updates. - return + @mac_address.setter + def mac_address(self, mac_address: str) -> None: + """Set the mac address.""" + self._mac_address = mac_address - device = self._igd_device.device - if service_info.ssdp_location == device.device_url: - return + @property + def original_udn(self) -> str | None: + """Get the mac address.""" + return self._original_udn - new_upnp_device = await async_create_upnp_device( - self.hass, service_info.ssdp_location - ) - device.reinit(new_upnp_device) + @original_udn.setter + def original_udn(self, original_udn: str) -> None: + """Set the original UDN.""" + self._original_udn = original_udn @property def udn(self) -> str: @@ -130,12 +137,22 @@ class Device: return self.usn @property - def hostname(self) -> str | None: + def host(self) -> str | None: """Get the hostname.""" url = self._igd_device.device.device_url parsed = urlparse(url) return parsed.hostname + @property + def device_url(self) -> str: + """Get the device_url of the device.""" + return self._igd_device.device.device_url + + @property + def serial_number(self) -> str | None: + """Get the serial number.""" + return self._igd_device.device.serial_number + def __str__(self) -> str: """Get string representation.""" return f"IGD Device: {self.name}/{self.udn}::{self.device_type}" @@ -179,7 +196,7 @@ class Device: return_exceptions=True, ) status_info: StatusInfo | None = None - ip_address: str | None = None + router_ip: str | None = None for idx, value in enumerate(values): if isinstance(value, UpnpError): @@ -199,10 +216,10 @@ class Device: if isinstance(value, StatusInfo): status_info = value elif isinstance(value, str): - ip_address = value + router_ip = value return { WAN_STATUS: status_info[0] if status_info is not None else None, ROUTER_UPTIME: status_info[2] if status_info is not None else None, - ROUTER_IP: ip_address, + ROUTER_IP: router_ip, } diff --git a/homeassistant/components/upnp/manifest.json b/homeassistant/components/upnp/manifest.json index f3c11c61841..998bebab5d4 100644 --- a/homeassistant/components/upnp/manifest.json +++ b/homeassistant/components/upnp/manifest.json @@ -3,7 +3,7 @@ "name": "UPnP/IGD", "config_flow": true, "documentation": "https://www.home-assistant.io/integrations/upnp", - "requirements": ["async-upnp-client==0.27.0"], + "requirements": ["async-upnp-client==0.27.0", "getmac==0.8.2"], "dependencies": ["network", "ssdp"], "codeowners": ["@StevenLooman", "@ehendrix23"], "ssdp": [ diff --git a/requirements_all.txt b/requirements_all.txt index d453a8b59e1..0378be5c9eb 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -700,6 +700,7 @@ georss_qld_bushfire_alert_client==0.5 # homeassistant.components.minecraft_server # homeassistant.components.nmap_tracker # homeassistant.components.samsungtv +# homeassistant.components.upnp getmac==0.8.2 # homeassistant.components.gios diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 3dcbfe59702..00d1f77561f 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -494,6 +494,7 @@ georss_qld_bushfire_alert_client==0.5 # homeassistant.components.minecraft_server # homeassistant.components.nmap_tracker # homeassistant.components.samsungtv +# homeassistant.components.upnp getmac==0.8.2 # homeassistant.components.gios diff --git a/tests/components/upnp/conftest.py b/tests/components/upnp/conftest.py index 479cd900050..dd22db878cf 100644 --- a/tests/components/upnp/conftest.py +++ b/tests/components/upnp/conftest.py @@ -14,6 +14,9 @@ from homeassistant.components import ssdp from homeassistant.components.upnp.const import ( BYTES_RECEIVED, BYTES_SENT, + CONFIG_ENTRY_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS, + CONFIG_ENTRY_ORIGINAL_UDN, CONFIG_ENTRY_ST, CONFIG_ENTRY_UDN, DOMAIN, @@ -34,9 +37,11 @@ TEST_USN = f"{TEST_UDN}::{TEST_ST}" TEST_LOCATION = "http://192.168.1.1/desc.xml" TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname TEST_FRIENDLY_NAME = "mock-name" +TEST_MAC_ADDRESS = "00:11:22:33:44:55" TEST_DISCOVERY = ssdp.SsdpServiceInfo( - ssdp_usn=TEST_USN, ssdp_st=TEST_ST, + ssdp_udn=TEST_UDN, + ssdp_usn=TEST_USN, ssdp_location=TEST_LOCATION, upnp={ "_udn": TEST_UDN, @@ -210,6 +215,26 @@ def mock_upnp_device(): yield mock_async_create_upnp_device, mock_igd_device +@pytest.fixture +def mock_mac_address_from_host(): + """Get mac address.""" + with patch( + "homeassistant.components.upnp.device.get_mac_address", + return_value=TEST_MAC_ADDRESS, + ): + yield + + +@pytest.fixture +def mock_no_mac_address_from_host(): + """Get no mac address.""" + with patch( + "homeassistant.components.upnp.device.get_mac_address", + return_value=None, + ): + yield + + @pytest.fixture def mock_setup_entry(): """Mock async_setup_entry.""" @@ -272,15 +297,23 @@ async def ssdp_no_discovery(): @pytest.fixture -async def setup_integration( - hass: HomeAssistant, mock_get_source_ip, ssdp_instant_discovery, mock_upnp_device +async def config_entry( + hass: HomeAssistant, + mock_get_source_ip, + ssdp_instant_discovery, + mock_upnp_device, + mock_mac_address_from_host, ): """Create an initialized integration.""" entry = MockConfigEntry( domain=DOMAIN, + unique_id=TEST_USN, data={ - CONFIG_ENTRY_UDN: TEST_UDN, CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, }, ) diff --git a/tests/components/upnp/test_binary_sensor.py b/tests/components/upnp/test_binary_sensor.py index 0a8095cb10f..22264792420 100644 --- a/tests/components/upnp/test_binary_sensor.py +++ b/tests/components/upnp/test_binary_sensor.py @@ -16,9 +16,7 @@ from .conftest import MockIgdDevice from tests.common import MockConfigEntry, async_fire_time_changed -async def test_upnp_binary_sensors( - hass: HomeAssistant, setup_integration: MockConfigEntry -): +async def test_upnp_binary_sensors(hass: HomeAssistant, config_entry: MockConfigEntry): """Test normal sensors.""" # First poll. wan_status_state = hass.states.get("binary_sensor.mock_name_wan_status") @@ -26,7 +24,7 @@ async def test_upnp_binary_sensors( # Second poll. mock_device: MockIgdDevice = hass.data[DOMAIN][ - setup_integration.entry_id + config_entry.entry_id ].device._igd_device mock_device.status_data = { WAN_STATUS: "Disconnected", diff --git a/tests/components/upnp/test_config_flow.py b/tests/components/upnp/test_config_flow.py index 800bf541834..ea8b3381cd1 100644 --- a/tests/components/upnp/test_config_flow.py +++ b/tests/components/upnp/test_config_flow.py @@ -1,11 +1,16 @@ """Test UPnP/IGD config flow.""" +from copy import deepcopy +from unittest.mock import MagicMock, patch + import pytest from homeassistant import config_entries, data_entry_flow from homeassistant.components import ssdp from homeassistant.components.upnp.const import ( - CONFIG_ENTRY_HOSTNAME, + CONFIG_ENTRY_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS, + CONFIG_ENTRY_ORIGINAL_UDN, CONFIG_ENTRY_ST, CONFIG_ENTRY_UDN, DOMAIN, @@ -15,8 +20,8 @@ from homeassistant.core import HomeAssistant from .conftest import ( TEST_DISCOVERY, TEST_FRIENDLY_NAME, - TEST_HOSTNAME, TEST_LOCATION, + TEST_MAC_ADDRESS, TEST_ST, TEST_UDN, TEST_USN, @@ -26,7 +31,10 @@ from tests.common import MockConfigEntry @pytest.mark.usefixtures( - "ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip" + "ssdp_instant_discovery", + "mock_setup_entry", + "mock_get_source_ip", + "mock_mac_address_from_host", ) async def test_flow_ssdp(hass: HomeAssistant): """Test config flow: discovered + configured through ssdp.""" @@ -49,7 +57,9 @@ async def test_flow_ssdp(hass: HomeAssistant): assert result["data"] == { CONFIG_ENTRY_ST: TEST_ST, CONFIG_ENTRY_UDN: TEST_UDN, - CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, } @@ -73,32 +83,225 @@ async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant): assert result["reason"] == "incomplete_discovery" -@pytest.mark.usefixtures("mock_get_source_ip") -async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant): - """Test config flow: discovery through ssdp, but ignored, as hostname is used by existing config entry.""" - # Existing entry. - config_entry = MockConfigEntry( - domain=DOMAIN, - data={ - CONFIG_ENTRY_UDN: TEST_UDN + "2", - CONFIG_ENTRY_ST: TEST_ST, - CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME, - }, +@pytest.mark.usefixtures( + "ssdp_instant_discovery", + "mock_setup_entry", + "mock_get_source_ip", + "mock_no_mac_address_from_host", +) +async def test_flow_ssdp_no_mac_address(hass: HomeAssistant): + """Test config flow: discovered + configured through ssdp.""" + # Discovered via step ssdp. + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=TEST_DISCOVERY, ) - config_entry.add_to_hass(hass) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "ssdp_confirm" + + # Confirm via step ssdp_confirm. + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={}, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == TEST_FRIENDLY_NAME + assert result["data"] == { + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: None, + } + + +@pytest.mark.usefixtures("mock_mac_address_from_host") +async def test_flow_ssdp_discovery_changed_udn(hass: HomeAssistant): + """Test config flow: discovery through ssdp, same device, but new UDN, matched on mac address.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_USN, + data={ + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, + }, + source=config_entries.SOURCE_SSDP, + state=config_entries.ConfigEntryState.LOADED, + ) + entry.add_to_hass(hass) + + # New discovery via step ssdp. + new_udn = TEST_UDN + "2" + new_discovery = deepcopy(TEST_DISCOVERY) + new_discovery.ssdp_usn = f"{new_udn}::{TEST_ST}" + new_discovery.upnp["_udn"] = new_udn + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=new_discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "config_entry_updated" + + +@pytest.mark.usefixtures( + "ssdp_instant_discovery", + "mock_setup_entry", + "mock_get_source_ip", +) +async def test_flow_ssdp_discovery_changed_udn_but_st_differs(hass: HomeAssistant): + """Test config flow: discovery through ssdp, same device, but new UDN, and different ST, so not matched --> new discovery.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_USN, + data={ + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, + }, + source=config_entries.SOURCE_SSDP, + state=config_entries.ConfigEntryState.LOADED, + ) + entry.add_to_hass(hass) + + # UDN + mac address different: New discovery via step ssdp. + new_udn = TEST_UDN + "2" + with patch( + "homeassistant.components.upnp.device.get_mac_address", + return_value=TEST_MAC_ADDRESS + "2", + ): + new_discovery = deepcopy(TEST_DISCOVERY) + new_discovery.ssdp_usn = f"{new_udn}::{TEST_ST}" + new_discovery.upnp["_udn"] = new_udn + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=new_discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "ssdp_confirm" + + # UDN + ST different: New discovery via step ssdp. + with patch( + "homeassistant.components.upnp.device.get_mac_address", + return_value=TEST_MAC_ADDRESS, + ): + new_st = TEST_ST + "2" + new_discovery = deepcopy(TEST_DISCOVERY) + new_discovery.ssdp_usn = f"{new_udn}::{new_st}" + new_discovery.ssdp_st = new_st + new_discovery.upnp["_udn"] = new_udn + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=new_discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "ssdp_confirm" + + +@pytest.mark.usefixtures("mock_mac_address_from_host") +async def test_flow_ssdp_discovery_changed_location(hass: HomeAssistant): + """Test config flow: discovery through ssdp, same device, but new location.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_USN, + data={ + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, + }, + source=config_entries.SOURCE_SSDP, + state=config_entries.ConfigEntryState.LOADED, + ) + entry.add_to_hass(hass) + + # Discovery via step ssdp. + new_location = TEST_DISCOVERY.ssdp_location + "2" + new_discovery = deepcopy(TEST_DISCOVERY) + new_discovery.ssdp_location = new_location + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=new_discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "already_configured" + + # Test if location is updated. + assert entry.data[CONFIG_ENTRY_LOCATION] == new_location + + +@pytest.mark.usefixtures("mock_mac_address_from_host") +async def test_flow_ssdp_discovery_ignored_entry(hass: HomeAssistant): + """Test config flow: discovery through ssdp, same device, but new UDN, matched on mac address.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_USN, + data={ + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, + }, + source=config_entries.SOURCE_IGNORE, + ) + entry.add_to_hass(hass) - # Discovered via step ssdp, but ignored. result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_SSDP}, data=TEST_DISCOVERY, ) assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "already_configured" + + +@pytest.mark.usefixtures("mock_mac_address_from_host") +async def test_flow_ssdp_discovery_changed_udn_ignored_entry(hass: HomeAssistant): + """Test config flow: discovery through ssdp, same device, but new UDN, matched on mac address, entry ignored.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_USN, + data={ + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, + }, + source=config_entries.SOURCE_IGNORE, + ) + entry.add_to_hass(hass) + + # New discovery via step ssdp. + new_udn = TEST_UDN + "2" + new_discovery = deepcopy(TEST_DISCOVERY) + new_discovery.ssdp_usn = f"{new_udn}::{TEST_ST}" + new_discovery.upnp["_udn"] = new_udn + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=new_discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT assert result["reason"] == "discovery_ignored" @pytest.mark.usefixtures( - "ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip" + "ssdp_instant_discovery", + "mock_setup_entry", + "mock_get_source_ip", + "mock_mac_address_from_host", ) async def test_flow_user(hass: HomeAssistant): """Test config flow: discovered + configured through user.""" @@ -119,12 +322,32 @@ async def test_flow_user(hass: HomeAssistant): assert result["data"] == { CONFIG_ENTRY_ST: TEST_ST, CONFIG_ENTRY_UDN: TEST_UDN, - CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, } @pytest.mark.usefixtures( - "ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip" + "ssdp_no_discovery", + "mock_setup_entry", + "mock_get_source_ip", + "mock_mac_address_from_host", +) +async def test_flow_user_no_discovery(hass: HomeAssistant): + """Test config flow: user, but no discovery.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "no_devices_found" + + +@pytest.mark.usefixtures( + "ssdp_instant_discovery", + "mock_setup_entry", + "mock_get_source_ip", + "mock_mac_address_from_host", ) async def test_flow_import(hass: HomeAssistant): """Test config flow: configured through configuration.yaml.""" @@ -137,23 +360,62 @@ async def test_flow_import(hass: HomeAssistant): assert result["data"] == { CONFIG_ENTRY_ST: TEST_ST, CONFIG_ENTRY_UDN: TEST_UDN, - CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, } +@pytest.mark.usefixtures( + "mock_get_source_ip", +) +async def test_flow_import_incomplete_discovery(hass: HomeAssistant): + """Test config flow: configured through configuration.yaml, but incomplete discovery.""" + incomplete_discovery = ssdp.SsdpServiceInfo( + ssdp_usn=TEST_USN, + ssdp_st=TEST_ST, + ssdp_location=TEST_LOCATION, + upnp={ + # ssdp.ATTR_UPNP_UDN: TEST_UDN, # Not provided. + }, + ) + + async def register_callback(hass, callback, match_dict): + """Immediately do callback.""" + await callback(incomplete_discovery, ssdp.SsdpChange.ALIVE) + return MagicMock() + + with patch( + "homeassistant.components.ssdp.async_register_callback", + side_effect=register_callback, + ), patch( + "homeassistant.components.upnp.ssdp.async_get_discovery_info_by_st", + return_value=[incomplete_discovery], + ): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_IMPORT} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "incomplete_discovery" + + @pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip") async def test_flow_import_already_configured(hass: HomeAssistant): """Test config flow: configured through configuration.yaml, but existing config entry.""" # Existing entry. - config_entry = MockConfigEntry( + entry = MockConfigEntry( domain=DOMAIN, + unique_id=TEST_USN, data={ - CONFIG_ENTRY_UDN: TEST_UDN, CONFIG_ENTRY_ST: TEST_ST, - CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, }, + state=config_entries.ConfigEntryState.LOADED, ) - config_entry.add_to_hass(hass) + entry.add_to_hass(hass) # Discovered via step import. result = await hass.config_entries.flow.async_init( diff --git a/tests/components/upnp/test_init.py b/tests/components/upnp/test_init.py index bd2096b59a0..925fed40d5b 100644 --- a/tests/components/upnp/test_init.py +++ b/tests/components/upnp/test_init.py @@ -3,29 +3,35 @@ from __future__ import annotations import pytest -from homeassistant.components import ssdp -from homeassistant.components.upnp import UpnpDataUpdateCoordinator from homeassistant.components.upnp.const import ( + CONFIG_ENTRY_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS, + CONFIG_ENTRY_ORIGINAL_UDN, CONFIG_ENTRY_ST, CONFIG_ENTRY_UDN, DOMAIN, ) -from homeassistant.components.upnp.device import Device from homeassistant.core import HomeAssistant -from .conftest import TEST_DISCOVERY, TEST_ST, TEST_UDN +from .conftest import TEST_LOCATION, TEST_MAC_ADDRESS, TEST_ST, TEST_UDN, TEST_USN from tests.common import MockConfigEntry -@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip") +@pytest.mark.usefixtures( + "ssdp_instant_discovery", "mock_get_source_ip", "mock_mac_address_from_host" +) async def test_async_setup_entry_default(hass: HomeAssistant): """Test async_setup_entry.""" entry = MockConfigEntry( domain=DOMAIN, + unique_id=TEST_USN, data={ - CONFIG_ENTRY_UDN: TEST_UDN, CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS, }, ) @@ -34,24 +40,23 @@ async def test_async_setup_entry_default(hass: HomeAssistant): assert await hass.config_entries.async_setup(entry.entry_id) is True -async def test_reinitialize_device( - hass: HomeAssistant, setup_integration: MockConfigEntry -): - """Test device is reinitialized when device changes location.""" - config_entry = setup_integration - coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id] - device: Device = coordinator.device - assert device._igd_device.device.device_url == TEST_DISCOVERY.ssdp_location - - # Reinit. - new_location = "http://192.168.1.1:12345/desc.xml" - await device.async_ssdp_callback( - ssdp.SsdpServiceInfo( - ssdp_usn="mock_usn", - ssdp_st="mock_st", - ssdp_location="http://192.168.1.1:12345/desc.xml", - upnp={}, - ), - ..., +@pytest.mark.usefixtures( + "ssdp_instant_discovery", "mock_get_source_ip", "mock_no_mac_address_from_host" +) +async def test_async_setup_entry_default_no_mac_address(hass: HomeAssistant): + """Test async_setup_entry.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_USN, + data={ + CONFIG_ENTRY_ST: TEST_ST, + CONFIG_ENTRY_UDN: TEST_UDN, + CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN, + CONFIG_ENTRY_LOCATION: TEST_LOCATION, + CONFIG_ENTRY_MAC_ADDRESS: None, + }, ) - assert device._igd_device.device.device_url == new_location + + # Load config_entry. + entry.add_to_hass(hass) + assert await hass.config_entries.async_setup(entry.entry_id) is True diff --git a/tests/components/upnp/test_sensor.py b/tests/components/upnp/test_sensor.py index a249264ffee..6c00f63a479 100644 --- a/tests/components/upnp/test_sensor.py +++ b/tests/components/upnp/test_sensor.py @@ -25,7 +25,7 @@ from .conftest import MockIgdDevice from tests.common import MockConfigEntry, async_fire_time_changed -async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEntry): +async def test_upnp_sensors(hass: HomeAssistant, config_entry: MockConfigEntry): """Test normal sensors.""" # First poll. b_received_state = hass.states.get("sensor.mock_name_b_received") @@ -43,7 +43,7 @@ async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEn # Second poll. mock_device: MockIgdDevice = hass.data[DOMAIN][ - setup_integration.entry_id + config_entry.entry_id ].device._igd_device mock_device.traffic_data = { BYTES_RECEIVED: 10240, @@ -74,13 +74,9 @@ async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEn assert wan_status_state.state == "Disconnected" -async def test_derived_upnp_sensors( - hass: HomeAssistant, setup_integration: MockConfigEntry -): +async def test_derived_upnp_sensors(hass: HomeAssistant, config_entry: MockConfigEntry): """Test derived sensors.""" - coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][ - setup_integration.entry_id - ] + coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id] # First poll. kib_s_received_state = hass.states.get("sensor.mock_name_kib_s_received")