Refactor ModbusRegisterSensor class to get hub and configuration (#50234)

* refactor ModbusRegisterSensor to match the ModbusSwitch interface

* Please pylint, mypy etc.

* Remove PLATFORM.

Co-authored-by: jan Iversen <jancasacondor@gmail.com>
This commit is contained in:
Yuriy Sannikov 2021-05-26 20:28:14 +03:00 committed by GitHub
parent 09b9218511
commit 6391d75919
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 257 additions and 293 deletions

View File

@ -104,6 +104,7 @@ from .const import (
PLATFORMS, PLATFORMS,
) )
from .modbus import async_modbus_setup from .modbus import async_modbus_setup
from .validators import sensor_schema_validator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -347,7 +348,9 @@ MODBUS_SCHEMA = vol.Schema(
vol.Optional(CONF_CLIMATES): vol.All(cv.ensure_list, [CLIMATE_SCHEMA]), vol.Optional(CONF_CLIMATES): vol.All(cv.ensure_list, [CLIMATE_SCHEMA]),
vol.Optional(CONF_COVERS): vol.All(cv.ensure_list, [COVERS_SCHEMA]), vol.Optional(CONF_COVERS): vol.All(cv.ensure_list, [COVERS_SCHEMA]),
vol.Optional(CONF_LIGHTS): vol.All(cv.ensure_list, [LIGHT_SCHEMA]), vol.Optional(CONF_LIGHTS): vol.All(cv.ensure_list, [LIGHT_SCHEMA]),
vol.Optional(CONF_SENSORS): vol.All(cv.ensure_list, [SENSOR_SCHEMA]), vol.Optional(CONF_SENSORS): vol.All(
cv.ensure_list, [vol.All(SENSOR_SCHEMA, sensor_schema_validator)]
),
vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [SWITCH_SCHEMA]), vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [SWITCH_SCHEMA]),
vol.Optional(CONF_FANS): vol.All(cv.ensure_list, [FAN_SCHEMA]), vol.Optional(CONF_FANS): vol.All(cv.ensure_list, [FAN_SCHEMA]),
} }

View File

