Add feature to generate OTP token in One-Time Password (OTP) integration (#120055)

This commit is contained in:
Mr. Bubbles 2024-06-22 16:43:04 +02:00 committed by GitHub
parent 10edf85311
commit 856aa38539
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 192 additions and 28 deletions

View File

@ -10,19 +10,29 @@ import pyotp
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_NAME, CONF_TOKEN
from homeassistant.const import CONF_CODE, CONF_NAME, CONF_TOKEN
from homeassistant.helpers.selector import (
BooleanSelector,
BooleanSelectorConfig,
QrCodeSelector,
QrCodeSelectorConfig,
QrErrorCorrectionLevel,
)
from .const import DEFAULT_NAME, DOMAIN
from .const import CONF_NEW_TOKEN, DEFAULT_NAME, DOMAIN
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_TOKEN): str,
vol.Optional(CONF_TOKEN): str,
vol.Optional(CONF_NEW_TOKEN): BooleanSelector(BooleanSelectorConfig()),
vol.Required(CONF_NAME, default=DEFAULT_NAME): str,
}
)
STEP_CONFIRM_DATA_SCHEMA = vol.Schema({vol.Required(CONF_CODE): str})
class TOTPConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for One-Time Password (OTP)."""
@ -36,23 +46,31 @@ class TOTPConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
try:
await self.hass.async_add_executor_job(
pyotp.TOTP(user_input[CONF_TOKEN]).now
if user_input.get(CONF_TOKEN) and not user_input.get(CONF_NEW_TOKEN):
try:
await self.hass.async_add_executor_job(
pyotp.TOTP(user_input[CONF_TOKEN]).now
)
except binascii.Error:
errors["base"] = "invalid_token"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(user_input[CONF_TOKEN])
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_input[CONF_NAME],
data=user_input,
)
elif user_input.get(CONF_NEW_TOKEN):
user_input[CONF_TOKEN] = await self.hass.async_add_executor_job(
pyotp.random_base32
)
except binascii.Error:
errors["base"] = "invalid_token"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
self.user_input = user_input
return await self.async_step_confirm()
else:
await self.async_set_unique_id(user_input[CONF_TOKEN])
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_input[CONF_NAME],
data=user_input,
)
errors["base"] = "invalid_token"
return self.async_show_form(
step_id="user",
@ -72,3 +90,51 @@ class TOTPConfigFlow(ConfigFlow, domain=DOMAIN):
title=import_info.get(CONF_NAME, DEFAULT_NAME),
data=import_info,
)
async def async_step_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the confirmation step."""
errors: dict[str, str] = {}
if user_input is not None:
if await self.hass.async_add_executor_job(
pyotp.TOTP(self.user_input[CONF_TOKEN]).verify, user_input["code"]
):
return self.async_create_entry(
title=self.user_input[CONF_NAME],
data={
CONF_NAME: self.user_input[CONF_NAME],
CONF_TOKEN: self.user_input[CONF_TOKEN],
},
)
errors["base"] = "invalid_code"
provisioning_uri = await self.hass.async_add_executor_job(
pyotp.TOTP(self.user_input[CONF_TOKEN]).provisioning_uri,
self.user_input[CONF_NAME],
"Home Assistant",
)
data_schema = STEP_CONFIRM_DATA_SCHEMA.extend(
{
vol.Optional("qr_code"): QrCodeSelector(
config=QrCodeSelectorConfig(
data=provisioning_uri,
scale=6,
error_correction_level=QrErrorCorrectionLevel.QUARTILE,
)
)
}
)
return self.async_show_form(
step_id="confirm",
data_schema=data_schema,
description_placeholders={
"auth_app1": "[Google Authenticator](https://support.google.com/accounts/answer/1066447)",
"auth_app2": "[Authy](https://authy.com/)",
"code": self.user_input[CONF_TOKEN],
},
errors=errors,
)

View File

@ -2,3 +2,4 @@
DOMAIN = "otp"
DEFAULT_NAME = "OTP Sensor"
CONF_NEW_TOKEN = "new_token"

View File

@ -4,13 +4,22 @@
"user": {
"data": {
"name": "[%key:common::config_flow::data::name%]",
"token": "Authenticator token (OTP)"
"token": "Authenticator token (OTP)",
"new_token": "Generate a new token?"
}
},
"confirm": {
"title": "Verify One-Time Password (OTP)",
"description": "Before completing the setup of One-Time Password (OTP), confirm with a verification code. Scan the QR code with your authentication app. If you don't have one, we recommend either {auth_app1} or {auth_app2}.\n\nAfter scanning the code, enter the six-digit code from your app to verify the setup. If you have problems scanning the QR code, do a manual setup with code **`{code}`**.",
"data": {
"code": "Verification code (OTP)"
}
}
},
"error": {
"unknown": "[%key:common::config_flow::error::unknown%]",
"invalid_token": "Invalid token"
"invalid_token": "Invalid token",
"invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate."
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"

View File

@ -33,7 +33,10 @@ def mock_pyotp() -> Generator[MagicMock, None, None]:
):
mock_totp = MagicMock()
mock_totp.now.return_value = 123456
mock_totp.verify.return_value = True
mock_totp.provisioning_uri.return_value = "otpauth://totp/Home%20Assistant:OTP%20Sensor?secret=2FX5FBSYRE6VEC2FSHBQCRKO2GNDVZ52&issuer=Home%20Assistant"
mock_client.TOTP.return_value = mock_totp
mock_client.random_base32.return_value = "2FX5FBSYRE6VEC2FSHBQCRKO2GNDVZ52"
yield mock_client

View File

@ -5,15 +5,25 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from homeassistant.components.otp.const import DOMAIN
from homeassistant.components.otp.const import CONF_NEW_TOKEN, DOMAIN
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.const import CONF_NAME, CONF_TOKEN
from homeassistant.const import CONF_CODE, CONF_NAME, CONF_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
TEST_DATA = {
CONF_NAME: "OTP Sensor",
CONF_TOKEN: "TOKEN_A",
CONF_TOKEN: "2FX5FBSYRE6VEC2FSHBQCRKO2GNDVZ52",
}
TEST_DATA_2 = {
CONF_NAME: "OTP Sensor",
CONF_NEW_TOKEN: True,
}
TEST_DATA_3 = {
CONF_NAME: "OTP Sensor",
CONF_TOKEN: "",
}
@ -33,11 +43,6 @@ async def test_form(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> None:
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "OTP Sensor"
assert result["data"] == TEST_DATA
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
("exception", "error"),
@ -98,3 +103,83 @@ async def test_flow_import(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "OTP Sensor"
assert result["data"] == TEST_DATA
@pytest.mark.usefixtures("mock_pyotp")
async def test_generate_new_token(
hass: HomeAssistant, mock_setup_entry: AsyncMock
) -> None:
"""Test form generate new token."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
TEST_DATA_2,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
assert result["step_id"] == "confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: "123456"},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "OTP Sensor"
assert result["data"] == TEST_DATA
assert len(mock_setup_entry.mock_calls) == 1
async def test_generate_new_token_errors(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_pyotp
) -> None:
"""Test input validation errors."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
TEST_DATA_3,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "invalid_token"}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
TEST_DATA_2,
)
mock_pyotp.TOTP().verify.return_value = False
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: "123456"},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "invalid_code"}
mock_pyotp.TOTP().verify.return_value = True
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: "123456"},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "OTP Sensor"
assert result["data"] == TEST_DATA
assert len(mock_setup_entry.mock_calls) == 1