From fc7d4ed118d9a0f524ae549ea1947c1a14ce6d79 Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Sat, 6 Nov 2021 19:31:25 +0100 Subject: [PATCH] Add decoded telegram payload to knx_event service (#57621) * decode knx_event telegram payload with given dpt * exception handling for invalid payloads * Update homeassistant/components/knx/__init__.py Co-authored-by: Marvin Wichmann Co-authored-by: Marvin Wichmann --- homeassistant/components/knx/__init__.py | 90 ++++++++++++++++++---- homeassistant/components/knx/schema.py | 23 ++++++ homeassistant/components/knx/services.yaml | 7 ++ tests/components/knx/test_events.py | 70 +++++++++++------ tests/components/knx/test_services.py | 30 ++++++-- 5 files changed, 180 insertions(+), 40 deletions(-) diff --git a/homeassistant/components/knx/__init__.py b/homeassistant/components/knx/__init__.py index 57c88b84cc7..b61a825e97f 100644 --- a/homeassistant/components/knx/__init__.py +++ b/homeassistant/components/knx/__init__.py @@ -10,15 +10,22 @@ from xknx import XKNX from xknx.core import XknxConnectionState from xknx.core.telegram_queue import TelegramQueue from xknx.dpt import DPTArray, DPTBase, DPTBinary -from xknx.exceptions import XKNXException +from xknx.exceptions import ConversionError, XKNXException from xknx.io import ConnectionConfig, ConnectionType from xknx.telegram import AddressFilter, Telegram -from xknx.telegram.address import parse_device_group_address +from xknx.telegram.address import ( + DeviceGroupAddress, + GroupAddress, + InternalGroupAddress, + parse_device_group_address, +) from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite from homeassistant.const import ( + CONF_EVENT, CONF_HOST, CONF_PORT, + CONF_TYPE, EVENT_HOMEASSISTANT_STOP, SERVICE_RELOAD, ) @@ -46,6 +53,7 @@ from .schema import ( ClimateSchema, ConnectionSchema, CoverSchema, + EventSchema, ExposeSchema, FanSchema, LightSchema, @@ -77,6 +85,8 @@ SERVICE_KNX_READ: Final = "read" CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.All( + # deprecated since 2021.12 + cv.deprecated(CONF_KNX_EVENT_FILTER), # deprecated since 2021.4 cv.deprecated("config_file"), # deprecated since 2021.2 @@ -89,6 +99,7 @@ CONFIG_SCHEMA = vol.Schema( vol.Optional(CONF_KNX_EVENT_FILTER, default=[]): vol.All( cv.ensure_list, [cv.string] ), + **EventSchema.SCHEMA, **ExposeSchema.platform_node(), **BinarySensorSchema.platform_node(), **ClimateSchema.platform_node(), @@ -149,6 +160,7 @@ SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema( cv.ensure_list, [ga_validator], ), + vol.Optional(CONF_TYPE): sensor_type_validator, vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, } ) @@ -268,11 +280,16 @@ class KNXModule: self.service_exposures: dict[str, KNXExposeSensor | KNXExposeTime] = {} self.init_xknx() - self._knx_event_callback: TelegramQueue.Callback = self.register_callback() self.xknx.connection_manager.register_connection_state_changed_cb( self.connection_state_changed_cb ) + self._address_filter_transcoder: dict[AddressFilter, type[DPTBase]] = {} + self._group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {} + self._knx_event_callback: TelegramQueue.Callback = ( + self.register_event_callback() + ) + def init_xknx(self) -> None: """Initialize XKNX object.""" self.xknx = XKNX( @@ -332,38 +349,77 @@ class KNXModule: auto_reconnect=True, ) + async def connection_state_changed_cb(self, state: XknxConnectionState) -> None: + """Call invoked after a KNX connection state change was received.""" + self.connected = state == XknxConnectionState.CONNECTED + if tasks := [device.after_update() for device in self.xknx.devices]: + await asyncio.gather(*tasks) + async def telegram_received_cb(self, telegram: Telegram) -> None: """Call invoked after a KNX telegram was received.""" - data = None # Not all telegrams have serializable data. + data: int | tuple[int, ...] | None = None + value = None if ( isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)) and telegram.payload.value is not None + and isinstance( + telegram.destination_address, (GroupAddress, InternalGroupAddress) + ) ): data = telegram.payload.value.value + if isinstance(data, tuple): + if transcoder := ( + self._group_address_transcoder.get(telegram.destination_address) + or next( + ( + _transcoder + for _filter, _transcoder in self._address_filter_transcoder.items() + if _filter.match(telegram.destination_address) + ), + None, + ) + ): + try: + value = transcoder.from_knx(data) + except ConversionError as err: + _LOGGER.warning( + "Error in `knx_event` at decoding type '%s' from telegram %s\n%s", + transcoder.__name__, + telegram, + err, + ) + self.hass.bus.async_fire( "knx_event", { "data": data, "destination": str(telegram.destination_address), "direction": telegram.direction.value, + "value": value, "source": str(telegram.source_address), "telegramtype": telegram.payload.__class__.__name__, }, ) - async def connection_state_changed_cb(self, state: XknxConnectionState) -> None: - """Call invoked after a KNX connection state change was received.""" - self.connected = state == XknxConnectionState.CONNECTED - if tasks := [device.after_update() for device in self.xknx.devices]: - await asyncio.gather(*tasks) - - def register_callback(self) -> TelegramQueue.Callback: - """Register callback within XKNX TelegramQueue.""" + def register_event_callback(self) -> TelegramQueue.Callback: + """Register callback for knx_event within XKNX TelegramQueue.""" + # backwards compatibility for deprecated CONF_KNX_EVENT_FILTER + # use `address_filters = []` when this is not needed anymore address_filters = list( map(AddressFilter, self.config[DOMAIN][CONF_KNX_EVENT_FILTER]) ) + for filter_set in self.config[DOMAIN][CONF_EVENT]: + _filters = list(map(AddressFilter, filter_set[KNX_ADDRESS])) + address_filters.extend(_filters) + if (dpt := filter_set.get(CONF_TYPE)) and ( + transcoder := DPTBase.parse_transcoder(dpt) + ): + self._address_filter_transcoder.update( + {_filter: transcoder for _filter in _filters} # type: ignore[misc] + ) + return self.xknx.telegram_queue.register_telegram_received_cb( self.telegram_received_cb, address_filters=address_filters, @@ -374,7 +430,7 @@ class KNXModule: async def service_event_register_modify(self, call: ServiceCall) -> None: """Service for adding or removing a GroupAddress to the knx_event filter.""" attr_address = call.data[KNX_ADDRESS] - group_addresses = map(parse_device_group_address, attr_address) + group_addresses = list(map(parse_device_group_address, attr_address)) if call.data.get(SERVICE_KNX_ATTR_REMOVE): for group_address in group_addresses: @@ -385,8 +441,16 @@ class KNXModule: "Service event_register could not remove event for '%s'", str(group_address), ) + if group_address in self._group_address_transcoder: + del self._group_address_transcoder[group_address] return + if (dpt := call.data.get(CONF_TYPE)) and ( + transcoder := DPTBase.parse_transcoder(dpt) + ): + self._group_address_transcoder.update( + {_address: transcoder for _address in group_addresses} # type: ignore[misc] + ) for group_address in group_addresses: if group_address in self._knx_event_callback.group_addresses: continue diff --git a/homeassistant/components/knx/schema.py b/homeassistant/components/knx/schema.py index 0e54a9abbc5..6cd8fc6bc0d 100644 --- a/homeassistant/components/knx/schema.py +++ b/homeassistant/components/knx/schema.py @@ -24,6 +24,7 @@ from homeassistant.const import ( CONF_DEVICE_CLASS, CONF_ENTITY_CATEGORY, CONF_ENTITY_ID, + CONF_EVENT, CONF_HOST, CONF_MODE, CONF_NAME, @@ -204,6 +205,28 @@ class ConnectionSchema: } +######### +# EVENT +######### + + +class EventSchema: + """Voluptuous schema for KNX events.""" + + KNX_EVENT_FILTER_SCHEMA = vol.Schema( + { + vol.Required(KNX_ADDRESS): vol.All(cv.ensure_list, [cv.string]), + vol.Optional(CONF_TYPE): sensor_type_validator, + } + ) + + SCHEMA = { + vol.Optional(CONF_EVENT, default=[]): vol.All( + cv.ensure_list, [KNX_EVENT_FILTER_SCHEMA] + ) + } + + ############# # PLATFORMS ############# diff --git a/homeassistant/components/knx/services.yaml b/homeassistant/components/knx/services.yaml index 1ea5d9b6faa..fca5f4fe07d 100644 --- a/homeassistant/components/knx/services.yaml +++ b/homeassistant/components/knx/services.yaml @@ -45,6 +45,13 @@ event_register: example: "1/1/0" selector: object: + type: + name: "Value type" + description: "If set, the payload will be decoded as given DPT in the event data `value` key. Knx sensor types are valid values (see https://www.home-assistant.io/integrations/sensor.knx)." + required: false + example: "2byte_float" + selector: + text: remove: name: "Remove event registration" description: "If `True` the group address(es) will be removed." diff --git a/tests/components/knx/test_events.py b/tests/components/knx/test_events.py index 6a9e021ff53..360a1963d2d 100644 --- a/tests/components/knx/test_events.py +++ b/tests/components/knx/test_events.py @@ -1,6 +1,11 @@ """Test KNX events.""" -from homeassistant.components.knx import CONF_KNX_EVENT_FILTER +from homeassistant.components.knx import ( + CONF_EVENT, + CONF_KNX_EVENT_FILTER, + CONF_TYPE, + KNX_ADDRESS, +) from homeassistant.core import HomeAssistant from .conftest import KNXTestKit @@ -9,7 +14,7 @@ from tests.common import async_capture_events async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit): - """Test `knx_event` event.""" + """Test the `knx_event` event.""" test_group_a = "0/4/*" test_address_a_1 = "0/4/0" test_address_a_2 = "0/4/100" @@ -20,13 +25,15 @@ async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit): test_address_c_1 = "2/6/4" test_address_c_2 = "2/6/5" test_address_d = "5/4/3" + test_address_e = "6/4/3" events = async_capture_events(hass, "knx_event") - async def test_event_data(address, payload): + async def test_event_data(address, payload, value=None): await hass.async_block_till_done() assert len(events) == 1 event = events.pop() assert event.data["data"] == payload + assert event.data["value"] == value assert event.data["direction"] == "Incoming" assert event.data["destination"] == address if payload is None: @@ -40,12 +47,24 @@ async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit): await knx.setup_integration( { - CONF_KNX_EVENT_FILTER: [ - test_group_a, - test_group_b, - test_group_c, - test_address_d, - ] + CONF_EVENT: [ + { + KNX_ADDRESS: [ + test_group_a, + test_group_b, + ], + CONF_TYPE: "2byte_unsigned", + }, + { + KNX_ADDRESS: test_group_c, + CONF_TYPE: "2byte_float", + }, + { + KNX_ADDRESS: [test_address_d], + }, + ], + # test legacy `event_filter` config + CONF_KNX_EVENT_FILTER: [test_address_e], } ) @@ -54,28 +73,35 @@ async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit): assert len(events) == 0 # receive telegrams for group addresses matching the filter - await knx.receive_write(test_address_a_1, True) - await test_event_data(test_address_a_1, True) + await knx.receive_write(test_address_a_1, (0x03, 0x2F)) + await test_event_data(test_address_a_1, (0x03, 0x2F), value=815) - await knx.receive_response(test_address_a_2, False) - await test_event_data(test_address_a_2, False) + await knx.receive_response(test_address_a_2, (0x12, 0x67)) + await test_event_data(test_address_a_2, (0x12, 0x67), value=4711) - await knx.receive_write(test_address_b_1, (1,)) - await test_event_data(test_address_b_1, (1,)) + await knx.receive_write(test_address_b_1, (0, 0)) + await test_event_data(test_address_b_1, (0, 0), value=0) - await knx.receive_response(test_address_b_2, (255,)) - await test_event_data(test_address_b_2, (255,)) + await knx.receive_response(test_address_b_2, (255, 255)) + await test_event_data(test_address_b_2, (255, 255), value=65535) - await knx.receive_write(test_address_c_1, (89, 43, 34, 11)) - await test_event_data(test_address_c_1, (89, 43, 34, 11)) + await knx.receive_write(test_address_c_1, (0x06, 0xA0)) + await test_event_data(test_address_c_1, (0x06, 0xA0), value=16.96) - await knx.receive_response(test_address_c_2, (255, 255, 255, 255)) - await test_event_data(test_address_c_2, (255, 255, 255, 255)) + await knx.receive_response(test_address_c_2, (0x8A, 0x24)) + await test_event_data(test_address_c_2, (0x8A, 0x24), value=-30.0) await knx.receive_read(test_address_d) await test_event_data(test_address_d, None) - # receive telegrams for group addresses not matching the filter + await knx.receive_write(test_address_d, True) + await test_event_data(test_address_d, True) + + # test legacy `event_filter` config + await knx.receive_write(test_address_e, (89, 43, 34, 11)) + await test_event_data(test_address_e, (89, 43, 34, 11)) + + # receive telegrams for group addresses not matching any filter await knx.receive_write("0/5/0", True) await knx.receive_write("1/7/0", True) await knx.receive_write("2/6/6", True) diff --git a/tests/components/knx/test_services.py b/tests/components/knx/test_services.py index 80ed51e6aec..c61dc542586 100644 --- a/tests/components/knx/test_services.py +++ b/tests/components/knx/test_services.py @@ -86,14 +86,19 @@ async def test_event_register(hass: HomeAssistant, knx: KNXTestKit): await hass.async_block_till_done() assert len(events) == 0 - # register event + # register event with `type` await hass.services.async_call( - "knx", "event_register", {"address": test_address}, blocking=True + "knx", + "event_register", + {"address": test_address, "type": "2byte_unsigned"}, + blocking=True, ) - await knx.receive_write(test_address, True) - await knx.receive_write(test_address, False) + await knx.receive_write(test_address, (0x04, 0xD2)) await hass.async_block_till_done() - assert len(events) == 2 + assert len(events) == 1 + typed_event = events.pop() + assert typed_event.data["data"] == (0x04, 0xD2) + assert typed_event.data["value"] == 1234 # remove event registration - no event added await hass.services.async_call( @@ -104,7 +109,22 @@ async def test_event_register(hass: HomeAssistant, knx: KNXTestKit): ) await knx.receive_write(test_address, True) await hass.async_block_till_done() + assert len(events) == 0 + + # register event without `type` + await hass.services.async_call( + "knx", "event_register", {"address": test_address}, blocking=True + ) + await knx.receive_write(test_address, True) + await knx.receive_write(test_address, False) + await hass.async_block_till_done() assert len(events) == 2 + untyped_event_2 = events.pop() + assert untyped_event_2.data["data"] is False + assert untyped_event_2.data["value"] is None + untyped_event_1 = events.pop() + assert untyped_event_1.data["data"] is True + assert untyped_event_1.data["value"] is None async def test_exposure_register(hass: HomeAssistant, knx: KNXTestKit):