100% code coverage for config_flow of dsmr component (#65238)

This commit is contained in:
rhpijnacker 2022-02-07 10:48:33 +01:00 committed by GitHub
parent f05caf451e
commit bb762d5b0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 30 deletions

View File

@ -147,36 +147,6 @@ class DSMRFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Get the options flow for this handler.""" """Get the options flow for this handler."""
return DSMROptionFlowHandler(config_entry) return DSMROptionFlowHandler(config_entry)
def _abort_if_host_port_configured(
self,
port: str,
host: str | None = None,
updates: dict[Any, Any] | None = None,
reload_on_update: bool = True,
) -> FlowResult | None:
"""Test if host and port are already configured."""
for entry in self._async_current_entries():
if entry.data.get(CONF_HOST) == host and entry.data[CONF_PORT] == port:
if updates is not None:
changed = self.hass.config_entries.async_update_entry(
entry, data={**entry.data, **updates}
)
if (
changed
and reload_on_update
and entry.state
in (
config_entries.ConfigEntryState.LOADED,
config_entries.ConfigEntryState.SETUP_RETRY,
)
):
self.hass.async_create_task(
self.hass.config_entries.async_reload(entry.entry_id)
)
return self.async_abort(reason="already_configured")
return None
async def async_step_user( async def async_step_user(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None
) -> FlowResult: ) -> FlowResult:

View File

@ -1,4 +1,5 @@
"""Test the DSMR config flow.""" """Test the DSMR config flow."""
import asyncio
from itertools import chain, repeat from itertools import chain, repeat
import os import os
from unittest.mock import DEFAULT, AsyncMock, MagicMock, patch, sentinel from unittest.mock import DEFAULT, AsyncMock, MagicMock, patch, sentinel
@ -138,6 +139,45 @@ async def test_setup_5L(com_mock, hass, dsmr_connection_send_validate_fixture):
assert result["data"] == entry_data assert result["data"] == entry_data
@patch("serial.tools.list_ports.comports", return_value=[com_port()])
async def test_setup_5S(com_mock, hass, dsmr_connection_send_validate_fixture):
"""Test we can setup serial."""
port = com_port()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert result["errors"] is None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"type": "Serial"},
)
assert result["type"] == "form"
assert result["step_id"] == "setup_serial"
assert result["errors"] == {}
with patch("homeassistant.components.dsmr.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"port": port.device, "dsmr_version": "5S"}
)
entry_data = {
"port": port.device,
"dsmr_version": "5S",
"serial_id": None,
"serial_id_gas": None,
}
assert result["type"] == "create_entry"
assert result["title"] == port.device
assert result["data"] == entry_data
@patch("serial.tools.list_ports.comports", return_value=[com_port()]) @patch("serial.tools.list_ports.comports", return_value=[com_port()])
async def test_setup_Q3D(com_mock, hass, dsmr_connection_send_validate_fixture): async def test_setup_Q3D(com_mock, hass, dsmr_connection_send_validate_fixture):
"""Test we can setup serial.""" """Test we can setup serial."""
@ -265,6 +305,48 @@ async def test_setup_serial_fail(com_mock, hass, dsmr_connection_send_validate_f
assert result["errors"] == {"base": "cannot_connect"} assert result["errors"] == {"base": "cannot_connect"}
@patch("serial.tools.list_ports.comports", return_value=[com_port()])
async def test_setup_serial_timeout(
com_mock, hass, dsmr_connection_send_validate_fixture
):
"""Test failed serial connection."""
(connection_factory, transport, protocol) = dsmr_connection_send_validate_fixture
port = com_port()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
first_timeout_wait_closed = AsyncMock(
return_value=True,
side_effect=chain([asyncio.TimeoutError], repeat(DEFAULT)),
)
protocol.wait_closed = first_timeout_wait_closed
assert result["type"] == "form"
assert result["step_id"] == "user"
assert result["errors"] is None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"type": "Serial"},
)
assert result["type"] == "form"
assert result["step_id"] == "setup_serial"
assert result["errors"] == {}
with patch("homeassistant.components.dsmr.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"port": port.device, "dsmr_version": "2.2"}
)
assert result["type"] == "form"
assert result["step_id"] == "setup_serial"
assert result["errors"] == {"base": "cannot_communicate"}
@patch("serial.tools.list_ports.comports", return_value=[com_port()]) @patch("serial.tools.list_ports.comports", return_value=[com_port()])
async def test_setup_serial_wrong_telegram( async def test_setup_serial_wrong_telegram(
com_mock, hass, dsmr_connection_send_validate_fixture com_mock, hass, dsmr_connection_send_validate_fixture