From 6cec53bea145d8e007a204d2db82bc5ee8a23963 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 27 Mar 2022 22:01:07 -1000 Subject: [PATCH] Add support for finding the samsungtv MainTvAgent service location (#68763) --- .../components/samsungtv/__init__.py | 80 +++++++++- homeassistant/components/samsungtv/bridge.py | 13 +- .../components/samsungtv/config_flow.py | 113 +++++++++----- homeassistant/components/samsungtv/const.py | 7 +- .../components/samsungtv/manifest.json | 4 + .../components/samsungtv/media_player.py | 6 +- homeassistant/generated/ssdp.py | 3 + tests/components/samsungtv/__init__.py | 15 +- tests/components/samsungtv/conftest.py | 16 ++ tests/components/samsungtv/const.py | 43 +++++- .../components/samsungtv/test_config_flow.py | 146 +++++++++++++++--- tests/components/samsungtv/test_init.py | 44 ++++++ .../components/samsungtv/test_media_player.py | 18 ++- 13 files changed, 416 insertions(+), 92 deletions(-) diff --git a/homeassistant/components/samsungtv/__init__.py b/homeassistant/components/samsungtv/__init__.py index 4363e7d8c44..b141d78fb9a 100644 --- a/homeassistant/components/samsungtv/__init__.py +++ b/homeassistant/components/samsungtv/__init__.py @@ -5,11 +5,13 @@ from collections.abc import Mapping from functools import partial import socket from typing import Any +from urllib.parse import urlparse import getmac import voluptuous as vol from homeassistant import config_entries +from homeassistant.components import ssdp from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_HOST, @@ -24,6 +26,7 @@ from homeassistant.const import ( from homeassistant.core import Event, HomeAssistant, callback from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.debounce import Debouncer from homeassistant.helpers.typing import ConfigType from .bridge import SamsungTVBridge, async_get_device_info, mac_from_device_info @@ -31,12 +34,17 @@ from .const import ( CONF_MODEL, CONF_ON_ACTION, CONF_SESSION_ID, + CONF_SSDP_MAIN_TV_AGENT_LOCATION, + CONF_SSDP_RENDERING_CONTROL_LOCATION, DEFAULT_NAME, DOMAIN, + ENTRY_RELOAD_COOLDOWN, LEGACY_PORT, LOGGER, METHOD_ENCRYPTED_WEBSOCKET, METHOD_LEGACY, + UPNP_SVC_MAIN_TV_AGENT, + UPNP_SVC_RENDERING_CONTROL, ) @@ -109,6 +117,60 @@ def _async_get_device_bridge( ) +class DebouncedEntryReloader: + """Reload only after the timer expires.""" + + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + """Init the debounced entry reloader.""" + self.hass = hass + self.entry = entry + self.token = self.entry.data.get(CONF_TOKEN) + self._debounced_reload = Debouncer( + hass, + LOGGER, + cooldown=ENTRY_RELOAD_COOLDOWN, + immediate=False, + function=self._async_reload_entry, + ) + + async def async_call(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + """Start the countdown for a reload.""" + if (new_token := entry.data.get(CONF_TOKEN)) != self.token: + LOGGER.debug("Skipping reload as its a token update") + self.token = new_token + return # Token updates should not trigger a reload + LOGGER.debug("Calling debouncer to get a reload after cooldown") + await self._debounced_reload.async_call() + + @callback + def async_cancel(self) -> None: + """Cancel any pending reload.""" + self._debounced_reload.async_cancel() + + async def _async_reload_entry(self) -> None: + """Reload entry.""" + LOGGER.debug("Reloading entry %s", self.entry.title) + await self.hass.config_entries.async_reload(self.entry.entry_id) + + +async def _async_update_ssdp_locations(hass: HomeAssistant, entry: ConfigEntry) -> None: + """Update ssdp locations from discovery cache.""" + updates = {} + for ssdp_st, key in ( + (UPNP_SVC_RENDERING_CONTROL, CONF_SSDP_RENDERING_CONTROL_LOCATION), + (UPNP_SVC_MAIN_TV_AGENT, CONF_SSDP_MAIN_TV_AGENT_LOCATION), + ): + for discovery_info in await ssdp.async_get_discovery_info_by_st(hass, ssdp_st): + location = discovery_info.ssdp_location + host = urlparse(location).hostname + if host == entry.data[CONF_HOST]: + updates[key] = location + break + + if updates: + hass.config_entries.async_update_entry(entry, data={**entry.data, **updates}) + + async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up the Samsung TV platform.""" @@ -128,14 +190,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: bridge.register_update_config_entry_callback(_update_config_entry) - # Allow bridge to force the reload of the config_entry - @callback - def _reload_config_entry() -> None: - """Update config entry with the new token.""" - hass.async_create_task(hass.config_entries.async_reload(entry.entry_id)) - - bridge.register_reload_callback(_reload_config_entry) - async def stop_bridge(event: Event) -> None: """Stop SamsungTV bridge connection.""" LOGGER.debug("Stopping SamsungTVBridge %s", bridge.host) @@ -145,8 +199,18 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_bridge) ) + await _async_update_ssdp_locations(hass, entry) + + # We must not await after we setup the reload or there + # will be a race where the config flow will see the entry + # as not loaded and may reload it + debounced_reloader = DebouncedEntryReloader(hass, entry) + entry.async_on_unload(debounced_reloader.async_cancel) + entry.async_on_unload(entry.add_update_listener(debounced_reloader.async_call)) + hass.data[DOMAIN][entry.entry_id] = bridge hass.config_entries.async_setup_platforms(entry, PLATFORMS) + return True diff --git a/homeassistant/components/samsungtv/bridge.py b/homeassistant/components/samsungtv/bridge.py index fd258f562b4..d093dc35c72 100644 --- a/homeassistant/components/samsungtv/bridge.py +++ b/homeassistant/components/samsungtv/bridge.py @@ -148,7 +148,6 @@ class SamsungTVBridge(ABC): self.token: str | None = None self.session_id: str | None = None self._reauth_callback: CALLBACK_TYPE | None = None - self._reload_callback: CALLBACK_TYPE | None = None self._update_config_entry: Callable[[Mapping[str, Any]], None] | None = None self._app_list_callback: Callable[[dict[str, str]], None] | None = None @@ -156,10 +155,6 @@ class SamsungTVBridge(ABC): """Register a callback function.""" self._reauth_callback = func - def register_reload_callback(self, func: CALLBACK_TYPE) -> None: - """Register a callback function.""" - self._reload_callback = func - def register_update_config_entry_callback( self, func: Callable[[Mapping[str, Any]], None] ) -> None: @@ -211,11 +206,6 @@ class SamsungTVBridge(ABC): if self._reauth_callback is not None: self._reauth_callback() - def _notify_reload_callback(self) -> None: - """Notify reload callback.""" - if self._reload_callback is not None: - self._reload_callback() - def _notify_update_config_entry(self, updates: Mapping[str, Any]) -> None: """Notify update config callback.""" if self._update_config_entry is not None: @@ -597,7 +587,7 @@ class SamsungTVWSBridge(SamsungTVBridge): ) == "unrecognized method value : ms.remote.control": LOGGER.error( "Your TV seems to be unsupported by SamsungTVWSBridge" - " and needs a PIN: '%s'. Reloading", + " and needs a PIN: '%s'. Updating config entry", message, ) self._notify_update_config_entry( @@ -606,7 +596,6 @@ class SamsungTVWSBridge(SamsungTVBridge): CONF_PORT: ENCRYPTED_WEBSOCKET_PORT, } ) - self._notify_reload_callback() async def async_power_off(self) -> None: """Send power off command to remote.""" diff --git a/homeassistant/components/samsungtv/config_flow.py b/homeassistant/components/samsungtv/config_flow.py index 0b87e38b00a..2acc0d1c7d5 100644 --- a/homeassistant/components/samsungtv/config_flow.py +++ b/homeassistant/components/samsungtv/config_flow.py @@ -30,6 +30,7 @@ from .const import ( CONF_MANUFACTURER, CONF_MODEL, CONF_SESSION_ID, + CONF_SSDP_MAIN_TV_AGENT_LOCATION, CONF_SSDP_RENDERING_CONTROL_LOCATION, DEFAULT_MANUFACTURER, DOMAIN, @@ -46,7 +47,8 @@ from .const import ( RESULT_SUCCESS, RESULT_UNKNOWN_HOST, SUCCESSFUL_RESULTS, - UPNP_SVC_RENDERINGCONTROL, + UPNP_SVC_MAIN_TV_AGENT, + UPNP_SVC_RENDERING_CONTROL, WEBSOCKET_PORTS, ) @@ -58,7 +60,9 @@ def _strip_uuid(udn: str) -> str: def _entry_is_complete( - entry: config_entries.ConfigEntry, ssdp_rendering_control_location: str | None + entry: config_entries.ConfigEntry, + ssdp_rendering_control_location: str | None, + ssdp_main_tv_agent_location: str | None, ) -> bool: """Return True if the config entry information is complete. @@ -72,6 +76,10 @@ def _entry_is_complete( not ssdp_rendering_control_location or entry.data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION) ) + and ( + not ssdp_main_tv_agent_location + or entry.data.get(CONF_SSDP_MAIN_TV_AGENT_LOCATION) + ) ) @@ -88,6 +96,7 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): self._udn: str | None = None self._upnp_udn: str | None = None self._ssdp_rendering_control_location: str | None = None + self._ssdp_main_tv_agent_location: str | None = None self._manufacturer: str | None = None self._model: str | None = None self._connect_result: str | None = None @@ -111,6 +120,7 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): CONF_NAME: self._name, CONF_PORT: self._bridge.port, CONF_SSDP_RENDERING_CONTROL_LOCATION: self._ssdp_rendering_control_location, + CONF_SSDP_MAIN_TV_AGENT_LOCATION: self._ssdp_main_tv_agent_location, } def _get_entry_from_bridge(self) -> data_entry_flow.FlowResult: @@ -141,7 +151,11 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): await self.async_set_unique_id(self._udn, raise_on_progress=False) if ( entry := self._async_update_existing_matching_entry() - ) and _entry_is_complete(entry, self._ssdp_rendering_control_location): + ) and _entry_is_complete( + entry, + self._ssdp_rendering_control_location, + self._ssdp_main_tv_agent_location, + ): raise data_entry_flow.AbortFlow("already_configured") # Now that we have updated the config entry, we can raise # if another one is progressing @@ -157,7 +171,11 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): updates[ CONF_SSDP_RENDERING_CONTROL_LOCATION ] = self._ssdp_rendering_control_location - self._abort_if_unique_id_configured(updates=updates) + if self._ssdp_main_tv_agent_location: + updates[ + CONF_SSDP_MAIN_TV_AGENT_LOCATION + ] = self._ssdp_main_tv_agent_location + self._abort_if_unique_id_configured(updates=updates, reload_on_update=False) async def _async_create_bridge(self) -> None: """Create the bridge.""" @@ -342,36 +360,54 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): Returns the existing entry if it was updated. """ entry, is_unique_match = self._async_get_existing_matching_entry() - if entry: - entry_kw_args: dict = {} - if self.unique_id and ( - entry.unique_id is None - or (is_unique_match and self.unique_id != entry.unique_id) - ): - entry_kw_args["unique_id"] = self.unique_id - data = entry.data - update_ssdp_rendering_control_location = ( - self._ssdp_rendering_control_location - and data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION) - != self._ssdp_rendering_control_location + if not entry: + return None + entry_kw_args: dict = {} + if ( + self.unique_id + and entry.unique_id is None + or (is_unique_match and self.unique_id != entry.unique_id) + ): + entry_kw_args["unique_id"] = self.unique_id + data: dict[str, Any] = dict(entry.data) + update_ssdp_rendering_control_location = ( + self._ssdp_rendering_control_location + and data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION) + != self._ssdp_rendering_control_location + ) + update_ssdp_main_tv_agent_location = ( + self._ssdp_main_tv_agent_location + and data.get(CONF_SSDP_MAIN_TV_AGENT_LOCATION) + != self._ssdp_main_tv_agent_location + ) + update_mac = self._mac and not data.get(CONF_MAC) + if ( + update_ssdp_rendering_control_location + or update_ssdp_main_tv_agent_location + or update_mac + ): + if update_ssdp_rendering_control_location: + data[ + CONF_SSDP_RENDERING_CONTROL_LOCATION + ] = self._ssdp_rendering_control_location + if update_ssdp_main_tv_agent_location: + data[ + CONF_SSDP_MAIN_TV_AGENT_LOCATION + ] = self._ssdp_main_tv_agent_location + if update_mac: + data[CONF_MAC] = self._mac + entry_kw_args["data"] = data + if not entry_kw_args: + return None + LOGGER.debug("Updating existing config entry with %s", entry_kw_args) + self.hass.config_entries.async_update_entry(entry, **entry_kw_args) + if entry.state != config_entries.ConfigEntryState.LOADED: + # If its loaded it already has a reload listener in place + # and we do not want to trigger multiple reloads + self.hass.async_create_task( + self.hass.config_entries.async_reload(entry.entry_id) ) - update_mac = self._mac and not data.get(CONF_MAC) - if update_ssdp_rendering_control_location or update_mac: - entry_kw_args["data"] = {**entry.data} - if update_ssdp_rendering_control_location: - entry_kw_args["data"][ - CONF_SSDP_RENDERING_CONTROL_LOCATION - ] = self._ssdp_rendering_control_location - if update_mac: - entry_kw_args["data"][CONF_MAC] = self._mac - if entry_kw_args: - LOGGER.debug("Updating existing config entry with %s", entry_kw_args) - self.hass.config_entries.async_update_entry(entry, **entry_kw_args) - self.hass.async_create_task( - self.hass.config_entries.async_reload(entry.entry_id) - ) - return entry - return None + return entry async def _async_start_discovery_with_mac_address(self) -> None: """Start discovery.""" @@ -402,10 +438,17 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a flow initialized by ssdp discovery.""" LOGGER.debug("Samsung device found via SSDP: %s", discovery_info) model_name: str = discovery_info.upnp.get(ssdp.ATTR_UPNP_MODEL_NAME) or "" - if discovery_info.ssdp_st == UPNP_SVC_RENDERINGCONTROL: + if discovery_info.ssdp_st == UPNP_SVC_RENDERING_CONTROL: self._ssdp_rendering_control_location = discovery_info.ssdp_location LOGGER.debug( - "Set SSDP location to: %s", self._ssdp_rendering_control_location + "Set SSDP RenderingControl location to: %s", + self._ssdp_rendering_control_location, + ) + elif discovery_info.ssdp_st == UPNP_SVC_MAIN_TV_AGENT: + self._ssdp_main_tv_agent_location = discovery_info.ssdp_location + LOGGER.debug( + "Set SSDP MainTvAgent location to: %s", + self._ssdp_main_tv_agent_location, ) self._udn = self._upnp_udn = _strip_uuid( discovery_info.upnp[ssdp.ATTR_UPNP_UDN] diff --git a/homeassistant/components/samsungtv/const.py b/homeassistant/components/samsungtv/const.py index ad3300cd1e4..cabd85901f6 100644 --- a/homeassistant/components/samsungtv/const.py +++ b/homeassistant/components/samsungtv/const.py @@ -16,6 +16,7 @@ CONF_DESCRIPTION = "description" CONF_MANUFACTURER = "manufacturer" CONF_MODEL = "model" CONF_SSDP_RENDERING_CONTROL_LOCATION = "ssdp_rendering_control_location" +CONF_SSDP_MAIN_TV_AGENT_LOCATION = "ssdp_main_tv_agent_location" CONF_ON_ACTION = "turn_on_action" CONF_SESSION_ID = "session_id" @@ -41,4 +42,8 @@ WEBSOCKET_PORTS = (WEBSOCKET_SSL_PORT, WEBSOCKET_NO_SSL_PORT) SUCCESSFUL_RESULTS = {RESULT_AUTH_MISSING, RESULT_SUCCESS} -UPNP_SVC_RENDERINGCONTROL = "urn:schemas-upnp-org:service:RenderingControl:1" +UPNP_SVC_RENDERING_CONTROL = "urn:schemas-upnp-org:service:RenderingControl:1" +UPNP_SVC_MAIN_TV_AGENT = "urn:samsung.com:service:MainTVAgent2:1" + +# Time to wait before reloading entry upon device config change +ENTRY_RELOAD_COOLDOWN = 5 diff --git a/homeassistant/components/samsungtv/manifest.json b/homeassistant/components/samsungtv/manifest.json index c27011d6bd1..acc5ea3a66a 100644 --- a/homeassistant/components/samsungtv/manifest.json +++ b/homeassistant/components/samsungtv/manifest.json @@ -13,6 +13,9 @@ { "st": "urn:samsung.com:device:RemoteControlReceiver:1" }, + { + "st": "urn:samsung.com:service:MainTVAgent2:1" + }, { "manufacturer": "Samsung", "st": "urn:schemas-upnp-org:service:RenderingControl:1" @@ -25,6 +28,7 @@ "zeroconf": [ {"type":"_airplay._tcp.local.","properties":{"manufacturer":"samsung*"}} ], + "dependencies": ["ssdp"], "dhcp": [ {"registered_devices": true}, { diff --git a/homeassistant/components/samsungtv/media_player.py b/homeassistant/components/samsungtv/media_player.py index 578408d98f3..c20674efdfd 100644 --- a/homeassistant/components/samsungtv/media_player.py +++ b/homeassistant/components/samsungtv/media_player.py @@ -54,7 +54,7 @@ from .const import ( DEFAULT_NAME, DOMAIN, LOGGER, - UPNP_SVC_RENDERINGCONTROL, + UPNP_SVC_RENDERING_CONTROL, ) SOURCES = {"TV": "KEY_TV", "HDMI": "KEY_HDMI"} @@ -256,13 +256,13 @@ class SamsungTVDevice(MediaPlayerEntity): LOGGER.info("Upnp services are not available on %s", self._host) return None - if service := self._upnp_device.services.get(UPNP_SVC_RENDERINGCONTROL): + if service := self._upnp_device.services.get(UPNP_SVC_RENDERING_CONTROL): return service if log: LOGGER.info( "Upnp service %s is not available on %s", - UPNP_SVC_RENDERINGCONTROL, + UPNP_SVC_RENDERING_CONTROL, self._host, ) return None diff --git a/homeassistant/generated/ssdp.py b/homeassistant/generated/ssdp.py index 10ea82949b5..1c0876cd791 100644 --- a/homeassistant/generated/ssdp.py +++ b/homeassistant/generated/ssdp.py @@ -226,6 +226,9 @@ SSDP = { { "st": "urn:samsung.com:device:RemoteControlReceiver:1" }, + { + "st": "urn:samsung.com:service:MainTVAgent2:1" + }, { "manufacturer": "Samsung", "st": "urn:schemas-upnp-org:service:RenderingControl:1" diff --git a/tests/components/samsungtv/__init__.py b/tests/components/samsungtv/__init__.py index 045b8b6e6de..516aa5d8c95 100644 --- a/tests/components/samsungtv/__init__.py +++ b/tests/components/samsungtv/__init__.py @@ -1,17 +1,28 @@ """Tests for the samsungtv component.""" from __future__ import annotations +from datetime import timedelta from unittest.mock import Mock from async_upnp_client.client import UpnpAction, UpnpService -from homeassistant.components.samsungtv.const import DOMAIN +from homeassistant.components.samsungtv.const import DOMAIN, ENTRY_RELOAD_COOLDOWN from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.helpers.typing import ConfigType from homeassistant.setup import async_setup_component +from homeassistant.util import dt as dt_util -from tests.common import MockConfigEntry +from tests.common import MockConfigEntry, async_fire_time_changed + + +async def async_wait_config_entry_reload(hass: HomeAssistant) -> None: + """Wait for the config entry to reload.""" + await hass.async_block_till_done() + async_fire_time_changed( + hass, dt_util.utcnow() + timedelta(seconds=ENTRY_RELOAD_COOLDOWN) + ) + await hass.async_block_till_done() async def setup_samsungtv_entry(hass: HomeAssistant, data: ConfigType) -> ConfigEntry: diff --git a/tests/components/samsungtv/conftest.py b/tests/components/samsungtv/conftest.py index a3809726b5b..e4ef0666423 100644 --- a/tests/components/samsungtv/conftest.py +++ b/tests/components/samsungtv/conftest.py @@ -23,6 +23,22 @@ import homeassistant.util.dt as dt_util from .const import SAMPLE_DEVICE_INFO_WIFI +@pytest.fixture(autouse=True) +async def silent_ssdp_scanner(hass): + """Start SSDP component and get Scanner, prevent actual SSDP traffic.""" + with patch( + "homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners" + ), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch( + "homeassistant.components.ssdp.Scanner.async_scan" + ): + yield + + +@pytest.fixture(autouse=True) +def samsungtv_mock_get_source_ip(mock_get_source_ip): + """Mock network util's async_get_source_ip.""" + + @pytest.fixture(autouse=True) def fake_host_fixture() -> None: """Patch gethostbyname.""" diff --git a/tests/components/samsungtv/const.py b/tests/components/samsungtv/const.py index 64c3c6add8e..28e6e6a7120 100644 --- a/tests/components/samsungtv/const.py +++ b/tests/components/samsungtv/const.py @@ -1,7 +1,18 @@ """Constants for the samsungtv tests.""" from samsungtvws.event import ED_INSTALLED_APP_EVENT -from homeassistant.components.samsungtv.const import CONF_SESSION_ID +from homeassistant.components import ssdp +from homeassistant.components.samsungtv.const import ( + CONF_MODEL, + CONF_SESSION_ID, + METHOD_WEBSOCKET, +) +from homeassistant.components.ssdp import ( + ATTR_UPNP_FRIENDLY_NAME, + ATTR_UPNP_MANUFACTURER, + ATTR_UPNP_MODEL_NAME, + ATTR_UPNP_UDN, +) from homeassistant.const import ( CONF_HOST, CONF_IP_ADDRESS, @@ -25,6 +36,36 @@ MOCK_ENTRYDATA_ENCRYPTED_WS = { CONF_TOKEN: "037739871315caef138547b03e348b72", CONF_SESSION_ID: "2", } +MOCK_ENTRYDATA_WS = { + CONF_HOST: "fake_host", + CONF_METHOD: METHOD_WEBSOCKET, + CONF_PORT: 8002, + CONF_MODEL: "any", + CONF_NAME: "any", +} + +MOCK_SSDP_DATA_RENDERING_CONTROL_ST = ssdp.SsdpServiceInfo( + ssdp_usn="mock_usn", + ssdp_st="urn:schemas-upnp-org:service:RenderingControl:1", + ssdp_location="https://fake_host:12345/test", + upnp={ + ATTR_UPNP_FRIENDLY_NAME: "[TV] fake_name", + ATTR_UPNP_MANUFACTURER: "Samsung fake_manufacturer", + ATTR_UPNP_MODEL_NAME: "fake_model", + ATTR_UPNP_UDN: "uuid:0d1cef00-00dc-1000-9c80-4844f7b172de", + }, +) +MOCK_SSDP_DATA_MAIN_TV_AGENT_ST = ssdp.SsdpServiceInfo( + ssdp_usn="mock_usn", + ssdp_st="urn:samsung.com:service:MainTVAgent2:1", + ssdp_location="https://fake_host:12345/tv_agent", + upnp={ + ATTR_UPNP_FRIENDLY_NAME: "[TV] fake_name", + ATTR_UPNP_MANUFACTURER: "Samsung fake_manufacturer", + ATTR_UPNP_MODEL_NAME: "fake_model", + ATTR_UPNP_UDN: "uuid:0d1cef00-00dc-1000-9c80-4844f7b172de", + }, +) SAMPLE_DEVICE_INFO_WIFI = { "id": "uuid:be9554b9-c9fb-41f4-8920-22da015376a4", diff --git a/tests/components/samsungtv/test_config_flow.py b/tests/components/samsungtv/test_config_flow.py index 1620b46ee23..ba66cbc67ae 100644 --- a/tests/components/samsungtv/test_config_flow.py +++ b/tests/components/samsungtv/test_config_flow.py @@ -24,6 +24,7 @@ from homeassistant.components.samsungtv.const import ( CONF_MANUFACTURER, CONF_MODEL, CONF_SESSION_ID, + CONF_SSDP_MAIN_TV_AGENT_LOCATION, CONF_SSDP_RENDERING_CONTROL_LOCATION, DEFAULT_MANUFACTURER, DOMAIN, @@ -66,6 +67,9 @@ from . import setup_samsungtv_entry from .const import ( MOCK_CONFIG_ENCRYPTED_WS, MOCK_ENTRYDATA_ENCRYPTED_WS, + MOCK_ENTRYDATA_WS, + MOCK_SSDP_DATA_MAIN_TV_AGENT_ST, + MOCK_SSDP_DATA_RENDERING_CONTROL_ST, SAMPLE_DEVICE_INFO_FRAME, ) @@ -100,17 +104,6 @@ MOCK_SSDP_DATA = ssdp.SsdpServiceInfo( }, ) -MOCK_SSDP_DATA_RENDERING_CONTROL_ST = ssdp.SsdpServiceInfo( - ssdp_usn="mock_usn", - ssdp_st="urn:schemas-upnp-org:service:RenderingControl:1", - ssdp_location="https://fake_host:12345/test", - upnp={ - ATTR_UPNP_FRIENDLY_NAME: "[TV] fake_name", - ATTR_UPNP_MANUFACTURER: "Samsung fake_manufacturer", - ATTR_UPNP_MODEL_NAME: "fake_model", - ATTR_UPNP_UDN: "uuid:0d1cef00-00dc-1000-9c80-4844f7b172de", - }, -) MOCK_SSDP_DATA_NOPREFIX = ssdp.SsdpServiceInfo( ssdp_usn="mock_usn", ssdp_st="mock_st", @@ -164,13 +157,6 @@ MOCK_LEGACY_ENTRY = { CONF_METHOD: "legacy", CONF_PORT: None, } -MOCK_WS_ENTRY = { - CONF_HOST: "fake_host", - CONF_METHOD: METHOD_WEBSOCKET, - CONF_PORT: 8002, - CONF_MODEL: "any", - CONF_NAME: "any", -} MOCK_DEVICE_INFO = { "device": { "type": "Samsung SmartTV", @@ -643,6 +629,36 @@ async def test_ssdp_websocket_success_populates_mac_address_and_ssdp_location( assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" +@pytest.mark.usefixtures("remotews", "rest_api", "remoteencws_failing") +async def test_ssdp_websocket_success_populates_mac_address_and_main_tv_ssdp_location( + hass: HomeAssistant, +) -> None: + """Test starting a flow from ssdp for a supported device populates the mac.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_SSDP_DATA_MAIN_TV_AGENT_ST, + ) + assert result["type"] == "form" + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input="whatever" + ) + assert result["type"] == "create_entry" + assert result["title"] == "Living Room (82GXARRS)" + assert result["data"][CONF_HOST] == "fake_host" + assert result["data"][CONF_NAME] == "Living Room" + assert result["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result["data"][CONF_MANUFACTURER] == "Samsung fake_manufacturer" + assert result["data"][CONF_MODEL] == "82GXARRS" + assert ( + result["data"][CONF_SSDP_MAIN_TV_AGENT_LOCATION] + == "https://fake_host:12345/tv_agent" + ) + assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" + + @pytest.mark.usefixtures("remoteencws", "rest_api_non_ssl_only") async def test_ssdp_encrypted_websocket_success_populates_mac_address_and_ssdp_location( hass: HomeAssistant, @@ -1504,6 +1520,52 @@ async def test_update_missing_mac_unique_id_added_ssdp_location_rendering_st_upd assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" +@pytest.mark.usefixtures("remotews", "rest_api", "remoteencws_failing") +async def test_update_missing_mac_unique_id_added_ssdp_location_main_tv_agent_st_updated_from_ssdp( + hass: HomeAssistant, +) -> None: + """Test missing mac and unique id with outdated ssdp_location with the correct st added via ssdp.""" + entry = MockConfigEntry( + domain=DOMAIN, + data={ + **MOCK_OLD_ENTRY, + CONF_SSDP_RENDERING_CONTROL_LOCATION: "https://1.2.3.4:555/test", + CONF_SSDP_MAIN_TV_AGENT_LOCATION: "https://1.2.3.4:555/test", + }, + unique_id=None, + ) + entry.add_to_hass(hass) + with patch( + "homeassistant.components.samsungtv.async_setup", + return_value=True, + ) as mock_setup, patch( + "homeassistant.components.samsungtv.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_SSDP_DATA_MAIN_TV_AGENT_ST, + ) + await hass.async_block_till_done() + assert len(mock_setup.mock_calls) == 1 + assert len(mock_setup_entry.mock_calls) == 1 + + assert result["type"] == "abort" + assert result["reason"] == "already_configured" + assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + # Main TV Agent ST, ssdp location should change + assert ( + entry.data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] + == "https://fake_host:12345/tv_agent" + ) + # Rendering control should not be affected + assert ( + entry.data[CONF_SSDP_RENDERING_CONTROL_LOCATION] == "https://1.2.3.4:555/test" + ) + assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" + + @pytest.mark.usefixtures("remotews", "rest_api", "remoteencws_failing") async def test_update_ssdp_location_rendering_st_updated_from_ssdp( hass: HomeAssistant, @@ -1542,6 +1604,44 @@ async def test_update_ssdp_location_rendering_st_updated_from_ssdp( assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" +@pytest.mark.usefixtures("remotews", "rest_api", "remoteencws_failing") +async def test_update_main_tv_ssdp_location_rendering_st_updated_from_ssdp( + hass: HomeAssistant, +) -> None: + """Test with outdated ssdp_location with the correct st added via ssdp.""" + entry = MockConfigEntry( + domain=DOMAIN, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + unique_id="be9554b9-c9fb-41f4-8920-22da015376a4", + ) + entry.add_to_hass(hass) + with patch( + "homeassistant.components.samsungtv.async_setup", + return_value=True, + ) as mock_setup, patch( + "homeassistant.components.samsungtv.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_SSDP_DATA_MAIN_TV_AGENT_ST, + ) + await hass.async_block_till_done() + assert len(mock_setup.mock_calls) == 1 + assert len(mock_setup_entry.mock_calls) == 1 + + assert result["type"] == "abort" + assert result["reason"] == "already_configured" + assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + # Correct ST for MainTV, ssdp location should be added + assert ( + entry.data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] + == "https://fake_host:12345/tv_agent" + ) + assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" + + @pytest.mark.usefixtures("remotews", "rest_api") async def test_update_missing_mac_added_unique_id_preserved_from_zeroconf( hass: HomeAssistant, @@ -1744,7 +1844,7 @@ async def test_form_reauth_legacy(hass: HomeAssistant) -> None: @pytest.mark.usefixtures("remotews", "rest_api") async def test_form_reauth_websocket(hass: HomeAssistant) -> None: """Test reauthenticate websocket.""" - entry = MockConfigEntry(domain=DOMAIN, data=MOCK_WS_ENTRY) + entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRYDATA_WS) entry.add_to_hass(hass) assert entry.state == config_entries.ConfigEntryState.NOT_LOADED @@ -1771,7 +1871,7 @@ async def test_form_reauth_websocket_cannot_connect( hass: HomeAssistant, remotews: Mock ) -> None: """Test reauthenticate websocket when we cannot connect on the first attempt.""" - entry = MockConfigEntry(domain=DOMAIN, data=MOCK_WS_ENTRY) + entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRYDATA_WS) entry.add_to_hass(hass) result = await hass.config_entries.flow.async_init( DOMAIN, @@ -1803,7 +1903,7 @@ async def test_form_reauth_websocket_cannot_connect( async def test_form_reauth_websocket_not_supported(hass: HomeAssistant) -> None: """Test reauthenticate websocket when the device is not supported.""" - entry = MockConfigEntry(domain=DOMAIN, data=MOCK_WS_ENTRY) + entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRYDATA_WS) entry.add_to_hass(hass) result = await hass.config_entries.flow.async_init( DOMAIN, @@ -1972,7 +2072,7 @@ async def test_update_incorrect_udn_matching_mac_from_dhcp( """Test that DHCP updates the wrong udn from ssdp via mac match.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_WS_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_ENTRYDATA_WS, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, source=config_entries.SOURCE_SSDP, unique_id="0d1cef00-00dc-1000-9c80-4844f7b172de", ) @@ -2006,7 +2106,7 @@ async def test_no_update_incorrect_udn_not_matching_mac_from_dhcp( """Test that DHCP does not update the wrong udn from ssdp via host match.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_WS_ENTRY, CONF_MAC: "aa:bb:ss:ss:dd:pp"}, + data={**MOCK_ENTRYDATA_WS, CONF_MAC: "aa:bb:ss:ss:dd:pp"}, source=config_entries.SOURCE_SSDP, unique_id="0d1cef00-00dc-1000-9c80-4844f7b172de", ) diff --git a/tests/components/samsungtv/test_init.py b/tests/components/samsungtv/test_init.py index 239840f8c8b..9d3289495a6 100644 --- a/tests/components/samsungtv/test_init.py +++ b/tests/components/samsungtv/test_init.py @@ -7,8 +7,12 @@ import pytest from homeassistant.components.media_player.const import DOMAIN, SUPPORT_TURN_ON from homeassistant.components.samsungtv.const import ( CONF_ON_ACTION, + CONF_SSDP_MAIN_TV_AGENT_LOCATION, + CONF_SSDP_RENDERING_CONTROL_LOCATION, DOMAIN as SAMSUNGTV_DOMAIN, METHOD_WEBSOCKET, + UPNP_SVC_MAIN_TV_AGENT, + UPNP_SVC_RENDERING_CONTROL, ) from homeassistant.components.samsungtv.media_player import SUPPORT_SAMSUNGTV from homeassistant.config_entries import ConfigEntryState @@ -24,6 +28,14 @@ from homeassistant.const import ( from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component +from .const import ( + MOCK_ENTRYDATA_WS, + MOCK_SSDP_DATA_MAIN_TV_AGENT_ST, + MOCK_SSDP_DATA_RENDERING_CONTROL_ST, +) + +from tests.common import MockConfigEntry + ENTITY_ID = f"{DOMAIN}.fake_name" MOCK_CONFIG = { SAMSUNGTV_DOMAIN: [ @@ -156,3 +168,35 @@ async def test_setup_h_j_model( state = hass.states.get(ENTITY_ID) assert state assert "H and J series use an encrypted protocol" in caplog.text + + +@pytest.mark.usefixtures("remote", "remotews", "remoteencws_failing") +async def test_setup_updates_from_ssdp(hass: HomeAssistant) -> None: + """Test setting up the entry fetches data from ssdp cache.""" + entry = MockConfigEntry(domain="samsungtv", data=MOCK_ENTRYDATA_WS) + entry.add_to_hass(hass) + + async def _mock_async_get_discovery_info_by_st(hass: HomeAssistant, mock_st: str): + if mock_st == UPNP_SVC_RENDERING_CONTROL: + return [MOCK_SSDP_DATA_RENDERING_CONTROL_ST] + if mock_st == UPNP_SVC_MAIN_TV_AGENT: + return [MOCK_SSDP_DATA_MAIN_TV_AGENT_ST] + raise ValueError(f"Unknown st {mock_st}") + + with patch( + "homeassistant.components.samsungtv.ssdp.async_get_discovery_info_by_st", + _mock_async_get_discovery_info_by_st, + ): + await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + await hass.async_block_till_done() + + assert hass.states.get("media_player.any") + assert ( + entry.data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] + == "https://fake_host:12345/tv_agent" + ) + assert ( + entry.data[CONF_SSDP_RENDERING_CONTROL_LOCATION] + == "https://fake_host:12345/test" + ) diff --git a/tests/components/samsungtv/test_media_player.py b/tests/components/samsungtv/test_media_player.py index 546bede6467..fa8cd871de9 100644 --- a/tests/components/samsungtv/test_media_player.py +++ b/tests/components/samsungtv/test_media_player.py @@ -44,7 +44,7 @@ from homeassistant.components.samsungtv.const import ( ) from homeassistant.components.samsungtv.media_player import ( SUPPORT_SAMSUNGTV, - UPNP_SVC_RENDERINGCONTROL, + UPNP_SVC_RENDERING_CONTROL, ) from homeassistant.const import ( ATTR_DEVICE_CLASS, @@ -80,7 +80,11 @@ from homeassistant.helpers.typing import ConfigType from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util -from . import setup_samsungtv_entry, upnp_get_action_mock +from . import ( + async_wait_config_entry_reload, + setup_samsungtv_entry, + upnp_get_action_mock, +) from .const import ( MOCK_ENTRYDATA_ENCRYPTED_WS, SAMPLE_DEVICE_INFO_FRAME, @@ -1301,8 +1305,8 @@ async def test_websocket_unsupported_remote_control( "'unrecognized method value : ms.remote.control'" in caplog.text ) + await async_wait_config_entry_reload(hass) # ensure reauth triggered, and method/port updated - await hass.async_block_till_done() assert [ flow for flow in hass.config_entries.flow.async_progress() @@ -1320,12 +1324,12 @@ async def test_volume_control_upnp( ) -> None: """Test for Upnp volume control.""" upnp_get_volume = upnp_get_action_mock( - upnp_device, UPNP_SVC_RENDERINGCONTROL, "GetVolume" + upnp_device, UPNP_SVC_RENDERING_CONTROL, "GetVolume" ) upnp_get_volume.async_call.return_value = {"CurrentVolume": 44} upnp_get_mute = upnp_get_action_mock( - upnp_device, UPNP_SVC_RENDERINGCONTROL, "GetMute" + upnp_device, UPNP_SVC_RENDERING_CONTROL, "GetMute" ) upnp_get_mute.async_call.return_value = {"CurrentMute": False} @@ -1335,7 +1339,7 @@ async def test_volume_control_upnp( # Upnp action succeeds upnp_set_volume = upnp_get_action_mock( - upnp_device, UPNP_SVC_RENDERINGCONTROL, "SetVolume" + upnp_device, UPNP_SVC_RENDERING_CONTROL, "SetVolume" ) assert await hass.services.async_call( DOMAIN, @@ -1389,4 +1393,4 @@ async def test_upnp_missing_service( {ATTR_ENTITY_ID: ENTITY_ID, ATTR_MEDIA_VOLUME_LEVEL: 0.6}, True, ) - assert f"Upnp service {UPNP_SVC_RENDERINGCONTROL} is not available" in caplog.text + assert f"Upnp service {UPNP_SVC_RENDERING_CONTROL} is not available" in caplog.text