From f67c0ce8bb006e29f955b82c20a414e0dbe3f35b Mon Sep 17 00:00:00 2001 From: jan iversen Date: Thu, 22 Apr 2021 11:54:40 +0200 Subject: [PATCH] Secure 100% test coverage for modbus, binary_sensor and sensor (#49521) * Secure 100% test coverage for modbus/binary_sensor. * Test that class constructor is called. --- .coveragerc | 3 - homeassistant/components/modbus/sensor.py | 21 +-- tests/components/modbus/conftest.py | 25 ++- tests/components/modbus/test_init.py | 155 ++++++++++++++++++ .../modbus/test_modbus_binary_sensor.py | 5 + tests/components/modbus/test_modbus_sensor.py | 134 ++++++++++++--- 6 files changed, 290 insertions(+), 53 deletions(-) diff --git a/.coveragerc b/.coveragerc index 86b129f636c..26d49395164 100644 --- a/.coveragerc +++ b/.coveragerc @@ -616,10 +616,7 @@ omit = homeassistant/components/mochad/* homeassistant/components/modbus/climate.py homeassistant/components/modbus/cover.py - homeassistant/components/modbus/modbus.py homeassistant/components/modbus/switch.py - homeassistant/components/modbus/sensor.py - homeassistant/components/modbus/binary_sensor.py homeassistant/components/modem_callerid/sensor.py homeassistant/components/motion_blinds/__init__.py homeassistant/components/motion_blinds/const.py diff --git a/homeassistant/components/modbus/sensor.py b/homeassistant/components/modbus/sensor.py index 89c68947d3f..254bfe6e0fb 100644 --- a/homeassistant/components/modbus/sensor.py +++ b/homeassistant/components/modbus/sensor.py @@ -4,7 +4,6 @@ from __future__ import annotations from datetime import timedelta import logging import struct -from typing import Any import voluptuous as vol @@ -31,6 +30,7 @@ from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType +from . import number from .const import ( CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT, @@ -58,25 +58,6 @@ from .modbus import ModbusHub _LOGGER = logging.getLogger(__name__) -def number(value: Any) -> int | float: - """Coerce a value to number without losing precision.""" - if isinstance(value, int): - return value - - if isinstance(value, str): - try: - value = int(value) - return value - except (TypeError, ValueError): - pass - - try: - value = float(value) - return value - except (TypeError, ValueError) as err: - raise vol.Invalid(f"invalid number {value}") from err - - PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_REGISTERS): [ diff --git a/tests/components/modbus/conftest.py b/tests/components/modbus/conftest.py index 761f2c7e141..cbfddb4488b 100644 --- a/tests/components/modbus/conftest.py +++ b/tests/components/modbus/conftest.py @@ -3,6 +3,7 @@ from datetime import timedelta import logging from unittest import mock +from pymodbus.exceptions import ModbusException import pytest from homeassistant.components.modbus.const import DEFAULT_HUB, MODBUS_DOMAIN as DOMAIN @@ -69,11 +70,23 @@ async def base_test( ): # Setup inputs for the sensor - read_result = ReadResult(register_words) - mock_sync.read_coils.return_value = read_result - mock_sync.read_discrete_inputs.return_value = read_result - mock_sync.read_input_registers.return_value = read_result - mock_sync.read_holding_registers.return_value = read_result + if register_words is None: + mock_sync.read_coils.side_effect = ModbusException("fail read_coils") + mock_sync.read_discrete_inputs.side_effect = ModbusException( + "fail read_coils" + ) + mock_sync.read_input_registers.side_effect = ModbusException( + "fail read_coils" + ) + mock_sync.read_holding_registers.side_effect = ModbusException( + "fail read_coils" + ) + else: + read_result = ReadResult(register_words) + mock_sync.read_coils.return_value = read_result + mock_sync.read_discrete_inputs.return_value = read_result + mock_sync.read_input_registers.return_value = read_result + mock_sync.read_holding_registers.return_value = read_result # mock timer and add old/new config now = dt_util.utcnow() @@ -104,7 +117,7 @@ async def base_test( assert await async_setup_component(hass, entity_domain, config_device) await hass.async_block_till_done() - assert DOMAIN in hass.data + assert DOMAIN in hass.config.components if config_device is not None: entity_id = f"{entity_domain}.{device_name}" device = hass.states.get(entity_id) diff --git a/tests/components/modbus/test_init.py b/tests/components/modbus/test_init.py index 393a9ce86da..2da3a753505 100644 --- a/tests/components/modbus/test_init.py +++ b/tests/components/modbus/test_init.py @@ -2,16 +2,24 @@ import logging from unittest import mock +from pymodbus.exceptions import ModbusException import pytest import voluptuous as vol from homeassistant.components.modbus import number from homeassistant.components.modbus.const import ( + ATTR_ADDRESS, + ATTR_HUB, + ATTR_STATE, + ATTR_UNIT, + ATTR_VALUE, CONF_BAUDRATE, CONF_BYTESIZE, CONF_PARITY, CONF_STOPBITS, MODBUS_DOMAIN as DOMAIN, + SERVICE_WRITE_COIL, + SERVICE_WRITE_REGISTER, ) from homeassistant.const import ( CONF_DELAY, @@ -177,3 +185,150 @@ async def test_config_multiple_modbus(hass, caplog): await _config_helper(hass, do_config) assert DOMAIN in hass.config.components assert len(caplog.records) == 0 + + +async def test_pb_service_write_register(hass): + """Run test for service write_register.""" + + conf_name = "myModbus" + config = { + DOMAIN: [ + { + CONF_TYPE: "tcp", + CONF_HOST: "modbusTestHost", + CONF_PORT: 5501, + CONF_NAME: conf_name, + } + ] + } + + mock_pb = mock.MagicMock() + with mock.patch( + "homeassistant.components.modbus.modbus.ModbusTcpClient", return_value=mock_pb + ): + assert await async_setup_component(hass, DOMAIN, config) is True + await hass.async_block_till_done() + + data = {ATTR_HUB: conf_name, ATTR_UNIT: 17, ATTR_ADDRESS: 16, ATTR_VALUE: 15} + await hass.services.async_call( + DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True + ) + assert mock_pb.write_register.called + assert mock_pb.write_register.call_args[0] == ( + data[ATTR_ADDRESS], + data[ATTR_VALUE], + ) + mock_pb.write_register.side_effect = ModbusException("fail write_") + await hass.services.async_call( + DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True + ) + + data[ATTR_VALUE] = [1, 2, 3] + await hass.services.async_call( + DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True + ) + assert mock_pb.write_registers.called + assert mock_pb.write_registers.call_args[0] == ( + data[ATTR_ADDRESS], + data[ATTR_VALUE], + ) + mock_pb.write_registers.side_effect = ModbusException("fail write_") + await hass.services.async_call( + DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True + ) + + +async def test_pb_service_write_coil(hass, caplog): + """Run test for service write_coil.""" + + conf_name = "myModbus" + config = { + DOMAIN: [ + { + CONF_TYPE: "tcp", + CONF_HOST: "modbusTestHost", + CONF_PORT: 5501, + CONF_NAME: conf_name, + } + ] + } + + mock_pb = mock.MagicMock() + with mock.patch( + "homeassistant.components.modbus.modbus.ModbusTcpClient", return_value=mock_pb + ): + assert await async_setup_component(hass, DOMAIN, config) is True + await hass.async_block_till_done() + + data = {ATTR_HUB: conf_name, ATTR_UNIT: 17, ATTR_ADDRESS: 16, ATTR_STATE: False} + await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True) + assert mock_pb.write_coil.called + assert mock_pb.write_coil.call_args[0] == ( + data[ATTR_ADDRESS], + data[ATTR_STATE], + ) + mock_pb.write_coil.side_effect = ModbusException("fail write_") + await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True) + + data[ATTR_STATE] = [True, False, True] + await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True) + assert mock_pb.write_coils.called + assert mock_pb.write_coils.call_args[0] == ( + data[ATTR_ADDRESS], + data[ATTR_STATE], + ) + + caplog.set_level(logging.DEBUG) + caplog.clear + mock_pb.write_coils.side_effect = ModbusException("fail write_") + await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True) + assert caplog.records[-1].levelname == "ERROR" + await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True) + assert caplog.records[-1].levelname == "DEBUG" + + +async def test_pymodbus_constructor_fail(hass, caplog): + """Run test for failing pymodbus constructor.""" + config = { + DOMAIN: [ + { + CONF_TYPE: "tcp", + CONF_HOST: "modbusTestHost", + CONF_PORT: 5501, + } + ] + } + with mock.patch( + "homeassistant.components.modbus.modbus.ModbusTcpClient" + ) as mock_pb: + caplog.set_level(logging.ERROR) + mock_pb.side_effect = ModbusException("test no class") + assert await async_setup_component(hass, DOMAIN, config) is True + await hass.async_block_till_done() + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "ERROR" + assert mock_pb.called + + +async def test_pymodbus_connect_fail(hass, caplog): + """Run test for failing pymodbus constructor.""" + config = { + DOMAIN: [ + { + CONF_TYPE: "tcp", + CONF_HOST: "modbusTestHost", + CONF_PORT: 5501, + } + ] + } + mock_pb = mock.MagicMock() + with mock.patch( + "homeassistant.components.modbus.modbus.ModbusTcpClient", return_value=mock_pb + ): + caplog.set_level(logging.ERROR) + mock_pb.connect.side_effect = ModbusException("test connect fail") + mock_pb.close.side_effect = ModbusException("test connect fail") + assert await async_setup_component(hass, DOMAIN, config) is True + await hass.async_block_till_done() + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "ERROR" diff --git a/tests/components/modbus/test_modbus_binary_sensor.py b/tests/components/modbus/test_modbus_binary_sensor.py index 5c4e71cd669..4ce423b2f16 100644 --- a/tests/components/modbus/test_modbus_binary_sensor.py +++ b/tests/components/modbus/test_modbus_binary_sensor.py @@ -16,6 +16,7 @@ from homeassistant.const import ( CONF_SLAVE, STATE_OFF, STATE_ON, + STATE_UNAVAILABLE, ) from .conftest import base_config_test, base_test @@ -76,6 +77,10 @@ async def test_config_binary_sensor(hass, do_discovery, do_options): [0xFE], STATE_OFF, ), + ( + None, + STATE_UNAVAILABLE, + ), ], ) async def test_all_binary_sensor(hass, do_type, regs, expected): diff --git a/tests/components/modbus/test_modbus_sensor.py b/tests/components/modbus/test_modbus_sensor.py index b81cc9c4c1e..59bb81f8baa 100644 --- a/tests/components/modbus/test_modbus_sensor.py +++ b/tests/components/modbus/test_modbus_sensor.py @@ -28,6 +28,7 @@ from homeassistant.const import ( CONF_SENSORS, CONF_SLAVE, CONF_STRUCTURE, + STATE_UNAVAILABLE, ) from .conftest import base_config_test, base_test @@ -128,6 +129,50 @@ async def test_config_sensor(hass, do_discovery, do_config): ) +@pytest.mark.parametrize( + "do_config", + [ + { + CONF_ADDRESS: 1234, + CONF_COUNT: 8, + CONF_PRECISION: 2, + CONF_DATA_TYPE: DATA_TYPE_INT, + }, + { + CONF_ADDRESS: 1234, + CONF_COUNT: 8, + CONF_PRECISION: 2, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + CONF_STRUCTURE: ">no struct", + }, + { + CONF_ADDRESS: 1234, + CONF_COUNT: 2, + CONF_PRECISION: 2, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + CONF_STRUCTURE: ">4f", + }, + ], +) +async def test_config_wrong_struct_sensor(hass, do_config): + """Run test for sensor with wrong struct.""" + + sensor_name = "test_sensor" + config_sensor = { + CONF_NAME: sensor_name, + **do_config, + } + await base_config_test( + hass, + config_sensor, + sensor_name, + SENSOR_DOMAIN, + CONF_SENSORS, + None, + method_discovery=True, + ) + + @pytest.mark.parametrize( "cfg,regs,expected", [ @@ -336,6 +381,30 @@ async def test_config_sensor(hass, do_discovery, do_config): [0x3037, 0x2D30, 0x352D, 0x3230, 0x3230, 0x2031, 0x343A, 0x3335], "07-05-2020 14:35", ), + ( + { + CONF_COUNT: 8, + CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING, + CONF_DATA_TYPE: DATA_TYPE_STRING, + CONF_SCALE: 1, + CONF_OFFSET: 0, + CONF_PRECISION: 0, + }, + None, + STATE_UNAVAILABLE, + ), + ( + { + CONF_COUNT: 2, + CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT, + CONF_DATA_TYPE: DATA_TYPE_UINT, + CONF_SCALE: 1, + CONF_OFFSET: 0, + CONF_PRECISION: 0, + }, + None, + STATE_UNAVAILABLE, + ), ], ) async def test_all_sensor(hass, cfg, regs, expected): @@ -357,39 +426,56 @@ async def test_all_sensor(hass, cfg, regs, expected): assert state == expected -async def test_struct_sensor(hass): +@pytest.mark.parametrize( + "cfg,regs,expected", + [ + ( + { + CONF_COUNT: 8, + CONF_PRECISION: 2, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + CONF_STRUCTURE: ">4f", + }, + # floats: 7.931250095367432, 10.600000381469727, + # 1.000879611487865e-28, 10.566553115844727 + [0x40FD, 0xCCCD, 0x4129, 0x999A, 0x10FD, 0xC0CD, 0x4129, 0x109A], + "7.93,10.60,0.00,10.57", + ), + ( + { + CONF_COUNT: 4, + CONF_PRECISION: 0, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + CONF_STRUCTURE: ">2i", + }, + [0x0000, 0x0100, 0x0000, 0x0032], + "256,50", + ), + ( + { + CONF_COUNT: 1, + CONF_PRECISION: 0, + CONF_DATA_TYPE: DATA_TYPE_INT, + }, + [0x0101], + "257", + ), + ], +) +async def test_struct_sensor(hass, cfg, regs, expected): """Run test for sensor struct.""" sensor_name = "modbus_test_sensor" - # floats: 7.931250095367432, 10.600000381469727, - # 1.000879611487865e-28, 10.566553115844727 - expected = "7.93,10.60,0.00,10.57" state = await base_test( hass, - { - CONF_NAME: sensor_name, - CONF_REGISTER: 1234, - CONF_COUNT: 8, - CONF_PRECISION: 2, - CONF_DATA_TYPE: DATA_TYPE_CUSTOM, - CONF_STRUCTURE: ">4f", - }, + {CONF_NAME: sensor_name, CONF_ADDRESS: 1234, **cfg}, sensor_name, SENSOR_DOMAIN, CONF_SENSORS, - CONF_REGISTERS, - [ - 0x40FD, - 0xCCCD, - 0x4129, - 0x999A, - 0x10FD, - 0xC0CD, - 0x4129, - 0x109A, - ], + None, + regs, expected, - method_discovery=False, + method_discovery=True, scan_interval=5, ) assert state == expected