From b19bf9b147f4321e89d1f7f01e68337f2102f460 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Tue, 22 Feb 2022 10:14:08 +1100 Subject: [PATCH] Add dlna_dms integration to support DLNA Digital Media Servers (#66437) --- CODEOWNERS | 2 + homeassistant/components/dlna_dms/__init__.py | 28 + .../components/dlna_dms/config_flow.py | 177 ++++ homeassistant/components/dlna_dms/const.py | 78 ++ homeassistant/components/dlna_dms/dms.py | 726 +++++++++++++++ .../components/dlna_dms/manifest.json | 29 + .../components/dlna_dms/media_source.py | 126 +++ .../components/dlna_dms/strings.json | 24 + .../components/dlna_dms/translations/en.json | 24 + homeassistant/generated/config_flows.py | 1 + homeassistant/generated/ssdp.py | 18 + requirements_all.txt | 1 + requirements_test_all.txt | 1 + tests/components/dlna_dms/__init__.py | 1 + tests/components/dlna_dms/conftest.py | 131 +++ tests/components/dlna_dms/test_config_flow.py | 346 +++++++ .../dlna_dms/test_device_availability.py | 705 ++++++++++++++ .../dlna_dms/test_dms_device_source.py | 878 ++++++++++++++++++ tests/components/dlna_dms/test_init.py | 59 ++ .../components/dlna_dms/test_media_source.py | 255 +++++ 20 files changed, 3610 insertions(+) create mode 100644 homeassistant/components/dlna_dms/__init__.py create mode 100644 homeassistant/components/dlna_dms/config_flow.py create mode 100644 homeassistant/components/dlna_dms/const.py create mode 100644 homeassistant/components/dlna_dms/dms.py create mode 100644 homeassistant/components/dlna_dms/manifest.json create mode 100644 homeassistant/components/dlna_dms/media_source.py create mode 100644 homeassistant/components/dlna_dms/strings.json create mode 100644 homeassistant/components/dlna_dms/translations/en.json create mode 100644 tests/components/dlna_dms/__init__.py create mode 100644 tests/components/dlna_dms/conftest.py create mode 100644 tests/components/dlna_dms/test_config_flow.py create mode 100644 tests/components/dlna_dms/test_device_availability.py create mode 100644 tests/components/dlna_dms/test_dms_device_source.py create mode 100644 tests/components/dlna_dms/test_init.py create mode 100644 tests/components/dlna_dms/test_media_source.py diff --git a/CODEOWNERS b/CODEOWNERS index a8d24fa03a3..075c8abbc65 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -214,6 +214,8 @@ homeassistant/components/digital_ocean/* @fabaff homeassistant/components/discogs/* @thibmaek homeassistant/components/dlna_dmr/* @StevenLooman @chishm tests/components/dlna_dmr/* @StevenLooman @chishm +homeassistant/components/dlna_dms/* @chishm +tests/components/dlna_dms/* @chishm homeassistant/components/dnsip/* @gjohansson-ST tests/components/dnsip/* @gjohansson-ST homeassistant/components/doorbird/* @oblogic7 @bdraco @flacjacket diff --git a/homeassistant/components/dlna_dms/__init__.py b/homeassistant/components/dlna_dms/__init__.py new file mode 100644 index 00000000000..b09547e07c8 --- /dev/null +++ b/homeassistant/components/dlna_dms/__init__.py @@ -0,0 +1,28 @@ +"""The DLNA Digital Media Server integration. + +A single config entry is used, with SSDP discovery for media servers. Each +server is wrapped in a DmsEntity, and the server's USN is used as the unique_id. +""" +from __future__ import annotations + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant + +from .const import LOGGER +from .dms import get_domain_data + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up DLNA DMS device from a config entry.""" + LOGGER.debug("Setting up config entry: %s", entry.unique_id) + + # Forward setup to this domain's data manager + return await get_domain_data(hass).async_setup_entry(entry) + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + LOGGER.debug("Unloading config entry: %s", entry.unique_id) + + # Forward unload to this domain's data manager + return await get_domain_data(hass).async_unload_entry(entry) diff --git a/homeassistant/components/dlna_dms/config_flow.py b/homeassistant/components/dlna_dms/config_flow.py new file mode 100644 index 00000000000..7ae3a104fc1 --- /dev/null +++ b/homeassistant/components/dlna_dms/config_flow.py @@ -0,0 +1,177 @@ +"""Config flow for DLNA DMS.""" +from __future__ import annotations + +import logging +from pprint import pformat +from typing import Any, cast +from urllib.parse import urlparse + +from async_upnp_client.profiles.dlna import DmsDevice +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.components import ssdp +from homeassistant.const import CONF_DEVICE_ID, CONF_HOST, CONF_URL +from homeassistant.data_entry_flow import AbortFlow, FlowResult +from homeassistant.exceptions import IntegrationError + +from .const import DEFAULT_NAME, DOMAIN + +LOGGER = logging.getLogger(__name__) + + +class ConnectError(IntegrationError): + """Error occurred when trying to connect to a device.""" + + +class DlnaDmsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a DLNA DMS config flow. + + The Unique Service Name (USN) of the DMS device is used as the unique_id for + config entries and for entities. This USN may differ from the root USN if + the DMS is an embedded device. + """ + + VERSION = 1 + + def __init__(self) -> None: + """Initialize flow.""" + self._discoveries: dict[str, ssdp.SsdpServiceInfo] = {} + self._location: str | None = None + self._usn: str | None = None + self._name: str | None = None + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle a flow initialized by the user by listing unconfigured devices.""" + LOGGER.debug("async_step_user: user_input: %s", user_input) + + if user_input is not None and (host := user_input.get(CONF_HOST)): + # User has chosen a device + discovery = self._discoveries[host] + await self._async_parse_discovery(discovery) + return self._create_entry() + + if not (discoveries := await self._async_get_discoveries()): + # Nothing found, abort configuration + return self.async_abort(reason="no_devices_found") + + self._discoveries = { + cast(str, urlparse(discovery.ssdp_location).hostname): discovery + for discovery in discoveries + } + + discovery_choices = { + host: f"{discovery.upnp.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)} ({host})" + for host, discovery in self._discoveries.items() + } + data_schema = vol.Schema({vol.Optional(CONF_HOST): vol.In(discovery_choices)}) + return self.async_show_form(step_id="user", data_schema=data_schema) + + async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult: + """Handle a flow initialized by SSDP discovery.""" + LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info)) + + await self._async_parse_discovery(discovery_info) + + # Abort if the device doesn't support all services required for a DmsDevice. + # Use the discovery_info instead of DmsDevice.is_profile_device to avoid + # contacting the device again. + discovery_service_list = discovery_info.upnp.get(ssdp.ATTR_UPNP_SERVICE_LIST) + if not discovery_service_list: + return self.async_abort(reason="not_dms") + discovery_service_ids = { + service.get("serviceId") + for service in discovery_service_list.get("service") or [] + } + if not DmsDevice.SERVICE_IDS.issubset(discovery_service_ids): + return self.async_abort(reason="not_dms") + + # Abort if another config entry has the same location, in case the + # device doesn't have a static and unique UDN (breaking the UPnP spec). + self._async_abort_entries_match({CONF_URL: self._location}) + + self.context["title_placeholders"] = {"name": self._name} + + return await self.async_step_confirm() + + async def async_step_confirm( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Allow the user to confirm adding the device.""" + LOGGER.debug("async_step_confirm: %s", user_input) + + if user_input is not None: + return self._create_entry() + + self._set_confirm_only() + return self.async_show_form(step_id="confirm") + + def _create_entry(self) -> FlowResult: + """Create a config entry, assuming all required information is now known.""" + LOGGER.debug( + "_async_create_entry: location: %s, USN: %s", self._location, self._usn + ) + assert self._name + assert self._location + assert self._usn + + data = {CONF_URL: self._location, CONF_DEVICE_ID: self._usn} + return self.async_create_entry(title=self._name, data=data) + + async def _async_parse_discovery( + self, discovery_info: ssdp.SsdpServiceInfo + ) -> None: + """Get required details from an SSDP discovery. + + Aborts if a device matching the SSDP USN has already been configured. + """ + LOGGER.debug( + "_async_parse_discovery: location: %s, USN: %s", + discovery_info.ssdp_location, + discovery_info.ssdp_usn, + ) + + if not discovery_info.ssdp_location or not discovery_info.ssdp_usn: + raise AbortFlow("bad_ssdp") + + if not self._location: + self._location = discovery_info.ssdp_location + + self._usn = discovery_info.ssdp_usn + await self.async_set_unique_id(self._usn) + + # Abort if already configured, but update the last-known location + self._abort_if_unique_id_configured( + updates={CONF_URL: self._location}, reload_on_update=False + ) + + self._name = ( + discovery_info.upnp.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) + or urlparse(self._location).hostname + or DEFAULT_NAME + ) + + async def _async_get_discoveries(self) -> list[ssdp.SsdpServiceInfo]: + """Get list of unconfigured DLNA devices discovered by SSDP.""" + LOGGER.debug("_get_discoveries") + + # Get all compatible devices from ssdp's cache + discoveries: list[ssdp.SsdpServiceInfo] = [] + for udn_st in DmsDevice.DEVICE_TYPES: + st_discoveries = await ssdp.async_get_discovery_info_by_st( + self.hass, udn_st + ) + discoveries.extend(st_discoveries) + + # Filter out devices already configured + current_unique_ids = { + entry.unique_id + for entry in self._async_current_entries(include_ignore=False) + } + discoveries = [ + disc for disc in discoveries if disc.ssdp_udn not in current_unique_ids + ] + + return discoveries diff --git a/homeassistant/components/dlna_dms/const.py b/homeassistant/components/dlna_dms/const.py new file mode 100644 index 00000000000..8c260272d5f --- /dev/null +++ b/homeassistant/components/dlna_dms/const.py @@ -0,0 +1,78 @@ +"""Constants for the DLNA MediaServer integration.""" +from __future__ import annotations + +from collections.abc import Mapping +import logging +from typing import Final + +from homeassistant.components.media_player import const as _mp_const + +LOGGER = logging.getLogger(__package__) + +DOMAIN: Final = "dlna_dms" +DEFAULT_NAME: Final = "DLNA Media Server" + +SOURCE_SEP: Final = "/" +ROOT_OBJECT_ID: Final = "0" +PATH_SEP: Final = "/" +PATH_SEARCH_FLAG: Final = "?" +PATH_OBJECT_ID_FLAG: Final = ":" +# Only request the metadata needed to build a browse response +DLNA_BROWSE_FILTER: Final = [ + "id", + "upnp:class", + "dc:title", + "res", + "@childCount", + "upnp:albumArtURI", +] +# Get all metadata when resolving, for the use of media_players +DLNA_RESOLVE_FILTER: Final = "*" +# Metadata needed to resolve a path +DLNA_PATH_FILTER: Final = ["id", "upnp:class", "dc:title"] +DLNA_SORT_CRITERIA: Final = ["+upnp:class", "+upnp:originalTrackNumber", "+dc:title"] + +PROTOCOL_HTTP: Final = "http-get" +PROTOCOL_RTSP: Final = "rtsp-rtp-udp" +PROTOCOL_ANY: Final = "*" +STREAMABLE_PROTOCOLS: Final = [PROTOCOL_HTTP, PROTOCOL_RTSP, PROTOCOL_ANY] + +# Map UPnP object class to media_player media class +MEDIA_CLASS_MAP: Mapping[str, str] = { + "object": _mp_const.MEDIA_CLASS_URL, + "object.item": _mp_const.MEDIA_CLASS_URL, + "object.item.imageItem": _mp_const.MEDIA_CLASS_IMAGE, + "object.item.imageItem.photo": _mp_const.MEDIA_CLASS_IMAGE, + "object.item.audioItem": _mp_const.MEDIA_CLASS_MUSIC, + "object.item.audioItem.musicTrack": _mp_const.MEDIA_CLASS_MUSIC, + "object.item.audioItem.audioBroadcast": _mp_const.MEDIA_CLASS_MUSIC, + "object.item.audioItem.audioBook": _mp_const.MEDIA_CLASS_PODCAST, + "object.item.videoItem": _mp_const.MEDIA_CLASS_VIDEO, + "object.item.videoItem.movie": _mp_const.MEDIA_CLASS_MOVIE, + "object.item.videoItem.videoBroadcast": _mp_const.MEDIA_CLASS_TV_SHOW, + "object.item.videoItem.musicVideoClip": _mp_const.MEDIA_CLASS_VIDEO, + "object.item.playlistItem": _mp_const.MEDIA_CLASS_TRACK, + "object.item.textItem": _mp_const.MEDIA_CLASS_URL, + "object.item.bookmarkItem": _mp_const.MEDIA_CLASS_URL, + "object.item.epgItem": _mp_const.MEDIA_CLASS_EPISODE, + "object.item.epgItem.audioProgram": _mp_const.MEDIA_CLASS_MUSIC, + "object.item.epgItem.videoProgram": _mp_const.MEDIA_CLASS_VIDEO, + "object.container": _mp_const.MEDIA_CLASS_DIRECTORY, + "object.container.person": _mp_const.MEDIA_CLASS_ARTIST, + "object.container.person.musicArtist": _mp_const.MEDIA_CLASS_ARTIST, + "object.container.playlistContainer": _mp_const.MEDIA_CLASS_PLAYLIST, + "object.container.album": _mp_const.MEDIA_CLASS_ALBUM, + "object.container.album.musicAlbum": _mp_const.MEDIA_CLASS_ALBUM, + "object.container.album.photoAlbum": _mp_const.MEDIA_CLASS_ALBUM, + "object.container.genre": _mp_const.MEDIA_CLASS_GENRE, + "object.container.genre.musicGenre": _mp_const.MEDIA_CLASS_GENRE, + "object.container.genre.movieGenre": _mp_const.MEDIA_CLASS_GENRE, + "object.container.channelGroup": _mp_const.MEDIA_CLASS_CHANNEL, + "object.container.channelGroup.audioChannelGroup": _mp_const.MEDIA_TYPE_CHANNELS, + "object.container.channelGroup.videoChannelGroup": _mp_const.MEDIA_TYPE_CHANNELS, + "object.container.epgContainer": _mp_const.MEDIA_CLASS_DIRECTORY, + "object.container.storageSystem": _mp_const.MEDIA_CLASS_DIRECTORY, + "object.container.storageVolume": _mp_const.MEDIA_CLASS_DIRECTORY, + "object.container.storageFolder": _mp_const.MEDIA_CLASS_DIRECTORY, + "object.container.bookmarkFolder": _mp_const.MEDIA_CLASS_DIRECTORY, +} diff --git a/homeassistant/components/dlna_dms/dms.py b/homeassistant/components/dlna_dms/dms.py new file mode 100644 index 00000000000..d3a65448f84 --- /dev/null +++ b/homeassistant/components/dlna_dms/dms.py @@ -0,0 +1,726 @@ +"""Wrapper for media_source around async_upnp_client's DmsDevice .""" +from __future__ import annotations + +import asyncio +from collections.abc import Callable, Coroutine +from dataclasses import dataclass +import functools +from typing import Any, TypeVar, cast + +from async_upnp_client import UpnpEventHandler, UpnpFactory, UpnpRequester +from async_upnp_client.aiohttp import AiohttpSessionRequester +from async_upnp_client.const import NotificationSubType +from async_upnp_client.exceptions import UpnpActionError, UpnpConnectionError, UpnpError +from async_upnp_client.profiles.dlna import ContentDirectoryErrorCode, DmsDevice +from didl_lite import didl_lite + +from homeassistant.backports.enum import StrEnum +from homeassistant.components import ssdp +from homeassistant.components.media_player.const import MEDIA_CLASS_DIRECTORY +from homeassistant.components.media_player.errors import BrowseError +from homeassistant.components.media_source.error import Unresolvable +from homeassistant.components.media_source.models import BrowseMediaSource, PlayMedia +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_DEVICE_ID, CONF_URL +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers import aiohttp_client +from homeassistant.util import slugify + +from .const import ( + DLNA_BROWSE_FILTER, + DLNA_PATH_FILTER, + DLNA_RESOLVE_FILTER, + DLNA_SORT_CRITERIA, + DOMAIN, + LOGGER, + MEDIA_CLASS_MAP, + PATH_OBJECT_ID_FLAG, + PATH_SEARCH_FLAG, + PATH_SEP, + ROOT_OBJECT_ID, + STREAMABLE_PROTOCOLS, +) + +_DlnaDmsDeviceMethod = TypeVar("_DlnaDmsDeviceMethod", bound="DmsDeviceSource") +_RetType = TypeVar("_RetType") + + +class DlnaDmsData: + """Storage class for domain global data.""" + + hass: HomeAssistant + lock: asyncio.Lock + requester: UpnpRequester + upnp_factory: UpnpFactory + event_handler: UpnpEventHandler + devices: dict[str, DmsDeviceSource] # Indexed by config_entry.unique_id + sources: dict[str, DmsDeviceSource] # Indexed by source_id + + def __init__( + self, + hass: HomeAssistant, + ) -> None: + """Initialize global data.""" + self.hass = hass + self.lock = asyncio.Lock() + session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False) + self.requester = AiohttpSessionRequester(session, with_sleep=True) + self.upnp_factory = UpnpFactory(self.requester, non_strict=True) + # NOTE: event_handler is not actually used, and is only created to + # satisfy the DmsDevice.__init__ signature + self.event_handler = UpnpEventHandler("", self.requester) + self.devices = {} + self.sources = {} + + async def async_setup_entry(self, config_entry: ConfigEntry) -> bool: + """Create a DMS device connection from a config entry.""" + assert config_entry.unique_id + async with self.lock: + source_id = self._generate_source_id(config_entry.title) + device = DmsDeviceSource(self.hass, config_entry, source_id) + self.devices[config_entry.unique_id] = device + self.sources[device.source_id] = device + + # Update the device when the associated config entry is modified + config_entry.async_on_unload( + config_entry.add_update_listener(self.async_update_entry) + ) + + await device.async_added_to_hass() + return True + + async def async_unload_entry(self, config_entry: ConfigEntry) -> bool: + """Unload a config entry and disconnect the corresponding DMS device.""" + assert config_entry.unique_id + async with self.lock: + device = self.devices.pop(config_entry.unique_id) + del self.sources[device.source_id] + await device.async_will_remove_from_hass() + return True + + async def async_update_entry( + self, hass: HomeAssistant, config_entry: ConfigEntry + ) -> None: + """Update a DMS device when the config entry changes.""" + assert config_entry.unique_id + async with self.lock: + device = self.devices[config_entry.unique_id] + # Update the source_id to match the new name + del self.sources[device.source_id] + device.source_id = self._generate_source_id(config_entry.title) + self.sources[device.source_id] = device + + def _generate_source_id(self, name: str) -> str: + """Generate a unique source ID. + + Caller should hold self.lock when calling this method. + """ + source_id_base = slugify(name) + if source_id_base not in self.sources: + return source_id_base + + tries = 1 + while (suggested_source_id := f"{source_id_base}_{tries}") in self.sources: + tries += 1 + + return suggested_source_id + + +@callback +def get_domain_data(hass: HomeAssistant) -> DlnaDmsData: + """Obtain this integration's domain data, creating it if needed.""" + if DOMAIN in hass.data: + return cast(DlnaDmsData, hass.data[DOMAIN]) + + data = DlnaDmsData(hass) + hass.data[DOMAIN] = data + return data + + +@dataclass +class DidlPlayMedia(PlayMedia): + """Playable media with DIDL metadata.""" + + didl_metadata: didl_lite.DidlObject + + +class DlnaDmsDeviceError(BrowseError, Unresolvable): + """Base for errors raised by DmsDeviceSource. + + Caught by both media_player (BrowseError) and media_source (Unresolvable), + so DmsDeviceSource methods can be used for both browse and resolve + functionality. + """ + + +class DeviceConnectionError(DlnaDmsDeviceError): + """Error occurred with the connection to the server.""" + + +class ActionError(DlnaDmsDeviceError): + """Error when calling a UPnP Action on the device.""" + + +def catch_request_errors( + func: Callable[[_DlnaDmsDeviceMethod, str], Coroutine[Any, Any, _RetType]] +) -> Callable[[_DlnaDmsDeviceMethod, str], Coroutine[Any, Any, _RetType]]: + """Catch UpnpError errors.""" + + @functools.wraps(func) + async def wrapper(self: _DlnaDmsDeviceMethod, req_param: str) -> _RetType: + """Catch UpnpError errors and check availability before and after request.""" + if not self.available: + LOGGER.warning("Device disappeared when trying to call %s", func.__name__) + raise DeviceConnectionError("DMS is not connected") + + try: + return await func(self, req_param) + except UpnpActionError as err: + LOGGER.debug("Server failure", exc_info=err) + if err.error_code == ContentDirectoryErrorCode.NO_SUCH_OBJECT: + LOGGER.debug("No such object: %s", req_param) + raise ActionError(f"No such object: {req_param}") from err + if err.error_code == ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA: + LOGGER.debug("Invalid query: %s", req_param) + raise ActionError(f"Invalid query: {req_param}") from err + raise DeviceConnectionError(f"Server failure: {err!r}") from err + except UpnpConnectionError as err: + LOGGER.debug("Server disconnected", exc_info=err) + await self.device_disconnect() + raise DeviceConnectionError(f"Server disconnected: {err!r}") from err + except UpnpError as err: + LOGGER.debug("Server communication failure", exc_info=err) + raise DeviceConnectionError( + f"Server communication failure: {err!r}" + ) from err + + return wrapper + + +class DmsDeviceSource: + """DMS Device wrapper, providing media files as a media_source.""" + + hass: HomeAssistant + config_entry: ConfigEntry + + # Unique slug used for media-source URIs + source_id: str + + # Last known URL for the device, used when adding this wrapper to hass to + # try to connect before SSDP has rediscovered it, or when SSDP discovery + # fails. + location: str | None + + _device_lock: asyncio.Lock # Held when connecting or disconnecting the device + _device: DmsDevice | None = None + + # Only try to connect once when an ssdp:alive advertisement is received + _ssdp_connect_failed: bool = False + + # Track BOOTID in SSDP advertisements for device changes + _bootid: int | None = None + + def __init__( + self, hass: HomeAssistant, config_entry: ConfigEntry, source_id: str + ) -> None: + """Initialize a DMS Source.""" + self.hass = hass + self.config_entry = config_entry + self.source_id = source_id + self.location = self.config_entry.data[CONF_URL] + self._device_lock = asyncio.Lock() + + # Callbacks and events + + async def async_added_to_hass(self) -> None: + """Handle addition of this source.""" + + # Try to connect to the last known location, but don't worry if not available + if not self._device and self.location: + try: + await self.device_connect() + except UpnpError as err: + LOGGER.debug("Couldn't connect immediately: %r", err) + + # Get SSDP notifications for only this device + self.config_entry.async_on_unload( + await ssdp.async_register_callback( + self.hass, self.async_ssdp_callback, {"USN": self.usn} + ) + ) + + # async_upnp_client.SsdpListener only reports byebye once for each *UDN* + # (device name) which often is not the USN (service within the device) + # that we're interested in. So also listen for byebye advertisements for + # the UDN, which is reported in the _udn field of the combined_headers. + self.config_entry.async_on_unload( + await ssdp.async_register_callback( + self.hass, + self.async_ssdp_callback, + {"_udn": self.udn, "NTS": NotificationSubType.SSDP_BYEBYE}, + ) + ) + + async def async_will_remove_from_hass(self) -> None: + """Handle removal of this source.""" + await self.device_disconnect() + + async def async_ssdp_callback( + self, info: ssdp.SsdpServiceInfo, change: ssdp.SsdpChange + ) -> None: + """Handle notification from SSDP of device state change.""" + LOGGER.debug( + "SSDP %s notification of device %s at %s", + change, + info.ssdp_usn, + info.ssdp_location, + ) + + try: + bootid_str = info.ssdp_headers[ssdp.ATTR_SSDP_BOOTID] + bootid: int | None = int(bootid_str, 10) + except (KeyError, ValueError): + bootid = None + + if change == ssdp.SsdpChange.UPDATE: + # This is an announcement that bootid is about to change + if self._bootid is not None and self._bootid == bootid: + # Store the new value (because our old value matches) so that we + # can ignore subsequent ssdp:alive messages + try: + next_bootid_str = info.ssdp_headers[ssdp.ATTR_SSDP_NEXTBOOTID] + self._bootid = int(next_bootid_str, 10) + except (KeyError, ValueError): + pass + # Nothing left to do until ssdp:alive comes through + return + + if self._bootid is not None and self._bootid != bootid: + # Device has rebooted + # Maybe connection will succeed now + self._ssdp_connect_failed = False + if self._device: + # Drop existing connection and maybe reconnect + await self.device_disconnect() + self._bootid = bootid + + if change == ssdp.SsdpChange.BYEBYE: + # Device is going away + if self._device: + # Disconnect from gone device + await self.device_disconnect() + # Maybe the next alive message will result in a successful connection + self._ssdp_connect_failed = False + + if ( + change == ssdp.SsdpChange.ALIVE + and not self._device + and not self._ssdp_connect_failed + ): + assert info.ssdp_location + self.location = info.ssdp_location + try: + await self.device_connect() + except UpnpError as err: + self._ssdp_connect_failed = True + LOGGER.warning( + "Failed connecting to recently alive device at %s: %r", + self.location, + err, + ) + + # Device connection/disconnection + + async def device_connect(self) -> None: + """Connect to the device now that it's available.""" + LOGGER.debug("Connecting to device at %s", self.location) + + async with self._device_lock: + if self._device: + LOGGER.debug("Trying to connect when device already connected") + return + + if not self.location: + LOGGER.debug("Not connecting because location is not known") + return + + domain_data = get_domain_data(self.hass) + + # Connect to the base UPNP device + upnp_device = await domain_data.upnp_factory.async_create_device( + self.location + ) + + # Create profile wrapper + self._device = DmsDevice(upnp_device, domain_data.event_handler) + + # Update state variables. We don't care if they change, so this is + # only done once, here. + await self._device.async_update() + + async def device_disconnect(self) -> None: + """Destroy connections to the device now that it's not available. + + Also call when removing this device wrapper from hass to clean up connections. + """ + async with self._device_lock: + if not self._device: + LOGGER.debug("Disconnecting from device that's not connected") + return + + LOGGER.debug("Disconnecting from %s", self._device.name) + + self._device = None + + # Device properties + + @property + def available(self) -> bool: + """Device is available when we have a connection to it.""" + return self._device is not None and self._device.profile_device.available + + @property + def usn(self) -> str: + """Get the USN (Unique Service Name) for the wrapped UPnP device end-point.""" + return self.config_entry.data[CONF_DEVICE_ID] + + @property + def udn(self) -> str: + """Get the UDN (Unique Device Name) based on the USN.""" + return self.usn.partition("::")[0] + + @property + def name(self) -> str: + """Return a name for the media server.""" + return self.config_entry.title + + @property + def icon(self) -> str | None: + """Return an URL to an icon for the media server.""" + if not self._device: + return None + + return self._device.icon + + # MediaSource methods + + async def async_resolve_media(self, identifier: str) -> DidlPlayMedia: + """Resolve a media item to a playable item.""" + LOGGER.debug("async_resolve_media(%s)", identifier) + action, parameters = _parse_identifier(identifier) + + if action is Action.OBJECT: + return await self.async_resolve_object(parameters) + + if action is Action.PATH: + object_id = await self.async_resolve_path(parameters) + return await self.async_resolve_object(object_id) + + if action is Action.SEARCH: + return await self.async_resolve_search(parameters) + + LOGGER.debug("Invalid identifier %s", identifier) + raise Unresolvable(f"Invalid identifier {identifier}") + + async def async_browse_media(self, identifier: str | None) -> BrowseMediaSource: + """Browse media.""" + LOGGER.debug("async_browse_media(%s)", identifier) + action, parameters = _parse_identifier(identifier) + + if action is Action.OBJECT: + return await self.async_browse_object(parameters) + + if action is Action.PATH: + object_id = await self.async_resolve_path(parameters) + return await self.async_browse_object(object_id) + + if action is Action.SEARCH: + return await self.async_browse_search(parameters) + + return await self.async_browse_object(ROOT_OBJECT_ID) + + # DMS methods + + @catch_request_errors + async def async_resolve_object(self, object_id: str) -> DidlPlayMedia: + """Return a playable media item specified by ObjectID.""" + assert self._device + + item = await self._device.async_browse_metadata( + object_id, metadata_filter=DLNA_RESOLVE_FILTER + ) + + # Use the first playable resource + return self._didl_to_play_media(item) + + @catch_request_errors + async def async_resolve_path(self, path: str) -> str: + """Return an Object ID resolved from a path string.""" + assert self._device + + # Iterate through the path, searching for a matching title within the + # DLNA object hierarchy. + object_id = ROOT_OBJECT_ID + for node in path.split(PATH_SEP): + if not node: + # Skip empty names, for when multiple slashes are involved, e.g // + continue + + criteria = ( + f'@parentID="{_esc_quote(object_id)}" and dc:title="{_esc_quote(node)}"' + ) + try: + result = await self._device.async_search_directory( + object_id, + search_criteria=criteria, + metadata_filter=DLNA_PATH_FILTER, + requested_count=1, + ) + except UpnpActionError as err: + LOGGER.debug("Error in call to async_search_directory: %r", err) + if err.error_code == ContentDirectoryErrorCode.NO_SUCH_CONTAINER: + raise Unresolvable(f"No such container: {object_id}") from err + # Search failed, but can still try browsing children + else: + if result.total_matches > 1: + raise Unresolvable(f"Too many items found for {node} in {path}") + + if result.result: + object_id = result.result[0].id + continue + + # Nothing was found via search, fall back to iterating children + result = await self._device.async_browse_direct_children( + object_id, metadata_filter=DLNA_PATH_FILTER + ) + + if result.total_matches == 0 or not result.result: + raise Unresolvable(f"No contents for {node} in {path}") + + node_lower = node.lower() + for child in result.result: + if child.title.lower() == node_lower: + object_id = child.id + break + else: + # Examining all direct children failed too + raise Unresolvable(f"Nothing found for {node} in {path}") + return object_id + + @catch_request_errors + async def async_resolve_search(self, query: str) -> DidlPlayMedia: + """Return first playable media item found by the query string.""" + assert self._device + + result = await self._device.async_search_directory( + container_id=ROOT_OBJECT_ID, + search_criteria=query, + metadata_filter=DLNA_RESOLVE_FILTER, + requested_count=1, + ) + + if result.total_matches == 0 or not result.result: + raise Unresolvable(f"Nothing found for {query}") + + # Use the first result, even if it doesn't have a playable resource + item = result.result[0] + + if not isinstance(item, didl_lite.DidlObject): + raise Unresolvable(f"{item} is not a DidlObject") + + return self._didl_to_play_media(item) + + @catch_request_errors + async def async_browse_object(self, object_id: str) -> BrowseMediaSource: + """Return the contents of a DLNA container by ObjectID.""" + assert self._device + + base_object = await self._device.async_browse_metadata( + object_id, metadata_filter=DLNA_BROWSE_FILTER + ) + + children = await self._device.async_browse_direct_children( + object_id, + metadata_filter=DLNA_BROWSE_FILTER, + sort_criteria=DLNA_SORT_CRITERIA, + ) + + return self._didl_to_media_source(base_object, children) + + @catch_request_errors + async def async_browse_search(self, query: str) -> BrowseMediaSource: + """Return all media items found by the query string.""" + assert self._device + + result = await self._device.async_search_directory( + container_id=ROOT_OBJECT_ID, + search_criteria=query, + metadata_filter=DLNA_BROWSE_FILTER, + ) + + children = [ + self._didl_to_media_source(child) + for child in result.result + if isinstance(child, didl_lite.DidlObject) + ] + + media_source = BrowseMediaSource( + domain=DOMAIN, + identifier=self._make_identifier(Action.SEARCH, query), + media_class=MEDIA_CLASS_DIRECTORY, + media_content_type="", + title="Search results", + can_play=False, + can_expand=True, + children=children, + ) + + media_source.calculate_children_class() + + return media_source + + def _didl_to_play_media(self, item: didl_lite.DidlObject) -> DidlPlayMedia: + """Return the first playable resource from a DIDL-Lite object.""" + assert self._device + + if not item.res: + LOGGER.debug("Object %s has no resources", item.id) + raise Unresolvable("Object has no resources") + + for resource in item.res: + if not resource.uri: + continue + if mime_type := _resource_mime_type(resource): + url = self._device.get_absolute_url(resource.uri) + LOGGER.debug("Resolved to url %s MIME %s", url, mime_type) + return DidlPlayMedia(url, mime_type, item) + + LOGGER.debug("Object %s has no playable resources", item.id) + raise Unresolvable("Object has no playable resources") + + def _didl_to_media_source( + self, + item: didl_lite.DidlObject, + browsed_children: DmsDevice.BrowseResult | None = None, + ) -> BrowseMediaSource: + """Convert a DIDL-Lite object to a browse media source.""" + children: list[BrowseMediaSource] | None = None + + if browsed_children: + children = [ + self._didl_to_media_source(child) + for child in browsed_children.result + if isinstance(child, didl_lite.DidlObject) + ] + + # Can expand if it has children (even if we don't have them yet), or its + # a container type. Otherwise the front-end will try to play it (even if + # can_play is False). + try: + child_count = int(item.child_count) + except (AttributeError, TypeError, ValueError): + child_count = 0 + can_expand = ( + bool(children) or child_count > 0 or isinstance(item, didl_lite.Container) + ) + + # Can play if item has any resource that can be streamed over the network + can_play = any(_resource_is_streaming(res) for res in item.res) + + # Use server name for root object, not "root" + title = self.name if item.id == ROOT_OBJECT_ID else item.title + + mime_type = _resource_mime_type(item.res[0]) if item.res else None + media_content_type = mime_type or item.upnp_class + + media_source = BrowseMediaSource( + domain=DOMAIN, + identifier=self._make_identifier(Action.OBJECT, item.id), + media_class=MEDIA_CLASS_MAP.get(item.upnp_class, ""), + media_content_type=media_content_type, + title=title, + can_play=can_play, + can_expand=can_expand, + children=children, + thumbnail=self._didl_thumbnail_url(item), + ) + + media_source.calculate_children_class() + + return media_source + + def _didl_thumbnail_url(self, item: didl_lite.DidlObject) -> str | None: + """Return absolute URL of a thumbnail for a DIDL-Lite object. + + Some objects have the thumbnail in albumArtURI, others in an image + resource. + """ + assert self._device + + # Based on DmrDevice.media_image_url from async_upnp_client. + if album_art_uri := getattr(item, "album_art_uri", None): + return self._device.get_absolute_url(album_art_uri) + + for resource in item.res: + if not resource.protocol_info or not resource.uri: + continue + if resource.protocol_info.startswith("http-get:*:image/"): + return self._device.get_absolute_url(resource.uri) + + return None + + def _make_identifier(self, action: Action, object_id: str) -> str: + """Make an identifier for BrowseMediaSource.""" + return f"{self.source_id}/{action}{object_id}" + + +class Action(StrEnum): + """Actions that can be specified in a DMS media-source identifier.""" + + OBJECT = PATH_OBJECT_ID_FLAG + PATH = PATH_SEP + SEARCH = PATH_SEARCH_FLAG + + +def _parse_identifier(identifier: str | None) -> tuple[Action | None, str]: + """Parse the media identifier component of a media-source URI.""" + if not identifier: + return None, "" + if identifier.startswith(PATH_OBJECT_ID_FLAG): + return Action.OBJECT, identifier[1:] + if identifier.startswith(PATH_SEP): + return Action.PATH, identifier[1:] + if identifier.startswith(PATH_SEARCH_FLAG): + return Action.SEARCH, identifier[1:] + return Action.PATH, identifier + + +def _resource_is_streaming(resource: didl_lite.Resource) -> bool: + """Determine if a resource can be streamed across a network.""" + # Err on the side of "True" if the protocol info is not available + if not resource.protocol_info: + return True + protocol = resource.protocol_info.split(":")[0].lower() + return protocol.lower() in STREAMABLE_PROTOCOLS + + +def _resource_mime_type(resource: didl_lite.Resource) -> str | None: + """Return the MIME type of a resource, if specified.""" + # This is the contentFormat portion of the ProtocolInfo for an http-get stream + if not resource.protocol_info: + return None + try: + protocol, _, content_format, _ = resource.protocol_info.split(":", 3) + except ValueError: + return None + if protocol.lower() in STREAMABLE_PROTOCOLS: + return content_format + return None + + +def _esc_quote(contents: str) -> str: + """Escape string contents for DLNA search quoted values. + + See ContentDirectory:v4, section 4.1.2. + """ + return contents.replace("\\", "\\\\").replace('"', '\\"') diff --git a/homeassistant/components/dlna_dms/manifest.json b/homeassistant/components/dlna_dms/manifest.json new file mode 100644 index 00000000000..feee4b6e903 --- /dev/null +++ b/homeassistant/components/dlna_dms/manifest.json @@ -0,0 +1,29 @@ +{ + "domain": "dlna_dms", + "name": "DLNA Digital Media Server", + "config_flow": true, + "documentation": "https://www.home-assistant.io/integrations/dlna_dms", + "requirements": ["async-upnp-client==0.23.5"], + "dependencies": ["media_source", "ssdp"], + "ssdp": [ + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:1", + "st": "urn:schemas-upnp-org:device:MediaServer:1" + }, + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:2", + "st": "urn:schemas-upnp-org:device:MediaServer:2" + }, + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:3", + "st": "urn:schemas-upnp-org:device:MediaServer:3" + }, + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:4", + "st": "urn:schemas-upnp-org:device:MediaServer:4" + } + ], + "codeowners": ["@chishm"], + "iot_class": "local_polling", + "quality_scale": "platinum" +} \ No newline at end of file diff --git a/homeassistant/components/dlna_dms/media_source.py b/homeassistant/components/dlna_dms/media_source.py new file mode 100644 index 00000000000..84910b7ff67 --- /dev/null +++ b/homeassistant/components/dlna_dms/media_source.py @@ -0,0 +1,126 @@ +"""Implementation of DLNA DMS as a media source. + +URIs look like "media-source://dlna_dms//" + +Media identifiers can look like: +* `/path/to/file`: slash-separated path through the Content Directory +* `:ObjectID`: colon followed by a server-assigned ID for an object +* `?query`: question mark followed by a query string to search for, + see [DLNA ContentDirectory SearchCriteria](http://www.upnp.org/specs/av/UPnP-av-ContentDirectory-v1-Service.pdf) + for the syntax. +""" + +from __future__ import annotations + +from homeassistant.components.media_player.const import ( + MEDIA_CLASS_CHANNEL, + MEDIA_CLASS_DIRECTORY, + MEDIA_TYPE_CHANNEL, + MEDIA_TYPE_CHANNELS, +) +from homeassistant.components.media_player.errors import BrowseError +from homeassistant.components.media_source.error import Unresolvable +from homeassistant.components.media_source.models import ( + BrowseMediaSource, + MediaSource, + MediaSourceItem, +) +from homeassistant.core import HomeAssistant + +from .const import DOMAIN, LOGGER, PATH_OBJECT_ID_FLAG, ROOT_OBJECT_ID, SOURCE_SEP +from .dms import DidlPlayMedia, get_domain_data + + +async def async_get_media_source(hass: HomeAssistant): + """Set up DLNA DMS media source.""" + LOGGER.debug("Setting up DLNA media sources") + return DmsMediaSource(hass) + + +class DmsMediaSource(MediaSource): + """Provide DLNA Media Servers as media sources.""" + + name = "DLNA Servers" + + def __init__(self, hass: HomeAssistant) -> None: + """Initialize DLNA source.""" + super().__init__(DOMAIN) + + self.hass = hass + + async def async_resolve_media(self, item: MediaSourceItem) -> DidlPlayMedia: + """Resolve a media item to a playable item.""" + dms_data = get_domain_data(self.hass) + if not dms_data.sources: + raise Unresolvable("No sources have been configured") + + source_id, media_id = _parse_identifier(item) + if not source_id: + raise Unresolvable(f"No source ID in {item.identifier}") + if not media_id: + raise Unresolvable(f"No media ID in {item.identifier}") + + try: + source = dms_data.sources[source_id] + except KeyError as err: + raise Unresolvable(f"Unknown source ID: {source_id}") from err + + return await source.async_resolve_media(media_id) + + async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource: + """Browse media.""" + dms_data = get_domain_data(self.hass) + if not dms_data.sources: + raise BrowseError("No sources have been configured") + + source_id, media_id = _parse_identifier(item) + LOGGER.debug("Browsing for %s / %s", source_id, media_id) + + if not source_id and len(dms_data.sources) > 1: + # Browsing the root of dlna_dms with more than one server, return + # all known servers. + base = BrowseMediaSource( + domain=DOMAIN, + identifier="", + media_class=MEDIA_CLASS_DIRECTORY, + media_content_type=MEDIA_TYPE_CHANNELS, + title=self.name, + can_play=False, + can_expand=True, + children_media_class=MEDIA_CLASS_CHANNEL, + ) + + base.children = [ + BrowseMediaSource( + domain=DOMAIN, + identifier=f"{source_id}/{PATH_OBJECT_ID_FLAG}{ROOT_OBJECT_ID}", + media_class=MEDIA_CLASS_CHANNEL, + media_content_type=MEDIA_TYPE_CHANNEL, + title=source.name, + can_play=False, + can_expand=True, + thumbnail=source.icon, + ) + for source_id, source in dms_data.sources.items() + ] + + return base + + if not source_id: + # No source specified, default to the first registered + source_id = next(iter(dms_data.sources)) + + try: + source = dms_data.sources[source_id] + except KeyError as err: + raise BrowseError(f"Unknown source ID: {source_id}") from err + + return await source.async_browse_media(media_id) + + +def _parse_identifier(item: MediaSourceItem) -> tuple[str | None, str | None]: + """Parse the source_id and media identifier from a media source item.""" + if not item.identifier: + return None, None + source_id, _, media_id = item.identifier.partition(SOURCE_SEP) + return source_id or None, media_id or None diff --git a/homeassistant/components/dlna_dms/strings.json b/homeassistant/components/dlna_dms/strings.json new file mode 100644 index 00000000000..9b59960a78a --- /dev/null +++ b/homeassistant/components/dlna_dms/strings.json @@ -0,0 +1,24 @@ +{ + "config": { + "flow_title": "{name}", + "step": { + "user": { + "title": "Discovered DLNA DMA devices", + "description": "Choose a device to configure", + "data": { + "host": "[%key:common::config_flow::data::host%]" + } + }, + "confirm": { + "description": "[%key:common::config_flow::description::confirm_setup%]" + } + }, + "abort": { + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]", + "already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]", + "bad_ssdp": "SSDP data is missing a required value", + "no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]", + "not_dms": "Device is not a supported Media Server" + } + } +} diff --git a/homeassistant/components/dlna_dms/translations/en.json b/homeassistant/components/dlna_dms/translations/en.json new file mode 100644 index 00000000000..6d07a25a27d --- /dev/null +++ b/homeassistant/components/dlna_dms/translations/en.json @@ -0,0 +1,24 @@ +{ + "config": { + "abort": { + "already_configured": "Device is already configured", + "already_in_progress": "Configuration flow is already in progress", + "bad_ssdp": "SSDP data is missing a required value", + "no_devices_found": "No devices found on the network", + "not_dms": "Device is not a supported Media Server" + }, + "flow_title": "{name}", + "step": { + "confirm": { + "description": "Do you want to start set up?" + }, + "user": { + "data": { + "host": "Host" + }, + "description": "Choose a device to configure", + "title": "Discovered DLNA DMA devices" + } + } + } +} \ No newline at end of file diff --git a/homeassistant/generated/config_flows.py b/homeassistant/generated/config_flows.py index ddec2deb65e..5530c73ed50 100644 --- a/homeassistant/generated/config_flows.py +++ b/homeassistant/generated/config_flows.py @@ -72,6 +72,7 @@ FLOWS = [ "dialogflow", "directv", "dlna_dmr", + "dlna_dms", "dnsip", "doorbird", "dsmr", diff --git a/homeassistant/generated/ssdp.py b/homeassistant/generated/ssdp.py index 1a243d954b9..f0117e2a9c2 100644 --- a/homeassistant/generated/ssdp.py +++ b/homeassistant/generated/ssdp.py @@ -97,6 +97,24 @@ SSDP = { "st": "urn:schemas-upnp-org:device:MediaRenderer:3" } ], + "dlna_dms": [ + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:1", + "st": "urn:schemas-upnp-org:device:MediaServer:1" + }, + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:2", + "st": "urn:schemas-upnp-org:device:MediaServer:2" + }, + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:3", + "st": "urn:schemas-upnp-org:device:MediaServer:3" + }, + { + "deviceType": "urn:schemas-upnp-org:device:MediaServer:4", + "st": "urn:schemas-upnp-org:device:MediaServer:4" + } + ], "fritz": [ { "st": "urn:schemas-upnp-org:device:fritzbox:1" diff --git a/requirements_all.txt b/requirements_all.txt index 93c5a9072ac..511f8abb7c5 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -341,6 +341,7 @@ asmog==0.0.6 asterisk_mbox==0.5.0 # homeassistant.components.dlna_dmr +# homeassistant.components.dlna_dms # homeassistant.components.ssdp # homeassistant.components.upnp # homeassistant.components.yeelight diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 723daaca0cd..926d5e0d6ee 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -249,6 +249,7 @@ aprslib==0.7.0 arcam-fmj==0.12.0 # homeassistant.components.dlna_dmr +# homeassistant.components.dlna_dms # homeassistant.components.ssdp # homeassistant.components.upnp # homeassistant.components.yeelight diff --git a/tests/components/dlna_dms/__init__.py b/tests/components/dlna_dms/__init__.py new file mode 100644 index 00000000000..ed538810521 --- /dev/null +++ b/tests/components/dlna_dms/__init__.py @@ -0,0 +1 @@ +"""Tests for the DLNA MediaServer integration.""" diff --git a/tests/components/dlna_dms/conftest.py b/tests/components/dlna_dms/conftest.py new file mode 100644 index 00000000000..6764001be31 --- /dev/null +++ b/tests/components/dlna_dms/conftest.py @@ -0,0 +1,131 @@ +"""Fixtures for DLNA DMS tests.""" +from __future__ import annotations + +from collections.abc import AsyncGenerator, Iterable +from typing import Final +from unittest.mock import Mock, create_autospec, patch, seal + +from async_upnp_client import UpnpDevice, UpnpService +from async_upnp_client.utils import absolute_url +import pytest + +from homeassistant.components.dlna_dms.const import DOMAIN +from homeassistant.components.dlna_dms.dms import DlnaDmsData, get_domain_data +from homeassistant.const import CONF_DEVICE_ID, CONF_URL +from homeassistant.core import HomeAssistant + +from tests.common import MockConfigEntry + +MOCK_DEVICE_HOST: Final = "192.88.99.21" +MOCK_DEVICE_BASE_URL: Final = f"http://{MOCK_DEVICE_HOST}" +MOCK_DEVICE_LOCATION: Final = MOCK_DEVICE_BASE_URL + "/dms_description.xml" +MOCK_DEVICE_NAME: Final = "Test Server Device" +MOCK_DEVICE_TYPE: Final = "urn:schemas-upnp-org:device:MediaServer:1" +MOCK_DEVICE_UDN: Final = "uuid:7bf34520-f034-4fa2-8d2d-2f709d4221ef" +MOCK_DEVICE_USN: Final = f"{MOCK_DEVICE_UDN}::{MOCK_DEVICE_TYPE}" +MOCK_SOURCE_ID: Final = "test_server_device" + +LOCAL_IP: Final = "192.88.99.1" +EVENT_CALLBACK_URL: Final = "http://192.88.99.1/notify" + +NEW_DEVICE_LOCATION: Final = "http://192.88.99.7" + "/dmr_description.xml" + + +@pytest.fixture +def upnp_factory_mock() -> Iterable[Mock]: + """Mock the UpnpFactory class to construct DMS-style UPnP devices.""" + with patch( + "homeassistant.components.dlna_dms.dms.UpnpFactory", + autospec=True, + spec_set=True, + ) as upnp_factory: + upnp_device = create_autospec(UpnpDevice, instance=True) + upnp_device.name = MOCK_DEVICE_NAME + upnp_device.udn = MOCK_DEVICE_UDN + upnp_device.device_url = MOCK_DEVICE_LOCATION + upnp_device.device_type = MOCK_DEVICE_TYPE + upnp_device.available = True + upnp_device.parent_device = None + upnp_device.root_device = upnp_device + upnp_device.all_devices = [upnp_device] + upnp_device.services = { + "urn:schemas-upnp-org:service:ContentDirectory:1": create_autospec( + UpnpService, + instance=True, + service_type="urn:schemas-upnp-org:service:ContentDirectory:1", + service_id="urn:upnp-org:serviceId:ContentDirectory", + ), + "urn:schemas-upnp-org:service:ConnectionManager:1": create_autospec( + UpnpService, + instance=True, + service_type="urn:schemas-upnp-org:service:ConnectionManager:1", + service_id="urn:upnp-org:serviceId:ConnectionManager", + ), + } + seal(upnp_device) + upnp_factory_instance = upnp_factory.return_value + upnp_factory_instance.async_create_device.return_value = upnp_device + + yield upnp_factory_instance + + +@pytest.fixture +async def domain_data_mock( + hass: HomeAssistant, aioclient_mock, upnp_factory_mock +) -> AsyncGenerator[DlnaDmsData, None]: + """Mock some global data used by this component. + + This includes network clients and library object factories. Mocking it + prevents network use. + + Yields the actual domain data, for ease of access + """ + with patch( + "homeassistant.components.dlna_dms.dms.AiohttpSessionRequester", autospec=True + ): + yield get_domain_data(hass) + + +@pytest.fixture +def config_entry_mock() -> MockConfigEntry: + """Mock a config entry for this platform.""" + mock_entry = MockConfigEntry( + unique_id=MOCK_DEVICE_USN, + domain=DOMAIN, + data={ + CONF_URL: MOCK_DEVICE_LOCATION, + CONF_DEVICE_ID: MOCK_DEVICE_USN, + }, + title=MOCK_DEVICE_NAME, + ) + return mock_entry + + +@pytest.fixture +def dms_device_mock(upnp_factory_mock: Mock) -> Iterable[Mock]: + """Mock the async_upnp_client DMS device, initially connected.""" + with patch( + "homeassistant.components.dlna_dms.dms.DmsDevice", autospec=True + ) as constructor: + device = constructor.return_value + device.on_event = None + device.profile_device = upnp_factory_mock.async_create_device.return_value + device.icon = MOCK_DEVICE_BASE_URL + "/icon.jpg" + device.udn = "device_udn" + device.manufacturer = "device_manufacturer" + device.model_name = "device_model_name" + device.name = "device_name" + device.get_absolute_url.side_effect = lambda url: absolute_url( + MOCK_DEVICE_BASE_URL, url + ) + + yield device + + +@pytest.fixture(autouse=True) +def ssdp_scanner_mock() -> Iterable[Mock]: + """Mock the SSDP module.""" + with patch("homeassistant.components.ssdp.Scanner", autospec=True) as mock_scanner: + reg_callback = mock_scanner.return_value.async_register_callback + reg_callback.return_value = Mock(return_value=None) + yield mock_scanner.return_value diff --git a/tests/components/dlna_dms/test_config_flow.py b/tests/components/dlna_dms/test_config_flow.py new file mode 100644 index 00000000000..df8d55dbc25 --- /dev/null +++ b/tests/components/dlna_dms/test_config_flow.py @@ -0,0 +1,346 @@ +"""Test the DLNA DMS config flow.""" +from __future__ import annotations + +import dataclasses +from typing import Final +from unittest.mock import Mock + +from async_upnp_client import UpnpError +import pytest + +from homeassistant import config_entries, data_entry_flow +from homeassistant.components import ssdp +from homeassistant.components.dlna_dms.const import DOMAIN +from homeassistant.const import CONF_DEVICE_ID, CONF_HOST, CONF_URL +from homeassistant.core import HomeAssistant + +from .conftest import ( + MOCK_DEVICE_HOST, + MOCK_DEVICE_LOCATION, + MOCK_DEVICE_NAME, + MOCK_DEVICE_TYPE, + MOCK_DEVICE_UDN, + MOCK_DEVICE_USN, + NEW_DEVICE_LOCATION, +) + +from tests.common import MockConfigEntry + +# Auto-use the domain_data_mock and dms_device_mock fixtures for every test in this module +pytestmark = [ + pytest.mark.usefixtures("domain_data_mock"), + pytest.mark.usefixtures("dms_device_mock"), +] + +WRONG_DEVICE_TYPE: Final = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" + +MOCK_ROOT_DEVICE_UDN: Final = "ROOT_DEVICE" + +MOCK_DISCOVERY: Final = ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={ + ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN, + ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE, + ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME, + ssdp.ATTR_UPNP_SERVICE_LIST: { + "service": [ + { + "SCPDURL": "/ContentDirectory/scpd.xml", + "controlURL": "/ContentDirectory/control.xml", + "eventSubURL": "/ContentDirectory/event.xml", + "serviceId": "urn:upnp-org:serviceId:ContentDirectory", + "serviceType": "urn:schemas-upnp-org:service:ContentDirectory:1", + }, + { + "SCPDURL": "/ConnectionManager/scpd.xml", + "controlURL": "/ConnectionManager/control.xml", + "eventSubURL": "/ConnectionManager/event.xml", + "serviceId": "urn:upnp-org:serviceId:ConnectionManager", + "serviceType": "urn:schemas-upnp-org:service:ConnectionManager:1", + }, + ] + }, + }, + x_homeassistant_matching_domains={DOMAIN}, +) + + +async def test_user_flow(hass: HomeAssistant, ssdp_scanner_mock: Mock) -> None: + """Test user-init'd flow, user selects discovered device.""" + ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [ + [MOCK_DISCOVERY], + [], + [], + [], + ] + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["errors"] is None + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_HOST: MOCK_DEVICE_HOST} + ) + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == MOCK_DEVICE_NAME + assert result["data"] == { + CONF_URL: MOCK_DEVICE_LOCATION, + CONF_DEVICE_ID: MOCK_DEVICE_USN, + } + assert result["options"] == {} + + await hass.async_block_till_done() + + +async def test_user_flow_no_devices( + hass: HomeAssistant, ssdp_scanner_mock: Mock +) -> None: + """Test user-init'd flow, there's really no devices to choose from.""" + ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [ + [], + [], + [], + [], + ] + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "no_devices_found" + + +async def test_ssdp_flow_success(hass: HomeAssistant) -> None: + """Test that SSDP discovery with an available device works.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_DISCOVERY, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + await hass.async_block_till_done() + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == MOCK_DEVICE_NAME + assert result["data"] == { + CONF_URL: MOCK_DEVICE_LOCATION, + CONF_DEVICE_ID: MOCK_DEVICE_USN, + } + assert result["options"] == {} + + +async def test_ssdp_flow_unavailable( + hass: HomeAssistant, domain_data_mock: Mock +) -> None: + """Test that SSDP discovery with an unavailable device still succeeds. + + All the required information for configuration is obtained from the SSDP + message, there's no need to connect to the device to configure it. + """ + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_DISCOVERY, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "confirm" + + domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + await hass.async_block_till_done() + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == MOCK_DEVICE_NAME + assert result["data"] == { + CONF_URL: MOCK_DEVICE_LOCATION, + CONF_DEVICE_ID: MOCK_DEVICE_USN, + } + assert result["options"] == {} + + +async def test_ssdp_flow_existing( + hass: HomeAssistant, config_entry_mock: MockConfigEntry +) -> None: + """Test that SSDP discovery of existing config entry updates the URL.""" + config_entry_mock.add_to_hass(hass) + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_st="mock_st", + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_udn=MOCK_DEVICE_UDN, + upnp={ + ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN, + ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE, + ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME, + }, + ), + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "already_configured" + assert config_entry_mock.data[CONF_URL] == NEW_DEVICE_LOCATION + + +async def test_ssdp_flow_duplicate_location( + hass: HomeAssistant, config_entry_mock: MockConfigEntry +) -> None: + """Test that discovery of device with URL matching existing entry gets aborted.""" + config_entry_mock.add_to_hass(hass) + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_DISCOVERY, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "already_configured" + assert config_entry_mock.data[CONF_URL] == MOCK_DEVICE_LOCATION + + +async def test_ssdp_flow_bad_data( + hass: HomeAssistant, config_entry_mock: MockConfigEntry +) -> None: + """Test bad SSDP discovery information is rejected cleanly.""" + # Missing location + discovery = dataclasses.replace(MOCK_DISCOVERY, ssdp_location="") + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "bad_ssdp" + + # Missing USN + discovery = dataclasses.replace(MOCK_DISCOVERY, ssdp_usn="") + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "bad_ssdp" + + +async def test_duplicate_name( + hass: HomeAssistant, config_entry_mock: MockConfigEntry +) -> None: + """Test device with name same as another results in no error.""" + config_entry_mock.add_to_hass(hass) + + mock_entry_1 = MockConfigEntry( + unique_id="mock_entry_1", + domain=DOMAIN, + data={ + CONF_URL: "not-important", + CONF_DEVICE_ID: "not-important", + }, + title=MOCK_DEVICE_NAME, + ) + mock_entry_1.add_to_hass(hass) + + # New UDN, USN, and location to be sure it's a new device + new_device_udn = "uuid:7bf34520-f034-4fa2-8d2d-2f709d422000" + new_device_usn = f"{new_device_udn}::{MOCK_DEVICE_TYPE}" + new_device_location = "http://192.88.99.22/dms_description.xml" + discovery = dataclasses.replace( + MOCK_DISCOVERY, + ssdp_usn=new_device_usn, + ssdp_location=new_device_location, + ssdp_udn=new_device_udn, + ) + discovery.upnp = dict(discovery.upnp) + discovery.upnp[ssdp.ATTR_UPNP_UDN] = new_device_udn + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + await hass.async_block_till_done() + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == MOCK_DEVICE_NAME + assert result["data"] == { + CONF_URL: new_device_location, + CONF_DEVICE_ID: new_device_usn, + } + assert result["options"] == {} + + +async def test_ssdp_flow_upnp_udn( + hass: HomeAssistant, config_entry_mock: MockConfigEntry +) -> None: + """Test that SSDP discovery ignores the root device's UDN.""" + config_entry_mock.add_to_hass(hass) + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={ + ssdp.ATTR_UPNP_UDN: "DIFFERENT_ROOT_DEVICE", + ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE, + ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME, + }, + ), + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "already_configured" + assert config_entry_mock.data[CONF_URL] == NEW_DEVICE_LOCATION + + +async def test_ssdp_missing_services(hass: HomeAssistant) -> None: + """Test SSDP ignores devices that are missing required services.""" + # No services defined at all + discovery = dataclasses.replace(MOCK_DISCOVERY) + discovery.upnp = dict(discovery.upnp) + del discovery.upnp[ssdp.ATTR_UPNP_SERVICE_LIST] + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=discovery, + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "not_dms" + + # ContentDirectory service is missing + discovery = dataclasses.replace(MOCK_DISCOVERY) + discovery.upnp = dict(discovery.upnp) + discovery.upnp[ssdp.ATTR_UPNP_SERVICE_LIST] = { + "service": [ + service + for service in discovery.upnp[ssdp.ATTR_UPNP_SERVICE_LIST]["service"] + if service.get("serviceId") != "urn:upnp-org:serviceId:ContentDirectory" + ] + } + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_SSDP}, data=discovery + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "not_dms" diff --git a/tests/components/dlna_dms/test_device_availability.py b/tests/components/dlna_dms/test_device_availability.py new file mode 100644 index 00000000000..a0cfb3ab2d2 --- /dev/null +++ b/tests/components/dlna_dms/test_device_availability.py @@ -0,0 +1,705 @@ +"""Test how the DmsDeviceSource handles available and unavailable devices.""" +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterable +import logging +from unittest.mock import ANY, DEFAULT, Mock, patch + +from async_upnp_client.exceptions import UpnpConnectionError, UpnpError +from didl_lite import didl_lite +import pytest + +from homeassistant.components import ssdp +from homeassistant.components.dlna_dms.const import DOMAIN +from homeassistant.components.dlna_dms.dms import DmsDeviceSource, get_domain_data +from homeassistant.components.media_player.errors import BrowseError +from homeassistant.components.media_source.error import Unresolvable +from homeassistant.core import HomeAssistant +from homeassistant.setup import async_setup_component + +from .conftest import ( + MOCK_DEVICE_LOCATION, + MOCK_DEVICE_NAME, + MOCK_DEVICE_TYPE, + MOCK_DEVICE_UDN, + MOCK_DEVICE_USN, + NEW_DEVICE_LOCATION, +) + +from tests.common import MockConfigEntry + +# Auto-use the domain_data_mock for every test in this module +pytestmark = [ + pytest.mark.usefixtures("domain_data_mock"), +] + + +async def setup_mock_component( + hass: HomeAssistant, mock_entry: MockConfigEntry +) -> DmsDeviceSource: + """Set up a mock DlnaDmrEntity with the given configuration.""" + mock_entry.add_to_hass(hass) + assert await async_setup_component(hass, DOMAIN, {}) is True + await hass.async_block_till_done() + + domain_data = get_domain_data(hass) + return next(iter(domain_data.devices.values())) + + +@pytest.fixture +async def connected_source_mock( + hass: HomeAssistant, + config_entry_mock: MockConfigEntry, + ssdp_scanner_mock: Mock, + dms_device_mock: Mock, +) -> AsyncIterable[DmsDeviceSource]: + """Fixture to set up a mock DmsDeviceSource in a connected state. + + Yields the entity. Cleans up the entity after the test is complete. + """ + entity = await setup_mock_component(hass, config_entry_mock) + + # Check the entity has registered all needed listeners + assert len(config_entry_mock.update_listeners) == 1 + assert ssdp_scanner_mock.async_register_callback.await_count == 2 + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0 + + # Run the test + yield entity + + # Unload config entry to clean up + assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == { + "require_restart": False + } + + # Check entity has cleaned up its resources + assert not config_entry_mock.update_listeners + assert ( + ssdp_scanner_mock.async_register_callback.await_count + == ssdp_scanner_mock.async_register_callback.return_value.call_count + ) + + +@pytest.fixture +async def disconnected_source_mock( + hass: HomeAssistant, + upnp_factory_mock: Mock, + config_entry_mock: MockConfigEntry, + ssdp_scanner_mock: Mock, + dms_device_mock: Mock, +) -> AsyncIterable[DmsDeviceSource]: + """Fixture to set up a mock DmsDeviceSource in a disconnected state. + + Yields the entity. Cleans up the entity after the test is complete. + """ + # Cause the connection attempt to fail + upnp_factory_mock.async_create_device.side_effect = UpnpConnectionError + + entity = await setup_mock_component(hass, config_entry_mock) + + # Check the entity has registered all needed listeners + assert len(config_entry_mock.update_listeners) == 1 + assert ssdp_scanner_mock.async_register_callback.await_count == 2 + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0 + + # Run the test + yield entity + + # Unload config entry to clean up + assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == { + "require_restart": False + } + + # Check entity has cleaned up its resources + assert not config_entry_mock.update_listeners + assert ( + ssdp_scanner_mock.async_register_callback.await_count + == ssdp_scanner_mock.async_register_callback.return_value.call_count + ) + + +async def test_unavailable_device( + hass: HomeAssistant, + upnp_factory_mock: Mock, + ssdp_scanner_mock: Mock, + config_entry_mock: MockConfigEntry, +) -> None: + """Test a DlnaDmsEntity with out a connected DmsDevice.""" + # Cause connection attempts to fail + upnp_factory_mock.async_create_device.side_effect = UpnpConnectionError + + with patch( + "homeassistant.components.dlna_dms.dms.DmsDevice", autospec=True + ) as dms_device_constructor_mock: + connected_source_mock = await setup_mock_component(hass, config_entry_mock) + + # Check device is not created + dms_device_constructor_mock.assert_not_called() + + # Check attempt was made to create a device from the supplied URL + upnp_factory_mock.async_create_device.assert_awaited_once_with(MOCK_DEVICE_LOCATION) + # Check SSDP notifications are registered + ssdp_scanner_mock.async_register_callback.assert_any_call( + ANY, {"USN": MOCK_DEVICE_USN} + ) + ssdp_scanner_mock.async_register_callback.assert_any_call( + ANY, {"_udn": MOCK_DEVICE_UDN, "NTS": "ssdp:byebye"} + ) + # Quick check of the state to verify the entity has no connected DmsDevice + assert not connected_source_mock.available + # Check the name matches that supplied + assert connected_source_mock.name == MOCK_DEVICE_NAME + + # Check attempts to browse and resolve media give errors + with pytest.raises(BrowseError): + await connected_source_mock.async_browse_media("/browse_path") + with pytest.raises(BrowseError): + await connected_source_mock.async_browse_media(":browse_object") + with pytest.raises(BrowseError): + await connected_source_mock.async_browse_media("?browse_search") + with pytest.raises(Unresolvable): + await connected_source_mock.async_resolve_media("/resolve_path") + with pytest.raises(Unresolvable): + await connected_source_mock.async_resolve_media(":resolve_object") + with pytest.raises(Unresolvable): + await connected_source_mock.async_resolve_media("?resolve_search") + + # Unload config entry to clean up + assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == { + "require_restart": False + } + + # Confirm SSDP notifications unregistered + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 2 + + +async def test_become_available( + hass: HomeAssistant, + upnp_factory_mock: Mock, + ssdp_scanner_mock: Mock, + config_entry_mock: MockConfigEntry, + dms_device_mock: Mock, +) -> None: + """Test a device becoming available after the entity is constructed.""" + # Cause connection attempts to fail before adding the entity + upnp_factory_mock.async_create_device.side_effect = UpnpConnectionError + connected_source_mock = await setup_mock_component(hass, config_entry_mock) + assert not connected_source_mock.available + + # Mock device is now available. + upnp_factory_mock.async_create_device.side_effect = None + upnp_factory_mock.async_create_device.reset_mock() + + # Send an SSDP notification from the now alive device + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # Check device was created from the supplied URL + upnp_factory_mock.async_create_device.assert_awaited_once_with(NEW_DEVICE_LOCATION) + # Quick check of the state to verify the entity has a connected DmsDevice + assert connected_source_mock.available + + # Unload config entry to clean up + assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == { + "require_restart": False + } + + # Confirm SSDP notifications unregistered + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 2 + + +async def test_alive_but_gone( + hass: HomeAssistant, + upnp_factory_mock: Mock, + ssdp_scanner_mock: Mock, + disconnected_source_mock: DmsDeviceSource, +) -> None: + """Test a device sending an SSDP alive announcement, but not being connectable.""" + upnp_factory_mock.async_create_device.side_effect = UpnpError + + # Send an SSDP notification from the still missing device + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"}, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # There should be a connection attempt to the device + upnp_factory_mock.async_create_device.assert_awaited() + + # Device should still be unavailable + assert not disconnected_source_mock.available + + # Send the same SSDP notification, expecting no extra connection attempts + upnp_factory_mock.async_create_device.reset_mock() + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"}, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + upnp_factory_mock.async_create_device.assert_not_called() + upnp_factory_mock.async_create_device.assert_not_awaited() + assert not disconnected_source_mock.available + + # Send an SSDP notification with a new BOOTID, indicating the device has rebooted + upnp_factory_mock.async_create_device.reset_mock() + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"}, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # Rebooted device (seen via BOOTID) should mean a new connection attempt + upnp_factory_mock.async_create_device.assert_awaited() + assert not disconnected_source_mock.available + + # Send byebye message to indicate device is going away. Next alive message + # should result in a reconnect attempt even with same BOOTID. + upnp_factory_mock.async_create_device.reset_mock() + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.BYEBYE, + ) + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"}, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # Rebooted device (seen via byebye/alive) should mean a new connection attempt + upnp_factory_mock.async_create_device.assert_awaited() + assert not disconnected_source_mock.available + + +async def test_multiple_ssdp_alive( + hass: HomeAssistant, + upnp_factory_mock: Mock, + ssdp_scanner_mock: Mock, + disconnected_source_mock: DmsDeviceSource, +) -> None: + """Test multiple SSDP alive notifications is ok, only connects to device once.""" + upnp_factory_mock.async_create_device.reset_mock() + + # Contacting the device takes long enough that 2 simultaneous attempts could be made + async def create_device_delayed(_location): + """Delay before continuing with async_create_device. + + This gives a chance for parallel calls to `device_connect` to occur. + """ + await asyncio.sleep(0.1) + return DEFAULT + + upnp_factory_mock.async_create_device.side_effect = create_device_delayed + + # Send two SSDP notifications with the new device URL + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=NEW_DEVICE_LOCATION, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # Check device is contacted exactly once + upnp_factory_mock.async_create_device.assert_awaited_once_with(NEW_DEVICE_LOCATION) + + # Device should be available + assert disconnected_source_mock.available + + +async def test_ssdp_byebye( + hass: HomeAssistant, + ssdp_scanner_mock: Mock, + connected_source_mock: DmsDeviceSource, +) -> None: + """Test device is disconnected when byebye is received.""" + # First byebye will cause a disconnect + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_headers={"NTS": "ssdp:byebye"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.BYEBYE, + ) + + # Device should be gone + assert not connected_source_mock.available + + # Second byebye will do nothing + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_headers={"NTS": "ssdp:byebye"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.BYEBYE, + ) + + +async def test_ssdp_update_seen_bootid( + hass: HomeAssistant, + ssdp_scanner_mock: Mock, + upnp_factory_mock: Mock, + disconnected_source_mock: DmsDeviceSource, +) -> None: + """Test device does not reconnect when it gets ssdp:update with next bootid.""" + # Start with a disconnected device + entity = disconnected_source_mock + assert not entity.available + + # "Reconnect" the device + upnp_factory_mock.async_create_device.reset_mock() + upnp_factory_mock.async_create_device.side_effect = None + + # Send SSDP alive with boot ID + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # Device should be connected + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send SSDP update with next boot ID + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_headers={ + "NTS": "ssdp:update", + ssdp.ATTR_SSDP_BOOTID: "1", + ssdp.ATTR_SSDP_NEXTBOOTID: "2", + }, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.UPDATE, + ) + await hass.async_block_till_done() + + # Device was not reconnected, even with a new boot ID + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send SSDP update with same next boot ID, again + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_headers={ + "NTS": "ssdp:update", + ssdp.ATTR_SSDP_BOOTID: "1", + ssdp.ATTR_SSDP_NEXTBOOTID: "2", + }, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.UPDATE, + ) + await hass.async_block_till_done() + + # Nothing should change + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send SSDP update with bad next boot ID + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_headers={ + "NTS": "ssdp:update", + ssdp.ATTR_SSDP_BOOTID: "2", + ssdp.ATTR_SSDP_NEXTBOOTID: "7c848375-a106-4bd1-ac3c-8e50427c8e4f", + }, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.UPDATE, + ) + await hass.async_block_till_done() + + # Nothing should change + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send a new SSDP alive with the new boot ID, device should not reconnect + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + +async def test_ssdp_update_missed_bootid( + hass: HomeAssistant, + ssdp_scanner_mock: Mock, + upnp_factory_mock: Mock, + disconnected_source_mock: DmsDeviceSource, +) -> None: + """Test device disconnects when it gets ssdp:update bootid it wasn't expecting.""" + # Start with a disconnected device + entity = disconnected_source_mock + assert not entity.available + + # "Reconnect" the device + upnp_factory_mock.async_create_device.reset_mock() + upnp_factory_mock.async_create_device.side_effect = None + + # Send SSDP alive with boot ID + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + # Device should be connected + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send SSDP update with skipped boot ID (not previously seen) + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_udn=MOCK_DEVICE_UDN, + ssdp_headers={ + "NTS": "ssdp:update", + ssdp.ATTR_SSDP_BOOTID: "2", + ssdp.ATTR_SSDP_NEXTBOOTID: "3", + }, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.UPDATE, + ) + await hass.async_block_till_done() + + # Device should not *re*-connect yet + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send a new SSDP alive with the new boot ID, device should reconnect + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "3"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 2 + + +async def test_ssdp_bootid( + hass: HomeAssistant, + upnp_factory_mock: Mock, + ssdp_scanner_mock: Mock, + disconnected_source_mock: DmsDeviceSource, +) -> None: + """Test an alive with a new BOOTID.UPNP.ORG header causes a reconnect.""" + # Start with a disconnected device + entity = disconnected_source_mock + assert not entity.available + + # "Reconnect" the device + upnp_factory_mock.async_create_device.side_effect = None + upnp_factory_mock.async_create_device.reset_mock() + + # Send SSDP alive with boot ID + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send SSDP alive with same boot ID, nothing should happen + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 1 + + # Send a new SSDP alive with an incremented boot ID, device should be dis/reconnected + await ssdp_callback( + ssdp.SsdpServiceInfo( + ssdp_usn=MOCK_DEVICE_USN, + ssdp_location=MOCK_DEVICE_LOCATION, + ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"}, + ssdp_st=MOCK_DEVICE_TYPE, + upnp={}, + ), + ssdp.SsdpChange.ALIVE, + ) + await hass.async_block_till_done() + + assert entity.available + assert upnp_factory_mock.async_create_device.await_count == 2 + + +async def test_repeated_connect( + caplog: pytest.LogCaptureFixture, + connected_source_mock: DmsDeviceSource, + upnp_factory_mock: Mock, +) -> None: + """Test trying to connect an already connected device is safely ignored.""" + upnp_factory_mock.async_create_device.reset_mock() + # Calling internal function directly to skip trying to time 2 SSDP messages carefully + with caplog.at_level(logging.DEBUG): + await connected_source_mock.device_connect() + assert ( + "Trying to connect when device already connected" == caplog.records[-1].message + ) + assert not upnp_factory_mock.async_create_device.await_count + + +async def test_connect_no_location( + caplog: pytest.LogCaptureFixture, + disconnected_source_mock: DmsDeviceSource, + upnp_factory_mock: Mock, +) -> None: + """Test trying to connect without a location is safely ignored.""" + disconnected_source_mock.location = "" + upnp_factory_mock.async_create_device.reset_mock() + # Calling internal function directly to skip trying to time 2 SSDP messages carefully + with caplog.at_level(logging.DEBUG): + await disconnected_source_mock.device_connect() + assert "Not connecting because location is not known" == caplog.records[-1].message + assert not upnp_factory_mock.async_create_device.await_count + + +async def test_become_unavailable( + hass: HomeAssistant, + connected_source_mock: DmsDeviceSource, + dms_device_mock: Mock, +) -> None: + """Test a device becoming unavailable.""" + # Mock a good resolve result + dms_device_mock.async_browse_metadata.return_value = didl_lite.Item( + id="object_id", + restricted=False, + title="Object", + res=[didl_lite.Resource(uri="foo", protocol_info="http-get:*:audio/mpeg:")], + ) + + # Check async_resolve_object currently works + await connected_source_mock.async_resolve_media(":object_id") + + # Now break the network connection + dms_device_mock.async_browse_metadata.side_effect = UpnpConnectionError + + # The device should be considered available until next contacted + assert connected_source_mock.available + + # async_resolve_object should fail + with pytest.raises(Unresolvable): + await connected_source_mock.async_resolve_media(":object_id") + + # The device should now be unavailable + assert not connected_source_mock.available diff --git a/tests/components/dlna_dms/test_dms_device_source.py b/tests/components/dlna_dms/test_dms_device_source.py new file mode 100644 index 00000000000..4ee9cce91ba --- /dev/null +++ b/tests/components/dlna_dms/test_dms_device_source.py @@ -0,0 +1,878 @@ +"""Test the interface methods of DmsDeviceSource, except availability.""" +from collections.abc import AsyncIterable +from typing import Final, Union +from unittest.mock import ANY, Mock, call + +from async_upnp_client.exceptions import UpnpActionError, UpnpConnectionError, UpnpError +from async_upnp_client.profiles.dlna import ContentDirectoryErrorCode, DmsDevice +from didl_lite import didl_lite +import pytest + +from homeassistant.components.dlna_dms.const import DOMAIN +from homeassistant.components.dlna_dms.dms import ( + ActionError, + DeviceConnectionError, + DlnaDmsData, + DmsDeviceSource, +) +from homeassistant.components.media_player.errors import BrowseError +from homeassistant.components.media_source.error import Unresolvable +from homeassistant.components.media_source.models import BrowseMediaSource +from homeassistant.const import CONF_DEVICE_ID, CONF_URL +from homeassistant.core import HomeAssistant + +from .conftest import ( + MOCK_DEVICE_BASE_URL, + MOCK_DEVICE_NAME, + MOCK_DEVICE_TYPE, + MOCK_DEVICE_USN, + MOCK_SOURCE_ID, +) + +from tests.common import MockConfigEntry + +BrowseResultList = list[Union[didl_lite.DidlObject, didl_lite.Descriptor]] + + +@pytest.fixture +async def device_source_mock( + hass: HomeAssistant, + config_entry_mock: MockConfigEntry, + ssdp_scanner_mock: Mock, + dms_device_mock: Mock, + domain_data_mock: DlnaDmsData, +) -> AsyncIterable[DmsDeviceSource]: + """Fixture to set up a DmsDeviceSource in a connected state and cleanup at completion.""" + await hass.config_entries.async_add(config_entry_mock) + await hass.async_block_till_done() + + mock_entity = domain_data_mock.devices[MOCK_DEVICE_USN] + + # Check the DmsDeviceSource has registered all needed listeners + assert len(config_entry_mock.update_listeners) == 1 + assert ssdp_scanner_mock.async_register_callback.await_count == 2 + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0 + + # Run the test + yield mock_entity + + # Unload config entry to clean up + assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == { + "require_restart": False + } + + # Check DmsDeviceSource has cleaned up its resources + assert not config_entry_mock.update_listeners + assert ( + ssdp_scanner_mock.async_register_callback.await_count + == ssdp_scanner_mock.async_register_callback.return_value.call_count + ) + assert MOCK_DEVICE_USN not in domain_data_mock.devices + assert MOCK_SOURCE_ID not in domain_data_mock.sources + + +async def test_update_source_id( + hass: HomeAssistant, + config_entry_mock: MockConfigEntry, + device_source_mock: DmsDeviceSource, + domain_data_mock: DlnaDmsData, +) -> None: + """Test the config listener updates the source_id and source list upon title change.""" + new_title: Final = "New Name" + new_source_id: Final = "new_name" + assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID} + hass.config_entries.async_update_entry(config_entry_mock, title=new_title) + await hass.async_block_till_done() + + assert device_source_mock.source_id == new_source_id + assert domain_data_mock.sources.keys() == {new_source_id} + + +async def test_update_existing_source_id( + caplog: pytest.LogCaptureFixture, + hass: HomeAssistant, + config_entry_mock: MockConfigEntry, + device_source_mock: DmsDeviceSource, + domain_data_mock: DlnaDmsData, +) -> None: + """Test the config listener gracefully handles colliding source_id.""" + new_title: Final = "New Name" + new_source_id: Final = "new_name" + new_source_id_2: Final = "new_name_1" + # Set up another config entry to collide with the new source_id + colliding_entry = MockConfigEntry( + unique_id=f"different-udn::{MOCK_DEVICE_TYPE}", + domain=DOMAIN, + data={ + CONF_URL: "http://192.88.99.22/dms_description.xml", + CONF_DEVICE_ID: f"different-udn::{MOCK_DEVICE_TYPE}", + }, + title=new_title, + ) + await hass.config_entries.async_add(colliding_entry) + await hass.async_block_till_done() + + assert device_source_mock.source_id == MOCK_SOURCE_ID + assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID, new_source_id} + assert domain_data_mock.sources[MOCK_SOURCE_ID] is device_source_mock + + # Update the existing entry to match the other entry's name + hass.config_entries.async_update_entry(config_entry_mock, title=new_title) + await hass.async_block_till_done() + + # The existing device's source ID should be a newly generated slug + assert device_source_mock.source_id == new_source_id_2 + assert domain_data_mock.sources.keys() == {new_source_id, new_source_id_2} + assert domain_data_mock.sources[new_source_id_2] is device_source_mock + + # Changing back to the old name should not cause issues + hass.config_entries.async_update_entry(config_entry_mock, title=MOCK_DEVICE_NAME) + await hass.async_block_till_done() + + assert device_source_mock.source_id == MOCK_SOURCE_ID + assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID, new_source_id} + assert domain_data_mock.sources[MOCK_SOURCE_ID] is device_source_mock + + # Remove the collision and try again + await hass.config_entries.async_remove(colliding_entry.entry_id) + assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID} + + hass.config_entries.async_update_entry(config_entry_mock, title=new_title) + await hass.async_block_till_done() + + assert device_source_mock.source_id == new_source_id + assert domain_data_mock.sources.keys() == {new_source_id} + + +async def test_catch_request_error_unavailable( + device_source_mock: DmsDeviceSource, +) -> None: + """Test the device is checked for availability before trying requests.""" + device_source_mock._device = None + + with pytest.raises(DeviceConnectionError): + await device_source_mock.async_resolve_object("id") + with pytest.raises(DeviceConnectionError): + await device_source_mock.async_resolve_path("path") + with pytest.raises(DeviceConnectionError): + await device_source_mock.async_resolve_search("query") + with pytest.raises(DeviceConnectionError): + await device_source_mock.async_browse_object("object_id") + with pytest.raises(DeviceConnectionError): + await device_source_mock.async_browse_search("query") + + +async def test_catch_request_error( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test errors when making requests to the device are handled.""" + dms_device_mock.async_browse_metadata.side_effect = UpnpActionError( + error_code=ContentDirectoryErrorCode.NO_SUCH_OBJECT + ) + with pytest.raises(ActionError, match="No such object: bad_id"): + await device_source_mock.async_resolve_media(":bad_id") + + dms_device_mock.async_search_directory.side_effect = UpnpActionError( + error_code=ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA + ) + with pytest.raises(ActionError, match="Invalid query: bad query"): + await device_source_mock.async_resolve_media("?bad query") + + dms_device_mock.async_browse_metadata.side_effect = UpnpActionError( + error_code=ContentDirectoryErrorCode.CANNOT_PROCESS_REQUEST + ) + with pytest.raises(DeviceConnectionError, match="Server failure: "): + await device_source_mock.async_resolve_media(":good_id") + + dms_device_mock.async_browse_metadata.side_effect = UpnpError + with pytest.raises( + DeviceConnectionError, match="Server communication failure: UpnpError(.*)" + ): + await device_source_mock.async_resolve_media(":bad_id") + + # UpnpConnectionErrors will cause the device_source_mock to disconnect from the device + assert device_source_mock.available + dms_device_mock.async_browse_metadata.side_effect = UpnpConnectionError + with pytest.raises( + DeviceConnectionError, match="Server disconnected: UpnpConnectionError(.*)" + ): + await device_source_mock.async_resolve_media(":bad_id") + assert not device_source_mock.available + + +async def test_icon(device_source_mock: DmsDeviceSource, dms_device_mock: Mock) -> None: + """Test the device's icon URL is returned.""" + assert device_source_mock.icon == dms_device_mock.icon + + device_source_mock._device = None + assert device_source_mock.icon is None + + +async def test_resolve_media_invalid(device_source_mock: DmsDeviceSource) -> None: + """Test async_resolve_media will raise Unresolvable when an identifier isn't supplied.""" + with pytest.raises(Unresolvable, match="Invalid identifier.*"): + await device_source_mock.async_resolve_media("") + + +async def test_resolve_media_object( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test the async_resolve_object method via async_resolve_media.""" + object_id: Final = "123" + res_url: Final = "foo/bar" + res_abs_url: Final = f"{MOCK_DEVICE_BASE_URL}/{res_url}" + res_mime: Final = "audio/mpeg" + # Success case: one resource + didl_item = didl_lite.Item( + id=object_id, + restricted="false", + title="Object", + res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")], + ) + dms_device_mock.async_browse_metadata.return_value = didl_item + result = await device_source_mock.async_resolve_media(f":{object_id}") + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + object_id, metadata_filter="*" + ) + assert result.url == res_abs_url + assert result.mime_type == res_mime + assert result.didl_metadata is didl_item + + # Success case: two resources, first is playable + didl_item = didl_lite.Item( + id=object_id, + restricted="false", + title="Object", + res=[ + didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:"), + didl_lite.Resource( + uri="thumbnail.png", protocol_info="http-get:*:image/png:" + ), + ], + ) + dms_device_mock.async_browse_metadata.return_value = didl_item + result = await device_source_mock.async_resolve_media(f":{object_id}") + assert result.url == res_abs_url + assert result.mime_type == res_mime + assert result.didl_metadata is didl_item + + # Success case: three resources, only third is playable + didl_item = didl_lite.Item( + id=object_id, + restricted="false", + title="Object", + res=[ + didl_lite.Resource(uri="", protocol_info=""), + didl_lite.Resource(uri="internal:thing", protocol_info="internal:*::"), + didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:"), + ], + ) + dms_device_mock.async_browse_metadata.return_value = didl_item + result = await device_source_mock.async_resolve_media(f":{object_id}") + assert result.url == res_abs_url + assert result.mime_type == res_mime + assert result.didl_metadata is didl_item + + # Failure case: no resources + didl_item = didl_lite.Item( + id=object_id, + restricted="false", + title="Object", + res=[], + ) + dms_device_mock.async_browse_metadata.return_value = didl_item + with pytest.raises(Unresolvable, match="Object has no resources"): + await device_source_mock.async_resolve_media(f":{object_id}") + + # Failure case: resources are not playable + didl_item = didl_lite.Item( + id=object_id, + restricted="false", + title="Object", + res=[didl_lite.Resource(uri="internal:thing", protocol_info="internal:*::")], + ) + dms_device_mock.async_browse_metadata.return_value = didl_item + with pytest.raises(Unresolvable, match="Object has no playable resources"): + await device_source_mock.async_resolve_media(f":{object_id}") + + +async def test_resolve_media_path( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test the async_resolve_path method via async_resolve_media.""" + path: Final = "path/to/thing" + object_ids: Final = ["path_id", "to_id", "thing_id"] + res_url: Final = "foo/bar" + res_abs_url: Final = f"{MOCK_DEVICE_BASE_URL}/{res_url}" + res_mime: Final = "audio/mpeg" + + search_directory_result = [] + for ob_id, ob_title in zip(object_ids, path.split("/")): + didl_item = didl_lite.Item( + id=ob_id, + restricted="false", + title=ob_title, + res=[], + ) + search_directory_result.append(DmsDevice.BrowseResult([didl_item], 1, 1, 0)) + + # Test that path is resolved correctly + dms_device_mock.async_search_directory.side_effect = search_directory_result + dms_device_mock.async_browse_metadata.return_value = didl_lite.Item( + id=object_ids[-1], + restricted="false", + title="thing", + res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")], + ) + result = await device_source_mock.async_resolve_media(f"/{path}") + assert dms_device_mock.async_search_directory.await_args_list == [ + call( + parent_id, + search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"', + metadata_filter=["id", "upnp:class", "dc:title"], + requested_count=1, + ) + for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/")) + ] + assert result.url == res_abs_url + assert result.mime_type == res_mime + + # Test a path starting with a / (first / is path action, second / is root of path) + dms_device_mock.async_search_directory.reset_mock() + dms_device_mock.async_search_directory.side_effect = search_directory_result + result = await device_source_mock.async_resolve_media(f"//{path}") + assert dms_device_mock.async_search_directory.await_args_list == [ + call( + parent_id, + search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"', + metadata_filter=["id", "upnp:class", "dc:title"], + requested_count=1, + ) + for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/")) + ] + assert result.url == res_abs_url + assert result.mime_type == res_mime + + +async def test_resolve_path_simple( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_resolve_path for simple success as for test_resolve_media_path.""" + path: Final = "path/to/thing" + object_ids: Final = ["path_id", "to_id", "thing_id"] + search_directory_result = [] + for ob_id, ob_title in zip(object_ids, path.split("/")): + didl_item = didl_lite.Item( + id=ob_id, + restricted="false", + title=ob_title, + res=[], + ) + search_directory_result.append(DmsDevice.BrowseResult([didl_item], 1, 1, 0)) + + dms_device_mock.async_search_directory.side_effect = search_directory_result + result = await device_source_mock.async_resolve_path(path) + assert dms_device_mock.async_search_directory.call_args_list == [ + call( + parent_id, + search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"', + metadata_filter=["id", "upnp:class", "dc:title"], + requested_count=1, + ) + for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/")) + ] + assert result == object_ids[-1] + assert not dms_device_mock.async_browse_direct_children.await_count + + +async def test_resolve_path_browsed( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_resolve_path: action error results in browsing.""" + path: Final = "path/to/thing" + object_ids: Final = ["path_id", "to_id", "thing_id"] + + search_directory_result = [] + for ob_id, ob_title in zip(object_ids, path.split("/")): + didl_item = didl_lite.Item( + id=ob_id, + restricted="false", + title=ob_title, + res=[], + ) + search_directory_result.append(DmsDevice.BrowseResult([didl_item], 1, 1, 0)) + dms_device_mock.async_search_directory.side_effect = [ + search_directory_result[0], + # 2nd level can't be searched (this happens with Kodi) + UpnpActionError(), + search_directory_result[2], + ] + + browse_children_result: BrowseResultList = [] + for title in ("Irrelevant", "to", "Ignored"): + browse_children_result.append( + didl_lite.Item(id=f"{title}_id", restricted="false", title=title, res=[]) + ) + dms_device_mock.async_browse_direct_children.side_effect = [ + DmsDevice.BrowseResult(browse_children_result, 3, 3, 0) + ] + + result = await device_source_mock.async_resolve_path(path) + # All levels should have an attempted search + assert dms_device_mock.async_search_directory.await_args_list == [ + call( + parent_id, + search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"', + metadata_filter=["id", "upnp:class", "dc:title"], + requested_count=1, + ) + for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/")) + ] + assert result == object_ids[-1] + # 2nd level should also be browsed + assert dms_device_mock.async_browse_direct_children.await_args_list == [ + call("path_id", metadata_filter=["id", "upnp:class", "dc:title"]) + ] + + +async def test_resolve_path_browsed_nothing( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_resolve_path: action error results in browsing, but nothing found.""" + dms_device_mock.async_search_directory.side_effect = UpnpActionError() + # No children + dms_device_mock.async_browse_direct_children.side_effect = [ + DmsDevice.BrowseResult([], 0, 0, 0) + ] + with pytest.raises(Unresolvable, match="No contents for thing in thing/other"): + await device_source_mock.async_resolve_path(r"thing/other") + + # There are children, but they don't match + dms_device_mock.async_browse_direct_children.side_effect = [ + DmsDevice.BrowseResult( + [ + didl_lite.Item( + id="nothingid", restricted="false", title="not thing", res=[] + ) + ], + 1, + 1, + 0, + ) + ] + with pytest.raises(Unresolvable, match="Nothing found for thing in thing/other"): + await device_source_mock.async_resolve_path(r"thing/other") + + +async def test_resolve_path_quoted( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_resolve_path: quotes and backslashes in the path get escaped correctly.""" + dms_device_mock.async_search_directory.side_effect = [ + DmsDevice.BrowseResult( + [ + didl_lite.Item( + id=r'id_with quote" and back\slash', + restricted="false", + title="path", + res=[], + ) + ], + 1, + 1, + 0, + ), + UpnpError("Quick abort"), + ] + with pytest.raises(DeviceConnectionError): + await device_source_mock.async_resolve_path(r'path/quote"back\slash') + assert dms_device_mock.async_search_directory.await_args_list == [ + call( + "0", + search_criteria='@parentID="0" and dc:title="path"', + metadata_filter=["id", "upnp:class", "dc:title"], + requested_count=1, + ), + call( + r'id_with quote" and back\slash', + search_criteria=r'@parentID="id_with quote\" and back\\slash" and dc:title="quote\"back\\slash"', + metadata_filter=["id", "upnp:class", "dc:title"], + requested_count=1, + ), + ] + + +async def test_resolve_path_ambiguous( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_resolve_path: ambiguous results (too many matches) gives error.""" + dms_device_mock.async_search_directory.side_effect = [ + DmsDevice.BrowseResult( + [ + didl_lite.Item( + id=r"thing 1", + restricted="false", + title="thing", + res=[], + ), + didl_lite.Item( + id=r"thing 2", + restricted="false", + title="thing", + res=[], + ), + ], + 2, + 2, + 0, + ) + ] + with pytest.raises( + Unresolvable, match="Too many items found for thing in thing/other" + ): + await device_source_mock.async_resolve_path(r"thing/other") + + +async def test_resolve_path_no_such_container( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_resolve_path: Explicit check for NO_SUCH_CONTAINER.""" + dms_device_mock.async_search_directory.side_effect = UpnpActionError( + error_code=ContentDirectoryErrorCode.NO_SUCH_CONTAINER + ) + with pytest.raises(Unresolvable, match="No such container: 0"): + await device_source_mock.async_resolve_path(r"thing/other") + + +async def test_resolve_media_search( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test the async_resolve_search method via async_resolve_media.""" + res_url: Final = "foo/bar" + res_abs_url: Final = f"{MOCK_DEVICE_BASE_URL}/{res_url}" + res_mime: Final = "audio/mpeg" + + # No results + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [], 0, 0, 0 + ) + with pytest.raises(Unresolvable, match='Nothing found for dc:title="thing"'): + await device_source_mock.async_resolve_media('?dc:title="thing"') + assert dms_device_mock.async_search_directory.await_args_list == [ + call( + container_id="0", + search_criteria='dc:title="thing"', + metadata_filter="*", + requested_count=1, + ) + ] + + # One result + dms_device_mock.async_search_directory.reset_mock() + didl_item = didl_lite.Item( + id="thing's id", + restricted="false", + title="thing", + res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")], + ) + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [didl_item], 1, 1, 0 + ) + result = await device_source_mock.async_resolve_media('?dc:title="thing"') + assert result.url == res_abs_url + assert result.mime_type == res_mime + assert result.didl_metadata is didl_item + assert dms_device_mock.async_search_directory.await_count == 1 + # Values should be taken from search result, not querying the item's metadata + assert dms_device_mock.async_browse_metadata.await_count == 0 + + # Two results - uses the first + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [didl_item], 1, 2, 0 + ) + result = await device_source_mock.async_resolve_media('?dc:title="thing"') + assert result.url == res_abs_url + assert result.mime_type == res_mime + assert result.didl_metadata is didl_item + + # Bad result + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [didl_lite.Descriptor("id", "namespace")], 1, 1, 0 + ) + with pytest.raises(Unresolvable, match="Descriptor.* is not a DidlObject"): + await device_source_mock.async_resolve_media('?dc:title="thing"') + + +async def test_browse_media_root( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_browse_media with no identifier will browse the root of the device.""" + dms_device_mock.async_browse_metadata.return_value = didl_lite.DidlObject( + id="0", restricted="false", title="root" + ) + dms_device_mock.async_browse_direct_children.return_value = DmsDevice.BrowseResult( + [], 0, 0, 0 + ) + # No identifier (first opened in media browser) + result = await device_source_mock.async_browse_media(None) + assert result.identifier == f"{MOCK_SOURCE_ID}/:0" + assert result.title == MOCK_DEVICE_NAME + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + "0", metadata_filter=ANY + ) + dms_device_mock.async_browse_direct_children.assert_awaited_once_with( + "0", metadata_filter=ANY, sort_criteria=ANY + ) + + dms_device_mock.async_browse_metadata.reset_mock() + dms_device_mock.async_browse_direct_children.reset_mock() + # Empty string identifier + result = await device_source_mock.async_browse_media("") + assert result.identifier == f"{MOCK_SOURCE_ID}/:0" + assert result.title == MOCK_DEVICE_NAME + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + "0", metadata_filter=ANY + ) + dms_device_mock.async_browse_direct_children.assert_awaited_once_with( + "0", metadata_filter=ANY, sort_criteria=ANY + ) + + +async def test_browse_media_object( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_browse_object via async_browse_media.""" + object_id = "1234" + child_titles = ("Item 1", "Thing", "Item 2") + dms_device_mock.async_browse_metadata.return_value = didl_lite.Container( + id=object_id, restricted="false", title="subcontainer" + ) + children_result = DmsDevice.BrowseResult([], 3, 3, 0) + for title in child_titles: + children_result.result.append( + didl_lite.Item( + id=title + "_id", + restricted="false", + title=title, + res=[ + didl_lite.Resource( + uri=title + "_url", protocol_info="http-get:*:audio/mpeg:" + ) + ], + ), + ) + dms_device_mock.async_browse_direct_children.return_value = children_result + + result = await device_source_mock.async_browse_media(f":{object_id}") + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + object_id, metadata_filter=ANY + ) + dms_device_mock.async_browse_direct_children.assert_awaited_once_with( + object_id, metadata_filter=ANY, sort_criteria=ANY + ) + + assert result.domain == DOMAIN + assert result.identifier == f"{MOCK_SOURCE_ID}/:{object_id}" + assert result.title == "subcontainer" + assert not result.can_play + assert result.can_expand + assert result.children + for child, title in zip(result.children, child_titles): + assert isinstance(child, BrowseMediaSource) + assert child.identifier == f"{MOCK_SOURCE_ID}/:{title}_id" + assert child.title == title + assert child.can_play + assert not child.can_expand + assert not child.children + + +async def test_browse_media_path( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_browse_media with a path.""" + title = "folder" + con_id = "123" + container = didl_lite.Container(id=con_id, restricted="false", title=title) + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [container], 1, 1, 0 + ) + dms_device_mock.async_browse_metadata.return_value = container + dms_device_mock.async_browse_direct_children.return_value = DmsDevice.BrowseResult( + [], 0, 0, 0 + ) + + result = await device_source_mock.async_browse_media(f"{title}") + assert result.identifier == f"{MOCK_SOURCE_ID}/:{con_id}" + assert result.title == title + + dms_device_mock.async_search_directory.assert_awaited_once_with( + "0", + search_criteria=f'@parentID="0" and dc:title="{title}"', + metadata_filter=ANY, + requested_count=1, + ) + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + con_id, metadata_filter=ANY + ) + dms_device_mock.async_browse_direct_children.assert_awaited_once_with( + con_id, metadata_filter=ANY, sort_criteria=ANY + ) + + +async def test_browse_media_search( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test async_browse_media with a search query.""" + query = 'dc:title contains "FooBar"' + object_details = (("111", "FooBar baz"), ("432", "Not FooBar"), ("99", "FooBar")) + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [ + didl_lite.DidlObject(id=ob_id, restricted="false", title=title) + for ob_id, title in object_details + ], + 3, + 3, + 0, + ) + # Test that descriptors are skipped + dms_device_mock.async_search_directory.return_value.result.insert( + 1, didl_lite.Descriptor("id", "name_space") + ) + + result = await device_source_mock.async_browse_media(f"?{query}") + assert result.identifier == f"{MOCK_SOURCE_ID}/?{query}" + assert result.title == "Search results" + assert result.children + + for obj, child in zip(object_details, result.children): + assert isinstance(child, BrowseMediaSource) + assert child.identifier == f"{MOCK_SOURCE_ID}/:{obj[0]}" + assert child.title == obj[1] + assert not child.children + + +async def test_browse_search_invalid( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test searching with an invalid query gives a BrowseError.""" + query = "title == FooBar" + dms_device_mock.async_search_directory.side_effect = UpnpActionError( + error_code=ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA + ) + with pytest.raises(BrowseError, match=f"Invalid query: {query}"): + await device_source_mock.async_browse_media(f"?{query}") + + +async def test_browse_search_no_results( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test a search with no results does not give an error.""" + query = 'dc:title contains "FooBar"' + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [], 0, 0, 0 + ) + + result = await device_source_mock.async_browse_media(f"?{query}") + assert result.identifier == f"{MOCK_SOURCE_ID}/?{query}" + assert result.title == "Search results" + assert not result.children + + +async def test_thumbnail( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test getting thumbnails URLs for items.""" + # Use browse_search to get multiple items at once for least effort + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + [ + # Thumbnail as albumArtURI property + didl_lite.MusicAlbum( + id="a", + restricted="false", + title="a", + res=[], + album_art_uri="a_thumb.jpg", + ), + # Thumbnail as resource (1st resource is media item, 2nd is missing + # a URI, 3rd is thumbnail) + didl_lite.MusicTrack( + id="b", + restricted="false", + title="b", + res=[ + didl_lite.Resource( + uri="b_track.mp3", protocol_info="http-get:*:audio/mpeg:" + ), + didl_lite.Resource(uri="", protocol_info="internal:*::"), + didl_lite.Resource( + uri="b_thumb.png", protocol_info="http-get:*:image/png:" + ), + ], + ), + # No thumbnail + didl_lite.MusicTrack( + id="c", + restricted="false", + title="c", + res=[ + didl_lite.Resource( + uri="c_track.mp3", protocol_info="http-get:*:audio/mpeg:" + ) + ], + ), + ], + 3, + 3, + 0, + ) + + result = await device_source_mock.async_browse_media("?query") + assert result.children + assert result.children[0].thumbnail == f"{MOCK_DEVICE_BASE_URL}/a_thumb.jpg" + assert result.children[1].thumbnail == f"{MOCK_DEVICE_BASE_URL}/b_thumb.png" + assert result.children[2].thumbnail is None + + +async def test_can_play( + device_source_mock: DmsDeviceSource, dms_device_mock: Mock +) -> None: + """Test determination of playability for items.""" + protocol_infos = [ + # No protocol info for resource + ("", True), + # Protocol info is poorly formatted but can play + ("http-get", True), + # Protocol info is poorly formatted and can't play + ("internal", False), + # Protocol is HTTP + ("http-get:*:audio/mpeg", True), + # Protocol is RTSP + ("rtsp-rtp-udp:*:MPA:", True), + # Protocol is something else + ("internal:*:audio/mpeg:", False), + ] + + search_results: BrowseResultList = [] + # No resources + search_results.append(didl_lite.DidlObject(id="", restricted="false", title="")) + search_results.extend( + didl_lite.MusicTrack( + id="", + restricted="false", + title="", + res=[didl_lite.Resource(uri="", protocol_info=protocol_info)], + ) + for protocol_info, _ in protocol_infos + ) + + # Use browse_search to get multiple items at once for least effort + dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult( + search_results, len(search_results), len(search_results), 0 + ) + + result = await device_source_mock.async_browse_media("?query") + assert result.children + assert not result.children[0].can_play + for idx, info_can_play in enumerate(protocol_infos): + protocol_info, can_play = info_can_play + assert result.children[idx + 1].can_play is can_play, f"Checked {protocol_info}" diff --git a/tests/components/dlna_dms/test_init.py b/tests/components/dlna_dms/test_init.py new file mode 100644 index 00000000000..16254adca89 --- /dev/null +++ b/tests/components/dlna_dms/test_init.py @@ -0,0 +1,59 @@ +"""Test the DLNA DMS component setup, cleanup, and module-level functions.""" + +from unittest.mock import Mock + +from homeassistant.components.dlna_dms.const import DOMAIN +from homeassistant.components.dlna_dms.dms import DlnaDmsData +from homeassistant.core import HomeAssistant +from homeassistant.setup import async_setup_component + +from tests.common import MockConfigEntry + + +async def test_resource_lifecycle( + hass: HomeAssistant, + domain_data_mock: DlnaDmsData, + config_entry_mock: MockConfigEntry, + ssdp_scanner_mock: Mock, + dms_device_mock: Mock, +) -> None: + """Test that resources are acquired/released as the entity is setup/unloaded.""" + # Set up the config entry + config_entry_mock.add_to_hass(hass) + assert await async_setup_component(hass, DOMAIN, {}) is True + await hass.async_block_till_done() + + # Check the entity is created and working + assert len(domain_data_mock.devices) == 1 + assert len(domain_data_mock.sources) == 1 + entity = next(iter(domain_data_mock.devices.values())) + assert entity.available is True + + # Check update listeners are subscribed + assert len(config_entry_mock.update_listeners) == 1 + assert ssdp_scanner_mock.async_register_callback.await_count == 2 + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0 + + # Check event notifiers are not subscribed - dlna_dms doesn't use them + assert dms_device_mock.async_subscribe_services.await_count == 0 + assert dms_device_mock.async_unsubscribe_services.await_count == 0 + assert dms_device_mock.on_event is None + + # Unload the config entry + assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == { + "require_restart": False + } + + # Check update listeners are released + assert not config_entry_mock.update_listeners + assert ssdp_scanner_mock.async_register_callback.await_count == 2 + assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 2 + + # Check event notifiers are still not subscribed + assert dms_device_mock.async_subscribe_services.await_count == 0 + assert dms_device_mock.async_unsubscribe_services.await_count == 0 + assert dms_device_mock.on_event is None + + # Check entity is gone + assert not domain_data_mock.devices + assert not domain_data_mock.sources diff --git a/tests/components/dlna_dms/test_media_source.py b/tests/components/dlna_dms/test_media_source.py new file mode 100644 index 00000000000..4b43402ecbd --- /dev/null +++ b/tests/components/dlna_dms/test_media_source.py @@ -0,0 +1,255 @@ +"""Tests for dlna_dms.media_source, mostly testing DmsMediaSource.""" +from unittest.mock import ANY, Mock + +from async_upnp_client.exceptions import UpnpError +from didl_lite import didl_lite +import pytest + +from homeassistant.components.dlna_dms.const import DOMAIN +from homeassistant.components.dlna_dms.dms import DlnaDmsData, DmsDeviceSource +from homeassistant.components.dlna_dms.media_source import ( + DmsMediaSource, + async_get_media_source, +) +from homeassistant.components.media_player.errors import BrowseError +from homeassistant.components.media_source.error import Unresolvable +from homeassistant.components.media_source.models import ( + BrowseMediaSource, + MediaSourceItem, +) +from homeassistant.const import CONF_DEVICE_ID, CONF_URL +from homeassistant.core import HomeAssistant + +from .conftest import ( + MOCK_DEVICE_BASE_URL, + MOCK_DEVICE_NAME, + MOCK_DEVICE_TYPE, + MOCK_DEVICE_USN, + MOCK_SOURCE_ID, +) + +from tests.common import MockConfigEntry + + +@pytest.fixture +async def entity( + hass: HomeAssistant, + config_entry_mock: MockConfigEntry, + dms_device_mock: Mock, + domain_data_mock: DlnaDmsData, +) -> DmsDeviceSource: + """Fixture to set up a DmsDeviceSource in a connected state and cleanup at completion.""" + await hass.config_entries.async_add(config_entry_mock) + await hass.async_block_till_done() + return domain_data_mock.devices[MOCK_DEVICE_USN] + + +@pytest.fixture +async def dms_source(hass: HomeAssistant, entity: DmsDeviceSource) -> DmsMediaSource: + """Fixture providing a pre-constructed DmsMediaSource with a single device.""" + return DmsMediaSource(hass) + + +async def test_get_media_source(hass: HomeAssistant) -> None: + """Test the async_get_media_source function and DmsMediaSource constructor.""" + source = await async_get_media_source(hass) + assert isinstance(source, DmsMediaSource) + assert source.domain == DOMAIN + + +async def test_resolve_media_unconfigured(hass: HomeAssistant) -> None: + """Test resolve_media without any devices being configured.""" + source = DmsMediaSource(hass) + item = MediaSourceItem(hass, DOMAIN, "source_id/media_id") + with pytest.raises(Unresolvable, match="No sources have been configured"): + await source.async_resolve_media(item) + + +async def test_resolve_media_bad_identifier( + hass: HomeAssistant, dms_source: DmsMediaSource +) -> None: + """Test trying to resolve an item that has an unresolvable identifier.""" + # Empty identifier + item = MediaSourceItem(hass, DOMAIN, "") + with pytest.raises(Unresolvable, match="No source ID.*"): + await dms_source.async_resolve_media(item) + + # Identifier has media_id but no source_id + item = MediaSourceItem(hass, DOMAIN, "/media_id") + with pytest.raises(Unresolvable, match="No source ID.*"): + await dms_source.async_resolve_media(item) + + # Identifier has source_id but no media_id + item = MediaSourceItem(hass, DOMAIN, "source_id/") + with pytest.raises(Unresolvable, match="No media ID.*"): + await dms_source.async_resolve_media(item) + + # Identifier is missing source_id/media_id separator + item = MediaSourceItem(hass, DOMAIN, "source_id") + with pytest.raises(Unresolvable, match="No media ID.*"): + await dms_source.async_resolve_media(item) + + # Identifier has an unknown source_id + item = MediaSourceItem(hass, DOMAIN, "unknown_source/media_id") + with pytest.raises(Unresolvable, match="Unknown source ID: unknown_source"): + await dms_source.async_resolve_media(item) + + +async def test_resolve_media_success( + hass: HomeAssistant, dms_source: DmsMediaSource, dms_device_mock: Mock +) -> None: + """Test resolving an item via a DmsDeviceSource.""" + object_id = "123" + item = MediaSourceItem(hass, DOMAIN, f"{MOCK_SOURCE_ID}/:{object_id}") + + res_url = "foo/bar" + res_mime = "audio/mpeg" + didl_item = didl_lite.Item( + id=object_id, + restricted=False, + title="Object", + res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")], + ) + dms_device_mock.async_browse_metadata.return_value = didl_item + + result = await dms_source.async_resolve_media(item) + assert result.url == f"{MOCK_DEVICE_BASE_URL}/{res_url}" + assert result.mime_type == res_mime + assert result.didl_metadata is didl_item + + +async def test_browse_media_unconfigured(hass: HomeAssistant) -> None: + """Test browse_media without any devices being configured.""" + source = DmsMediaSource(hass) + item = MediaSourceItem(hass, DOMAIN, "source_id/media_id") + with pytest.raises(BrowseError, match="No sources have been configured"): + await source.async_browse_media(item) + + item = MediaSourceItem(hass, DOMAIN, "") + with pytest.raises(BrowseError, match="No sources have been configured"): + await source.async_browse_media(item) + + +async def test_browse_media_bad_identifier( + hass: HomeAssistant, dms_source: DmsMediaSource +) -> None: + """Test browse_media with a bad source_id.""" + item = MediaSourceItem(hass, DOMAIN, "bad-id/media_id") + with pytest.raises(BrowseError, match="Unknown source ID: bad-id"): + await dms_source.async_browse_media(item) + + +async def test_browse_media_single_source_no_identifier( + hass: HomeAssistant, dms_source: DmsMediaSource, dms_device_mock: Mock +) -> None: + """Test browse_media without a source_id, with a single device registered.""" + # Fast bail-out, mock will be checked after + dms_device_mock.async_browse_metadata.side_effect = UpnpError + + # No source_id nor media_id + item = MediaSourceItem(hass, DOMAIN, "") + with pytest.raises(BrowseError): + await dms_source.async_browse_media(item) + # Mock device should've been browsed for the root directory + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + "0", metadata_filter=ANY + ) + + # No source_id but a media_id + item = MediaSourceItem(hass, DOMAIN, "/:media-item-id") + dms_device_mock.async_browse_metadata.reset_mock() + with pytest.raises(BrowseError): + await dms_source.async_browse_media(item) + # Mock device should've been browsed for the root directory + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + "media-item-id", metadata_filter=ANY + ) + + +async def test_browse_media_multiple_sources( + hass: HomeAssistant, dms_source: DmsMediaSource, dms_device_mock: Mock +) -> None: + """Test browse_media without a source_id, with multiple devices registered.""" + # Set up a second source + other_source_id = "second_source" + other_source_title = "Second source" + other_config_entry = MockConfigEntry( + unique_id=f"different-udn::{MOCK_DEVICE_TYPE}", + domain=DOMAIN, + data={ + CONF_URL: "http://192.88.99.22/dms_description.xml", + CONF_DEVICE_ID: f"different-udn::{MOCK_DEVICE_TYPE}", + }, + title=other_source_title, + ) + await hass.config_entries.async_add(other_config_entry) + await hass.async_block_till_done() + + # No source_id nor media_id + item = MediaSourceItem(hass, DOMAIN, "") + result = await dms_source.async_browse_media(item) + # Mock device should not have been browsed + assert dms_device_mock.async_browse_metadata.await_count == 0 + # Result will be a list of available devices + assert result.title == "DLNA Servers" + assert result.children + assert isinstance(result.children[0], BrowseMediaSource) + assert result.children[0].identifier == f"{MOCK_SOURCE_ID}/:0" + assert result.children[0].title == MOCK_DEVICE_NAME + assert isinstance(result.children[1], BrowseMediaSource) + assert result.children[1].identifier == f"{other_source_id}/:0" + assert result.children[1].title == other_source_title + + # No source_id but a media_id - will give the exact same list of all devices + item = MediaSourceItem(hass, DOMAIN, "/:media-item-id") + result = await dms_source.async_browse_media(item) + # Mock device should not have been browsed + assert dms_device_mock.async_browse_metadata.await_count == 0 + # Result will be a list of available devices + assert result.title == "DLNA Servers" + assert result.children + assert isinstance(result.children[0], BrowseMediaSource) + assert result.children[0].identifier == f"{MOCK_SOURCE_ID}/:0" + assert result.children[0].title == MOCK_DEVICE_NAME + assert isinstance(result.children[1], BrowseMediaSource) + assert result.children[1].identifier == f"{other_source_id}/:0" + assert result.children[1].title == other_source_title + + +async def test_browse_media_source_id( + hass: HomeAssistant, + config_entry_mock: MockConfigEntry, + dms_device_mock: Mock, + domain_data_mock: DlnaDmsData, +) -> None: + """Test browse_media with an explicit source_id.""" + # Set up a second device first, then the primary mock device. + # This allows testing that the right source is chosen by source_id + other_source_title = "Second source" + other_config_entry = MockConfigEntry( + unique_id=f"different-udn::{MOCK_DEVICE_TYPE}", + domain=DOMAIN, + data={ + CONF_URL: "http://192.88.99.22/dms_description.xml", + CONF_DEVICE_ID: f"different-udn::{MOCK_DEVICE_TYPE}", + }, + title=other_source_title, + ) + await hass.config_entries.async_add(other_config_entry) + await hass.async_block_till_done() + + await hass.config_entries.async_add(config_entry_mock) + await hass.async_block_till_done() + + # Fast bail-out, mock will be checked after + dms_device_mock.async_browse_metadata.side_effect = UpnpError + + # Browse by source_id + item = MediaSourceItem(hass, DOMAIN, f"{MOCK_SOURCE_ID}/:media-item-id") + dms_source = DmsMediaSource(hass) + with pytest.raises(BrowseError): + await dms_source.async_browse_media(item) + # Mock device should've been browsed for the root directory + dms_device_mock.async_browse_metadata.assert_awaited_once_with( + "media-item-id", metadata_filter=ANY + )