"""Config flow for OpenAI Conversation integration.""" from __future__ import annotations import json import logging from typing import Any import openai import voluptuous as vol from voluptuous_openapi import convert from homeassistant.components.zone import ENTITY_ID_HOME from homeassistant.config_entries import ( ConfigEntry, ConfigEntryState, ConfigFlow, ConfigFlowResult, ConfigSubentryFlow, SubentryFlowResult, ) from homeassistant.const import ( ATTR_LATITUDE, ATTR_LONGITUDE, CONF_API_KEY, CONF_LLM_HASS_API, CONF_NAME, ) from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import llm from homeassistant.helpers.httpx_client import get_async_client from homeassistant.helpers.selector import ( NumberSelector, NumberSelectorConfig, SelectOptionDict, SelectSelector, SelectSelectorConfig, SelectSelectorMode, TemplateSelector, ) from homeassistant.helpers.typing import VolDictType from .const import ( CONF_CHAT_MODEL, CONF_MAX_TOKENS, CONF_PROMPT, CONF_REASONING_EFFORT, CONF_RECOMMENDED, CONF_TEMPERATURE, CONF_TOP_P, CONF_WEB_SEARCH, CONF_WEB_SEARCH_CITY, CONF_WEB_SEARCH_CONTEXT_SIZE, CONF_WEB_SEARCH_COUNTRY, CONF_WEB_SEARCH_REGION, CONF_WEB_SEARCH_TIMEZONE, CONF_WEB_SEARCH_USER_LOCATION, DEFAULT_CONVERSATION_NAME, DOMAIN, RECOMMENDED_CHAT_MODEL, RECOMMENDED_MAX_TOKENS, RECOMMENDED_REASONING_EFFORT, RECOMMENDED_TEMPERATURE, RECOMMENDED_TOP_P, RECOMMENDED_WEB_SEARCH, RECOMMENDED_WEB_SEARCH_CONTEXT_SIZE, RECOMMENDED_WEB_SEARCH_USER_LOCATION, UNSUPPORTED_MODELS, WEB_SEARCH_MODELS, ) _LOGGER = logging.getLogger(__name__) STEP_USER_DATA_SCHEMA = vol.Schema( { vol.Required(CONF_API_KEY): str, } ) RECOMMENDED_OPTIONS = { CONF_RECOMMENDED: True, CONF_LLM_HASS_API: [llm.LLM_API_ASSIST], CONF_PROMPT: llm.DEFAULT_INSTRUCTIONS_PROMPT, } async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> None: """Validate the user input allows us to connect. Data has the keys from STEP_USER_DATA_SCHEMA with values provided by the user. 
""" client = openai.AsyncOpenAI( api_key=data[CONF_API_KEY], http_client=get_async_client(hass) ) await hass.async_add_executor_job(client.with_options(timeout=10.0).models.list) class OpenAIConfigFlow(ConfigFlow, domain=DOMAIN): """Handle a config flow for OpenAI Conversation.""" VERSION = 2 MINOR_VERSION = 2 async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle the initial step.""" if user_input is None: return self.async_show_form( step_id="user", data_schema=STEP_USER_DATA_SCHEMA ) errors: dict[str, str] = {} self._async_abort_entries_match(user_input) try: await validate_input(self.hass, user_input) except openai.APIConnectionError: errors["base"] = "cannot_connect" except openai.AuthenticationError: errors["base"] = "invalid_auth" except Exception: _LOGGER.exception("Unexpected exception") errors["base"] = "unknown" else: return self.async_create_entry( title="ChatGPT", data=user_input, subentries=[ { "subentry_type": "conversation", "data": RECOMMENDED_OPTIONS, "title": DEFAULT_CONVERSATION_NAME, "unique_id": None, } ], ) return self.async_show_form( step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors ) @classmethod @callback def async_get_supported_subentry_types( cls, config_entry: ConfigEntry ) -> dict[str, type[ConfigSubentryFlow]]: """Return subentries supported by this integration.""" return {"conversation": ConversationSubentryFlowHandler} class ConversationSubentryFlowHandler(ConfigSubentryFlow): """Flow for managing conversation subentries.""" last_rendered_recommended = False options: dict[str, Any] @property def _is_new(self) -> bool: """Return if this is a new subentry.""" return self.source == "user" async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> SubentryFlowResult: """Add a subentry.""" self.options = RECOMMENDED_OPTIONS.copy() return await self.async_step_init() async def async_step_reconfigure( self, user_input: dict[str, Any] | None = None ) -> SubentryFlowResult: """Handle reconfiguration of a subentry.""" self.options = self._get_reconfigure_subentry().data.copy() return await self.async_step_init() async def async_step_init( self, user_input: dict[str, Any] | None = None ) -> SubentryFlowResult: """Manage initial options.""" # abort if entry is not loaded if self._get_entry().state != ConfigEntryState.LOADED: return self.async_abort(reason="entry_not_loaded") options = self.options hass_apis: list[SelectOptionDict] = [ SelectOptionDict( label=api.name, value=api.id, ) for api in llm.async_get_apis(self.hass) ] if (suggested_llm_apis := options.get(CONF_LLM_HASS_API)) and isinstance( suggested_llm_apis, str ): options[CONF_LLM_HASS_API] = [suggested_llm_apis] step_schema: VolDictType = {} if self._is_new: step_schema[vol.Required(CONF_NAME, default=DEFAULT_CONVERSATION_NAME)] = ( str ) step_schema.update( { vol.Optional( CONF_PROMPT, description={ "suggested_value": options.get( CONF_PROMPT, llm.DEFAULT_INSTRUCTIONS_PROMPT ) }, ): TemplateSelector(), vol.Optional(CONF_LLM_HASS_API): SelectSelector( SelectSelectorConfig(options=hass_apis, multiple=True) ), vol.Required( CONF_RECOMMENDED, default=options.get(CONF_RECOMMENDED, False) ): bool, } ) if user_input is not None: if not user_input.get(CONF_LLM_HASS_API): user_input.pop(CONF_LLM_HASS_API, None) if user_input[CONF_RECOMMENDED]: if self._is_new: return self.async_create_entry( title=user_input.pop(CONF_NAME), data=user_input, ) return self.async_update_and_abort( self._get_entry(), 
                    self._get_reconfigure_subentry(),
                    data=user_input,
                )

            options.update(user_input)
            if CONF_LLM_HASS_API in options and CONF_LLM_HASS_API not in user_input:
                options.pop(CONF_LLM_HASS_API)
            return await self.async_step_advanced()

        return self.async_show_form(
            step_id="init",
            data_schema=self.add_suggested_values_to_schema(
                vol.Schema(step_schema), options
            ),
        )

    async def async_step_advanced(
        self, user_input: dict[str, Any] | None = None
    ) -> SubentryFlowResult:
        """Manage advanced options."""
        options = self.options
        errors: dict[str, str] = {}

        step_schema: VolDictType = {
            vol.Optional(
                CONF_CHAT_MODEL,
                default=RECOMMENDED_CHAT_MODEL,
            ): str,
            vol.Optional(
                CONF_MAX_TOKENS,
                default=RECOMMENDED_MAX_TOKENS,
            ): int,
            vol.Optional(
                CONF_TOP_P,
                default=RECOMMENDED_TOP_P,
            ): NumberSelector(NumberSelectorConfig(min=0, max=1, step=0.05)),
            vol.Optional(
                CONF_TEMPERATURE,
                default=RECOMMENDED_TEMPERATURE,
            ): NumberSelector(NumberSelectorConfig(min=0, max=2, step=0.05)),
        }

        if user_input is not None:
            options.update(user_input)
            if user_input.get(CONF_CHAT_MODEL) in UNSUPPORTED_MODELS:
                errors[CONF_CHAT_MODEL] = "model_not_supported"

            if not errors:
                return await self.async_step_model()

        return self.async_show_form(
            step_id="advanced",
            data_schema=self.add_suggested_values_to_schema(
                vol.Schema(step_schema), options
            ),
            errors=errors,
        )

    async def async_step_model(
        self, user_input: dict[str, Any] | None = None
    ) -> SubentryFlowResult:
        """Manage model-specific options."""
        options = self.options
        errors: dict[str, str] = {}

        step_schema: VolDictType = {}

        model = options[CONF_CHAT_MODEL]

        # Reasoning effort is only offered for reasoning ("o"-series) models.
        if model.startswith("o"):
            step_schema.update(
                {
                    vol.Optional(
                        CONF_REASONING_EFFORT,
                        default=RECOMMENDED_REASONING_EFFORT,
                    ): SelectSelector(
                        SelectSelectorConfig(
                            options=["low", "medium", "high"],
                            translation_key=CONF_REASONING_EFFORT,
                            mode=SelectSelectorMode.DROPDOWN,
                        )
                    ),
                }
            )
        elif CONF_REASONING_EFFORT in options:
            options.pop(CONF_REASONING_EFFORT)

        if model.startswith(tuple(WEB_SEARCH_MODELS)):
            step_schema.update(
                {
                    vol.Optional(
                        CONF_WEB_SEARCH,
                        default=RECOMMENDED_WEB_SEARCH,
                    ): bool,
                    vol.Optional(
                        CONF_WEB_SEARCH_CONTEXT_SIZE,
                        default=RECOMMENDED_WEB_SEARCH_CONTEXT_SIZE,
                    ): SelectSelector(
                        SelectSelectorConfig(
                            options=["low", "medium", "high"],
                            translation_key=CONF_WEB_SEARCH_CONTEXT_SIZE,
                            mode=SelectSelectorMode.DROPDOWN,
                        )
                    ),
                    vol.Optional(
                        CONF_WEB_SEARCH_USER_LOCATION,
                        default=RECOMMENDED_WEB_SEARCH_USER_LOCATION,
                    ): bool,
                }
            )
        elif CONF_WEB_SEARCH in options:
            # Drop stored web search options when the selected model does not
            # support web search.
            options = {
                k: v
                for k, v in options.items()
                if k
                not in (
                    CONF_WEB_SEARCH,
                    CONF_WEB_SEARCH_CONTEXT_SIZE,
                    CONF_WEB_SEARCH_USER_LOCATION,
                    CONF_WEB_SEARCH_CITY,
                    CONF_WEB_SEARCH_REGION,
                    CONF_WEB_SEARCH_COUNTRY,
                    CONF_WEB_SEARCH_TIMEZONE,
                )
            }

        # No model-specific options to ask for: save the subentry right away.
        if not step_schema:
            if self._is_new:
                return self.async_create_entry(
                    title=options.pop(CONF_NAME, DEFAULT_CONVERSATION_NAME),
                    data=options,
                )
            return self.async_update_and_abort(
                self._get_entry(),
                self._get_reconfigure_subentry(),
                data=options,
            )

        if user_input is not None:
            if user_input.get(CONF_WEB_SEARCH):
                if user_input.get(CONF_WEB_SEARCH_USER_LOCATION):
                    user_input.update(await self._get_location_data())
            else:
                options.pop(CONF_WEB_SEARCH_CITY, None)
                options.pop(CONF_WEB_SEARCH_REGION, None)
                options.pop(CONF_WEB_SEARCH_COUNTRY, None)
                options.pop(CONF_WEB_SEARCH_TIMEZONE, None)

            options.update(user_input)
            if self._is_new:
                return self.async_create_entry(
                    title=options.pop(CONF_NAME, DEFAULT_CONVERSATION_NAME),
                    data=options,
                )

            return self.async_update_and_abort(
                self._get_entry(),
                self._get_reconfigure_subentry(),
                data=options,
            )

        return self.async_show_form(
            step_id="model",
            data_schema=self.add_suggested_values_to_schema(
                vol.Schema(step_schema), options
            ),
            errors=errors,
        )

    async def _get_location_data(self) -> dict[str, str]:
        """Get approximate location data of the user."""
        location_data: dict[str, str] = {}
        zone_home = self.hass.states.get(ENTITY_ID_HOME)
        if zone_home is not None:
            client = openai.AsyncOpenAI(
                api_key=self._get_entry().data[CONF_API_KEY],
                http_client=get_async_client(self.hass),
            )
            location_schema = vol.Schema(
                {
                    vol.Optional(
                        CONF_WEB_SEARCH_CITY,
                        description="Free text input for the city, e.g. `San Francisco`",
                    ): str,
                    vol.Optional(
                        CONF_WEB_SEARCH_REGION,
                        description="Free text input for the region, e.g. `California`",
                    ): str,
                }
            )
            # Ask the model to resolve the home zone coordinates into a
            # city/region, constrained to the JSON schema defined above.
            response = await client.responses.create(
                model=RECOMMENDED_CHAT_MODEL,
                input=[
                    {
                        "role": "system",
                        "content": "Where are the following coordinates located: "
                        f"({zone_home.attributes[ATTR_LATITUDE]},"
                        f" {zone_home.attributes[ATTR_LONGITUDE]})?",
                    }
                ],
                text={
                    "format": {
                        "type": "json_schema",
                        "name": "approximate_location",
                        "description": "Approximate location data of the user "
                        "for refined web search results",
                        "schema": convert(location_schema),
                        "strict": False,
                    }
                },
                store=False,
            )
            location_data = location_schema(json.loads(response.output_text) or {})

        if self.hass.config.country:
            location_data[CONF_WEB_SEARCH_COUNTRY] = self.hass.config.country
        location_data[CONF_WEB_SEARCH_TIMEZONE] = self.hass.config.time_zone

        _LOGGER.debug("Location data: %s", location_data)

        return location_data