Add KNX interface device trigger for telegrams (#93102)

* telegram device trigger initial

* add Telegrams helper class

to parse and convert Telegram only once instead of once per device trigger

* translation

* label for extra_field

* test device trigger

* test trigger callback removal

* rename extra_field key to same name as used in trigger

* typo
This commit is contained in:
Matthias Alphart 2023-05-15 22:59:29 +02:00 committed by GitHub
parent 72a6d3a748
commit 2f8e8901fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 413 additions and 0 deletions

View File

@ -92,6 +92,7 @@ from .schema import (
ga_validator,
sensor_type_validator,
)
from .telegrams import Telegrams
from .websocket import register_panel
_LOGGER = logging.getLogger(__name__)
@ -383,6 +384,7 @@ class KNXModule:
self.xknx.connection_manager.register_connection_state_changed_cb(
self.connection_state_changed_cb
)
self.telegrams = Telegrams(hass, self.xknx, self.project)
self.interface_device = KNXInterfaceDevice(
hass=hass, entry=entry, xknx=self.xknx
)

View File

@ -0,0 +1,103 @@
"""Provides device triggers for KNX."""
from __future__ import annotations
from typing import Any, Final
import voluptuous as vol
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE
from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers import selector
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.typing import ConfigType
from . import KNXModule
from .const import DOMAIN
from .project import KNXProject
from .schema import ga_list_validator
from .telegrams import TelegramDict
TRIGGER_TELEGRAM: Final = "telegram"
EXTRA_FIELD_DESTINATION: Final = "destination" # no translation support
TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend(
{
vol.Optional(EXTRA_FIELD_DESTINATION): ga_list_validator,
vol.Required(CONF_TYPE): TRIGGER_TELEGRAM,
}
)
async def async_get_triggers(
hass: HomeAssistant, device_id: str
) -> list[dict[str, Any]]:
"""List device triggers for KNX devices."""
triggers = []
knx: KNXModule = hass.data[DOMAIN]
if knx.interface_device.device.id == device_id:
# Add trigger for KNX telegrams to interface device
triggers.append(
{
# Required fields of TRIGGER_BASE_SCHEMA
CONF_PLATFORM: "device",
CONF_DOMAIN: DOMAIN,
CONF_DEVICE_ID: device_id,
# Required fields of TRIGGER_SCHEMA
CONF_TYPE: TRIGGER_TELEGRAM,
}
)
return triggers
async def async_get_trigger_capabilities(
hass: HomeAssistant, config: ConfigType
) -> dict[str, vol.Schema]:
"""List trigger capabilities."""
project: KNXProject = hass.data[DOMAIN].project
options = [
selector.SelectOptionDict(value=ga.address, label=f"{ga.address} - {ga.name}")
for ga in project.group_addresses.values()
]
return {
"extra_fields": vol.Schema(
{
vol.Optional(EXTRA_FIELD_DESTINATION): selector.SelectSelector(
selector.SelectSelectorConfig(
mode=selector.SelectSelectorMode.DROPDOWN,
multiple=True,
custom_value=True,
options=options,
),
),
}
)
}
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
"""Attach a trigger."""
dst_addresses: list[str] = config.get(EXTRA_FIELD_DESTINATION, [])
job = HassJob(action, f"KNX device trigger {trigger_info}")
knx: KNXModule = hass.data[DOMAIN]
@callback
def async_call_trigger_action(telegram: TelegramDict) -> None:
"""Filter Telegram and call trigger action."""
if dst_addresses and telegram["destination"] not in dst_addresses:
return
hass.async_run_hass_job(
job,
{"trigger": telegram},
)
return knx.telegrams.async_listen_telegram(
async_call_trigger_action, name="KNX device trigger call"
)

View File

@ -281,5 +281,10 @@
"name": "Telegrams"
}
}
},
"device_automation": {
"trigger_type": {
"telegram": "Telegram sent or received"
}
}
}

View File

@ -0,0 +1,105 @@
"""KNX Telegram handler."""
from __future__ import annotations
from collections.abc import Callable
from typing import TypedDict
from xknx import XKNX
from xknx.exceptions import XKNXException
from xknx.telegram import Telegram
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from .project import KNXProject
class TelegramDict(TypedDict):
"""Represent a Telegram as a dict."""
destination: str
destination_name: str
direction: str
payload: int | tuple[int, ...] | None
source: str
source_name: str
telegramtype: str
value: str | int | float | bool | None
class Telegrams:
"""Class to handle KNX telegrams."""
def __init__(self, hass: HomeAssistant, xknx: XKNX, project: KNXProject) -> None:
"""Initialize Telegrams class."""
self.hass = hass
self.project = project
self._jobs: list[HassJob[[TelegramDict], None]] = []
self._xknx_telegram_cb_handle = (
xknx.telegram_queue.register_telegram_received_cb(
telegram_received_cb=self._xknx_telegram_cb,
match_for_outgoing=True,
)
)
async def _xknx_telegram_cb(self, telegram: Telegram) -> None:
"""Handle incoming and outgoing telegrams from xknx."""
telegram_dict = self.telegram_to_dict(telegram)
for job in self._jobs:
self.hass.async_run_hass_job(job, telegram_dict)
@callback
def async_listen_telegram(
self,
action: Callable[[TelegramDict], None],
name: str = "KNX telegram listener",
) -> CALLBACK_TYPE:
"""Register callback to listen for telegrams."""
job = HassJob(action, name=name)
self._jobs.append(job)
def remove_listener() -> None:
"""Remove the listener."""
self._jobs.remove(job)
return remove_listener
def telegram_to_dict(self, telegram: Telegram) -> TelegramDict:
"""Convert a Telegram to a dict."""
dst_name = ""
payload_data: int | tuple[int, ...] | None = None
src_name = ""
transcoder = None
value: str | int | float | bool | None = None
if (
ga_info := self.project.group_addresses.get(
f"{telegram.destination_address}"
)
) is not None:
dst_name = ga_info.name
transcoder = ga_info.transcoder
if (
device := self.project.devices.get(f"{telegram.source_address}")
) is not None:
src_name = device["name"]
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
payload_data = telegram.payload.value.value
if transcoder is not None:
try:
value = transcoder.from_knx(telegram.payload.value)
except XKNXException:
value = None
return TelegramDict(
destination=f"{telegram.destination_address}",
destination_name=dst_name,
direction=telegram.direction.value,
payload=payload_data,
source=f"{telegram.source_address}",
source_name=src_name,
telegramtype=telegram.payload.__class__.__name__,
value=value,
)

