Compare commits

...

6 Commits

Author SHA1 Message Date
farmio
ea3c9e2520 add error translation and test it 2025-12-03 20:53:04 +01:00
farmio
c0863ca585 review 2025-12-03 20:28:44 +01:00
farmio
9d53d37cbf Fix missing description_placeholders 2025-12-03 14:50:13 +01:00
farmio
823f320425 Add repair issue for DataSecure key issues 2025-12-03 14:50:13 +01:00
farmio
b5a8516bd6 Store GroupAddress-entity map in knx_module 2025-12-03 14:50:13 +01:00
farmio
f05cb6b2c7 Forward undecodable DataSecure to GroupMonitor 2025-12-03 14:50:13 +01:00
11 changed files with 455 additions and 9 deletions

View File

@@ -94,6 +94,8 @@ SERVICE_KNX_EVENT_REGISTER: Final = "event_register"
SERVICE_KNX_EXPOSURE_REGISTER: Final = "exposure_register"
SERVICE_KNX_READ: Final = "read"
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY: Final = "data_secure_group_key_issue"
class KNXConfigEntryData(TypedDict, total=False):
"""Config entry for the KNX integration."""

View File

@@ -77,6 +77,11 @@ class _KnxEntityBase(Entity):
"""Store register state change callback and start device object."""
self._device.register_device_updated_cb(self.after_update_callback)
self._device.xknx.devices.async_add(self._device)
if uid := self.unique_id:
self._knx_module.add_to_group_address_entities(
group_addresses=self._device.group_addresses(),
identifier=(self.platform_data.domain, uid),
)
# super call needed to have methods of multi-inherited classes called
# eg. for restoring state (like _KNXSwitch)
await super().async_added_to_hass()
@@ -85,6 +90,11 @@ class _KnxEntityBase(Entity):
"""Disconnect device object when removed."""
self._device.unregister_device_updated_cb(self.after_update_callback)
self._device.xknx.devices.async_remove(self._device)
if uid := self.unique_id:
self._knx_module.remove_from_group_address_entities(
group_addresses=self._device.group_addresses(),
identifier=(self.platform_data.domain, uid),
)
class KnxYamlEntity(_KnxEntityBase):

View File

@@ -56,6 +56,7 @@ from .const import (
from .device import KNXInterfaceDevice
from .expose import KNXExposeSensor, KNXExposeTime
from .project import KNXProject
from .repairs import data_secure_group_key_issue_dispatcher
from .storage.config_store import KNXConfigStore
from .telegrams import Telegrams
@@ -107,8 +108,12 @@ class KNXModule:
self._address_filter_transcoder: dict[AddressFilter, type[DPTBase]] = {}
self.group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {}
self.group_address_entities: dict[
DeviceGroupAddress, set[tuple[str, str]] # {(platform, unique_id),}
] = {}
self.knx_event_callback: TelegramQueue.Callback = self.register_event_callback()
self.entry.async_on_unload(data_secure_group_key_issue_dispatcher(self))
self.entry.async_on_unload(
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
)
@@ -225,6 +230,29 @@ class KNXModule:
threaded=True,
)
def add_to_group_address_entities(
self,
group_addresses: set[DeviceGroupAddress],
identifier: tuple[str, str], # (platform, unique_id)
) -> None:
"""Register entity in group_address_entities map."""
for ga in group_addresses:
if ga not in self.group_address_entities:
self.group_address_entities[ga] = set()
self.group_address_entities[ga].add(identifier)
def remove_from_group_address_entities(
self,
group_addresses: set[DeviceGroupAddress],
identifier: tuple[str, str],
) -> None:
"""Unregister entity from group_address_entities map."""
for ga in group_addresses:
if ga in self.group_address_entities:
self.group_address_entities[ga].discard(identifier)
if not self.group_address_entities[ga]:
del self.group_address_entities[ga]
def connection_state_changed_cb(self, state: XknxConnectionState) -> None:
"""Call invoked after a KNX connection state change was received."""
self.connected = state == XknxConnectionState.CONNECTED

View File

