Use dataclass for SsdpServiceInfo (#59931)

Co-authored-by: epenet <epenet@users.noreply.github.com>
This commit is contained in:
epenet 2021-11-29 17:10:07 +01:00 committed by GitHub
parent 7ece86ee8d
commit ec1c52d945
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 295 additions and 175 deletions

View File

@ -6,7 +6,7 @@ from urllib.parse import urlsplit
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import dhcp, zeroconf
from homeassistant.components import dhcp, ssdp, zeroconf
from homeassistant.config_entries import SOURCE_IGNORE
from homeassistant.const import (
CONF_HOST,
@ -163,7 +163,7 @@ class AxisFlowHandler(config_entries.ConfigFlow, domain=AXIS_DOMAIN):
}
)
async def async_step_ssdp(self, discovery_info: dict):
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo):
"""Prepare configuration for a SSDP discovered Axis device."""
url = urlsplit(discovery_info["presentationURL"])
return await self._process_discovered_device(

View File

@ -8,13 +8,13 @@ from urllib.parse import urlparse
from directv import DIRECTV, DIRECTVError
import voluptuous as vol
from homeassistant.components import ssdp
from homeassistant.components.ssdp import ATTR_SSDP_LOCATION, ATTR_UPNP_SERIAL
from homeassistant.config_entries import ConfigFlow
from homeassistant.const import CONF_HOST, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import CONF_RECEIVER_ID, DOMAIN
@ -67,7 +67,7 @@ class DirecTVConfigFlow(ConfigFlow, domain=DOMAIN):
return self.async_create_entry(title=user_input[CONF_HOST], data=user_input)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle SSDP discovery."""
host = urlparse(discovery_info[ATTR_SSDP_LOCATION]).hostname
receiver_id = None

View File

@ -25,7 +25,6 @@ from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import IntegrationError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import (
CONF_CALLBACK_URL_OVERRIDE,
@ -57,7 +56,7 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self) -> None:
"""Initialize flow."""
self._discoveries: dict[str, Mapping[str, Any]] = {}
self._discoveries: dict[str, ssdp.SsdpServiceInfo] = {}
self._location: str | None = None
self._udn: str | None = None
self._device_type: str | None = None
@ -205,7 +204,7 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._set_confirm_only()
return self.async_show_form(step_id="import_turn_on", errors=errors)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initialized by SSDP discovery."""
LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info))
@ -330,7 +329,7 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_create_entry(title=title, data=data, options=self._options)
async def _async_set_info_from_discovery(
self, discovery_info: Mapping[str, Any], abort_if_configured: bool = True
self, discovery_info: ssdp.SsdpServiceInfo, abort_if_configured: bool = True
) -> None:
"""Set information required for a config entry from the SSDP discovery."""
LOGGER.debug(
@ -361,12 +360,12 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
or DEFAULT_NAME
)
async def _async_get_discoveries(self) -> list[Mapping[str, Any]]:
async def _async_get_discoveries(self) -> list[ssdp.SsdpServiceInfo]:
"""Get list of unconfigured DLNA devices discovered by SSDP."""
LOGGER.debug("_get_discoveries")
# Get all compatible devices from ssdp's cache
discoveries: list[Mapping[str, Any]] = []
discoveries: list[ssdp.SsdpServiceInfo] = []
for udn_st in DmrDevice.DEVICE_TYPES:
st_discoveries = await ssdp.async_get_discovery_info_by_st(
self.hass, udn_st
@ -454,7 +453,7 @@ class DlnaDmrOptionsFlowHandler(config_entries.OptionsFlow):
)
def _is_ignored_device(discovery_info: Mapping[str, Any]) -> bool:
def _is_ignored_device(discovery_info: ssdp.SsdpServiceInfo) -> bool:
"""Return True if this device should be ignored for discovery.
These devices are supported better by other integrations, so don't bug

View File

@ -2,7 +2,7 @@
from __future__ import annotations
import asyncio
from collections.abc import Mapping, Sequence
from collections.abc import Sequence
import contextlib
from datetime import datetime, timedelta
import functools
@ -241,7 +241,7 @@ class DlnaDmrEntity(MediaPlayerEntity):
await self._device_disconnect()
async def async_ssdp_callback(
self, info: Mapping[str, Any], change: ssdp.SsdpChange
self, info: ssdp.SsdpServiceInfo, change: ssdp.SsdpChange
) -> None:
"""Handle notification from SSDP of device state change."""
_LOGGER.debug(

View File

@ -9,6 +9,7 @@ from urllib.parse import ParseResult, urlparse
from fritzconnection.core.exceptions import FritzConnectionException, FritzSecurityError
import voluptuous as vol
from homeassistant.components import ssdp
from homeassistant.components.device_tracker.const import (
CONF_CONSIDER_HOME,
DEFAULT_CONSIDER_HOME,
@ -22,7 +23,6 @@ from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNAME
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.typing import DiscoveryInfoType
from .common import FritzBoxTools
from .const import (
@ -115,7 +115,7 @@ class FritzBoxToolsFlowHandler(ConfigFlow, domain=DOMAIN):
},
)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initialized by discovery."""
ssdp_location: ParseResult = urlparse(discovery_info[ATTR_SSDP_LOCATION])
self._host = ssdp_location.hostname

