diff --git a/homeassistant/components/discovery/__init__.py b/homeassistant/components/discovery/__init__.py index 227995db971..87c0e41533b 100644 --- a/homeassistant/components/discovery/__init__.py +++ b/homeassistant/components/discovery/__init__.py @@ -32,7 +32,6 @@ SERVICE_FREEBOX = "freebox" SERVICE_HASS_IOS_APP = "hass_ios" SERVICE_HASSIO = "hassio" SERVICE_HEOS = "heos" -SERVICE_IGD = "igd" SERVICE_KONNECTED = "konnected" SERVICE_MOBILE_APP = "hass_mobile_app" SERVICE_NETGEAR = "netgear_router" @@ -48,7 +47,6 @@ SERVICE_XIAOMI_GW = "xiaomi_gw" CONFIG_ENTRY_HANDLERS = { SERVICE_DAIKIN: "daikin", SERVICE_TELLDUSLIVE: "tellduslive", - SERVICE_IGD: "upnp", } SERVICE_HANDLERS = { diff --git a/homeassistant/components/upnp/__init__.py b/homeassistant/components/upnp/__init__.py index 4d599be88b1..f183daa4ae9 100644 --- a/homeassistant/components/upnp/__init__.py +++ b/homeassistant/components/upnp/__init__.py @@ -19,6 +19,12 @@ from .const import ( CONF_HASS, CONF_LOCAL_IP, CONF_PORTS, + CONFIG_ENTRY_ST, + CONFIG_ENTRY_UDN, + DISCOVERY_LOCATION, + DISCOVERY_ST, + DISCOVERY_UDN, + DISCOVERY_USN, DOMAIN, LOGGER as _LOGGER, ) @@ -89,40 +95,41 @@ async def async_discover_and_construct( """Discovery devices and construct a Device for one.""" # pylint: disable=invalid-name discovery_infos = await Device.async_discover(hass) + _LOGGER.debug("Discovered devices: %s", discovery_infos) if not discovery_infos: _LOGGER.info("No UPnP/IGD devices discovered") return None if udn: - # get the discovery info with specified UDN - _LOGGER.debug("Discovery_infos: %s", discovery_infos) - filtered = [di for di in discovery_infos if di["udn"] == udn] + # Get the discovery info with specified UDN/ST. + filtered = [di for di in discovery_infos if di[DISCOVERY_UDN] == udn] if st: - _LOGGER.debug("Filtering on ST: %s", st) - filtered = [di for di in discovery_infos if di["st"] == st] + filtered = [di for di in discovery_infos if di[DISCOVERY_ST] == st] if not filtered: _LOGGER.warning( - 'Wanted UPnP/IGD device with UDN "%s" not found, ' "aborting", udn + 'Wanted UPnP/IGD device with UDN "%s" not found, aborting', udn ) return None - # ensure we're always taking the latest - filtered = sorted(filtered, key=itemgetter("st"), reverse=True) + + # Ensure we're always taking the latest, if we filtered only on UDN. + filtered = sorted(filtered, key=itemgetter(DISCOVERY_ST), reverse=True) discovery_info = filtered[0] else: - # get the first/any + # Get the first/any. discovery_info = discovery_infos[0] if len(discovery_infos) > 1: device_name = discovery_info.get( - "usn", discovery_info.get("ssdp_description", "") + DISCOVERY_USN, discovery_info.get(DISCOVERY_LOCATION, "") ) _LOGGER.info("Detected multiple UPnP/IGD devices, using: %s", device_name) - ssdp_description = discovery_info["ssdp_description"] - return await Device.async_create_device(hass, ssdp_description) + location = discovery_info[DISCOVERY_LOCATION] + return await Device.async_create_device(hass, location) async def async_setup(hass: HomeAssistantType, config: ConfigType): """Set up UPnP component.""" + _LOGGER.debug("async_setup, config: %s", config) conf_default = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN] conf = config.get(DOMAIN, conf_default) local_ip = await hass.async_add_executor_job(get_local_ip) @@ -133,7 +140,8 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType): "ports": conf.get(CONF_PORTS), } - if conf is not None: + # Only start if set up via configuration.yaml. + if DOMAIN in config: hass.async_create_task( hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_IMPORT} @@ -145,23 +153,26 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType): async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry) -> bool: """Set up UPnP/IGD device from a config entry.""" + _LOGGER.debug("async_setup_entry, config_entry: %s", config_entry.data) domain_data = hass.data[DOMAIN] conf = domain_data["config"] # discover and construct - udn = config_entry.data.get("udn") - st = config_entry.data.get("st") # pylint: disable=invalid-name + udn = config_entry.data.get(CONFIG_ENTRY_UDN) + st = config_entry.data.get(CONFIG_ENTRY_ST) # pylint: disable=invalid-name device = await async_discover_and_construct(hass, udn, st) if not device: _LOGGER.info("Unable to create UPnP/IGD, aborting") raise ConfigEntryNotReady - # 'register'/save UDN + ST + # 'register'/save device hass.data[DOMAIN]["devices"][device.udn] = device - hass.config_entries.async_update_entry( - entry=config_entry, - data={**config_entry.data, "udn": device.udn, "st": device.device_type}, - ) + + # Ensure entry has proper unique_id. + if config_entry.unique_id != device.unique_id: + hass.config_entries.async_update_entry( + entry=config_entry, unique_id=device.unique_id, + ) # create device registry entry device_registry = await dr.async_get_registry(hass) @@ -211,7 +222,7 @@ async def async_unload_entry( hass: HomeAssistantType, config_entry: ConfigEntry ) -> bool: """Unload a UPnP/IGD device from a config entry.""" - udn = config_entry.data["udn"] + udn = config_entry.data[CONFIG_ENTRY_UDN] device = hass.data[DOMAIN]["devices"][udn] # remove port mapping diff --git a/homeassistant/components/upnp/config_flow.py b/homeassistant/components/upnp/config_flow.py index 1601595b6a9..4701f21633c 100644 --- a/homeassistant/components/upnp/config_flow.py +++ b/homeassistant/components/upnp/config_flow.py @@ -1,10 +1,187 @@ """Config flow for UPNP.""" -from homeassistant import config_entries -from homeassistant.helpers import config_entry_flow +from typing import Mapping, Optional -from .const import DOMAIN +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.components import ssdp + +from .const import ( # pylint: disable=unused-import + CONFIG_ENTRY_ST, + CONFIG_ENTRY_UDN, + DISCOVERY_LOCATION, + DISCOVERY_NAME, + DISCOVERY_ST, + DISCOVERY_UDN, + DISCOVERY_USN, + DOMAIN, + LOGGER as _LOGGER, +) from .device import Device -config_entry_flow.register_discovery_flow( - DOMAIN, "UPnP/IGD", Device.async_discover, config_entries.CONN_CLASS_LOCAL_POLL -) + +class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a UPnP/IGD config flow.""" + + VERSION = 1 + CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL + + # Paths: + # - ssdp(discovery_info) --> ssdp_confirm(None) --> ssdp_confirm({}) --> create_entry() + # - user(None): scan --> user({...}) --> create_entry() + # - import(None) --> create_entry() + + def __init__(self): + """Initialize the UPnP/IGD config flow.""" + self._discoveries: Mapping = None + + async def async_step_user(self, user_input: Optional[Mapping] = None): + """Handle a flow start.""" + _LOGGER.debug("async_step_user: user_input: %s", user_input) + # This uses DISCOVERY_USN as the identifier for the device. + + if user_input is not None: + # Ensure wanted device was discovered. + matching_discoveries = [ + discovery + for discovery in self._discoveries + if discovery[DISCOVERY_USN] == user_input["usn"] + ] + if not matching_discoveries: + return self.async_abort(reason="no_devices_discovered") + + discovery = matching_discoveries[0] + await self.async_set_unique_id( + discovery[DISCOVERY_USN], raise_on_progress=False + ) + return await self._async_create_entry_from_data(discovery) + + # Discover devices. + discoveries = await Device.async_discover(self.hass) + + # Store discoveries which have not been configured, add name for each discovery. + current_usns = {entry.unique_id for entry in self._async_current_entries()} + self._discoveries = [ + { + **discovery, + DISCOVERY_NAME: await self._async_get_name_for_discovery(discovery), + } + for discovery in discoveries + if discovery[DISCOVERY_USN] not in current_usns + ] + + # Ensure anything to add. + if not self._discoveries: + return self.async_abort(reason="no_devices_found") + + data_schema = vol.Schema( + { + vol.Required("usn"): vol.In( + { + discovery[DISCOVERY_USN]: discovery[DISCOVERY_NAME] + for discovery in self._discoveries + } + ), + } + ) + return self.async_show_form(step_id="user", data_schema=data_schema,) + + async def async_step_import(self, import_info: Optional[Mapping]): + """Import a new UPnP/IGD device as a config entry. + + This flow is triggered by `async_setup`. If no device has been + configured before, find any device and create a config_entry for it. + Otherwise, do nothing. + """ + _LOGGER.debug("async_step_import: import_info: %s", import_info) + + if import_info is None: + # Landed here via configuration.yaml entry. + # Any device already added, then abort. + if self._async_current_entries(): + _LOGGER.debug("aborting, already configured") + return self.async_abort(reason="already_configured") + + # Test if import_info isn't already configured. + if import_info is not None and any( + import_info["udn"] == entry.data[CONFIG_ENTRY_UDN] + and import_info["st"] == entry.data[CONFIG_ENTRY_ST] + for entry in self._async_current_entries() + ): + return self.async_abort(reason="already_configured") + + # Discover devices. + self._discoveries = await Device.async_discover(self.hass) + + # Ensure anything to add. If not, silently abort. + if not self._discoveries: + _LOGGER.info("No UPnP devices discovered, aborting.") + return self.async_abort(reason="no_devices_found") + + discovery = self._discoveries[0] + return await self._async_create_entry_from_data(discovery) + + async def async_step_ssdp(self, discovery_info: Mapping): + """Handle a discovered UPnP/IGD device. + + This flow is triggered by the SSDP component. It will check if the + host is already configured and delegate to the import step if not. + """ + _LOGGER.debug("async_step_ssdp: discovery_info: %s", discovery_info) + + # Ensure not already configuring/configured. + udn = discovery_info[ssdp.ATTR_UPNP_UDN] + st = discovery_info[ssdp.ATTR_SSDP_ST] # pylint: disable=invalid-name + usn = f"{udn}::{st}" + await self.async_set_unique_id(usn) + self._abort_if_unique_id_configured() + + # Store discovery. + name = discovery_info.get("friendlyName", "") + discovery = { + DISCOVERY_UDN: udn, + DISCOVERY_ST: st, + DISCOVERY_NAME: name, + } + self._discoveries = [discovery] + + # Ensure user recognizable. + # pylint: disable=no-member # https://github.com/PyCQA/pylint/issues/3167 + self.context["title_placeholders"] = { + "name": name, + } + + return await self.async_step_ssdp_confirm() + + async def async_step_ssdp_confirm(self, user_input: Optional[Mapping] = None): + """Confirm integration via SSDP.""" + _LOGGER.debug("async_step_ssdp_confirm: user_input: %s", user_input) + if user_input is None: + return self.async_show_form(step_id="ssdp_confirm") + + discovery = self._discoveries[0] + return await self._async_create_entry_from_data(discovery) + + async def _async_create_entry_from_data(self, discovery: Mapping): + """Create an entry from own _data.""" + _LOGGER.debug("_async_create_entry_from_data: discovery: %s", discovery) + # Get name from device, if not found already. + if DISCOVERY_NAME not in discovery and DISCOVERY_LOCATION in discovery: + discovery[DISCOVERY_NAME] = await self._async_get_name_for_discovery( + discovery + ) + + title = discovery.get(DISCOVERY_NAME, "") + data = { + CONFIG_ENTRY_UDN: discovery[DISCOVERY_UDN], + CONFIG_ENTRY_ST: discovery[DISCOVERY_ST], + } + return self.async_create_entry(title=title, data=data) + + async def _async_get_name_for_discovery(self, discovery: Mapping): + """Get the name of the device from a discovery.""" + _LOGGER.debug("_async_get_name_for_discovery: discovery: %s", discovery) + device = await Device.async_create_device( + self.hass, discovery[DISCOVERY_LOCATION] + ) + return device.name diff --git a/homeassistant/components/upnp/const.py b/homeassistant/components/upnp/const.py index 80b5b718bbb..ee3b5873310 100644 --- a/homeassistant/components/upnp/const.py +++ b/homeassistant/components/upnp/const.py @@ -20,3 +20,10 @@ DATA_PACKETS = "packets" DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}" KIBIBYTE = 1024 UPDATE_INTERVAL = timedelta(seconds=30) +DISCOVERY_NAME = "name" +DISCOVERY_LOCATION = "location" +DISCOVERY_ST = "st" +DISCOVERY_UDN = "udn" +DISCOVERY_USN = "usn" +CONFIG_ENTRY_UDN = "udn" +CONFIG_ENTRY_ST = "st" diff --git a/homeassistant/components/upnp/device.py b/homeassistant/components/upnp/device.py index 73ae06d9945..ec7753bce87 100644 --- a/homeassistant/components/upnp/device.py +++ b/homeassistant/components/upnp/device.py @@ -1,7 +1,7 @@ """Home Assistant representation of an UPnP/IGD.""" import asyncio from ipaddress import IPv4Address -from typing import Mapping +from typing import List, Mapping import aiohttp from async_upnp_client import UpnpError, UpnpFactory @@ -16,6 +16,10 @@ from .const import ( BYTES_RECEIVED, BYTES_SENT, CONF_LOCAL_IP, + DISCOVERY_LOCATION, + DISCOVERY_ST, + DISCOVERY_UDN, + DISCOVERY_USN, DOMAIN, LOGGER as _LOGGER, PACKETS_RECEIVED, @@ -33,7 +37,7 @@ class Device: self._mapped_ports = [] @classmethod - async def async_discover(cls, hass: HomeAssistantType): + async def async_discover(cls, hass: HomeAssistantType) -> List[Mapping]: """Discover UPnP/IGD devices.""" _LOGGER.debug("Discovering UPnP/IGD devices") local_ip = None @@ -47,9 +51,11 @@ class Device: # add extra info and store devices devices = [] for discovery_info in discovery_infos: - discovery_info["udn"] = discovery_info["_udn"] - discovery_info["ssdp_description"] = discovery_info["location"] - discovery_info["source"] = "async_upnp_client" + discovery_info[DISCOVERY_UDN] = discovery_info["_udn"] + discovery_info[DISCOVERY_ST] = discovery_info["st"] + discovery_info[DISCOVERY_LOCATION] = discovery_info["location"] + usn = f"{discovery_info[DISCOVERY_UDN]}::{discovery_info[DISCOVERY_ST]}" + discovery_info[DISCOVERY_USN] = usn _LOGGER.debug("Discovered device: %s", discovery_info) devices.append(discovery_info) @@ -57,7 +63,7 @@ class Device: return devices @classmethod - async def async_create_device(cls, hass: HomeAssistantType, ssdp_description: str): + async def async_create_device(cls, hass: HomeAssistantType, ssdp_location: str): """Create UPnP/IGD device.""" # build async_upnp_client requester session = async_get_clientsession(hass) @@ -65,7 +71,7 @@ class Device: # create async_upnp_client device factory = UpnpFactory(requester, disable_state_variable_validation=True) - upnp_device = await factory.async_create_device(ssdp_description) + upnp_device = await factory.async_create_device(ssdp_location) igd_device = IgdDevice(upnp_device, None) @@ -96,6 +102,11 @@ class Device: """Get the device type.""" return self._igd_device.device_type + @property + def unique_id(self) -> str: + """Get the unique id.""" + return f"{self.udn}::{self.device_type}" + def __str__(self) -> str: """Get string representation.""" return f"IGD Device: {self.name}/{self.udn}" diff --git a/homeassistant/components/upnp/manifest.json b/homeassistant/components/upnp/manifest.json index 2f6e5de5884..e3b30cec9a4 100644 --- a/homeassistant/components/upnp/manifest.json +++ b/homeassistant/components/upnp/manifest.json @@ -5,5 +5,13 @@ "documentation": "https://www.home-assistant.io/integrations/upnp", "requirements": ["async-upnp-client==0.14.13"], "dependencies": [], - "codeowners": ["@StevenLooman"] + "codeowners": ["@StevenLooman"], + "ssdp": [ + { + "st": "urn:schemas-upnp-org:device:InternetGatewayDevice:1" + }, + { + "st": "urn:schemas-upnp-org:device:InternetGatewayDevice:2" + } + ] } diff --git a/homeassistant/components/upnp/strings.json b/homeassistant/components/upnp/strings.json index 5ad90b2c0cb..9f2f6978341 100644 --- a/homeassistant/components/upnp/strings.json +++ b/homeassistant/components/upnp/strings.json @@ -1,25 +1,22 @@ { "config": { + "flow_title": "UPnP/IGD: {name}", "step": { - "confirm": { - "description": "Do you want to set up UPnP/IGD?" + "init": { + }, + "ssdp_confirm": { + "description": "Do you want to set up this UPnP/IGD device?" }, "user": { - "title": "Configuration options", "data": { - "enable_port_mapping": "Enable port mapping for Home Assistant", - "enable_sensors": "Add traffic sensors", - "igd": "UPnP/IGD" + "usn": "Device" } } }, "abort": { "already_configured": "UPnP/IGD is already configured", - "incomplete_device": "Ignoring incomplete UPnP device", "no_devices_discovered": "No UPnP/IGDs discovered", - "no_devices_found": "No UPnP/IGD devices found on the network.", - "no_sensors_or_port_mapping": "Enable at least sensors or port mapping", - "single_instance_allowed": "Only a single configuration of UPnP/IGD is necessary." + "no_devices_found": "No UPnP/IGD devices found on the network." } } } diff --git a/homeassistant/components/upnp/translations/en.json b/homeassistant/components/upnp/translations/en.json index 6da89c0e3d6..124ccfdd17d 100644 --- a/homeassistant/components/upnp/translations/en.json +++ b/homeassistant/components/upnp/translations/en.json @@ -1,28 +1,21 @@ { "config": { + "flow_title": "UPnP/IGD: {name}", "abort": { "already_configured": "UPnP/IGD is already configured", - "incomplete_device": "Ignoring incomplete UPnP device", "no_devices_discovered": "No UPnP/IGDs discovered", - "no_devices_found": "No UPnP/IGD devices found on the network.", - "no_sensors_or_port_mapping": "Enable at least sensors or port mapping", - "single_instance_allowed": "Only a single configuration of UPnP/IGD is necessary." + "no_devices_found": "No UPnP/IGD devices found on the network." }, "step": { - "confirm": { - "description": "Do you want to set up UPnP/IGD?", - "title": "UPnP/IGD" - }, "init": { - "title": "UPnP/IGD" + }, + "ssdp_confirm": { + "description": "Do you want to set up this UPnP/IGD device?" }, "user": { "data": { - "enable_port_mapping": "Enable port mapping for Home Assistant", - "enable_sensors": "Add traffic sensors", - "igd": "UPnP/IGD" - }, - "title": "Configuration options" + "usn": "Device" + } } } } diff --git a/homeassistant/generated/ssdp.py b/homeassistant/generated/ssdp.py index 5dbef37d9bf..f46ba1611a8 100644 --- a/homeassistant/generated/ssdp.py +++ b/homeassistant/generated/ssdp.py @@ -81,6 +81,14 @@ SSDP = { "manufacturer": "Synology" } ], + "upnp": [ + { + "st": "urn:schemas-upnp-org:device:InternetGatewayDevice:1" + }, + { + "st": "urn:schemas-upnp-org:device:InternetGatewayDevice:2" + } + ], "wemo": [ { "manufacturer": "Belkin International Inc." diff --git a/tests/components/upnp/mock_device.py b/tests/components/upnp/mock_device.py new file mode 100644 index 00000000000..17d9b5659c5 --- /dev/null +++ b/tests/components/upnp/mock_device.py @@ -0,0 +1,77 @@ +"""Mock device for testing purposes.""" + +from typing import Mapping + +from homeassistant.components.upnp.const import ( + BYTES_RECEIVED, + BYTES_SENT, + PACKETS_RECEIVED, + PACKETS_SENT, + TIMESTAMP, +) +from homeassistant.components.upnp.device import Device +import homeassistant.util.dt as dt_util + + +class MockDevice(Device): + """Mock device for Device.""" + + def __init__(self, udn): + """Initialize mock device.""" + igd_device = object() + super().__init__(igd_device) + self._udn = udn + self.added_port_mappings = [] + self.removed_port_mappings = [] + + @classmethod + async def async_create_device(cls, hass, ssdp_location): + """Return self.""" + return cls("UDN") + + @property + def udn(self) -> str: + """Get the UDN.""" + return self._udn + + @property + def manufacturer(self) -> str: + """Get manufacturer.""" + return "mock-manufacturer" + + @property + def name(self) -> str: + """Get name.""" + return "mock-name" + + @property + def model_name(self) -> str: + """Get the model name.""" + return "mock-model-name" + + @property + def device_type(self) -> str: + """Get the device type.""" + return "urn:schemas-upnp-org:device:InternetGatewayDevice:1" + + async def _async_add_port_mapping( + self, external_port: int, local_ip: str, internal_port: int + ) -> None: + """Add a port mapping.""" + entry = [external_port, local_ip, internal_port] + self.added_port_mappings.append(entry) + + async def _async_delete_port_mapping(self, external_port: int) -> None: + """Remove a port mapping.""" + entry = external_port + self.removed_port_mappings.append(entry) + + async def async_get_traffic_data(self) -> Mapping[str, any]: + """Get traffic data.""" + return { + TIMESTAMP: dt_util.utcnow(), + BYTES_RECEIVED: 0, + BYTES_SENT: 0, + PACKETS_RECEIVED: 0, + PACKETS_SENT: 0, + } diff --git a/tests/components/upnp/test_config_flow.py b/tests/components/upnp/test_config_flow.py new file mode 100644 index 00000000000..c6e383bae55 --- /dev/null +++ b/tests/components/upnp/test_config_flow.py @@ -0,0 +1,124 @@ +"""Test UPnP/IGD config flow.""" + +from homeassistant import config_entries, data_entry_flow +from homeassistant.components import ssdp +from homeassistant.components.upnp.const import ( + DISCOVERY_LOCATION, + DISCOVERY_ST, + DISCOVERY_UDN, + DISCOVERY_USN, + DOMAIN, +) +from homeassistant.components.upnp.device import Device +from homeassistant.helpers.typing import HomeAssistantType + +from .mock_device import MockDevice + +from tests.async_mock import AsyncMock, patch + + +async def test_flow_ssdp_discovery(hass: HomeAssistantType): + """Test config flow: discovered + configured through ssdp.""" + udn = "uuid:device_1" + mock_device = MockDevice(udn) + discovery_infos = [ + { + DISCOVERY_ST: mock_device.device_type, + DISCOVERY_UDN: mock_device.udn, + DISCOVERY_LOCATION: "dummy", + } + ] + with patch.object( + Device, "async_create_device", AsyncMock(return_value=mock_device) + ), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)): + # Discovered via step ssdp. + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data={ + ssdp.ATTR_SSDP_ST: mock_device.device_type, + ssdp.ATTR_UPNP_UDN: mock_device.udn, + "friendlyName": mock_device.name, + }, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "ssdp_confirm" + + # Confirm via step ssdp_confirm. + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={}, + ) + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == mock_device.name + assert result["data"] == { + "st": mock_device.device_type, + "udn": mock_device.udn, + } + + +async def test_flow_user(hass: HomeAssistantType): + """Test config flow: discovered + configured through user.""" + udn = "uuid:device_1" + mock_device = MockDevice(udn) + usn = f"{mock_device.udn}::{mock_device.device_type}" + discovery_infos = [ + { + DISCOVERY_USN: usn, + DISCOVERY_ST: mock_device.device_type, + DISCOVERY_UDN: mock_device.udn, + DISCOVERY_LOCATION: "dummy", + } + ] + + with patch.object( + Device, "async_create_device", AsyncMock(return_value=mock_device) + ), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)): + # Discovered via step user. + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + # Confirmed via step user. + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={"usn": usn}, + ) + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == mock_device.name + assert result["data"] == { + "st": mock_device.device_type, + "udn": mock_device.udn, + } + + +async def test_flow_config(hass: HomeAssistantType): + """Test config flow: discovered + configured through configuration.yaml.""" + udn = "uuid:device_1" + mock_device = MockDevice(udn) + usn = f"{mock_device.udn}::{mock_device.device_type}" + discovery_infos = [ + { + DISCOVERY_USN: usn, + DISCOVERY_ST: mock_device.device_type, + DISCOVERY_UDN: mock_device.udn, + DISCOVERY_LOCATION: "dummy", + } + ] + + with patch.object( + Device, "async_create_device", AsyncMock(return_value=mock_device) + ), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)): + # Discovered via step import. + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_IMPORT} + ) + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == mock_device.name + assert result["data"] == { + "st": mock_device.device_type, + "udn": mock_device.udn, + } diff --git a/tests/components/upnp/test_init.py b/tests/components/upnp/test_init.py index 7c43c24cdc3..7d32c37c9ef 100644 --- a/tests/components/upnp/test_init.py +++ b/tests/components/upnp/test_init.py @@ -3,91 +3,49 @@ from ipaddress import IPv4Address from homeassistant.components import upnp +from homeassistant.components.upnp.const import ( + DISCOVERY_LOCATION, + DISCOVERY_ST, + DISCOVERY_UDN, +) from homeassistant.components.upnp.device import Device from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.setup import async_setup_component -from tests.async_mock import patch -from tests.common import MockConfigEntry, mock_coro +from .mock_device import MockDevice - -class MockDevice(Device): - """Mock device for Device.""" - - def __init__(self, udn): - """Initialize mock device.""" - igd_device = object() - super().__init__(igd_device) - self._udn = udn - self.added_port_mappings = [] - self.removed_port_mappings = [] - - @classmethod - async def async_create_device(cls, hass, ssdp_description): - """Return self.""" - return cls("UDN") - - @property - def udn(self) -> str: - """Get the UDN.""" - return self._udn - - @property - def manufacturer(self) -> str: - """Get manufacturer.""" - return "mock-manufacturer" - - @property - def name(self) -> str: - """Get name.""" - return "mock-name" - - @property - def model_name(self) -> str: - """Get the model name.""" - return "mock-model-name" - - @property - def device_type(self) -> str: - """Get the device type.""" - return "urn:schemas-upnp-org:device:InternetGatewayDevice:1" - - async def _async_add_port_mapping( - self, external_port: int, local_ip: str, internal_port: int - ) -> None: - """Add a port mapping.""" - entry = [external_port, local_ip, internal_port] - self.added_port_mappings.append(entry) - - async def _async_delete_port_mapping(self, external_port: int) -> None: - """Remove a port mapping.""" - entry = external_port - self.removed_port_mappings.append(entry) +from tests.async_mock import AsyncMock, patch +from tests.common import MockConfigEntry async def test_async_setup_entry_default(hass): """Test async_setup_entry.""" udn = "uuid:device_1" - entry = MockConfigEntry(domain=upnp.DOMAIN) + mock_device = MockDevice(udn) + discovery_infos = [ + { + DISCOVERY_UDN: mock_device.udn, + DISCOVERY_ST: mock_device.device_type, + DISCOVERY_LOCATION: "http://192.168.1.1/desc.xml", + } + ] + entry = MockConfigEntry( + domain=upnp.DOMAIN, data={"udn": mock_device.udn, "st": mock_device.device_type} + ) config = { # no upnp } - with patch.object(Device, "async_create_device") as create_device, patch.object( - Device, "async_discover", return_value=mock_coro([]) - ) as async_discover: + async_discover = AsyncMock(return_value=[]) + with patch.object( + Device, "async_create_device", AsyncMock(return_value=mock_device) + ), patch.object(Device, "async_discover", async_discover): + # initialisation of component, no device discovered await async_setup_component(hass, "upnp", config) await hass.async_block_till_done() - # mock homeassistant.components.upnp.device.Device - mock_device = MockDevice(udn) - discovery_infos = [ - {"udn": udn, "ssdp_description": "http://192.168.1.1/desc.xml"} - ] - - create_device.return_value = mock_device + # loading of config_entry, device discovered async_discover.return_value = discovery_infos - assert await upnp.async_setup_entry(hass, entry) is True # ensure device is stored/used @@ -105,7 +63,17 @@ async def test_async_setup_entry_port_mapping(hass): """Test async_setup_entry.""" # pylint: disable=invalid-name udn = "uuid:device_1" - entry = MockConfigEntry(domain=upnp.DOMAIN) + mock_device = MockDevice(udn) + discovery_infos = [ + { + DISCOVERY_UDN: mock_device.udn, + DISCOVERY_ST: mock_device.device_type, + DISCOVERY_LOCATION: "http://192.168.1.1/desc.xml", + } + ] + entry = MockConfigEntry( + domain=upnp.DOMAIN, data={"udn": mock_device.udn, "st": mock_device.device_type} + ) config = { "http": {}, @@ -115,21 +83,17 @@ async def test_async_setup_entry_port_mapping(hass): "ports": {"hass": "hass"}, }, } - with patch.object(Device, "async_create_device") as create_device, patch.object( - Device, "async_discover", return_value=mock_coro([]) - ) as async_discover: + async_discover = AsyncMock(return_value=[]) + with patch.object( + Device, "async_create_device", AsyncMock(return_value=mock_device) + ), patch.object(Device, "async_discover", async_discover): + # initialisation of component, no device discovered await async_setup_component(hass, "http", config) await async_setup_component(hass, "upnp", config) await hass.async_block_till_done() - mock_device = MockDevice(udn) - discovery_infos = [ - {"udn": udn, "ssdp_description": "http://192.168.1.1/desc.xml"} - ] - - create_device.return_value = mock_device + # loading of config_entry, device discovered async_discover.return_value = discovery_infos - assert await upnp.async_setup_entry(hass, entry) is True # ensure device is stored/used