@ -3,99 +3,39 @@ from __future__ import annotations
import logging import logging
import struct import struct
from typing import Any
import voluptuous as vol from homeassistant.components.sensor import SensorEntity
from homeassistant.components.sensor import (
DEVICE_CLASSES_SCHEMA,
PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.const import ( from homeassistant.const import (
CONF_ADDRESS,
CONF_COUNT, CONF_COUNT,
CONF_DEVICE_CLASS,
CONF_NAME, CONF_NAME,
CONF_OFFSET, CONF_OFFSET,
CONF_SCAN_INTERVAL,
CONF_SENSORS, CONF_SENSORS,
CONF_SLAVE,
CONF_STRUCTURE, CONF_STRUCTURE,
CONF_UNIT_OF_MEASUREMENT, CONF_UNIT_OF_MEASUREMENT,
) )
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import number
from .base_platform import BasePlatform from .base_platform import BasePlatform
from .const import ( from .const import (
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CONF_DATA_TYPE, CONF_DATA_TYPE,
CONF_HUB,
CONF_INPUT_TYPE,
CONF_PRECISION, CONF_PRECISION,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS,
CONF_REVERSE_ORDER,
CONF_SCALE, CONF_SCALE,
CONF_SWAP, CONF_SWAP,
CONF_SWAP_BYTE, CONF_SWAP_BYTE,
CONF_SWAP_NONE,
CONF_SWAP_WORD, CONF_SWAP_WORD,
CONF_SWAP_WORD_BYTE, CONF_SWAP_WORD_BYTE,
DATA_TYPE_CUSTOM,
DATA_TYPE_FLOAT,
DATA_TYPE_INT,
DATA_TYPE_STRING, DATA_TYPE_STRING,
DATA_TYPE_UINT,
DEFAULT_HUB,
DEFAULT_SCAN_INTERVAL,
DEFAULT_STRUCT_FORMAT,
MODBUS_DOMAIN, MODBUS_DOMAIN,
) )
from .modbus import ModbusHub
PARALLEL_UPDATES = 1 PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_REGISTERS): [
{
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_REGISTER): cv.positive_int,
vol.Optional(CONF_COUNT, default=1): cv.positive_int,
vol.Optional(CONF_DATA_TYPE, default=DATA_TYPE_INT): vol.In(
[
DATA_TYPE_INT,
DATA_TYPE_UINT,
DATA_TYPE_FLOAT,
DATA_TYPE_STRING,
DATA_TYPE_CUSTOM,
]
),
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_HUB, default=DEFAULT_HUB): cv.string,
vol.Optional(CONF_OFFSET, default=0): number,
vol.Optional(CONF_PRECISION, default=0): cv.positive_int,
vol.Optional(
CONF_REGISTER_TYPE, default=CALL_TYPE_REGISTER_HOLDING
): vol.In([CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT]),
vol.Optional(CONF_REVERSE_ORDER, default=False): cv.boolean,
vol.Optional(CONF_SCALE, default=1): number,
vol.Optional(CONF_SLAVE): cv.positive_int,
vol.Optional(CONF_STRUCTURE): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
}
]
}
)
async def async_setup_platform( async def async_setup_platform(
hass: HomeAssistant, hass: HomeAssistant,
config: ConfigType, config: ConfigType,
@ -105,92 +45,15 @@ async def async_setup_platform(
"""Set up the Modbus sensors.""" """Set up the Modbus sensors."""
sensors = [] sensors = []
#  check for old config: if discovery_info is None: # pragma: no cover
if discovery_info is None: return
_LOGGER.warning(
"Sensor configuration is deprecated, will be removed in a future release"
)
discovery_info = {
CONF_NAME: "no name",
CONF_SENSORS: config[CONF_REGISTERS],
}
for entry in discovery_info[CONF_SENSORS]:
entry[CONF_ADDRESS] = entry[CONF_REGISTER]
entry[CONF_INPUT_TYPE] = entry[CONF_REGISTER_TYPE]
del entry[CONF_REGISTER]
del entry[CONF_REGISTER_TYPE]
for entry in discovery_info[CONF_SENSORS]: for entry in discovery_info[CONF_SENSORS]:
if entry[CONF_DATA_TYPE] == DATA_TYPE_STRING: hub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
structure = str(entry[CONF_COUNT] * 2) + "s" sensors.append(ModbusRegisterSensor(hub, entry))
elif entry[CONF_DATA_TYPE] != DATA_TYPE_CUSTOM:
try:
structure = f">{DEFAULT_STRUCT_FORMAT[entry[CONF_DATA_TYPE]][entry[CONF_COUNT]]}"
except KeyError:
_LOGGER.error(
"Unable to detect data type for %s sensor, try a custom type",
entry[CONF_NAME],
)
continue
else:
structure = entry.get(CONF_STRUCTURE)
try: if len(sensors) > 0:
size = struct.calcsize(structure) async_add_entities(sensors)
except struct.error as err:
_LOGGER.error("Error in sensor %s structure: %s", entry[CONF_NAME], err)
continue
bytecount = entry[CONF_COUNT] * 2
if bytecount != size:
_LOGGER.error(
"Structure request %d bytes, but %d registers have a size of %d bytes",
size,
entry[CONF_COUNT],
bytecount,
)
continue
if CONF_REVERSE_ORDER in entry:
if entry[CONF_REVERSE_ORDER]:
entry[CONF_SWAP] = CONF_SWAP_WORD
else:
entry[CONF_SWAP] = CONF_SWAP_NONE
del entry[CONF_REVERSE_ORDER]
if entry.get(CONF_SWAP) != CONF_SWAP_NONE:
if entry[CONF_SWAP] == CONF_SWAP_BYTE:
regs_needed = 1
else: # CONF_SWAP_WORD_BYTE, CONF_SWAP_WORD
regs_needed = 2
if (
entry[CONF_COUNT] < regs_needed
or (entry[CONF_COUNT] % regs_needed) != 0
):
_LOGGER.error(
"Error in sensor %s swap(%s) not possible due to count: %d",
entry[CONF_NAME],
entry[CONF_SWAP],
entry[CONF_COUNT],
)
continue
if CONF_HUB in entry:
# from old config!
hub = hass.data[MODBUS_DOMAIN][entry[CONF_HUB]]
else:
hub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
if CONF_SCAN_INTERVAL not in entry:
entry[CONF_SCAN_INTERVAL] = DEFAULT_SCAN_INTERVAL
sensors.append(
ModbusRegisterSensor(
hub,
entry,
structure,
)
)
if not sensors:
return
async_add_entities(sensors)
class ModbusRegisterSensor(BasePlatform, RestoreEntity, SensorEntity): class ModbusRegisterSensor(BasePlatform, RestoreEntity, SensorEntity):
@ -198,21 +61,18 @@ class ModbusRegisterSensor(BasePlatform, RestoreEntity, SensorEntity):
def __init__( def __init__(
self, self,
hub, hub: ModbusHub,
entry, entry: dict[str, Any],
structure, ) -> None:
):
"""Initialize the modbus register sensor.""" """Initialize the modbus register sensor."""
super().__init__(hub, entry) super().__init__(hub, entry)
self._register = self._address
self._register_type = self._input_type
self._unit_of_measurement = entry.get(CONF_UNIT_OF_MEASUREMENT) self._unit_of_measurement = entry.get(CONF_UNIT_OF_MEASUREMENT)
self._count = int(entry[CONF_COUNT]) self._count = int(entry[CONF_COUNT])
self._swap = entry[CONF_SWAP] self._swap = entry[CONF_SWAP]
self._scale = entry[CONF_SCALE] self._scale = entry[CONF_SCALE]
self._offset = entry[CONF_OFFSET] self._offset = entry[CONF_OFFSET]
self._precision = entry[CONF_PRECISION] self._precision = entry[CONF_PRECISION]
self._structure = structure self._structure = entry.get(CONF_STRUCTURE)
self._data_type = entry[CONF_DATA_TYPE] self._data_type = entry[CONF_DATA_TYPE]
async def async_added_to_hass(self): async def async_added_to_hass(self):
@ -252,7 +112,7 @@ class ModbusRegisterSensor(BasePlatform, RestoreEntity, SensorEntity):
# remark "now" is a dummy parameter to avoid problems with # remark "now" is a dummy parameter to avoid problems with
# async_track_time_interval # async_track_time_interval
result = await self._hub.async_pymodbus_call( result = await self._hub.async_pymodbus_call(
self._slave, self._register, self._count, self._register_type self._slave, self._address, self._count, self._input_type
) )
if result is None: if result is None:
self._available = False self._available = False

View File

@ -0,0 +1,86 @@
"""Validate Modbus configuration."""
import logging
import struct
from voluptuous import Invalid
from homeassistant.const import CONF_COUNT, CONF_NAME, CONF_STRUCTURE
from .const import (
CONF_DATA_TYPE,
CONF_REVERSE_ORDER,
CONF_SWAP,
CONF_SWAP_BYTE,
CONF_SWAP_NONE,
CONF_SWAP_WORD,
DATA_TYPE_CUSTOM,
DATA_TYPE_STRING,
DEFAULT_STRUCT_FORMAT,
)
_LOGGER = logging.getLogger(__name__)
def sensor_schema_validator(config):
"""Sensor schema validator."""
if config[CONF_DATA_TYPE] == DATA_TYPE_STRING:
structure = str(config[CONF_COUNT] * 2) + "s"
elif config[CONF_DATA_TYPE] != DATA_TYPE_CUSTOM:
try:
structure = (
f">{DEFAULT_STRUCT_FORMAT[config[CONF_DATA_TYPE]][config[CONF_COUNT]]}"
)
except KeyError:
raise Invalid(
f"Unable to detect data type for {config[CONF_NAME]} sensor, try a custom type"
) from KeyError
else:
structure = config.get(CONF_STRUCTURE)
if not structure:
raise Invalid(
f"Error in sensor {config[CONF_NAME]}. The `{CONF_STRUCTURE}` field can not be empty "
f"if the parameter `{CONF_DATA_TYPE}` is set to the `{DATA_TYPE_CUSTOM}`"
)
try:
size = struct.calcsize(structure)
except struct.error as err:
raise Invalid(
f"Error in sensor {config[CONF_NAME]} structure: {str(err)}"
) from err
bytecount = config[CONF_COUNT] * 2
if bytecount != size:
raise Invalid(
f"Structure request {size} bytes, "
f"but {config[CONF_COUNT]} registers have a size of {bytecount} bytes"
)
swap_type = config.get(CONF_SWAP)
if CONF_REVERSE_ORDER in config:
if config[CONF_REVERSE_ORDER]:
swap_type = CONF_SWAP_WORD
else:
swap_type = CONF_SWAP_NONE
del config[CONF_REVERSE_ORDER]
if config.get(CONF_SWAP) != CONF_SWAP_NONE:
if swap_type == CONF_SWAP_BYTE:
regs_needed = 1
else: # CONF_SWAP_WORD_BYTE, CONF_SWAP_WORD
regs_needed = 2
if config[CONF_COUNT] < regs_needed or (config[CONF_COUNT] % regs_needed) != 0:
raise Invalid(
f"Error in sensor {config[CONF_NAME]} swap({swap_type}) "
f"not possible due to the registers "
f"count: {config[CONF_COUNT]}, needed: {regs_needed}"
)
return {
**config,
CONF_STRUCTURE: structure,
CONF_SWAP: swap_type,
}

View File

@ -80,6 +80,7 @@ async def base_test(
config_modbus=None, config_modbus=None,
scan_interval=None, scan_interval=None,
expect_init_to_fail=False, expect_init_to_fail=False,
expect_setup_to_fail=False,
): ):
"""Run test on device for given config.""" """Run test on device for given config."""
@ -131,7 +132,10 @@ async def base_test(
{array_name_discovery: [{**config_device}]} {array_name_discovery: [{**config_device}]}
) )
config_device = None config_device = None
assert await async_setup_component(hass, DOMAIN, config_modbus) assert (
await async_setup_component(hass, DOMAIN, config_modbus)
is not expect_setup_to_fail
)
await hass.async_block_till_done() await hass.async_block_till_done()
# setup platform old style # setup platform old style
@ -151,7 +155,7 @@ async def base_test(
assert await async_setup_component(hass, entity_domain, config_device) assert await async_setup_component(hass, entity_domain, config_device)
await hass.async_block_till_done() await hass.async_block_till_done()
assert DOMAIN in hass.config.components assert (DOMAIN in hass.config.components) is not expect_setup_to_fail
if config_device is not None: if config_device is not None:
entity_id = f"{entity_domain}.{device_name}" entity_id = f"{entity_domain}.{device_name}"
device = hass.states.get(entity_id) device = hass.states.get(entity_id)
@ -184,6 +188,7 @@ async def base_config_test(
method_discovery=False, method_discovery=False,
config_modbus=None, config_modbus=None,
expect_init_to_fail=False, expect_init_to_fail=False,
expect_setup_to_fail=False,
): ):
"""Check config of device for given config.""" """Check config of device for given config."""
@ -200,6 +205,7 @@ async def base_config_test(
check_config_only=True, check_config_only=True,
config_modbus=config_modbus, config_modbus=config_modbus,
expect_init_to_fail=expect_init_to_fail, expect_init_to_fail=expect_init_to_fail,
expect_setup_to_fail=expect_setup_to_fail,
) )

