"""HTTP views handle login flow. # GET /auth/providers Return a list of auth providers. Example: [ { "name": "Local", "id": null, "type": "local_provider", } ] # POST /auth/login_flow Create a login flow. Will return the first step of the flow. Pass in parameter 'client_id' and 'redirect_url' validate by indieauth. Pass in parameter 'handler' to specify the auth provider to use. Auth providers are identified by type and id. And optional parameter 'type' has to set as 'link_user' if login flow used for link credential to exist user. Default 'type' is 'authorize'. { "client_id": "https://hassbian.local:8123/", "handler": ["local_provider", null], "redirect_url": "https://hassbian.local:8123/", "type': "authorize" } Return value will be a step in a data entry flow. See the docs for data entry flow for details. { "data_schema": [ {"name": "username", "type": "string"}, {"name": "password", "type": "string"} ], "errors": {}, "flow_id": "8f7e42faab604bcab7ac43c44ca34d58", "handler": ["insecure_example", null], "step_id": "init", "type": "form" } # POST /auth/login_flow/{flow_id} Progress the flow. Most flows will be 1 page, but could optionally add extra login challenges, like TFA. Once the flow has finished, the returned step will have type FlowResultType.CREATE_ENTRY and "result" key will contain an authorization code. The authorization code associated with an authorized user by default, it will associate with an credential if "type" set to "link_user" in "/auth/login_flow" { "flow_id": "8f7e42faab604bcab7ac43c44ca34d58", "handler": ["insecure_example", null], "result": "411ee2f916e648d691e937ae9344681e", "title": "Example", "type": "create_entry", "version": 1 } """ from __future__ import annotations from collections.abc import Callable from http import HTTPStatus from ipaddress import ip_address from typing import TYPE_CHECKING, Any, cast from aiohttp import web import voluptuous as vol import voluptuous_serialize from homeassistant import data_entry_flow from homeassistant.auth import AuthManagerFlowManager, InvalidAuthError from homeassistant.auth.models import AuthFlowContext, AuthFlowResult, Credentials from homeassistant.components import onboarding from homeassistant.components.http import KEY_HASS from homeassistant.components.http.auth import async_user_not_allowed_do_auth from homeassistant.components.http.ban import ( log_invalid_auth, process_success_login, process_wrong_login, ) from homeassistant.components.http.data_validator import RequestDataValidator from homeassistant.components.http.view import HomeAssistantView from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.network import ( NoURLAvailableError, get_url, is_cloud_connection, ) from homeassistant.util.network import is_local from . import indieauth if TYPE_CHECKING: from homeassistant.auth.providers.trusted_networks import ( TrustedNetworksAuthProvider, ) from . import StoreResultType @callback def async_setup( hass: HomeAssistant, store_result: Callable[[str, Credentials], str] ) -> None: """Component to allow users to login.""" hass.http.register_view(WellKnownOAuthInfoView) hass.http.register_view(AuthProvidersView) hass.http.register_view(LoginFlowIndexView(hass.auth.login_flow, store_result)) hass.http.register_view(LoginFlowResourceView(hass.auth.login_flow, store_result)) class WellKnownOAuthInfoView(HomeAssistantView): """View to host the OAuth2 information.""" requires_auth = False url = "/.well-known/oauth-authorization-server" name = "well-known/oauth-authorization-server" async def get(self, request: web.Request) -> web.Response: """Return the well known OAuth2 authorization info.""" hass = request.app[KEY_HASS] # Some applications require absolute urls, so we prefer using the # current requests url if possible, with fallback to a relative url. try: url_prefix = get_url(hass, require_current_request=True) except NoURLAvailableError: url_prefix = "" metadata = { "authorization_endpoint": f"{url_prefix}/auth/authorize", "token_endpoint": f"{url_prefix}/auth/token", "revocation_endpoint": f"{url_prefix}/auth/revoke", "response_types_supported": ["code"], "service_documentation": ( "https://developers.home-assistant.io/docs/auth_api" ), } # Add issuer only when we have a valid base URL (RFC 8414 compliance) if url_prefix: metadata["issuer"] = url_prefix return self.json(metadata) class AuthProvidersView(HomeAssistantView): """View to get available auth providers.""" url = "/auth/providers" name = "api:auth:providers" requires_auth = False async def get(self, request: web.Request) -> web.Response: """Get available auth providers.""" hass = request.app[KEY_HASS] if not onboarding.async_is_user_onboarded(hass): return self.json_message( message="Onboarding not finished", status_code=HTTPStatus.BAD_REQUEST, message_code="onboarding_required", ) try: remote_address = ip_address(request.remote) # type: ignore[arg-type] except ValueError: return self.json_message( message="Invalid remote IP", status_code=HTTPStatus.BAD_REQUEST, message_code="invalid_remote_ip", ) cloud_connection = is_cloud_connection(hass) providers = [] for provider in hass.auth.auth_providers: if provider.type == "trusted_networks": if cloud_connection: # Skip quickly as trusted networks are not available on cloud continue try: cast("TrustedNetworksAuthProvider", provider).async_validate_access( remote_address ) except InvalidAuthError: # Not a trusted network, so we don't expose that trusted_network authenticator is setup continue providers.append( { "name": provider.name, "id": provider.id, "type": provider.type, } ) preselect_remember_me = not cloud_connection and is_local(remote_address) return self.json( { "providers": providers, "preselect_remember_me": preselect_remember_me, } ) def _prepare_result_json(result: AuthFlowResult) -> dict[str, Any]: """Convert result to JSON serializable dict.""" if result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY: return { key: val for key, val in result.items() if key not in ("result", "data") } if result["type"] != data_entry_flow.FlowResultType.FORM: return result # type: ignore[return-value] data = dict(result) if (schema := result["data_schema"]) is None: data["data_schema"] = [] else: data["data_schema"] = voluptuous_serialize.convert(schema) return data class LoginFlowBaseView(HomeAssistantView): """Base class for the login views.""" requires_auth = False def __init__( self, flow_mgr: AuthManagerFlowManager, store_result: StoreResultType, ) -> None: """Initialize the flow manager index view.""" self._flow_mgr = flow_mgr self._store_result = store_result async def _async_flow_result_to_response( self, request: web.Request, client_id: str, result: AuthFlowResult, ) -> web.Response: """Convert the flow result to a response.""" if result["type"] != data_entry_flow.FlowResultType.CREATE_ENTRY: # @log_invalid_auth does not work here since it returns HTTP 200. # We need to manually log failed login attempts. if ( result["type"] == data_entry_flow.FlowResultType.FORM and (errors := result.get("errors")) and errors.get("base") in ( "invalid_auth", "invalid_code", ) ): await process_wrong_login(request) return self.json(_prepare_result_json(result)) hass = request.app[KEY_HASS] if not await indieauth.verify_redirect_uri( hass, client_id, result["context"]["redirect_uri"] ): return self.json_message("Invalid redirect URI", HTTPStatus.FORBIDDEN) result.pop("data") result.pop("context") result_obj = result.pop("result") # Result can be None if credential was never linked to a user before. user = await hass.auth.async_get_user_by_credentials(result_obj) if user is not None and ( user_access_error := async_user_not_allowed_do_auth(hass, user) ): return self.json_message( f"Login blocked: {user_access_error}", HTTPStatus.FORBIDDEN ) process_success_login(request) # We overwrite the Credentials object with the string code to retrieve it. result["result"] = self._store_result(client_id, result_obj) # type: ignore[typeddict-item] return self.json(result) class LoginFlowIndexView(LoginFlowBaseView): """View to create a login flow.""" url = "/auth/login_flow" name = "api:auth:login_flow" async def get(self, request: web.Request) -> web.Response: """Do not allow index of flows in progress.""" return web.Response(status=HTTPStatus.METHOD_NOT_ALLOWED) @RequestDataValidator( vol.Schema( { vol.Required("client_id"): str, vol.Required("handler"): vol.All( [vol.Any(str, None)], vol.Length(2, 2), vol.Coerce(tuple) ), vol.Required("redirect_uri"): str, vol.Optional("type", default="authorize"): str, } ) ) @log_invalid_auth async def post(self, request: web.Request, data: dict[str, Any]) -> web.Response: """Create a new login flow.""" client_id: str = data["client_id"] redirect_uri: str = data["redirect_uri"] if not indieauth.verify_client_id(client_id): return self.json_message("Invalid client id", HTTPStatus.BAD_REQUEST) handler: tuple[str, str] = tuple(data["handler"]) try: result = await self._flow_mgr.async_init( handler, context=AuthFlowContext( ip_address=ip_address(request.remote), # type: ignore[arg-type] credential_only=data.get("type") == "link_user", redirect_uri=redirect_uri, ), ) except data_entry_flow.UnknownHandler: return self.json_message("Invalid handler specified", HTTPStatus.NOT_FOUND) except data_entry_flow.UnknownStep: return self.json_message( "Handler does not support init", HTTPStatus.BAD_REQUEST ) return await self._async_flow_result_to_response(request, client_id, result) class LoginFlowResourceView(LoginFlowBaseView): """View to interact with the flow manager.""" url = "/auth/login_flow/{flow_id}" name = "api:auth:login_flow:resource" async def get(self, request: web.Request) -> web.Response: """Do not allow getting status of a flow in progress.""" return self.json_message("Invalid flow specified", HTTPStatus.NOT_FOUND) @RequestDataValidator( vol.Schema( {vol.Required("client_id"): str}, extra=vol.ALLOW_EXTRA, ) ) @log_invalid_auth async def post( self, request: web.Request, data: dict[str, Any], flow_id: str ) -> web.Response: """Handle progressing a login flow request.""" client_id: str = data.pop("client_id") if not indieauth.verify_client_id(client_id): return self.json_message("Invalid client id", HTTPStatus.BAD_REQUEST) try: # do not allow change ip during login flow flow = self._flow_mgr.async_get(flow_id) if flow["context"]["ip_address"] != ip_address(request.remote): # type: ignore[arg-type] return self.json_message("IP address changed", HTTPStatus.BAD_REQUEST) result = await self._flow_mgr.async_configure(flow_id, data) except data_entry_flow.UnknownFlow: return self.json_message("Invalid flow specified", HTTPStatus.NOT_FOUND) except vol.Invalid: return self.json_message("User input malformed", HTTPStatus.BAD_REQUEST) return await self._async_flow_result_to_response(request, client_id, result) async def delete(self, request: web.Request, flow_id: str) -> web.Response: """Cancel a flow in progress.""" try: self._flow_mgr.async_abort(flow_id) except data_entry_flow.UnknownFlow: return self.json_message("Invalid flow specified", HTTPStatus.NOT_FOUND) return self.json_message("Flow aborted")