KNX: use xknx 3.0.0 eager telegram decoding (#122896)

* Use KNX xknx 3.0.0 eager telegram decoding

* review suggestion
This commit is contained in:
Matthias Alphart 2024-07-31 11:08:05 +02:00 committed by GitHub
parent 222011fc5c
commit 67ed8b207a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 41 additions and 26 deletions

View File

@ -333,7 +333,7 @@ class KNXModule:
async def start(self) -> None:
"""Start XKNX object. Connect to tunneling or Routing device."""
await self.project.load_project()
await self.project.load_project(self.xknx)
await self.config_store.load_data()
await self.telegrams.load_history()
await self.xknx.start()

View File

@ -6,6 +6,7 @@ from dataclasses import dataclass
import logging
from typing import Final
from xknx import XKNX
from xknx.dpt import DPTBase
from xknxproject import XKNXProj
from xknxproject.models import (
@ -80,15 +81,23 @@ class KNXProject:
self.group_addresses = {}
self.info = None
async def load_project(self, data: KNXProjectModel | None = None) -> None:
async def load_project(
self, xknx: XKNX, data: KNXProjectModel | None = None
) -> None:
"""Load project data from storage."""
if project := data or await self._store.async_load():
self.devices = project["devices"]
self.info = project["info"]
xknx.group_address_dpt.clear()
xknx_ga_dict = {}
for ga_model in project["group_addresses"].values():
ga_info = _create_group_address_info(ga_model)
self.group_addresses[ga_info.address] = ga_info
if (dpt_model := ga_model.get("dpt")) is not None:
xknx_ga_dict[ga_model["address"]] = dpt_model
xknx.group_address_dpt.set(xknx_ga_dict) # type: ignore[arg-type]
_LOGGER.debug(
"Loaded KNX project data with %s group addresses from storage",
@ -96,7 +105,9 @@ class KNXProject:
)
self.loaded = True
async def process_project_file(self, file_id: str, password: str) -> None:
async def process_project_file(
self, xknx: XKNX, file_id: str, password: str
) -> None:
"""Process an uploaded project file."""
def _parse_project() -> KNXProjectModel:
@ -110,7 +121,7 @@ class KNXProject:
project = await self.hass.async_add_executor_job(_parse_project)
await self._store.async_save(project)
await self.load_project(data=project)
await self.load_project(xknx, data=project)
async def remove_project_file(self) -> None:
"""Remove project file from storage."""

View File

@ -35,7 +35,7 @@ class DecodedTelegramPayload(TypedDict):
dpt_sub: int | None
dpt_name: str | None
unit: str | None
value: str | int | float | bool | None
value: bool | str | int | float | dict[str, str | int | float | bool] | None
class TelegramDict(DecodedTelegramPayload):
@ -106,7 +106,7 @@ class Telegrams:
payload_data: int | tuple[int, ...] | None = None
src_name = ""
transcoder = None
decoded_payload: DecodedTelegramPayload | None = None
value = None
if (
ga_info := self.project.group_addresses.get(
@ -114,7 +114,6 @@ class Telegrams:
)
) is not None:
dst_name = ga_info.name
transcoder = ga_info.transcoder
if (
device := self.project.devices.get(f"{telegram.source_address}")
@ -123,45 +122,49 @@ class Telegrams:
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
payload_data = telegram.payload.value.value
if transcoder is not None:
decoded_payload = decode_telegram_payload(
payload=telegram.payload.value, transcoder=transcoder
)
if telegram.decoded_data is not None:
transcoder = telegram.decoded_data.transcoder
value = _serializable_decoded_data(telegram.decoded_data.value)
return TelegramDict(
destination=f"{telegram.destination_address}",
destination_name=dst_name,
direction=telegram.direction.value,
dpt_main=decoded_payload["dpt_main"]
if decoded_payload is not None
else None,
dpt_sub=decoded_payload["dpt_sub"] if decoded_payload is not None else None,
dpt_name=decoded_payload["dpt_name"]
if decoded_payload is not None
else None,
dpt_main=transcoder.dpt_main_number if transcoder is not None else None,
dpt_sub=transcoder.dpt_sub_number if transcoder is not None else None,
dpt_name=transcoder.value_type if transcoder is not None else None,
payload=payload_data,
source=f"{telegram.source_address}",
source_name=src_name,
telegramtype=telegram.payload.__class__.__name__,
timestamp=dt_util.now().isoformat(),
unit=decoded_payload["unit"] if decoded_payload is not None else None,
value=decoded_payload["value"] if decoded_payload is not None else None,
unit=transcoder.unit if transcoder is not None else None,
value=value,
)
def _serializable_decoded_data(
value: bool | float | str | DPTComplexData | DPTEnumData,
) -> bool | str | int | float | dict[str, str | int | float | bool]:
"""Return a serializable representation of decoded data."""
if isinstance(value, DPTComplexData):
return value.as_dict()
if isinstance(value, DPTEnumData):
return value.name.lower()
return value
def decode_telegram_payload(
payload: DPTArray | DPTBinary, transcoder: type[DPTBase]
) -> DecodedTelegramPayload:
"""Decode the payload of a KNX telegram."""
"""Decode the payload of a KNX telegram with custom transcoder."""
try:
value = transcoder.from_knx(payload)
except XKNXException:
value = "Error decoding value"
if isinstance(value, DPTComplexData):
value = value.as_dict()
elif isinstance(value, DPTEnumData):
value = value.name.lower()
value = _serializable_decoded_data(value)
return DecodedTelegramPayload(
dpt_main=transcoder.dpt_main_number,

View File

@ -154,6 +154,7 @@ async def ws_project_file_process(
knx: KNXModule = hass.data[DOMAIN]
try:
await knx.project.process_project_file(
xknx=knx.xknx,
file_id=msg["file_id"],
password=msg["password"],
)

View File

@ -346,7 +346,7 @@ async def test_knx_subscribe_telegrams_command_project(
assert res["event"]["destination"] == "0/1/1"
assert res["event"]["destination_name"] == "percent"
assert res["event"]["payload"] == 1
assert res["event"]["value"] == "Error decoding value"
assert res["event"]["value"] is None
assert res["event"]["telegramtype"] == "GroupValueWrite"
assert res["event"]["source"] == "1.1.6"
assert (