# Mirror of https://github.com/home-assistant/core.git
# Synced 2025-07-07 05:17:07 +00:00
# 246 lines, 7.6 KiB, Python
"""Config flow for Philips TV integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
import platform
|
|
from typing import Any
|
|
|
|
from haphilipsjs import (
|
|
DEFAULT_API_VERSION,
|
|
ConnectionFailure,
|
|
GeneralFailure,
|
|
PairingFailure,
|
|
PhilipsTV,
|
|
)
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.config_entries import (
|
|
SOURCE_REAUTH,
|
|
ConfigEntry,
|
|
ConfigFlow,
|
|
ConfigFlowResult,
|
|
)
|
|
from homeassistant.const import (
|
|
CONF_API_VERSION,
|
|
CONF_HOST,
|
|
CONF_NAME,
|
|
CONF_PASSWORD,
|
|
CONF_PIN,
|
|
CONF_USERNAME,
|
|
)
|
|
from homeassistant.core import callback
|
|
from homeassistant.helpers import selector
|
|
from homeassistant.helpers.schema_config_entry_flow import (
|
|
SchemaFlowFormStep,
|
|
SchemaOptionsFlowHandler,
|
|
)
|
|
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
|
|
|
|
from . import LOGGER
|
|
from .const import CONF_ALLOW_NOTIFY, CONF_SYSTEM, CONST_APP_ID, CONST_APP_NAME, DOMAIN
|
|
|
|
# Form shown on the manual "user" step: the TV's host plus the API version
# to use, restricted to 1, 5 or 6 and defaulting to 1.
USER_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_HOST): str,
        vol.Required(CONF_API_VERSION, default=1): vol.In([1, 5, 6]),
    }
)
|
|
|
|
# Options form: a single boolean toggle stored under CONF_ALLOW_NOTIFY,
# off by default.
OPTIONS_SCHEMA = vol.Schema(
    {vol.Optional(CONF_ALLOW_NOTIFY, default=False): selector.BooleanSelector()}
)

# Step table for the schema-driven options flow; "init" is its only step.
OPTIONS_FLOW = {"init": SchemaFlowFormStep(OPTIONS_SCHEMA)}
|
|
|
|
|
|
class PhilipsJSConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for Philips TV.

    Entry points are the manual ``user`` step, ``zeroconf`` discovery and
    ``reauth``. Each of them probes the TV through ``_async_attempt_prepare``
    and then either creates/updates the entry directly or goes through the
    PIN based ``pair`` step when the TV reports ``digest_auth_pairing``.
    """

    VERSION = 1

    def __init__(self) -> None:
        """Initialize flow."""
        super().__init__()
        # Data collected across steps; used as the config entry data.
        self._current: dict[str, Any] = {}
        # Client for the TV being configured; set by _async_attempt_prepare.
        self._hub: PhilipsTV | None = None
        # Value returned by pairRequest, handed back unchanged to pairGrant.
        self._pair_state: Any = None

    async def _async_attempt_prepare(
        self, host: str, api_version: int, secured_transport: bool
    ) -> None:
        """Query the TV and store its details for the following steps.

        On success ``self._hub`` is set, host/system/API version are written
        to ``self._current`` and the TV name is stored as title placeholder.
        When the system data contains a serial number it becomes the flow's
        unique id; outside of reauth the flow is aborted if that id is
        already configured (the existing entry is updated and reloaded).

        Raises:
            ConnectionFailure: the TV returned no system data or no name.
        """
        hub = PhilipsTV(
            host, api_version=api_version, secured_transport=secured_transport
        )

        await hub.getSystem()
        # Re-apply the transport using the hub's own secured_transport value
        # as it stands after the system query.
        await hub.setTransport(hub.secured_transport)

        if not hub.system or not hub.name:
            raise ConnectionFailure("System data or name is empty")

        self._hub = hub
        self._current[CONF_HOST] = host
        self._current[CONF_SYSTEM] = hub.system
        self._current[CONF_API_VERSION] = hub.api_version
        self.context.update({"title_placeholders": {CONF_NAME: hub.name}})

        # The serial number is optional in the system data.
        if serialnumber := hub.system.get("serialnumber"):
            await self.async_set_unique_id(serialnumber)
            if self.source != SOURCE_REAUTH:
                self._abort_if_unique_id_configured(
                    updates=self._current, reload_on_update=True
                )

    async def _async_attempt_add(self) -> ConfigFlowResult:
        """Continue to pairing when the TV requires it, else finish the flow."""
        assert self._hub
        if self._hub.pairing_type == "digest_auth_pairing":
            return await self.async_step_pair()
        return await self._async_create_current()

    async def _async_create_current(self) -> ConfigFlowResult:
        """Create the config entry, or update the existing one on reauth."""
        system = self._current[CONF_SYSTEM]
        if self.source == SOURCE_REAUTH:
            return self.async_update_reload_and_abort(
                self._get_reauth_entry(), data_updates=self._current
            )

        # _async_attempt_prepare treats the serial number as optional, so do
        # not index it unconditionally; fall back to a name-only title.
        title = system["name"]
        if serialnumber := system.get("serialnumber"):
            title = f"{title} ({serialnumber})"

        return self.async_create_entry(
            title=title,
            data=self._current,
        )

    async def async_step_pair(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Attempt to pair with device.

        Without input a pairing request is sent to the TV and the PIN form is
        shown. With input the PIN is submitted; an ``INVALID_PIN`` error shows
        the form again, any other pairing failure aborts the flow.
        """
        assert self._hub

        errors: dict[str, str] = {}
        schema = vol.Schema(
            {
                vol.Required(CONF_PIN): str,
            }
        )

        if not user_input:
            try:
                self._pair_state = await self._hub.pairRequest(
                    CONST_APP_ID,
                    CONST_APP_NAME,
                    platform.node(),
                    platform.system(),
                    "native",
                )
            except PairingFailure as exc:
                LOGGER.debug(exc)
                return self.async_abort(
                    reason="pairing_failure",
                    description_placeholders={"error_id": exc.data.get("error_id")},
                )
            return self.async_show_form(
                step_id="pair", data_schema=schema, errors=errors
            )

        try:
            username, password = await self._hub.pairGrant(
                self._pair_state, user_input[CONF_PIN]
            )
        except PairingFailure as exc:
            LOGGER.debug(exc)
            if exc.data.get("error_id") == "INVALID_PIN":
                # Let the user retry with another PIN.
                errors[CONF_PIN] = "invalid_pin"
                return self.async_show_form(
                    step_id="pair", data_schema=schema, errors=errors
                )

            return self.async_abort(
                reason="pairing_failure",
                description_placeholders={"error_id": exc.data.get("error_id")},
            )

        self._current[CONF_USERNAME] = username
        self._current[CONF_PASSWORD] = password
        return await self._async_create_current()

    async def async_step_reauth(
        self, entry_data: Mapping[str, Any]
    ) -> ConfigFlowResult:
        """Handle configuration by re-auth.

        Seeds host and API version from the existing entry and continues
        with the user step, which uses them as suggested form values.
        """
        self._current[CONF_HOST] = entry_data[CONF_HOST]
        self._current[CONF_API_VERSION] = entry_data[CONF_API_VERSION]
        return await self.async_step_user()

    async def async_step_zeroconf(
        self, discovery_info: ZeroconfServiceInfo
    ) -> ConfigFlowResult:
        """Handle zeroconf discovery."""

        # Lazy %-style arguments: the message previously contained literal
        # "{...}" placeholders that were never interpolated.
        LOGGER.debug(
            "Checking discovered device: %s on %s",
            discovery_info.name,
            discovery_info.host,
        )

        # The secured service type selects secured transport and API version 6.
        secured_transport = discovery_info.type == "_philipstv_s_rpc._tcp.local."
        api_version = 6 if secured_transport else DEFAULT_API_VERSION

        try:
            await self._async_attempt_prepare(
                discovery_info.host, api_version, secured_transport
            )
        except GeneralFailure:
            LOGGER.debug("Failed to get system info from discovery", exc_info=True)
            return self.async_abort(reason="discovery_failure")

        return await self.async_step_zeroconf_confirm()

    async def async_step_zeroconf_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow initiated by zeroconf.

        Shows a confirmation form naming the discovered TV; once the user
        submits it the flow proceeds to adding the device.
        """
        if user_input is not None:
            return await self._async_attempt_add()

        name = self.context.get("title_placeholders", {CONF_NAME: "Philips TV"})[
            CONF_NAME
        ]
        return self.async_show_form(
            step_id="zeroconf_confirm",
            description_placeholders={CONF_NAME: name},
        )

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle the initial step.

        With input, the TV is probed over non-secured transport using the
        submitted host and API version; a failure re-shows the form with a
        ``cannot_connect`` error. Without input, the form is shown with the
        values currently held in ``self._current`` as suggestions.
        """
        errors: dict[str, str] = {}
        if user_input:
            self._current = user_input
            try:
                await self._async_attempt_prepare(
                    user_input[CONF_HOST], user_input[CONF_API_VERSION], False
                )
            except GeneralFailure as exc:
                LOGGER.error(exc)
                errors["base"] = "cannot_connect"
            else:
                return await self._async_attempt_add()

        schema = self.add_suggested_values_to_schema(USER_SCHEMA, self._current)
        return self.async_show_form(step_id="user", data_schema=schema, errors=errors)

    @staticmethod
    @callback
    def async_get_options_flow(
        config_entry: ConfigEntry,
    ) -> SchemaOptionsFlowHandler:
        """Get the options flow for this handler."""
        return SchemaOptionsFlowHandler(config_entry, OPTIONS_FLOW)
|