diff --git a/homeassistant/components/knx/manifest.json b/homeassistant/components/knx/manifest.json index ad61b812386..b850954e8a8 100644 --- a/homeassistant/components/knx/manifest.json +++ b/homeassistant/components/knx/manifest.json @@ -13,6 +13,6 @@ "requirements": [ "xknx==2.9.0", "xknxproject==3.1.0", - "knx_frontend==2023.5.2.143855" + "knx_frontend==2023.5.16.204359" ] } diff --git a/homeassistant/components/knx/telegrams.py b/homeassistant/components/knx/telegrams.py index 815a0c00a93..4c5ac44f6b1 100644 --- a/homeassistant/components/knx/telegrams.py +++ b/homeassistant/components/knx/telegrams.py @@ -1,7 +1,9 @@ """KNX Telegram handler.""" from __future__ import annotations +from collections import deque from collections.abc import Callable +import datetime as dt from typing import TypedDict from xknx import XKNX @@ -10,6 +12,7 @@ from xknx.telegram import Telegram from xknx.telegram.apci import GroupValueResponse, GroupValueWrite from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback +import homeassistant.util.dt as dt_util from .project import KNXProject @@ -24,6 +27,8 @@ class TelegramDict(TypedDict): source: str source_name: str telegramtype: str + timestamp: dt.datetime + unit: str | None value: str | int | float | bool | None @@ -41,10 +46,12 @@ class Telegrams: match_for_outgoing=True, ) ) + self.recent_telegrams: deque[TelegramDict] = deque(maxlen=50) async def _xknx_telegram_cb(self, telegram: Telegram) -> None: """Handle incoming and outgoing telegrams from xknx.""" telegram_dict = self.telegram_to_dict(telegram) + self.recent_telegrams.appendleft(telegram_dict) for job in self._jobs: self.hass.async_run_hass_job(job, telegram_dict) @@ -70,6 +77,7 @@ class Telegrams: payload_data: int | tuple[int, ...] | None = None src_name = "" transcoder = None + unit = None value: str | int | float | bool | None = None if ( @@ -83,15 +91,16 @@ class Telegrams: if ( device := self.project.devices.get(f"{telegram.source_address}") ) is not None: - src_name = device["name"] + src_name = f"{device['manufacturer_name']} {device['name']}" if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)): payload_data = telegram.payload.value.value if transcoder is not None: try: value = transcoder.from_knx(telegram.payload.value) + unit = transcoder.unit except XKNXException: - value = None + value = "Error decoding value" return TelegramDict( destination=f"{telegram.destination_address}", @@ -101,5 +110,7 @@ class Telegrams: source=f"{telegram.source_address}", source_name=src_name, telegramtype=telegram.payload.__class__.__name__, + timestamp=dt_util.as_local(dt_util.utcnow()), + unit=unit, value=value, ) diff --git a/homeassistant/components/knx/websocket.py b/homeassistant/components/knx/websocket.py index d5a41dce146..d63ba89fbcc 100644 --- a/homeassistant/components/knx/websocket.py +++ b/homeassistant/components/knx/websocket.py @@ -1,28 +1,22 @@ """KNX Websocket API.""" from __future__ import annotations -from collections.abc import Callable -from typing import Final +from typing import TYPE_CHECKING, Final from knx_frontend import get_build_id, locate_dir import voluptuous as vol -from xknx.dpt import DPTArray -from xknx.exceptions import XKNXException -from xknx.telegram import Telegram, TelegramDirection -from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite +from xknx.telegram import TelegramDirection from xknxproject.exceptions import XknxProjectException from homeassistant.components import panel_custom, websocket_api from homeassistant.core import HomeAssistant, callback -import homeassistant.util.dt as dt_util -from .const import ( - DOMAIN, - AsyncMessageCallbackType, - KNXBusMonitorMessage, - MessageCallbackType, -) -from .project import KNXProject +from .const import DOMAIN, KNXBusMonitorMessage +from .telegrams import TelegramDict + +if TYPE_CHECKING: + from . import KNXModule + URL_BASE: Final = "/knx_static" @@ -65,10 +59,10 @@ def ws_info( msg: dict, ) -> None: """Handle get info command.""" - xknx = hass.data[DOMAIN].xknx + knx: KNXModule = hass.data[DOMAIN] _project_info = None - if project_info := hass.data[DOMAIN].project.info: + if project_info := knx.project.info: _project_info = { "name": project_info["name"], "last_modified": project_info["last_modified"], @@ -78,9 +72,9 @@ def ws_info( connection.send_result( msg["id"], { - "version": xknx.version, - "connected": xknx.connection_manager.connected.is_set(), - "current_address": str(xknx.current_address), + "version": knx.xknx.version, + "connected": knx.xknx.connection_manager.connected.is_set(), + "current_address": str(knx.xknx.current_address), "project": _project_info, }, ) @@ -101,9 +95,9 @@ async def ws_project_file_process( msg: dict, ) -> None: """Handle get info command.""" - knx_project = hass.data[DOMAIN].project + knx: KNXModule = hass.data[DOMAIN] try: - await knx_project.process_project_file( + await knx.project.process_project_file( file_id=msg["file_id"], password=msg["password"], ) @@ -130,8 +124,8 @@ async def ws_project_file_remove( msg: dict, ) -> None: """Handle get info command.""" - knx_project = hass.data[DOMAIN].project - await knx_project.remove_project_file() + knx: KNXModule = hass.data[DOMAIN] + await knx.project.remove_project_file() connection.send_result(msg["id"]) @@ -147,10 +141,17 @@ def ws_group_monitor_info( msg: dict, ) -> None: """Handle get info command of group monitor.""" - project_loaded = hass.data[DOMAIN].project.loaded + knx: KNXModule = hass.data[DOMAIN] + recent_telegrams = [ + _telegram_dict_to_group_monitor(telegram) + for telegram in knx.telegrams.recent_telegrams + ] connection.send_result( msg["id"], - {"project_loaded": bool(project_loaded)}, + { + "project_loaded": knx.project.loaded, + "recent_telegrams": recent_telegrams, + }, ) @@ -166,86 +167,53 @@ def ws_subscribe_telegram( msg: dict, ) -> None: """Subscribe to incoming and outgoing KNX telegrams.""" - project: KNXProject = hass.data[DOMAIN].project - - async def forward_telegrams(telegram: Telegram) -> None: - """Forward events to websocket.""" - payload: str - dpt_payload = None - if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)): - dpt_payload = telegram.payload.value - if isinstance(dpt_payload, DPTArray): - payload = f"0x{bytes(dpt_payload.value).hex()}" - else: - payload = f"{dpt_payload.value:d}" - elif isinstance(telegram.payload, GroupValueRead): - payload = "" - else: - return - - direction = ( - "group_monitor_incoming" - if telegram.direction is TelegramDirection.INCOMING - else "group_monitor_outgoing" - ) - dst = str(telegram.destination_address) - src = str(telegram.source_address) - bus_message: KNXBusMonitorMessage = KNXBusMonitorMessage( - destination_address=dst, - destination_text=None, - payload=payload, - type=str(telegram.payload.__class__.__name__), - value=None, - source_address=src, - source_text=None, - direction=direction, - timestamp=dt_util.as_local(dt_util.utcnow()).strftime("%H:%M:%S.%f")[:-3], - ) - if project.loaded: - if ga_infos := project.group_addresses.get(dst): - bus_message["destination_text"] = ga_infos.name - if dpt_payload is not None and ga_infos.transcoder is not None: - try: - value = ga_infos.transcoder.from_knx(dpt_payload) - except XKNXException: - bus_message["value"] = "Error decoding value" - else: - unit = ( - f" {ga_infos.transcoder.unit}" - if ga_infos.transcoder.unit is not None - else "" - ) - bus_message["value"] = f"{value}{unit}" - if ia_infos := project.devices.get(src): - bus_message[ - "source_text" - ] = f"{ia_infos['manufacturer_name']} {ia_infos['name']}" + knx: KNXModule = hass.data[DOMAIN] + @callback + def forward_telegram(telegram: TelegramDict) -> None: + """Forward telegram to websocket subscription.""" connection.send_event( msg["id"], - bus_message, + _telegram_dict_to_group_monitor(telegram), ) - connection.subscriptions[msg["id"]] = async_subscribe_telegrams( - hass, forward_telegrams + connection.subscriptions[msg["id"]] = knx.telegrams.async_listen_telegram( + action=forward_telegram, + name="KNX GroupMonitor subscription", ) - connection.send_result(msg["id"]) -def async_subscribe_telegrams( - hass: HomeAssistant, - telegram_callback: AsyncMessageCallbackType | MessageCallbackType, -) -> Callable[[], None]: - """Subscribe to telegram received callback.""" - xknx = hass.data[DOMAIN].xknx - - unregister = xknx.telegram_queue.register_telegram_received_cb( - telegram_callback, match_for_outgoing=True +def _telegram_dict_to_group_monitor(telegram: TelegramDict) -> KNXBusMonitorMessage: + """Convert a TelegramDict to a KNXBusMonitorMessage object.""" + direction = ( + "group_monitor_incoming" + if telegram["direction"] == TelegramDirection.INCOMING.value + else "group_monitor_outgoing" ) - def async_remove() -> None: - """Remove callback.""" - xknx.telegram_queue.unregister_telegram_received_cb(unregister) + _payload = telegram["payload"] + if isinstance(_payload, tuple): + payload = f"0x{bytes(_payload).hex()}" + elif isinstance(_payload, int): + payload = f"{_payload:d}" + else: + payload = "" - return async_remove + timestamp = telegram["timestamp"].strftime("%H:%M:%S.%f")[:-3] + + if (value := telegram["value"]) is not None: + unit = telegram["unit"] + value = f"{value}{' ' + unit if unit else ''}" + + return KNXBusMonitorMessage( + destination_address=telegram["destination"], + destination_text=telegram["destination_name"], + direction=direction, + payload=payload, + source_address=telegram["source"], + source_text=telegram["source_name"], + timestamp=timestamp, + type=telegram["telegramtype"], + value=value, + ) diff --git a/requirements_all.txt b/requirements_all.txt index df86675a7c9..08334567f61 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1028,7 +1028,7 @@ kegtron-ble==0.4.0 kiwiki-client==0.1.1 # homeassistant.components.knx -knx_frontend==2023.5.2.143855 +knx_frontend==2023.5.16.204359 # homeassistant.components.konnected konnected==1.2.0 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 1239b3010c9..8a3369c3921 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -787,7 +787,7 @@ justnimbus==0.6.0 kegtron-ble==0.4.0 # homeassistant.components.knx -knx_frontend==2023.5.2.143855 +knx_frontend==2023.5.16.204359 # homeassistant.components.konnected konnected==1.2.0 diff --git a/tests/components/knx/conftest.py b/tests/components/knx/conftest.py index 79a3de91715..084a3a37c27 100644 --- a/tests/components/knx/conftest.py +++ b/tests/components/knx/conftest.py @@ -12,13 +12,7 @@ from xknx.dpt import DPTArray, DPTBinary from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT from xknx.telegram import Telegram, TelegramDirection from xknx.telegram.address import GroupAddress, IndividualAddress -from xknx.telegram.apci import ( - APCI, - GroupValueRead, - GroupValueResponse, - GroupValueWrite, - IndividualAddressRead, -) +from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite from homeassistant.components.knx.const import ( CONF_KNX_AUTOMATIC, @@ -210,19 +204,6 @@ class KNXTestKit: await self.xknx.telegrams.join() await self.hass.async_block_till_done() - async def receive_individual_address_read(self, source: str | None = None): - """Inject incoming IndividualAddressRead telegram.""" - self.xknx.telegrams.put_nowait( - Telegram( - destination_address=IndividualAddress(self.INDIVIDUAL_ADDRESS), - direction=TelegramDirection.INCOMING, - payload=IndividualAddressRead(), - source_address=IndividualAddress(source or "1.3.5"), - ) - ) - await self.xknx.telegrams.join() - await self.hass.async_block_till_done() - async def receive_read(self, group_address: str, source: str | None = None) -> None: """Inject incoming GroupValueRead telegram.""" await self._receive_telegram( diff --git a/tests/components/knx/test_websocket.py b/tests/components/knx/test_websocket.py index c053e4fa9cb..115b92f70e8 100644 --- a/tests/components/knx/test_websocket.py +++ b/tests/components/knx/test_websocket.py @@ -150,6 +150,55 @@ async def test_knx_group_monitor_info_command( res = await client.receive_json() assert res["success"], res assert res["result"]["project_loaded"] is False + assert res["result"]["recent_telegrams"] == [] + + +async def test_knx_subscribe_telegrams_command_recent_telegrams( + hass: HomeAssistant, knx: KNXTestKit, hass_ws_client: WebSocketGenerator +): + """Test knx/subscribe_telegrams command sending recent telegrams.""" + await knx.setup_integration( + { + SwitchSchema.PLATFORM: { + CONF_NAME: "test", + KNX_ADDRESS: "1/2/4", + } + } + ) + + # send incoming telegram + await knx.receive_write("1/3/4", True) + # send outgoing telegram + await hass.services.async_call( + "switch", "turn_on", {"entity_id": "switch.test"}, blocking=True + ) + await knx.assert_write("1/2/4", 1) + + # connect websocket after telegrams have been sent + client = await hass_ws_client(hass) + await client.send_json({"id": 6, "type": "knx/group_monitor_info"}) + res = await client.receive_json() + assert res["success"], res + assert res["result"]["project_loaded"] is False + + recent_tgs = res["result"]["recent_telegrams"] + assert len(recent_tgs) == 2 + # telegrams are sorted from newest to oldest + assert recent_tgs[0]["destination_address"] == "1/2/4" + assert recent_tgs[0]["payload"] == "1" + assert recent_tgs[0]["type"] == "GroupValueWrite" + assert ( + recent_tgs[0]["source_address"] == "0.0.0" + ) # needs to be the IA currently connected to + assert recent_tgs[0]["direction"] == "group_monitor_outgoing" + assert recent_tgs[0]["timestamp"] is not None + + assert recent_tgs[1]["destination_address"] == "1/3/4" + assert recent_tgs[1]["payload"] == "1" + assert recent_tgs[1]["type"] == "GroupValueWrite" + assert recent_tgs[1]["source_address"] == "1.2.3" + assert recent_tgs[1]["direction"] == "group_monitor_incoming" + assert recent_tgs[1]["timestamp"] is not None async def test_knx_subscribe_telegrams_command_no_project( @@ -169,13 +218,12 @@ async def test_knx_subscribe_telegrams_command_no_project( res = await client.receive_json() assert res["success"], res - # send incoming events + # send incoming telegrams await knx.receive_read("1/2/3") await knx.receive_write("1/3/4", True) await knx.receive_write("1/3/4", False) - await knx.receive_individual_address_read() await knx.receive_write("1/3/8", (0x34, 0x45)) - # send outgoing events + # send outgoing telegrams await hass.services.async_call( "switch", "turn_on", {"entity_id": "switch.test"}, blocking=True )