From bd6b8a812cc94071d659ab8fc0da507451caa906 Mon Sep 17 00:00:00 2001 From: Karl Beecken Date: Mon, 23 Feb 2026 13:07:19 +0100 Subject: [PATCH] Teltonika integration: add reauth config flow (#163712) --- .../components/teltonika/config_flow.py | 60 ++++++++++ .../components/teltonika/coordinator.py | 43 ++++++- .../components/teltonika/quality_scale.yaml | 2 +- .../components/teltonika/strings.json | 12 ++ tests/components/teltonika/conftest.py | 1 + .../components/teltonika/test_config_flow.py | 112 ++++++++++++++++++ tests/components/teltonika/test_sensor.py | 99 +++++++++++++++- 7 files changed, 321 insertions(+), 8 deletions(-) diff --git a/homeassistant/components/teltonika/config_flow.py b/homeassistant/components/teltonika/config_flow.py index 3af1d28620c..2d6f06bc35d 100644 --- a/homeassistant/components/teltonika/config_flow.py +++ b/homeassistant/components/teltonika/config_flow.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import Mapping import logging from typing import Any @@ -126,6 +127,65 @@ class TeltonikaConfigFlow(ConfigFlow, domain=DOMAIN): step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors ) + async def async_step_reauth( + self, entry_data: Mapping[str, Any] + ) -> ConfigFlowResult: + """Handle reauth when authentication fails.""" + return await self.async_step_reauth_confirm() + + async def async_step_reauth_confirm( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle reauth confirmation.""" + errors: dict[str, str] = {} + reauth_entry = self._get_reauth_entry() + + if user_input is not None: + data = { + CONF_HOST: reauth_entry.data[CONF_HOST], + CONF_USERNAME: user_input[CONF_USERNAME], + CONF_PASSWORD: user_input[CONF_PASSWORD], + CONF_VERIFY_SSL: reauth_entry.data.get(CONF_VERIFY_SSL, False), + } + try: + # Validate new credentials against the configured host + info = await validate_input(self.hass, data) + except CannotConnect: + errors["base"] = "cannot_connect" + except InvalidAuth: + errors["base"] = "invalid_auth" + except Exception: + _LOGGER.exception("Unexpected exception during reauth") + errors["base"] = "unknown" + else: + # Verify reauth is for the same device + await self.async_set_unique_id(info["device_id"]) + self._abort_if_unique_id_mismatch(reason="wrong_account") + + return self.async_update_reload_and_abort( + reauth_entry, + data_updates=user_input, + ) + + reauth_schema = vol.Schema( + { + vol.Required(CONF_USERNAME): str, + vol.Required(CONF_PASSWORD): str, + } + ) + + suggested = {**reauth_entry.data, **(user_input or {})} + + return self.async_show_form( + step_id="reauth_confirm", + data_schema=self.add_suggested_values_to_schema(reauth_schema, suggested), + errors=errors, + description_placeholders={ + "name": reauth_entry.title, + "host": reauth_entry.data[CONF_HOST], + }, + ) + async def async_step_dhcp( self, discovery_info: DhcpServiceInfo ) -> ConfigFlowResult: diff --git a/homeassistant/components/teltonika/coordinator.py b/homeassistant/components/teltonika/coordinator.py index 0604ca4cd54..7d1a614d141 100644 --- a/homeassistant/components/teltonika/coordinator.py +++ b/homeassistant/components/teltonika/coordinator.py @@ -8,10 +8,11 @@ from typing import TYPE_CHECKING, Any from aiohttp import ClientResponseError, ContentTypeError from teltasync import Teltasync, TeltonikaAuthenticationError, TeltonikaConnectionError +from teltasync.error_codes import TeltonikaErrorCode from teltasync.modems import Modems from homeassistant.core import HomeAssistant -from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady +from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed @@ -23,6 +24,13 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) SCAN_INTERVAL = timedelta(seconds=30) +AUTH_ERROR_CODES = frozenset( + { + TeltonikaErrorCode.UNAUTHORIZED_ACCESS, + TeltonikaErrorCode.LOGIN_FAILED, + TeltonikaErrorCode.INVALID_JWT_TOKEN, + } +) class TeltonikaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): @@ -54,12 +62,12 @@ class TeltonikaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): await self.client.get_device_info() system_info_response = await self.client.get_system_info() except TeltonikaAuthenticationError as err: - raise ConfigEntryError(f"Authentication failed: {err}") from err + raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err except (ClientResponseError, ContentTypeError) as err: - if isinstance(err, ClientResponseError) and err.status in (401, 403): - raise ConfigEntryError(f"Authentication failed: {err}") from err - if isinstance(err, ContentTypeError) and err.status == 403: - raise ConfigEntryError(f"Authentication failed: {err}") from err + if (isinstance(err, ClientResponseError) and err.status in (401, 403)) or ( + isinstance(err, ContentTypeError) and err.status == 403 + ): + raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err raise ConfigEntryNotReady(f"Failed to connect to device: {err}") from err except TeltonikaConnectionError as err: raise ConfigEntryNotReady(f"Failed to connect to device: {err}") from err @@ -81,9 +89,32 @@ class TeltonikaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): try: # Get modems data using the teltasync library modems_response = await modems.get_status() + except TeltonikaAuthenticationError as err: + raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err + except (ClientResponseError, ContentTypeError) as err: + if (isinstance(err, ClientResponseError) and err.status in (401, 403)) or ( + isinstance(err, ContentTypeError) and err.status == 403 + ): + raise ConfigEntryAuthFailed(f"Authentication failed: {err}") from err + raise UpdateFailed(f"Error communicating with device: {err}") from err except TeltonikaConnectionError as err: raise UpdateFailed(f"Error communicating with device: {err}") from err + if not modems_response.success: + if modems_response.errors and any( + error.code in AUTH_ERROR_CODES for error in modems_response.errors + ): + raise ConfigEntryAuthFailed( + "Authentication failed: unauthorized access" + ) + + error_message = ( + modems_response.errors[0].error + if modems_response.errors + else "Unknown API error" + ) + raise UpdateFailed(f"Error communicating with device: {error_message}") + # Return only modems which are online modem_data: dict[str, Any] = {} if modems_response.data: diff --git a/homeassistant/components/teltonika/quality_scale.yaml b/homeassistant/components/teltonika/quality_scale.yaml index 329aa7f7b78..c6b7d6b23c7 100644 --- a/homeassistant/components/teltonika/quality_scale.yaml +++ b/homeassistant/components/teltonika/quality_scale.yaml @@ -36,7 +36,7 @@ rules: integration-owner: done log-when-unavailable: todo parallel-updates: done - reauthentication-flow: todo + reauthentication-flow: done test-coverage: todo # Gold diff --git a/homeassistant/components/teltonika/strings.json b/homeassistant/components/teltonika/strings.json index 954f648f2dd..f775e620035 100644 --- a/homeassistant/components/teltonika/strings.json +++ b/homeassistant/components/teltonika/strings.json @@ -23,6 +23,18 @@ "description": "A Teltonika device ({name}) was discovered at {host}. Enter the credentials to add it to Home Assistant.", "title": "Discovered Teltonika device" }, + "reauth_confirm": { + "data": { + "password": "[%key:common::config_flow::data::password%]", + "username": "[%key:common::config_flow::data::username%]" + }, + "data_description": { + "password": "[%key:component::teltonika::config::step::dhcp_confirm::data_description::password%]", + "username": "[%key:component::teltonika::config::step::dhcp_confirm::data_description::username%]" + }, + "description": "Update the credentials for {name}. The current host is {host}.", + "title": "Authentication failed for {name}" + }, "user": { "data": { "host": "[%key:common::config_flow::data::host%]", diff --git a/tests/components/teltonika/conftest.py b/tests/components/teltonika/conftest.py index db90e8b8230..f33293847cb 100644 --- a/tests/components/teltonika/conftest.py +++ b/tests/components/teltonika/conftest.py @@ -90,6 +90,7 @@ def mock_modems() -> Generator[AsyncMock]: ModemStatusFull(**modem) for modem in device_data["modems_data"] ] mock_modems_instance.get_status.return_value = response_mock + response_mock.success = True # Mock is_online to return True for the modem mock_modems_class.is_online = MagicMock(return_value=True) diff --git a/tests/components/teltonika/test_config_flow.py b/tests/components/teltonika/test_config_flow.py index f6e6b605409..582de543fcb 100644 --- a/tests/components/teltonika/test_config_flow.py +++ b/tests/components/teltonika/test_config_flow.py @@ -406,3 +406,115 @@ async def test_validate_credentials_false( assert result["type"] is FlowResultType.FORM assert result["errors"] == {"base": "invalid_auth"} + + +async def test_reauth_flow_success( + hass: HomeAssistant, + mock_teltasync_client: MagicMock, + mock_setup_entry: AsyncMock, + mock_config_entry: MockConfigEntry, +) -> None: + """Test successful reauth flow.""" + mock_config_entry.add_to_hass(hass) + + result = await mock_config_entry.start_reauth_flow(hass) + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "reauth_confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_USERNAME: "admin", + CONF_PASSWORD: "new_password", + }, + ) + + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reauth_successful" + assert mock_config_entry.data[CONF_USERNAME] == "admin" + assert mock_config_entry.data[CONF_PASSWORD] == "new_password" + assert mock_config_entry.data[CONF_HOST] == "192.168.1.1" + + +@pytest.mark.parametrize( + ("side_effect", "expected_error"), + [ + (TeltonikaAuthenticationError("Invalid credentials"), "invalid_auth"), + (TeltonikaConnectionError("Connection failed"), "cannot_connect"), + (ValueError("Unexpected error"), "unknown"), + ], + ids=["invalid_auth", "cannot_connect", "unexpected_exception"], +) +async def test_reauth_flow_errors_with_recovery( + hass: HomeAssistant, + mock_teltasync_client: MagicMock, + mock_setup_entry: AsyncMock, + mock_config_entry: MockConfigEntry, + side_effect: Exception, + expected_error: str, +) -> None: + """Test reauth flow error handling with successful recovery.""" + mock_config_entry.add_to_hass(hass) + result = await mock_config_entry.start_reauth_flow(hass) + + mock_teltasync_client.get_device_info.side_effect = side_effect + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_USERNAME: "admin", + CONF_PASSWORD: "bad_password", + }, + ) + + assert result["type"] is FlowResultType.FORM + assert result["errors"] == {"base": expected_error} + assert result["step_id"] == "reauth_confirm" + + mock_teltasync_client.get_device_info.side_effect = None + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_USERNAME: "admin", + CONF_PASSWORD: "new_password", + }, + ) + + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reauth_successful" + assert mock_config_entry.data[CONF_USERNAME] == "admin" + assert mock_config_entry.data[CONF_PASSWORD] == "new_password" + assert mock_config_entry.data[CONF_HOST] == "192.168.1.1" + + +async def test_reauth_flow_wrong_account( + hass: HomeAssistant, + mock_teltasync_client: MagicMock, + mock_config_entry: MockConfigEntry, +) -> None: + """Test reauth flow aborts when device serial doesn't match.""" + mock_config_entry.add_to_hass(hass) + result = await mock_config_entry.start_reauth_flow(hass) + + device_info = MagicMock() + device_info.device_name = "RUTX50 Different" + device_info.device_identifier = "DIFFERENT1234567890" + mock_teltasync_client.get_device_info = AsyncMock(return_value=device_info) + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_USERNAME: "admin", + CONF_PASSWORD: "password", + }, + ) + + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "wrong_account" diff --git a/tests/components/teltonika/test_sensor.py b/tests/components/teltonika/test_sensor.py index 1d7b1b18d61..65c306c9577 100644 --- a/tests/components/teltonika/test_sensor.py +++ b/tests/components/teltonika/test_sensor.py @@ -3,10 +3,15 @@ from datetime import timedelta from unittest.mock import AsyncMock, MagicMock +from aiohttp import ClientResponseError from freezegun.api import FrozenDateTimeFactory +import pytest from syrupy.assertion import SnapshotAssertion -from teltasync import TeltonikaConnectionError +from teltasync import TeltonikaAuthenticationError, TeltonikaConnectionError +from teltasync.error_codes import TeltonikaErrorCode +from homeassistant.components.teltonika.const import DOMAIN +from homeassistant.config_entries import SOURCE_REAUTH from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry as er @@ -91,3 +96,95 @@ async def test_sensor_update_failure_and_recovery( state = hass.states.get("sensor.rutx50_test_internal_modem_rssi") assert state is not None assert state.state == "-63" + + +@pytest.mark.parametrize( + ("side_effect", "expect_reauth"), + [ + (TeltonikaAuthenticationError("Invalid credentials"), True), + ( + ClientResponseError( + request_info=MagicMock(), + history=(), + status=401, + message="Unauthorized", + headers={}, + ), + True, + ), + ( + ClientResponseError( + request_info=MagicMock(), + history=(), + status=500, + message="Server error", + headers={}, + ), + False, + ), + ], + ids=["auth_exception", "http_auth_error", "http_non_auth_error"], +) +async def test_sensor_update_exception_paths( + hass: HomeAssistant, + mock_modems: AsyncMock, + init_integration: MockConfigEntry, + freezer: FrozenDateTimeFactory, + side_effect: Exception, + expect_reauth: bool, +) -> None: + """Test auth and non-auth exceptions during updates.""" + mock_modems.get_status.side_effect = side_effect + + freezer.tick(timedelta(seconds=31)) + async_fire_time_changed(hass) + await hass.async_block_till_done() + + state = hass.states.get("sensor.rutx50_test_internal_modem_rssi") + assert state is not None + assert state.state == "unavailable" + + has_reauth = any( + flow["handler"] == DOMAIN and flow["context"]["source"] == SOURCE_REAUTH + for flow in hass.config_entries.flow.async_progress() + ) + assert has_reauth is expect_reauth + + +@pytest.mark.parametrize( + ("error_code", "expect_reauth"), + [ + (TeltonikaErrorCode.UNAUTHORIZED_ACCESS, True), + (999, False), + ], + ids=["api_auth_error", "api_non_auth_error"], +) +async def test_sensor_update_unsuccessful_response_paths( + hass: HomeAssistant, + mock_modems: AsyncMock, + init_integration: MockConfigEntry, + freezer: FrozenDateTimeFactory, + error_code: int, + expect_reauth: bool, +) -> None: + """Test unsuccessful API response handling.""" + mock_modems.get_status.side_effect = None + mock_modems.get_status.return_value = MagicMock( + success=False, + data=None, + errors=[MagicMock(code=error_code, error="API error")], + ) + + freezer.tick(timedelta(seconds=31)) + async_fire_time_changed(hass) + await hass.async_block_till_done() + + state = hass.states.get("sensor.rutx50_test_internal_modem_rssi") + assert state is not None + assert state.state == "unavailable" + + has_reauth = any( + flow["handler"] == DOMAIN and flow["context"]["source"] == SOURCE_REAUTH + for flow in hass.config_entries.flow.async_progress() + ) + assert has_reauth is expect_reauth