Test KNX events (#53433)

* test knx_event

* use async_capture_events
This commit is contained in:
Matthias Alphart 2021-07-28 19:04:11 +02:00 committed by GitHub
parent 03308b62c1
commit 4e4bb2e5a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 9 deletions

View File

@ -19,6 +19,8 @@ from homeassistant.setup import async_setup_component
class KNXTestKit:
"""Test helper for the KNX integration."""
INDIVIDUAL_ADDRESS = "1.2.3"
def __init__(self, hass: HomeAssistant):
"""Init KNX test helper class."""
self.hass: HomeAssistant = hass
@ -148,7 +150,7 @@ class KNXTestKit:
destination_address=GroupAddress(group_address),
direction=TelegramDirection.INCOMING,
payload=payload,
source_address=IndividualAddress("1.2.3"),
source_address=IndividualAddress(self.INDIVIDUAL_ADDRESS),
)
)
await self.hass.async_block_till_done()

View File

@ -0,0 +1,83 @@
"""Test KNX events."""
from homeassistant.components.knx import CONF_KNX_EVENT_FILTER
from homeassistant.core import HomeAssistant
from .conftest import KNXTestKit
from tests.common import async_capture_events
async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
"""Test `knx_event` event."""
test_group_a = "0/4/*"
test_address_a_1 = "0/4/0"
test_address_a_2 = "0/4/100"
test_group_b = "1/3-6/*"
test_address_b_1 = "1/3/0"
test_address_b_2 = "1/6/200"
test_group_c = "2/6/4,5"
test_address_c_1 = "2/6/4"
test_address_c_2 = "2/6/5"
test_address_d = "5/4/3"
events = async_capture_events(hass, "knx_event")
async def test_event_data(address, payload):
await hass.async_block_till_done()
assert len(events) == 1
event = events.pop()
assert event.data["data"] == payload
assert event.data["direction"] == "Incoming"
assert event.data["destination"] == address
if payload is None:
assert event.data["telegramtype"] == "GroupValueRead"
else:
assert event.data["telegramtype"] in (
"GroupValueWrite",
"GroupValueResponse",
)
assert event.data["source"] == KNXTestKit.INDIVIDUAL_ADDRESS
await knx.setup_integration(
{
CONF_KNX_EVENT_FILTER: [
test_group_a,
test_group_b,
test_group_c,
test_address_d,
]
}
)
# no event received
await hass.async_block_till_done()
assert len(events) == 0
# receive telegrams for group addresses matching the filter
await knx.receive_write(test_address_a_1, True)
await test_event_data(test_address_a_1, True)
await knx.receive_response(test_address_a_2, False)
await test_event_data(test_address_a_2, False)
await knx.receive_write(test_address_b_1, (1,))
await test_event_data(test_address_b_1, (1,))
await knx.receive_response(test_address_b_2, (255,))
await test_event_data(test_address_b_2, (255,))
await knx.receive_write(test_address_c_1, (89, 43, 34, 11))
await test_event_data(test_address_c_1, (89, 43, 34, 11))
await knx.receive_response(test_address_c_2, (255, 255, 255, 255))
await test_event_data(test_address_c_2, (255, 255, 255, 255))
await knx.receive_read(test_address_d)
await test_event_data(test_address_d, None)
# receive telegrams for group addresses not matching the filter
await knx.receive_write("0/5/0", True)
await knx.receive_write("1/7/0", True)
await knx.receive_write("2/6/6", True)
await hass.async_block_till_done()
assert len(events) == 0

View File

@ -4,6 +4,8 @@ from homeassistant.core import HomeAssistant
from .conftest import KNXTestKit
from tests.common import async_capture_events
async def test_send(hass: HomeAssistant, knx: KNXTestKit):
"""Test `knx.send` service."""
@ -74,17 +76,14 @@ async def test_read(hass: HomeAssistant, knx: KNXTestKit):
async def test_event_register(hass: HomeAssistant, knx: KNXTestKit):
"""Test `knx.event_register` service."""
events = []
events = async_capture_events(hass, "knx_event")
test_address = "1/2/3"
def listener(event):
events.append(event)
await knx.setup_integration({})
hass.bus.async_listen("knx_event", listener)
# no event registered
await knx.receive_write(test_address, True)
await hass.async_block_till_done()
assert len(events) == 0
# register event
@ -93,10 +92,10 @@ async def test_event_register(hass: HomeAssistant, knx: KNXTestKit):
)
await knx.receive_write(test_address, True)
await knx.receive_write(test_address, False)
await hass.async_block_till_done()
assert len(events) == 2
# remove event registration
events = []
# remove event registration - no event added
await hass.services.async_call(
"knx",
"event_register",
@ -104,7 +103,8 @@ async def test_event_register(hass: HomeAssistant, knx: KNXTestKit):
blocking=True,
)
await knx.receive_write(test_address, True)
assert len(events) == 0
await hass.async_block_till_done()
assert len(events) == 2
async def test_exposure_register(hass: HomeAssistant, knx: KNXTestKit):