diff --git a/homeassistant/components/knx/const.py b/homeassistant/components/knx/const.py index 67e009cacfc..6cec901adc7 100644 --- a/homeassistant/components/knx/const.py +++ b/homeassistant/components/knx/const.py @@ -83,8 +83,6 @@ DATA_HASS_CONFIG: Final = "knx_hass_config" ATTR_COUNTER: Final = "counter" ATTR_SOURCE: Final = "source" -# dispatcher signal for KNX interface device triggers -SIGNAL_KNX_TELEGRAM_DICT: Final = "knx_telegram_dict" type AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]] type MessageCallbackType = Callable[[Telegram], None] diff --git a/homeassistant/components/knx/telegrams.py b/homeassistant/components/knx/telegrams.py index 7c3ea28c4df..6945bb50746 100644 --- a/homeassistant/components/knx/telegrams.py +++ b/homeassistant/components/knx/telegrams.py @@ -7,6 +7,7 @@ from collections.abc import Callable from typing import Final, TypedDict from xknx import XKNX +from xknx.dpt import DPTArray, DPTBase, DPTBinary from xknx.exceptions import XKNXException from xknx.telegram import Telegram from xknx.telegram.apci import GroupValueResponse, GroupValueWrite @@ -15,31 +16,40 @@ from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.storage import Store import homeassistant.util.dt as dt_util +from homeassistant.util.signal_type import SignalType -from .const import DOMAIN, SIGNAL_KNX_TELEGRAM_DICT +from .const import DOMAIN from .project import KNXProject STORAGE_VERSION: Final = 1 STORAGE_KEY: Final = f"{DOMAIN}/telegrams_history.json" +# dispatcher signal for KNX interface device triggers +SIGNAL_KNX_TELEGRAM: SignalType[Telegram, TelegramDict] = SignalType("knx_telegram") -class TelegramDict(TypedDict): + +class DecodedTelegramPayload(TypedDict): + """Decoded payload value and metadata.""" + + dpt_main: int | None + dpt_sub: int | None + dpt_name: str | None + unit: str | None + value: str | int | float | bool | None + + +class TelegramDict(DecodedTelegramPayload): """Represent a Telegram as a dict.""" # this has to be in sync with the frontend implementation destination: str destination_name: str direction: str - dpt_main: int | None - dpt_sub: int | None - dpt_name: str | None payload: int | tuple[int, ...] | None source: str source_name: str telegramtype: str timestamp: str # ISO format - unit: str | None - value: str | int | float | bool | None class Telegrams: @@ -89,7 +99,7 @@ class Telegrams: """Handle incoming and outgoing telegrams from xknx.""" telegram_dict = self.telegram_to_dict(telegram) self.recent_telegrams.append(telegram_dict) - async_dispatcher_send(self.hass, SIGNAL_KNX_TELEGRAM_DICT, telegram_dict) + async_dispatcher_send(self.hass, SIGNAL_KNX_TELEGRAM, telegram, telegram_dict) for job in self._jobs: self.hass.async_run_hass_job(job, telegram_dict) @@ -112,14 +122,10 @@ class Telegrams: def telegram_to_dict(self, telegram: Telegram) -> TelegramDict: """Convert a Telegram to a dict.""" dst_name = "" - dpt_main = None - dpt_sub = None - dpt_name = None payload_data: int | tuple[int, ...] | None = None src_name = "" transcoder = None - unit = None - value: str | int | float | bool | None = None + decoded_payload: DecodedTelegramPayload | None = None if ( ga_info := self.project.group_addresses.get( @@ -137,27 +143,44 @@ class Telegrams: if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)): payload_data = telegram.payload.value.value if transcoder is not None: - try: - value = transcoder.from_knx(telegram.payload.value) - dpt_main = transcoder.dpt_main_number - dpt_sub = transcoder.dpt_sub_number - dpt_name = transcoder.value_type - unit = transcoder.unit - except XKNXException: - value = "Error decoding value" + decoded_payload = decode_telegram_payload( + payload=telegram.payload.value, transcoder=transcoder + ) return TelegramDict( destination=f"{telegram.destination_address}", destination_name=dst_name, direction=telegram.direction.value, - dpt_main=dpt_main, - dpt_sub=dpt_sub, - dpt_name=dpt_name, + dpt_main=decoded_payload["dpt_main"] + if decoded_payload is not None + else None, + dpt_sub=decoded_payload["dpt_sub"] if decoded_payload is not None else None, + dpt_name=decoded_payload["dpt_name"] + if decoded_payload is not None + else None, payload=payload_data, source=f"{telegram.source_address}", source_name=src_name, telegramtype=telegram.payload.__class__.__name__, timestamp=dt_util.now().isoformat(), - unit=unit, - value=value, + unit=decoded_payload["unit"] if decoded_payload is not None else None, + value=decoded_payload["value"] if decoded_payload is not None else None, ) + + +def decode_telegram_payload( + payload: DPTArray | DPTBinary, transcoder: type[DPTBase] +) -> DecodedTelegramPayload: + """Decode the payload of a KNX telegram.""" + try: + value = transcoder.from_knx(payload) + except XKNXException: + value = "Error decoding value" + + return DecodedTelegramPayload( + dpt_main=transcoder.dpt_main_number, + dpt_sub=transcoder.dpt_sub_number, + dpt_name=transcoder.value_type, + unit=transcoder.unit, + value=value, + ) diff --git a/homeassistant/components/knx/trigger.py b/homeassistant/components/knx/trigger.py index 16907fa9748..fff844f35b0 100644 --- a/homeassistant/components/knx/trigger.py +++ b/homeassistant/components/knx/trigger.py @@ -3,18 +3,22 @@ from typing import Final import voluptuous as vol +from xknx.dpt import DPTBase +from xknx.telegram import Telegram, TelegramDirection from xknx.telegram.address import DeviceGroupAddress, parse_device_group_address +from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite -from homeassistant.const import CONF_PLATFORM +from homeassistant.const import CONF_PLATFORM, CONF_TYPE from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback from homeassistant.helpers import config_validation as cv from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo from homeassistant.helpers.typing import ConfigType -from .const import DOMAIN, SIGNAL_KNX_TELEGRAM_DICT +from .const import DOMAIN from .schema import ga_validator -from .telegrams import TelegramDict +from .telegrams import SIGNAL_KNX_TELEGRAM, TelegramDict, decode_telegram_payload +from .validation import sensor_type_validator TRIGGER_TELEGRAM: Final = "telegram" @@ -41,10 +45,11 @@ TELEGRAM_TRIGGER_SCHEMA: Final = { ), **TELEGRAM_TRIGGER_OPTIONS, } - +# TRIGGER_SCHEMA is exclusive to triggers, the above are used in device triggers too TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend( { vol.Required(CONF_PLATFORM): PLATFORM_TYPE_TRIGGER_TELEGRAM, + vol.Optional(CONF_TYPE, default=None): vol.Any(sensor_type_validator, None), **TELEGRAM_TRIGGER_SCHEMA, } ) @@ -61,41 +66,55 @@ async def async_attach_trigger( dst_addresses: list[DeviceGroupAddress] = [ parse_device_group_address(address) for address in _addresses ] + _transcoder = config.get(CONF_TYPE) + trigger_transcoder = DPTBase.parse_transcoder(_transcoder) if _transcoder else None + job = HassJob(action, f"KNX trigger {trigger_info}") trigger_data = trigger_info["trigger_data"] @callback - def async_call_trigger_action(telegram: TelegramDict) -> None: + def async_call_trigger_action( + telegram: Telegram, telegram_dict: TelegramDict + ) -> None: """Filter Telegram and call trigger action.""" - if telegram["telegramtype"] == "GroupValueWrite": + payload_apci = type(telegram.payload) + if payload_apci is GroupValueWrite: if config[CONF_KNX_GROUP_VALUE_WRITE] is False: return - elif telegram["telegramtype"] == "GroupValueResponse": + elif payload_apci is GroupValueResponse: if config[CONF_KNX_GROUP_VALUE_RESPONSE] is False: return - elif telegram["telegramtype"] == "GroupValueRead": + elif payload_apci is GroupValueRead: if config[CONF_KNX_GROUP_VALUE_READ] is False: return - if telegram["direction"] == "Incoming": + if telegram.direction is TelegramDirection.INCOMING: if config[CONF_KNX_INCOMING] is False: return elif config[CONF_KNX_OUTGOING] is False: return - if ( - dst_addresses - and parse_device_group_address(telegram["destination"]) not in dst_addresses - ): + if dst_addresses and telegram.destination_address not in dst_addresses: return - hass.async_run_hass_job( - job, - {"trigger": {**trigger_data, **telegram}}, - ) + if ( + trigger_transcoder is not None + and payload_apci in (GroupValueWrite, GroupValueResponse) + and trigger_transcoder.value_type != telegram_dict["dpt_name"] + ): + decoded_payload = decode_telegram_payload( + payload=telegram.payload.value, # type: ignore[union-attr] # checked via payload_apci + transcoder=trigger_transcoder, # type: ignore[type-abstract] # parse_transcoder don't return abstract classes + ) + # overwrite decoded payload values in telegram_dict + telegram_trigger_data = {**trigger_data, **telegram_dict, **decoded_payload} + else: + telegram_trigger_data = {**trigger_data, **telegram_dict} + + hass.async_run_hass_job(job, {"trigger": telegram_trigger_data}) return async_dispatcher_connect( hass, - signal=SIGNAL_KNX_TELEGRAM_DICT, + signal=SIGNAL_KNX_TELEGRAM, target=async_call_trigger_action, ) diff --git a/tests/components/knx/test_trigger.py b/tests/components/knx/test_trigger.py index 3eab7d58a00..d957082de18 100644 --- a/tests/components/knx/test_trigger.py +++ b/tests/components/knx/test_trigger.py @@ -25,7 +25,7 @@ async def test_telegram_trigger( calls: list[ServiceCall], knx: KNXTestKit, ) -> None: - """Test telegram telegram triggers firing.""" + """Test telegram triggers firing.""" await knx.setup_integration({}) # "id" field added to action to test if `trigger_data` passed correctly in `async_attach_trigger` @@ -95,6 +95,64 @@ async def test_telegram_trigger( assert test_call.data["id"] == 0 +@pytest.mark.parametrize( + ("payload", "type_option", "expected_value", "expected_unit"), + [ + ((0x4C,), {"type": "percent"}, 30, "%"), + ((0x03,), {}, None, None), # "dpt" omitted defaults to None + ((0x0C, 0x1A), {"type": "temperature"}, 21.00, "°C"), + ], +) +async def test_telegram_trigger_dpt_option( + hass: HomeAssistant, + calls: list[ServiceCall], + knx: KNXTestKit, + payload: tuple[int, ...], + type_option: dict[str, bool], + expected_value: int | None, + expected_unit: str | None, +) -> None: + """Test telegram trigger type option.""" + await knx.setup_integration({}) + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: [ + # "catch_all" trigger + { + "trigger": { + "platform": "knx.telegram", + **type_option, + }, + "action": { + "service": "test.automation", + "data_template": { + "catch_all": ("telegram - {{ trigger.destination }}"), + "trigger": (" {{ trigger }}"), + }, + }, + }, + ] + }, + ) + await knx.receive_write("0/0/1", payload) + + assert len(calls) == 1 + test_call = calls.pop() + assert test_call.data["catch_all"] == "telegram - 0/0/1" + assert test_call.data["trigger"]["value"] == expected_value + assert test_call.data["trigger"]["unit"] == expected_unit + + await knx.receive_read("0/0/1") + + assert len(calls) == 1 + test_call = calls.pop() + assert test_call.data["catch_all"] == "telegram - 0/0/1" + assert test_call.data["trigger"]["value"] is None + assert test_call.data["trigger"]["unit"] is None + + @pytest.mark.parametrize( "group_value_options", [ @@ -139,7 +197,7 @@ async def test_telegram_trigger_options( group_value_options: dict[str, bool], direction_options: dict[str, bool], ) -> None: - """Test telegram telegram trigger options.""" + """Test telegram trigger options.""" await knx.setup_integration({}) assert await async_setup_component( hass, @@ -157,7 +215,6 @@ async def test_telegram_trigger_options( "service": "test.automation", "data_template": { "catch_all": ("telegram - {{ trigger.destination }}"), - "id": (" {{ trigger.id }}"), }, }, }, @@ -275,7 +332,6 @@ async def test_invalid_trigger( "service": "test.automation", "data_template": { "catch_all": ("telegram - {{ trigger.destination }}"), - "id": (" {{ trigger.id }}"), }, }, },