diff --git a/tests/components/mysensors/conftest.py b/tests/components/mysensors/conftest.py index e8dc59cf526..94b60ed139c 100644 --- a/tests/components/mysensors/conftest.py +++ b/tests/components/mysensors/conftest.py @@ -278,6 +278,36 @@ def distance_sensor( return node +@pytest.fixture(name="ir_transceiver_state", scope="session") +def ir_transceiver_state_fixture() -> dict: + """Load the ir transceiver state.""" + return load_nodes_state("mysensors/ir_transceiver_state.json") + + +@pytest.fixture +def ir_transceiver( + gateway_nodes: dict[int, Sensor], ir_transceiver_state: dict +) -> Sensor: + """Load the ir transceiver child node.""" + nodes = update_gateway_nodes(gateway_nodes, deepcopy(ir_transceiver_state)) + node = nodes[1] + return node + + +@pytest.fixture(name="relay_node_state", scope="session") +def relay_node_state_fixture() -> dict: + """Load the relay node state.""" + return load_nodes_state("mysensors/relay_node_state.json") + + +@pytest.fixture +def relay_node(gateway_nodes: dict[int, Sensor], relay_node_state: dict) -> Sensor: + """Load the relay child node.""" + nodes = update_gateway_nodes(gateway_nodes, deepcopy(relay_node_state)) + node = nodes[1] + return node + + @pytest.fixture(name="temperature_sensor_state", scope="session") def temperature_sensor_state_fixture() -> dict: """Load the temperature sensor state.""" diff --git a/tests/components/mysensors/fixtures/ir_transceiver_state.json b/tests/components/mysensors/fixtures/ir_transceiver_state.json new file mode 100644 index 00000000000..34e16e96787 --- /dev/null +++ b/tests/components/mysensors/fixtures/ir_transceiver_state.json @@ -0,0 +1,23 @@ +{ + "1": { + "sensor_id": 1, + "children": { + "1": { + "id": 1, + "type": 20, + "description": "", + "values": { + "2": "0", + "32": "test_code", + "33": "test_code" + } + } + }, + "type": 17, + "sketch_name": "IR Transceiver", + "sketch_version": "1.0", + "battery_level": 0, + "protocol_version": "2.3.2", + "heartbeat": 0 + } +} diff --git a/tests/components/mysensors/fixtures/relay_node_state.json b/tests/components/mysensors/fixtures/relay_node_state.json new file mode 100644 index 00000000000..00a71f85ad6 --- /dev/null +++ b/tests/components/mysensors/fixtures/relay_node_state.json @@ -0,0 +1,21 @@ +{ + "1": { + "sensor_id": 1, + "children": { + "1": { + "id": 1, + "type": 3, + "description": "", + "values": { + "2": "0" + } + } + }, + "type": 17, + "sketch_name": "Relay Node", + "sketch_version": "1.0", + "battery_level": 0, + "protocol_version": "2.3.2", + "heartbeat": 0 + } +} diff --git a/tests/components/mysensors/test_switch.py b/tests/components/mysensors/test_switch.py new file mode 100644 index 00000000000..a4ff3995c08 --- /dev/null +++ b/tests/components/mysensors/test_switch.py @@ -0,0 +1,157 @@ +"""Provide tests for mysensors switch platform.""" +from __future__ import annotations + +from collections.abc import Callable +from unittest.mock import MagicMock, call + +from mysensors.sensor import Sensor + +from homeassistant.components.mysensors.const import DOMAIN +from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN +from homeassistant.core import HomeAssistant + + +async def test_relay_node( + hass: HomeAssistant, + relay_node: Sensor, + receive_message: Callable[[str], None], + transport_write: MagicMock, +) -> None: + """Test a relay node.""" + entity_id = "switch.relay_node_1_1" + + state = hass.states.get(entity_id) + + assert state + assert state.state == "off" + + await hass.services.async_call( + SWITCH_DOMAIN, + "turn_on", + {"entity_id": entity_id}, + blocking=True, + ) + + assert transport_write.call_count == 1 + assert transport_write.call_args == call("1;1;1;1;2;1\n") + + receive_message("1;1;1;0;2;1\n") + # the integration adds multiple jobs to do the update currently + await hass.async_block_till_done() + await hass.async_block_till_done() + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + + assert state + assert state.state == "on" + + transport_write.reset_mock() + + await hass.services.async_call( + SWITCH_DOMAIN, + "turn_off", + {"entity_id": entity_id}, + blocking=True, + ) + + assert transport_write.call_count == 1 + assert transport_write.call_args == call("1;1;1;1;2;0\n") + + receive_message("1;1;1;0;2;0\n") + # the integration adds multiple jobs to do the update currently + await hass.async_block_till_done() + await hass.async_block_till_done() + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + + assert state + assert state.state == "off" + + +async def test_ir_transceiver( + hass: HomeAssistant, + ir_transceiver: Sensor, + receive_message: Callable[[str], None], + transport_write: MagicMock, +) -> None: + """Test an ir transceiver.""" + entity_id = "switch.ir_transceiver_1_1" + + state = hass.states.get(entity_id) + + assert state + assert state.state == "off" + + await hass.services.async_call( + SWITCH_DOMAIN, + "turn_on", + {"entity_id": entity_id}, + blocking=True, + ) + + assert transport_write.call_count == 2 + assert transport_write.call_args_list[0] == call("1;1;1;0;32;test_code\n") + assert transport_write.call_args_list[1] == call("1;1;1;1;2;1\n") + + receive_message("1;1;1;0;2;1\n") + # the integration adds multiple jobs to do the update currently + await hass.async_block_till_done() + await hass.async_block_till_done() + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + + assert state + assert state.state == "on" + assert state.attributes["V_IR_SEND"] == "test_code" + + transport_write.reset_mock() + + await hass.services.async_call( + SWITCH_DOMAIN, + "turn_off", + {"entity_id": entity_id}, + blocking=True, + ) + + assert transport_write.call_count == 1 + assert transport_write.call_args == call("1;1;1;1;2;0\n") + + receive_message("1;1;1;0;2;0\n") + # the integration adds multiple jobs to do the update currently + await hass.async_block_till_done() + await hass.async_block_till_done() + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + + assert state + assert state.state == "off" + + transport_write.reset_mock() + + await hass.services.async_call( + DOMAIN, + "send_ir_code", + {"entity_id": entity_id, "V_IR_SEND": "new_code"}, + blocking=True, + ) + + assert transport_write.call_count == 2 + assert transport_write.call_args_list[0] == call("1;1;1;0;32;new_code\n") + assert transport_write.call_args_list[1] == call("1;1;1;1;2;1\n") + + receive_message("1;1;1;0;32;new_code\n") + receive_message("1;1;1;0;2;1\n") + # the integration adds multiple jobs to do the update currently + await hass.async_block_till_done() + await hass.async_block_till_done() + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + + assert state + assert state.state == "on" + assert state.attributes["V_IR_SEND"] == "new_code"