diff --git a/homeassistant/components/knx/__init__.py b/homeassistant/components/knx/__init__.py index 8fab84c0aea..e177a861af3 100644 --- a/homeassistant/components/knx/__init__.py +++ b/homeassistant/components/knx/__init__.py @@ -92,6 +92,7 @@ from .schema import ( ga_validator, sensor_type_validator, ) +from .telegrams import Telegrams from .websocket import register_panel _LOGGER = logging.getLogger(__name__) @@ -383,6 +384,7 @@ class KNXModule: self.xknx.connection_manager.register_connection_state_changed_cb( self.connection_state_changed_cb ) + self.telegrams = Telegrams(hass, self.xknx, self.project) self.interface_device = KNXInterfaceDevice( hass=hass, entry=entry, xknx=self.xknx ) diff --git a/homeassistant/components/knx/device_trigger.py b/homeassistant/components/knx/device_trigger.py new file mode 100644 index 00000000000..8a074b43b7d --- /dev/null +++ b/homeassistant/components/knx/device_trigger.py @@ -0,0 +1,103 @@ +"""Provides device triggers for KNX.""" +from __future__ import annotations + +from typing import Any, Final + +import voluptuous as vol + +from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA +from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE +from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback +from homeassistant.helpers import selector +from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo +from homeassistant.helpers.typing import ConfigType + +from . import KNXModule +from .const import DOMAIN +from .project import KNXProject +from .schema import ga_list_validator +from .telegrams import TelegramDict + +TRIGGER_TELEGRAM: Final = "telegram" +EXTRA_FIELD_DESTINATION: Final = "destination" # no translation support + +TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend( + { + vol.Optional(EXTRA_FIELD_DESTINATION): ga_list_validator, + vol.Required(CONF_TYPE): TRIGGER_TELEGRAM, + } +) + + +async def async_get_triggers( + hass: HomeAssistant, device_id: str +) -> list[dict[str, Any]]: + """List device triggers for KNX devices.""" + triggers = [] + + knx: KNXModule = hass.data[DOMAIN] + if knx.interface_device.device.id == device_id: + # Add trigger for KNX telegrams to interface device + triggers.append( + { + # Required fields of TRIGGER_BASE_SCHEMA + CONF_PLATFORM: "device", + CONF_DOMAIN: DOMAIN, + CONF_DEVICE_ID: device_id, + # Required fields of TRIGGER_SCHEMA + CONF_TYPE: TRIGGER_TELEGRAM, + } + ) + + return triggers + + +async def async_get_trigger_capabilities( + hass: HomeAssistant, config: ConfigType +) -> dict[str, vol.Schema]: + """List trigger capabilities.""" + project: KNXProject = hass.data[DOMAIN].project + options = [ + selector.SelectOptionDict(value=ga.address, label=f"{ga.address} - {ga.name}") + for ga in project.group_addresses.values() + ] + return { + "extra_fields": vol.Schema( + { + vol.Optional(EXTRA_FIELD_DESTINATION): selector.SelectSelector( + selector.SelectSelectorConfig( + mode=selector.SelectSelectorMode.DROPDOWN, + multiple=True, + custom_value=True, + options=options, + ), + ), + } + ) + } + + +async def async_attach_trigger( + hass: HomeAssistant, + config: ConfigType, + action: TriggerActionType, + trigger_info: TriggerInfo, +) -> CALLBACK_TYPE: + """Attach a trigger.""" + dst_addresses: list[str] = config.get(EXTRA_FIELD_DESTINATION, []) + job = HassJob(action, f"KNX device trigger {trigger_info}") + knx: KNXModule = hass.data[DOMAIN] + + @callback + def async_call_trigger_action(telegram: TelegramDict) -> None: + """Filter Telegram and call trigger action.""" + if dst_addresses and telegram["destination"] not in dst_addresses: + return + hass.async_run_hass_job( + job, + {"trigger": telegram}, + ) + + return knx.telegrams.async_listen_telegram( + async_call_trigger_action, name="KNX device trigger call" + ) diff --git a/homeassistant/components/knx/strings.json b/homeassistant/components/knx/strings.json index 0fce778c521..d4a1eae11c2 100644 --- a/homeassistant/components/knx/strings.json +++ b/homeassistant/components/knx/strings.json @@ -281,5 +281,10 @@ "name": "Telegrams" } } + }, + "device_automation": { + "trigger_type": { + "telegram": "Telegram sent or received" + } } } diff --git a/homeassistant/components/knx/telegrams.py b/homeassistant/components/knx/telegrams.py new file mode 100644 index 00000000000..815a0c00a93 --- /dev/null +++ b/homeassistant/components/knx/telegrams.py @@ -0,0 +1,105 @@ +"""KNX Telegram handler.""" +from __future__ import annotations + +from collections.abc import Callable +from typing import TypedDict + +from xknx import XKNX +from xknx.exceptions import XKNXException +from xknx.telegram import Telegram +from xknx.telegram.apci import GroupValueResponse, GroupValueWrite + +from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback + +from .project import KNXProject + + +class TelegramDict(TypedDict): + """Represent a Telegram as a dict.""" + + destination: str + destination_name: str + direction: str + payload: int | tuple[int, ...] | None + source: str + source_name: str + telegramtype: str + value: str | int | float | bool | None + + +class Telegrams: + """Class to handle KNX telegrams.""" + + def __init__(self, hass: HomeAssistant, xknx: XKNX, project: KNXProject) -> None: + """Initialize Telegrams class.""" + self.hass = hass + self.project = project + self._jobs: list[HassJob[[TelegramDict], None]] = [] + self._xknx_telegram_cb_handle = ( + xknx.telegram_queue.register_telegram_received_cb( + telegram_received_cb=self._xknx_telegram_cb, + match_for_outgoing=True, + ) + ) + + async def _xknx_telegram_cb(self, telegram: Telegram) -> None: + """Handle incoming and outgoing telegrams from xknx.""" + telegram_dict = self.telegram_to_dict(telegram) + for job in self._jobs: + self.hass.async_run_hass_job(job, telegram_dict) + + @callback + def async_listen_telegram( + self, + action: Callable[[TelegramDict], None], + name: str = "KNX telegram listener", + ) -> CALLBACK_TYPE: + """Register callback to listen for telegrams.""" + job = HassJob(action, name=name) + self._jobs.append(job) + + def remove_listener() -> None: + """Remove the listener.""" + self._jobs.remove(job) + + return remove_listener + + def telegram_to_dict(self, telegram: Telegram) -> TelegramDict: + """Convert a Telegram to a dict.""" + dst_name = "" + payload_data: int | tuple[int, ...] | None = None + src_name = "" + transcoder = None + value: str | int | float | bool | None = None + + if ( + ga_info := self.project.group_addresses.get( + f"{telegram.destination_address}" + ) + ) is not None: + dst_name = ga_info.name + transcoder = ga_info.transcoder + + if ( + device := self.project.devices.get(f"{telegram.source_address}") + ) is not None: + src_name = device["name"] + + if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)): + payload_data = telegram.payload.value.value + if transcoder is not None: + try: + value = transcoder.from_knx(telegram.payload.value) + except XKNXException: + value = None + + return TelegramDict( + destination=f"{telegram.destination_address}", + destination_name=dst_name, + direction=telegram.direction.value, + payload=payload_data, + source=f"{telegram.source_address}", + source_name=src_name, + telegramtype=telegram.payload.__class__.__name__, + value=value, + ) diff --git a/tests/components/knx/conftest.py b/tests/components/knx/conftest.py index fcad9a7be89..79a3de91715 100644 --- a/tests/components/knx/conftest.py +++ b/tests/components/knx/conftest.py @@ -143,6 +143,7 @@ class KNXTestKit: """Assert outgoing telegram. One by one in timely order.""" await self.xknx.telegrams.join() await self.hass.async_block_till_done() + await self.hass.async_block_till_done() try: telegram = self._outgoing_telegrams.get_nowait() except asyncio.QueueEmpty: diff --git a/tests/components/knx/test_device_trigger.py b/tests/components/knx/test_device_trigger.py new file mode 100644 index 00000000000..c7063997585 --- /dev/null +++ b/tests/components/knx/test_device_trigger.py @@ -0,0 +1,197 @@ +"""Tests for KNX device triggers.""" +import pytest +import voluptuous_serialize + +from homeassistant.components import automation +from homeassistant.components.device_automation import DeviceAutomationType +from homeassistant.components.knx import DOMAIN, device_trigger +from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF +from homeassistant.core import HomeAssistant, ServiceCall +from homeassistant.helpers import config_validation as cv, device_registry as dr +from homeassistant.setup import async_setup_component + +from .conftest import KNXTestKit + +from tests.common import async_get_device_automations, async_mock_service + + +@pytest.fixture +def calls(hass: HomeAssistant) -> list[ServiceCall]: + """Track calls to a mock service.""" + return async_mock_service(hass, "test", "automation") + + +async def test_get_triggers( + hass: HomeAssistant, + device_registry: dr.DeviceRegistry, + knx: KNXTestKit, +) -> None: + """Test we get the expected triggers from knx.""" + await knx.setup_integration({}) + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")} + ) + expected_trigger = { + "platform": "device", + "domain": DOMAIN, + "device_id": device_entry.id, + "type": "telegram", + "metadata": {}, + } + triggers = await async_get_device_automations( + hass, DeviceAutomationType.TRIGGER, device_entry.id + ) + assert expected_trigger in triggers + + +async def test_if_fires_on_telegram( + hass: HomeAssistant, + calls: list[ServiceCall], + device_registry: dr.DeviceRegistry, + knx: KNXTestKit, +) -> None: + """Test for telegram triggers firing.""" + await knx.setup_integration({}) + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")} + ) + + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: [ + { + "trigger": { + "platform": "device", + "domain": DOMAIN, + "device_id": device_entry.id, + "type": "telegram", + }, + "action": { + "service": "test.automation", + "data_template": { + "catch_all": ("telegram - {{ trigger.destination }}") + }, + }, + }, + { + "trigger": { + "platform": "device", + "domain": DOMAIN, + "device_id": device_entry.id, + "type": "telegram", + "destination": ["1/2/3", "1/2/4"], + }, + "action": { + "service": "test.automation", + "data_template": { + "specific": ("telegram - {{ trigger.destination }}") + }, + }, + }, + ] + }, + ) + + await knx.receive_write("0/0/1", (0x03, 0x2F)) + assert len(calls) == 1 + assert calls.pop().data["catch_all"] == "telegram - 0/0/1" + + await knx.receive_write("1/2/4", (0x03, 0x2F)) + assert len(calls) == 2 + assert calls.pop().data["specific"] == "telegram - 1/2/4" + assert calls.pop().data["catch_all"] == "telegram - 1/2/4" + + +async def test_remove_device_trigger( + hass: HomeAssistant, + calls: list[ServiceCall], + device_registry: dr.DeviceRegistry, + knx: KNXTestKit, +) -> None: + """Test for removed callback when device trigger not used.""" + automation_name = "telegram_trigger_automation" + await knx.setup_integration({}) + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")} + ) + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: [ + { + "alias": automation_name, + "trigger": { + "platform": "device", + "domain": DOMAIN, + "device_id": device_entry.id, + "type": "telegram", + }, + "action": { + "service": "test.automation", + "data_template": { + "catch_all": ("telegram - {{ trigger.destination }}") + }, + }, + } + ] + }, + ) + + assert len(hass.data[DOMAIN].telegrams._jobs) == 1 + await knx.receive_write("0/0/1", (0x03, 0x2F)) + assert len(calls) == 1 + assert calls.pop().data["catch_all"] == "telegram - 0/0/1" + + await hass.services.async_call( + automation.DOMAIN, + SERVICE_TURN_OFF, + {ATTR_ENTITY_ID: f"automation.{automation_name}"}, + blocking=True, + ) + + assert len(hass.data[DOMAIN].telegrams._jobs) == 0 + await knx.receive_write("0/0/1", (0x03, 0x2F)) + assert len(calls) == 0 + + +async def test_get_trigger_capabilities_node_status( + hass: HomeAssistant, + device_registry: dr.DeviceRegistry, + knx: KNXTestKit, +) -> None: + """Test we get the expected capabilities from a node_status trigger.""" + await knx.setup_integration({}) + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, f"_{knx.mock_config_entry.entry_id}_interface")} + ) + + capabilities = await device_trigger.async_get_trigger_capabilities( + hass, + { + "platform": "device", + "domain": DOMAIN, + "device_id": device_entry.id, + "type": "telegram", + }, + ) + assert capabilities and "extra_fields" in capabilities + + assert voluptuous_serialize.convert( + capabilities["extra_fields"], custom_serializer=cv.custom_serializer + ) == [ + { + "name": "destination", + "optional": True, + "selector": { + "select": { + "custom_value": True, + "mode": "dropdown", + "multiple": True, + "options": [], + }, + }, + } + ]