mirror of
https://github.com/home-assistant/core.git
synced 2026-04-20 10:45:54 +00:00
Teltonika integration: add reauth config flow (#163712)
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
@@ -126,6 +127,65 @@ class TeltonikaConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
|
||||
)
|
||||
|
||||
async def async_step_reauth(
|
||||
self, entry_data: Mapping[str, Any]
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle reauth when authentication fails."""
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle reauth confirmation."""
|
||||
errors: dict[str, str] = {}
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
|
||||
if user_input is not None:
|
||||
data = {
|
||||
CONF_HOST: reauth_entry.data[CONF_HOST],
|
||||
CONF_USERNAME: user_input[CONF_USERNAME],
|
||||
CONF_PASSWORD: user_input[CONF_PASSWORD],
|
||||
CONF_VERIFY_SSL: reauth_entry.data.get(CONF_VERIFY_SSL, False),
|
||||
}
|
||||
try:
|
||||
# Validate new credentials against the configured host
|
||||
info = await validate_input(self.hass, data)
|
||||
except CannotConnect:
|
||||
errors["base"] = "cannot_connect"
|
||||
except InvalidAuth:
|
||||
errors["base"] = "invalid_auth"
|
||||
except Exception:
|
||||
_LOGGER.exception("Unexpected exception during reauth")
|
||||
errors["base"] = "unknown"
|
||||
else:
|
||||
# Verify reauth is for the same device
|
||||
await self.async_set_unique_id(info["device_id"])
|
||||
self._abort_if_unique_id_mismatch(reason="wrong_account")
|
||||
|
||||
return self.async_update_reload_and_abort(
|
||||
reauth_entry,
|
||||
data_updates=user_input,
|
||||
)
|
||||
|
||||
reauth_schema = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_USERNAME): str,
|
||||
vol.Required(CONF_PASSWORD): str,
|
||||
}
|
||||
)
|
||||
|
||||
suggested = {**reauth_entry.data, **(user_input or {})}
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="reauth_confirm",
|
||||
data_schema=self.add_suggested_values_to_schema(reauth_schema, suggested),
|
||||
errors=errors,
|
||||
description_placeholders={
|
||||
"name": reauth_entry.title,
|
||||
"host": reauth_entry.data[CONF_HOST],
|
||||
},
|
||||
)
|
||||
|
||||
async def async_step_dhcp(
|
||||
self, discovery_info: DhcpServiceInfo
|
||||
) -> ConfigFlowResult:
|
||||
|
||||
@@ -8,10 +8,11 @@ from typing import TYPE_CHECKING, Any
|
||||
|
||||
from aiohttp import ClientResponseError, ContentTypeError
|
||||
from teltasync import Teltasync, TeltonikaAuthenticationError, TeltonikaConnectionError
|
||||
from teltasync.error_codes import TeltonikaErrorCode
|
||||
from teltasync.modems import Modems
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.helpers.device_registry import DeviceInfo
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
|
||||
@@ -23,6 +24,13 @@ if TYPE_CHECKING:
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
SCAN_INTERVAL = timedelta(seconds=30)
|
||||
AUTH_ERROR_CODES = frozenset(
|
||||
{
|
||||
TeltonikaErrorCode.UNAUTHORIZED_ACCESS,
|
||||
TeltonikaErrorCode.LOGIN_FAILED,
|
||||
TeltonikaErrorCode.INVALID_JWT_TOKEN,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class TeltonikaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
@@ -54,12 +62,12 @@ class TeltonikaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
await self.client.get_device_info()
|
||||
system_info_response = await self.client.get_system_info()
|
||||
except TeltonikaAuthenticationError as err:
|
||||
raise ConfigEntryError(f"Authentication failed: {err}") from err
|
||||
raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err
|
||||
except (ClientResponseError, ContentTypeError) as err:
|
||||
if isinstance(err, ClientResponseError) and err.status in (401, 403):
|
||||
raise ConfigEntryError(f"Authentication failed: {err}") from err
|
||||
if isinstance(err, ContentTypeError) and err.status == 403:
|
||||
raise ConfigEntryError(f"Authentication failed: {err}") from err
|
||||
if (isinstance(err, ClientResponseError) and err.status in (401, 403)) or (
|
||||
isinstance(err, ContentTypeError) and err.status == 403
|
||||
):
|
||||
raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err
|
||||
raise ConfigEntryNotReady(f"Failed to connect to device: {err}") from err
|
||||
except TeltonikaConnectionError as err:
|
||||
raise ConfigEntryNotReady(f"Failed to connect to device: {err}") from err
|
||||
@@ -81,9 +89,32 @@ class TeltonikaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
try:
|
||||
# Get modems data using the teltasync library
|
||||
modems_response = await modems.get_status()
|
||||
except TeltonikaAuthenticationError as err:
|
||||
raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err
|
||||
except (ClientResponseError, ContentTypeError) as err:
|
||||
if (isinstance(err, ClientResponseError) and err.status in (401, 403)) or (
|
||||
isinstance(err, ContentTypeError) and err.status == 403
|
||||
):
|
||||
raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err
|
||||
raise UpdateFailed(f"Error communicating with device: {err}") from err
|
||||
except TeltonikaConnectionError as err:
|
||||
raise UpdateFailed(f"Error communicating with device: {err}") from err
|
||||
|
||||
if not modems_response.success:
|
||||
if modems_response.errors and any(
|
||||
error.code in AUTH_ERROR_CODES for error in modems_response.errors
|
||||
):
|
||||
raise ConfigEntryAuthFailed(
|
||||
"Authentication failed: unauthorized access"
|
||||
)
|
||||
|
||||
error_message = (
|
||||
modems_response.errors[0].error
|
||||
if modems_response.errors
|
||||
else "Unknown API error"
|
||||
)
|
||||
raise UpdateFailed(f"Error communicating with device: {error_message}")
|
||||
|
||||
# Return only modems which are online
|
||||
modem_data: dict[str, Any] = {}
|
||||
if modems_response.data:
|
||||
|
||||
@@ -36,7 +36,7 @@ rules:
|
||||
integration-owner: done
|
||||
log-when-unavailable: todo
|
||||
parallel-updates: done
|
||||
reauthentication-flow: todo
|
||||
reauthentication-flow: done
|
||||
test-coverage: todo
|
||||
|
||||
# Gold
|
||||
|
||||
@@ -23,6 +23,18 @@
|
||||
"description": "A Teltonika device ({name}) was discovered at {host}. Enter the credentials to add it to Home Assistant.",
|
||||
"title": "Discovered Teltonika device"
|
||||
},
|
||||
"reauth_confirm": {
|
||||
"data": {
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"username": "[%key:common::config_flow::data::username%]"
|
||||
},
|
||||
"data_description": {
|
||||
"password": "[%key:component::teltonika::config::step::dhcp_confirm::data_description::password%]",
|
||||
"username": "[%key:component::teltonika::config::step::dhcp_confirm::data_description::username%]"
|
||||
},
|
||||
"description": "Update the credentials for {name}. The current host is {host}.",
|
||||
"title": "Authentication failed for {name}"
|
||||
},
|
||||
"user": {
|
||||
"data": {
|
||||
"host": "[%key:common::config_flow::data::host%]",
|
||||
|
||||
@@ -90,6 +90,7 @@ def mock_modems() -> Generator[AsyncMock]:
|
||||
ModemStatusFull(**modem) for modem in device_data["modems_data"]
|
||||
]
|
||||
mock_modems_instance.get_status.return_value = response_mock
|
||||
response_mock.success = True
|
||||
|
||||
# Mock is_online to return True for the modem
|
||||
mock_modems_class.is_online = MagicMock(return_value=True)
|
||||
|
||||
@@ -406,3 +406,115 @@ async def test_validate_credentials_false(
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["errors"] == {"base": "invalid_auth"}
|
||||
|
||||
|
||||
async def test_reauth_flow_success(
|
||||
hass: HomeAssistant,
|
||||
mock_teltasync_client: MagicMock,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Test successful reauth flow."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
|
||||
result = await mock_config_entry.start_reauth_flow(hass)
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_USERNAME: "admin",
|
||||
CONF_PASSWORD: "new_password",
|
||||
},
|
||||
)
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "reauth_successful"
|
||||
assert mock_config_entry.data[CONF_USERNAME] == "admin"
|
||||
assert mock_config_entry.data[CONF_PASSWORD] == "new_password"
|
||||
assert mock_config_entry.data[CONF_HOST] == "192.168.1.1"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("side_effect", "expected_error"),
|
||||
[
|
||||
(TeltonikaAuthenticationError("Invalid credentials"), "invalid_auth"),
|
||||
(TeltonikaConnectionError("Connection failed"), "cannot_connect"),
|
||||
(ValueError("Unexpected error"), "unknown"),
|
||||
],
|
||||
ids=["invalid_auth", "cannot_connect", "unexpected_exception"],
|
||||
)
|
||||
async def test_reauth_flow_errors_with_recovery(
|
||||
hass: HomeAssistant,
|
||||
mock_teltasync_client: MagicMock,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
side_effect: Exception,
|
||||
expected_error: str,
|
||||
) -> None:
|
||||
"""Test reauth flow error handling with successful recovery."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
result = await mock_config_entry.start_reauth_flow(hass)
|
||||
|
||||
mock_teltasync_client.get_device_info.side_effect = side_effect
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_USERNAME: "admin",
|
||||
CONF_PASSWORD: "bad_password",
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["errors"] == {"base": expected_error}
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
|
||||
mock_teltasync_client.get_device_info.side_effect = None
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_USERNAME: "admin",
|
||||
CONF_PASSWORD: "new_password",
|
||||
},
|
||||
)
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "reauth_successful"
|
||||
assert mock_config_entry.data[CONF_USERNAME] == "admin"
|
||||
assert mock_config_entry.data[CONF_PASSWORD] == "new_password"
|
||||
assert mock_config_entry.data[CONF_HOST] == "192.168.1.1"
|
||||
|
||||
|
||||
async def test_reauth_flow_wrong_account(
|
||||
hass: HomeAssistant,
|
||||
mock_teltasync_client: MagicMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Test reauth flow aborts when device serial doesn't match."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
result = await mock_config_entry.start_reauth_flow(hass)
|
||||
|
||||
device_info = MagicMock()
|
||||
device_info.device_name = "RUTX50 Different"
|
||||
device_info.device_identifier = "DIFFERENT1234567890"
|
||||
mock_teltasync_client.get_device_info = AsyncMock(return_value=device_info)
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_USERNAME: "admin",
|
||||
CONF_PASSWORD: "password",
|
||||
},
|
||||
)
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "wrong_account"
|
||||
|
||||
@@ -3,10 +3,15 @@
|
||||
from datetime import timedelta
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from aiohttp import ClientResponseError
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
from teltasync import TeltonikaConnectionError
|
||||
from teltasync import TeltonikaAuthenticationError, TeltonikaConnectionError
|
||||
from teltasync.error_codes import TeltonikaErrorCode
|
||||
|
||||
from homeassistant.components.teltonika.const import DOMAIN
|
||||
from homeassistant.config_entries import SOURCE_REAUTH
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
|
||||
@@ -91,3 +96,95 @@ async def test_sensor_update_failure_and_recovery(
|
||||
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
||||
assert state is not None
|
||||
assert state.state == "-63"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("side_effect", "expect_reauth"),
|
||||
[
|
||||
(TeltonikaAuthenticationError("Invalid credentials"), True),
|
||||
(
|
||||
ClientResponseError(
|
||||
request_info=MagicMock(),
|
||||
history=(),
|
||||
status=401,
|
||||
message="Unauthorized",
|
||||
headers={},
|
||||
),
|
||||
True,
|
||||
),
|
||||
(
|
||||
ClientResponseError(
|
||||
request_info=MagicMock(),
|
||||
history=(),
|
||||
status=500,
|
||||
message="Server error",
|
||||
headers={},
|
||||
),
|
||||
False,
|
||||
),
|
||||
],
|
||||
ids=["auth_exception", "http_auth_error", "http_non_auth_error"],
|
||||
)
|
||||
async def test_sensor_update_exception_paths(
|
||||
hass: HomeAssistant,
|
||||
mock_modems: AsyncMock,
|
||||
init_integration: MockConfigEntry,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
side_effect: Exception,
|
||||
expect_reauth: bool,
|
||||
) -> None:
|
||||
"""Test auth and non-auth exceptions during updates."""
|
||||
mock_modems.get_status.side_effect = side_effect
|
||||
|
||||
freezer.tick(timedelta(seconds=31))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
||||
assert state is not None
|
||||
assert state.state == "unavailable"
|
||||
|
||||
has_reauth = any(
|
||||
flow["handler"] == DOMAIN and flow["context"]["source"] == SOURCE_REAUTH
|
||||
for flow in hass.config_entries.flow.async_progress()
|
||||
)
|
||||
assert has_reauth is expect_reauth
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("error_code", "expect_reauth"),
|
||||
[
|
||||
(TeltonikaErrorCode.UNAUTHORIZED_ACCESS, True),
|
||||
(999, False),
|
||||
],
|
||||
ids=["api_auth_error", "api_non_auth_error"],
|
||||
)
|
||||
async def test_sensor_update_unsuccessful_response_paths(
|
||||
hass: HomeAssistant,
|
||||
mock_modems: AsyncMock,
|
||||
init_integration: MockConfigEntry,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
error_code: int,
|
||||
expect_reauth: bool,
|
||||
) -> None:
|
||||
"""Test unsuccessful API response handling."""
|
||||
mock_modems.get_status.side_effect = None
|
||||
mock_modems.get_status.return_value = MagicMock(
|
||||
success=False,
|
||||
data=None,
|
||||
errors=[MagicMock(code=error_code, error="API error")],
|
||||
)
|
||||
|
||||
freezer.tick(timedelta(seconds=31))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
||||
assert state is not None
|
||||
assert state.state == "unavailable"
|
||||
|
||||
has_reauth = any(
|
||||
flow["handler"] == DOMAIN and flow["context"]["source"] == SOURCE_REAUTH
|
||||
for flow in hass.config_entries.flow.async_progress()
|
||||
)
|
||||
assert has_reauth is expect_reauth
|
||||
|
||||
Reference in New Issue
Block a user