View File

@ -143,6 +143,7 @@ class KNXTestKit:
"""Assert outgoing telegram. One by one in timely order."""
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
await self.hass.async_block_till_done()
try:
telegram = self._outgoing_telegrams.get_nowait()
except asyncio.QueueEmpty:

View File

@ -0,0 +1,197 @@
"""Tests for KNX device triggers."""
import pytest
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
from homeassistant.components.knx import DOMAIN, device_trigger
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.setup import async_setup_component
from .conftest import KNXTestKit
from tests.common import async_get_device_automations, async_mock_service
@pytest.fixture
def calls(hass: HomeAssistant) -> list[ServiceCall]:
"""Track calls to a mock service."""
return async_mock_service(hass, "test", "automation")
async def test_get_triggers(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
knx: KNXTestKit,
) -> None:
"""Test we get the expected triggers from knx."""
await knx.setup_integration({})
device_entry = device_registry.async_get_device(
identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")}
)
expected_trigger = {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"type": "telegram",
"metadata": {},
}
triggers = await async_get_device_automations(
hass, DeviceAutomationType.TRIGGER, device_entry.id
)
assert expected_trigger in triggers
async def test_if_fires_on_telegram(
hass: HomeAssistant,
calls: list[ServiceCall],
device_registry: dr.DeviceRegistry,
knx: KNXTestKit,
) -> None:
"""Test for telegram triggers firing."""
await knx.setup_integration({})
device_entry = device_registry.async_get_device(
identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")}
)
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"type": "telegram",
},
"action": {
"service": "test.automation",
"data_template": {
"catch_all": ("telegram - {{ trigger.destination }}")
},
},
},
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"type": "telegram",
"destination": ["1/2/3", "1/2/4"],
},
"action": {
"service": "test.automation",
"data_template": {
"specific": ("telegram - {{ trigger.destination }}")
},
},
},
]
},
)
await knx.receive_write("0/0/1", (0x03, 0x2F))
assert len(calls) == 1
assert calls.pop().data["catch_all"] == "telegram - 0/0/1"
await knx.receive_write("1/2/4", (0x03, 0x2F))
assert len(calls) == 2
assert calls.pop().data["specific"] == "telegram - 1/2/4"
assert calls.pop().data["catch_all"] == "telegram - 1/2/4"
async def test_remove_device_trigger(
hass: HomeAssistant,
calls: list[ServiceCall],
device_registry: dr.DeviceRegistry,
knx: KNXTestKit,
) -> None:
"""Test for removed callback when device trigger not used."""
automation_name = "telegram_trigger_automation"
await knx.setup_integration({})
device_entry = device_registry.async_get_device(
identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")}
)
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"alias": automation_name,
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"type": "telegram",
},
"action": {
"service": "test.automation",
"data_template": {
"catch_all": ("telegram - {{ trigger.destination }}")
},
},
}
]
},
)
assert len(hass.data[DOMAIN].telegrams._jobs) == 1
await knx.receive_write("0/0/1", (0x03, 0x2F))
assert len(calls) == 1
assert calls.pop().data["catch_all"] == "telegram - 0/0/1"
await hass.services.async_call(
automation.DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: f"automation.{automation_name}"},
blocking=True,
)
assert len(hass.data[DOMAIN].telegrams._jobs) == 0
await knx.receive_write("0/0/1", (0x03, 0x2F))
assert len(calls) == 0
async def test_get_trigger_capabilities_node_status(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
knx: KNXTestKit,
) -> None:
"""Test we get the expected capabilities from a node_status trigger."""
await knx.setup_integration({})
device_entry = device_registry.async_get_device(
identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")}
)
capabilities = await device_trigger.async_get_trigger_capabilities(
hass,
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"type": "telegram",
},
)
assert capabilities and "extra_fields" in capabilities
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
"name": "destination",
"optional": True,
"selector": {
"select": {
"custom_value": True,
"mode": "dropdown",
"multiple": True,
"options": [],
},
},
}
]