diff --git a/tests/components/knx/README.md b/tests/components/knx/README.md new file mode 100644 index 00000000000..4b5886200c4 --- /dev/null +++ b/tests/components/knx/README.md @@ -0,0 +1,71 @@ +# Testing the KNX integration + +A KNXTestKit instance can be requested from a fixture. It provides convenience methods +to test outgoing KNX telegrams and inject incoming telegrams. +To test something add a test function requesting the `hass` and `knx` fixture and +set up the KNX integration by passing a KNX config dict to `knx.setup_integration`. + +```python +async def test_something(hass, knx): + await knx.setup_integration({ + "switch": { + "name": "test_switch", + "address": "1/2/3", + } + } + ) +``` + +## Asserting outgoing telegrams + +All outgoing telegrams are pushed to an assertion queue. Assert them in order they were sent. + +- `knx.assert_no_telegram` + Asserts that no telegram was sent (assertion queue is empty). +- `knx.assert_telegram_count(count: int)` + Asserts that `count` telegrams were sent. +- `knx.assert_read(group_address: str)` + Asserts that a GroupValueRead telegram was sent to `group_address`. + The telegram will be removed from the assertion queue. +- `knx.assert_response(group_address: str, payload: int | tuple[int, ...])` + Asserts that a GroupValueResponse telegram with `payload` was sent to `group_address`. + The telegram will be removed from the assertion queue. +- `knx.assert_write(group_address: str, payload: int | tuple[int, ...])` + Asserts that a GroupValueWrite telegram with `payload` was sent to `group_address`. + The telegram will be removed from the assertion queue. + +Change some states or call some services and assert outgoing telegrams. + +```python + # turn on switch + await hass.services.async_call( + "switch", "turn_on", {"entity_id": "switch.test_switch"}, blocking=True + ) + # assert ON telegram + await knx.assert_write("1/2/3", True) +``` + +## Injecting incoming telegrams + +- `knx.receive_read(group_address: str)` + Inject and process a GroupValueRead telegram addressed to `group_address`. +- `knx.receive_response(group_address: str, payload: int | tuple[int, ...])` + Inject and process a GroupValueResponse telegram addressed to `group_address` containing `payload`. +- `knx.receive_write(group_address: str, payload: int | tuple[int, ...])` + Inject and process a GroupValueWrite telegram addressed to `group_address` containing `payload`. + +Receive some telegrams and assert state. + +```python + # receive OFF telegram + await knx.receive_write("1/2/3", False) + # assert OFF state + state = hass.states.get("switch.test_switch") + assert state.state is STATE_OFF +``` + +## Notes + +- For `payload` in `assert_*` and `receive_*` use `int` for DPT 1, 2 and 3 payload values (DPTBinary) and `tuple` for other DPTs (DPTArray). +- `await self.hass.async_block_till_done()` is called before `KNXTestKit.assert_*` and after `KNXTestKit.receive_*` so you don't have to explicitly call it. +- Make sure to assert every outgoing telegram that was created in a test. `assert_no_telegram` is automatically called on teardown. diff --git a/tests/components/knx/conftest.py b/tests/components/knx/conftest.py index 548e620813a..26f8f1eabac 100644 --- a/tests/components/knx/conftest.py +++ b/tests/components/knx/conftest.py @@ -41,6 +41,8 @@ class KNXTestKit: def fish_xknx(*args, **kwargs): """Get the XKNX object from the constructor call.""" self.xknx = args[0] + # disable rate limiter for tests (before StateUpdater starts) + self.xknx.rate_limit = 0 return DEFAULT with patch( @@ -50,8 +52,6 @@ class KNXTestKit: ): await async_setup_component(self.hass, KNX_DOMAIN, {KNX_DOMAIN: config}) await self.hass.async_block_till_done() - # disable rate limiter for tests - self.xknx.rate_limit = 0 ######################## # Telegram counter tests @@ -101,14 +101,14 @@ class KNXTestKit: f" {group_address} - {payload}" ) - assert ( - str(telegram.destination_address) == group_address - ), f"Group address mismatch in {telegram} - Expected: {group_address}" - assert isinstance( telegram.payload, apci_type ), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}" + assert ( + str(telegram.destination_address) == group_address + ), f"Group address mismatch in {telegram} - Expected: {group_address}" + if payload is not None: assert ( telegram.payload.value.value == payload # type: ignore @@ -134,6 +134,13 @@ class KNXTestKit: # Incoming telegrams #################### + @staticmethod + def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary: + """Prepare payload value for GroupValueWrite or GroupValueResponse.""" + if isinstance(payload, int): + return DPTBinary(payload) + return DPTArray(payload) + async def _receive_telegram(self, group_address: str, payload: APCI) -> None: """Inject incoming KNX telegram.""" self.xknx.telegrams.put_nowait( @@ -146,13 +153,6 @@ class KNXTestKit: ) await self.hass.async_block_till_done() - @staticmethod - def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary: - """Prepare payload value for GroupValueWrite or GroupValueResponse.""" - if isinstance(payload, int): - return DPTBinary(payload) - return DPTArray(payload) - async def receive_read( self, group_address: str, diff --git a/tests/components/knx/test_switch.py b/tests/components/knx/test_switch.py new file mode 100644 index 00000000000..407d6d83267 --- /dev/null +++ b/tests/components/knx/test_switch.py @@ -0,0 +1,150 @@ +"""Test KNX switch.""" +from unittest.mock import patch + +from homeassistant.components.knx.const import ( + CONF_RESPOND_TO_READ, + CONF_STATE_ADDRESS, + KNX_ADDRESS, +) +from homeassistant.components.knx.schema import SwitchSchema +from homeassistant.const import CONF_NAME, STATE_OFF, STATE_ON +from homeassistant.core import State + + +async def test_switch_simple(hass, knx): + """Test simple KNX switch.""" + await knx.setup_integration( + { + SwitchSchema.PLATFORM_NAME: { + CONF_NAME: "test", + KNX_ADDRESS: "1/2/3", + } + } + ) + assert len(hass.states.async_all()) == 1 + + # turn on switch + await hass.services.async_call( + "switch", "turn_on", {"entity_id": "switch.test"}, blocking=True + ) + await knx.assert_write("1/2/3", True) + + # turn off switch + await hass.services.async_call( + "switch", "turn_off", {"entity_id": "switch.test"}, blocking=True + ) + await knx.assert_write("1/2/3", False) + + # receive ON telegram + await knx.receive_write("1/2/3", True) + state = hass.states.get("switch.test") + assert state.state is STATE_ON + + # receive OFF telegram + await knx.receive_write("1/2/3", False) + state = hass.states.get("switch.test") + assert state.state is STATE_OFF + + # switch does not respond to read by default + await knx.receive_read("1/2/3") + await knx.assert_telegram_count(0) + + +async def test_switch_state(hass, knx): + """Test KNX switch with state_address.""" + _ADDRESS = "1/1/1" + _STATE_ADDRESS = "2/2/2" + + await knx.setup_integration( + { + SwitchSchema.PLATFORM_NAME: { + CONF_NAME: "test", + KNX_ADDRESS: _ADDRESS, + CONF_STATE_ADDRESS: _STATE_ADDRESS, + }, + } + ) + assert len(hass.states.async_all()) == 1 + + # StateUpdater initialize state + await knx.assert_read(_STATE_ADDRESS) + await knx.receive_response(_STATE_ADDRESS, True) + state = hass.states.get("switch.test") + assert state.state is STATE_ON + + # receive OFF telegram at `address` + await knx.receive_write(_ADDRESS, False) + state = hass.states.get("switch.test") + assert state.state is STATE_OFF + + # receive ON telegram at `address` + await knx.receive_write(_ADDRESS, True) + state = hass.states.get("switch.test") + assert state.state is STATE_ON + + # receive OFF telegram at `state_address` + await knx.receive_write(_STATE_ADDRESS, False) + state = hass.states.get("switch.test") + assert state.state is STATE_OFF + + # receive ON telegram at `state_address` + await knx.receive_write(_STATE_ADDRESS, True) + state = hass.states.get("switch.test") + assert state.state is STATE_ON + + # turn off switch + await hass.services.async_call( + "switch", "turn_off", {"entity_id": "switch.test"}, blocking=True + ) + await knx.assert_write(_ADDRESS, False) + + # turn on switch + await hass.services.async_call( + "switch", "turn_on", {"entity_id": "switch.test"}, blocking=True + ) + await knx.assert_write(_ADDRESS, True) + + # switch does not respond to read by default + await knx.receive_read(_ADDRESS) + await knx.assert_telegram_count(0) + + +async def test_switch_restore_and_respond(hass, knx): + """Test restoring KNX switch state and respond to read.""" + _ADDRESS = "1/1/1" + fake_state = State("switch.test", "on") + + with patch( + "homeassistant.helpers.restore_state.RestoreEntity.async_get_last_state", + return_value=fake_state, + ): + await knx.setup_integration( + { + SwitchSchema.PLATFORM_NAME: { + CONF_NAME: "test", + KNX_ADDRESS: _ADDRESS, + CONF_RESPOND_TO_READ: True, + }, + } + ) + + # restored state - doesn't send telegram + state = hass.states.get("switch.test") + assert state.state == STATE_ON + await knx.assert_telegram_count(0) + + # respond to restored state + await knx.receive_read(_ADDRESS) + await knx.assert_response(_ADDRESS, True) + + # turn off switch + await hass.services.async_call( + "switch", "turn_off", {"entity_id": "switch.test"}, blocking=True + ) + await knx.assert_write(_ADDRESS, False) + state = hass.states.get("switch.test") + assert state.state == STATE_OFF + + # respond to new state + await knx.receive_read(_ADDRESS) + await knx.assert_response(_ADDRESS, False)