Support for direct modbus connection to nibe pumps (#80557)

* Initial support for modbus

* Complete test coverage of config flow

* Bump to 1.1.0 with corrected numbers

* Handle missing mapping for reset-alarm

* Fixup type checks after library bump

* Adjust coil number

* Move word_swap to nibegw

* Adjust to 1.1.2 with fixes

* Add series property

* Add S series models

* Correct test for invalid host

* Apply suggestions from code review

Co-authored-by: J. Nick Koston <nick@koston.org>

* Move some things into library

* Adjust strings somewhat

* Correct black

* Correct test after validation change

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Joakim Plate 2022-11-08 10:49:47 +01:00 committed by GitHub
parent 318122fe53
commit fc0e0bf099
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 394 additions and 141 deletions

View File

@ -10,10 +10,10 @@ from typing import Any, Generic, TypeVar
from nibe.coil import Coil
from nibe.connection import Connection
from nibe.connection.modbus import Modbus
from nibe.connection.nibegw import NibeGW
from nibe.exceptions import CoilNotFoundException, CoilReadException
from nibe.heatpump import HeatPump, Model
from tenacity import RetryError, retry, retry_if_exception_type, stop_after_attempt
from nibe.heatpump import HeatPump, Model, Series
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@ -34,8 +34,11 @@ from homeassistant.helpers.update_coordinator import (
from .const import (
CONF_CONNECTION_TYPE,
CONF_CONNECTION_TYPE_MODBUS,
CONF_CONNECTION_TYPE_NIBEGW,
CONF_LISTENING_PORT,
CONF_MODBUS_UNIT,
CONF_MODBUS_URL,
CONF_REMOTE_READ_PORT,
CONF_REMOTE_WRITE_PORT,
CONF_WORD_SWAP,
@ -57,12 +60,13 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Nibe Heat Pump from a config entry."""
heatpump = HeatPump(Model[entry.data[CONF_MODEL]])
heatpump.word_swap = entry.data[CONF_WORD_SWAP]
await hass.async_add_executor_job(heatpump.initialize)
await heatpump.initialize()
connection: Connection
connection_type = entry.data[CONF_CONNECTION_TYPE]
if connection_type == CONF_CONNECTION_TYPE_NIBEGW:
heatpump.word_swap = entry.data[CONF_WORD_SWAP]
connection = NibeGW(
heatpump,
entry.data[CONF_IP_ADDRESS],
@ -70,13 +74,22 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
entry.data[CONF_REMOTE_WRITE_PORT],
listening_port=entry.data[CONF_LISTENING_PORT],
)
elif connection_type == CONF_CONNECTION_TYPE_MODBUS:
connection = Modbus(
heatpump, entry.data[CONF_MODBUS_URL], entry.data[CONF_MODBUS_UNIT]
)
else:
raise HomeAssistantError(f"Connection type {connection_type} is not supported.")
await connection.start()
assert heatpump.model
async def _async_stop(_):
await connection.stop()
entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, connection.stop)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop)
)
coordinator = Coordinator(hass, heatpump, connection)
@ -184,6 +197,11 @@ class Coordinator(ContextCoordinator[dict[int, Coil], int]):
self.seed[coil.address] = coil
self.async_update_context_listeners([coil.address])
@property
def series(self) -> Series:
"""Return which series of pump we are connected to."""
return self.heatpump.series
@property
def coils(self) -> list[Coil]:
"""Return the full coil database."""
@ -201,8 +219,8 @@ class Coordinator(ContextCoordinator[dict[int, Coil], int]):
def get_coil_value(self, coil: Coil) -> int | str | float | None:
"""Return a coil with data and check for validity."""
if coil := self.data.get(coil.address):
return coil.value
if coil_with_data := self.data.get(coil.address):
return coil_with_data.value
return None
def get_coil_float(self, coil: Coil) -> float | None:
@ -228,33 +246,29 @@ class Coordinator(ContextCoordinator[dict[int, Coil], int]):
self.task = None
async def _async_update_data_internal(self) -> dict[int, Coil]:
@retry(
retry=retry_if_exception_type(CoilReadException),
stop=stop_after_attempt(COIL_READ_RETRIES),
)
async def read_coil(coil: Coil):
return await self.connection.read_coil(coil)
result: dict[int, Coil] = {}
for address in self.context_callbacks.keys():
if seed := self.seed.pop(address, None):
self.logger.debug("Skipping seeded coil: %d", address)
result[address] = seed
continue
def _get_coils() -> Iterable[Coil]:
for address in sorted(self.context_callbacks.keys()):
if seed := self.seed.pop(address, None):
self.logger.debug("Skipping seeded coil: %d", address)
result[address] = seed
continue
try:
coil = self.heatpump.get_coil_by_address(address)
except CoilNotFoundException as exception:
self.logger.debug("Skipping missing coil: %s", exception)
continue
try:
coil = self.heatpump.get_coil_by_address(address)
except CoilNotFoundException as exception:
self.logger.debug("Skipping missing coil: %s", exception)
continue
yield coil
try:
result[coil.address] = await read_coil(coil)
except (CoilReadException, RetryError) as exception:
raise UpdateFailed(f"Failed to update: {exception}") from exception
self.seed.pop(coil.address, None)
try:
async for coil in self.connection.read_coils(_get_coils()):
result[coil.address] = coil
self.seed.pop(coil.address, None)
except CoilReadException as exception:
raise UpdateFailed(f"Failed to update: {exception}") from exception
return result

View File

@ -1,14 +1,21 @@
"""Config flow for Nibe Heat Pump integration."""
from __future__ import annotations
import errno
from socket import gaierror
from typing import Any
from nibe.connection.modbus import Modbus
from nibe.connection.nibegw import NibeGW
from nibe.exceptions import CoilNotFoundException, CoilReadException, CoilWriteException
from nibe.exceptions import (
AddressInUseException,
CoilNotFoundException,
CoilReadException,
CoilReadSendException,
CoilWriteException,
CoilWriteSendException,
)
from nibe.heatpump import HeatPump, Model
import voluptuous as vol
import yarl
from homeassistant import config_entries
from homeassistant.const import CONF_IP_ADDRESS, CONF_MODEL
@ -18,8 +25,11 @@ from homeassistant.helpers import selector
from .const import (
CONF_CONNECTION_TYPE,
CONF_CONNECTION_TYPE_MODBUS,
CONF_CONNECTION_TYPE_NIBEGW,
CONF_LISTENING_PORT,
CONF_MODBUS_UNIT,
CONF_MODBUS_URL,
CONF_REMOTE_READ_PORT,
CONF_REMOTE_WRITE_PORT,
CONF_WORD_SWAP,
@ -36,7 +46,7 @@ PORT_SELECTOR = vol.All(
vol.Coerce(int),
)
STEP_USER_DATA_SCHEMA = vol.Schema(
STEP_NIBEGW_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_MODEL): vol.In(list(Model.__members__)),
vol.Required(CONF_IP_ADDRESS): selector.TextSelector(),
@ -47,6 +57,22 @@ STEP_USER_DATA_SCHEMA = vol.Schema(
)
STEP_MODBUS_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_MODEL): vol.In(list(Model.__members__)),
vol.Required(CONF_MODBUS_URL): selector.TextSelector(),
vol.Required(CONF_MODBUS_UNIT, default=0): vol.All(
selector.NumberSelector(
selector.NumberSelectorConfig(
min=0, step=1, mode=selector.NumberSelectorMode.BOX
)
),
vol.Coerce(int),
),
}
)
class FieldError(Exception):
"""Field with invalid data."""
@ -57,11 +83,13 @@ class FieldError(Exception):
self.error = error
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
async def validate_nibegw_input(
hass: HomeAssistant, data: dict[str, Any]
) -> tuple[str, dict[str, Any]]:
"""Validate the user input allows us to connect."""
heatpump = HeatPump(Model[data[CONF_MODEL]])
heatpump.initialize()
await heatpump.initialize()
connection = NibeGW(
heatpump,
@ -73,24 +101,17 @@ async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str,
try:
await connection.start()
except OSError as exception:
if exception.errno == errno.EADDRINUSE:
raise FieldError(
"Address already in use", "listening_port", "address_in_use"
) from exception
raise
except AddressInUseException as exception:
raise FieldError(
"Address already in use", "listening_port", "address_in_use"
) from exception
try:
coil = heatpump.get_coil_by_name("modbus40-word-swap-48852")
coil = await connection.read_coil(coil)
word_swap = coil.value == "ON"
coil = await connection.write_coil(coil)
except gaierror as exception:
raise FieldError(str(exception), "ip_address", "address") from exception
await connection.verify_connectivity()
except (CoilReadSendException, CoilWriteSendException) as exception:
raise FieldError(str(exception), CONF_IP_ADDRESS, "address") from exception
except CoilNotFoundException as exception:
raise FieldError(
"Model selected doesn't seem to support expected coils", "base", "model"
) from exception
raise FieldError("Coils not found", "base", "model") from exception
except CoilReadException as exception:
raise FieldError("Timeout on read from pump", "base", "read") from exception
except CoilWriteException as exception:
@ -98,9 +119,49 @@ async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str,
finally:
await connection.stop()
return {
"title": f"{data[CONF_MODEL]} at {data[CONF_IP_ADDRESS]}",
CONF_WORD_SWAP: word_swap,
return f"{data[CONF_MODEL]} at {data[CONF_IP_ADDRESS]}", {
**data,
CONF_WORD_SWAP: heatpump.word_swap,
CONF_CONNECTION_TYPE: CONF_CONNECTION_TYPE_NIBEGW,
}
async def validate_modbus_input(
hass: HomeAssistant, data: dict[str, Any]
) -> tuple[str, dict[str, Any]]:
"""Validate the user input allows us to connect."""
heatpump = HeatPump(Model[data[CONF_MODEL]])
await heatpump.initialize()
try:
connection = Modbus(
heatpump,
data[CONF_MODBUS_URL],
data[CONF_MODBUS_UNIT],
)
except ValueError as exc:
raise FieldError("Not a valid modbus url", CONF_MODBUS_URL, "url") from exc
await connection.start()
try:
await connection.verify_connectivity()
except (CoilReadSendException, CoilWriteSendException) as exception:
raise FieldError(str(exception), CONF_MODBUS_URL, "address") from exception
except CoilNotFoundException as exception:
raise FieldError("Coils not found", "base", "model") from exception
except CoilReadException as exception:
raise FieldError("Timeout on read from pump", "base", "read") from exception
except CoilWriteException as exception:
raise FieldError("Timeout on writing to pump", "base", "write") from exception
finally:
await connection.stop()
host = yarl.URL(data[CONF_MODBUS_URL]).host
return f"{data[CONF_MODEL]} at {host}", {
**data,
CONF_CONNECTION_TYPE: CONF_CONNECTION_TYPE_MODBUS,
}
@ -113,15 +174,21 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
return self.async_show_menu(step_id="user", menu_options=["modbus", "nibegw"])
async def async_step_modbus(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the modbus step."""
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
step_id="modbus", data_schema=STEP_MODBUS_DATA_SCHEMA
)
errors = {}
try:
info = await validate_input(self.hass, user_input)
title, data = await validate_modbus_input(self.hass, user_input)
except FieldError as exception:
LOGGER.debug("Validation error %s", exception)
errors[exception.field] = exception.error
@ -129,13 +196,34 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
data = {
**user_input,
CONF_WORD_SWAP: info[CONF_WORD_SWAP],
CONF_CONNECTION_TYPE: CONF_CONNECTION_TYPE_NIBEGW,
}
return self.async_create_entry(title=info["title"], data=data)
return self.async_create_entry(title=title, data=data)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
step_id="modbus", data_schema=STEP_MODBUS_DATA_SCHEMA, errors=errors
)
async def async_step_nibegw(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the nibegw step."""
if user_input is None:
return self.async_show_form(
step_id="nibegw", data_schema=STEP_NIBEGW_DATA_SCHEMA
)
errors = {}
try:
title, data = await validate_nibegw_input(self.hass, user_input)
except FieldError as exception:
LOGGER.exception("Validation error")
errors[exception.field] = exception.error
except Exception: # pylint: disable=broad-except
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
return self.async_create_entry(title=title, data=data)
return self.async_show_form(
step_id="nibegw", data_schema=STEP_NIBEGW_DATA_SCHEMA, errors=errors
)

View File

@ -10,3 +10,6 @@ CONF_REMOTE_WRITE_PORT = "remote_write_port"
CONF_WORD_SWAP = "word_swap"
CONF_CONNECTION_TYPE = "connection_type"
CONF_CONNECTION_TYPE_NIBEGW = "nibegw"
CONF_CONNECTION_TYPE_MODBUS = "modbus"
CONF_MODBUS_URL = "modbus_url"
CONF_MODBUS_UNIT = "modbus_unit"

View File

@ -3,7 +3,7 @@
"name": "Nibe Heat Pump",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/nibe_heatpump",
"requirements": ["nibe==0.5.0", "tenacity==8.0.1"],
"requirements": ["nibe==1.2.0"],
"codeowners": ["@elupus"],
"iot_class": "local_polling"
}

View File

@ -58,6 +58,10 @@ class Number(CoilEntity, NumberEntity):
self._attr_native_value = None
def _async_read_coil(self, coil: Coil) -> None:
if coil.value is None:
self._attr_native_value = None
return
try:
self._attr_native_value = float(coil.value)
except ValueError:

View File

@ -35,11 +35,16 @@ class Select(CoilEntity, SelectEntity):
def __init__(self, coordinator: Coordinator, coil: Coil) -> None:
"""Initialize entity."""
assert coil.mappings
super().__init__(coordinator, coil, ENTITY_ID_FORMAT)
self._attr_options = list(coil.mappings.values())
self._attr_current_option = None
def _async_read_coil(self, coil: Coil) -> None:
if not isinstance(coil.value, str):
self._attr_current_option = None
return
self._attr_current_option = coil.value
async def async_select_option(self, option: str) -> None:

View File

@ -2,8 +2,27 @@
"config": {
"step": {
"user": {
"menu_options": {
"nibegw": "NibeGW",
"modbus": "Modbus"
},
"description": "Pick the connection method to your pump. In general, F-series pumps require a Nibe GW custom accessory, while an S-series pump has Modbus support built-in."
},
"modbus": {
"data": {
"model": "Model of Heat Pump",
"modbus_url": "Modbus URL",
"modbus_unit": "Modbus Unit Identifier"
},
"data_description": {
"modbus_url": "Modbus URL that describes the connection to your Heat Pump or MODBUS40 unit. It should be on the form:\n - `tcp://[HOST]:[PORT]` for Modbus TCP connection\n - `serial://[LOCAL DEVICE]` for a local Modbus RTU connection\n - `rfc2217://[HOST]:[PORT]` for a remote telnet based Modbus RTU connection.",
"modbus_unit": "Unit identification for you Heat Pump. Can usually be left at 0."
}
},
"nibegw": {
"description": "Before attempting to configure the integration, verify that:\n - The NibeGW unit is connected to a heat pump.\n - The MODBUS40 accessory has been enabled in the heat pump configuration.\n - The pump has not gone into an alarm state about missing MODBUS40 accessory.",
"data": {
"model": "Model of Heat Pump",
"ip_address": "Remote address",
"remote_read_port": "Remote read port",
"remote_write_port": "Remote write port",
@ -23,7 +42,8 @@
"address": "Invalid remote address specified. Address must be an IP address or a resolvable hostname.",
"address_in_use": "The selected listening port is already in use on this system.",
"model": "The model selected doesn't seem to support modbus40",
"unknown": "[%key:common::config_flow::error::unknown%]"
"unknown": "[%key:common::config_flow::error::unknown%]",
"url": "The url specified is not a well formed and supported url"
}
}
}

View File

@ -9,13 +9,26 @@
"model": "The model selected doesn't seem to support modbus40",
"read": "Error on read request from pump. Verify your `Remote read port` or `Remote IP address`.",
"unknown": "Unexpected error",
"url": "The url specified is not a well formed and supported url",
"write": "Error on write request to pump. Verify your `Remote write port` or `Remote IP address`."
},
"step": {
"user": {
"modbus": {
"data": {
"modbus_unit": "Modbus Unit Identifier",
"modbus_url": "Modbus URL",
"model": "Model of Heat Pump"
},
"data_description": {
"modbus_unit": "Unit identification for you Heat Pump. Can usually be left at 0.",
"modbus_url": "Modbus URL that describes the connection to your Heat Pump or MODBUS40 unit. It should be on the form:\n - `tcp://[HOST]:[PORT]` for Modbus TCP connection\n - `serial://[LOCAL DEVICE]` for a local Modbus RTU connection\n - `rfc2217://[HOST]:[PORT]` for a remote telnet based Modbus RTU connection."
}
},
"nibegw": {
"data": {
"ip_address": "Remote address",
"listening_port": "Local listening port",
"model": "Model of Heat Pump",
"remote_read_port": "Remote read port",
"remote_write_port": "Remote write port"
},
@ -26,6 +39,13 @@
"remote_write_port": "The port the NibeGW unit is listening for write requests on."
},
"description": "Before attempting to configure the integration, verify that:\n - The NibeGW unit is connected to a heat pump.\n - The MODBUS40 accessory has been enabled in the heat pump configuration.\n - The pump has not gone into an alarm state about missing MODBUS40 accessory."
},
"user": {
"description": "Pick the connection method to your pump. In general, F-series pumps require a Nibe GW custom accessory, while an S-series pump has Modbus support built-in.",
"menu_options": {
"modbus": "Modbus",
"nibegw": "NibeGW"
}
}
}
}

View File

@ -1159,7 +1159,7 @@ nextcord==2.0.0a8
nextdns==1.1.1
# homeassistant.components.nibe_heatpump
nibe==0.5.0
nibe==1.2.0
# homeassistant.components.niko_home_control
niko-home-control==0.2.1
@ -2394,9 +2394,6 @@ temescal==0.5
# homeassistant.components.temper
temperusb==1.6.0
# homeassistant.components.nibe_heatpump
tenacity==8.0.1
# homeassistant.components.tensorflow
# tensorflow==2.5.0

View File

@ -846,7 +846,7 @@ nextcord==2.0.0a8
nextdns==1.1.1
# homeassistant.components.nibe_heatpump
nibe==0.5.0
nibe==1.2.0
# homeassistant.components.nfandroidtv
notifications-android-tv==0.1.5
@ -1652,9 +1652,6 @@ tellduslive==0.10.11
# homeassistant.components.lg_soundbar
temescal==0.5
# homeassistant.components.nibe_heatpump
tenacity==8.0.1
# homeassistant.components.powerwall
tesla-powerwall==0.3.18

View File

@ -1,11 +1,16 @@
"""Test the Nibe Heat Pump config flow."""
import errno
from socket import gaierror
from unittest.mock import Mock, patch
from unittest.mock import AsyncMock, Mock, patch
from nibe.coil import Coil
from nibe.connection import Connection
from nibe.exceptions import CoilNotFoundException, CoilReadException, CoilWriteException
from nibe.exceptions import (
AddressInUseException,
CoilNotFoundException,
CoilReadException,
CoilReadSendException,
CoilWriteException,
)
import pytest
from pytest import fixture
from homeassistant import config_entries
@ -13,7 +18,7 @@ from homeassistant.components.nibe_heatpump import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
MOCK_FLOW_USERDATA = {
MOCK_FLOW_NIBEGW_USERDATA = {
"model": "F1155",
"ip_address": "127.0.0.1",
"listening_port": 9999,
@ -22,13 +27,33 @@ MOCK_FLOW_USERDATA = {
}
@fixture(autouse=True, name="mock_connection")
async def fixture_mock_connection():
MOCK_FLOW_MODBUS_USERDATA = {
"model": "S1155",
"modbus_url": "tcp://127.0.0.1",
"modbus_unit": 0,
}
@fixture(autouse=True, name="mock_connection_constructor")
async def fixture_mock_connection_constructor():
"""Make sure we have a dummy connection."""
mock_constructor = Mock()
with patch(
"homeassistant.components.nibe_heatpump.config_flow.NibeGW", spec=Connection
) as mock_connection:
yield mock_connection
"homeassistant.components.nibe_heatpump.config_flow.NibeGW",
new=mock_constructor,
), patch(
"homeassistant.components.nibe_heatpump.config_flow.Modbus",
new=mock_constructor,
):
yield mock_constructor
@fixture(name="mock_connection")
def fixture_mock_connection(mock_connection_constructor: Mock):
"""Make sure we have a dummy connection."""
mock_connection = AsyncMock(spec=Connection)
mock_connection_constructor.return_value = mock_connection
return mock_connection
@fixture(autouse=True, name="mock_setup_entry")
@ -40,24 +65,38 @@ async def fixture_mock_setup():
yield mock_setup_entry
async def test_form(
hass: HomeAssistant, mock_connection: Mock, mock_setup_entry: Mock
) -> None:
async def _get_connection_form(
hass: HomeAssistant, connection_type: str
) -> FlowResultType:
"""Test we get the form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.MENU
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": connection_type}
)
assert result["type"] == FlowResultType.FORM
assert result["errors"] is None
return result
async def test_nibegw_form(
hass: HomeAssistant, mock_connection: Mock, mock_setup_entry: Mock
) -> None:
"""Test we get the form."""
result = await _get_connection_form(hass, "nibegw")
coil_wordswap = Coil(
48852, "modbus40-word-swap-48852", "Modbus40 Word Swap", "u8", min=0, max=1
)
coil_wordswap.value = "ON"
mock_connection.return_value.read_coil.return_value = coil_wordswap
mock_connection.read_coil.return_value = coil_wordswap
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
result["flow_id"], MOCK_FLOW_NIBEGW_USERDATA
)
await hass.async_block_till_done()
@ -75,109 +114,175 @@ async def test_form(
assert len(mock_setup_entry.mock_calls) == 1
async def test_address_inuse(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle invalid auth."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
async def test_modbus_form(
hass: HomeAssistant, mock_connection: Mock, mock_setup_entry: Mock
) -> None:
"""Test we get the form."""
result = await _get_connection_form(hass, "modbus")
error = OSError()
error.errno = errno.EADDRINUSE
mock_connection.return_value.start.side_effect = error
coil = Coil(
40022, "reset-alarm-40022", "Reset Alarm", "u8", min=0, max=1, write=True
)
coil.value = "ON"
mock_connection.read_coil.return_value = coil
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
result["flow_id"], MOCK_FLOW_MODBUS_USERDATA
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.CREATE_ENTRY
assert result2["title"] == "S1155 at 127.0.0.1"
assert result2["data"] == {
"model": "S1155",
"modbus_url": "tcp://127.0.0.1",
"modbus_unit": 0,
"connection_type": "modbus",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_modbus_invalid_url(
hass: HomeAssistant, mock_connection_constructor: Mock
) -> None:
"""Test we handle invalid auth."""
result = await _get_connection_form(hass, "modbus")
mock_connection_constructor.side_effect = ValueError()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {**MOCK_FLOW_MODBUS_USERDATA, "modbus_url": "invalid://url"}
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"modbus_url": "url"}
async def test_nibegw_address_inuse(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle invalid auth."""
result = await _get_connection_form(hass, "nibegw")
mock_connection.start.side_effect = AddressInUseException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_NIBEGW_USERDATA
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"listening_port": "address_in_use"}
error.errno = errno.EACCES
mock_connection.return_value.start.side_effect = error
mock_connection.start.side_effect = Exception()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
result["flow_id"], MOCK_FLOW_NIBEGW_USERDATA
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "unknown"}
async def test_read_timeout(hass: HomeAssistant, mock_connection: Mock) -> None:
@pytest.mark.parametrize(
"connection_type,data",
(
("nibegw", MOCK_FLOW_NIBEGW_USERDATA),
("modbus", MOCK_FLOW_MODBUS_USERDATA),
),
)
async def test_read_timeout(
hass: HomeAssistant, mock_connection: Mock, connection_type: str, data: dict
) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await _get_connection_form(hass, connection_type)
mock_connection.return_value.read_coil.side_effect = CoilReadException()
mock_connection.verify_connectivity.side_effect = CoilReadException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], data)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "read"}
async def test_write_timeout(hass: HomeAssistant, mock_connection: Mock) -> None:
@pytest.mark.parametrize(
"connection_type,data",
(
("nibegw", MOCK_FLOW_NIBEGW_USERDATA),
("modbus", MOCK_FLOW_MODBUS_USERDATA),
),
)
async def test_write_timeout(
hass: HomeAssistant, mock_connection: Mock, connection_type: str, data: dict
) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await _get_connection_form(hass, connection_type)
mock_connection.return_value.write_coil.side_effect = CoilWriteException()
mock_connection.verify_connectivity.side_effect = CoilWriteException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], data)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "write"}
async def test_unexpected_exception(hass: HomeAssistant, mock_connection: Mock) -> None:
@pytest.mark.parametrize(
"connection_type,data",
(
("nibegw", MOCK_FLOW_NIBEGW_USERDATA),
("modbus", MOCK_FLOW_MODBUS_USERDATA),
),
)
async def test_unexpected_exception(
hass: HomeAssistant, mock_connection: Mock, connection_type: str, data: dict
) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await _get_connection_form(hass, connection_type)
mock_connection.return_value.read_coil.side_effect = Exception()
mock_connection.verify_connectivity.side_effect = Exception()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], data)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "unknown"}
async def test_invalid_host(hass: HomeAssistant, mock_connection: Mock) -> None:
@pytest.mark.parametrize(
"connection_type,data",
(
("nibegw", MOCK_FLOW_NIBEGW_USERDATA),
("modbus", MOCK_FLOW_MODBUS_USERDATA),
),
)
async def test_nibegw_invalid_host(
hass: HomeAssistant, mock_connection: Mock, connection_type: str, data: dict
) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await _get_connection_form(hass, connection_type)
mock_connection.return_value.read_coil.side_effect = gaierror()
mock_connection.verify_connectivity.side_effect = CoilReadSendException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {**MOCK_FLOW_USERDATA, "ip_address": "abcd"}
)
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], data)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"ip_address": "address"}
if connection_type == "nibegw":
assert result2["errors"] == {"ip_address": "address"}
else:
assert result2["errors"] == {"modbus_url": "address"}
async def test_model_missing_coil(hass: HomeAssistant, mock_connection: Mock) -> None:
@pytest.mark.parametrize(
"connection_type,data",
(
("nibegw", MOCK_FLOW_NIBEGW_USERDATA),
("modbus", MOCK_FLOW_MODBUS_USERDATA),
),
)
async def test_model_missing_coil(
hass: HomeAssistant, mock_connection: Mock, connection_type: str, data: dict
) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await _get_connection_form(hass, connection_type)
mock_connection.return_value.read_coil.side_effect = CoilNotFoundException()
mock_connection.verify_connectivity.side_effect = CoilNotFoundException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {**MOCK_FLOW_USERDATA}
)
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], data)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "model"}