@@ -0,0 +1,175 @@
"""Repairs for KNX integration."""
from __future__ import annotations
from collections.abc import Callable
from functools import partial
from typing import TYPE_CHECKING, Any, Final
import voluptuous as vol
from xknx.exceptions.exception import InvalidSecureConfiguration
from xknx.telegram import GroupAddress, IndividualAddress, Telegram
from homeassistant import data_entry_flow
from homeassistant.components.repairs import RepairsFlow
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import issue_registry as ir, selector
from homeassistant.helpers.dispatcher import async_dispatcher_connect
if TYPE_CHECKING:
from .knx_module import KNXModule
from .const import (
CONF_KNX_KNXKEY_PASSWORD,
DOMAIN,
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
KNXConfigEntryData,
)
from .storage.keyring import DEFAULT_KNX_KEYRING_FILENAME, save_uploaded_knxkeys_file
from .telegrams import SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM, TelegramDict
CONF_KEYRING_FILE: Final = "knxkeys_file"
async def async_create_fix_flow(
hass: HomeAssistant,
issue_id: str,
data: dict[str, str | int | float | None] | None,
) -> RepairsFlow:
"""Create flow."""
if issue_id == REPAIR_ISSUE_DATA_SECURE_GROUP_KEY:
return DataSecureGroupIssueRepairFlow()
# If KNX adds confirm-only repairs in the future, this should be changed
# to return a ConfirmRepairFlow instead of raising a ValueError
raise ValueError(f"unknown repair {issue_id}")
######################
# DataSecure key issue
######################
@callback
def data_secure_group_key_issue_dispatcher(knx_module: KNXModule) -> Callable[[], None]:
"""Watcher for DataSecure group key issues."""
return async_dispatcher_connect(
knx_module.hass,
signal=SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM,
target=partial(_data_secure_group_key_issue_handler, knx_module),
)
@callback
def _data_secure_group_key_issue_handler(
knx_module: KNXModule, telegram: Telegram, telegram_dict: TelegramDict
) -> None:
"""Handle DataSecure group key issue telegrams."""
if telegram.destination_address not in knx_module.group_address_entities:
# Only report issues for configured group addresses
return
issue_registry = ir.async_get(knx_module.hass)
new_ga = str(telegram.destination_address)
new_ia = str(telegram.source_address)
new_data = {new_ga: new_ia}
if existing_issue := issue_registry.async_get_issue(
DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY
):
assert isinstance(existing_issue.data, dict)
existing_data: dict[str, str] = existing_issue.data # type: ignore[assignment]
if new_ga in existing_data:
current_ias = existing_data[new_ga].split(", ")
if new_ia in current_ias:
return
current_ias = sorted([*current_ias, new_ia], key=IndividualAddress)
new_data[new_ga] = ", ".join(current_ias)
new_data_unsorted = existing_data | new_data
new_data = {
key: new_data_unsorted[key]
for key in sorted(new_data_unsorted, key=GroupAddress)
}
issue_registry.async_get_or_create(
DOMAIN,
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
data=new_data, # type: ignore[arg-type]
is_fixable=True,
is_persistent=True,
severity=ir.IssueSeverity.ERROR,
translation_key=REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
translation_placeholders={
"addresses": "\n".join(
f"`{ga}` from {ias}" for ga, ias in new_data.items()
),
"interface": str(knx_module.xknx.current_address),
},
)
class DataSecureGroupIssueRepairFlow(RepairsFlow):
"""Handler for an issue fixing flow for outdated DataSecure keys."""
@callback
def _async_get_placeholders(self) -> dict[str, str]:
issue_registry = ir.async_get(self.hass)
issue = issue_registry.async_get_issue(self.handler, self.issue_id)
assert issue is not None
return issue.translation_placeholders or {}
async def async_step_init(
self, user_input: dict[str, str] | None = None
) -> data_entry_flow.FlowResult:
"""Handle the first step of a fix flow."""
return await self.async_step_secure_knxkeys()
async def async_step_secure_knxkeys(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Manage upload of new KNX Keyring file."""
errors: dict[str, str] = {}
if user_input is not None:
password = user_input[CONF_KNX_KNXKEY_PASSWORD]
keyring = None
try:
keyring = await save_uploaded_knxkeys_file(
self.hass,
uploaded_file_id=user_input[CONF_KEYRING_FILE],
password=password,
)
except InvalidSecureConfiguration:
errors[CONF_KNX_KNXKEY_PASSWORD] = "keyfile_invalid_signature"
if not errors and keyring:
new_entry_data = KNXConfigEntryData(
knxkeys_filename=f"{DOMAIN}/{DEFAULT_KNX_KEYRING_FILENAME}",
knxkeys_password=password,
)
return self.finish_flow(new_entry_data)
fields = {
vol.Required(CONF_KEYRING_FILE): selector.FileSelector(
config=selector.FileSelectorConfig(accept=".knxkeys")
),
vol.Required(CONF_KNX_KNXKEY_PASSWORD): selector.TextSelector(),
}
return self.async_show_form(
step_id="secure_knxkeys",
data_schema=vol.Schema(fields),
description_placeholders=self._async_get_placeholders(),
errors=errors,
)
@callback
def finish_flow(
self, new_entry_data: KNXConfigEntryData
) -> data_entry_flow.FlowResult:
"""Finish the repair flow. Reload the config entry."""
knx_config_entries = self.hass.config_entries.async_entries(DOMAIN)
if knx_config_entries:
config_entry = knx_config_entries[0] # single_config_entry
new_data = {**config_entry.data, **new_entry_data}
self.hass.config_entries.async_update_entry(config_entry, data=new_data)
self.hass.config_entries.async_schedule_reload(config_entry.entry_id)
return self.async_create_entry(data={})

View File

@@ -10,9 +10,10 @@ from xknx.secure.keyring import Keyring, sync_load_keyring
from homeassistant.components.file_upload import process_uploaded_file
from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir
from homeassistant.helpers.storage import STORAGE_DIR
from ..const import DOMAIN
from ..const import DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY
_LOGGER = logging.getLogger(__name__)
@@ -45,4 +46,11 @@ async def save_uploaded_knxkeys_file(
shutil.move(file_path, dest_file)
return keyring
return await hass.async_add_executor_job(_process_upload)
keyring = await hass.async_add_executor_job(_process_upload)
# If there is an existing DataSecure group key issue, remove it.
# GAs might not be DataSecure anymore after uploading a valid keyring,
# if they are, we raise the issue again when receiving a telegram.
ir.async_delete_issue(hass, DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY)
return keyring

View File

@@ -671,6 +671,30 @@
"message": "Invalid type for `knx.send` service: {type}"
}
},
"issues": {
"data_secure_group_key_issue": {
"fix_flow": {
"error": {
"keyfile_invalid_signature": "[%key:component::knx::config::error::keyfile_invalid_signature%]"
},
"step": {
"secure_knxkeys": {
"data": {
"knxkeys_file": "[%key:component::knx::config::step::secure_knxkeys::data::knxkeys_file%]",
"knxkeys_password": "[%key:component::knx::config::step::secure_knxkeys::data::knxkeys_password%]"
},
"data_description": {
"knxkeys_file": "[%key:component::knx::config::step::secure_knxkeys::data_description::knxkeys_file%]",
"knxkeys_password": "[%key:component::knx::config::step::secure_knxkeys::data_description::knxkeys_password%]"
},
"description": "Telegrams for group addresses used in Home Assistant could not be decrypted because Data Secure keys are missing or invalid:\n\n{addresses}\n\nTo fix this, update the sending devices configurations via ETS and provide an updated KNX Keyring file. Make sure that the group addresses used in Home Assistant are associated with the interface used by Home Assistant (`{interface}` when the issue last occurred).",
"title": "Update KNX Keyring"
}
}
},
"title": "KNX Data Secure telegrams can't be decrypted"
}
},
"options": {
"step": {
"communication_settings": {

View File

@@ -26,6 +26,9 @@ STORAGE_KEY: Final = f"{DOMAIN}/telegrams_history.json"
# dispatcher signal for KNX interface device triggers
SIGNAL_KNX_TELEGRAM: SignalType[Telegram, TelegramDict] = SignalType("knx_telegram")
SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM: SignalType[Telegram, TelegramDict] = SignalType(
"knx_data_secure_issue_telegram"
)
class DecodedTelegramPayload(TypedDict):
@@ -74,6 +77,11 @@ class Telegrams:
match_for_outgoing=True,
)
)
self._xknx_data_secure_group_key_issue_cb_handle = (
xknx.telegram_queue.register_data_secure_group_key_issue_cb(
self._xknx_data_secure_group_key_issue_cb,
)
)
self.recent_telegrams: deque[TelegramDict] = deque(maxlen=log_size)
self.last_ga_telegrams: dict[str, TelegramDict] = {}
@@ -107,6 +115,14 @@ class Telegrams:
self.last_ga_telegrams[telegram_dict["destination"]] = telegram_dict
async_dispatcher_send(self.hass, SIGNAL_KNX_TELEGRAM, telegram, telegram_dict)
def _xknx_data_secure_group_key_issue_cb(self, telegram: Telegram) -> None:
"""Handle telegrams with undecodable data secure payload from xknx."""
telegram_dict = self.telegram_to_dict(telegram)
self.recent_telegrams.append(telegram_dict)
async_dispatcher_send(
self.hass, SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM, telegram, telegram_dict
)
def telegram_to_dict(self, telegram: Telegram) -> TelegramDict:
"""Convert a Telegram to a dict."""
dst_name = ""

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable
from contextlib import ExitStack
from functools import wraps
import inspect
from typing import TYPE_CHECKING, Any, Final, overload
@@ -34,7 +35,11 @@ from .storage.entity_store_validation import (
validate_entity_data,
)
from .storage.serialize import get_serialized_schema
from .telegrams import SIGNAL_KNX_TELEGRAM, TelegramDict
from .telegrams import (
SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM,
SIGNAL_KNX_TELEGRAM,
TelegramDict,
)
if TYPE_CHECKING:
from .knx_module import KNXModule
@@ -334,11 +339,23 @@ def ws_subscribe_telegram(
telegram_dict,
)
connection.subscriptions[msg["id"]] = async_dispatcher_connect(
hass,
signal=SIGNAL_KNX_TELEGRAM,
target=forward_telegram,
stack = ExitStack()
stack.callback(
async_dispatcher_connect(
hass,
signal=SIGNAL_KNX_TELEGRAM,
target=forward_telegram,
)
)
stack.callback(
async_dispatcher_connect(
hass,
signal=SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM,
target=forward_telegram,
)
)
connection.subscriptions[msg["id"]] = stack.close
connection.send_result(msg["id"])

View File

@@ -11,9 +11,15 @@ from xknx import XKNX
from xknx.core import XknxConnectionState, XknxConnectionType
from xknx.dpt import DPTArray, DPTBinary
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram import Telegram, TelegramDirection, tpci
from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.telegram.apci import (
APCI,
GroupValueRead,
GroupValueResponse,
GroupValueWrite,
SecureAPDU,
)
from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC,
@@ -312,6 +318,23 @@ class KNXTestKit:
source=source,
)
def receive_data_secure_issue(
self,
group_address: str,
source: str | None = None,
) -> None:
"""Inject incoming telegram with undecodable data secure payload."""
telegram = Telegram(
destination_address=GroupAddress(group_address),
direction=TelegramDirection.INCOMING,
source_address=IndividualAddress(source or self.INDIVIDUAL_ADDRESS),
tpci=tpci.TDataGroup(),
payload=SecureAPDU.from_knx(
bytes.fromhex("03f110002446cfef4ac085e7092ab062b44d")
),
)
self.xknx.telegram_queue.received_data_secure_group_key_issue(telegram)
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:

View File

@@ -0,0 +1,133 @@
"""Test repair flows for KNX integration."""
import pytest
from xknx.exceptions.exception import InvalidSecureConfiguration
from homeassistant.components.knx import repairs
from homeassistant.components.knx.const import (
CONF_KNX_KNXKEY_PASSWORD,
DOMAIN,
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import issue_registry as ir
from .conftest import KNXTestKit
from .test_config_flow import FIXTURE_UPLOAD_UUID, patch_file_upload
from tests.components.repairs import (
async_process_repairs_platforms,
get_repairs,
process_repair_fix_flow,
start_repair_fix_flow,
)
from tests.typing import ClientSessionGenerator, WebSocketGenerator
async def test_create_fix_flow_raises_on_unknown_issue_id(hass: HomeAssistant) -> None:
"""Test create_fix_flow raises on unknown issue_id."""
with pytest.raises(ValueError):
await repairs.async_create_fix_flow(hass, "no_such_issue", None)
@pytest.mark.parametrize(
"configured_group_address",
["1/2/5", "3/4/6"],
)
async def test_data_secure_group_key_issue_only_for_configured_group_address(
hass: HomeAssistant,
knx: KNXTestKit,
configured_group_address: str,
) -> None:
"""Test that repair issue is only created for configured group addresses."""
await knx.setup_integration(
{
"switch": {
"name": "Test Switch",
"address": configured_group_address,
}
}
)
issue_registry = ir.async_get(hass)
assert bool(issue_registry.issues) is False
# An issue should only be created if this address is configured.
knx.receive_data_secure_issue("1/2/5")
assert bool(issue_registry.issues) is (configured_group_address == "1/2/5")
async def test_data_secure_group_key_issue_repair_flow(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
hass_ws_client: WebSocketGenerator,
knx: KNXTestKit,
) -> None:
"""Test repair flow for DataSecure group key issue."""
await knx.setup_integration(
{
"switch": [
{"name": "Test 1", "address": "1/2/5"},
{"name": "Test 2", "address": "11/0/0"},
]
}
)
knx.receive_data_secure_issue("11/0/0", source="1.0.1")
knx.receive_data_secure_issue("1/2/5", source="1.0.10")
knx.receive_data_secure_issue("1/2/5", source="1.0.1")
_placeholders = {
"addresses": "`1/2/5` from 1.0.1, 1.0.10\n`11/0/0` from 1.0.1", # check sorting
"interface": "0.0.0",
}
issue_registry = ir.async_get(hass)
issue = issue_registry.async_get_issue(DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY)
assert issue is not None
assert issue.translation_placeholders == _placeholders
issues = await get_repairs(hass, hass_ws_client)
assert issues
await async_process_repairs_platforms(hass)
client = await hass_client()
flow = await start_repair_fix_flow(
client, DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY
)
flow_id = flow["flow_id"]
assert flow["type"] == FlowResultType.FORM
assert flow["step_id"] == "secure_knxkeys"
assert flow["description_placeholders"] == _placeholders
# test error handling
with patch_file_upload(
side_effect=InvalidSecureConfiguration(),
):
flow = await process_repair_fix_flow(
client,
flow_id,
{
repairs.CONF_KEYRING_FILE: FIXTURE_UPLOAD_UUID,
CONF_KNX_KNXKEY_PASSWORD: "invalid_password_mocked",
},
)
assert flow["type"] == FlowResultType.FORM
assert flow["step_id"] == "secure_knxkeys"
assert flow["errors"] == {CONF_KNX_KNXKEY_PASSWORD: "keyfile_invalid_signature"}
# test successful file upload
with patch_file_upload():
flow = await process_repair_fix_flow(
client,
flow_id,
{
repairs.CONF_KEYRING_FILE: FIXTURE_UPLOAD_UUID,
CONF_KNX_KNXKEY_PASSWORD: "password",
},
)
assert flow["type"] == FlowResultType.CREATE_ENTRY
assert (
issue_registry.async_get_issue(DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY)
is None
)

View File

@@ -311,6 +311,8 @@ async def test_knx_subscribe_telegrams_command_no_project(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)
await knx.assert_write("1/2/4", 1)
# receive undecodable data secure telegram
knx.receive_data_secure_issue("1/2/5")
# receive events
res = await client.receive_json()
@@ -355,6 +357,14 @@ async def test_knx_subscribe_telegrams_command_no_project(
assert res["event"]["direction"] == "Outgoing"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination"] == "1/2/5"
assert res["event"]["payload"] is None
assert res["event"]["telegramtype"] == "SecureAPDU"
assert res["event"]["source"] == "1.2.3"
assert res["event"]["direction"] == "Incoming"
assert res["event"]["timestamp"] is not None
async def test_knx_subscribe_telegrams_command_project(
hass: HomeAssistant,