View File

@ -9,8 +9,6 @@ from homeassistant.components.modbus.const import (
CONF_DATA_TYPE, CONF_DATA_TYPE,
CONF_INPUT_TYPE, CONF_INPUT_TYPE,
CONF_PRECISION, CONF_PRECISION,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS, CONF_REGISTERS,
CONF_REVERSE_ORDER, CONF_REVERSE_ORDER,
CONF_SCALE, CONF_SCALE,
@ -45,115 +43,58 @@ from tests.common import mock_restore_cache
@pytest.mark.parametrize( @pytest.mark.parametrize(
"do_discovery, do_config", "do_config",
[ [
( {
False, CONF_ADDRESS: 51,
{ },
CONF_REGISTER: 51, {
}, CONF_ADDRESS: 51,
), CONF_SLAVE: 10,
( CONF_COUNT: 1,
False, CONF_DATA_TYPE: "int",
{ CONF_PRECISION: 0,
CONF_REGISTER: 51, CONF_SCALE: 1,
CONF_SLAVE: 10, CONF_REVERSE_ORDER: False,
CONF_COUNT: 1, CONF_OFFSET: 0,
CONF_DATA_TYPE: "int", CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_PRECISION: 0, CONF_DEVICE_CLASS: "battery",
CONF_SCALE: 1, },
CONF_REVERSE_ORDER: False, {
CONF_OFFSET: 0, CONF_ADDRESS: 51,
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING, CONF_SLAVE: 10,
CONF_DEVICE_CLASS: "battery", CONF_COUNT: 1,
}, CONF_DATA_TYPE: "int",
), CONF_PRECISION: 0,
( CONF_SCALE: 1,
False, CONF_REVERSE_ORDER: False,
{ CONF_OFFSET: 0,
CONF_REGISTER: 51, CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT,
CONF_SLAVE: 10, CONF_DEVICE_CLASS: "battery",
CONF_COUNT: 1, },
CONF_DATA_TYPE: "int", {
CONF_PRECISION: 0, CONF_ADDRESS: 51,
CONF_SCALE: 1, CONF_COUNT: 1,
CONF_REVERSE_ORDER: False, CONF_SWAP: CONF_SWAP_NONE,
CONF_OFFSET: 0, },
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_INPUT, {
CONF_DEVICE_CLASS: "battery", CONF_ADDRESS: 51,
}, CONF_COUNT: 1,
), CONF_SWAP: CONF_SWAP_BYTE,
( },
True, {
{ CONF_ADDRESS: 51,
CONF_ADDRESS: 51, CONF_COUNT: 2,
}, CONF_SWAP: CONF_SWAP_WORD,
), },
( {
True, CONF_ADDRESS: 51,
{ CONF_COUNT: 2,
CONF_ADDRESS: 51, CONF_SWAP: CONF_SWAP_WORD_BYTE,
CONF_SLAVE: 10, },
CONF_COUNT: 1,
CONF_DATA_TYPE: "int",
CONF_PRECISION: 0,
CONF_SCALE: 1,
CONF_REVERSE_ORDER: False,
CONF_OFFSET: 0,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_DEVICE_CLASS: "battery",
},
),
(
True,
{
CONF_ADDRESS: 51,
CONF_SLAVE: 10,
CONF_COUNT: 1,
CONF_DATA_TYPE: "int",
CONF_PRECISION: 0,
CONF_SCALE: 1,
CONF_REVERSE_ORDER: False,
CONF_OFFSET: 0,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT,
CONF_DEVICE_CLASS: "battery",
},
),
(
True,
{
CONF_ADDRESS: 51,
CONF_COUNT: 1,
CONF_SWAP: CONF_SWAP_NONE,
},
),
(
True,
{
CONF_ADDRESS: 51,
CONF_COUNT: 1,
CONF_SWAP: CONF_SWAP_BYTE,
},
),
(
True,
{
CONF_ADDRESS: 51,
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_WORD,
},
),
(
True,
{
CONF_ADDRESS: 51,
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_WORD_BYTE,
},
),
], ],
) )
async def test_config_sensor(hass, do_discovery, do_config): async def test_config_sensor(hass, do_config):
"""Run test for sensor.""" """Run test for sensor."""
sensor_name = "test_sensor" sensor_name = "test_sensor"
config_sensor = { config_sensor = {
@ -167,36 +108,87 @@ async def test_config_sensor(hass, do_discovery, do_config):
SENSOR_DOMAIN, SENSOR_DOMAIN,
CONF_SENSORS, CONF_SENSORS,
CONF_REGISTERS, CONF_REGISTERS,
method_discovery=do_discovery, method_discovery=True,
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
"do_config", "do_config,error_message",
[ [
{ (
CONF_ADDRESS: 1234, {
CONF_COUNT: 8, CONF_ADDRESS: 1234,
CONF_PRECISION: 2, CONF_COUNT: 8,
CONF_DATA_TYPE: DATA_TYPE_INT, CONF_PRECISION: 2,
}, CONF_DATA_TYPE: DATA_TYPE_INT,
{ },
CONF_ADDRESS: 1234, "Unable to detect data type for test_sensor sensor, try a custom type",
CONF_COUNT: 8, ),
CONF_PRECISION: 2, (
CONF_DATA_TYPE: DATA_TYPE_CUSTOM, {
CONF_STRUCTURE: ">no struct", CONF_ADDRESS: 1234,
}, CONF_COUNT: 8,
{ CONF_PRECISION: 2,
CONF_ADDRESS: 1234, CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_COUNT: 2, CONF_STRUCTURE: ">no struct",
CONF_PRECISION: 2, },
CONF_DATA_TYPE: DATA_TYPE_CUSTOM, "Error in sensor test_sensor structure: bad char in struct format",
CONF_STRUCTURE: ">4f", ),
}, (
{
CONF_ADDRESS: 1234,
CONF_COUNT: 2,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_STRUCTURE: ">4f",
},
"Structure request 16 bytes, but 2 registers have a size of 4 bytes",
),
(
{
CONF_ADDRESS: 1234,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_COUNT: 4,
CONF_SWAP: CONF_SWAP_NONE,
CONF_STRUCTURE: "invalid",
},
"Error in sensor test_sensor structure: bad char in struct format",
),
(
{
CONF_ADDRESS: 1234,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_COUNT: 4,
CONF_SWAP: CONF_SWAP_NONE,
CONF_STRUCTURE: "",
},
"Error in sensor test_sensor. The `structure` field can not be empty if the parameter `data_type` is set to the `custom`",
),
(
{
CONF_ADDRESS: 1234,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_COUNT: 4,
CONF_SWAP: CONF_SWAP_NONE,
CONF_STRUCTURE: "1s",
},
"Structure request 1 bytes, but 4 registers have a size of 8 bytes",
),
(
{
CONF_ADDRESS: 1234,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_COUNT: 1,
CONF_STRUCTURE: "2s",
CONF_SWAP: CONF_SWAP_WORD,
},
"Error in sensor test_sensor swap(word) not possible due to the registers count: 1, needed: 2",
),
], ],
) )
async def test_config_wrong_struct_sensor(hass, do_config): async def test_config_wrong_struct_sensor(
hass, caplog, do_config, error_message, mock_pymodbus
):
"""Run test for sensor with wrong struct.""" """Run test for sensor with wrong struct."""
sensor_name = "test_sensor" sensor_name = "test_sensor"
@ -204,6 +196,9 @@ async def test_config_wrong_struct_sensor(hass, do_config):
CONF_NAME: sensor_name, CONF_NAME: sensor_name,
**do_config, **do_config,
} }
caplog.set_level(logging.WARNING)
caplog.clear()
await base_config_test( await base_config_test(
hass, hass,
config_sensor, config_sensor,
@ -212,8 +207,11 @@ async def test_config_wrong_struct_sensor(hass, do_config):
CONF_SENSORS, CONF_SENSORS,
None, None,
method_discovery=True, method_discovery=True,
expect_setup_to_fail=True,
) )
assert error_message in "".join(caplog.messages)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"cfg,regs,expected", "cfg,regs,expected",
@ -592,10 +590,21 @@ async def test_restore_state_sensor(hass):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"swap_type", "swap_type, error_message",
[CONF_SWAP_WORD, CONF_SWAP_WORD_BYTE], [
(
CONF_SWAP_WORD,
"Error in sensor modbus_test_sensor swap(word) not possible due to the registers count: 1, needed: 2",
),
(
CONF_SWAP_WORD_BYTE,
"Error in sensor modbus_test_sensor swap(word_byte) not possible due to the registers count: 1, needed: 2",
),
],
) )
async def test_swap_sensor_wrong_config(hass, caplog, swap_type): async def test_swap_sensor_wrong_config(
hass, caplog, swap_type, error_message, mock_pymodbus
):
"""Run test for sensor swap.""" """Run test for sensor swap."""
sensor_name = "modbus_test_sensor" sensor_name = "modbus_test_sensor"
config = { config = {
@ -616,9 +625,9 @@ async def test_swap_sensor_wrong_config(hass, caplog, swap_type):
CONF_SENSORS, CONF_SENSORS,
None, None,
method_discovery=True, method_discovery=True,
expect_init_to_fail=True, expect_setup_to_fail=True,
) )
assert caplog.messages[-1].startswith("Error in sensor " + sensor_name + " swap") assert error_message in "".join(caplog.messages)
async def test_service_sensor_update(hass, mock_pymodbus): async def test_service_sensor_update(hass, mock_pymodbus):