mirror of
https://github.com/home-assistant/core.git
synced 2025-04-24 01:08:12 +00:00
Test KNX switch (#53289)
This commit is contained in:
parent
86752516ee
commit
9f14b2cef5
71
tests/components/knx/README.md
Normal file
71
tests/components/knx/README.md
Normal file
@ -0,0 +1,71 @@
|
||||
# Testing the KNX integration
|
||||
|
||||
A KNXTestKit instance can be requested from a fixture. It provides convenience methods
|
||||
to test outgoing KNX telegrams and inject incoming telegrams.
|
||||
To test something add a test function requesting the `hass` and `knx` fixture and
|
||||
set up the KNX integration by passing a KNX config dict to `knx.setup_integration`.
|
||||
|
||||
```python
|
||||
async def test_something(hass, knx):
|
||||
await knx.setup_integration({
|
||||
"switch": {
|
||||
"name": "test_switch",
|
||||
"address": "1/2/3",
|
||||
}
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
## Asserting outgoing telegrams
|
||||
|
||||
All outgoing telegrams are pushed to an assertion queue. Assert them in order they were sent.
|
||||
|
||||
- `knx.assert_no_telegram`
|
||||
Asserts that no telegram was sent (assertion queue is empty).
|
||||
- `knx.assert_telegram_count(count: int)`
|
||||
Asserts that `count` telegrams were sent.
|
||||
- `knx.assert_read(group_address: str)`
|
||||
Asserts that a GroupValueRead telegram was sent to `group_address`.
|
||||
The telegram will be removed from the assertion queue.
|
||||
- `knx.assert_response(group_address: str, payload: int | tuple[int, ...])`
|
||||
Asserts that a GroupValueResponse telegram with `payload` was sent to `group_address`.
|
||||
The telegram will be removed from the assertion queue.
|
||||
- `knx.assert_write(group_address: str, payload: int | tuple[int, ...])`
|
||||
Asserts that a GroupValueWrite telegram with `payload` was sent to `group_address`.
|
||||
The telegram will be removed from the assertion queue.
|
||||
|
||||
Change some states or call some services and assert outgoing telegrams.
|
||||
|
||||
```python
|
||||
# turn on switch
|
||||
await hass.services.async_call(
|
||||
"switch", "turn_on", {"entity_id": "switch.test_switch"}, blocking=True
|
||||
)
|
||||
# assert ON telegram
|
||||
await knx.assert_write("1/2/3", True)
|
||||
```
|
||||
|
||||
## Injecting incoming telegrams
|
||||
|
||||
- `knx.receive_read(group_address: str)`
|
||||
Inject and process a GroupValueRead telegram addressed to `group_address`.
|
||||
- `knx.receive_response(group_address: str, payload: int | tuple[int, ...])`
|
||||
Inject and process a GroupValueResponse telegram addressed to `group_address` containing `payload`.
|
||||
- `knx.receive_write(group_address: str, payload: int | tuple[int, ...])`
|
||||
Inject and process a GroupValueWrite telegram addressed to `group_address` containing `payload`.
|
||||
|
||||
Receive some telegrams and assert state.
|
||||
|
||||
```python
|
||||
# receive OFF telegram
|
||||
await knx.receive_write("1/2/3", False)
|
||||
# assert OFF state
|
||||
state = hass.states.get("switch.test_switch")
|
||||
assert state.state is STATE_OFF
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
- For `payload` in `assert_*` and `receive_*` use `int` for DPT 1, 2 and 3 payload values (DPTBinary) and `tuple` for other DPTs (DPTArray).
|
||||
- `await self.hass.async_block_till_done()` is called before `KNXTestKit.assert_*` and after `KNXTestKit.receive_*` so you don't have to explicitly call it.
|
||||
- Make sure to assert every outgoing telegram that was created in a test. `assert_no_telegram` is automatically called on teardown.
|
@ -41,6 +41,8 @@ class KNXTestKit:
|
||||
def fish_xknx(*args, **kwargs):
|
||||
"""Get the XKNX object from the constructor call."""
|
||||
self.xknx = args[0]
|
||||
# disable rate limiter for tests (before StateUpdater starts)
|
||||
self.xknx.rate_limit = 0
|
||||
return DEFAULT
|
||||
|
||||
with patch(
|
||||
@ -50,8 +52,6 @@ class KNXTestKit:
|
||||
):
|
||||
await async_setup_component(self.hass, KNX_DOMAIN, {KNX_DOMAIN: config})
|
||||
await self.hass.async_block_till_done()
|
||||
# disable rate limiter for tests
|
||||
self.xknx.rate_limit = 0
|
||||
|
||||
########################
|
||||
# Telegram counter tests
|
||||
@ -101,14 +101,14 @@ class KNXTestKit:
|
||||
f" {group_address} - {payload}"
|
||||
)
|
||||
|
||||
assert (
|
||||
str(telegram.destination_address) == group_address
|
||||
), f"Group address mismatch in {telegram} - Expected: {group_address}"
|
||||
|
||||
assert isinstance(
|
||||
telegram.payload, apci_type
|
||||
), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}"
|
||||
|
||||
assert (
|
||||
str(telegram.destination_address) == group_address
|
||||
), f"Group address mismatch in {telegram} - Expected: {group_address}"
|
||||
|
||||
if payload is not None:
|
||||
assert (
|
||||
telegram.payload.value.value == payload # type: ignore
|
||||
@ -134,6 +134,13 @@ class KNXTestKit:
|
||||
# Incoming telegrams
|
||||
####################
|
||||
|
||||
@staticmethod
|
||||
def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary:
|
||||
"""Prepare payload value for GroupValueWrite or GroupValueResponse."""
|
||||
if isinstance(payload, int):
|
||||
return DPTBinary(payload)
|
||||
return DPTArray(payload)
|
||||
|
||||
async def _receive_telegram(self, group_address: str, payload: APCI) -> None:
|
||||
"""Inject incoming KNX telegram."""
|
||||
self.xknx.telegrams.put_nowait(
|
||||
@ -146,13 +153,6 @@ class KNXTestKit:
|
||||
)
|
||||
await self.hass.async_block_till_done()
|
||||
|
||||
@staticmethod
|
||||
def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary:
|
||||
"""Prepare payload value for GroupValueWrite or GroupValueResponse."""
|
||||
if isinstance(payload, int):
|
||||
return DPTBinary(payload)
|
||||
return DPTArray(payload)
|
||||
|
||||
async def receive_read(
|
||||
self,
|
||||
group_address: str,
|
||||
|
150
tests/components/knx/test_switch.py
Normal file
150
tests/components/knx/test_switch.py
Normal file
@ -0,0 +1,150 @@
|
||||
"""Test KNX switch."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components.knx.const import (
|
||||
CONF_RESPOND_TO_READ,
|
||||
CONF_STATE_ADDRESS,
|
||||
KNX_ADDRESS,
|
||||
)
|
||||
from homeassistant.components.knx.schema import SwitchSchema
|
||||
from homeassistant.const import CONF_NAME, STATE_OFF, STATE_ON
|
||||
from homeassistant.core import State
|
||||
|
||||
|
||||
async def test_switch_simple(hass, knx):
|
||||
"""Test simple KNX switch."""
|
||||
await knx.setup_integration(
|
||||
{
|
||||
SwitchSchema.PLATFORM_NAME: {
|
||||
CONF_NAME: "test",
|
||||
KNX_ADDRESS: "1/2/3",
|
||||
}
|
||||
}
|
||||
)
|
||||
assert len(hass.states.async_all()) == 1
|
||||
|
||||
# turn on switch
|
||||
await hass.services.async_call(
|
||||
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
|
||||
)
|
||||
await knx.assert_write("1/2/3", True)
|
||||
|
||||
# turn off switch
|
||||
await hass.services.async_call(
|
||||
"switch", "turn_off", {"entity_id": "switch.test"}, blocking=True
|
||||
)
|
||||
await knx.assert_write("1/2/3", False)
|
||||
|
||||
# receive ON telegram
|
||||
await knx.receive_write("1/2/3", True)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_ON
|
||||
|
||||
# receive OFF telegram
|
||||
await knx.receive_write("1/2/3", False)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_OFF
|
||||
|
||||
# switch does not respond to read by default
|
||||
await knx.receive_read("1/2/3")
|
||||
await knx.assert_telegram_count(0)
|
||||
|
||||
|
||||
async def test_switch_state(hass, knx):
|
||||
"""Test KNX switch with state_address."""
|
||||
_ADDRESS = "1/1/1"
|
||||
_STATE_ADDRESS = "2/2/2"
|
||||
|
||||
await knx.setup_integration(
|
||||
{
|
||||
SwitchSchema.PLATFORM_NAME: {
|
||||
CONF_NAME: "test",
|
||||
KNX_ADDRESS: _ADDRESS,
|
||||
CONF_STATE_ADDRESS: _STATE_ADDRESS,
|
||||
},
|
||||
}
|
||||
)
|
||||
assert len(hass.states.async_all()) == 1
|
||||
|
||||
# StateUpdater initialize state
|
||||
await knx.assert_read(_STATE_ADDRESS)
|
||||
await knx.receive_response(_STATE_ADDRESS, True)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_ON
|
||||
|
||||
# receive OFF telegram at `address`
|
||||
await knx.receive_write(_ADDRESS, False)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_OFF
|
||||
|
||||
# receive ON telegram at `address`
|
||||
await knx.receive_write(_ADDRESS, True)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_ON
|
||||
|
||||
# receive OFF telegram at `state_address`
|
||||
await knx.receive_write(_STATE_ADDRESS, False)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_OFF
|
||||
|
||||
# receive ON telegram at `state_address`
|
||||
await knx.receive_write(_STATE_ADDRESS, True)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state is STATE_ON
|
||||
|
||||
# turn off switch
|
||||
await hass.services.async_call(
|
||||
"switch", "turn_off", {"entity_id": "switch.test"}, blocking=True
|
||||
)
|
||||
await knx.assert_write(_ADDRESS, False)
|
||||
|
||||
# turn on switch
|
||||
await hass.services.async_call(
|
||||
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
|
||||
)
|
||||
await knx.assert_write(_ADDRESS, True)
|
||||
|
||||
# switch does not respond to read by default
|
||||
await knx.receive_read(_ADDRESS)
|
||||
await knx.assert_telegram_count(0)
|
||||
|
||||
|
||||
async def test_switch_restore_and_respond(hass, knx):
|
||||
"""Test restoring KNX switch state and respond to read."""
|
||||
_ADDRESS = "1/1/1"
|
||||
fake_state = State("switch.test", "on")
|
||||
|
||||
with patch(
|
||||
"homeassistant.helpers.restore_state.RestoreEntity.async_get_last_state",
|
||||
return_value=fake_state,
|
||||
):
|
||||
await knx.setup_integration(
|
||||
{
|
||||
SwitchSchema.PLATFORM_NAME: {
|
||||
CONF_NAME: "test",
|
||||
KNX_ADDRESS: _ADDRESS,
|
||||
CONF_RESPOND_TO_READ: True,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# restored state - doesn't send telegram
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state == STATE_ON
|
||||
await knx.assert_telegram_count(0)
|
||||
|
||||
# respond to restored state
|
||||
await knx.receive_read(_ADDRESS)
|
||||
await knx.assert_response(_ADDRESS, True)
|
||||
|
||||
# turn off switch
|
||||
await hass.services.async_call(
|
||||
"switch", "turn_off", {"entity_id": "switch.test"}, blocking=True
|
||||
)
|
||||
await knx.assert_write(_ADDRESS, False)
|
||||
state = hass.states.get("switch.test")
|
||||
assert state.state == STATE_OFF
|
||||
|
||||
# respond to new state
|
||||
await knx.receive_read(_ADDRESS)
|
||||
await knx.assert_response(_ADDRESS, False)
|
Loading…
x
Reference in New Issue
Block a user