Do not abort on invalid host in SamsungTV user flow (#144794)

This commit is contained in:
epenet 2025-05-13 10:47:26 +02:00 committed by GitHub
parent a7787d6080
commit 5c6984d326
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 48 additions and 40 deletions

View File

@ -56,7 +56,6 @@ from .const import (
RESULT_INVALID_PIN, RESULT_INVALID_PIN,
RESULT_NOT_SUPPORTED, RESULT_NOT_SUPPORTED,
RESULT_SUCCESS, RESULT_SUCCESS,
RESULT_UNKNOWN_HOST,
SUCCESSFUL_RESULTS, SUCCESSFUL_RESULTS,
UPNP_SVC_MAIN_TV_AGENT, UPNP_SVC_MAIN_TV_AGENT,
UPNP_SVC_RENDERING_CONTROL, UPNP_SVC_RENDERING_CONTROL,
@ -252,32 +251,40 @@ class SamsungTVConfigFlow(ConfigFlow, domain=DOMAIN):
self._mac = mac self._mac = mac
return True return True
async def _async_set_name_host_from_input(self, user_input: dict[str, Any]) -> None: async def _async_set_name_host_from_input(self, user_input: dict[str, Any]) -> bool:
try: try:
self._host = await self.hass.async_add_executor_job( self._host = await self.hass.async_add_executor_job(
socket.gethostbyname, user_input[CONF_HOST] socket.gethostbyname, user_input[CONF_HOST]
) )
except socket.gaierror as err: except socket.gaierror as err:
raise AbortFlow(RESULT_UNKNOWN_HOST) from err LOGGER.debug("Failed to get IP for %s: %s", user_input[CONF_HOST], err)
return False
self._title = self._host self._title = self._host
return True
async def async_step_user( async def async_step_user(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult: ) -> ConfigFlowResult:
"""Handle a flow initialized by the user.""" """Handle a flow initialized by the user."""
errors: dict[str, str] | None = None
if user_input is not None: if user_input is not None:
await self._async_set_name_host_from_input(user_input) if await self._async_set_name_host_from_input(user_input):
await self._async_create_bridge() await self._async_create_bridge()
assert self._bridge assert self._bridge
self._async_abort_entries_match({CONF_HOST: self._host}) self._async_abort_entries_match({CONF_HOST: self._host})
if self._bridge.method != METHOD_LEGACY: if self._bridge.method != METHOD_LEGACY:
# Legacy bridge does not provide device info # Legacy bridge does not provide device info
await self._async_set_device_unique_id(raise_on_progress=False) await self._async_set_device_unique_id(raise_on_progress=False)
if self._bridge.method == METHOD_ENCRYPTED_WEBSOCKET: if self._bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
return await self.async_step_encrypted_pairing() return await self.async_step_encrypted_pairing()
return await self.async_step_pairing({}) return await self.async_step_pairing({})
errors = {"base": "invalid_host"}
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA) return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(DATA_SCHEMA, user_input),
errors=errors,
)
async def async_step_pairing( async def async_step_pairing(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None

View File

@ -43,6 +43,7 @@
}, },
"error": { "error": {
"auth_missing": "[%key:component::samsungtv::config::abort::auth_missing%]", "auth_missing": "[%key:component::samsungtv::config::abort::auth_missing%]",
"invalid_host": "Host is invalid, please try again.",
"invalid_pin": "PIN is invalid, please try again." "invalid_pin": "PIN is invalid, please try again."
}, },
"abort": { "abort": {
@ -52,7 +53,6 @@
"id_missing": "This Samsung device doesn't have a SerialNumber.", "id_missing": "This Samsung device doesn't have a SerialNumber.",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"not_supported": "This Samsung device is currently not supported.", "not_supported": "This Samsung device is currently not supported.",
"unknown": "[%key:common::config_flow::error::unknown%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]" "reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
} }
}, },

View File

@ -67,7 +67,7 @@ def fake_host_fixture() -> Generator[None]:
"""Patch gethostbyname.""" """Patch gethostbyname."""
with patch( with patch(
"homeassistant.components.samsungtv.config_flow.socket.gethostbyname", "homeassistant.components.samsungtv.config_flow.socket.gethostbyname",
return_value="fake_host", return_value="10.20.43.21",
): ):
yield yield

View File

