Add captcha to BMW ConfigFlow (#131351)

Co-authored-by: Franck Nijhof <git@frenck.dev>
This commit is contained in:
Richard Kroegel 2024-11-28 21:01:00 +01:00 committed by GitHub
parent 9db6f0ffc4
commit 6dd93253c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 153 additions and 74 deletions

View File

@ -27,9 +27,18 @@ from homeassistant.const import CONF_PASSWORD, CONF_REGION, CONF_SOURCE, CONF_US
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from homeassistant.util.ssl import get_default_context
from . import DOMAIN from . import DOMAIN
from .const import CONF_ALLOWED_REGIONS, CONF_GCID, CONF_READ_ONLY, CONF_REFRESH_TOKEN from .const import (
CONF_ALLOWED_REGIONS,
CONF_CAPTCHA_REGIONS,
CONF_CAPTCHA_TOKEN,
CONF_CAPTCHA_URL,
CONF_GCID,
CONF_READ_ONLY,
CONF_REFRESH_TOKEN,
)
DATA_SCHEMA = vol.Schema( DATA_SCHEMA = vol.Schema(
{ {
@ -41,7 +50,14 @@ DATA_SCHEMA = vol.Schema(
translation_key="regions", translation_key="regions",
) )
), ),
} },
extra=vol.REMOVE_EXTRA,
)
CAPTCHA_SCHEMA = vol.Schema(
{
vol.Required(CONF_CAPTCHA_TOKEN): str,
},
extra=vol.REMOVE_EXTRA,
) )
@ -54,6 +70,8 @@ async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str,
data[CONF_USERNAME], data[CONF_USERNAME],
data[CONF_PASSWORD], data[CONF_PASSWORD],
get_region_from_name(data[CONF_REGION]), get_region_from_name(data[CONF_REGION]),
hcaptcha_token=data.get(CONF_CAPTCHA_TOKEN),
verify=get_default_context(),
) )
try: try:
@ -79,15 +97,17 @@ class BMWConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1 VERSION = 1
data: dict[str, Any] = {}
_existing_entry_data: Mapping[str, Any] | None = None _existing_entry_data: Mapping[str, Any] | None = None
async def async_step_user( async def async_step_user(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult: ) -> ConfigFlowResult:
"""Handle the initial step.""" """Handle the initial step."""
errors: dict[str, str] = {} errors: dict[str, str] = self.data.pop("errors", {})
if user_input is not None: if user_input is not None and not errors:
unique_id = f"{user_input[CONF_REGION]}-{user_input[CONF_USERNAME]}" unique_id = f"{user_input[CONF_REGION]}-{user_input[CONF_USERNAME]}"
await self.async_set_unique_id(unique_id) await self.async_set_unique_id(unique_id)
@ -96,22 +116,35 @@ class BMWConfigFlow(ConfigFlow, domain=DOMAIN):
else: else:
self._abort_if_unique_id_configured() self._abort_if_unique_id_configured()
# Store user input for later use
self.data.update(user_input)
# North America and Rest of World require captcha token
if (
self.data.get(CONF_REGION) in CONF_CAPTCHA_REGIONS
and CONF_CAPTCHA_TOKEN not in self.data
):
return await self.async_step_captcha()
info = None info = None
try: try:
info = await validate_input(self.hass, user_input) info = await validate_input(self.hass, self.data)
entry_data = {
**user_input,
CONF_REFRESH_TOKEN: info.get(CONF_REFRESH_TOKEN),
CONF_GCID: info.get(CONF_GCID),
}
except MissingCaptcha: except MissingCaptcha:
errors["base"] = "missing_captcha" errors["base"] = "missing_captcha"
except CannotConnect: except CannotConnect:
errors["base"] = "cannot_connect" errors["base"] = "cannot_connect"
except InvalidAuth: except InvalidAuth:
errors["base"] = "invalid_auth" errors["base"] = "invalid_auth"
finally:
self.data.pop(CONF_CAPTCHA_TOKEN, None)
if info: if info:
entry_data = {
**self.data,
CONF_REFRESH_TOKEN: info.get(CONF_REFRESH_TOKEN),
CONF_GCID: info.get(CONF_GCID),
}
if self.source == SOURCE_REAUTH: if self.source == SOURCE_REAUTH:
return self.async_update_reload_and_abort( return self.async_update_reload_and_abort(
self._get_reauth_entry(), data=entry_data self._get_reauth_entry(), data=entry_data
@ -128,7 +161,7 @@ class BMWConfigFlow(ConfigFlow, domain=DOMAIN):
schema = self.add_suggested_values_to_schema( schema = self.add_suggested_values_to_schema(
DATA_SCHEMA, DATA_SCHEMA,
self._existing_entry_data, self._existing_entry_data or self.data,
) )
return self.async_show_form(step_id="user", data_schema=schema, errors=errors) return self.async_show_form(step_id="user", data_schema=schema, errors=errors)
@ -147,6 +180,22 @@ class BMWConfigFlow(ConfigFlow, domain=DOMAIN):
self._existing_entry_data = self._get_reconfigure_entry().data self._existing_entry_data = self._get_reconfigure_entry().data
return await self.async_step_user() return await self.async_step_user()
async def async_step_captcha(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Show captcha form."""
if user_input and user_input.get(CONF_CAPTCHA_TOKEN):
self.data[CONF_CAPTCHA_TOKEN] = user_input[CONF_CAPTCHA_TOKEN].strip()
return await self.async_step_user(self.data)
return self.async_show_form(
step_id="captcha",
data_schema=CAPTCHA_SCHEMA,
description_placeholders={
"captcha_url": CONF_CAPTCHA_URL.format(region=self.data[CONF_REGION])
},
)
@staticmethod @staticmethod
@callback @callback
def async_get_options_flow( def async_get_options_flow(

View File

@ -8,10 +8,15 @@ ATTR_DIRECTION = "direction"
ATTR_VIN = "vin" ATTR_VIN = "vin"
CONF_ALLOWED_REGIONS = ["china", "north_america", "rest_of_world"] CONF_ALLOWED_REGIONS = ["china", "north_america", "rest_of_world"]
CONF_CAPTCHA_REGIONS = ["north_america", "rest_of_world"]
CONF_READ_ONLY = "read_only" CONF_READ_ONLY = "read_only"
CONF_ACCOUNT = "account" CONF_ACCOUNT = "account"
CONF_REFRESH_TOKEN = "refresh_token" CONF_REFRESH_TOKEN = "refresh_token"
CONF_GCID = "gcid" CONF_GCID = "gcid"
CONF_CAPTCHA_TOKEN = "captcha_token"
CONF_CAPTCHA_URL = (
"https://bimmer-connected.readthedocs.io/en/stable/captcha/{region}.html"
)
DATA_HASS_CONFIG = "hass_config" DATA_HASS_CONFIG = "hass_config"

View File

@ -84,11 +84,6 @@ class BMWDataUpdateCoordinator(DataUpdateCoordinator[None]):
if self.account.refresh_token != old_refresh_token: if self.account.refresh_token != old_refresh_token:
self._update_config_entry_refresh_token(self.account.refresh_token) self._update_config_entry_refresh_token(self.account.refresh_token)
_LOGGER.debug(
"bimmer_connected: refresh token %s > %s",
old_refresh_token,
self.account.refresh_token,
)
def _update_config_entry_refresh_token(self, refresh_token: str | None) -> None: def _update_config_entry_refresh_token(self, refresh_token: str | None) -> None:
"""Update or delete the refresh_token in the Config Entry.""" """Update or delete the refresh_token in the Config Entry."""

View File

@ -7,6 +7,16 @@
"password": "[%key:common::config_flow::data::password%]", "password": "[%key:common::config_flow::data::password%]",
"region": "ConnectedDrive Region" "region": "ConnectedDrive Region"
} }
},
"captcha": {
"title": "Are you a robot?",
"description": "A captcha is required for BMW login. Visit the external website to complete the challenge and submit the form. Copy the resulting token into the field below.\n\n{captcha_url}\n\nNo data will be exposed outside of your Home Assistant instance.",
"data": {
"captcha_token": "Captcha token"
},
"data_description": {
"captcha_token": "One-time token retrieved from the captcha challenge."
}
} }
}, },
"error": { "error": {

View File

@ -9,6 +9,7 @@ import respx
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.components.bmw_connected_drive.const import ( from homeassistant.components.bmw_connected_drive.const import (
CONF_CAPTCHA_TOKEN,
CONF_GCID, CONF_GCID,
CONF_READ_ONLY, CONF_READ_ONLY,
CONF_REFRESH_TOKEN, CONF_REFRESH_TOKEN,
@ -24,8 +25,12 @@ FIXTURE_USER_INPUT = {
CONF_PASSWORD: "p4ssw0rd", CONF_PASSWORD: "p4ssw0rd",
CONF_REGION: "rest_of_world", CONF_REGION: "rest_of_world",
} }
FIXTURE_REFRESH_TOKEN = "SOME_REFRESH_TOKEN" FIXTURE_CAPTCHA_INPUT = {
FIXTURE_GCID = "SOME_GCID" CONF_CAPTCHA_TOKEN: "captcha_token",
}
FIXTURE_USER_INPUT_W_CAPTCHA = FIXTURE_USER_INPUT | FIXTURE_CAPTCHA_INPUT
FIXTURE_REFRESH_TOKEN = "another_token_string"
FIXTURE_GCID = "DUMMY"
FIXTURE_CONFIG_ENTRY = { FIXTURE_CONFIG_ENTRY = {
"entry_id": "1", "entry_id": "1",

View File

@ -4833,7 +4833,7 @@
}), }),
]), ]),
'info': dict({ 'info': dict({
'gcid': 'SOME_GCID', 'gcid': 'DUMMY',
'password': '**REDACTED**', 'password': '**REDACTED**',
'refresh_token': '**REDACTED**', 'refresh_token': '**REDACTED**',
'region': 'rest_of_world', 'region': 'rest_of_world',
@ -7202,7 +7202,7 @@
}), }),
]), ]),
'info': dict({ 'info': dict({
'gcid': 'SOME_GCID', 'gcid': 'DUMMY',
'password': '**REDACTED**', 'password': '**REDACTED**',
'refresh_token': '**REDACTED**', 'refresh_token': '**REDACTED**',
'region': 'rest_of_world', 'region': 'rest_of_world',
@ -8925,7 +8925,7 @@
}), }),
]), ]),
'info': dict({ 'info': dict({
'gcid': 'SOME_GCID', 'gcid': 'DUMMY',
'password': '**REDACTED**', 'password': '**REDACTED**',
'refresh_token': '**REDACTED**', 'refresh_token': '**REDACTED**',
'region': 'rest_of_world', 'region': 'rest_of_world',

View File

@ -4,17 +4,14 @@ from copy import deepcopy
from unittest.mock import patch from unittest.mock import patch
from bimmer_connected.api.authentication import MyBMWAuthentication from bimmer_connected.api.authentication import MyBMWAuthentication
from bimmer_connected.models import ( from bimmer_connected.models import MyBMWAPIError, MyBMWAuthError
MyBMWAPIError,
MyBMWAuthError,
MyBMWCaptchaMissingError,
)
from httpx import RequestError from httpx import RequestError
import pytest import pytest
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.components.bmw_connected_drive.config_flow import DOMAIN from homeassistant.components.bmw_connected_drive.config_flow import DOMAIN
from homeassistant.components.bmw_connected_drive.const import ( from homeassistant.components.bmw_connected_drive.const import (
CONF_CAPTCHA_TOKEN,
CONF_READ_ONLY, CONF_READ_ONLY,
CONF_REFRESH_TOKEN, CONF_REFRESH_TOKEN,
) )
@ -23,10 +20,12 @@ from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType from homeassistant.data_entry_flow import FlowResultType
from . import ( from . import (
FIXTURE_CAPTCHA_INPUT,
FIXTURE_CONFIG_ENTRY, FIXTURE_CONFIG_ENTRY,
FIXTURE_GCID, FIXTURE_GCID,
FIXTURE_REFRESH_TOKEN, FIXTURE_REFRESH_TOKEN,
FIXTURE_USER_INPUT, FIXTURE_USER_INPUT,
FIXTURE_USER_INPUT_W_CAPTCHA,
) )
from tests.common import MockConfigEntry from tests.common import MockConfigEntry
@ -61,7 +60,7 @@ async def test_authentication_error(hass: HomeAssistant) -> None:
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": config_entries.SOURCE_USER}, context={"source": config_entries.SOURCE_USER},
data=FIXTURE_USER_INPUT, data=deepcopy(FIXTURE_USER_INPUT_W_CAPTCHA),
) )
assert result["type"] is FlowResultType.FORM assert result["type"] is FlowResultType.FORM
@ -79,7 +78,7 @@ async def test_connection_error(hass: HomeAssistant) -> None:
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": config_entries.SOURCE_USER}, context={"source": config_entries.SOURCE_USER},
data=FIXTURE_USER_INPUT, data=deepcopy(FIXTURE_USER_INPUT_W_CAPTCHA),
) )
assert result["type"] is FlowResultType.FORM assert result["type"] is FlowResultType.FORM
@ -97,7 +96,7 @@ async def test_api_error(hass: HomeAssistant) -> None:
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": config_entries.SOURCE_USER}, context={"source": config_entries.SOURCE_USER},
data=deepcopy(FIXTURE_USER_INPUT), data=deepcopy(FIXTURE_USER_INPUT_W_CAPTCHA),
) )
assert result["type"] is FlowResultType.FORM assert result["type"] is FlowResultType.FORM
@ -105,6 +104,28 @@ async def test_api_error(hass: HomeAssistant) -> None:
assert result["errors"] == {"base": "cannot_connect"} assert result["errors"] == {"base": "cannot_connect"}
@pytest.mark.usefixtures("bmw_fixture")
async def test_captcha_flow_missing_error(hass: HomeAssistant) -> None:
"""Test the external flow with captcha failing once and succeeding the second time."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
data=deepcopy(FIXTURE_USER_INPUT),
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "captcha"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_CAPTCHA_TOKEN: " "}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "missing_captcha"}
async def test_full_user_flow_implementation(hass: HomeAssistant) -> None: async def test_full_user_flow_implementation(hass: HomeAssistant) -> None:
"""Test registering an integration and finishing flow works.""" """Test registering an integration and finishing flow works."""
with ( with (
@ -118,14 +139,22 @@ async def test_full_user_flow_implementation(hass: HomeAssistant) -> None:
return_value=True, return_value=True,
) as mock_setup_entry, ) as mock_setup_entry,
): ):
result2 = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": config_entries.SOURCE_USER}, context={"source": config_entries.SOURCE_USER},
data=deepcopy(FIXTURE_USER_INPUT), data=deepcopy(FIXTURE_USER_INPUT),
) )
assert result2["type"] is FlowResultType.CREATE_ENTRY
assert result2["title"] == FIXTURE_COMPLETE_ENTRY[CONF_USERNAME] assert result["type"] is FlowResultType.FORM
assert result2["data"] == FIXTURE_COMPLETE_ENTRY assert result["step_id"] == "captcha"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_CAPTCHA_INPUT
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == FIXTURE_COMPLETE_ENTRY[CONF_USERNAME]
assert result["data"] == FIXTURE_COMPLETE_ENTRY
assert len(mock_setup_entry.mock_calls) == 1 assert len(mock_setup_entry.mock_calls) == 1
@ -206,13 +235,20 @@ async def test_reauth(hass: HomeAssistant) -> None:
assert suggested_values[CONF_PASSWORD] == wrong_password assert suggested_values[CONF_PASSWORD] == wrong_password
assert suggested_values[CONF_REGION] == FIXTURE_USER_INPUT[CONF_REGION] assert suggested_values[CONF_REGION] == FIXTURE_USER_INPUT[CONF_REGION]
result2 = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT result["flow_id"], deepcopy(FIXTURE_USER_INPUT)
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert result2["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.FORM
assert result2["reason"] == "reauth_successful" assert result["step_id"] == "captcha"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_CAPTCHA_INPUT
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
assert config_entry.data == FIXTURE_COMPLETE_ENTRY assert config_entry.data == FIXTURE_COMPLETE_ENTRY
assert len(mock_setup_entry.mock_calls) == 2 assert len(mock_setup_entry.mock_calls) == 2
@ -243,13 +279,13 @@ async def test_reauth_unique_id_abort(hass: HomeAssistant) -> None:
assert result["step_id"] == "user" assert result["step_id"] == "user"
assert result["errors"] == {} assert result["errors"] == {}
result2 = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], {**FIXTURE_USER_INPUT, CONF_REGION: "north_america"} result["flow_id"], {**FIXTURE_USER_INPUT, CONF_REGION: "north_america"}
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert result2["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result2["reason"] == "account_mismatch" assert result["reason"] == "account_mismatch"
assert config_entry.data == config_entry_with_wrong_password["data"] assert config_entry.data == config_entry_with_wrong_password["data"]
@ -279,13 +315,20 @@ async def test_reconfigure(hass: HomeAssistant) -> None:
assert suggested_values[CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD] assert suggested_values[CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD]
assert suggested_values[CONF_REGION] == FIXTURE_USER_INPUT[CONF_REGION] assert suggested_values[CONF_REGION] == FIXTURE_USER_INPUT[CONF_REGION]
result2 = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT result["flow_id"], FIXTURE_USER_INPUT
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert result2["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.FORM
assert result2["reason"] == "reconfigure_successful" assert result["step_id"] == "captcha"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_CAPTCHA_INPUT
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
assert config_entry.data == FIXTURE_COMPLETE_ENTRY assert config_entry.data == FIXTURE_COMPLETE_ENTRY
@ -307,40 +350,12 @@ async def test_reconfigure_unique_id_abort(hass: HomeAssistant) -> None:
assert result["step_id"] == "user" assert result["step_id"] == "user"
assert result["errors"] == {} assert result["errors"] == {}
result2 = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
{**FIXTURE_USER_INPUT, CONF_USERNAME: "somebody@email.com"}, {**FIXTURE_USER_INPUT, CONF_USERNAME: "somebody@email.com"},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert result2["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result2["reason"] == "account_mismatch" assert result["reason"] == "account_mismatch"
assert config_entry.data == FIXTURE_COMPLETE_ENTRY assert config_entry.data == FIXTURE_COMPLETE_ENTRY
@pytest.mark.usefixtures("bmw_fixture")
async def test_captcha_flow_not_set(hass: HomeAssistant) -> None:
"""Test the external flow with captcha failing once and succeeding the second time."""
TEST_REGION = "north_america"
# Start flow and open form
# Start flow and open form
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# Add login data
with patch(
"bimmer_connected.api.authentication.MyBMWAuthentication._login_row_na",
side_effect=MyBMWCaptchaMissingError(
"Missing hCaptcha token for North America login"
),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={**FIXTURE_USER_INPUT, CONF_REGION: TEST_REGION},
)
assert result["errors"]["base"] == "missing_captcha"