Add data point type option to knx.telegram trigger (#117463)

* Add data point type (dpt) option to `knx.telegram` trigger

* Rename from `dpt` to `type` to match services

* Add test for GroupValueRead telegrams

* Fix device trigger schema inheritance

* Typesafe dispatcher signal

* readability

* Avoid re-decoding with same transcoder
This commit is contained in:
Matthias Alphart 2024-05-22 00:03:54 +02:00 committed by GitHub
parent b94735a445
commit 622d1e4c50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 146 additions and 50 deletions

View File

@ -83,8 +83,6 @@ DATA_HASS_CONFIG: Final = "knx_hass_config"
ATTR_COUNTER: Final = "counter"
ATTR_SOURCE: Final = "source"
# dispatcher signal for KNX interface device triggers
SIGNAL_KNX_TELEGRAM_DICT: Final = "knx_telegram_dict"
type AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]]
type MessageCallbackType = Callable[[Telegram], None]

View File

@ -7,6 +7,7 @@ from collections.abc import Callable
from typing import Final, TypedDict
from xknx import XKNX
from xknx.dpt import DPTArray, DPTBase, DPTBinary
from xknx.exceptions import XKNXException
from xknx.telegram import Telegram
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
@ -15,31 +16,40 @@ from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.storage import Store
import homeassistant.util.dt as dt_util
from homeassistant.util.signal_type import SignalType
from .const import DOMAIN, SIGNAL_KNX_TELEGRAM_DICT
from .const import DOMAIN
from .project import KNXProject
STORAGE_VERSION: Final = 1
STORAGE_KEY: Final = f"{DOMAIN}/telegrams_history.json"
# dispatcher signal for KNX interface device triggers
SIGNAL_KNX_TELEGRAM: SignalType[Telegram, TelegramDict] = SignalType("knx_telegram")
class TelegramDict(TypedDict):
class DecodedTelegramPayload(TypedDict):
"""Decoded payload value and metadata."""
dpt_main: int | None
dpt_sub: int | None
dpt_name: str | None
unit: str | None
value: str | int | float | bool | None
class TelegramDict(DecodedTelegramPayload):
"""Represent a Telegram as a dict."""
# this has to be in sync with the frontend implementation
destination: str
destination_name: str
direction: str
dpt_main: int | None
dpt_sub: int | None
dpt_name: str | None
payload: int | tuple[int, ...] | None
source: str
source_name: str
telegramtype: str
timestamp: str # ISO format
unit: str | None
value: str | int | float | bool | None
class Telegrams:
@ -89,7 +99,7 @@ class Telegrams:
"""Handle incoming and outgoing telegrams from xknx."""
telegram_dict = self.telegram_to_dict(telegram)
self.recent_telegrams.append(telegram_dict)
async_dispatcher_send(self.hass, SIGNAL_KNX_TELEGRAM_DICT, telegram_dict)
async_dispatcher_send(self.hass, SIGNAL_KNX_TELEGRAM, telegram, telegram_dict)
for job in self._jobs:
self.hass.async_run_hass_job(job, telegram_dict)
@ -112,14 +122,10 @@ class Telegrams:
def telegram_to_dict(self, telegram: Telegram) -> TelegramDict:
"""Convert a Telegram to a dict."""
dst_name = ""
dpt_main = None
dpt_sub = None
dpt_name = None
payload_data: int | tuple[int, ...] | None = None
src_name = ""
transcoder = None
unit = None
value: str | int | float | bool | None = None
decoded_payload: DecodedTelegramPayload | None = None
if (
ga_info := self.project.group_addresses.get(
@ -137,27 +143,44 @@ class Telegrams:
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
payload_data = telegram.payload.value.value
if transcoder is not None:
try:
value = transcoder.from_knx(telegram.payload.value)
dpt_main = transcoder.dpt_main_number
dpt_sub = transcoder.dpt_sub_number
dpt_name = transcoder.value_type
unit = transcoder.unit
except XKNXException:
value = "Error decoding value"
decoded_payload = decode_telegram_payload(
payload=telegram.payload.value, transcoder=transcoder
)
return TelegramDict(
destination=f"{telegram.destination_address}",
destination_name=dst_name,
direction=telegram.direction.value,
dpt_main=dpt_main,
dpt_sub=dpt_sub,
dpt_name=dpt_name,
dpt_main=decoded_payload["dpt_main"]
if decoded_payload is not None
else None,
dpt_sub=decoded_payload["dpt_sub"] if decoded_payload is not None else None,
dpt_name=decoded_payload["dpt_name"]
if decoded_payload is not None
else None,
payload=payload_data,
source=f"{telegram.source_address}",
source_name=src_name,
telegramtype=telegram.payload.__class__.__name__,
timestamp=dt_util.now().isoformat(),
unit=unit,
value=value,
unit=decoded_payload["unit"] if decoded_payload is not None else None,
value=decoded_payload["value"] if decoded_payload is not None else None,
)
def decode_telegram_payload(
payload: DPTArray | DPTBinary, transcoder: type[DPTBase]
) -> DecodedTelegramPayload:
"""Decode the payload of a KNX telegram."""
try:
value = transcoder.from_knx(payload)
except XKNXException:
value = "Error decoding value"
return DecodedTelegramPayload(
dpt_main=transcoder.dpt_main_number,
dpt_sub=transcoder.dpt_sub_number,
dpt_name=transcoder.value_type,
unit=transcoder.unit,
value=value,
)

View File

@ -3,18 +3,22 @@
from typing import Final
import voluptuous as vol
from xknx.dpt import DPTBase
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.address import DeviceGroupAddress, parse_device_group_address
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from homeassistant.const import CONF_PLATFORM
from homeassistant.const import CONF_PLATFORM, CONF_TYPE
from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN, SIGNAL_KNX_TELEGRAM_DICT
from .const import DOMAIN
from .schema import ga_validator
from .telegrams import TelegramDict
from .telegrams import SIGNAL_KNX_TELEGRAM, TelegramDict, decode_telegram_payload
from .validation import sensor_type_validator
TRIGGER_TELEGRAM: Final = "telegram"
@ -41,10 +45,11 @@ TELEGRAM_TRIGGER_SCHEMA: Final = {
),
**TELEGRAM_TRIGGER_OPTIONS,
}
# TRIGGER_SCHEMA is exclusive to triggers, the above are used in device triggers too
TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_PLATFORM): PLATFORM_TYPE_TRIGGER_TELEGRAM,
vol.Optional(CONF_TYPE, default=None): vol.Any(sensor_type_validator, None),
**TELEGRAM_TRIGGER_SCHEMA,
}
)
@ -61,41 +66,55 @@ async def async_attach_trigger(
dst_addresses: list[DeviceGroupAddress] = [
parse_device_group_address(address) for address in _addresses
]
_transcoder = config.get(CONF_TYPE)
trigger_transcoder = DPTBase.parse_transcoder(_transcoder) if _transcoder else None
job = HassJob(action, f"KNX trigger {trigger_info}")
trigger_data = trigger_info["trigger_data"]
@callback
def async_call_trigger_action(telegram: TelegramDict) -> None:
def async_call_trigger_action(
telegram: Telegram, telegram_dict: TelegramDict
) -> None:
"""Filter Telegram and call trigger action."""
if telegram["telegramtype"] == "GroupValueWrite":
payload_apci = type(telegram.payload)
if payload_apci is GroupValueWrite:
if config[CONF_KNX_GROUP_VALUE_WRITE] is False:
return
elif telegram["telegramtype"] == "GroupValueResponse":
elif payload_apci is GroupValueResponse:
if config[CONF_KNX_GROUP_VALUE_RESPONSE] is False:
return
elif telegram["telegramtype"] == "GroupValueRead":
elif payload_apci is GroupValueRead:
if config[CONF_KNX_GROUP_VALUE_READ] is False:
return
if telegram["direction"] == "Incoming":
if telegram.direction is TelegramDirection.INCOMING:
if config[CONF_KNX_INCOMING] is False:
return
elif config[CONF_KNX_OUTGOING] is False:
return
if (
dst_addresses
and parse_device_group_address(telegram["destination"]) not in dst_addresses
):
if dst_addresses and telegram.destination_address not in dst_addresses:
return
hass.async_run_hass_job(
job,
{"trigger": {**trigger_data, **telegram}},
)
if (
trigger_transcoder is not None
and payload_apci in (GroupValueWrite, GroupValueResponse)
and trigger_transcoder.value_type != telegram_dict["dpt_name"]
):
decoded_payload = decode_telegram_payload(
payload=telegram.payload.value, # type: ignore[union-attr] # checked via payload_apci
transcoder=trigger_transcoder, # type: ignore[type-abstract] # parse_transcoder don't return abstract classes
)
# overwrite decoded payload values in telegram_dict
telegram_trigger_data = {**trigger_data, **telegram_dict, **decoded_payload}
else:
telegram_trigger_data = {**trigger_data, **telegram_dict}
hass.async_run_hass_job(job, {"trigger": telegram_trigger_data})
return async_dispatcher_connect(
hass,
signal=SIGNAL_KNX_TELEGRAM_DICT,
signal=SIGNAL_KNX_TELEGRAM,
target=async_call_trigger_action,
)

View File

@ -25,7 +25,7 @@ async def test_telegram_trigger(
calls: list[ServiceCall],
knx: KNXTestKit,
) -> None:
"""Test telegram telegram triggers firing."""
"""Test telegram triggers firing."""
await knx.setup_integration({})
# "id" field added to action to test if `trigger_data` passed correctly in `async_attach_trigger`
@ -95,6 +95,64 @@ async def test_telegram_trigger(
assert test_call.data["id"] == 0
@pytest.mark.parametrize(
("payload", "type_option", "expected_value", "expected_unit"),
[
((0x4C,), {"type": "percent"}, 30, "%"),
((0x03,), {}, None, None), # "dpt" omitted defaults to None
((0x0C, 0x1A), {"type": "temperature"}, 21.00, "°C"),
],
)
async def test_telegram_trigger_dpt_option(
hass: HomeAssistant,
calls: list[ServiceCall],
knx: KNXTestKit,
payload: tuple[int, ...],
type_option: dict[str, bool],
expected_value: int | None,
expected_unit: str | None,
) -> None:
"""Test telegram trigger type option."""
await knx.setup_integration({})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
# "catch_all" trigger
{
"trigger": {
"platform": "knx.telegram",
**type_option,
},
"action": {
"service": "test.automation",
"data_template": {
"catch_all": ("telegram - {{ trigger.destination }}"),
"trigger": (" {{ trigger }}"),
},
},
},
]
},
)
await knx.receive_write("0/0/1", payload)
assert len(calls) == 1
test_call = calls.pop()
assert test_call.data["catch_all"] == "telegram - 0/0/1"
assert test_call.data["trigger"]["value"] == expected_value
assert test_call.data["trigger"]["unit"] == expected_unit
await knx.receive_read("0/0/1")
assert len(calls) == 1
test_call = calls.pop()
assert test_call.data["catch_all"] == "telegram - 0/0/1"
assert test_call.data["trigger"]["value"] is None
assert test_call.data["trigger"]["unit"] is None
@pytest.mark.parametrize(
"group_value_options",
[
@ -139,7 +197,7 @@ async def test_telegram_trigger_options(
group_value_options: dict[str, bool],
direction_options: dict[str, bool],
) -> None:
"""Test telegram telegram trigger options."""
"""Test telegram trigger options."""
await knx.setup_integration({})
assert await async_setup_component(
hass,
@ -157,7 +215,6 @@ async def test_telegram_trigger_options(
"service": "test.automation",
"data_template": {
"catch_all": ("telegram - {{ trigger.destination }}"),
"id": (" {{ trigger.id }}"),
},
},
},
@ -275,7 +332,6 @@ async def test_invalid_trigger(
"service": "test.automation",
"data_template": {
"catch_all": ("telegram - {{ trigger.destination }}"),
"id": (" {{ trigger.id }}"),
},
},
},