@ -2,6 +2,7 @@
from copy import deepcopy from copy import deepcopy
from ipaddress import ip_address from ipaddress import ip_address
import socket
from unittest.mock import ANY, AsyncMock, Mock, call, patch from unittest.mock import ANY, AsyncMock, Mock, call, patch
import pytest import pytest
@ -106,35 +107,22 @@ AUTODETECT_LEGACY = {
"id": "ha.component.samsung", "id": "ha.component.samsung",
"method": METHOD_LEGACY, "method": METHOD_LEGACY,
"port": LEGACY_PORT, "port": LEGACY_PORT,
"host": "fake_host", "host": "10.20.43.21",
"timeout": TIMEOUT_REQUEST, "timeout": TIMEOUT_REQUEST,
} }
AUTODETECT_WEBSOCKET_PLAIN = {
"host": "fake_host",
"name": "HomeAssistant",
"port": 8001,
"timeout": TIMEOUT_REQUEST,
"token": None,
}
AUTODETECT_WEBSOCKET_SSL = { AUTODETECT_WEBSOCKET_SSL = {
"host": "fake_host", "host": "10.20.43.21",
"name": "HomeAssistant", "name": "HomeAssistant",
"port": 8002, "port": 8002,
"timeout": TIMEOUT_REQUEST, "timeout": TIMEOUT_REQUEST,
"token": None, "token": None,
} }
DEVICEINFO_WEBSOCKET_SSL = { DEVICEINFO_WEBSOCKET_SSL = {
"host": "fake_host", "host": "10.20.43.21",
"session": ANY, "session": ANY,
"port": 8002, "port": 8002,
"timeout": TIMEOUT_WEBSOCKET, "timeout": TIMEOUT_WEBSOCKET,
} }
DEVICEINFO_WEBSOCKET_NO_SSL = {
"host": "fake_host",
"session": ANY,
"port": 8001,
"timeout": TIMEOUT_WEBSOCKET,
}
pytestmark = pytest.mark.usefixtures("mock_setup_entry") pytestmark = pytest.mark.usefixtures("mock_setup_entry")
@ -149,14 +137,27 @@ async def test_user_legacy(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.FORM assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user" assert result["step_id"] == "user"
# entry was added # Wrong host allow to retry
with patch(
"homeassistant.components.samsungtv.config_flow.socket.gethostbyname",
side_effect=socket.gaierror("[Error -2] Name or Service not known"),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "invalid_host"}
# Good host creates entry
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA result["flow_id"], user_input=MOCK_USER_DATA
) )
# legacy tv entry created # legacy tv entry created
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "fake_host" assert result["title"] == "10.20.43.21"
assert result["data"][CONF_HOST] == "fake_host" assert result["data"][CONF_HOST] == "10.20.43.21"
assert result["data"][CONF_METHOD] == METHOD_LEGACY assert result["data"][CONF_METHOD] == METHOD_LEGACY
assert result["data"][CONF_MANUFACTURER] == DEFAULT_MANUFACTURER assert result["data"][CONF_MANUFACTURER] == DEFAULT_MANUFACTURER
assert result["data"][CONF_MODEL] is None assert result["data"][CONF_MODEL] is None
@ -189,8 +190,8 @@ async def test_user_legacy_does_not_ok_first_time(hass: HomeAssistant) -> None:
# legacy tv entry created # legacy tv entry created
assert result3["type"] is FlowResultType.CREATE_ENTRY assert result3["type"] is FlowResultType.CREATE_ENTRY
assert result3["title"] == "fake_host" assert result3["title"] == "10.20.43.21"
assert result3["data"][CONF_HOST] == "fake_host" assert result3["data"][CONF_HOST] == "10.20.43.21"
assert result3["data"][CONF_METHOD] == METHOD_LEGACY assert result3["data"][CONF_METHOD] == METHOD_LEGACY
assert result3["data"][CONF_MANUFACTURER] == DEFAULT_MANUFACTURER assert result3["data"][CONF_MANUFACTURER] == DEFAULT_MANUFACTURER
assert result3["data"][CONF_MODEL] is None assert result3["data"][CONF_MODEL] is None
@ -219,7 +220,7 @@ async def test_user_websocket(hass: HomeAssistant) -> None:
# websocket tv entry created # websocket tv entry created
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Living Room (82GXARRS)" assert result["title"] == "Living Room (82GXARRS)"
assert result["data"][CONF_HOST] == "fake_host" assert result["data"][CONF_HOST] == "10.20.43.21"
assert result["data"][CONF_METHOD] == "websocket" assert result["data"][CONF_METHOD] == "websocket"
assert result["data"][CONF_MANUFACTURER] == "Samsung" assert result["data"][CONF_MANUFACTURER] == "Samsung"
assert result["data"][CONF_MODEL] == "82GXARRS" assert result["data"][CONF_MODEL] == "82GXARRS"
@ -267,7 +268,7 @@ async def test_user_encrypted_websocket(
assert result4["type"] is FlowResultType.CREATE_ENTRY assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == "TV-UE48JU6470 (UE48JU6400)" assert result4["title"] == "TV-UE48JU6470 (UE48JU6400)"
assert result4["data"][CONF_HOST] == "fake_host" assert result4["data"][CONF_HOST] == "10.20.43.21"
assert result4["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result4["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa"
assert result4["data"][CONF_MANUFACTURER] == "Samsung" assert result4["data"][CONF_MANUFACTURER] == "Samsung"
assert result4["data"][CONF_MODEL] == "UE48JU6400" assert result4["data"][CONF_MODEL] == "UE48JU6400"
@ -398,7 +399,7 @@ async def test_user_websocket_auth_retry(hass: HomeAssistant) -> None:
) )
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Living Room (82GXARRS)" assert result["title"] == "Living Room (82GXARRS)"
assert result["data"][CONF_HOST] == "fake_host" assert result["data"][CONF_HOST] == "10.20.43.21"
assert result["data"][CONF_MANUFACTURER] == "Samsung" assert result["data"][CONF_MANUFACTURER] == "Samsung"
assert result["data"][CONF_MODEL] == "82GXARRS" assert result["data"][CONF_MODEL] == "82GXARRS"
assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4"