View File

@ -8,6 +8,7 @@ from pyfritzhome import Fritzhome, LoginError
from requests.exceptions import HTTPError
import voluptuous as vol
from homeassistant.components import ssdp
from homeassistant.components.ssdp import (
ATTR_SSDP_LOCATION,
ATTR_UPNP_FRIENDLY_NAME,
@ -16,7 +17,6 @@ from homeassistant.components.ssdp import (
from homeassistant.config_entries import ConfigEntry, ConfigFlow
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import DEFAULT_HOST, DEFAULT_USERNAME, DOMAIN
@ -119,7 +119,7 @@ class FritzboxConfigFlow(ConfigFlow, domain=DOMAIN):
step_id="user", data_schema=DATA_SCHEMA_USER, errors=errors
)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initialized by discovery."""
host = urlparse(discovery_info[ATTR_SSDP_LOCATION]).hostname
assert isinstance(host, str)

View File

@ -31,7 +31,6 @@ from homeassistant.const import (
)
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import (
CONF_TRACK_WIRED_CLIENTS,
@ -202,7 +201,7 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_create_entry(title=title, data=user_input)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle SSDP initiated config flow."""
await self.async_set_unique_id(discovery_info[ssdp.ATTR_UPNP_UDN])
self._abort_if_unique_id_configured()

View File

@ -18,7 +18,7 @@ from homeassistant.const import CONF_API_KEY, CONF_HOST
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_ALLOW_HUE_GROUPS,
@ -186,7 +186,7 @@ class HueFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
},
)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a discovered Hue bridge.
This flow is triggered by the SSDP component. It will check if the
@ -213,7 +213,7 @@ class HueFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="not_hue_bridge")
host = urlparse(discovery_info[ssdp.ATTR_SSDP_LOCATION]).hostname
bridge = await self._get_bridge(host, discovery_info[ssdp.ATTR_UPNP_SERIAL]) # type: ignore[arg-type]
bridge = await self._get_bridge(host, discovery_info[ssdp.ATTR_UPNP_SERIAL])
await self.async_set_unique_id(bridge.id)
self._abort_if_unique_id_configured(

View File

@ -10,6 +10,7 @@ from urllib.parse import urlparse
from hyperion import client, const
import voluptuous as vol
from homeassistant.components import ssdp
from homeassistant.components.ssdp import ATTR_SSDP_LOCATION, ATTR_UPNP_SERIAL
from homeassistant.config_entries import (
SOURCE_REAUTH,
@ -151,7 +152,7 @@ class HyperionConfigFlow(ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="cannot_connect")
return await self._advance_to_auth_step_if_necessary(hyperion_client)
async def async_step_ssdp(self, discovery_info: dict[str, Any]) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initiated by SSDP."""
# Sample data provided by SSDP: {
# 'ssdp_location': 'http://192.168.0.1:8090/description.xml',

View File

@ -9,11 +9,10 @@ from aionanoleaf import InvalidToken, Nanoleaf, Unauthorized, Unavailable
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import zeroconf
from homeassistant.components import ssdp, zeroconf
from homeassistant.const import CONF_HOST, CONF_TOKEN
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import DiscoveryInfoType
from homeassistant.util.json import load_json, save_json
from .const import DOMAIN
@ -114,7 +113,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
discovery_info[zeroconf.ATTR_PROPERTIES][zeroconf.ATTR_PROPERTIES_ID],
)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle Nanoleaf SSDP discovery."""
_LOGGER.debug("SSDP discovered: %s", discovery_info)
return await self._async_discovery_handler(

View File

@ -7,7 +7,7 @@ from urllib.parse import urlparse
from rokuecp import Roku, RokuError
import voluptuous as vol
from homeassistant.components import zeroconf
from homeassistant.components import ssdp, zeroconf
from homeassistant.components.ssdp import (
ATTR_SSDP_LOCATION,
ATTR_UPNP_FRIENDLY_NAME,
@ -18,7 +18,6 @@ from homeassistant.const import CONF_HOST, CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import DOMAIN
@ -115,7 +114,7 @@ class RokuConfigFlow(ConfigFlow, domain=DOMAIN):
return await self.async_step_discovery_confirm()
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initialized by discovery."""
host = urlparse(discovery_info[ATTR_SSDP_LOCATION]).hostname
name = discovery_info[ATTR_UPNP_FRIENDLY_NAME]

View File

@ -11,7 +11,7 @@ import getmac
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import dhcp, zeroconf
from homeassistant.components import dhcp, ssdp, zeroconf
from homeassistant.components.ssdp import (
ATTR_SSDP_LOCATION,
ATTR_UPNP_MANUFACTURER,
@ -28,7 +28,6 @@ from homeassistant.const import (
)
from homeassistant.core import callback
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.typing import DiscoveryInfoType
from .bridge import (
SamsungTVBridge,
@ -266,7 +265,7 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
raise data_entry_flow.AbortFlow(RESULT_NOT_SUPPORTED)
async def async_step_ssdp(
self, discovery_info: DiscoveryInfoType
self, discovery_info: ssdp.SsdpServiceInfo
) -> data_entry_flow.FlowResult:
"""Handle a flow initialized by ssdp discovery."""
LOGGER.debug("Samsung device found via SSDP: %s", discovery_info)

View File

@ -2,13 +2,13 @@
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Iterator
from collections.abc import Awaitable, Mapping
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from ipaddress import IPv4Address, IPv6Address
import logging
from typing import Any, Callable, Mapping
from typing import Any, Callable
from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.const import DeviceOrServiceType, SsdpHeaders, SsdpSource
@ -63,28 +63,6 @@ ATTR_HA_MATCHING_DOMAINS = "x_homeassistant_matching_domains"
PRIMARY_MATCH_KEYS = [ATTR_UPNP_MANUFACTURER, "st", ATTR_UPNP_DEVICE_TYPE, "nt"]
DISCOVERY_MAPPING = {
"usn": ATTR_SSDP_USN,
"ext": ATTR_SSDP_EXT,
"server": ATTR_SSDP_SERVER,
"st": ATTR_SSDP_ST,
"location": ATTR_SSDP_LOCATION,
"_udn": ATTR_SSDP_UDN,
"nt": ATTR_SSDP_NT,
}
SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
SsdpCallback = Callable[[Mapping[str, Any], SsdpChange], Awaitable]
SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE,
SsdpSource.SEARCH_CHANGED: SsdpChange.ALIVE,
SsdpSource.ADVERTISEMENT_ALIVE: SsdpChange.ALIVE,
SsdpSource.ADVERTISEMENT_BYEBYE: SsdpChange.BYEBYE,
SsdpSource.ADVERTISEMENT_UPDATE: SsdpChange.UPDATE,
}
_LOGGER = logging.getLogger(__name__)
@ -106,13 +84,14 @@ class _SsdpServiceDescription:
ssdp_udn: str | None = None
ssdp_ext: str | None = None
ssdp_server: str | None = None
ssdp_headers: Mapping[str, Any] = field(default_factory=dict)
@dataclass
class _UpnpServiceDescription:
"""UPnP info."""
upnp: dict[str, Any]
upnp: Mapping[str, Any]
@dataclass
@ -136,7 +115,7 @@ class SsdpServiceInfo(
if not self._warning_logged:
report(
f"accessed discovery_info['{name}'] instead of discovery_info.{name}; this will fail in version 2022.6",
exclude_integrations={"ssdp"},
exclude_integrations={DOMAIN},
error_if_core=False,
level=logging.DEBUG,
)
@ -144,41 +123,58 @@ class SsdpServiceInfo(
# Use a property if it is available, fallback to upnp data
if hasattr(self, name):
return getattr(self, name)
if name in self.ssdp_headers and name not in self.upnp:
return self.ssdp_headers.get(name)
return self.upnp[name]
def get(self, name: str, default: Any = None) -> Any:
"""
Allow property access by name for compatibility reason.
Enable method for compatibility reason.
Deprecated, and will be removed in version 2022.6.
"""
if not self._warning_logged:
report(
f"accessed discovery_info.get('{name}') instead of discovery_info.{name}; this will fail in version 2022.6",
exclude_integrations={"ssdp"},
exclude_integrations={DOMAIN},
error_if_core=False,
level=logging.DEBUG,
)
self._warning_logged = True
if hasattr(self, name):
return getattr(self, name)
return self.upnp.get(name, default)
return self.upnp.get(name, self.ssdp_headers.get(name, default))
def __iter__(self) -> Iterator[str]:
def __contains__(self, name: str) -> bool:
"""
Implement iter(self) on upnp data.
Enable method for compatibility reason.
Deprecated, and will be removed in version 2022.6.
"""
if not self._warning_logged:
report(
"accessed discovery_info.__iter__() instead of discovery_info.upnp.__iter__(); this will fail in version 2022.6",
exclude_integrations={"ssdp"},
"accessed discovery_info.__contains__() instead of discovery_info.upnp.__contains__() "
"or discovery_info.ssdp_headers.__contains__(); this will fail in version 2022.6",
exclude_integrations={DOMAIN},
error_if_core=False,
level=logging.DEBUG,
)
self._warning_logged = True
return self.upnp.__iter__()
if hasattr(self, name):
return getattr(self, name) is not None
return name in self.upnp or name in self.ssdp_headers
SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
SsdpCallback = Callable[[SsdpServiceInfo, SsdpChange], Awaitable]
SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE,
SsdpSource.SEARCH_CHANGED: SsdpChange.ALIVE,
SsdpSource.ADVERTISEMENT_ALIVE: SsdpChange.ALIVE,
SsdpSource.ADVERTISEMENT_BYEBYE: SsdpChange.BYEBYE,
SsdpSource.ADVERTISEMENT_UPDATE: SsdpChange.UPDATE,
}
@bind_hass
@ -198,7 +194,7 @@ async def async_register_callback(
@bind_hass
async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
hass: HomeAssistant, udn: str, st: str
) -> dict[str, str] | None:
) -> SsdpServiceInfo | None:
"""Fetch the discovery info cache."""
scanner: Scanner = hass.data[DOMAIN]
return await scanner.async_get_discovery_info_by_udn_st(udn, st)
@ -207,7 +203,7 @@ async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
@bind_hass
async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
hass: HomeAssistant, st: str
) -> list[dict[str, str]]:
) -> list[SsdpServiceInfo]:
"""Fetch all the entries matching the st."""
scanner: Scanner = hass.data[DOMAIN]
return await scanner.async_get_discovery_info_by_st(st)
@ -216,7 +212,7 @@ async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
@bind_hass
async def async_get_discovery_info_by_udn(
hass: HomeAssistant, udn: str
) -> list[dict[str, str]]:
) -> list[SsdpServiceInfo]:
"""Fetch all the entries matching the udn."""
scanner: Scanner = hass.data[DOMAIN]
return await scanner.async_get_discovery_info_by_udn(udn)
@ -237,7 +233,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def _async_process_callbacks(
callbacks: list[SsdpCallback],
discovery_info: dict[str, str],
discovery_info: SsdpServiceInfo,
ssdp_change: SsdpChange,
) -> None:
for callback in callbacks:
@ -496,8 +492,10 @@ class Scanner:
if not callbacks and not matching_domains:
return
discovery_info = discovery_info_from_headers_and_description(info_with_desc)
discovery_info[ATTR_HA_MATCHING_DOMAINS] = matching_domains
discovery_info = discovery_info_from_headers_and_description(
combined_headers, info_desc
)
discovery_info.x_homeassistant_matching_domains = matching_domains
ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
await _async_process_callbacks(callbacks, discovery_info, ssdp_change)
@ -505,6 +503,8 @@ class Scanner:
if ssdp_change == SsdpChange.BYEBYE:
return
_LOGGER.debug("Discovery info: %s", discovery_info)
for domain in matching_domains:
_LOGGER.debug("Discovered %s at %s", domain, location)
discovery_flow.async_create_flow(
@ -523,7 +523,7 @@ class Scanner:
async def _async_headers_to_discovery_info(
self, headers: Mapping[str, Any]
) -> dict[str, Any]:
) -> SsdpServiceInfo:
"""Combine the headers and description into discovery_info.
Building this is a bit expensive so we only do it on demand.
@ -533,13 +533,11 @@ class Scanner:
info_desc = (
await self._description_cache.async_get_description_dict(location) or {}
)
return discovery_info_from_headers_and_description(
CaseInsensitiveDict(headers, **info_desc)
)
return discovery_info_from_headers_and_description(headers, info_desc)
async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
self, udn: str, st: str
) -> dict[str, Any] | None:
) -> SsdpServiceInfo | None:
"""Return discovery_info for a udn and st."""
if headers := self._all_headers_from_ssdp_devices.get((udn, st)):
return await self._async_headers_to_discovery_info(headers)
@ -547,7 +545,7 @@ class Scanner:
async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
self, st: str
) -> list[dict[str, Any]]:
) -> list[SsdpServiceInfo]:
"""Return matching discovery_infos for a st."""
return [
await self._async_headers_to_discovery_info(headers)
@ -555,7 +553,7 @@ class Scanner:
if udn_st[1] == st
]
async def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, Any]]:
async def async_get_discovery_info_by_udn(self, udn: str) -> list[SsdpServiceInfo]:
"""Return matching discovery_infos for a udn."""
return [
await self._async_headers_to_discovery_info(headers)
@ -565,23 +563,36 @@ class Scanner:
def discovery_info_from_headers_and_description(
info_with_desc: CaseInsensitiveDict,
) -> dict[str, Any]:
combined_headers: Mapping[str, Any],
info_desc: Mapping[str, Any],
) -> SsdpServiceInfo:
"""Convert headers and description to discovery_info."""
info = {
DISCOVERY_MAPPING.get(k.lower(), k): v
for k, v in info_with_desc.as_dict().items()
}
ssdp_usn = combined_headers["usn"]
ssdp_st = combined_headers.get("st")
upnp_info = {**info_desc}
if ATTR_UPNP_UDN not in info and ATTR_SSDP_USN in info:
if udn := _udn_from_usn(info[ATTR_SSDP_USN]):
info[ATTR_UPNP_UDN] = udn
# Increase compatibility: depending on the message type,
# either the ST (Search Target, from M-SEARCH messages)
# or NT (Notification Type, from NOTIFY messages) header is mandatory
if not ssdp_st:
ssdp_st = combined_headers["nt"]
# Increase compatibility.
if ATTR_SSDP_ST not in info and ATTR_SSDP_NT in info:
info[ATTR_SSDP_ST] = info[ATTR_SSDP_NT]
# Ensure UPnP "udn" is set
if ATTR_UPNP_UDN not in upnp_info:
if udn := _udn_from_usn(ssdp_usn):
upnp_info[ATTR_UPNP_UDN] = udn
return info
return SsdpServiceInfo(
ssdp_usn=ssdp_usn,
ssdp_st=ssdp_st,
ssdp_ext=combined_headers.get("ext"),
ssdp_server=combined_headers.get("server"),
ssdp_location=combined_headers.get("location"),
ssdp_udn=combined_headers.get("_udn"),
ssdp_nt=combined_headers.get("nt"),
ssdp_headers=combined_headers,
upnp=upnp_info,
)
def _udn_from_usn(usn: str | None) -> str | None:

