mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 08:47:57 +00:00
Add manual config flow for Plex (#34476)
This commit is contained in:
parent
a330eba61c
commit
740d9c575e
@ -11,7 +11,14 @@ import voluptuous as vol
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.http.view import HomeAssistantView
|
||||
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
|
||||
from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
CONF_PORT,
|
||||
CONF_SSL,
|
||||
CONF_TOKEN,
|
||||
CONF_URL,
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
@ -19,14 +26,18 @@ import homeassistant.helpers.config_validation as cv
|
||||
from .const import ( # pylint: disable=unused-import
|
||||
AUTH_CALLBACK_NAME,
|
||||
AUTH_CALLBACK_PATH,
|
||||
AUTOMATIC_SETUP_STRING,
|
||||
CONF_CLIENT_IDENTIFIER,
|
||||
CONF_IGNORE_NEW_SHARED_USERS,
|
||||
CONF_MONITORED_USERS,
|
||||
CONF_SERVER,
|
||||
CONF_SERVER_IDENTIFIER,
|
||||
CONF_USE_EPISODE_ART,
|
||||
DEFAULT_PORT,
|
||||
DEFAULT_SSL,
|
||||
DEFAULT_VERIFY_SSL,
|
||||
DOMAIN,
|
||||
MANUAL_SETUP_STRING,
|
||||
PLEX_SERVER_CONFIG,
|
||||
SERVERS,
|
||||
X_PLEX_DEVICE_NAME,
|
||||
@ -68,14 +79,77 @@ class PlexFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
self.plexauth = None
|
||||
self.token = None
|
||||
self.client_id = None
|
||||
self._manual = False
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
async def async_step_user(self, user_input=None, errors=None):
|
||||
"""Handle a flow initialized by the user."""
|
||||
return self.async_show_form(step_id="start_website_auth")
|
||||
if user_input is not None:
|
||||
return await self.async_step_plex_website_auth()
|
||||
if self.show_advanced_options:
|
||||
return await self.async_step_user_advanced(errors=errors)
|
||||
return self.async_show_form(step_id="user", errors=errors)
|
||||
|
||||
async def async_step_start_website_auth(self, user_input=None):
|
||||
"""Show a form before starting external authentication."""
|
||||
return await self.async_step_plex_website_auth()
|
||||
async def async_step_user_advanced(self, user_input=None, errors=None):
|
||||
"""Handle an advanced mode flow initialized by the user."""
|
||||
if user_input is not None:
|
||||
if user_input.get("setup_method") == MANUAL_SETUP_STRING:
|
||||
self._manual = True
|
||||
return await self.async_step_manual_setup()
|
||||
return await self.async_step_plex_website_auth()
|
||||
|
||||
data_schema = vol.Schema(
|
||||
{
|
||||
vol.Required("setup_method", default=AUTOMATIC_SETUP_STRING): vol.In(
|
||||
[AUTOMATIC_SETUP_STRING, MANUAL_SETUP_STRING]
|
||||
)
|
||||
}
|
||||
)
|
||||
return self.async_show_form(
|
||||
step_id="user_advanced", data_schema=data_schema, errors=errors
|
||||
)
|
||||
|
||||
async def async_step_manual_setup(self, user_input=None, errors=None):
|
||||
"""Begin manual configuration."""
|
||||
if user_input is not None and errors is None:
|
||||
user_input.pop(CONF_URL, None)
|
||||
host = user_input.get(CONF_HOST)
|
||||
if host:
|
||||
port = user_input[CONF_PORT]
|
||||
prefix = "https" if user_input.get(CONF_SSL) else "http"
|
||||
user_input[CONF_URL] = f"{prefix}://{host}:{port}"
|
||||
elif CONF_TOKEN not in user_input:
|
||||
return await self.async_step_manual_setup(
|
||||
user_input=user_input, errors={"base": "host_or_token"}
|
||||
)
|
||||
return await self.async_step_server_validate(user_input)
|
||||
|
||||
previous_input = user_input or {}
|
||||
|
||||
data_schema = vol.Schema(
|
||||
{
|
||||
vol.Optional(
|
||||
CONF_HOST,
|
||||
description={"suggested_value": previous_input.get(CONF_HOST)},
|
||||
): str,
|
||||
vol.Required(
|
||||
CONF_PORT, default=previous_input.get(CONF_PORT, DEFAULT_PORT)
|
||||
): int,
|
||||
vol.Required(
|
||||
CONF_SSL, default=previous_input.get(CONF_SSL, DEFAULT_SSL)
|
||||
): bool,
|
||||
vol.Required(
|
||||
CONF_VERIFY_SSL,
|
||||
default=previous_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL),
|
||||
): bool,
|
||||
vol.Optional(
|
||||
CONF_TOKEN,
|
||||
description={"suggested_value": previous_input.get(CONF_TOKEN)},
|
||||
): str,
|
||||
}
|
||||
)
|
||||
return self.async_show_form(
|
||||
step_id="manual_setup", data_schema=data_schema, errors=errors
|
||||
)
|
||||
|
||||
async def async_step_server_validate(self, server_config):
|
||||
"""Validate a provided configuration."""
|
||||
@ -95,13 +169,16 @@ class PlexFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
errors["base"] = "no_servers"
|
||||
except (plexapi.exceptions.BadRequest, plexapi.exceptions.Unauthorized):
|
||||
_LOGGER.error("Invalid credentials provided, config not created")
|
||||
errors["base"] = "faulty_credentials"
|
||||
errors[CONF_TOKEN] = "faulty_credentials"
|
||||
except requests.exceptions.SSLError as error:
|
||||
_LOGGER.error("SSL certificate error: [%s]", error)
|
||||
errors["base"] = "ssl_error"
|
||||
except (plexapi.exceptions.NotFound, requests.exceptions.ConnectionError):
|
||||
server_identifier = (
|
||||
server_config.get(CONF_URL) or plex_server.server_choice or "Unknown"
|
||||
)
|
||||
_LOGGER.error("Plex server could not be reached: %s", server_identifier)
|
||||
errors["base"] = "not_found"
|
||||
errors[CONF_HOST] = "not_found"
|
||||
|
||||
except ServerNotSpecified as available_servers:
|
||||
if is_importing:
|
||||
@ -119,7 +196,11 @@ class PlexFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
if errors:
|
||||
if is_importing:
|
||||
return self.async_abort(reason="non-interactive")
|
||||
return self.async_show_form(step_id="start_website_auth", errors=errors)
|
||||
if self._manual:
|
||||
return await self.async_step_manual_setup(
|
||||
user_input=server_config, errors=errors
|
||||
)
|
||||
return await self.async_step_user(errors=errors)
|
||||
|
||||
server_id = plex_server.machine_identifier
|
||||
|
||||
|
@ -39,3 +39,6 @@ X_PLEX_DEVICE_NAME = "Home Assistant"
|
||||
X_PLEX_PLATFORM = "Home Assistant"
|
||||
X_PLEX_PRODUCT = "Home Assistant"
|
||||
X_PLEX_VERSION = __version__
|
||||
|
||||
AUTOMATIC_SETUP_STRING = "Obtain a new token from plex.tv"
|
||||
MANUAL_SETUP_STRING = "Configure Plex server manually"
|
||||
|
@ -131,6 +131,8 @@ class PlexServer:
|
||||
)
|
||||
_update_plexdirect_hostname()
|
||||
config_entry_update_needed = True
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
|
@ -1,20 +1,38 @@
|
||||
{
|
||||
"config": {
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Plex Media Server",
|
||||
"description": "Continue to [plex.tv](https://plex.tv) to link a Plex server."
|
||||
},
|
||||
"user_advanced": {
|
||||
"title": "Plex Media Server",
|
||||
"data": {
|
||||
"setup_method": "Setup method"
|
||||
}
|
||||
},
|
||||
"manual_setup": {
|
||||
"title": "Manual Plex Configuration",
|
||||
"data": {
|
||||
"host": "Host (Optional if Token provided)",
|
||||
"port": "Port",
|
||||
"ssl": "Use SSL",
|
||||
"verify_ssl": "Verify SSL certificate",
|
||||
"token": "Token (Optional)"
|
||||
}
|
||||
},
|
||||
"select_server": {
|
||||
"title": "Select Plex server",
|
||||
"description": "Multiple servers available, select one:",
|
||||
"data": { "server": "Server" }
|
||||
},
|
||||
"start_website_auth": {
|
||||
"title": "Connect Plex server",
|
||||
"description": "Continue to authorize at plex.tv."
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"faulty_credentials": "Authorization failed",
|
||||
"no_servers": "No servers linked to account",
|
||||
"not_found": "Plex server not found"
|
||||
"faulty_credentials": "Authorization failed, verify Token",
|
||||
"host_or_token": "Must provide at least one of Host or Token",
|
||||
"no_servers": "No servers linked to Plex account",
|
||||
"not_found": "Plex server not found",
|
||||
"ssl_error": "SSL certificate issue"
|
||||
},
|
||||
"abort": {
|
||||
"all_configured": "All linked servers already configured",
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""Tests for Plex config flow."""
|
||||
import copy
|
||||
import ssl
|
||||
|
||||
import plexapi.exceptions
|
||||
import requests.exceptions
|
||||
@ -7,18 +8,27 @@ import requests.exceptions
|
||||
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
|
||||
from homeassistant.components.plex import config_flow
|
||||
from homeassistant.components.plex.const import (
|
||||
AUTOMATIC_SETUP_STRING,
|
||||
CONF_IGNORE_NEW_SHARED_USERS,
|
||||
CONF_MONITORED_USERS,
|
||||
CONF_SERVER,
|
||||
CONF_SERVER_IDENTIFIER,
|
||||
CONF_USE_EPISODE_ART,
|
||||
DOMAIN,
|
||||
MANUAL_SETUP_STRING,
|
||||
PLEX_SERVER_CONFIG,
|
||||
PLEX_UPDATE_PLATFORMS_SIGNAL,
|
||||
SERVERS,
|
||||
)
|
||||
from homeassistant.config_entries import ENTRY_STATE_LOADED
|
||||
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_TOKEN, CONF_URL
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
CONF_PORT,
|
||||
CONF_SSL,
|
||||
CONF_TOKEN,
|
||||
CONF_URL,
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
@ -36,14 +46,16 @@ async def test_bad_credentials(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch(
|
||||
"plexapi.myplex.MyPlexAccount", side_effect=plexapi.exceptions.Unauthorized
|
||||
), patch("plexauth.PlexAuth.initiate_auth"), patch(
|
||||
"plexauth.PlexAuth.token", return_value="BAD TOKEN"
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -52,8 +64,8 @@ async def test_bad_credentials(hass):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["errors"]["base"] == "faulty_credentials"
|
||||
assert result["step_id"] == "user"
|
||||
assert result["errors"][CONF_TOKEN] == "faulty_credentials"
|
||||
|
||||
|
||||
async def test_import_success(hass):
|
||||
@ -104,12 +116,14 @@ async def test_unknown_exception(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch("plexapi.myplex.MyPlexAccount", side_effect=Exception), patch(
|
||||
"plexauth.PlexAuth.initiate_auth"
|
||||
), patch("plexauth.PlexAuth.token", return_value="MOCK_TOKEN"):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -129,14 +143,16 @@ async def test_no_servers_found(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch(
|
||||
"plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount(servers=0)
|
||||
), patch("plexauth.PlexAuth.initiate_auth"), patch(
|
||||
"plexauth.PlexAuth.token", return_value=MOCK_TOKEN
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -144,7 +160,7 @@ async def test_no_servers_found(hass):
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
assert result["errors"]["base"] == "no_servers"
|
||||
|
||||
|
||||
@ -159,14 +175,16 @@ async def test_single_available_server(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch("plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount()), patch(
|
||||
"plexapi.server.PlexServer", return_value=mock_plex_server
|
||||
), patch("plexauth.PlexAuth.initiate_auth"), patch(
|
||||
"plexauth.PlexAuth.token", return_value=MOCK_TOKEN
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -194,7 +212,7 @@ async def test_multiple_servers_with_selection(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch(
|
||||
"plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount(servers=2)
|
||||
@ -203,7 +221,9 @@ async def test_multiple_servers_with_selection(hass):
|
||||
), patch(
|
||||
"plexauth.PlexAuth.token", return_value=MOCK_TOKEN
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -245,7 +265,7 @@ async def test_adding_last_unconfigured_server(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch(
|
||||
"plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount(servers=2)
|
||||
@ -254,7 +274,9 @@ async def test_adding_last_unconfigured_server(hass):
|
||||
), patch(
|
||||
"plexauth.PlexAuth.token", return_value=MOCK_TOKEN
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -325,14 +347,16 @@ async def test_all_available_servers_configured(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch(
|
||||
"plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount(servers=2)
|
||||
), patch("plexauth.PlexAuth.initiate_auth"), patch(
|
||||
"plexauth.PlexAuth.token", return_value=MOCK_TOKEN
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -449,12 +473,14 @@ async def test_external_timed_out(hass):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch("plexauth.PlexAuth.initiate_auth"), patch(
|
||||
"plexauth.PlexAuth.token", return_value=None
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
@ -474,12 +500,14 @@ async def test_callback_view(hass, aiohttp_client):
|
||||
DOMAIN, context={"source": "user"}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "start_website_auth"
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
with patch("plexauth.PlexAuth.initiate_auth"), patch(
|
||||
"plexauth.PlexAuth.token", return_value=MOCK_TOKEN
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={}
|
||||
)
|
||||
assert result["type"] == "external"
|
||||
|
||||
client = await aiohttp_client(hass.http.app)
|
||||
@ -502,3 +530,161 @@ async def test_multiple_servers_with_import(hass):
|
||||
)
|
||||
assert result["type"] == "abort"
|
||||
assert result["reason"] == "non-interactive"
|
||||
|
||||
|
||||
async def test_manual_config(hass):
|
||||
"""Test creating via manual configuration."""
|
||||
|
||||
class WrongCertValidaitionException(requests.exceptions.SSLError):
|
||||
"""Mock the exception showing an unmatched error."""
|
||||
|
||||
def __init__(self):
|
||||
self.__context__ = ssl.SSLCertVerificationError(
|
||||
"some random message that doesn't match"
|
||||
)
|
||||
|
||||
# Basic mode
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
config_flow.DOMAIN, context={"source": "user"}
|
||||
)
|
||||
|
||||
assert result["data_schema"] is None
|
||||
hass.config_entries.flow.async_abort(result["flow_id"])
|
||||
|
||||
# Advanced automatic
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
config_flow.DOMAIN, context={"source": "user", "show_advanced_options": True}
|
||||
)
|
||||
|
||||
assert result["data_schema"] is not None
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "user_advanced"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={"setup_method": AUTOMATIC_SETUP_STRING}
|
||||
)
|
||||
|
||||
assert result["type"] == "external"
|
||||
hass.config_entries.flow.async_abort(result["flow_id"])
|
||||
|
||||
# Advanced manual
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
config_flow.DOMAIN, context={"source": "user", "show_advanced_options": True}
|
||||
)
|
||||
|
||||
assert result["data_schema"] is not None
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "user_advanced"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={"setup_method": MANUAL_SETUP_STRING}
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "manual_setup"
|
||||
|
||||
mock_plex_server = MockPlexServer()
|
||||
|
||||
MANUAL_SERVER = {
|
||||
CONF_HOST: MOCK_SERVERS[0][CONF_HOST],
|
||||
CONF_PORT: MOCK_SERVERS[0][CONF_PORT],
|
||||
CONF_SSL: False,
|
||||
CONF_VERIFY_SSL: True,
|
||||
CONF_TOKEN: MOCK_TOKEN,
|
||||
}
|
||||
|
||||
MANUAL_SERVER_NO_HOST_OR_TOKEN = {
|
||||
CONF_PORT: MOCK_SERVERS[0][CONF_PORT],
|
||||
CONF_SSL: False,
|
||||
CONF_VERIFY_SSL: True,
|
||||
}
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input=MANUAL_SERVER_NO_HOST_OR_TOKEN
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "manual_setup"
|
||||
assert result["errors"]["base"] == "host_or_token"
|
||||
|
||||
with patch(
|
||||
"plexapi.server.PlexServer", side_effect=requests.exceptions.SSLError,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input=MANUAL_SERVER
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "manual_setup"
|
||||
assert result["errors"]["base"] == "ssl_error"
|
||||
|
||||
with patch(
|
||||
"plexapi.server.PlexServer", side_effect=WrongCertValidaitionException,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input=MANUAL_SERVER
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "manual_setup"
|
||||
assert result["errors"]["base"] == "ssl_error"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.plex.PlexServer.connect",
|
||||
side_effect=requests.exceptions.SSLError,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input=MANUAL_SERVER
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "manual_setup"
|
||||
assert result["errors"]["base"] == "ssl_error"
|
||||
|
||||
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
"homeassistant.components.plex.PlexWebsocket.listen"
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input=MANUAL_SERVER
|
||||
)
|
||||
|
||||
assert result["type"] == "create_entry"
|
||||
assert result["title"] == mock_plex_server.friendlyName
|
||||
assert result["data"][CONF_SERVER] == mock_plex_server.friendlyName
|
||||
assert result["data"][CONF_SERVER_IDENTIFIER] == mock_plex_server.machineIdentifier
|
||||
assert result["data"][PLEX_SERVER_CONFIG][CONF_URL] == mock_plex_server._baseurl
|
||||
assert result["data"][PLEX_SERVER_CONFIG][CONF_TOKEN] == MOCK_TOKEN
|
||||
|
||||
|
||||
async def test_manual_config_with_token(hass):
|
||||
"""Test creating via manual configuration with only token."""
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
config_flow.DOMAIN, context={"source": "user", "show_advanced_options": True}
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "user_advanced"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={"setup_method": MANUAL_SETUP_STRING}
|
||||
)
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["step_id"] == "manual_setup"
|
||||
|
||||
mock_plex_server = MockPlexServer()
|
||||
|
||||
with patch("plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount()), patch(
|
||||
"plexapi.server.PlexServer", return_value=mock_plex_server
|
||||
), patch("homeassistant.components.plex.PlexWebsocket.listen"):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
|
||||
)
|
||||
|
||||
assert result["type"] == "create_entry"
|
||||
assert result["title"] == mock_plex_server.friendlyName
|
||||
assert result["data"][CONF_SERVER] == mock_plex_server.friendlyName
|
||||
assert result["data"][CONF_SERVER_IDENTIFIER] == mock_plex_server.machineIdentifier
|
||||
assert result["data"][PLEX_SERVER_CONFIG][CONF_URL] == mock_plex_server._baseurl
|
||||
assert result["data"][PLEX_SERVER_CONFIG][CONF_TOKEN] == MOCK_TOKEN
|
||||
|
Loading…
x
Reference in New Issue
Block a user