KNX Group Monitor: Load latest 50 Telegrams on start (#93153)

* remove impossible test

IndividualAddress telegrams are not processed by xknx.telegram_queue

* Use Telegrams helper class for group monitor messages

* Store 50 telegrams in deque for group monitor

* Send recent telegrams at once on connection of group monitor

* Update KNX-frontend to support group monitor prepopulation
This commit is contained in:
Matthias Alphart 2023-05-17 09:58:00 +02:00 committed by GitHub
parent b993fe1c9d
commit c522ea855d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 132 additions and 124 deletions

View File

@ -13,6 +13,6 @@
"requirements": [
"xknx==2.9.0",
"xknxproject==3.1.0",
"knx_frontend==2023.5.2.143855"
"knx_frontend==2023.5.16.204359"
]
}

View File

@ -1,7 +1,9 @@
"""KNX Telegram handler."""
from __future__ import annotations
from collections import deque
from collections.abc import Callable
import datetime as dt
from typing import TypedDict
from xknx import XKNX
@ -10,6 +12,7 @@ from xknx.telegram import Telegram
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
import homeassistant.util.dt as dt_util
from .project import KNXProject
@ -24,6 +27,8 @@ class TelegramDict(TypedDict):
source: str
source_name: str
telegramtype: str
timestamp: dt.datetime
unit: str | None
value: str | int | float | bool | None
@ -41,10 +46,12 @@ class Telegrams:
match_for_outgoing=True,
)
)
self.recent_telegrams: deque[TelegramDict] = deque(maxlen=50)
async def _xknx_telegram_cb(self, telegram: Telegram) -> None:
"""Handle incoming and outgoing telegrams from xknx."""
telegram_dict = self.telegram_to_dict(telegram)
self.recent_telegrams.appendleft(telegram_dict)
for job in self._jobs:
self.hass.async_run_hass_job(job, telegram_dict)
@ -70,6 +77,7 @@ class Telegrams:
payload_data: int | tuple[int, ...] | None = None
src_name = ""
transcoder = None
unit = None
value: str | int | float | bool | None = None
if (
@ -83,15 +91,16 @@ class Telegrams:
if (
device := self.project.devices.get(f"{telegram.source_address}")
) is not None:
src_name = device["name"]
src_name = f"{device['manufacturer_name']} {device['name']}"
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
payload_data = telegram.payload.value.value
if transcoder is not None:
try:
value = transcoder.from_knx(telegram.payload.value)
unit = transcoder.unit
except XKNXException:
value = None
value = "Error decoding value"
return TelegramDict(
destination=f"{telegram.destination_address}",
@ -101,5 +110,7 @@ class Telegrams:
source=f"{telegram.source_address}",
source_name=src_name,
telegramtype=telegram.payload.__class__.__name__,
timestamp=dt_util.as_local(dt_util.utcnow()),
unit=unit,
value=value,
)

View File

@ -1,28 +1,22 @@
"""KNX Websocket API."""
from __future__ import annotations
from collections.abc import Callable
from typing import Final
from typing import TYPE_CHECKING, Final
from knx_frontend import get_build_id, locate_dir
import voluptuous as vol
from xknx.dpt import DPTArray
from xknx.exceptions import XKNXException
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.telegram import TelegramDirection
from xknxproject.exceptions import XknxProjectException
from homeassistant.components import panel_custom, websocket_api
from homeassistant.core import HomeAssistant, callback
import homeassistant.util.dt as dt_util
from .const import (
DOMAIN,
AsyncMessageCallbackType,
KNXBusMonitorMessage,
MessageCallbackType,
)
from .project import KNXProject
from .const import DOMAIN, KNXBusMonitorMessage
from .telegrams import TelegramDict
if TYPE_CHECKING:
from . import KNXModule
URL_BASE: Final = "/knx_static"
@ -65,10 +59,10 @@ def ws_info(
msg: dict,
) -> None:
"""Handle get info command."""
xknx = hass.data[DOMAIN].xknx
knx: KNXModule = hass.data[DOMAIN]
_project_info = None
if project_info := hass.data[DOMAIN].project.info:
if project_info := knx.project.info:
_project_info = {
"name": project_info["name"],
"last_modified": project_info["last_modified"],
@ -78,9 +72,9 @@ def ws_info(
connection.send_result(
msg["id"],
{
"version": xknx.version,
"connected": xknx.connection_manager.connected.is_set(),
"current_address": str(xknx.current_address),
"version": knx.xknx.version,
"connected": knx.xknx.connection_manager.connected.is_set(),
"current_address": str(knx.xknx.current_address),
"project": _project_info,
},
)
@ -101,9 +95,9 @@ async def ws_project_file_process(
msg: dict,
) -> None:
"""Handle get info command."""
knx_project = hass.data[DOMAIN].project
knx: KNXModule = hass.data[DOMAIN]
try:
await knx_project.process_project_file(
await knx.project.process_project_file(
file_id=msg["file_id"],
password=msg["password"],
)
@ -130,8 +124,8 @@ async def ws_project_file_remove(
msg: dict,
) -> None:
"""Handle get info command."""
knx_project = hass.data[DOMAIN].project
await knx_project.remove_project_file()
knx: KNXModule = hass.data[DOMAIN]
await knx.project.remove_project_file()
connection.send_result(msg["id"])
@ -147,10 +141,17 @@ def ws_group_monitor_info(
msg: dict,
) -> None:
"""Handle get info command of group monitor."""
project_loaded = hass.data[DOMAIN].project.loaded
knx: KNXModule = hass.data[DOMAIN]
recent_telegrams = [
_telegram_dict_to_group_monitor(telegram)
for telegram in knx.telegrams.recent_telegrams
]
connection.send_result(
msg["id"],
{"project_loaded": bool(project_loaded)},
{
"project_loaded": knx.project.loaded,
"recent_telegrams": recent_telegrams,
},
)
@ -166,86 +167,53 @@ def ws_subscribe_telegram(
msg: dict,
) -> None:
"""Subscribe to incoming and outgoing KNX telegrams."""
project: KNXProject = hass.data[DOMAIN].project
async def forward_telegrams(telegram: Telegram) -> None:
"""Forward events to websocket."""
payload: str
dpt_payload = None
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
dpt_payload = telegram.payload.value
if isinstance(dpt_payload, DPTArray):
payload = f"0x{bytes(dpt_payload.value).hex()}"
else:
payload = f"{dpt_payload.value:d}"
elif isinstance(telegram.payload, GroupValueRead):
payload = ""
else:
return
direction = (
"group_monitor_incoming"
if telegram.direction is TelegramDirection.INCOMING
else "group_monitor_outgoing"
)
dst = str(telegram.destination_address)
src = str(telegram.source_address)
bus_message: KNXBusMonitorMessage = KNXBusMonitorMessage(
destination_address=dst,
destination_text=None,
payload=payload,
type=str(telegram.payload.__class__.__name__),
value=None,
source_address=src,
source_text=None,
direction=direction,
timestamp=dt_util.as_local(dt_util.utcnow()).strftime("%H:%M:%S.%f")[:-3],
)
if project.loaded:
if ga_infos := project.group_addresses.get(dst):
bus_message["destination_text"] = ga_infos.name
if dpt_payload is not None and ga_infos.transcoder is not None:
try:
value = ga_infos.transcoder.from_knx(dpt_payload)
except XKNXException:
bus_message["value"] = "Error decoding value"
else:
unit = (
f" {ga_infos.transcoder.unit}"
if ga_infos.transcoder.unit is not None
else ""
)
bus_message["value"] = f"{value}{unit}"
if ia_infos := project.devices.get(src):
bus_message[
"source_text"
] = f"{ia_infos['manufacturer_name']} {ia_infos['name']}"
knx: KNXModule = hass.data[DOMAIN]
@callback
def forward_telegram(telegram: TelegramDict) -> None:
"""Forward telegram to websocket subscription."""
connection.send_event(
msg["id"],
bus_message,
_telegram_dict_to_group_monitor(telegram),
)
connection.subscriptions[msg["id"]] = async_subscribe_telegrams(
hass, forward_telegrams
connection.subscriptions[msg["id"]] = knx.telegrams.async_listen_telegram(
action=forward_telegram,
name="KNX GroupMonitor subscription",
)
connection.send_result(msg["id"])
def async_subscribe_telegrams(
hass: HomeAssistant,
telegram_callback: AsyncMessageCallbackType | MessageCallbackType,
) -> Callable[[], None]:
"""Subscribe to telegram received callback."""
xknx = hass.data[DOMAIN].xknx
unregister = xknx.telegram_queue.register_telegram_received_cb(
telegram_callback, match_for_outgoing=True
def _telegram_dict_to_group_monitor(telegram: TelegramDict) -> KNXBusMonitorMessage:
"""Convert a TelegramDict to a KNXBusMonitorMessage object."""
direction = (
"group_monitor_incoming"
if telegram["direction"] == TelegramDirection.INCOMING.value
else "group_monitor_outgoing"
)
def async_remove() -> None:
"""Remove callback."""
xknx.telegram_queue.unregister_telegram_received_cb(unregister)
_payload = telegram["payload"]
if isinstance(_payload, tuple):
payload = f"0x{bytes(_payload).hex()}"
elif isinstance(_payload, int):
payload = f"{_payload:d}"
else:
payload = ""
return async_remove
timestamp = telegram["timestamp"].strftime("%H:%M:%S.%f")[:-3]
if (value := telegram["value"]) is not None:
unit = telegram["unit"]
value = f"{value}{' ' + unit if unit else ''}"
return KNXBusMonitorMessage(
destination_address=telegram["destination"],
destination_text=telegram["destination_name"],
direction=direction,
payload=payload,
source_address=telegram["source"],
source_text=telegram["source_name"],
timestamp=timestamp,
type=telegram["telegramtype"],
value=value,
)

View File

@ -1028,7 +1028,7 @@ kegtron-ble==0.4.0
kiwiki-client==0.1.1
# homeassistant.components.knx
knx_frontend==2023.5.2.143855
knx_frontend==2023.5.16.204359
# homeassistant.components.konnected
konnected==1.2.0

View File

@ -787,7 +787,7 @@ justnimbus==0.6.0
kegtron-ble==0.4.0
# homeassistant.components.knx
knx_frontend==2023.5.2.143855
knx_frontend==2023.5.16.204359
# homeassistant.components.konnected
konnected==1.2.0

View File

@ -12,13 +12,7 @@ from xknx.dpt import DPTArray, DPTBinary
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import (
APCI,
GroupValueRead,
GroupValueResponse,
GroupValueWrite,
IndividualAddressRead,
)
from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite
from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC,
@ -210,19 +204,6 @@ class KNXTestKit:
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
async def receive_individual_address_read(self, source: str | None = None):
"""Inject incoming IndividualAddressRead telegram."""
self.xknx.telegrams.put_nowait(
Telegram(
destination_address=IndividualAddress(self.INDIVIDUAL_ADDRESS),
direction=TelegramDirection.INCOMING,
payload=IndividualAddressRead(),
source_address=IndividualAddress(source or "1.3.5"),
)
)
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
async def receive_read(self, group_address: str, source: str | None = None) -> None:
"""Inject incoming GroupValueRead telegram."""
await self._receive_telegram(

View File

@ -150,6 +150,55 @@ async def test_knx_group_monitor_info_command(
res = await client.receive_json()
assert res["success"], res
assert res["result"]["project_loaded"] is False
assert res["result"]["recent_telegrams"] == []
async def test_knx_subscribe_telegrams_command_recent_telegrams(
hass: HomeAssistant, knx: KNXTestKit, hass_ws_client: WebSocketGenerator
):
"""Test knx/subscribe_telegrams command sending recent telegrams."""
await knx.setup_integration(
{
SwitchSchema.PLATFORM: {
CONF_NAME: "test",
KNX_ADDRESS: "1/2/4",
}
}
)
# send incoming telegram
await knx.receive_write("1/3/4", True)
# send outgoing telegram
await hass.services.async_call(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)
await knx.assert_write("1/2/4", 1)
# connect websocket after telegrams have been sent
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": "knx/group_monitor_info"})
res = await client.receive_json()
assert res["success"], res
assert res["result"]["project_loaded"] is False
recent_tgs = res["result"]["recent_telegrams"]
assert len(recent_tgs) == 2
# telegrams are sorted from newest to oldest
assert recent_tgs[0]["destination_address"] == "1/2/4"
assert recent_tgs[0]["payload"] == "1"
assert recent_tgs[0]["type"] == "GroupValueWrite"
assert (
recent_tgs[0]["source_address"] == "0.0.0"
) # needs to be the IA currently connected to
assert recent_tgs[0]["direction"] == "group_monitor_outgoing"
assert recent_tgs[0]["timestamp"] is not None
assert recent_tgs[1]["destination_address"] == "1/3/4"
assert recent_tgs[1]["payload"] == "1"
assert recent_tgs[1]["type"] == "GroupValueWrite"
assert recent_tgs[1]["source_address"] == "1.2.3"
assert recent_tgs[1]["direction"] == "group_monitor_incoming"
assert recent_tgs[1]["timestamp"] is not None
async def test_knx_subscribe_telegrams_command_no_project(
@ -169,13 +218,12 @@ async def test_knx_subscribe_telegrams_command_no_project(
res = await client.receive_json()
assert res["success"], res
# send incoming events
# send incoming telegrams
await knx.receive_read("1/2/3")
await knx.receive_write("1/3/4", True)
await knx.receive_write("1/3/4", False)
await knx.receive_individual_address_read()
await knx.receive_write("1/3/8", (0x34, 0x45))
# send outgoing events
# send outgoing telegrams
await hass.services.async_call(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)