View File

@ -237,7 +237,7 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
return self._show_form(step)
return await self.async_validate_input_create_entry(user_input, step_id=step)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a discovered synology_dsm."""
parsed_url = urlparse(discovery_info[ssdp.ATTR_SSDP_LOCATION])
friendly_name = (

View File

@ -35,6 +35,7 @@ import homeassistant.util.uuid as uuid_util
if TYPE_CHECKING:
from homeassistant.components.dhcp import DhcpServiceInfo
from homeassistant.components.mqtt.discovery import MqttServiceInfo
from homeassistant.components.ssdp import SsdpServiceInfo
from homeassistant.components.usb import UsbServiceInfo
from homeassistant.components.zeroconf import ZeroconfServiceInfo
@ -1370,10 +1371,10 @@ class ConfigFlow(data_entry_flow.FlowHandler):
return await self.async_step_discovery(dataclasses.asdict(discovery_info))
async def async_step_ssdp(
self, discovery_info: DiscoveryInfoType
self, discovery_info: SsdpServiceInfo
) -> data_entry_flow.FlowResult:
"""Handle a flow initialized by SSDP discovery."""
return await self.async_step_discovery(discovery_info)
return await self.async_step_discovery(dataclasses.asdict(discovery_info))
async def async_step_zeroconf(
self, discovery_info: ZeroconfServiceInfo

View File

@ -5,7 +5,7 @@ import logging
from typing import Any, Awaitable, Callable, Union
from homeassistant import config_entries
from homeassistant.components import dhcp, zeroconf
from homeassistant.components import dhcp, ssdp, zeroconf
from homeassistant.components.mqtt import discovery as mqtt
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
@ -123,7 +123,14 @@ class DiscoveryFlowHandler(config_entries.ConfigFlow):
return await self.async_step_confirm()
async_step_ssdp = async_step_discovery
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initialized by Ssdp discovery."""
if self._async_in_progress() or self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
await self.async_set_unique_id(self._domain)
return await self.async_step_confirm()
async def async_step_import(self, _: dict[str, Any] | None) -> FlowResult:
"""Handle a flow initialized by import."""

