Add support for setting up encrypted samsung tvs from config flow (#68717)

Co-authored-by: epenet <epenet@users.noreply.github.com>
This commit is contained in:
J. Nick Koston 2022-03-27 10:30:45 -10:00 committed by GitHub
parent b5401ccc4a
commit cc75cebfc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 923 additions and 273 deletions

View File

@ -173,7 +173,7 @@ async def _async_create_bridge_with_updated_data(
else:
# When we imported from yaml we didn't setup the method
# because we didn't know it
port, method, info = await async_get_device_info(hass, None, host)
_result, port, method, info = await async_get_device_info(hass, host)
load_info_attempted = True
if not port or not method:
raise ConfigEntryNotReady(

View File

@ -23,7 +23,12 @@ from samsungtvws.event import (
MS_ERROR_EVENT,
parse_installed_app,
)
from samsungtvws.exceptions import ConnectionFailure, HttpApiError
from samsungtvws.exceptions import (
ConnectionFailure,
HttpApiError,
ResponseError,
UnauthorizedError,
)
from samsungtvws.remote import ChannelEmitCommand, SendRemoteKey
from websockets.exceptions import ConnectionClosedError, WebSocketException
@ -54,6 +59,7 @@ from .const import (
RESULT_CANNOT_CONNECT,
RESULT_NOT_SUPPORTED,
RESULT_SUCCESS,
SUCCESSFUL_RESULTS,
TIMEOUT_REQUEST,
TIMEOUT_WEBSOCKET,
VALUE_CONF_ID,
@ -66,6 +72,8 @@ KEY_PRESS_TIMEOUT = 1.2
ENCRYPTED_MODEL_USES_POWER_OFF = {"H6400"}
ENCRYPTED_MODEL_USES_POWER = {"JU6400", "JU641D"}
REST_EXCEPTIONS = (HttpApiError, AsyncioTimeoutError, ResponseError)
def mac_from_device_info(info: dict[str, Any]) -> str | None:
"""Extract the mac address from the device info."""
@ -76,36 +84,39 @@ def mac_from_device_info(info: dict[str, Any]) -> str | None:
async def async_get_device_info(
hass: HomeAssistant,
bridge: SamsungTVBridge | None,
host: str,
) -> tuple[int | None, str | None, dict[str, Any] | None]:
) -> tuple[str, int | None, str | None, dict[str, Any] | None]:
"""Fetch the port, method, and device info."""
# Bridge is defined
if bridge and bridge.port:
return bridge.port, bridge.method, await bridge.async_device_info()
# Try websocket ports
# Try the websocket ssl and non-ssl ports
for port in WEBSOCKET_PORTS:
bridge = SamsungTVBridge.get_bridge(hass, METHOD_WEBSOCKET, host, port)
if info := await bridge.async_device_info():
return port, METHOD_WEBSOCKET, info
# Try encrypted websocket port
bridge = SamsungTVBridge.get_bridge(
hass, METHOD_ENCRYPTED_WEBSOCKET, host, ENCRYPTED_WEBSOCKET_PORT
)
result = await bridge.async_try_connect()
if result == RESULT_SUCCESS:
return port, METHOD_ENCRYPTED_WEBSOCKET, await bridge.async_device_info()
LOGGER.debug(
"Fetching rest info via %s was successful: %s, checking for encrypted",
port,
info,
)
encrypted_bridge = SamsungTVEncryptedBridge(
hass, METHOD_ENCRYPTED_WEBSOCKET, host, ENCRYPTED_WEBSOCKET_PORT
)
result = await encrypted_bridge.async_try_connect()
if result != RESULT_CANNOT_CONNECT:
return (
result,
ENCRYPTED_WEBSOCKET_PORT,
METHOD_ENCRYPTED_WEBSOCKET,
info,
)
return RESULT_SUCCESS, port, METHOD_WEBSOCKET, info
# Try legacy port
bridge = SamsungTVBridge.get_bridge(hass, METHOD_LEGACY, host, LEGACY_PORT)
result = await bridge.async_try_connect()
if result in (RESULT_SUCCESS, RESULT_AUTH_MISSING):
return LEGACY_PORT, METHOD_LEGACY, await bridge.async_device_info()
if result in SUCCESSFUL_RESULTS:
return result, LEGACY_PORT, METHOD_LEGACY, await bridge.async_device_info()
# Failed to get info
return None, None, None
return result, None, None, None
class SamsungTVBridge(ABC):
@ -433,8 +444,11 @@ class SamsungTVWSBridge(SamsungTVBridge):
"Working but unsupported config: %s, error: %s", config, err
)
result = RESULT_NOT_SUPPORTED
except (OSError, AsyncioTimeoutError, ConnectionFailure) as err:
LOGGER.debug("Failing config: %s, error: %s", config, err)
except UnauthorizedError as err:
LOGGER.debug("Failing config: %s, %s error: %s", config, type(err), err)
return RESULT_AUTH_MISSING
except (ConnectionFailure, OSError, AsyncioTimeoutError) as err:
LOGGER.debug("Failing config: %s, %s error: %s", config, type(err), err)
# pylint: disable=useless-else-on-loop
else:
if result:
@ -453,7 +467,7 @@ class SamsungTVWSBridge(SamsungTVBridge):
timeout=TIMEOUT_WEBSOCKET,
)
with contextlib.suppress(HttpApiError, AsyncioTimeoutError):
with contextlib.suppress(*REST_EXCEPTIONS):
device_info: dict[str, Any] = await rest_api.rest_device_info()
LOGGER.debug("Device info on %s is: %s", self.host, device_info)
self._device_info = device_info
@ -654,8 +668,7 @@ class SamsungTVEncryptedBridge(SamsungTVBridge):
CONF_HOST: self.host,
CONF_METHOD: self.method,
CONF_PORT: self.port,
# We need this high timeout because waiting for auth popup is just an open socket
CONF_TIMEOUT: TIMEOUT_REQUEST,
CONF_TIMEOUT: TIMEOUT_WEBSOCKET,
}
try:
@ -669,13 +682,14 @@ class SamsungTVEncryptedBridge(SamsungTVBridge):
timeout=TIMEOUT_REQUEST,
) as remote:
await remote.start_listening()
LOGGER.debug("Working config: %s", config)
return RESULT_SUCCESS
except WebSocketException as err:
LOGGER.debug("Working but unsupported config: %s, error: %s", config, err)
return RESULT_NOT_SUPPORTED
except (OSError, AsyncioTimeoutError, ConnectionFailure) as err:
LOGGER.debug("Failing config: %s, error: %s", config, err)
else:
LOGGER.debug("Working config: %s", config)
return RESULT_SUCCESS
return RESULT_CANNOT_CONNECT
@ -696,7 +710,7 @@ class SamsungTVEncryptedBridge(SamsungTVBridge):
timeout=TIMEOUT_WEBSOCKET,
)
with contextlib.suppress(HttpApiError, AsyncioTimeoutError):
with contextlib.suppress(*REST_EXCEPTIONS):
device_info: dict[str, Any] = await rest_api.rest_device_info()
LOGGER.debug("Device info on %s is: %s", self.host, device_info)
self._device_info = device_info

