Secure 100% test coverage for modbus, binary_sensor and sensor (#49521)

* Secure 100% test coverage for modbus/binary_sensor.

* Test that class constructor is called.
This commit is contained in:
jan iversen 2021-04-22 11:54:40 +02:00 committed by GitHub
parent 8b08134850
commit f67c0ce8bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 290 additions and 53 deletions

View File

@ -616,10 +616,7 @@ omit =
homeassistant/components/mochad/*
homeassistant/components/modbus/climate.py
homeassistant/components/modbus/cover.py
homeassistant/components/modbus/modbus.py
homeassistant/components/modbus/switch.py
homeassistant/components/modbus/sensor.py
homeassistant/components/modbus/binary_sensor.py
homeassistant/components/modem_callerid/sensor.py
homeassistant/components/motion_blinds/__init__.py
homeassistant/components/motion_blinds/const.py

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from datetime import timedelta
import logging
import struct
from typing import Any
import voluptuous as vol
@ -31,6 +30,7 @@ from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import number
from .const import (
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
@ -58,25 +58,6 @@ from .modbus import ModbusHub
_LOGGER = logging.getLogger(__name__)
def number(value: Any) -> int | float:
"""Coerce a value to number without losing precision."""
if isinstance(value, int):
return value
if isinstance(value, str):
try:
value = int(value)
return value
except (TypeError, ValueError):
pass
try:
value = float(value)
return value
except (TypeError, ValueError) as err:
raise vol.Invalid(f"invalid number {value}") from err
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_REGISTERS): [

View File

@ -3,6 +3,7 @@ from datetime import timedelta
import logging
from unittest import mock
from pymodbus.exceptions import ModbusException
import pytest
from homeassistant.components.modbus.const import DEFAULT_HUB, MODBUS_DOMAIN as DOMAIN
@ -69,11 +70,23 @@ async def base_test(
):
# Setup inputs for the sensor
read_result = ReadResult(register_words)
mock_sync.read_coils.return_value = read_result
mock_sync.read_discrete_inputs.return_value = read_result
mock_sync.read_input_registers.return_value = read_result
mock_sync.read_holding_registers.return_value = read_result
if register_words is None:
mock_sync.read_coils.side_effect = ModbusException("fail read_coils")
mock_sync.read_discrete_inputs.side_effect = ModbusException(
"fail read_coils"
)
mock_sync.read_input_registers.side_effect = ModbusException(
"fail read_coils"
)
mock_sync.read_holding_registers.side_effect = ModbusException(
"fail read_coils"
)
else:
read_result = ReadResult(register_words)
mock_sync.read_coils.return_value = read_result
mock_sync.read_discrete_inputs.return_value = read_result
mock_sync.read_input_registers.return_value = read_result
mock_sync.read_holding_registers.return_value = read_result
# mock timer and add old/new config
now = dt_util.utcnow()
@ -104,7 +117,7 @@ async def base_test(
assert await async_setup_component(hass, entity_domain, config_device)
await hass.async_block_till_done()
assert DOMAIN in hass.data
assert DOMAIN in hass.config.components
if config_device is not None:
entity_id = f"{entity_domain}.{device_name}"
device = hass.states.get(entity_id)

View File

@ -2,16 +2,24 @@
import logging
from unittest import mock
from pymodbus.exceptions import ModbusException
import pytest
import voluptuous as vol
from homeassistant.components.modbus import number
from homeassistant.components.modbus.const import (
ATTR_ADDRESS,
ATTR_HUB,
ATTR_STATE,
ATTR_UNIT,
ATTR_VALUE,
CONF_BAUDRATE,
CONF_BYTESIZE,
CONF_PARITY,
CONF_STOPBITS,
MODBUS_DOMAIN as DOMAIN,
SERVICE_WRITE_COIL,
SERVICE_WRITE_REGISTER,
)
from homeassistant.const import (
CONF_DELAY,
@ -177,3 +185,150 @@ async def test_config_multiple_modbus(hass, caplog):
await _config_helper(hass, do_config)
assert DOMAIN in hass.config.components
assert len(caplog.records) == 0
async def test_pb_service_write_register(hass):
"""Run test for service write_register."""
conf_name = "myModbus"
config = {
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
CONF_NAME: conf_name,
}
]
}
mock_pb = mock.MagicMock()
with mock.patch(
"homeassistant.components.modbus.modbus.ModbusTcpClient", return_value=mock_pb
):
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
data = {ATTR_HUB: conf_name, ATTR_UNIT: 17, ATTR_ADDRESS: 16, ATTR_VALUE: 15}
await hass.services.async_call(
DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True
)
assert mock_pb.write_register.called
assert mock_pb.write_register.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_VALUE],
)
mock_pb.write_register.side_effect = ModbusException("fail write_")
await hass.services.async_call(
DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True
)
data[ATTR_VALUE] = [1, 2, 3]
await hass.services.async_call(
DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True
)
assert mock_pb.write_registers.called
assert mock_pb.write_registers.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_VALUE],
)
mock_pb.write_registers.side_effect = ModbusException("fail write_")
await hass.services.async_call(
DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True
)
async def test_pb_service_write_coil(hass, caplog):
"""Run test for service write_coil."""
conf_name = "myModbus"
config = {
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
CONF_NAME: conf_name,
}
]
}
mock_pb = mock.MagicMock()
with mock.patch(
"homeassistant.components.modbus.modbus.ModbusTcpClient", return_value=mock_pb
):
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
data = {ATTR_HUB: conf_name, ATTR_UNIT: 17, ATTR_ADDRESS: 16, ATTR_STATE: False}
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_pb.write_coil.called
assert mock_pb.write_coil.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_STATE],
)
mock_pb.write_coil.side_effect = ModbusException("fail write_")
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
data[ATTR_STATE] = [True, False, True]
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_pb.write_coils.called
assert mock_pb.write_coils.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_STATE],
)
caplog.set_level(logging.DEBUG)
caplog.clear
mock_pb.write_coils.side_effect = ModbusException("fail write_")
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert caplog.records[-1].levelname == "ERROR"
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert caplog.records[-1].levelname == "DEBUG"
async def test_pymodbus_constructor_fail(hass, caplog):
"""Run test for failing pymodbus constructor."""
config = {
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
}
]
}
with mock.patch(
"homeassistant.components.modbus.modbus.ModbusTcpClient"
) as mock_pb:
caplog.set_level(logging.ERROR)
mock_pb.side_effect = ModbusException("test no class")
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
assert len(caplog.records) == 1
assert caplog.records[0].levelname == "ERROR"
assert mock_pb.called
async def test_pymodbus_connect_fail(hass, caplog):
"""Run test for failing pymodbus constructor."""
config = {
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
}
]
}
mock_pb = mock.MagicMock()
with mock.patch(
"homeassistant.components.modbus.modbus.ModbusTcpClient", return_value=mock_pb
):
caplog.set_level(logging.ERROR)
mock_pb.connect.side_effect = ModbusException("test connect fail")
mock_pb.close.side_effect = ModbusException("test connect fail")
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
assert len(caplog.records) == 1
assert caplog.records[0].levelname == "ERROR"

View File

@ -16,6 +16,7 @@ from homeassistant.const import (
CONF_SLAVE,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
)
from .conftest import base_config_test, base_test
@ -76,6 +77,10 @@ async def test_config_binary_sensor(hass, do_discovery, do_options):
[0xFE],
STATE_OFF,
),
(
None,
STATE_UNAVAILABLE,
),
],
)
async def test_all_binary_sensor(hass, do_type, regs, expected):

View File

@ -28,6 +28,7 @@ from homeassistant.const import (
CONF_SENSORS,
CONF_SLAVE,
CONF_STRUCTURE,
STATE_UNAVAILABLE,
)
from .conftest import base_config_test, base_test
@ -128,6 +129,50 @@ async def test_config_sensor(hass, do_discovery, do_config):
)
@pytest.mark.parametrize(
"do_config",
[
{
CONF_ADDRESS: 1234,
CONF_COUNT: 8,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DATA_TYPE_INT,
},
{
CONF_ADDRESS: 1234,
CONF_COUNT: 8,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_STRUCTURE: ">no struct",
},
{
CONF_ADDRESS: 1234,
CONF_COUNT: 2,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_STRUCTURE: ">4f",
},
],
)
async def test_config_wrong_struct_sensor(hass, do_config):
"""Run test for sensor with wrong struct."""
sensor_name = "test_sensor"
config_sensor = {
CONF_NAME: sensor_name,
**do_config,
}
await base_config_test(
hass,
config_sensor,
sensor_name,
SENSOR_DOMAIN,
CONF_SENSORS,
None,
method_discovery=True,
)
@pytest.mark.parametrize(
"cfg,regs,expected",
[
@ -336,6 +381,30 @@ async def test_config_sensor(hass, do_discovery, do_config):
[0x3037, 0x2D30, 0x352D, 0x3230, 0x3230, 0x2031, 0x343A, 0x3335],
"07-05-2020 14:35",
),
(
{
CONF_COUNT: 8,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_DATA_TYPE: DATA_TYPE_STRING,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
},
None,
STATE_UNAVAILABLE,
),
(
{
CONF_COUNT: 2,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
},
None,
STATE_UNAVAILABLE,
),
],
)
async def test_all_sensor(hass, cfg, regs, expected):
@ -357,39 +426,56 @@ async def test_all_sensor(hass, cfg, regs, expected):
assert state == expected
async def test_struct_sensor(hass):
@pytest.mark.parametrize(
"cfg,regs,expected",
[
(
{
CONF_COUNT: 8,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_STRUCTURE: ">4f",
},
# floats: 7.931250095367432, 10.600000381469727,
# 1.000879611487865e-28, 10.566553115844727
[0x40FD, 0xCCCD, 0x4129, 0x999A, 0x10FD, 0xC0CD, 0x4129, 0x109A],
"7.93,10.60,0.00,10.57",
),
(
{
CONF_COUNT: 4,
CONF_PRECISION: 0,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_STRUCTURE: ">2i",
},
[0x0000, 0x0100, 0x0000, 0x0032],
"256,50",
),
(
{
CONF_COUNT: 1,
CONF_PRECISION: 0,
CONF_DATA_TYPE: DATA_TYPE_INT,
},
[0x0101],
"257",
),
],
)
async def test_struct_sensor(hass, cfg, regs, expected):
"""Run test for sensor struct."""
sensor_name = "modbus_test_sensor"
# floats: 7.931250095367432, 10.600000381469727,
# 1.000879611487865e-28, 10.566553115844727
expected = "7.93,10.60,0.00,10.57"
state = await base_test(
hass,
{
CONF_NAME: sensor_name,
CONF_REGISTER: 1234,
CONF_COUNT: 8,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
CONF_STRUCTURE: ">4f",
},
{CONF_NAME: sensor_name, CONF_ADDRESS: 1234, **cfg},
sensor_name,
SENSOR_DOMAIN,
CONF_SENSORS,
CONF_REGISTERS,
[
0x40FD,
0xCCCD,
0x4129,
0x999A,
0x10FD,
0xC0CD,
0x4129,
0x109A,
],
None,
regs,
expected,
method_discovery=False,
method_discovery=True,
scan_interval=5,
)
assert state == expected