mirror of
https://github.com/home-assistant/core.git
synced 2025-07-26 06:37:52 +00:00
Please pylint for modbus test (#58089)
This commit is contained in:
parent
45983b5edf
commit
b3117ced75
@ -32,8 +32,8 @@ class ReadResult:
|
|||||||
self.bits = register_words
|
self.bits = register_words
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_pymodbus")
|
||||||
def mock_pymodbus():
|
def mock_pymodbus_fixture():
|
||||||
"""Mock pymodbus."""
|
"""Mock pymodbus."""
|
||||||
mock_pb = mock.MagicMock()
|
mock_pb = mock.MagicMock()
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
@ -52,32 +52,32 @@ def mock_pymodbus():
|
|||||||
yield mock_pb
|
yield mock_pb
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="check_config_loaded")
|
||||||
def check_config_loaded():
|
def check_config_loaded_fixture():
|
||||||
"""Set default for check_config_loaded."""
|
"""Set default for check_config_loaded."""
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="register_words")
|
||||||
def register_words():
|
def register_words_fixture():
|
||||||
"""Set default for register_words."""
|
"""Set default for register_words."""
|
||||||
return [0x00, 0x00]
|
return [0x00, 0x00]
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="config_addon")
|
||||||
def config_addon():
|
def config_addon_fixture():
|
||||||
"""Add entra configuration items."""
|
"""Add entra configuration items."""
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="do_exception")
|
||||||
def do_exception():
|
def do_exception_fixture():
|
||||||
"""Remove side_effect to pymodbus calls."""
|
"""Remove side_effect to pymodbus calls."""
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_modbus")
|
||||||
async def mock_modbus(
|
async def mock_modbus_fixture(
|
||||||
hass, caplog, register_words, check_config_loaded, config_addon, do_config
|
hass, caplog, register_words, check_config_loaded, config_addon, do_config
|
||||||
):
|
):
|
||||||
"""Load integration modbus using mocked pymodbus."""
|
"""Load integration modbus using mocked pymodbus."""
|
||||||
@ -115,8 +115,8 @@ async def mock_modbus(
|
|||||||
yield mock_pb
|
yield mock_pb
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_pymodbus_exception")
|
||||||
async def mock_pymodbus_exception(hass, do_exception, mock_modbus):
|
async def mock_pymodbus_exception_fixture(hass, do_exception, mock_modbus):
|
||||||
"""Trigger update call with time_changed event."""
|
"""Trigger update call with time_changed event."""
|
||||||
if do_exception:
|
if do_exception:
|
||||||
exc = ModbusException("fail read_coils")
|
exc = ModbusException("fail read_coils")
|
||||||
@ -126,8 +126,8 @@ async def mock_pymodbus_exception(hass, do_exception, mock_modbus):
|
|||||||
mock_modbus.read_holding_registers.side_effect = exc
|
mock_modbus.read_holding_registers.side_effect = exc
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_pymodbus_return")
|
||||||
async def mock_pymodbus_return(hass, register_words, mock_modbus):
|
async def mock_pymodbus_return_fixture(hass, register_words, mock_modbus):
|
||||||
"""Trigger update call with time_changed event."""
|
"""Trigger update call with time_changed event."""
|
||||||
read_result = ReadResult(register_words)
|
read_result = ReadResult(register_words)
|
||||||
mock_modbus.read_coils.return_value = read_result
|
mock_modbus.read_coils.return_value = read_result
|
||||||
@ -136,8 +136,8 @@ async def mock_pymodbus_return(hass, register_words, mock_modbus):
|
|||||||
mock_modbus.read_holding_registers.return_value = read_result
|
mock_modbus.read_holding_registers.return_value = read_result
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_do_cycle")
|
||||||
async def mock_do_cycle(hass, mock_pymodbus_exception, mock_pymodbus_return):
|
async def mock_do_cycle_fixture(hass, mock_pymodbus_exception, mock_pymodbus_return):
|
||||||
"""Trigger update call with time_changed event."""
|
"""Trigger update call with time_changed event."""
|
||||||
now = dt_util.utcnow() + timedelta(seconds=90)
|
now = dt_util.utcnow() + timedelta(seconds=90)
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
@ -159,21 +159,21 @@ async def do_next_cycle(hass, now, cycle):
|
|||||||
return now
|
return now
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_test_state")
|
||||||
async def mock_test_state(hass, request):
|
async def mock_test_state_fixture(hass, request):
|
||||||
"""Mock restore cache."""
|
"""Mock restore cache."""
|
||||||
mock_restore_cache(hass, request.param)
|
mock_restore_cache(hass, request.param)
|
||||||
return request.param
|
return request.param
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_ha")
|
||||||
async def mock_ha(hass, mock_pymodbus_return):
|
async def mock_ha_fixture(hass, mock_pymodbus_return):
|
||||||
"""Load homeassistant to allow service calls."""
|
"""Load homeassistant to allow service calls."""
|
||||||
assert await async_setup_component(hass, "homeassistant", {})
|
assert await async_setup_component(hass, "homeassistant", {})
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="caplog_setup_text")
|
||||||
async def caplog_setup_text(caplog):
|
async def caplog_setup_text_fixture(caplog):
|
||||||
"""Return setup log of integration."""
|
"""Return setup log of integration."""
|
||||||
yield caplog.text
|
yield caplog.text
|
||||||
|
@ -33,6 +33,7 @@ from homeassistant.core import State
|
|||||||
from .conftest import TEST_ENTITY_NAME, ReadResult, do_next_cycle
|
from .conftest import TEST_ENTITY_NAME, ReadResult, do_next_cycle
|
||||||
|
|
||||||
ENTITY_ID = f"{COVER_DOMAIN}.{TEST_ENTITY_NAME}"
|
ENTITY_ID = f"{COVER_DOMAIN}.{TEST_ENTITY_NAME}"
|
||||||
|
ENTITY_ID2 = f"{ENTITY_ID}2"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
@ -281,7 +282,6 @@ async def test_restore_state_cover(hass, mock_test_state, mock_modbus):
|
|||||||
async def test_service_cover_move(hass, mock_modbus, mock_ha):
|
async def test_service_cover_move(hass, mock_modbus, mock_ha):
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
|
|
||||||
ENTITY_ID2 = f"{ENTITY_ID}2"
|
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult([0x01])
|
mock_modbus.read_holding_registers.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"cover", "open_cover", {"entity_id": ENTITY_ID}, blocking=True
|
"cover", "open_cover", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
|
@ -38,6 +38,7 @@ from homeassistant.setup import async_setup_component
|
|||||||
from .conftest import TEST_ENTITY_NAME, TEST_MODBUS_HOST, TEST_PORT_TCP, ReadResult
|
from .conftest import TEST_ENTITY_NAME, TEST_MODBUS_HOST, TEST_PORT_TCP, ReadResult
|
||||||
|
|
||||||
ENTITY_ID = f"{FAN_DOMAIN}.{TEST_ENTITY_NAME}"
|
ENTITY_ID = f"{FAN_DOMAIN}.{TEST_ENTITY_NAME}"
|
||||||
|
ENTITY_ID2 = f"{ENTITY_ID}2"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
@ -223,7 +224,6 @@ async def test_restore_state_fan(hass, mock_test_state, mock_modbus):
|
|||||||
async def test_fan_service_turn(hass, caplog, mock_pymodbus):
|
async def test_fan_service_turn(hass, caplog, mock_pymodbus):
|
||||||
"""Run test for service turn_on/turn_off."""
|
"""Run test for service turn_on/turn_off."""
|
||||||
|
|
||||||
ENTITY_ID2 = f"{FAN_DOMAIN}.{TEST_ENTITY_NAME}2"
|
|
||||||
config = {
|
config = {
|
||||||
MODBUS_DOMAIN: {
|
MODBUS_DOMAIN: {
|
||||||
CONF_TYPE: TCP,
|
CONF_TYPE: TCP,
|
||||||
|
@ -99,8 +99,8 @@ from .conftest import (
|
|||||||
from tests.common import async_fire_time_changed
|
from tests.common import async_fire_time_changed
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_modbus_with_pymodbus")
|
||||||
async def mock_modbus_with_pymodbus(hass, caplog, do_config, mock_pymodbus):
|
async def mock_modbus_with_pymodbus_fixture(hass, caplog, do_config, mock_pymodbus):
|
||||||
"""Load integration modbus using mocked pymodbus."""
|
"""Load integration modbus using mocked pymodbus."""
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
caplog.set_level(logging.ERROR)
|
caplog.set_level(logging.ERROR)
|
||||||
@ -115,7 +115,7 @@ async def mock_modbus_with_pymodbus(hass, caplog, do_config, mock_pymodbus):
|
|||||||
async def test_number_validator():
|
async def test_number_validator():
|
||||||
"""Test number validator."""
|
"""Test number validator."""
|
||||||
|
|
||||||
for value, value_type in [
|
for value, value_type in (
|
||||||
(15, int),
|
(15, int),
|
||||||
(15.1, float),
|
(15.1, float),
|
||||||
("15", int),
|
("15", int),
|
||||||
@ -124,7 +124,7 @@ async def test_number_validator():
|
|||||||
(-15.1, float),
|
(-15.1, float),
|
||||||
("-15", int),
|
("-15", int),
|
||||||
("-15.1", float),
|
("-15.1", float),
|
||||||
]:
|
):
|
||||||
assert isinstance(number_validator(value), value_type)
|
assert isinstance(number_validator(value), value_type)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -510,8 +510,8 @@ async def test_pb_service_write(
|
|||||||
assert caplog.messages[-1].startswith("Pymodbus:")
|
assert caplog.messages[-1].startswith("Pymodbus:")
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(name="mock_modbus_read_pymodbus")
|
||||||
async def mock_modbus_read_pymodbus(
|
async def mock_modbus_read_pymodbus_fixture(
|
||||||
hass,
|
hass,
|
||||||
do_group,
|
do_group,
|
||||||
do_type,
|
do_type,
|
||||||
@ -664,8 +664,8 @@ async def test_pymodbus_connect_fail(hass, caplog):
|
|||||||
"homeassistant.components.modbus.modbus.ModbusTcpClient", autospec=True
|
"homeassistant.components.modbus.modbus.ModbusTcpClient", autospec=True
|
||||||
) as mock_pb:
|
) as mock_pb:
|
||||||
caplog.set_level(logging.ERROR)
|
caplog.set_level(logging.ERROR)
|
||||||
ExceptionMessage = "test connect exception"
|
exception_message = "test connect exception"
|
||||||
mock_pb.connect.side_effect = ModbusException(ExceptionMessage)
|
mock_pb.connect.side_effect = ModbusException(exception_message)
|
||||||
|
|
||||||
assert await async_setup_component(hass, DOMAIN, config) is True
|
assert await async_setup_component(hass, DOMAIN, config) is True
|
||||||
|
|
||||||
@ -675,8 +675,8 @@ async def test_delay(hass, mock_pymodbus):
|
|||||||
|
|
||||||
# the purpose of this test is to test startup delay
|
# the purpose of this test is to test startup delay
|
||||||
# We "hijiack" a binary_sensor to make a proper blackbox test.
|
# We "hijiack" a binary_sensor to make a proper blackbox test.
|
||||||
test_delay = 15
|
set_delay = 15
|
||||||
test_scan_interval = 5
|
set_scan_interval = 5
|
||||||
entity_id = f"{BINARY_SENSOR_DOMAIN}.{TEST_ENTITY_NAME}"
|
entity_id = f"{BINARY_SENSOR_DOMAIN}.{TEST_ENTITY_NAME}"
|
||||||
config = {
|
config = {
|
||||||
DOMAIN: [
|
DOMAIN: [
|
||||||
@ -685,13 +685,13 @@ async def test_delay(hass, mock_pymodbus):
|
|||||||
CONF_HOST: TEST_MODBUS_HOST,
|
CONF_HOST: TEST_MODBUS_HOST,
|
||||||
CONF_PORT: TEST_PORT_TCP,
|
CONF_PORT: TEST_PORT_TCP,
|
||||||
CONF_NAME: TEST_MODBUS_NAME,
|
CONF_NAME: TEST_MODBUS_NAME,
|
||||||
CONF_DELAY: test_delay,
|
CONF_DELAY: set_delay,
|
||||||
CONF_BINARY_SENSORS: [
|
CONF_BINARY_SENSORS: [
|
||||||
{
|
{
|
||||||
CONF_INPUT_TYPE: CALL_TYPE_COIL,
|
CONF_INPUT_TYPE: CALL_TYPE_COIL,
|
||||||
CONF_NAME: TEST_ENTITY_NAME,
|
CONF_NAME: TEST_ENTITY_NAME,
|
||||||
CONF_ADDRESS: 52,
|
CONF_ADDRESS: 52,
|
||||||
CONF_SCAN_INTERVAL: test_scan_interval,
|
CONF_SCAN_INTERVAL: set_scan_interval,
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
@ -705,7 +705,7 @@ async def test_delay(hass, mock_pymodbus):
|
|||||||
|
|
||||||
# pass first scan_interval
|
# pass first scan_interval
|
||||||
start_time = now
|
start_time = now
|
||||||
now = now + timedelta(seconds=(test_scan_interval + 1))
|
now = now + timedelta(seconds=(set_scan_interval + 1))
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"homeassistant.helpers.event.dt_util.utcnow", return_value=now, autospec=True
|
"homeassistant.helpers.event.dt_util.utcnow", return_value=now, autospec=True
|
||||||
):
|
):
|
||||||
@ -713,7 +713,7 @@ async def test_delay(hass, mock_pymodbus):
|
|||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
||||||
|
|
||||||
stop_time = start_time + timedelta(seconds=(test_delay + 1))
|
stop_time = start_time + timedelta(seconds=(set_delay + 1))
|
||||||
step_timedelta = timedelta(seconds=1)
|
step_timedelta = timedelta(seconds=1)
|
||||||
while now < stop_time:
|
while now < stop_time:
|
||||||
now = now + step_timedelta
|
now = now + step_timedelta
|
||||||
|
@ -38,6 +38,7 @@ from homeassistant.setup import async_setup_component
|
|||||||
from .conftest import TEST_ENTITY_NAME, TEST_MODBUS_HOST, TEST_PORT_TCP, ReadResult
|
from .conftest import TEST_ENTITY_NAME, TEST_MODBUS_HOST, TEST_PORT_TCP, ReadResult
|
||||||
|
|
||||||
ENTITY_ID = f"{LIGHT_DOMAIN}.{TEST_ENTITY_NAME}"
|
ENTITY_ID = f"{LIGHT_DOMAIN}.{TEST_ENTITY_NAME}"
|
||||||
|
ENTITY_ID2 = f"{ENTITY_ID}2"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
@ -223,7 +224,6 @@ async def test_restore_state_light(hass, mock_test_state, mock_modbus):
|
|||||||
async def test_light_service_turn(hass, caplog, mock_pymodbus):
|
async def test_light_service_turn(hass, caplog, mock_pymodbus):
|
||||||
"""Run test for service turn_on/turn_off."""
|
"""Run test for service turn_on/turn_off."""
|
||||||
|
|
||||||
ENTITY_ID2 = f"{ENTITY_ID}2"
|
|
||||||
config = {
|
config = {
|
||||||
MODBUS_DOMAIN: {
|
MODBUS_DOMAIN: {
|
||||||
CONF_TYPE: TCP,
|
CONF_TYPE: TCP,
|
||||||
|
@ -52,6 +52,7 @@ from .conftest import (
|
|||||||
from tests.common import async_fire_time_changed
|
from tests.common import async_fire_time_changed
|
||||||
|
|
||||||
ENTITY_ID = f"{SWITCH_DOMAIN}.{TEST_ENTITY_NAME}"
|
ENTITY_ID = f"{SWITCH_DOMAIN}.{TEST_ENTITY_NAME}"
|
||||||
|
ENTITY_ID2 = f"{ENTITY_ID}2"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
@ -282,7 +283,6 @@ async def test_restore_state_switch(hass, mock_test_state, mock_modbus):
|
|||||||
async def test_switch_service_turn(hass, caplog, mock_pymodbus):
|
async def test_switch_service_turn(hass, caplog, mock_pymodbus):
|
||||||
"""Run test for service turn_on/turn_off."""
|
"""Run test for service turn_on/turn_off."""
|
||||||
|
|
||||||
ENTITY_ID2 = f"{SWITCH_DOMAIN}.{TEST_ENTITY_NAME}2"
|
|
||||||
config = {
|
config = {
|
||||||
MODBUS_DOMAIN: {
|
MODBUS_DOMAIN: {
|
||||||
CONF_TYPE: TCP,
|
CONF_TYPE: TCP,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user