View File

@ -30,6 +30,7 @@ from .const import (
CONF_MANUFACTURER,
CONF_MODEL,
CONF_SESSION_ID,
CONF_SSDP_RENDERING_CONTROL_LOCATION,
DEFAULT_MANUFACTURER,
DOMAIN,
ENCRYPTED_WEBSOCKET_PORT,
@ -44,20 +45,34 @@ from .const import (
RESULT_NOT_SUPPORTED,
RESULT_SUCCESS,
RESULT_UNKNOWN_HOST,
SUCCESSFUL_RESULTS,
UPNP_SVC_RENDERINGCONTROL,
WEBSOCKET_PORTS,
)
DATA_SCHEMA = vol.Schema({vol.Required(CONF_HOST): str, vol.Required(CONF_NAME): str})
SUPPORTED_METHODS = [METHOD_LEGACY, METHOD_WEBSOCKET]
def _strip_uuid(udn: str) -> str:
return udn[5:] if udn.startswith("uuid:") else udn
def _entry_is_complete(entry: config_entries.ConfigEntry) -> bool:
"""Return True if the config entry information is complete."""
return bool(entry.unique_id and entry.data.get(CONF_MAC))
def _entry_is_complete(
entry: config_entries.ConfigEntry, ssdp_rendering_control_location: str | None
) -> bool:
"""Return True if the config entry information is complete.
If we do not have an ssdp location we consider it complete
as some TVs will not support SSDP/UPNP
"""
return bool(
entry.unique_id
and entry.data.get(CONF_MAC)
and (
not ssdp_rendering_control_location
or entry.data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION)
)
)
class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
@ -72,22 +87,22 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self._mac: str | None = None
self._udn: str | None = None
self._upnp_udn: str | None = None
self._ssdp_rendering_control_location: str | None = None
self._manufacturer: str | None = None
self._model: str | None = None
self._connect_result: str | None = None
self._method: str | None = None
self._name: str | None = None
self._title: str = ""
self._id: int | None = None
self._bridge: SamsungTVBridge | None = None
self._device_info: dict[str, Any] | None = None
self._encrypted_authenticator: SamsungTVEncryptedWSAsyncAuthenticator | None = (
None
)
self._authenticator: SamsungTVEncryptedWSAsyncAuthenticator | None = None
def _get_entry_from_bridge(self) -> data_entry_flow.FlowResult:
"""Get device entry."""
assert self._bridge
data = {
def _base_config_entry(self) -> dict[str, Any]:
"""Generate the base config entry without the method."""
assert self._bridge is not None
return {
CONF_HOST: self._host,
CONF_MAC: self._mac,
CONF_MANUFACTURER: self._manufacturer or DEFAULT_MANUFACTURER,
@ -95,7 +110,13 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
CONF_MODEL: self._model,
CONF_NAME: self._name,
CONF_PORT: self._bridge.port,
CONF_SSDP_RENDERING_CONTROL_LOCATION: self._ssdp_rendering_control_location,
}
def _get_entry_from_bridge(self) -> data_entry_flow.FlowResult:
"""Get device entry."""
assert self._bridge
data = self._base_config_entry()
if self._bridge.token:
data[CONF_TOKEN] = self._bridge.token
return self.async_create_entry(
@ -115,48 +136,66 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
) -> None:
"""Set the unique id from the udn."""
assert self._host is not None
await self.async_set_unique_id(self._udn, raise_on_progress=raise_on_progress)
# Set the unique id without raising on progress in case
# there are two SSDP flows with for each ST
await self.async_set_unique_id(self._udn, raise_on_progress=False)
if (
entry := self._async_update_existing_matching_entry()
) and _entry_is_complete(entry):
) and _entry_is_complete(entry, self._ssdp_rendering_control_location):
raise data_entry_flow.AbortFlow("already_configured")
# Now that we have updated the config entry, we can raise
# if another one is progressing
if raise_on_progress:
await self.async_set_unique_id(self._udn, raise_on_progress=True)
def _async_update_and_abort_for_matching_unique_id(self) -> None:
"""Abort and update host and mac if we have it."""
updates = {CONF_HOST: self._host}
if self._mac:
updates[CONF_MAC] = self._mac
if self._ssdp_rendering_control_location:
updates[
CONF_SSDP_RENDERING_CONTROL_LOCATION
] = self._ssdp_rendering_control_location
self._abort_if_unique_id_configured(updates=updates)
async def _try_connect(self) -> None:
"""Try to connect and check auth."""
for method in SUPPORTED_METHODS:
self._bridge = SamsungTVBridge.get_bridge(self.hass, method, self._host)
result = await self._bridge.async_try_connect()
if result == RESULT_SUCCESS:
return
if result != RESULT_CANNOT_CONNECT:
raise data_entry_flow.AbortFlow(result)
LOGGER.debug("No working config found")
raise data_entry_flow.AbortFlow(RESULT_CANNOT_CONNECT)
async def _async_create_bridge(self) -> None:
"""Create the bridge."""
result, method, _info = await self._async_get_device_info_and_method()
if result not in SUCCESSFUL_RESULTS:
LOGGER.debug("No working config found for %s", self._host)
raise data_entry_flow.AbortFlow(result)
assert method is not None
self._bridge = SamsungTVBridge.get_bridge(self.hass, method, self._host)
return
async def _async_get_device_info_and_method(
self,
) -> tuple[str, str | None, dict[str, Any] | None]:
"""Get device info and method only once."""
if self._connect_result is None:
result, _, method, info = await async_get_device_info(self.hass, self._host)
self._connect_result = result
self._method = method
self._device_info = info
if not method:
LOGGER.debug("Host:%s did not return device info", self._host)
return result, None, None
return self._connect_result, self._method, self._device_info
async def _async_get_and_check_device_info(self) -> bool:
"""Try to get the device info."""
_port, _method, info = await async_get_device_info(
self.hass, self._bridge, self._host
)
result, _method, info = await self._async_get_device_info_and_method()
if result not in SUCCESSFUL_RESULTS:
raise data_entry_flow.AbortFlow(result)
if not info:
if not _method:
LOGGER.debug(
"Samsung host %s is not supported by either %s or %s methods",
self._host,
METHOD_LEGACY,
METHOD_WEBSOCKET,
)
raise data_entry_flow.AbortFlow(RESULT_NOT_SUPPORTED)
return False
dev_info = info.get("device", {})
assert dev_info is not None
if (device_type := dev_info.get("type")) != "Samsung SmartTV":
LOGGER.debug(
"Host:%s has type: %s which is not supported", self._host, device_type
)
raise data_entry_flow.AbortFlow(RESULT_NOT_SUPPORTED)
self._model = dev_info.get("modelName")
name = dev_info.get("name")
@ -169,7 +208,6 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
partial(getmac.get_mac_address, ip=self._host)
):
self._mac = mac
self._device_info = info
return True
async def async_step_import(
@ -209,16 +247,73 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a flow initialized by the user."""
if user_input is not None:
await self._async_set_name_host_from_input(user_input)
await self._try_connect()
await self._async_create_bridge()
assert self._bridge
self._async_abort_entries_match({CONF_HOST: self._host})
if self._bridge.method != METHOD_LEGACY:
# Legacy bridge does not provide device info
await self._async_set_device_unique_id(raise_on_progress=False)
return self._get_entry_from_bridge()
if self._bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
return await self.async_step_encrypted_pairing()
return await self.async_step_pairing({})
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA)
async def async_step_pairing(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Handle a pairing by accepting the message on the TV."""
assert self._bridge is not None
errors: dict[str, str] = {}
if user_input is not None:
result = await self._bridge.async_try_connect()
if result == RESULT_SUCCESS:
return self._get_entry_from_bridge()
if result != RESULT_AUTH_MISSING:
raise data_entry_flow.AbortFlow(result)
errors = {"base": RESULT_AUTH_MISSING}
self.context["title_placeholders"] = {"device": self._title}
return self.async_show_form(
step_id="pairing",
errors=errors,
description_placeholders={"device": self._title},
data_schema=vol.Schema({}),
)
async def async_step_encrypted_pairing(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Handle a encrypted pairing."""
assert self._host is not None
await self._async_start_encrypted_pairing(self._host)
assert self._authenticator is not None
errors: dict[str, str] = {}
if user_input is not None:
if (
(pin := user_input.get("pin"))
and (token := await self._authenticator.try_pin(pin))
and (session_id := await self._authenticator.get_session_id_and_close())
):
return self.async_create_entry(
data={
**self._base_config_entry(),
CONF_TOKEN: token,
CONF_SESSION_ID: session_id,
},
title=self._title,
)
errors = {"base": RESULT_INVALID_PIN}
self.context["title_placeholders"] = {"device": self._title}
return self.async_show_form(
step_id="encrypted_pairing",
errors=errors,
description_placeholders={"device": self._title},
data_schema=vol.Schema({vol.Required("pin"): str}),
)
@callback
def _async_get_existing_matching_entry(
self,
@ -254,8 +349,21 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
or (is_unique_match and self.unique_id != entry.unique_id)
):
entry_kw_args["unique_id"] = self.unique_id
if self._mac and not entry.data.get(CONF_MAC):
entry_kw_args["data"] = {**entry.data, CONF_MAC: self._mac}
data = entry.data
update_ssdp_rendering_control_location = (
self._ssdp_rendering_control_location
and data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION)
!= self._ssdp_rendering_control_location
)
update_mac = self._mac and not data.get(CONF_MAC)
if update_ssdp_rendering_control_location or update_mac:
entry_kw_args["data"] = {**entry.data}
if update_ssdp_rendering_control_location:
entry_kw_args["data"][
CONF_SSDP_RENDERING_CONTROL_LOCATION
] = self._ssdp_rendering_control_location
if update_mac:
entry_kw_args["data"][CONF_MAC] = self._mac
if entry_kw_args:
LOGGER.debug("Updating existing config entry with %s", entry_kw_args)
self.hass.config_entries.async_update_entry(entry, **entry_kw_args)
@ -294,6 +402,11 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a flow initialized by ssdp discovery."""
LOGGER.debug("Samsung device found via SSDP: %s", discovery_info)
model_name: str = discovery_info.upnp.get(ssdp.ATTR_UPNP_MODEL_NAME) or ""
if discovery_info.ssdp_st == UPNP_SVC_RENDERINGCONTROL:
self._ssdp_rendering_control_location = discovery_info.ssdp_location
LOGGER.debug(
"Set SSDP location to: %s", self._ssdp_rendering_control_location
)
self._udn = self._upnp_udn = _strip_uuid(
discovery_info.upnp[ssdp.ATTR_UPNP_UDN]
)
@ -345,12 +458,12 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
) -> data_entry_flow.FlowResult:
"""Handle user-confirmation of discovered node."""
if user_input is not None:
await self._try_connect()
await self._async_create_bridge()
assert self._bridge
return self._get_entry_from_bridge()
if self._bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
return await self.async_step_encrypted_pairing()
return await self.async_step_pairing({})
self._set_confirm_only()
return self.async_show_form(
step_id="confirm", description_placeholders={"device": self._title}
)
@ -378,6 +491,8 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
assert self._reauth_entry
method = self._reauth_entry.data[CONF_METHOD]
if user_input is not None:
if method == METHOD_ENCRYPTED_WEBSOCKET:
return await self.async_step_reauth_confirm_encrypted()
bridge = SamsungTVBridge.get_bridge(
self.hass,
method,
@ -399,40 +514,42 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors = {"base": RESULT_AUTH_MISSING}
self.context["title_placeholders"] = {"device": self._title}
step_id = "reauth_confirm"
if method == METHOD_ENCRYPTED_WEBSOCKET:
step_id = "reauth_confirm_encrypted"
return self.async_show_form(
step_id=step_id,
step_id="reauth_confirm",
errors=errors,
description_placeholders={"device": self._title},
)
async def _async_start_encrypted_pairing(self, host: str) -> None:
if self._authenticator is None:
self._authenticator = SamsungTVEncryptedWSAsyncAuthenticator(
host,
web_session=async_get_clientsession(self.hass),
)
await self._authenticator.start_pairing()
async def async_step_reauth_confirm_encrypted(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Confirm reauth (encrypted method)."""
errors = {}
assert self._reauth_entry
if self._encrypted_authenticator is None:
self._encrypted_authenticator = SamsungTVEncryptedWSAsyncAuthenticator(
self._reauth_entry.data[CONF_HOST],
web_session=async_get_clientsession(self.hass),
)
await self._encrypted_authenticator.start_pairing()
await self._async_start_encrypted_pairing(self._reauth_entry.data[CONF_HOST])
assert self._authenticator is not None
if user_input is not None and (pin := user_input.get("pin")):
if token := await self._encrypted_authenticator.try_pin(pin):
session_id = (
await self._encrypted_authenticator.get_session_id_and_close()
)
new_data = {
**self._reauth_entry.data,
CONF_TOKEN: token,
CONF_SESSION_ID: session_id,
}
if user_input is not None:
if (
(pin := user_input.get("pin"))
and (token := await self._authenticator.try_pin(pin))
and (session_id := await self._authenticator.get_session_id_and_close())
):
self.hass.config_entries.async_update_entry(
self._reauth_entry, data=new_data
self._reauth_entry,
data={
**self._reauth_entry.data,
CONF_TOKEN: token,
CONF_SESSION_ID: session_id,
},
)
await self.hass.config_entries.async_reload(self._reauth_entry.entry_id)
return self.async_abort(reason="reauth_successful")

View File

@ -15,6 +15,7 @@ VALUE_CONF_ID = "ha.component.samsung"
CONF_DESCRIPTION = "description"
CONF_MANUFACTURER = "manufacturer"
CONF_MODEL = "model"
CONF_SSDP_RENDERING_CONTROL_LOCATION = "ssdp_rendering_control_location"
CONF_ON_ACTION = "turn_on_action"
CONF_SESSION_ID = "session_id"
@ -34,4 +35,10 @@ TIMEOUT_WEBSOCKET = 5
LEGACY_PORT = 55000
ENCRYPTED_WEBSOCKET_PORT = 8000
WEBSOCKET_PORTS = (8002, 8001)
WEBSOCKET_NO_SSL_PORT = 8001
WEBSOCKET_SSL_PORT = 8002
WEBSOCKET_PORTS = (WEBSOCKET_SSL_PORT, WEBSOCKET_NO_SSL_PORT)
SUCCESSFUL_RESULTS = {RESULT_AUTH_MISSING, RESULT_SUCCESS}
UPNP_SVC_RENDERINGCONTROL = "urn:schemas-upnp-org:service:RenderingControl:1"

View File

@ -11,6 +11,14 @@
"ssdp": [
{
"st": "urn:samsung.com:device:RemoteControlReceiver:1"
},
{
"manufacturer": "Samsung",
"st": "urn:schemas-upnp-org:service:RenderingControl:1"
},
{
"manufacturer": "Samsung Electronics",
"st": "urn:schemas-upnp-org:service:RenderingControl:1"
}
],
"zeroconf": [

View File

@ -12,11 +12,17 @@
"confirm": {
"description": "Do you want to set up {device}? If you never connected Home Assistant before you should see a popup on your TV asking for authorization."
},
"pairing": {
"description": "[%key:component::samsungtv::config::step::confirm::description%]"
},
"reauth_confirm": {
"description": "After submitting, accept the the popup on {device} requesting authorization within 30 seconds or input PIN."
},
"reauth_confirm_encrypted": {
"encrypted_pairing": {
"description": "Please enter the PIN displayed on {device}."
},
"reauth_confirm_encrypted": {
"description": "[%key:component::samsungtv::config::step::encrypted_pairing::description%]"
}
},
"error": {
@ -34,4 +40,4 @@
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
}
}
}
}

View File

@ -6,7 +6,6 @@
"auth_missing": "Home Assistant is not authorized to connect to this Samsung TV. Check your TV's External Device Manager settings to authorize Home Assistant.",
"cannot_connect": "Failed to connect",
"id_missing": "This Samsung device doesn't have a SerialNumber.",
"missing_config_entry": "This Samsung device doesn't have a configuration entry.",
"not_supported": "This Samsung device is currently not supported.",
"reauth_successful": "Re-authentication was successful",
"unknown": "Unexpected error"
@ -18,8 +17,13 @@
"flow_title": "{device}",
"step": {
"confirm": {
"description": "Do you want to set up {device}? If you never connected Home Assistant before you should see a popup on your TV asking for authorization.",
"title": "Samsung TV"
"description": "Do you want to set up {device}? If you never connected Home Assistant before you should see a popup on your TV asking for authorization."
},
"encrypted_pairing": {
"description": "Please enter the PIN displayed on {device}."
},
"pairing": {
"description": "Do you want to set up {device}? If you never connected Home Assistant before you should see a popup on your TV asking for authorization."
},
"reauth_confirm": {
"description": "After submitting, accept the the popup on {device} requesting authorization within 30 seconds or input PIN."

View File

@ -225,6 +225,14 @@ SSDP = {
"samsungtv": [
{
"st": "urn:samsung.com:device:RemoteControlReceiver:1"
},
{
"manufacturer": "Samsung",
"st": "urn:schemas-upnp-org:service:RenderingControl:1"
},
{
"manufacturer": "Samsung Electronics",
"st": "urn:schemas-upnp-org:service:RenderingControl:1"
}
],
"songpal": [

View File

@ -1,5 +1,5 @@
"""Tests for the samsungtv component."""
from __future__ import annotations
from homeassistant.components.samsungtv.const import DOMAIN
from homeassistant.config_entries import ConfigEntry

View File

@ -12,8 +12,10 @@ from samsungtvws.async_remote import SamsungTVWSAsyncRemote
from samsungtvws.command import SamsungTVCommand
from samsungtvws.encrypted.remote import SamsungTVEncryptedWSAsyncRemote
from samsungtvws.event import ED_INSTALLED_APP_EVENT
from samsungtvws.exceptions import ResponseError
from samsungtvws.remote import ChannelEmitCommand
from homeassistant.components.samsungtv.const import WEBSOCKET_SSL_PORT
import homeassistant.util.dt as dt_util
from .const import SAMPLE_DEVICE_INFO_WIFI
@ -47,7 +49,7 @@ def remote_fixture() -> Mock:
yield remote
@pytest.fixture(name="rest_api", autouse=True)
@pytest.fixture(name="rest_api")
def rest_api_fixture() -> Mock:
"""Patch the samsungtvws SamsungTVAsyncRest."""
with patch(
@ -60,6 +62,52 @@ def rest_api_fixture() -> Mock:
yield rest_api_class.return_value
@pytest.fixture(name="rest_api_non_ssl_only")
def rest_api_fixture_non_ssl_only() -> Mock:
"""Patch the samsungtvws SamsungTVAsyncRest non-ssl only."""
class MockSamsungTVAsyncRest:
"""Mock for a MockSamsungTVAsyncRest."""
def __init__(self, host, session, port, timeout):
"""Mock a MockSamsungTVAsyncRest."""
self.port = port
self.host = host
async def rest_device_info(self):
"""Mock rest_device_info to fail for ssl and work for non-ssl."""
if self.port == WEBSOCKET_SSL_PORT:
raise ResponseError
return SAMPLE_DEVICE_INFO_WIFI
with patch(
"homeassistant.components.samsungtv.bridge.SamsungTVAsyncRest",
MockSamsungTVAsyncRest,
):
yield
@pytest.fixture(name="rest_api_failing")
def rest_api_failure_fixture() -> Mock:
"""Patch the samsungtvws SamsungTVAsyncRest."""
with patch(
"homeassistant.components.samsungtv.bridge.SamsungTVAsyncRest",
autospec=True,
) as rest_api_class:
rest_api_class.return_value.rest_device_info.side_effect = ResponseError
yield
@pytest.fixture(name="remoteencws_failing")
def remoteencws_failing_fixture():
"""Patch the samsungtvws SamsungTVEncryptedWSAsyncRemote."""
with patch(
"homeassistant.components.samsungtv.bridge.SamsungTVEncryptedWSAsyncRemote.start_listening",
side_effect=OSError,
):
yield
@pytest.fixture(name="remotews")
def remotews_fixture() -> Mock:
"""Patch the samsungtvws SamsungTVWS."""

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,7 @@ from .test_media_player import MOCK_ENTRY_WS_WITH_MAC
from tests.components.diagnostics import get_diagnostics_for_config_entry
@pytest.mark.usefixtures("remotews")
@pytest.mark.usefixtures("remotews", "rest_api")
async def test_entry_diagnostics(
hass: HomeAssistant, hass_client: ClientSession
) -> None:

View File

@ -56,7 +56,12 @@ REMOTE_CALL = {
}
@pytest.mark.usefixtures("remotews")
@pytest.fixture(name="autouse_rest_api", autouse=True)
def autouse_rest_api(rest_api) -> Mock:
"""Enable auto use of the rest api fixture for these tests."""
@pytest.mark.usefixtures("remotews", "remoteencws_failing")
async def test_setup(hass: HomeAssistant) -> None:
"""Test Samsung TV integration is setup."""
await async_setup_component(hass, SAMSUNGTV_DOMAIN, MOCK_CONFIG)
@ -98,7 +103,7 @@ async def test_setup_from_yaml_without_port_device_offline(hass: HomeAssistant)
assert config_entries_domain[0].state == ConfigEntryState.SETUP_RETRY
@pytest.mark.usefixtures("remotews")
@pytest.mark.usefixtures("remotews", "remoteencws_failing")
async def test_setup_from_yaml_without_port_device_online(hass: HomeAssistant) -> None:
"""Test import from yaml when the device is online."""
await async_setup_component(hass, SAMSUNGTV_DOMAIN, MOCK_CONFIG)
@ -127,7 +132,7 @@ async def test_setup_duplicate_config(
assert "duplicate host entries found" in caplog.text
@pytest.mark.usefixtures("remote", "remotews")
@pytest.mark.usefixtures("remote", "remotews", "remoteencws_failing")
async def test_setup_duplicate_entries(hass: HomeAssistant) -> None:
"""Test duplicate setup of platform."""
await async_setup_component(hass, SAMSUNGTV_DOMAIN, MOCK_CONFIG)
@ -138,7 +143,7 @@ async def test_setup_duplicate_entries(hass: HomeAssistant) -> None:
assert len(hass.states.async_all("media_player")) == 1
@pytest.mark.usefixtures("remotews")
@pytest.mark.usefixtures("remotews", "remoteencws_failing")
async def test_setup_h_j_model(
hass: HomeAssistant, rest_api: Mock, caplog: pytest.LogCaptureFixture
) -> None:

View File

@ -141,6 +141,11 @@ MOCK_CONFIG_NOTURNON = {
}
@pytest.fixture(name="autouse_rest_api", autouse=True)
def autouse_rest_api(rest_api) -> Mock:
"""Enable auto use of the rest api fixture for these tests."""
@pytest.fixture(name="delay")
def delay_fixture():
"""Patch the delay script function."""