diff --git a/.coveragerc b/.coveragerc
index 72249c7684a..a9fc9c433b8 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -990,7 +990,6 @@ omit =
homeassistant/components/squeezebox/__init__.py
homeassistant/components/squeezebox/browse_media.py
homeassistant/components/squeezebox/media_player.py
- homeassistant/components/ssdp/util.py
homeassistant/components/starline/*
homeassistant/components/starlingbank/sensor.py
homeassistant/components/steam_online/sensor.py
diff --git a/homeassistant/components/dlna_dmr/manifest.json b/homeassistant/components/dlna_dmr/manifest.json
index 67d9713628a..f844bdf987b 100644
--- a/homeassistant/components/dlna_dmr/manifest.json
+++ b/homeassistant/components/dlna_dmr/manifest.json
@@ -2,7 +2,7 @@
"domain": "dlna_dmr",
"name": "DLNA Digital Media Renderer",
"documentation": "https://www.home-assistant.io/integrations/dlna_dmr",
- "requirements": ["async-upnp-client==0.20.0"],
+ "requirements": ["async-upnp-client==0.21.2"],
"dependencies": ["network"],
"codeowners": [],
"iot_class": "local_push"
diff --git a/homeassistant/components/sonos/__init__.py b/homeassistant/components/sonos/__init__.py
index ae3652683d4..6650e5f8904 100644
--- a/homeassistant/components/sonos/__init__.py
+++ b/homeassistant/components/sonos/__init__.py
@@ -263,8 +263,10 @@ class SonosDiscoveryManager:
else:
async_dispatcher_send(self.hass, f"{SONOS_SEEN}-{uid}")
- @callback
- def _async_ssdp_discovered_player(self, info):
+ async def _async_ssdp_discovered_player(self, info, change):
+ if change == ssdp.SsdpChange.BYEBYE:
+ return
+
discovered_ip = urlparse(info[ssdp.ATTR_SSDP_LOCATION]).hostname
boot_seqnum = info.get("X-RINCON-BOOTSEQ")
uid = info.get(ssdp.ATTR_UPNP_UDN)
@@ -316,7 +318,7 @@ class SonosDiscoveryManager:
return
self.entry.async_on_unload(
- ssdp.async_register_callback(
+ await ssdp.async_register_callback(
self.hass, self._async_ssdp_discovered_player, {"st": UPNP_ST}
)
)
diff --git a/homeassistant/components/ssdp/__init__.py b/homeassistant/components/ssdp/__init__.py
index 63ad6acb181..ce2901d4f1a 100644
--- a/homeassistant/components/ssdp/__init__.py
+++ b/homeassistant/components/ssdp/__init__.py
@@ -2,14 +2,18 @@
from __future__ import annotations
import asyncio
-from collections.abc import Mapping
+from collections.abc import Awaitable
from datetime import timedelta
+from enum import Enum
from ipaddress import IPv4Address, IPv6Address
import logging
-from typing import Any, Callable
+from typing import Any, Callable, Mapping
-from async_upnp_client.search import SSDPListener
+from async_upnp_client.aiohttp import AiohttpSessionRequester
+from async_upnp_client.const import DeviceOrServiceType, SsdpHeaders, SsdpSource
+from async_upnp_client.description_cache import DescriptionCache
from async_upnp_client.ssdp import SSDP_PORT
+from async_upnp_client.ssdp_listener import SsdpDevice, SsdpListener
from async_upnp_client.utils import CaseInsensitiveDict
from homeassistant import config_entries
@@ -19,12 +23,12 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
MATCH_ALL,
)
-from homeassistant.core import CoreState, HomeAssistant, callback as core_callback
+from homeassistant.core import HomeAssistant, callback as core_callback
+from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_ssdp, bind_hass
-from .descriptions import DescriptionManager
from .flow import FlowDispatcher, SSDPFlow
DOMAIN = "ssdp"
@@ -61,14 +65,25 @@ DISCOVERY_MAPPING = {
"location": ATTR_SSDP_LOCATION,
}
+SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
+SsdpCallback = Callable[[Mapping[str, Any], SsdpChange], Awaitable]
+
+
+SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
+ SsdpSource.SEARCH: SsdpChange.ALIVE,
+ SsdpSource.ADVERTISEMENT_ALIVE: SsdpChange.ALIVE,
+ SsdpSource.ADVERTISEMENT_BYEBYE: SsdpChange.BYEBYE,
+ SsdpSource.ADVERTISEMENT_UPDATE: SsdpChange.UPDATE,
+}
+
_LOGGER = logging.getLogger(__name__)
@bind_hass
-def async_register_callback(
+async def async_register_callback(
hass: HomeAssistant,
- callback: Callable[[dict], None],
+ callback: SsdpCallback,
match_dict: None | dict[str, str] = None,
) -> Callable[[], None]:
"""Register to receive a callback on ssdp broadcast.
@@ -76,60 +91,61 @@ def async_register_callback(
Returns a callback that can be used to cancel the registration.
"""
scanner: Scanner = hass.data[DOMAIN]
- return scanner.async_register_callback(callback, match_dict)
+ return await scanner.async_register_callback(callback, match_dict)
@bind_hass
-def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
+async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
hass: HomeAssistant, udn: str, st: str
) -> dict[str, str] | None:
"""Fetch the discovery info cache."""
scanner: Scanner = hass.data[DOMAIN]
- return scanner.async_get_discovery_info_by_udn_st(udn, st)
+ return await scanner.async_get_discovery_info_by_udn_st(udn, st)
@bind_hass
-def async_get_discovery_info_by_st( # pylint: disable=invalid-name
+async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
hass: HomeAssistant, st: str
) -> list[dict[str, str]]:
"""Fetch all the entries matching the st."""
scanner: Scanner = hass.data[DOMAIN]
- return scanner.async_get_discovery_info_by_st(st)
+ return await scanner.async_get_discovery_info_by_st(st)
@bind_hass
-def async_get_discovery_info_by_udn(
+async def async_get_discovery_info_by_udn(
hass: HomeAssistant, udn: str
) -> list[dict[str, str]]:
"""Fetch all the entries matching the udn."""
scanner: Scanner = hass.data[DOMAIN]
- return scanner.async_get_discovery_info_by_udn(udn)
+ return await scanner.async_get_discovery_info_by_udn(udn)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the SSDP integration."""
- scanner = hass.data[DOMAIN] = Scanner(hass, await async_get_ssdp(hass))
+ scanner = hass.data[DOMAIN] = Scanner(hass)
asyncio.create_task(scanner.async_start())
return True
-@core_callback
-def _async_process_callbacks(
- callbacks: list[Callable[[dict], None]], discovery_info: dict[str, str]
+async def _async_process_callbacks(
+ callbacks: list[SsdpCallback],
+ discovery_info: dict[str, str],
+ ssdp_change: SsdpChange,
) -> None:
for callback in callbacks:
try:
- callback(discovery_info)
+ await callback(discovery_info, ssdp_change)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to callback info: %s", discovery_info)
@core_callback
def _async_headers_match(
- headers: Mapping[str, str], match_dict: dict[str, str]
+ headers: Mapping[str, Any], match_dict: dict[str, str]
) -> bool:
for header, val in match_dict.items():
if val == MATCH_ALL:
@@ -141,25 +157,39 @@ def _async_headers_match(
class Scanner:
- """Class to manage SSDP scanning."""
+ """Class to manage SSDP searching and SSDP advertisements."""
- def __init__(
- self, hass: HomeAssistant, integration_matchers: dict[str, list[dict[str, str]]]
- ) -> None:
+ def __init__(self, hass: HomeAssistant) -> None:
"""Initialize class."""
self.hass = hass
- self.seen: set[tuple[str, str | None]] = set()
- self.cache: dict[tuple[str, str], Mapping[str, str]] = {}
- self._integration_matchers = integration_matchers
self._cancel_scan: Callable[[], None] | None = None
- self._ssdp_listeners: list[SSDPListener] = []
- self._callbacks: list[tuple[Callable[[dict], None], dict[str, str]]] = []
- self.flow_dispatcher: FlowDispatcher | None = None
- self.description_manager: DescriptionManager | None = None
+ self._ssdp_listeners: list[SsdpListener] = []
+ self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = []
+ self._flow_dispatcher: FlowDispatcher | None = None
+ self._description_cache: DescriptionCache | None = None
+ self._integration_matchers: dict[str, list[dict[str, str]]] | None = None
- @core_callback
- def async_register_callback(
- self, callback: Callable[[dict], None], match_dict: None | dict[str, str] = None
+ @property
+ def _ssdp_devices(self) -> list[SsdpDevice]:
+ """Get all seen devices."""
+ return [
+ ssdp_device
+ for ssdp_listener in self._ssdp_listeners
+ for ssdp_device in ssdp_listener.devices.values()
+ ]
+
+ @property
+ def _all_headers_from_ssdp_devices(
+ self,
+ ) -> dict[tuple[str, str], Mapping[str, Any]]:
+ return {
+ (ssdp_device.udn, dst): headers
+ for ssdp_device in self._ssdp_devices
+ for dst, headers in ssdp_device.all_combined_headers.items()
+ }
+
+ async def async_register_callback(
+ self, callback: SsdpCallback, match_dict: None | dict[str, str] = None
) -> Callable[[], None]:
"""Register a callback."""
if match_dict is None:
@@ -167,12 +197,13 @@ class Scanner:
# Make sure any entries that happened
# before the callback was registered are fired
- if self.hass.state != CoreState.running:
- for headers in self.cache.values():
- if _async_headers_match(headers, match_dict):
- _async_process_callbacks(
- [callback], self._async_headers_to_discovery_info(headers)
- )
+ for headers in self._all_headers_from_ssdp_devices.values():
+ if _async_headers_match(headers, match_dict):
+ await _async_process_callbacks(
+ [callback],
+ await self._async_headers_to_discovery_info(headers),
+ SsdpChange.ALIVE,
+ )
callback_entry = (callback, match_dict)
self._callbacks.append(callback_entry)
@@ -183,14 +214,19 @@ class Scanner:
return _async_remove_callback
- @core_callback
- def async_stop(self, *_: Any) -> None:
+ async def async_stop(self, *_: Any) -> None:
"""Stop the scanner."""
assert self._cancel_scan is not None
self._cancel_scan()
- for listener in self._ssdp_listeners:
- listener.async_stop()
- self._ssdp_listeners = []
+
+ await self._async_stop_ssdp_listeners()
+
+ async def _async_stop_ssdp_listeners(self) -> None:
+ """Stop the SSDP listeners."""
+ await asyncio.gather(
+ *(listener.async_stop() for listener in self._ssdp_listeners),
+ return_exceptions=True,
+ )
async def _async_build_source_set(self) -> set[IPv4Address | IPv6Address]:
"""Build the list of ssdp sources."""
@@ -208,34 +244,57 @@ class Scanner:
}
async def async_scan(self, *_: Any) -> None:
- """Scan for new entries using ssdp default and broadcast target."""
+ """Scan for new entries using ssdp listeners."""
+ await self.async_scan_multicast()
+ await self.async_scan_broadcast()
+
+ async def async_scan_multicast(self, *_: Any) -> None:
+ """Scan for new entries using multicase target."""
+ for ssdp_listener in self._ssdp_listeners:
+ await ssdp_listener.async_search()
+
+ async def async_scan_broadcast(self, *_: Any) -> None:
+ """Scan for new entries using broadcast target."""
+ # Some sonos devices only seem to respond if we send to the broadcast
+ # address. This matches pysonos' behavior
+ # https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
for listener in self._ssdp_listeners:
- listener.async_search()
try:
IPv4Address(listener.source_ip)
except ValueError:
continue
- # Some sonos devices only seem to respond if we send to the broadcast
- # address. This matches pysonos' behavior
- # https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
- listener.async_search((str(IPV4_BROADCAST), SSDP_PORT))
+ await listener.async_search((str(IPV4_BROADCAST), SSDP_PORT))
async def async_start(self) -> None:
- """Start the scanner."""
- self.description_manager = DescriptionManager(self.hass)
- self.flow_dispatcher = FlowDispatcher(self.hass)
+ """Start the scanners."""
+ session = async_get_clientsession(self.hass)
+ requester = AiohttpSessionRequester(session, True, 10)
+ self._description_cache = DescriptionCache(requester)
+ self._flow_dispatcher = FlowDispatcher(self.hass)
+ self._integration_matchers = await async_get_ssdp(self.hass)
+
+ await self._async_start_ssdp_listeners()
+
+ self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
+ self.hass.bus.async_listen_once(
+ EVENT_HOMEASSISTANT_STARTED, self._flow_dispatcher.async_start
+ )
+ self._cancel_scan = async_track_time_interval(
+ self.hass, self.async_scan, SCAN_INTERVAL
+ )
+
+ # Trigger the initial-scan.
+ await self.async_scan()
+
+ async def _async_start_ssdp_listeners(self) -> None:
+ """Start the SSDP Listeners."""
for source_ip in await self._async_build_source_set():
self._ssdp_listeners.append(
- SSDPListener(
- async_connect_callback=self.async_scan,
- async_callback=self._async_process_entry,
+ SsdpListener(
+ async_callback=self._ssdp_listener_callback,
source_ip=source_ip,
)
)
- self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
- self.hass.bus.async_listen_once(
- EVENT_HOMEASSISTANT_STARTED, self.flow_dispatcher.async_start
- )
results = await asyncio.gather(
*(listener.async_start() for listener in self._ssdp_listeners),
return_exceptions=True,
@@ -251,135 +310,116 @@ class Scanner:
failed_listeners.append(self._ssdp_listeners[idx])
for listener in failed_listeners:
self._ssdp_listeners.remove(listener)
- self._cancel_scan = async_track_time_interval(
- self.hass, self.async_scan, SCAN_INTERVAL
- )
@core_callback
def _async_get_matching_callbacks(
- self, headers: Mapping[str, str]
- ) -> list[Callable[[dict], None]]:
+ self,
+ combined_headers: SsdpHeaders,
+ ) -> list[SsdpCallback]:
"""Return a list of callbacks that match."""
return [
callback
for callback, match_dict in self._callbacks
- if _async_headers_match(headers, match_dict)
+ if _async_headers_match(combined_headers, match_dict)
]
@core_callback
- def _async_matching_domains(self, info_with_req: CaseInsensitiveDict) -> set[str]:
+ def _async_matching_domains(self, info_with_desc: CaseInsensitiveDict) -> set[str]:
+ assert self._integration_matchers is not None
domains = set()
for domain, matchers in self._integration_matchers.items():
for matcher in matchers:
- if all(info_with_req.get(k) == v for (k, v) in matcher.items()):
+ if all(info_with_desc.get(k) == v for (k, v) in matcher.items()):
domains.add(domain)
return domains
- def _async_seen(self, header_st: str | None, header_location: str | None) -> bool:
- """Check if we have seen a specific st and optional location."""
- if header_st is None:
- return True
- return (header_st, header_location) in self.seen
+ async def _ssdp_listener_callback(
+ self, ssdp_device: SsdpDevice, dst: DeviceOrServiceType, source: SsdpSource
+ ) -> None:
+ """Handle a device/service change."""
+ _LOGGER.debug(
+ "Change, ssdp_device: %s, dst: %s, source: %s", ssdp_device, dst, source
+ )
- def _async_see(self, header_st: str | None, header_location: str | None) -> None:
- """Mark a specific st and optional location as seen."""
- if header_st is not None:
- self.seen.add((header_st, header_location))
+ location = ssdp_device.location
+ info_desc = await self._async_get_description_dict(location) or {}
+ combined_headers = ssdp_device.combined_headers(dst)
+ info_with_desc = CaseInsensitiveDict(combined_headers, **info_desc)
+ discovery_info = discovery_info_from_headers_and_description(info_with_desc)
- def _async_unsee(self, header_st: str | None, header_location: str | None) -> None:
- """If we see a device in a new location, unsee the original location."""
- if header_st is not None:
- self.seen.discard((header_st, header_location))
+ callbacks = self._async_get_matching_callbacks(combined_headers)
+ ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
+ await _async_process_callbacks(callbacks, discovery_info, ssdp_change)
- async def _async_process_entry(self, headers: Mapping[str, str]) -> None:
- """Process SSDP entries."""
- _LOGGER.debug("_async_process_entry: %s", headers)
- h_st = headers.get("st")
- h_location = headers.get("location")
+ for domain in self._async_matching_domains(info_with_desc):
+ _LOGGER.debug("Discovered %s at %s", domain, location)
- if h_st and (udn := _udn_from_usn(headers.get("usn"))):
- cache_key = (udn, h_st)
- if old_headers := self.cache.get(cache_key):
- old_h_location = old_headers.get("location")
- if h_location != old_h_location:
- self._async_unsee(old_headers.get("st"), old_h_location)
- self.cache[cache_key] = headers
-
- callbacks = self._async_get_matching_callbacks(headers)
- if self._async_seen(h_st, h_location) and not callbacks:
- return
-
- assert self.description_manager is not None
- info_req = await self.description_manager.fetch_description(h_location) or {}
- info_with_req = CaseInsensitiveDict(**headers, **info_req)
- discovery_info = discovery_info_from_headers_and_request(info_with_req)
-
- _async_process_callbacks(callbacks, discovery_info)
-
- if self._async_seen(h_st, h_location):
- return
- self._async_see(h_st, h_location)
-
- for domain in self._async_matching_domains(info_with_req):
- _LOGGER.debug("Discovered %s at %s", domain, h_location)
flow: SSDPFlow = {
"domain": domain,
"context": {"source": config_entries.SOURCE_SSDP},
"data": discovery_info,
}
- assert self.flow_dispatcher is not None
- self.flow_dispatcher.create(flow)
+ assert self._flow_dispatcher is not None
+ self._flow_dispatcher.create(flow)
- @core_callback
- def _async_headers_to_discovery_info(
- self, headers: Mapping[str, str]
- ) -> dict[str, str]:
+ async def _async_get_description_dict(
+ self, location: str | None
+ ) -> Mapping[str, str]:
+ """Get description dict."""
+ assert self._description_cache is not None
+ return await self._description_cache.async_get_description_dict(location) or {}
+
+ async def _async_headers_to_discovery_info(
+ self, headers: Mapping[str, Any]
+ ) -> dict[str, Any]:
"""Combine the headers and description into discovery_info.
Building this is a bit expensive so we only do it on demand.
"""
- assert self.description_manager is not None
+ assert self._description_cache is not None
location = headers["location"]
- info_req = self.description_manager.async_cached_description(location) or {}
- return discovery_info_from_headers_and_request(
- CaseInsensitiveDict(**headers, **info_req)
+ info_desc = (
+ await self._description_cache.async_get_description_dict(location) or {}
+ )
+ return discovery_info_from_headers_and_description(
+ CaseInsensitiveDict(headers, **info_desc)
)
- @core_callback
- def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
+ async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
self, udn: str, st: str
- ) -> dict[str, str] | None:
+ ) -> dict[str, Any] | None:
"""Return discovery_info for a udn and st."""
- if headers := self.cache.get((udn, st)):
- return self._async_headers_to_discovery_info(headers)
+ if headers := self._all_headers_from_ssdp_devices.get((udn, st)):
+ return await self._async_headers_to_discovery_info(headers)
return None
- @core_callback
- def async_get_discovery_info_by_st( # pylint: disable=invalid-name
+ async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
self, st: str
- ) -> list[dict[str, str]]:
+ ) -> list[dict[str, Any]]:
"""Return matching discovery_infos for a st."""
return [
- self._async_headers_to_discovery_info(headers)
- for udn_st, headers in self.cache.items()
+ await self._async_headers_to_discovery_info(headers)
+ for udn_st, headers in self._all_headers_from_ssdp_devices.items()
if udn_st[1] == st
]
- @core_callback
- def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, str]]:
+ async def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, Any]]:
"""Return matching discovery_infos for a udn."""
return [
- self._async_headers_to_discovery_info(headers)
- for udn_st, headers in self.cache.items()
+ await self._async_headers_to_discovery_info(headers)
+ for udn_st, headers in self._all_headers_from_ssdp_devices.items()
if udn_st[0] == udn
]
-def discovery_info_from_headers_and_request(
- info_with_req: CaseInsensitiveDict,
-) -> dict[str, str]:
+def discovery_info_from_headers_and_description(
+ info_with_desc: CaseInsensitiveDict,
+) -> dict[str, Any]:
"""Convert headers and description to discovery_info."""
- info = {DISCOVERY_MAPPING.get(k.lower(), k): v for k, v in info_with_req.items()}
+ info = {
+ DISCOVERY_MAPPING.get(k.lower(), k): v
+ for k, v in info_with_desc.as_dict().items()
+ }
if ATTR_UPNP_UDN not in info and ATTR_SSDP_USN in info:
if udn := _udn_from_usn(info[ATTR_SSDP_USN]):
diff --git a/homeassistant/components/ssdp/descriptions.py b/homeassistant/components/ssdp/descriptions.py
deleted file mode 100644
index e754b10669a..00000000000
--- a/homeassistant/components/ssdp/descriptions.py
+++ /dev/null
@@ -1,69 +0,0 @@
-"""The SSDP integration."""
-from __future__ import annotations
-
-import asyncio
-import logging
-
-import aiohttp
-from defusedxml import ElementTree
-
-from homeassistant.core import HomeAssistant, callback
-
-from .util import etree_to_dict
-
-_LOGGER = logging.getLogger(__name__)
-
-
-class DescriptionManager:
- """Class to cache and manage fetching descriptions."""
-
- def __init__(self, hass: HomeAssistant) -> None:
- """Init the manager."""
- self.hass = hass
- self._description_cache: dict[str, None | dict[str, str]] = {}
-
- async def fetch_description(
- self, xml_location: str | None
- ) -> None | dict[str, str]:
- """Fetch the location or get it from the cache."""
- if xml_location is None:
- return None
- if xml_location not in self._description_cache:
- try:
- self._description_cache[xml_location] = await self._fetch_description(
- xml_location
- )
- except Exception: # pylint: disable=broad-except
- # If it fails, cache the failure so we do not keep trying over and over
- self._description_cache[xml_location] = None
- _LOGGER.exception("Failed to fetch ssdp data from: %s", xml_location)
-
- return self._description_cache[xml_location]
-
- @callback
- def async_cached_description(self, xml_location: str) -> None | dict[str, str]:
- """Fetch the description from the cache."""
- return self._description_cache.get(xml_location)
-
- async def _fetch_description(self, xml_location: str) -> None | dict[str, str]:
- """Fetch an XML description."""
- session = self.hass.helpers.aiohttp_client.async_get_clientsession()
- try:
- for _ in range(2):
- resp = await session.get(xml_location, timeout=5)
- # Samsung Smart TV sometimes returns an empty document the
- # first time. Retry once.
- if xml := await resp.text(errors="replace"):
- break
- except (aiohttp.ClientError, asyncio.TimeoutError) as err:
- _LOGGER.debug("Error fetching %s: %s", xml_location, err)
- return None
-
- try:
- tree = ElementTree.fromstring(xml)
- except ElementTree.ParseError as err:
- _LOGGER.debug("Error parsing %s: %s", xml_location, err)
- return None
-
- root = etree_to_dict(tree).get("root") or {}
- return root.get("device") or {}
diff --git a/homeassistant/components/ssdp/manifest.json b/homeassistant/components/ssdp/manifest.json
index 746e90c7388..1e7dec03d7d 100644
--- a/homeassistant/components/ssdp/manifest.json
+++ b/homeassistant/components/ssdp/manifest.json
@@ -4,7 +4,7 @@
"documentation": "https://www.home-assistant.io/integrations/ssdp",
"requirements": [
"defusedxml==0.7.1",
- "async-upnp-client==0.20.0"
+ "async-upnp-client==0.21.2"
],
"dependencies": ["network"],
"after_dependencies": ["zeroconf"],
diff --git a/homeassistant/components/ssdp/util.py b/homeassistant/components/ssdp/util.py
deleted file mode 100644
index c28f8ce088d..00000000000
--- a/homeassistant/components/ssdp/util.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Util functions used by SSDP."""
-from __future__ import annotations
-
-from collections import defaultdict
-from typing import Any
-
-from defusedxml import ElementTree
-
-
-# Adapted from http://stackoverflow.com/a/10077069
-# to follow the XML to JSON spec
-# https://www.xml.com/pub/a/2006/05/31/converting-between-xml-and-json.html
-def etree_to_dict(tree: ElementTree) -> dict[str, dict[str, Any] | None]:
- """Convert an ETree object to a dict."""
- # strip namespace
- tag_name = tree.tag[tree.tag.find("}") + 1 :]
-
- tree_dict: dict[str, dict[str, Any] | None] = {
- tag_name: {} if tree.attrib else None
- }
- children = list(tree)
- if children:
- child_dict: dict[str, list] = defaultdict(list)
- for child in map(etree_to_dict, children):
- for k, val in child.items():
- child_dict[k].append(val)
- tree_dict = {
- tag_name: {k: v[0] if len(v) == 1 else v for k, v in child_dict.items()}
- }
- dict_meta = tree_dict[tag_name]
- if tree.attrib:
- assert dict_meta is not None
- dict_meta.update(("@" + k, v) for k, v in tree.attrib.items())
- if tree.text:
- text = tree.text.strip()
- if children or tree.attrib:
- if text:
- assert dict_meta is not None
- dict_meta["#text"] = text
- else:
- tree_dict[tag_name] = text
- return tree_dict
diff --git a/homeassistant/components/upnp/__init__.py b/homeassistant/components/upnp/__init__.py
index 9541331fe0b..3251b8c69fb 100644
--- a/homeassistant/components/upnp/__init__.py
+++ b/homeassistant/components/upnp/__init__.py
@@ -5,7 +5,6 @@ import asyncio
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import timedelta
-from ipaddress import ip_address
from typing import Any
import voluptuous as vol
@@ -13,13 +12,12 @@ import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.components.binary_sensor import BinarySensorEntityDescription
-from homeassistant.components.network import async_get_source_ip
-from homeassistant.components.network.const import PUBLIC_TARGET_IP
from homeassistant.components.sensor import SensorEntityDescription
+from homeassistant.components.ssdp import SsdpChange
from homeassistant.config_entries import ConfigEntry
-from homeassistant.core import HomeAssistant, callback
+from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
-from homeassistant.helpers import config_validation as cv, device_registry as dr
+from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
@@ -27,7 +25,6 @@ from homeassistant.helpers.update_coordinator import (
)
from .const import (
- CONF_LOCAL_IP,
CONFIG_ENTRY_HOSTNAME,
CONFIG_ENTRY_SCAN_INTERVAL,
CONFIG_ENTRY_ST,
@@ -36,7 +33,6 @@ from .const import (
DOMAIN,
DOMAIN_CONFIG,
DOMAIN_DEVICES,
- DOMAIN_LOCAL_IP,
LOGGER,
)
from .device import Device
@@ -49,9 +45,7 @@ PLATFORMS = ["binary_sensor", "sensor"]
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
- {
- vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
- },
+ {},
)
},
extra=vol.ALLOW_EXTRA,
@@ -63,11 +57,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
LOGGER.debug("async_setup, config: %s", config)
conf_default = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN]
conf = config.get(DOMAIN, conf_default)
- local_ip = await async_get_source_ip(hass, PUBLIC_TARGET_IP)
hass.data[DOMAIN] = {
DOMAIN_CONFIG: conf,
DOMAIN_DEVICES: {},
- DOMAIN_LOCAL_IP: conf.get(CONF_LOCAL_IP, local_ip),
}
# Only start if set up via configuration.yaml.
@@ -93,16 +85,18 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
device_discovered_event = asyncio.Event()
discovery_info: Mapping[str, Any] | None = None
- @callback
- def device_discovered(info: Mapping[str, Any]) -> None:
+ async def device_discovered(headers: Mapping[str, Any], change: SsdpChange) -> None:
+ if change == SsdpChange.BYEBYE:
+ return
+
nonlocal discovery_info
LOGGER.debug(
- "Device discovered: %s, at: %s", usn, info[ssdp.ATTR_SSDP_LOCATION]
+ "Device discovered: %s, at: %s", usn, headers[ssdp.ATTR_SSDP_LOCATION]
)
- discovery_info = info
+ discovery_info = headers
device_discovered_event.set()
- cancel_discovered_callback = ssdp.async_register_callback(
+ cancel_discovered_callback = await ssdp.async_register_callback(
hass,
device_discovered,
{
@@ -177,9 +171,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
LOGGER.debug("Enabling sensors")
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
- # Start device updater.
- await device.async_start()
-
return True
@@ -187,9 +178,6 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
"""Unload a UPnP/IGD device from a config entry."""
LOGGER.debug("Unloading config entry: %s", config_entry.unique_id)
- if coordinator := hass.data[DOMAIN].pop(config_entry.entry_id, None):
- await coordinator.device.async_stop()
-
LOGGER.debug("Deleting sensors")
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
@@ -228,10 +216,10 @@ class UpnpDataUpdateCoordinator(DataUpdateCoordinator):
self.device.async_get_status(),
)
- data = dict(update_values[0])
- data.update(update_values[1])
-
- return data
+ return {
+ **update_values[0],
+ **update_values[1],
+ }
class UpnpEntity(CoordinatorEntity):
diff --git a/homeassistant/components/upnp/config_flow.py b/homeassistant/components/upnp/config_flow.py
index 5df4e267427..9352ae0a5ff 100644
--- a/homeassistant/components/upnp/config_flow.py
+++ b/homeassistant/components/upnp/config_flow.py
@@ -10,6 +10,7 @@ import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
+from homeassistant.components.ssdp import SsdpChange
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import HomeAssistant, callback
@@ -40,8 +41,10 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
"""Wait for a device to be discovered."""
device_discovered_event = asyncio.Event()
- @callback
- def device_discovered(info: Mapping[str, Any]) -> None:
+ async def device_discovered(info: Mapping[str, Any], change: SsdpChange) -> None:
+ if change == SsdpChange.BYEBYE:
+ return
+
LOGGER.info(
"Device discovered: %s, at: %s",
info[ssdp.ATTR_SSDP_USN],
@@ -49,14 +52,14 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
)
device_discovered_event.set()
- cancel_discovered_callback_1 = ssdp.async_register_callback(
+ cancel_discovered_callback_1 = await ssdp.async_register_callback(
hass,
device_discovered,
{
ssdp.ATTR_SSDP_ST: ST_IGD_V1,
},
)
- cancel_discovered_callback_2 = ssdp.async_register_callback(
+ cancel_discovered_callback_2 = await ssdp.async_register_callback(
hass,
device_discovered,
{
@@ -77,11 +80,11 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
return True
-def _discovery_igd_devices(hass: HomeAssistant) -> list[Mapping[str, Any]]:
+async def _async_discover_igd_devices(hass: HomeAssistant) -> list[Mapping[str, Any]]:
"""Discovery IGD devices."""
- return ssdp.async_get_discovery_info_by_st(
+ return await ssdp.async_get_discovery_info_by_st(
hass, ST_IGD_V1
- ) + ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
+ ) + await ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@@ -121,7 +124,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return await self._async_create_entry_from_discovery(discovery)
# Discover devices.
- discoveries = _discovery_igd_devices(self.hass)
+ discoveries = await _async_discover_igd_devices(self.hass)
# Store discoveries which have not been configured.
current_unique_ids = {
@@ -171,7 +174,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# Discover devices.
await _async_wait_for_discoveries(self.hass)
- discoveries = _discovery_igd_devices(self.hass)
+ discoveries = await _async_discover_igd_devices(self.hass)
# Ensure anything to add. If not, silently abort.
if not discoveries:
diff --git a/homeassistant/components/upnp/const.py b/homeassistant/components/upnp/const.py
index d66a954962f..5eeb73abc4a 100644
--- a/homeassistant/components/upnp/const.py
+++ b/homeassistant/components/upnp/const.py
@@ -6,11 +6,9 @@ from homeassistant.const import TIME_SECONDS
LOGGER = logging.getLogger(__package__)
-CONF_LOCAL_IP = "local_ip"
DOMAIN = "upnp"
DOMAIN_CONFIG = "config"
DOMAIN_DEVICES = "devices"
-DOMAIN_LOCAL_IP = "local_ip"
BYTES_RECEIVED = "bytes_received"
BYTES_SENT = "bytes_sent"
PACKETS_RECEIVED = "packets_received"
diff --git a/homeassistant/components/upnp/device.py b/homeassistant/components/upnp/device.py
index a1040816629..7205da71c84 100644
--- a/homeassistant/components/upnp/device.py
+++ b/homeassistant/components/upnp/device.py
@@ -3,25 +3,23 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
-from ipaddress import IPv4Address
from typing import Any
from urllib.parse import urlparse
-from async_upnp_client import UpnpFactory
+from async_upnp_client import UpnpDevice, UpnpFactory
from async_upnp_client.aiohttp import AiohttpSessionRequester
-from async_upnp_client.device_updater import DeviceUpdater
from async_upnp_client.profiles.igd import IgdDevice
+from homeassistant.components import ssdp
+from homeassistant.components.ssdp import SsdpChange
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
+from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
import homeassistant.util.dt as dt_util
from .const import (
BYTES_RECEIVED,
BYTES_SENT,
- CONF_LOCAL_IP,
- DOMAIN,
- DOMAIN_CONFIG,
LOGGER as _LOGGER,
PACKETS_RECEIVED,
PACKETS_SENT,
@@ -32,54 +30,61 @@ from .const import (
)
-def _get_local_ip(hass: HomeAssistant) -> IPv4Address | None:
- """Get the configured local ip."""
- if DOMAIN in hass.data and DOMAIN_CONFIG in hass.data[DOMAIN]:
- local_ip = hass.data[DOMAIN][DOMAIN_CONFIG].get(CONF_LOCAL_IP)
- if local_ip:
- return IPv4Address(local_ip)
- return None
-
-
class Device:
"""Home Assistant representation of a UPnP/IGD device."""
- def __init__(self, igd_device: IgdDevice, device_updater: DeviceUpdater) -> None:
+ def __init__(self, hass: HomeAssistant, igd_device: IgdDevice) -> None:
"""Initialize UPnP/IGD device."""
+ self.hass = hass
self._igd_device = igd_device
- self._device_updater = device_updater
+ self.coordinator: DataUpdateCoordinator = None
@classmethod
- async def async_create_device(
+ async def async_create_upnp_device(
cls, hass: HomeAssistant, ssdp_location: str
- ) -> Device:
- """Create UPnP/IGD device."""
+ ) -> UpnpDevice:
+ """Create UPnP device."""
# Build async_upnp_client requester.
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True, 10)
# Create async_upnp_client device.
factory = UpnpFactory(requester, disable_state_variable_validation=True)
- upnp_device = await factory.async_create_device(ssdp_location)
+ return await factory.async_create_device(ssdp_location)
+
+ @classmethod
+ async def async_create_device(
+ cls, hass: HomeAssistant, ssdp_location: str
+ ) -> Device:
+ """Create UPnP/IGD device."""
+ upnp_device = await Device.async_create_upnp_device(hass, ssdp_location)
# Create profile wrapper.
igd_device = IgdDevice(upnp_device, None)
+ device = cls(hass, igd_device)
- # Create updater.
- local_ip = _get_local_ip(hass)
- device_updater = DeviceUpdater(
- device=upnp_device, factory=factory, source_ip=local_ip
+ # Register SSDP callback for updates.
+ usn = f"{upnp_device.udn}::{upnp_device.device_type}"
+ await ssdp.async_register_callback(
+ hass, device.async_ssdp_callback, {ssdp.ATTR_SSDP_USN: usn}
)
- return cls(igd_device, device_updater)
+ return device
- async def async_start(self) -> None:
- """Start the device updater."""
- await self._device_updater.async_start()
+ async def async_ssdp_callback(
+ self, headers: Mapping[str, Any], change: SsdpChange
+ ) -> None:
+ """SSDP callback, update if needed."""
+ if change != SsdpChange.UPDATE or ssdp.ATTR_SSDP_LOCATION not in headers:
+ return
- async def async_stop(self) -> None:
- """Stop the device updater."""
- await self._device_updater.async_stop()
+ location = headers[ssdp.ATTR_SSDP_LOCATION]
+ device = self._igd_device.device
+ if location == device.device_url:
+ return
+
+ new_upnp_device = Device.async_create_upnp_device(self.hass, location)
+ device.reinit(new_upnp_device)
@property
def udn(self) -> str:
diff --git a/homeassistant/components/upnp/manifest.json b/homeassistant/components/upnp/manifest.json
index 5f38a827ec7..fa71fa67751 100644
--- a/homeassistant/components/upnp/manifest.json
+++ b/homeassistant/components/upnp/manifest.json
@@ -3,7 +3,7 @@
"name": "UPnP/IGD",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/upnp",
- "requirements": ["async-upnp-client==0.20.0"],
+ "requirements": ["async-upnp-client==0.21.2"],
"dependencies": ["network", "ssdp"],
"codeowners": ["@StevenLooman","@ehendrix23"],
"ssdp": [
diff --git a/homeassistant/components/yamaha_musiccast/__init__.py b/homeassistant/components/yamaha_musiccast/__init__.py
index 0de8428b0dc..9f691773e11 100644
--- a/homeassistant/components/yamaha_musiccast/__init__.py
+++ b/homeassistant/components/yamaha_musiccast/__init__.py
@@ -30,7 +30,7 @@ SCAN_INTERVAL = timedelta(seconds=60)
async def get_upnp_desc(hass: HomeAssistant, host: str):
"""Get the upnp description URL for a given host, using the SSPD scanner."""
- ssdp_entries = ssdp.async_get_discovery_info_by_st(hass, "upnp:rootdevice")
+ ssdp_entries = await ssdp.async_get_discovery_info_by_st(hass, "upnp:rootdevice")
matches = [w for w in ssdp_entries if w.get("_host", "") == host]
upnp_desc = None
for match in matches:
diff --git a/homeassistant/components/yeelight/__init__.py b/homeassistant/components/yeelight/__init__.py
index a0deb0fdf21..5473e8eb553 100644
--- a/homeassistant/components/yeelight/__init__.py
+++ b/homeassistant/components/yeelight/__init__.py
@@ -8,7 +8,7 @@ from ipaddress import IPv4Address, IPv6Address
import logging
from urllib.parse import urlparse
-from async_upnp_client.search import SSDPListener
+from async_upnp_client.search import SsdpSearchListener
import voluptuous as vol
from yeelight import BulbException
from yeelight.aio import KEY_CONNECTED, AsyncBulb
@@ -395,7 +395,7 @@ class YeelightScanner:
return _async_connected
self._listeners.append(
- SSDPListener(
+ SsdpSearchListener(
async_callback=self._async_process_entry,
service_type=SSDP_ST,
target=SSDP_TARGET,
diff --git a/homeassistant/components/yeelight/manifest.json b/homeassistant/components/yeelight/manifest.json
index 0a4b5d4499f..47329235863 100644
--- a/homeassistant/components/yeelight/manifest.json
+++ b/homeassistant/components/yeelight/manifest.json
@@ -2,7 +2,7 @@
"domain": "yeelight",
"name": "Yeelight",
"documentation": "https://www.home-assistant.io/integrations/yeelight",
- "requirements": ["yeelight==0.7.4", "async-upnp-client==0.20.0"],
+ "requirements": ["yeelight==0.7.4", "async-upnp-client==0.21.2"],
"codeowners": ["@rytilahti", "@zewelor", "@shenxn", "@starkillerOG"],
"config_flow": true,
"dependencies": ["network"],
diff --git a/homeassistant/package_constraints.txt b/homeassistant/package_constraints.txt
index a5f5104c0ab..546a56aa736 100644
--- a/homeassistant/package_constraints.txt
+++ b/homeassistant/package_constraints.txt
@@ -4,7 +4,7 @@ aiodiscover==1.4.2
aiohttp==3.7.4.post0
aiohttp_cors==0.7.0
astral==2.2
-async-upnp-client==0.20.0
+async-upnp-client==0.21.2
async_timeout==3.0.1
attrs==21.2.0
awesomeversion==21.8.1
diff --git a/requirements_all.txt b/requirements_all.txt
index e3fb347059b..2d43b04240f 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -318,7 +318,7 @@ asterisk_mbox==0.5.0
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
-async-upnp-client==0.20.0
+async-upnp-client==0.21.2
# homeassistant.components.supla
asyncpysupla==0.0.5
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 989f8fa5cfe..b6c37dac08a 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -209,7 +209,7 @@ arcam-fmj==0.7.0
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
-async-upnp-client==0.20.0
+async-upnp-client==0.21.2
# homeassistant.components.aurora
auroranoaa==0.0.2
diff --git a/tests/components/sonos/conftest.py b/tests/components/sonos/conftest.py
index 294e243901a..b1b3dd7be10 100644
--- a/tests/components/sonos/conftest.py
+++ b/tests/components/sonos/conftest.py
@@ -74,16 +74,28 @@ def soco_fixture(music_library, speaker_info, battery_info, alarm_clock):
yield mock_soco
+@pytest.fixture(autouse=True)
+async def silent_ssdp_scanner(hass):
+ """Start SSDP component and get Scanner, prevent actual SSDP traffic."""
+ with patch(
+ "homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners"
+ ), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch(
+ "homeassistant.components.ssdp.Scanner.async_scan"
+ ):
+ yield
+
+
@pytest.fixture(name="discover", autouse=True)
def discover_fixture(soco):
"""Create a mock soco discover fixture."""
- def do_callback(hass, callback, *args, **kwargs):
- callback(
+ async def do_callback(hass, callback, *args, **kwargs):
+ await callback(
{
ssdp.ATTR_UPNP_UDN: soco.uid,
ssdp.ATTR_SSDP_LOCATION: f"http://{soco.ip_address}/",
- }
+ },
+ ssdp.SsdpChange.ALIVE,
)
return MagicMock()
diff --git a/tests/components/ssdp/conftest.py b/tests/components/ssdp/conftest.py
new file mode 100644
index 00000000000..0b390ae469b
--- /dev/null
+++ b/tests/components/ssdp/conftest.py
@@ -0,0 +1,25 @@
+"""Configuration for SSDP tests."""
+from unittest.mock import AsyncMock, patch
+
+from async_upnp_client.ssdp_listener import SsdpListener
+import pytest
+
+
+@pytest.fixture(autouse=True)
+async def silent_ssdp_listener():
+ """Patch SsdpListener class, preventing any actual SSDP traffic."""
+ with patch("homeassistant.components.ssdp.SsdpListener.async_start"), patch(
+ "homeassistant.components.ssdp.SsdpListener.async_stop"
+ ), patch("homeassistant.components.ssdp.SsdpListener.async_search"):
+ # Fixtures are initialized before patches. When the component is started here,
+ # certain functions/methods might not be patched in time.
+ yield SsdpListener
+
+
+@pytest.fixture
+def mock_flow_init(hass):
+ """Mock hass.config_entries.flow.async_init."""
+ with patch.object(
+ hass.config_entries.flow, "async_init", return_value=AsyncMock()
+ ) as mock_init:
+ yield mock_init
diff --git a/tests/components/ssdp/test_init.py b/tests/components/ssdp/test_init.py
index f176ccff0f2..ef12d2b53f7 100644
--- a/tests/components/ssdp/test_init.py
+++ b/tests/components/ssdp/test_init.py
@@ -1,14 +1,14 @@
"""Test the SSDP integration."""
-import asyncio
-from datetime import timedelta
+from datetime import datetime, timedelta
from ipaddress import IPv4Address, IPv6Address
-from unittest.mock import patch
+from unittest.mock import ANY, AsyncMock, patch
-import aiohttp
-from async_upnp_client.search import SSDPListener
+from async_upnp_client.ssdp import udn_from_headers
+from async_upnp_client.ssdp_listener import SsdpListener
from async_upnp_client.utils import CaseInsensitiveDict
import pytest
+import homeassistant
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import (
@@ -16,127 +16,163 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
MATCH_ALL,
)
-from homeassistant.core import CoreState, callback
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
-from tests.common import async_fire_time_changed, mock_coro
+from tests.common import async_fire_time_changed
-def _patched_ssdp_listener(info, *args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- await listener.async_callback(info)
-
- @callback
- def _async_search(*_):
- # Prevent an actual scan.
- pass
-
- listener.async_start = _async_callback
- listener.async_search = _async_search
- return listener
+def _ssdp_headers(headers):
+ return CaseInsensitiveDict(
+ headers, _timestamp=datetime(2021, 1, 1, 12, 00), _udn=udn_from_headers(headers)
+ )
-async def _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp):
- def _generate_fake_ssdp_listener(*args, **kwargs):
- return _patched_ssdp_listener(
- mock_ssdp_response,
- *args,
- **kwargs,
- )
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
- await hass.async_block_till_done()
-
- return mock_init
+async def init_ssdp_component(hass: homeassistant) -> SsdpListener:
+ """Initialize ssdp component and get SsdpListener."""
+ await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
+ await hass.async_block_till_done()
+ return hass.data[ssdp.DOMAIN]._ssdp_listeners[0]
-async def test_scan_match_st(hass, caplog, mock_get_source_ip):
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={"mock-domain": [{"st": "mock-st"}]},
+)
+@pytest.mark.usefixtures("mock_get_source_ip")
+async def test_ssdp_flow_dispatched_on_st(mock_get_ssdp, hass, caplog, mock_flow_init):
"""Test matching based on ST."""
- mock_ssdp_response = {
- "st": "mock-st",
- "location": None,
- "usn": "mock-usn",
- "server": "mock-server",
- "ext": "",
- }
- mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
+ mock_ssdp_search_response = _ssdp_headers(
+ {
+ "st": "mock-st",
+ "location": None,
+ "usn": "uuid:mock-udn::mock-st",
+ "server": "mock-server",
+ "ext": "",
+ }
+ )
+ ssdp_listener = await init_ssdp_component(hass)
+ await ssdp_listener._on_search(mock_ssdp_search_response)
+ await hass.async_block_till_done()
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
- assert len(mock_init.mock_calls) == 1
- assert mock_init.mock_calls[0][1][0] == "mock-domain"
- assert mock_init.mock_calls[0][2]["context"] == {
+ assert len(mock_flow_init.mock_calls) == 1
+ assert mock_flow_init.mock_calls[0][1][0] == "mock-domain"
+ assert mock_flow_init.mock_calls[0][2]["context"] == {
"source": config_entries.SOURCE_SSDP
}
- assert mock_init.mock_calls[0][2]["data"] == {
+ assert mock_flow_init.mock_calls[0][2]["data"] == {
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_LOCATION: None,
- ssdp.ATTR_SSDP_USN: "mock-usn",
+ ssdp.ATTR_SSDP_USN: "uuid:mock-udn::mock-st",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
+ "_udn": ANY,
+ "_timestamp": ANY,
}
assert "Failed to fetch ssdp data" not in caplog.text
-async def test_partial_response(hass, caplog, mock_get_source_ip):
- """Test location and st missing."""
- mock_ssdp_response = {
- "usn": "mock-usn",
- "server": "mock-server",
- "ext": "",
- }
- mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
-
- assert len(mock_init.mock_calls) == 0
-
-
-@pytest.mark.parametrize(
- "key", (ssdp.ATTR_UPNP_MANUFACTURER, ssdp.ATTR_UPNP_DEVICE_TYPE)
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={"mock-domain": [{"manufacturer": "Paulus"}]},
)
-async def test_scan_match_upnp_devicedesc(
- hass, aioclient_mock, key, mock_get_source_ip
+async def test_scan_match_upnp_devicedesc_manufacturer(
+ mock_get_ssdp, hass, aioclient_mock, mock_flow_init
):
"""Test matching based on UPnP device description data."""
aioclient_mock.get(
"http://1.1.1.1",
- text=f"""
+ text="""
- <{key}>Paulus{key}>
+ Paulus
""",
)
- mock_get_ssdp = {"mock-domain": [{key: "Paulus"}]}
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
+ mock_ssdp_search_response = _ssdp_headers(
+ {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ "usn": "uuid:mock-udn::mock-st",
+ }
+ )
+ ssdp_listener = await init_ssdp_component(hass)
+ await ssdp_listener._on_search(mock_ssdp_search_response)
+ await hass.async_block_till_done()
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+
# If we get duplicate response, ensure we only look it up once
assert len(aioclient_mock.mock_calls) == 1
- assert len(mock_init.mock_calls) == 1
- assert mock_init.mock_calls[0][1][0] == "mock-domain"
- assert mock_init.mock_calls[0][2]["context"] == {
+ assert len(mock_flow_init.mock_calls) == 1
+ assert mock_flow_init.mock_calls[0][1][0] == "mock-domain"
+ assert mock_flow_init.mock_calls[0][2]["context"] == {
"source": config_entries.SOURCE_SSDP
}
-async def test_scan_not_all_present(hass, aioclient_mock, mock_get_source_ip):
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={"mock-domain": [{"deviceType": "Paulus"}]},
+)
+async def test_scan_match_upnp_devicedesc_devicetype(
+ mock_get_ssdp, hass, aioclient_mock, mock_flow_init
+):
+ """Test matching based on UPnP device description data."""
+ aioclient_mock.get(
+ "http://1.1.1.1",
+ text="""
+
+
+ Paulus
+
+
+ """,
+ )
+ mock_ssdp_search_response = _ssdp_headers(
+ {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ "usn": "uuid:mock-udn::mock-st",
+ }
+ )
+ ssdp_listener = await init_ssdp_component(hass)
+ await ssdp_listener._on_search(mock_ssdp_search_response)
+ await hass.async_block_till_done()
+
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+
+ # If we get duplicate response, ensure we only look it up once
+ assert len(aioclient_mock.mock_calls) == 1
+ assert len(mock_flow_init.mock_calls) == 1
+ assert mock_flow_init.mock_calls[0][1][0] == "mock-domain"
+ assert mock_flow_init.mock_calls[0][2]["context"] == {
+ "source": config_entries.SOURCE_SSDP
+ }
+
+
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={
+ "mock-domain": [
+ {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
+ }
+ ]
+ },
+)
+async def test_scan_not_all_present(
+ mock_get_ssdp, hass, aioclient_mock, mock_flow_init
+):
"""Test match fails if some specified attributes are not present."""
aioclient_mock.get(
"http://1.1.1.1",
@@ -148,24 +184,34 @@ async def test_scan_not_all_present(hass, aioclient_mock, mock_get_source_ip):
""",
)
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- mock_get_ssdp = {
+ mock_ssdp_search_response = _ssdp_headers(
+ {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ )
+ ssdp_listener = await init_ssdp_component(hass)
+ await ssdp_listener._on_search(mock_ssdp_search_response)
+ await hass.async_block_till_done()
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+
+ assert not mock_flow_init.mock_calls
+
+
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
+ ssdp.ATTR_UPNP_MANUFACTURER: "Not-Paulus",
}
]
- }
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
-
- assert not mock_init.mock_calls
-
-
-async def test_scan_not_all_match(hass, aioclient_mock, mock_get_source_ip):
+ },
+)
+async def test_scan_not_all_match(mock_get_ssdp, hass, aioclient_mock, mock_flow_init):
"""Test match fails if some specified attribute values differ."""
aioclient_mock.get(
"http://1.1.1.1",
@@ -178,191 +224,52 @@ async def test_scan_not_all_match(hass, aioclient_mock, mock_get_source_ip):
""",
)
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- mock_get_ssdp = {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_UPNP_MANUFACTURER: "Not-Paulus",
- }
- ]
- }
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
-
- assert not mock_init.mock_calls
-
-
-@pytest.mark.parametrize("exc", [asyncio.TimeoutError, aiohttp.ClientError])
-async def test_scan_description_fetch_fail(
- hass, aioclient_mock, exc, mock_get_source_ip
-):
- """Test failing to fetch description."""
- aioclient_mock.get("http://1.1.1.1", exc=exc)
- mock_ssdp_response = {
- "st": "mock-st",
- "usn": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
- "location": "http://1.1.1.1",
- }
- mock_get_ssdp = {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
- }
- ]
- }
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
-
- assert not mock_init.mock_calls
-
- assert ssdp.async_get_discovery_info_by_st(hass, "mock-st") == [
+ mock_ssdp_search_response = _ssdp_headers(
{
- "UDN": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
- "ssdp_location": "http://1.1.1.1",
- "ssdp_st": "mock-st",
- "ssdp_usn": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ "usn": "uuid:mock-udn::mock-st",
}
- ]
-
-
-async def test_scan_description_parse_fail(hass, aioclient_mock, mock_get_source_ip):
- """Test invalid XML."""
- aioclient_mock.get(
- "http://1.1.1.1",
- text="""
-INVALIDXML
- """,
)
-
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- mock_get_ssdp = {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
- }
- ]
- }
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
-
- assert not mock_init.mock_calls
-
-
-async def test_invalid_characters(hass, aioclient_mock, mock_get_source_ip):
- """Test that we replace bad characters with placeholders."""
- aioclient_mock.get(
- "http://1.1.1.1",
- text="""
-
-
- ABC
- \xff\xff\xff\xff
-
-
- """,
- )
-
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- mock_get_ssdp = {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
- }
- ]
- }
-
- mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
-
- assert len(mock_init.mock_calls) == 1
- assert mock_init.mock_calls[0][1][0] == "mock-domain"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert mock_init.mock_calls[0][2]["data"] == {
- "ssdp_location": "http://1.1.1.1",
- "ssdp_st": "mock-st",
- "deviceType": "ABC",
- "serialNumber": "ÿÿÿÿ",
- }
-
-
-@patch("homeassistant.components.ssdp.SSDPListener.async_start")
-@patch("homeassistant.components.ssdp.SSDPListener.async_search")
-@patch("homeassistant.components.ssdp.SSDPListener.async_stop")
-async def test_start_stop_scanner(
- async_stop_mock, async_search_mock, async_start_mock, hass, mock_get_source_ip
-):
- """Test we start and stop the scanner."""
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
-
+ ssdp_listener = await init_ssdp_component(hass)
+ await ssdp_listener._on_search(mock_ssdp_search_response)
+ await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
+
+ assert not mock_flow_init.mock_calls
+
+
+@patch( # XXX TODO: Isn't this duplicate with mock_get_source_ip?
+ "homeassistant.components.ssdp.Scanner._async_build_source_set",
+ return_value={IPv4Address("192.168.1.1")},
+)
+@pytest.mark.usefixtures("mock_get_source_ip")
+async def test_start_stop_scanner(mock_source_set, hass):
+ """Test we start and stop the scanner."""
+ ssdp_listener = await init_ssdp_component(hass)
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
- assert async_start_mock.call_count == 1
- # Next is 2, as async_upnp_client triggers 1 SSDPListener._async_on_connect
- assert async_search_mock.call_count == 2
- assert async_stop_mock.call_count == 0
+ assert ssdp_listener.async_start.call_count == 1
+ assert ssdp_listener.async_search.call_count == 4
+ assert ssdp_listener.async_stop.call_count == 0
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
- assert async_start_mock.call_count == 1
- assert async_search_mock.call_count == 2
- assert async_stop_mock.call_count == 1
-
-
-async def test_unexpected_exception_while_fetching(
- hass, aioclient_mock, caplog, mock_get_source_ip
-):
- """Test unexpected exception while fetching."""
- aioclient_mock.get(
- "http://1.1.1.1",
- text="""
-
-
- ABC
- \xff\xff\xff\xff
-
-
- """,
- )
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- mock_get_ssdp = {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
- }
- ]
- }
-
- with patch(
- "homeassistant.components.ssdp.descriptions.ElementTree.fromstring",
- side_effect=ValueError,
- ):
- mock_init = await _async_run_mocked_scan(
- hass, mock_ssdp_response, mock_get_ssdp
- )
-
- assert len(mock_init.mock_calls) == 0
- assert "Failed to fetch ssdp data from: http://1.1.1.1" in caplog.text
+ assert ssdp_listener.async_start.call_count == 1
+ assert ssdp_listener.async_search.call_count == 4
+ assert ssdp_listener.async_stop.call_count == 1
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch("homeassistant.components.ssdp.async_get_ssdp", return_value={})
async def test_scan_with_registered_callback(
- hass, aioclient_mock, caplog, mock_get_source_ip
+ mock_get_ssdp, hass, aioclient_mock, caplog
):
"""Test matching based on callback."""
aioclient_mock.get(
@@ -375,221 +282,86 @@ async def test_scan_with_registered_callback(
""",
)
- mock_ssdp_response = {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- "usn": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
- "server": "mock-server",
- "x-rincon-bootseq": "55",
- "ext": "",
- }
- not_matching_integration_callbacks = []
- integration_match_all_callbacks = []
- integration_match_all_not_present_callbacks = []
- integration_callbacks = []
- integration_callbacks_from_cache = []
- match_any_callbacks = []
+ mock_ssdp_search_response = _ssdp_headers(
+ {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ "usn": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st",
+ "server": "mock-server",
+ "x-rincon-bootseq": "55",
+ "ext": "",
+ }
+ )
+ ssdp_listener = await init_ssdp_component(hass)
- @callback
- def _async_exception_callbacks(info):
- raise ValueError
+ async_exception_callback = AsyncMock(side_effect=ValueError)
+ await ssdp.async_register_callback(hass, async_exception_callback, {})
- @callback
- def _async_integration_callbacks(info):
- integration_callbacks.append(info)
+ async_integration_callback = AsyncMock()
+ await ssdp.async_register_callback(
+ hass, async_integration_callback, {"st": "mock-st"}
+ )
- @callback
- def _async_integration_match_all_callbacks(info):
- integration_match_all_callbacks.append(info)
+ async_integration_match_all_callback1 = AsyncMock()
+ await ssdp.async_register_callback(
+ hass, async_integration_match_all_callback1, {"x-rincon-bootseq": MATCH_ALL}
+ )
- @callback
- def _async_integration_match_all_not_present_callbacks(info):
- integration_match_all_not_present_callbacks.append(info)
+ async_integration_match_all_not_present_callback1 = AsyncMock()
+ await ssdp.async_register_callback(
+ hass,
+ async_integration_match_all_not_present_callback1,
+ {"x-not-there": MATCH_ALL},
+ )
- @callback
- def _async_integration_callbacks_from_cache(info):
- integration_callbacks_from_cache.append(info)
+ async_not_matching_integration_callback1 = AsyncMock()
+ await ssdp.async_register_callback(
+ hass, async_not_matching_integration_callback1, {"st": "not-match-mock-st"}
+ )
- @callback
- def _async_not_matching_integration_callbacks(info):
- not_matching_integration_callbacks.append(info)
+ async_match_any_callback1 = AsyncMock()
+ await ssdp.async_register_callback(hass, async_match_any_callback1)
- @callback
- def _async_match_any_callbacks(info):
- match_any_callbacks.append(info)
+ await hass.async_block_till_done()
+ await ssdp_listener._on_search(mock_ssdp_search_response)
- def _generate_fake_ssdp_listener(*args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- await listener.async_callback(mock_ssdp_response)
-
- @callback
- def _callback(*_):
- hass.async_create_task(listener.async_callback(mock_ssdp_response))
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ):
- hass.state = CoreState.stopped
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- ssdp.async_register_callback(hass, _async_exception_callbacks, {})
- ssdp.async_register_callback(
- hass,
- _async_integration_callbacks,
- {"st": "mock-st"},
- )
- ssdp.async_register_callback(
- hass,
- _async_integration_match_all_callbacks,
- {"x-rincon-bootseq": MATCH_ALL},
- )
- ssdp.async_register_callback(
- hass,
- _async_integration_match_all_not_present_callbacks,
- {"x-not-there": MATCH_ALL},
- )
- ssdp.async_register_callback(
- hass,
- _async_not_matching_integration_callbacks,
- {"st": "not-match-mock-st"},
- )
- ssdp.async_register_callback(
- hass,
- _async_match_any_callbacks,
- )
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- ssdp.async_register_callback(
- hass,
- _async_integration_callbacks_from_cache,
- {"st": "mock-st"},
- )
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- hass.state = CoreState.running
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- assert hass.state == CoreState.running
-
- assert len(integration_callbacks) == 5
- assert len(integration_callbacks_from_cache) == 5
- assert len(integration_match_all_callbacks) == 5
- assert len(integration_match_all_not_present_callbacks) == 0
- assert len(match_any_callbacks) == 5
- assert len(not_matching_integration_callbacks) == 0
- assert integration_callbacks[0] == {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_SSDP_EXT: "",
- ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
- ssdp.ATTR_SSDP_SERVER: "mock-server",
- ssdp.ATTR_SSDP_ST: "mock-st",
- ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
- ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
- "x-rincon-bootseq": "55",
- }
+ assert async_integration_callback.call_count == 1
+ assert async_integration_match_all_callback1.call_count == 1
+ assert async_integration_match_all_not_present_callback1.call_count == 0
+ assert async_match_any_callback1.call_count == 1
+ assert async_not_matching_integration_callback1.call_count == 0
+ assert async_integration_callback.call_args[0] == (
+ {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ "x-rincon-bootseq": "55",
+ "_udn": ANY,
+ "_timestamp": ANY,
+ },
+ ssdp.SsdpChange.ALIVE,
+ )
assert "Failed to callback info" in caplog.text
-
-async def test_unsolicited_ssdp_registered_callback(
- hass, aioclient_mock, caplog, mock_get_source_ip
-):
- """Test matching based on callback can handle unsolicited ssdp traffic without st."""
- aioclient_mock.get(
- "http://10.6.9.12:1400/xml/device_description.xml",
- text="""
-
-
- Paulus
-
-
- """,
+ async_integration_callback_from_cache = AsyncMock()
+ await ssdp.async_register_callback(
+ hass, async_integration_callback_from_cache, {"st": "mock-st"}
)
- mock_ssdp_response = {
- "location": "http://10.6.9.12:1400/xml/device_description.xml",
- "nt": "uuid:RINCON_1111BB963FD801400",
- "nts": "ssdp:alive",
- "server": "Linux UPnP/1.0 Sonos/63.2-88230 (ZPS12)",
- "usn": "uuid:RINCON_1111BB963FD801400",
- "x-rincon-household": "Sonos_dfjfkdghjhkjfhkdjfhkd",
- "x-rincon-bootseq": "250",
- "bootid.upnp.org": "250",
- "x-rincon-wifimode": "0",
- "x-rincon-variant": "1",
- "household.smartspeaker.audio": "Sonos_v3294823948542543534",
- }
- integration_callbacks = []
- @callback
- def _async_integration_callbacks(info):
- integration_callbacks.append(info)
-
- def _generate_fake_ssdp_listener(*args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- await listener.async_callback(mock_ssdp_response)
-
- @callback
- def _callback(*_):
- hass.async_create_task(listener.async_callback(mock_ssdp_response))
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ):
- hass.state = CoreState.stopped
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- ssdp.async_register_callback(
- hass,
- _async_integration_callbacks,
- {"nts": "ssdp:alive", "x-rincon-bootseq": MATCH_ALL},
- )
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- hass.state = CoreState.running
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- assert hass.state == CoreState.running
-
- assert (
- len(integration_callbacks) == 4
- ) # unsolicited callbacks without st are not cached
- assert integration_callbacks[0] == {
- "UDN": "uuid:RINCON_1111BB963FD801400",
- "bootid.upnp.org": "250",
- "deviceType": "Paulus",
- "household.smartspeaker.audio": "Sonos_v3294823948542543534",
- "nt": "uuid:RINCON_1111BB963FD801400",
- "nts": "ssdp:alive",
- "ssdp_location": "http://10.6.9.12:1400/xml/device_description.xml",
- "ssdp_server": "Linux UPnP/1.0 Sonos/63.2-88230 (ZPS12)",
- "ssdp_usn": "uuid:RINCON_1111BB963FD801400",
- "x-rincon-bootseq": "250",
- "x-rincon-household": "Sonos_dfjfkdghjhkjfhkdjfhkd",
- "x-rincon-variant": "1",
- "x-rincon-wifimode": "0",
- }
- assert "Failed to callback info" not in caplog.text
+ assert async_integration_callback_from_cache.call_count == 1
-async def test_scan_second_hit(hass, aioclient_mock, caplog, mock_get_source_ip):
- """Test matching on second scan."""
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={"mock-domain": [{"st": "mock-st"}]},
+)
+async def test_getting_existing_headers(mock_get_ssdp, hass, aioclient_mock):
+ """Test getting existing/previously scanned headers."""
aioclient_mock.get(
"http://1.1.1.1",
text="""
@@ -600,9 +372,8 @@ async def test_scan_second_hit(hass, aioclient_mock, caplog, mock_get_source_ip)
""",
)
-
- mock_ssdp_response = CaseInsensitiveDict(
- **{
+ mock_ssdp_search_response = _ssdp_headers(
+ {
"ST": "mock-st",
"LOCATION": "http://1.1.1.1",
"USN": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
@@ -610,121 +381,59 @@ async def test_scan_second_hit(hass, aioclient_mock, caplog, mock_get_source_ip)
"EXT": "",
}
)
- mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
- integration_callbacks = []
+ ssdp_listener = await init_ssdp_component(hass)
+ await ssdp_listener._on_search(mock_ssdp_search_response)
- @callback
- def _async_integration_callbacks(info):
- integration_callbacks.append(info)
+ discovery_info_by_st = await ssdp.async_get_discovery_info_by_st(hass, "mock-st")
+ assert discovery_info_by_st == [
+ {
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ "_udn": ANY,
+ "_timestamp": ANY,
+ }
+ ]
- def _generate_fake_ssdp_listener(*args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- pass
-
- @callback
- def _callback(*_):
- hass.async_create_task(listener.async_callback(mock_ssdp_response))
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- remove = ssdp.async_register_callback(
- hass,
- _async_integration_callbacks,
- {"st": "mock-st"},
- )
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- remove()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
-
- assert len(integration_callbacks) == 4
- assert integration_callbacks[0] == {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_SSDP_EXT: "",
- ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
- ssdp.ATTR_SSDP_SERVER: "mock-server",
- ssdp.ATTR_SSDP_ST: "mock-st",
- ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
- ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
- }
- assert len(mock_init.mock_calls) == 1
- assert mock_init.mock_calls[0][1][0] == "mock-domain"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert mock_init.mock_calls[0][2]["data"] == {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_SSDP_ST: "mock-st",
- ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
- ssdp.ATTR_SSDP_SERVER: "mock-server",
- ssdp.ATTR_SSDP_EXT: "",
- ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
- ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
- }
- assert "Failed to fetch ssdp data" not in caplog.text
- udn_discovery_info = ssdp.async_get_discovery_info_by_st(hass, "mock-st")
- discovery_info = udn_discovery_info[0]
- assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
- assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
- assert (
- discovery_info[ssdp.ATTR_UPNP_UDN]
- == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
- )
- assert (
- discovery_info[ssdp.ATTR_SSDP_USN]
- == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
- )
-
- st_discovery_info = ssdp.async_get_discovery_info_by_udn(
+ discovery_info_by_udn = await ssdp.async_get_discovery_info_by_udn(
hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
- discovery_info = st_discovery_info[0]
- assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
- assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
- assert (
- discovery_info[ssdp.ATTR_UPNP_UDN]
- == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
- )
- assert (
- discovery_info[ssdp.ATTR_SSDP_USN]
- == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
- )
+ assert discovery_info_by_udn == [
+ {
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ "_udn": ANY,
+ "_timestamp": ANY,
+ }
+ ]
- discovery_info = ssdp.async_get_discovery_info_by_udn_st(
+ discovery_info_by_udn_st = await ssdp.async_get_discovery_info_by_udn_st(
hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", "mock-st"
)
- assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
- assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
- assert (
- discovery_info[ssdp.ATTR_UPNP_UDN]
- == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
- )
- assert (
- discovery_info[ssdp.ATTR_SSDP_USN]
- == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
- )
+ assert discovery_info_by_udn_st == {
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ "_udn": ANY,
+ "_timestamp": ANY,
+ }
- assert ssdp.async_get_discovery_info_by_udn_st(hass, "wrong", "mock-st") is None
+ assert (
+ await ssdp.async_get_discovery_info_by_udn_st(hass, "wrong", "mock-st") is None
+ )
_ADAPTERS_WITH_MANUAL_CONFIG = [
@@ -762,410 +471,99 @@ _ADAPTERS_WITH_MANUAL_CONFIG = [
]
-async def test_async_detect_interfaces_setting_empty_route(hass, mock_get_source_ip):
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={
+ "mock-domain": [
+ {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
+ }
+ ]
+ },
+)
+@patch(
+ "homeassistant.components.ssdp.network.async_get_adapters",
+ return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
+) # XXX TODO: Isn't this duplicate with mock_get_source_ip?
+async def test_async_detect_interfaces_setting_empty_route(
+ mock_get_adapters, mock_get_ssdp, hass
+):
"""Test without default interface config and the route returns nothing."""
- mock_get_ssdp = {
+ await init_ssdp_component(hass)
+
+ ssdp_listeners = hass.data[ssdp.DOMAIN]._ssdp_listeners
+ source_ips = {ssdp_listener.source_ip for ssdp_listener in ssdp_listeners}
+ assert source_ips == {IPv6Address("2001:db8::"), IPv4Address("192.168.1.5")}
+
+
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
]
- }
- create_args = []
-
- def _generate_fake_ssdp_listener(*args, **kwargs):
- create_args.append([args, kwargs])
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- pass
-
- @callback
- def _callback(*_):
- pass
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ), patch(
- "homeassistant.components.ssdp.network.async_get_adapters",
- return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
- ):
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
-
- argset = set()
- for argmap in create_args:
- argset.add((argmap[1].get("source_ip"), argmap[1].get("target_ip")))
-
- assert argset == {
- (IPv6Address("2001:db8::"), None),
- (IPv4Address("192.168.1.5"), None),
- }
-
-
-async def test_bind_failure_skips_adapter(hass, caplog, mock_get_source_ip):
+ },
+)
+@patch(
+ "homeassistant.components.ssdp.network.async_get_adapters",
+ return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
+) # XXX TODO: Isn't this duplicate with mock_get_source_ip?
+async def test_bind_failure_skips_adapter(
+ mock_get_adapters, mock_get_ssdp, hass, caplog
+):
"""Test that an adapter with a bind failure is skipped."""
- mock_get_ssdp = {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
- }
- ]
- }
- create_args = []
- search_args = []
- @callback
- def _callback(*args):
- nonlocal search_args
- search_args.append(args)
- pass
+ async def _async_start(self):
+ if self.source_ip == IPv6Address("2001:db8::"):
+ raise OSError
- def _generate_failing_ssdp_listener(*args, **kwargs):
- create_args.append([args, kwargs])
- listener = SSDPListener(*args, **kwargs)
+ SsdpListener.async_start = _async_start
+ await init_ssdp_component(hass)
- async def _async_callback(*_):
- if kwargs["source_ip"] == IPv6Address("2001:db8::"):
- raise OSError
- pass
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_failing_ssdp_listener,
- ), patch(
- "homeassistant.components.ssdp.network.async_get_adapters",
- return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
- ):
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
-
- argset = set()
- for argmap in create_args:
- argset.add((argmap[1].get("source_ip"), argmap[1].get("target_ip")))
-
- assert argset == {
- (IPv6Address("2001:db8::"), None),
- (IPv4Address("192.168.1.5"), None),
- }
assert "Failed to setup listener for" in caplog.text
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- assert set(search_args) == {
- (),
- (
- (
- "255.255.255.255",
- 1900,
- ),
- ),
- }
+ ssdp_listeners = hass.data[ssdp.DOMAIN]._ssdp_listeners
+ source_ips = {ssdp_listener.source_ip for ssdp_listener in ssdp_listeners}
+ assert source_ips == {
+ IPv4Address("192.168.1.5")
+ } # Note no SsdpListener for IPv6 address.
-async def test_ipv4_does_additional_search_for_sonos(hass, caplog, mock_get_source_ip):
- """Test that only ipv4 does an additional search for Sonos."""
- mock_get_ssdp = {
+@pytest.mark.usefixtures("mock_get_source_ip")
+@patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value={
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
]
- }
- search_args = []
+ },
+)
+@patch(
+ "homeassistant.components.ssdp.network.async_get_adapters",
+ return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
+) # XXX TODO: Isn't this duplicate with mock_get_source_ip?
+async def test_ipv4_does_additional_search_for_sonos(
+ mock_get_adapters, mock_get_ssdp, hass
+):
+ """Test that only ipv4 does an additional search for Sonos."""
+ ssdp_listener = await init_ssdp_component(hass)
- def _generate_fake_ssdp_listener(*args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+ async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
+ await hass.async_block_till_done()
- async def _async_callback(*_):
- pass
-
- @callback
- def _callback(*args):
- nonlocal search_args
- search_args.append(args)
- pass
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ), patch(
- "homeassistant.components.ssdp.network.async_get_adapters",
- return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
- ):
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
-
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
-
- assert set(search_args) == {
- (),
+ assert ssdp_listener.async_search.call_count == 6
+ assert ssdp_listener.async_search.call_args[0] == (
(
- (
- "255.255.255.255",
- 1900,
- ),
+ "255.255.255.255",
+ 1900,
),
- }
-
-
-async def test_location_change_evicts_prior_location_from_cache(
- hass, aioclient_mock, mock_get_source_ip
-):
- """Test that a location change for a UDN will evict the prior location from the cache."""
- mock_get_ssdp = {
- "hue": [{"manufacturer": "Signify", "modelName": "Philips hue bridge 2015"}]
- }
-
- hue_response = """
-
-
-1
-0
-
-http://{ip_address}:80/
-
-urn:schemas-upnp-org:device:Basic:1
-Philips hue ({ip_address})
-Signify
-http://www.philips-hue.com
-Philips hue Personal Wireless Lighting
-Philips hue bridge 2015
-BSB002
-http://www.philips-hue.com
-001788a36abf
-uuid:2f402f80-da50-11e1-9b23-001788a36abf
-
-
- """
-
- aioclient_mock.get(
- "http://192.168.212.23/description.xml",
- text=hue_response.format(ip_address="192.168.212.23"),
)
- aioclient_mock.get(
- "http://169.254.8.155/description.xml",
- text=hue_response.format(ip_address="169.254.8.155"),
- )
- ssdp_response_without_location = {
- "ST": "uuid:2f402f80-da50-11e1-9b23-001788a36abf",
- "_udn": "uuid:2f402f80-da50-11e1-9b23-001788a36abf",
- "USN": "uuid:2f402f80-da50-11e1-9b23-001788a36abf",
- "SERVER": "Hue/1.0 UPnP/1.0 IpBridge/1.44.0",
- "hue-bridgeid": "001788FFFEA36ABF",
- "EXT": "",
- }
-
- mock_good_ip_ssdp_response = CaseInsensitiveDict(
- **ssdp_response_without_location,
- **{"LOCATION": "http://192.168.212.23/description.xml"},
- )
- mock_link_local_ip_ssdp_response = CaseInsensitiveDict(
- **ssdp_response_without_location,
- **{"LOCATION": "http://169.254.8.155/description.xml"},
- )
- mock_ssdp_response = mock_good_ip_ssdp_response
-
- def _generate_fake_ssdp_listener(*args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- pass
-
- @callback
- def _callback(*_):
- hass.async_create_task(listener.async_callback(mock_ssdp_response))
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ), patch.object(
- hass.config_entries.flow, "async_init"
- ) as mock_init:
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- assert len(mock_init.mock_calls) == 1
- assert mock_init.mock_calls[0][1][0] == "hue"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert (
- mock_init.mock_calls[0][2]["data"][ssdp.ATTR_SSDP_LOCATION]
- == mock_good_ip_ssdp_response["location"]
- )
-
- mock_init.reset_mock()
- mock_ssdp_response = mock_link_local_ip_ssdp_response
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=400))
- await hass.async_block_till_done()
- assert mock_init.mock_calls[0][1][0] == "hue"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert (
- mock_init.mock_calls[0][2]["data"][ssdp.ATTR_SSDP_LOCATION]
- == mock_link_local_ip_ssdp_response["location"]
- )
-
- mock_init.reset_mock()
- mock_ssdp_response = mock_good_ip_ssdp_response
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=600))
- await hass.async_block_till_done()
- assert mock_init.mock_calls[0][1][0] == "hue"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert (
- mock_init.mock_calls[0][2]["data"][ssdp.ATTR_SSDP_LOCATION]
- == mock_good_ip_ssdp_response["location"]
- )
-
-
-async def test_location_change_with_overlapping_udn_st_combinations(
- hass, aioclient_mock
-):
- """Test handling when a UDN and ST broadcast multiple locations."""
- mock_get_ssdp = {
- "test_integration": [
- {"manufacturer": "test_manufacturer", "modelName": "test_model"}
- ]
- }
-
- hue_response = """
-
-
-test_manufacturer
-test_model
-
-
- """
-
- aioclient_mock.get(
- "http://192.168.72.1:49154/wps_device.xml",
- text=hue_response.format(ip_address="192.168.72.1"),
- )
- aioclient_mock.get(
- "http://192.168.72.1:49152/wps_device.xml",
- text=hue_response.format(ip_address="192.168.72.1"),
- )
- ssdp_response_without_location = {
- "ST": "upnp:rootdevice",
- "_udn": "uuid:a793d3cc-e802-44fb-84f4-5a30f33115b6",
- "USN": "uuid:a793d3cc-e802-44fb-84f4-5a30f33115b6::upnp:rootdevice",
- "EXT": "",
- }
-
- port_49154_response = CaseInsensitiveDict(
- **ssdp_response_without_location,
- **{"LOCATION": "http://192.168.72.1:49154/wps_device.xml"},
- )
- port_49152_response = CaseInsensitiveDict(
- **ssdp_response_without_location,
- **{"LOCATION": "http://192.168.72.1:49152/wps_device.xml"},
- )
- mock_ssdp_response = port_49154_response
-
- def _generate_fake_ssdp_listener(*args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
-
- async def _async_callback(*_):
- pass
-
- @callback
- def _callback(*_):
- hass.async_create_task(listener.async_callback(mock_ssdp_response))
-
- listener.async_start = _async_callback
- listener.async_search = _callback
- return listener
-
- with patch(
- "homeassistant.components.ssdp.async_get_ssdp",
- return_value=mock_get_ssdp,
- ), patch(
- "homeassistant.components.ssdp.SSDPListener",
- new=_generate_fake_ssdp_listener,
- ), patch.object(
- hass.config_entries.flow, "async_init"
- ) as mock_init:
- assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
- await hass.async_block_till_done()
- hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
- await hass.async_block_till_done()
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
- await hass.async_block_till_done()
- assert len(mock_init.mock_calls) == 1
- assert mock_init.mock_calls[0][1][0] == "test_integration"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert (
- mock_init.mock_calls[0][2]["data"][ssdp.ATTR_SSDP_LOCATION]
- == port_49154_response["location"]
- )
-
- mock_init.reset_mock()
- mock_ssdp_response = port_49152_response
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=400))
- await hass.async_block_till_done()
- assert mock_init.mock_calls[0][1][0] == "test_integration"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert (
- mock_init.mock_calls[0][2]["data"][ssdp.ATTR_SSDP_LOCATION]
- == port_49152_response["location"]
- )
-
- mock_init.reset_mock()
- mock_ssdp_response = port_49154_response
- async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=600))
- await hass.async_block_till_done()
- assert mock_init.mock_calls[0][1][0] == "test_integration"
- assert mock_init.mock_calls[0][2]["context"] == {
- "source": config_entries.SOURCE_SSDP
- }
- assert (
- mock_init.mock_calls[0][2]["data"][ssdp.ATTR_SSDP_LOCATION]
- == port_49154_response["location"]
- )
+ assert ssdp_listener.async_search.call_args[1] == {}
diff --git a/tests/components/upnp/__init__.py b/tests/components/upnp/__init__.py
index 4fcc4167e5b..54ceff6eb1d 100644
--- a/tests/components/upnp/__init__.py
+++ b/tests/components/upnp/__init__.py
@@ -1 +1 @@
-"""Tests for the IGD component."""
+"""Tests for the upnp component."""
diff --git a/tests/components/upnp/common.py b/tests/components/upnp/common.py
deleted file mode 100644
index 4dd0fd4083d..00000000000
--- a/tests/components/upnp/common.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""Common for upnp."""
-
-from urllib.parse import urlparse
-
-from homeassistant.components import ssdp
-
-TEST_UDN = "uuid:device"
-TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
-TEST_USN = f"{TEST_UDN}::{TEST_ST}"
-TEST_LOCATION = "http://192.168.1.1/desc.xml"
-TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
-TEST_FRIENDLY_NAME = "friendly name"
-TEST_DISCOVERY = {
- ssdp.ATTR_SSDP_LOCATION: TEST_LOCATION,
- ssdp.ATTR_SSDP_ST: TEST_ST,
- ssdp.ATTR_SSDP_USN: TEST_USN,
- ssdp.ATTR_UPNP_UDN: TEST_UDN,
- "usn": TEST_USN,
- "location": TEST_LOCATION,
- "_host": TEST_HOSTNAME,
- "_udn": TEST_UDN,
- "friendlyName": TEST_FRIENDLY_NAME,
-}
diff --git a/tests/components/upnp/conftest.py b/tests/components/upnp/conftest.py
new file mode 100644
index 00000000000..5af99e9ac2d
--- /dev/null
+++ b/tests/components/upnp/conftest.py
@@ -0,0 +1,187 @@
+"""Configuration for SSDP tests."""
+from typing import Any, Mapping
+from unittest.mock import AsyncMock, MagicMock, patch
+from urllib.parse import urlparse
+
+import pytest
+
+from homeassistant.components import ssdp
+from homeassistant.components.upnp.const import (
+ BYTES_RECEIVED,
+ BYTES_SENT,
+ PACKETS_RECEIVED,
+ PACKETS_SENT,
+ ROUTER_IP,
+ ROUTER_UPTIME,
+ TIMESTAMP,
+ WAN_STATUS,
+)
+from homeassistant.core import HomeAssistant
+from homeassistant.util import dt
+
+TEST_UDN = "uuid:device"
+TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
+TEST_USN = f"{TEST_UDN}::{TEST_ST}"
+TEST_LOCATION = "http://192.168.1.1/desc.xml"
+TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
+TEST_FRIENDLY_NAME = "friendly name"
+TEST_DISCOVERY = {
+ ssdp.ATTR_SSDP_LOCATION: TEST_LOCATION,
+ ssdp.ATTR_SSDP_ST: TEST_ST,
+ ssdp.ATTR_SSDP_USN: TEST_USN,
+ ssdp.ATTR_UPNP_UDN: TEST_UDN,
+ "usn": TEST_USN,
+ "location": TEST_LOCATION,
+ "_host": TEST_HOSTNAME,
+ "_udn": TEST_UDN,
+ "friendlyName": TEST_FRIENDLY_NAME,
+}
+
+
+class MockDevice:
+ """Mock device for Device."""
+
+ def __init__(self, hass: HomeAssistant, udn: str) -> None:
+ """Initialize mock device."""
+ self.hass = hass
+ self._udn = udn
+ self.traffic_times_polled = 0
+ self.status_times_polled = 0
+
+ @classmethod
+ async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
+ """Return self."""
+ return cls(hass, TEST_UDN)
+
+ async def async_ssdp_callback(
+ self, headers: Mapping[str, Any], change: ssdp.SsdpChange
+ ) -> None:
+ """SSDP callback, update if needed."""
+ pass
+
+ @property
+ def udn(self) -> str:
+ """Get the UDN."""
+ return self._udn
+
+ @property
+ def manufacturer(self) -> str:
+ """Get manufacturer."""
+ return "mock-manufacturer"
+
+ @property
+ def name(self) -> str:
+ """Get name."""
+ return "mock-name"
+
+ @property
+ def model_name(self) -> str:
+ """Get the model name."""
+ return "mock-model-name"
+
+ @property
+ def device_type(self) -> str:
+ """Get the device type."""
+ return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
+
+ @property
+ def usn(self) -> str:
+ """Get the USN."""
+ return f"{self.udn}::{self.device_type}"
+
+ @property
+ def unique_id(self) -> str:
+ """Get the unique id."""
+ return self.usn
+
+ @property
+ def hostname(self) -> str:
+ """Get the hostname."""
+ return "mock-hostname"
+
+ async def async_get_traffic_data(self) -> Mapping[str, Any]:
+ """Get traffic data."""
+ self.traffic_times_polled += 1
+ return {
+ TIMESTAMP: dt.utcnow(),
+ BYTES_RECEIVED: 0,
+ BYTES_SENT: 0,
+ PACKETS_RECEIVED: 0,
+ PACKETS_SENT: 0,
+ }
+
+ async def async_get_status(self) -> Mapping[str, Any]:
+ """Get connection status, uptime, and external IP."""
+ self.status_times_polled += 1
+ return {
+ WAN_STATUS: "Connected",
+ ROUTER_UPTIME: 0,
+ ROUTER_IP: "192.168.0.1",
+ }
+
+
+@pytest.fixture(autouse=True)
+def mock_upnp_device():
+ """Mock homeassistant.components.upnp.Device."""
+ with patch(
+ "homeassistant.components.upnp.Device", new=MockDevice
+ ) as mock_async_create_device:
+ yield mock_async_create_device
+
+
+@pytest.fixture
+def mock_setup_entry():
+ """Mock async_setup_entry."""
+ with patch(
+ "homeassistant.components.upnp.async_setup_entry",
+ return_value=AsyncMock(True),
+ ) as mock_setup:
+ yield mock_setup
+
+
+@pytest.fixture(autouse=True)
+async def silent_ssdp_scanner(hass):
+ """Start SSDP component and get Scanner, prevent actual SSDP traffic."""
+ with patch(
+ "homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners"
+ ), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch(
+ "homeassistant.components.ssdp.Scanner.async_scan"
+ ):
+ yield
+
+
+@pytest.fixture
+async def ssdp_instant_discovery():
+ """Instance discovery."""
+ # Set up device discovery callback.
+ async def register_callback(hass, callback, match_dict):
+ """Immediately do callback."""
+ await callback(TEST_DISCOVERY, ssdp.SsdpChange.ALIVE)
+ return MagicMock()
+
+ with patch(
+ "homeassistant.components.ssdp.async_register_callback",
+ side_effect=register_callback,
+ ) as mock_register, patch(
+ "homeassistant.components.ssdp.async_get_discovery_info_by_st",
+ return_value=[TEST_DISCOVERY],
+ ) as mock_get_info:
+ yield (mock_register, mock_get_info)
+
+
+@pytest.fixture
+async def ssdp_no_discovery():
+ """No discovery."""
+ # Set up device discovery callback.
+ async def register_callback(hass, callback, match_dict):
+ """Don't do callback."""
+ return MagicMock()
+
+ with patch(
+ "homeassistant.components.ssdp.async_register_callback",
+ side_effect=register_callback,
+ ) as mock_register, patch(
+ "homeassistant.components.ssdp.async_get_discovery_info_by_st",
+ return_value=[],
+ ) as mock_get_info:
+ yield (mock_register, mock_get_info)
diff --git a/tests/components/upnp/mock_ssdp_scanner.py b/tests/components/upnp/mock_ssdp_scanner.py
deleted file mode 100644
index 39f9a801bb6..00000000000
--- a/tests/components/upnp/mock_ssdp_scanner.py
+++ /dev/null
@@ -1,49 +0,0 @@
-"""Mock ssdp.Scanner."""
-from __future__ import annotations
-
-from typing import Any
-from unittest.mock import patch
-
-import pytest
-
-from homeassistant.components import ssdp
-from homeassistant.core import callback
-
-
-class MockSsdpDescriptionManager(ssdp.DescriptionManager):
- """Mocked ssdp DescriptionManager."""
-
- async def fetch_description(
- self, xml_location: str | None
- ) -> None | dict[str, str]:
- """Fetch the location or get it from the cache."""
- if xml_location is None:
- return None
- return {}
-
-
-class MockSsdpScanner(ssdp.Scanner):
- """Mocked ssdp Scanner."""
-
- @callback
- def async_stop(self, *_: Any) -> None:
- """Stop the scanner."""
- # Do nothing.
-
- async def async_start(self) -> None:
- """Start the scanner."""
- self.description_manager = MockSsdpDescriptionManager(self.hass)
-
- @callback
- def async_scan(self, *_: Any) -> None:
- """Scan for new entries."""
- # Do nothing.
-
-
-@pytest.fixture
-def mock_ssdp_scanner():
- """Mock ssdp Scanner."""
- with patch(
- "homeassistant.components.ssdp.Scanner", new=MockSsdpScanner
- ) as mock_ssdp_scanner:
- yield mock_ssdp_scanner
diff --git a/tests/components/upnp/mock_upnp_device.py b/tests/components/upnp/mock_upnp_device.py
deleted file mode 100644
index 230fd480cb1..00000000000
--- a/tests/components/upnp/mock_upnp_device.py
+++ /dev/null
@@ -1,104 +0,0 @@
-"""Mock device for testing purposes."""
-
-from typing import Any, Mapping
-from unittest.mock import AsyncMock, patch
-
-import pytest
-
-from homeassistant.components.upnp.const import (
- BYTES_RECEIVED,
- BYTES_SENT,
- PACKETS_RECEIVED,
- PACKETS_SENT,
- ROUTER_IP,
- ROUTER_UPTIME,
- TIMESTAMP,
- WAN_STATUS,
-)
-from homeassistant.components.upnp.device import Device
-from homeassistant.util import dt
-
-from .common import TEST_UDN
-
-
-class MockDevice(Device):
- """Mock device for Device."""
-
- def __init__(self, udn: str) -> None:
- """Initialize mock device."""
- igd_device = object()
- mock_device_updater = AsyncMock()
- super().__init__(igd_device, mock_device_updater)
- self._udn = udn
- self.traffic_times_polled = 0
- self.status_times_polled = 0
-
- @classmethod
- async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
- """Return self."""
- return cls(TEST_UDN)
-
- @property
- def udn(self) -> str:
- """Get the UDN."""
- return self._udn
-
- @property
- def manufacturer(self) -> str:
- """Get manufacturer."""
- return "mock-manufacturer"
-
- @property
- def name(self) -> str:
- """Get name."""
- return "mock-name"
-
- @property
- def model_name(self) -> str:
- """Get the model name."""
- return "mock-model-name"
-
- @property
- def device_type(self) -> str:
- """Get the device type."""
- return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
-
- @property
- def hostname(self) -> str:
- """Get the hostname."""
- return "mock-hostname"
-
- async def async_get_traffic_data(self) -> Mapping[str, Any]:
- """Get traffic data."""
- self.traffic_times_polled += 1
- return {
- TIMESTAMP: dt.utcnow(),
- BYTES_RECEIVED: 0,
- BYTES_SENT: 0,
- PACKETS_RECEIVED: 0,
- PACKETS_SENT: 0,
- }
-
- async def async_get_status(self) -> Mapping[str, Any]:
- """Get connection status, uptime, and external IP."""
- self.status_times_polled += 1
- return {
- WAN_STATUS: "Connected",
- ROUTER_UPTIME: 0,
- ROUTER_IP: "192.168.0.1",
- }
-
- async def async_start(self) -> None:
- """Start the device updater."""
-
- async def async_stop(self) -> None:
- """Stop the device updater."""
-
-
-@pytest.fixture
-def mock_upnp_device():
- """Mock upnp Device.async_create_device."""
- with patch(
- "homeassistant.components.upnp.Device", new=MockDevice
- ) as mock_async_create_device:
- yield mock_async_create_device
diff --git a/tests/components/upnp/test_config_flow.py b/tests/components/upnp/test_config_flow.py
index a83b9ac41dd..fa315804917 100644
--- a/tests/components/upnp/test_config_flow.py
+++ b/tests/components/upnp/test_config_flow.py
@@ -1,7 +1,6 @@
"""Test UPnP/IGD config flow."""
from datetime import timedelta
-from unittest.mock import patch
import pytest
@@ -15,11 +14,10 @@ from homeassistant.components.upnp.const import (
DEFAULT_SCAN_INTERVAL,
DOMAIN,
)
-from homeassistant.core import CoreState, HomeAssistant
-from homeassistant.setup import async_setup_component
+from homeassistant.core import HomeAssistant
from homeassistant.util import dt
-from .common import (
+from .conftest import (
TEST_DISCOVERY,
TEST_FRIENDLY_NAME,
TEST_HOSTNAME,
@@ -28,25 +26,15 @@ from .common import (
TEST_UDN,
TEST_USN,
)
-from .mock_ssdp_scanner import mock_ssdp_scanner # noqa: F401
-from .mock_upnp_device import mock_upnp_device # noqa: F401
from tests.common import MockConfigEntry, async_fire_time_changed
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
-async def test_flow_ssdp_discovery(
- hass: HomeAssistant,
-):
+@pytest.mark.usefixtures(
+ "ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
+)
+async def test_flow_ssdp(hass: HomeAssistant):
"""Test config flow: discovered + configured through ssdp."""
- # Ensure we have a ssdp Scanner.
- await async_setup_component(hass, DOMAIN, {})
- await hass.async_block_till_done()
- ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
- ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
- # Speed up callback in ssdp.async_register_callback.
- hass.state = CoreState.not_running
-
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
@@ -70,7 +58,7 @@ async def test_flow_ssdp_discovery(
}
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
+@pytest.mark.usefixtures("mock_get_source_ip")
async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
"""Test config flow: incomplete discovery through ssdp."""
# Discovered via step ssdp.
@@ -88,7 +76,7 @@ async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
assert result["reason"] == "incomplete_discovery"
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
+@pytest.mark.usefixtures("mock_get_source_ip")
async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
"""Test config flow: discovery through ssdp, but ignored, as hostname is used by existing config entry."""
# Existing entry.
@@ -113,17 +101,11 @@ async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
assert result["reason"] == "discovery_ignored"
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
+@pytest.mark.usefixtures(
+ "ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
+)
async def test_flow_user(hass: HomeAssistant):
"""Test config flow: discovered + configured through user."""
- # Ensure we have a ssdp Scanner.
- await async_setup_component(hass, DOMAIN, {})
- await hass.async_block_till_done()
- ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
- ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
- # Speed up callback in ssdp.async_register_callback.
- hass.state = CoreState.not_running
-
# Discovered via step user.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
@@ -145,17 +127,11 @@ async def test_flow_user(hass: HomeAssistant):
}
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
+@pytest.mark.usefixtures(
+ "ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
+)
async def test_flow_import(hass: HomeAssistant):
"""Test config flow: configured through configuration.yaml."""
- # Ensure we have a ssdp Scanner.
- await async_setup_component(hass, DOMAIN, {})
- await hass.async_block_till_done()
- ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
- ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
- # Speed up callback in ssdp.async_register_callback.
- hass.state = CoreState.not_running
-
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
@@ -169,7 +145,7 @@ async def test_flow_import(hass: HomeAssistant):
}
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
+@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_flow_import_already_configured(hass: HomeAssistant):
"""Test config flow: configured through configuration.yaml, but existing config entry."""
# Existing entry.
@@ -193,37 +169,20 @@ async def test_flow_import_already_configured(hass: HomeAssistant):
assert result["reason"] == "already_configured"
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
+@pytest.mark.usefixtures("ssdp_no_discovery", "mock_get_source_ip")
async def test_flow_import_no_devices_found(hass: HomeAssistant):
"""Test config flow: no devices found, configured through configuration.yaml."""
- # Ensure we have a ssdp Scanner.
- await async_setup_component(hass, DOMAIN, {})
- await hass.async_block_till_done()
- ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
- ssdp_scanner.cache.clear()
-
# Discovered via step import.
- with patch(
- "homeassistant.components.upnp.config_flow.SSDP_SEARCH_TIMEOUT", new=0.0
- ):
- result = await hass.config_entries.flow.async_init(
- DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
- )
- assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
- assert result["reason"] == "no_devices_found"
+ result = await hass.config_entries.flow.async_init(
+ DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
+ )
+ assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
+ assert result["reason"] == "no_devices_found"
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
+@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_options_flow(hass: HomeAssistant):
"""Test options flow."""
- # Ensure we have a ssdp Scanner.
- await async_setup_component(hass, DOMAIN, {})
- await hass.async_block_till_done()
- ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
- ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
- # Speed up callback in ssdp.async_register_callback.
- hass.state = CoreState.not_running
-
# Set up config entry.
config_entry = MockConfigEntry(
domain=DOMAIN,
diff --git a/tests/components/upnp/test_init.py b/tests/components/upnp/test_init.py
index 6f0aa438310..6b3d2a5187f 100644
--- a/tests/components/upnp/test_init.py
+++ b/tests/components/upnp/test_init.py
@@ -3,25 +3,23 @@ from __future__ import annotations
import pytest
-from homeassistant.components import ssdp
from homeassistant.components.upnp.const import (
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DOMAIN,
)
-from homeassistant.core import CoreState, HomeAssistant
+from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
-from .common import TEST_DISCOVERY, TEST_ST, TEST_UDN
-from .mock_ssdp_scanner import mock_ssdp_scanner # noqa: F401
-from .mock_upnp_device import mock_upnp_device # noqa: F401
+from .conftest import TEST_ST, TEST_UDN
from tests.common import MockConfigEntry
-@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
+@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_async_setup_entry_default(hass: HomeAssistant):
"""Test async_setup_entry."""
+
entry = MockConfigEntry(
domain=DOMAIN,
data={
@@ -34,12 +32,6 @@ async def test_async_setup_entry_default(hass: HomeAssistant):
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
- # Device is discovered.
- ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
- ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
- # Speed up callback in ssdp.async_register_callback.
- hass.state = CoreState.not_running
-
# Load config_entry.
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id) is True
diff --git a/tests/components/yamaha_musiccast/test_config_flow.py b/tests/components/yamaha_musiccast/test_config_flow.py
index 0e52d598bf6..31d63bac158 100644
--- a/tests/components/yamaha_musiccast/test_config_flow.py
+++ b/tests/components/yamaha_musiccast/test_config_flow.py
@@ -14,6 +14,17 @@ from homeassistant.const import CONF_HOST
from tests.common import MockConfigEntry
+@pytest.fixture(autouse=True)
+async def silent_ssdp_scanner(hass):
+ """Start SSDP component and get Scanner, prevent actual SSDP traffic."""
+ with patch(
+ "homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners"
+ ), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch(
+ "homeassistant.components.ssdp.Scanner.async_scan"
+ ):
+ yield
+
+
@pytest.fixture(autouse=True)
def mock_setup_entry():
"""Mock setting up a config entry."""
diff --git a/tests/components/yeelight/__init__.py b/tests/components/yeelight/__init__.py
index 4a862fa13dd..eb7ac01e3b1 100644
--- a/tests/components/yeelight/__init__.py
+++ b/tests/components/yeelight/__init__.py
@@ -4,7 +4,7 @@ from datetime import timedelta
from ipaddress import IPv4Address
from unittest.mock import AsyncMock, MagicMock, patch
-from async_upnp_client.search import SSDPListener
+from async_upnp_client.search import SsdpSearchListener
from yeelight import BulbException, BulbType
from yeelight.main import _MODEL_SPECS
@@ -145,7 +145,7 @@ def _mocked_bulb(cannot_connect=False):
def _patched_ssdp_listener(info, *args, **kwargs):
- listener = SSDPListener(*args, **kwargs)
+ listener = SsdpSearchListener(*args, **kwargs)
async def _async_callback(*_):
if kwargs["source_ip"] == IPv4Address(FAIL_TO_BIND_IP):
@@ -173,7 +173,7 @@ def _patch_discovery(no_device=False, capabilities=None):
)
return patch(
- "homeassistant.components.yeelight.SSDPListener",
+ "homeassistant.components.yeelight.SsdpSearchListener",
new=_generate_fake_ssdp_listener,
)