View File

@ -1,4 +1,6 @@
"""Test the SSDP integration."""
# pylint: disable=protected-access
from datetime import datetime, timedelta
from ipaddress import IPv4Address, IPv6Address
from unittest.mock import ANY, AsyncMock, patch
@ -62,18 +64,28 @@ async def test_ssdp_flow_dispatched_on_st(mock_get_ssdp, hass, caplog, mock_flow
assert mock_flow_init.mock_calls[0][2]["context"] == {
"source": config_entries.SOURCE_SSDP
}
assert mock_flow_init.mock_calls[0][2]["data"] == {
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_USN: "uuid:mock-udn::mock-st",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_HA_MATCHING_DOMAINS: {"mock-domain"},
}
mock_call_data: ssdp.SsdpServiceInfo = mock_flow_init.mock_calls[0][2]["data"]
assert mock_call_data.ssdp_st == "mock-st"
assert mock_call_data.ssdp_location == "http://1.1.1.1"
assert mock_call_data.ssdp_usn == "uuid:mock-udn::mock-st"
assert mock_call_data.ssdp_server == "mock-server"
assert mock_call_data.ssdp_ext == ""
assert mock_call_data.ssdp_udn == ANY
assert mock_call_data.ssdp_headers["_timestamp"] == ANY
assert mock_call_data.x_homeassistant_matching_domains == {"mock-domain"}
assert mock_call_data.upnp == {ssdp.ATTR_UPNP_UDN: "uuid:mock-udn"}
assert "Failed to fetch ssdp data" not in caplog.text
# Compatibility with old dict access (to be removed after 2022.6)
assert mock_call_data[ssdp.ATTR_SSDP_ST] == "mock-st"
assert mock_call_data[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert mock_call_data[ssdp.ATTR_SSDP_USN] == "uuid:mock-udn::mock-st"
assert mock_call_data[ssdp.ATTR_SSDP_SERVER] == "mock-server"
assert mock_call_data[ssdp.ATTR_SSDP_EXT] == ""
assert mock_call_data[ssdp.ATTR_UPNP_UDN] == "uuid:mock-udn"
assert mock_call_data[ssdp.ATTR_SSDP_UDN] == ANY
assert mock_call_data["_timestamp"] == ANY
assert mock_call_data[ssdp.ATTR_HA_MATCHING_DOMAINS] == {"mock-domain"}
# End compatibility checks
@pytest.mark.usefixtures("mock_get_source_ip")
@ -347,19 +359,31 @@ async def test_discovery_from_advertisement_sets_ssdp_st(
await hass.async_block_till_done()
discovery_info = await ssdp.async_get_discovery_info_by_udn(hass, "uuid:mock-udn")
assert discovery_info == [
{
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_NT: "mock-st",
ssdp.ATTR_SSDP_ST: "mock-st", # Set by ssdp component, not in original advertisement.
ssdp.ATTR_SSDP_USN: "uuid:mock-udn::mock-st",
ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"nts": "ssdp:alive",
"_timestamp": ANY,
}
]
discovery_info = discovery_info[0]
assert discovery_info.ssdp_location == "http://1.1.1.1"
assert discovery_info.ssdp_nt == "mock-st"
# Set by ssdp component, not in original advertisement.
assert discovery_info.ssdp_st == "mock-st"
assert discovery_info.ssdp_usn == "uuid:mock-udn::mock-st"
assert discovery_info.ssdp_udn == ANY
assert discovery_info.ssdp_headers["nts"] == "ssdp:alive"
assert discovery_info.ssdp_headers["_timestamp"] == ANY
assert discovery_info.upnp == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
}
# Compatibility with old dict access (to be removed after 2022.6)
assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info[ssdp.ATTR_SSDP_NT] == "mock-st"
# Set by ssdp component, not in original advertisement.
assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
assert discovery_info[ssdp.ATTR_SSDP_USN] == "uuid:mock-udn::mock-st"
assert discovery_info[ssdp.ATTR_UPNP_UDN] == "uuid:mock-udn"
assert discovery_info[ssdp.ATTR_UPNP_DEVICE_TYPE] == "Paulus"
assert discovery_info[ssdp.ATTR_SSDP_UDN] == ANY
assert discovery_info["nts"] == "ssdp:alive"
assert discovery_info["_timestamp"] == ANY
# End compatibility checks
@patch( # XXX TODO: Isn't this duplicate with mock_get_source_ip?
@ -452,29 +476,48 @@ async def test_scan_with_registered_callback(
assert async_integration_match_all_not_present_callback1.call_count == 0
assert async_match_any_callback1.call_count == 1
assert async_not_matching_integration_callback1.call_count == 0
assert async_integration_callback.call_args[0] == (
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
"x-rincon-bootseq": "55",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_HA_MATCHING_DOMAINS: set(),
},
ssdp.SsdpChange.ALIVE,
assert async_integration_callback.call_args[0][1] == ssdp.SsdpChange.ALIVE
mock_call_data: ssdp.SsdpServiceInfo = async_integration_callback.call_args[0][0]
assert mock_call_data.ssdp_ext == ""
assert mock_call_data.ssdp_location == "http://1.1.1.1"
assert mock_call_data.ssdp_server == "mock-server"
assert mock_call_data.ssdp_st == "mock-st"
assert (
mock_call_data.ssdp_usn == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st"
)
assert mock_call_data.ssdp_headers["x-rincon-bootseq"] == "55"
assert mock_call_data.ssdp_udn == ANY
assert mock_call_data.ssdp_headers["_timestamp"] == ANY
assert mock_call_data.x_homeassistant_matching_domains == set()
assert mock_call_data.upnp == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
# Compatibility with old dict access (to be removed after 2022.6)
assert mock_call_data[ssdp.ATTR_UPNP_DEVICE_TYPE] == "Paulus"
assert mock_call_data[ssdp.ATTR_SSDP_EXT] == ""
assert mock_call_data[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert mock_call_data[ssdp.ATTR_SSDP_SERVER] == "mock-server"
assert mock_call_data[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
mock_call_data[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st"
)
assert (
mock_call_data[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert mock_call_data["x-rincon-bootseq"] == "55"
assert mock_call_data[ssdp.ATTR_SSDP_UDN] == ANY
assert mock_call_data["_timestamp"] == ANY
assert mock_call_data[ssdp.ATTR_HA_MATCHING_DOMAINS] == set()
# End of compatibility checks
assert "Failed to callback info" in caplog.text
async_integration_callback_from_cache = AsyncMock()
await ssdp.async_register_callback(
hass, async_integration_callback_from_cache, {"st": "mock-st"}
)
assert async_integration_callback_from_cache.call_count == 1
@ -510,51 +553,109 @@ async def test_getting_existing_headers(
await ssdp_listener._on_search(mock_ssdp_search_response)
discovery_info_by_st = await ssdp.async_get_discovery_info_by_st(hass, "mock-st")
assert discovery_info_by_st == [
{
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
}
]
discovery_info_by_st = discovery_info_by_st[0]
assert discovery_info_by_st.ssdp_ext == ""
assert discovery_info_by_st.ssdp_location == "http://1.1.1.1"
assert discovery_info_by_st.ssdp_server == "mock-server"
assert discovery_info_by_st.ssdp_st == "mock-st"
assert (
discovery_info_by_st.ssdp_usn
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert discovery_info_by_st.ssdp_udn == ANY
assert discovery_info_by_st.ssdp_headers["_timestamp"] == ANY
assert discovery_info_by_st.upnp == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
# Compatibility with old dict access (to be removed after 2022.6)
assert discovery_info_by_st[ssdp.ATTR_SSDP_EXT] == ""
assert discovery_info_by_st[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info_by_st[ssdp.ATTR_SSDP_SERVER] == "mock-server"
assert discovery_info_by_st[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
discovery_info_by_st[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert (
discovery_info_by_st[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert discovery_info_by_st[ssdp.ATTR_UPNP_DEVICE_TYPE] == "Paulus"
assert discovery_info_by_st[ssdp.ATTR_SSDP_UDN] == ANY
assert discovery_info_by_st["_timestamp"] == ANY
# End of compatibility checks
discovery_info_by_udn = await ssdp.async_get_discovery_info_by_udn(
hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert discovery_info_by_udn == [
{
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
}
]
discovery_info_by_udn = discovery_info_by_udn[0]
assert discovery_info_by_udn.ssdp_ext == ""
assert discovery_info_by_udn.ssdp_location == "http://1.1.1.1"
assert discovery_info_by_udn.ssdp_server == "mock-server"
assert discovery_info_by_udn.ssdp_st == "mock-st"
assert (
discovery_info_by_udn.ssdp_usn
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert discovery_info_by_udn.ssdp_udn == ANY
assert discovery_info_by_udn.ssdp_headers["_timestamp"] == ANY
assert discovery_info_by_udn.upnp == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
# Compatibility with old dict access (to be removed after 2022.6)
assert discovery_info_by_udn[ssdp.ATTR_SSDP_EXT] == ""
assert discovery_info_by_udn[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info_by_udn[ssdp.ATTR_SSDP_SERVER] == "mock-server"
assert discovery_info_by_udn[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
discovery_info_by_udn[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert (
discovery_info_by_udn[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert discovery_info_by_udn[ssdp.ATTR_UPNP_DEVICE_TYPE] == "Paulus"
assert discovery_info_by_udn[ssdp.ATTR_SSDP_UDN] == ANY
assert discovery_info_by_udn["_timestamp"] == ANY
# End of compatibility checks
discovery_info_by_udn_st = await ssdp.async_get_discovery_info_by_udn_st(
hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", "mock-st"
)
assert discovery_info_by_udn_st == {
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
assert discovery_info_by_udn_st.ssdp_ext == ""
assert discovery_info_by_udn_st.ssdp_location == "http://1.1.1.1"
assert discovery_info_by_udn_st.ssdp_server == "mock-server"
assert discovery_info_by_udn_st.ssdp_st == "mock-st"
assert (
discovery_info_by_udn_st.ssdp_usn
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert discovery_info_by_udn_st.ssdp_udn == ANY
assert discovery_info_by_udn_st.ssdp_headers["_timestamp"] == ANY
assert discovery_info_by_udn_st.upnp == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
# Compatibility with old dict access (to be removed after 2022.6)
assert discovery_info_by_udn_st[ssdp.ATTR_SSDP_EXT] == ""
assert discovery_info_by_udn_st[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info_by_udn_st[ssdp.ATTR_SSDP_SERVER] == "mock-server"
assert discovery_info_by_udn_st[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
discovery_info_by_udn_st[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert (
discovery_info_by_udn_st[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert discovery_info_by_udn_st[ssdp.ATTR_UPNP_DEVICE_TYPE] == "Paulus"
assert discovery_info_by_udn_st[ssdp.ATTR_SSDP_UDN] == ANY
assert discovery_info_by_udn_st["_timestamp"] == ANY
# End of compatibility checks
assert (
await ssdp.async_get_discovery_info_by_udn_st(hass, "wrong", "mock-st") is None

View File

@ -237,7 +237,9 @@ async def test_abort_discovered_multiple(hass, flow_handler, local_impl):
)
result = await hass.config_entries.flow.async_init(
TEST_DOMAIN, context={"source": config_entries.SOURCE_SSDP}
TEST_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=data_entry_flow.BaseServiceInfo(),
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
@ -267,7 +269,9 @@ async def test_abort_discovered_existing_entries(hass, flow_handler, local_impl)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
TEST_DOMAIN, context={"source": config_entries.SOURCE_SSDP}
TEST_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=data_entry_flow.BaseServiceInfo(),
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT

View File

@ -2351,7 +2351,7 @@ async def test_async_setup_update_entry(hass):
"discovery_source",
(
(config_entries.SOURCE_DISCOVERY, {}),
(config_entries.SOURCE_SSDP, {}),
(config_entries.SOURCE_SSDP, BaseServiceInfo()),
(config_entries.SOURCE_USB, BaseServiceInfo()),
(config_entries.SOURCE_HOMEKIT, BaseServiceInfo()),
(config_entries.SOURCE_DHCP, BaseServiceInfo()),