diff --git a/homeassistant/components/otp/config_flow.py b/homeassistant/components/otp/config_flow.py index 5b1551b1d04..15d04c910ad 100644 --- a/homeassistant/components/otp/config_flow.py +++ b/homeassistant/components/otp/config_flow.py @@ -10,19 +10,29 @@ import pyotp import voluptuous as vol from homeassistant.config_entries import ConfigFlow, ConfigFlowResult -from homeassistant.const import CONF_NAME, CONF_TOKEN +from homeassistant.const import CONF_CODE, CONF_NAME, CONF_TOKEN +from homeassistant.helpers.selector import ( + BooleanSelector, + BooleanSelectorConfig, + QrCodeSelector, + QrCodeSelectorConfig, + QrErrorCorrectionLevel, +) -from .const import DEFAULT_NAME, DOMAIN +from .const import CONF_NEW_TOKEN, DEFAULT_NAME, DOMAIN _LOGGER = logging.getLogger(__name__) STEP_USER_DATA_SCHEMA = vol.Schema( { - vol.Required(CONF_TOKEN): str, + vol.Optional(CONF_TOKEN): str, + vol.Optional(CONF_NEW_TOKEN): BooleanSelector(BooleanSelectorConfig()), vol.Required(CONF_NAME, default=DEFAULT_NAME): str, } ) +STEP_CONFIRM_DATA_SCHEMA = vol.Schema({vol.Required(CONF_CODE): str}) + class TOTPConfigFlow(ConfigFlow, domain=DOMAIN): """Handle a config flow for One-Time Password (OTP).""" @@ -36,23 +46,31 @@ class TOTPConfigFlow(ConfigFlow, domain=DOMAIN): """Handle the initial step.""" errors: dict[str, str] = {} if user_input is not None: - try: - await self.hass.async_add_executor_job( - pyotp.TOTP(user_input[CONF_TOKEN]).now + if user_input.get(CONF_TOKEN) and not user_input.get(CONF_NEW_TOKEN): + try: + await self.hass.async_add_executor_job( + pyotp.TOTP(user_input[CONF_TOKEN]).now + ) + except binascii.Error: + errors["base"] = "invalid_token" + except Exception: + _LOGGER.exception("Unexpected exception") + errors["base"] = "unknown" + else: + await self.async_set_unique_id(user_input[CONF_TOKEN]) + self._abort_if_unique_id_configured() + return self.async_create_entry( + title=user_input[CONF_NAME], + data=user_input, + ) + elif user_input.get(CONF_NEW_TOKEN): + user_input[CONF_TOKEN] = await self.hass.async_add_executor_job( + pyotp.random_base32 ) - except binascii.Error: - errors["base"] = "invalid_token" - except Exception: - _LOGGER.exception("Unexpected exception") - errors["base"] = "unknown" + self.user_input = user_input + return await self.async_step_confirm() else: - await self.async_set_unique_id(user_input[CONF_TOKEN]) - self._abort_if_unique_id_configured() - - return self.async_create_entry( - title=user_input[CONF_NAME], - data=user_input, - ) + errors["base"] = "invalid_token" return self.async_show_form( step_id="user", @@ -72,3 +90,51 @@ class TOTPConfigFlow(ConfigFlow, domain=DOMAIN): title=import_info.get(CONF_NAME, DEFAULT_NAME), data=import_info, ) + + async def async_step_confirm( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle the confirmation step.""" + + errors: dict[str, str] = {} + + if user_input is not None: + if await self.hass.async_add_executor_job( + pyotp.TOTP(self.user_input[CONF_TOKEN]).verify, user_input["code"] + ): + return self.async_create_entry( + title=self.user_input[CONF_NAME], + data={ + CONF_NAME: self.user_input[CONF_NAME], + CONF_TOKEN: self.user_input[CONF_TOKEN], + }, + ) + + errors["base"] = "invalid_code" + + provisioning_uri = await self.hass.async_add_executor_job( + pyotp.TOTP(self.user_input[CONF_TOKEN]).provisioning_uri, + self.user_input[CONF_NAME], + "Home Assistant", + ) + data_schema = STEP_CONFIRM_DATA_SCHEMA.extend( + { + vol.Optional("qr_code"): QrCodeSelector( + config=QrCodeSelectorConfig( + data=provisioning_uri, + scale=6, + error_correction_level=QrErrorCorrectionLevel.QUARTILE, + ) + ) + } + ) + return self.async_show_form( + step_id="confirm", + data_schema=data_schema, + description_placeholders={ + "auth_app1": "[Google Authenticator](https://support.google.com/accounts/answer/1066447)", + "auth_app2": "[Authy](https://authy.com/)", + "code": self.user_input[CONF_TOKEN], + }, + errors=errors, + ) diff --git a/homeassistant/components/otp/const.py b/homeassistant/components/otp/const.py index 180e0a4c5a2..6ccec165ec5 100644 --- a/homeassistant/components/otp/const.py +++ b/homeassistant/components/otp/const.py @@ -2,3 +2,4 @@ DOMAIN = "otp" DEFAULT_NAME = "OTP Sensor" +CONF_NEW_TOKEN = "new_token" diff --git a/homeassistant/components/otp/strings.json b/homeassistant/components/otp/strings.json index fc6031d0433..9152aeaa89e 100644 --- a/homeassistant/components/otp/strings.json +++ b/homeassistant/components/otp/strings.json @@ -4,13 +4,22 @@ "user": { "data": { "name": "[%key:common::config_flow::data::name%]", - "token": "Authenticator token (OTP)" + "token": "Authenticator token (OTP)", + "new_token": "Generate a new token?" + } + }, + "confirm": { + "title": "Verify One-Time Password (OTP)", + "description": "Before completing the setup of One-Time Password (OTP), confirm with a verification code. Scan the QR code with your authentication app. If you don't have one, we recommend either {auth_app1} or {auth_app2}.\n\nAfter scanning the code, enter the six-digit code from your app to verify the setup. If you have problems scanning the QR code, do a manual setup with code **`{code}`**.", + "data": { + "code": "Verification code (OTP)" } } }, "error": { "unknown": "[%key:common::config_flow::error::unknown%]", - "invalid_token": "Invalid token" + "invalid_token": "Invalid token", + "invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate." }, "abort": { "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" diff --git a/tests/components/otp/conftest.py b/tests/components/otp/conftest.py index 7c9b2eb545e..7443d772c69 100644 --- a/tests/components/otp/conftest.py +++ b/tests/components/otp/conftest.py @@ -33,7 +33,10 @@ def mock_pyotp() -> Generator[MagicMock, None, None]: ): mock_totp = MagicMock() mock_totp.now.return_value = 123456 + mock_totp.verify.return_value = True + mock_totp.provisioning_uri.return_value = "otpauth://totp/Home%20Assistant:OTP%20Sensor?secret=2FX5FBSYRE6VEC2FSHBQCRKO2GNDVZ52&issuer=Home%20Assistant" mock_client.TOTP.return_value = mock_totp + mock_client.random_base32.return_value = "2FX5FBSYRE6VEC2FSHBQCRKO2GNDVZ52" yield mock_client diff --git a/tests/components/otp/test_config_flow.py b/tests/components/otp/test_config_flow.py index c9fdcdb0fef..eefb1a6f4e0 100644 --- a/tests/components/otp/test_config_flow.py +++ b/tests/components/otp/test_config_flow.py @@ -5,15 +5,25 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from homeassistant.components.otp.const import DOMAIN +from homeassistant.components.otp.const import CONF_NEW_TOKEN, DOMAIN from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER -from homeassistant.const import CONF_NAME, CONF_TOKEN +from homeassistant.const import CONF_CODE, CONF_NAME, CONF_TOKEN from homeassistant.core import HomeAssistant from homeassistant.data_entry_flow import FlowResultType TEST_DATA = { CONF_NAME: "OTP Sensor", - CONF_TOKEN: "TOKEN_A", + CONF_TOKEN: "2FX5FBSYRE6VEC2FSHBQCRKO2GNDVZ52", +} + +TEST_DATA_2 = { + CONF_NAME: "OTP Sensor", + CONF_NEW_TOKEN: True, +} + +TEST_DATA_3 = { + CONF_NAME: "OTP Sensor", + CONF_TOKEN: "", } @@ -33,11 +43,6 @@ async def test_form(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> None: ) await hass.async_block_till_done() - assert result["type"] is FlowResultType.CREATE_ENTRY - assert result["title"] == "OTP Sensor" - assert result["data"] == TEST_DATA - assert len(mock_setup_entry.mock_calls) == 1 - @pytest.mark.parametrize( ("exception", "error"), @@ -98,3 +103,83 @@ async def test_flow_import(hass: HomeAssistant) -> None: assert result["type"] is FlowResultType.CREATE_ENTRY assert result["title"] == "OTP Sensor" assert result["data"] == TEST_DATA + + +@pytest.mark.usefixtures("mock_pyotp") +async def test_generate_new_token( + hass: HomeAssistant, mock_setup_entry: AsyncMock +) -> None: + """Test form generate new token.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.FORM + assert result["errors"] == {} + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + TEST_DATA_2, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.FORM + assert result["errors"] == {} + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_CODE: "123456"}, + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["title"] == "OTP Sensor" + assert result["data"] == TEST_DATA + assert len(mock_setup_entry.mock_calls) == 1 + + +async def test_generate_new_token_errors( + hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_pyotp +) -> None: + """Test input validation errors.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.FORM + assert result["errors"] == {} + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + TEST_DATA_3, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.FORM + assert result["errors"] == {"base": "invalid_token"} + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + TEST_DATA_2, + ) + mock_pyotp.TOTP().verify.return_value = False + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_CODE: "123456"}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.FORM + assert result["errors"] == {"base": "invalid_code"} + + mock_pyotp.TOTP().verify.return_value = True + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_CODE: "123456"}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["title"] == "OTP Sensor" + assert result["data"] == TEST_DATA + assert len(mock_setup_entry.mock_calls) == 1