Add support for KNX IP-Secure routing (#82765)

* always use instance variable for new entry data

- change `self._tunneling_config` to non-optional `self.new_entry_data`
- always use self.new_entry_data in `finish_flow()`

* support secure routing

* amend current tests

* use sync latency tolerance

* test secure routing config flow

* diagnostics redact backbone_key

* test xknx library setup

* check length of backbone_key

* better readable key validation
This commit is contained in:
Matthias Alphart 2022-11-27 23:33:12 +01:00 committed by GitHub
parent d6e287f47a
commit 4517af509c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 422 additions and 88 deletions

View File

@ -51,6 +51,9 @@ from .const import (
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_ROUTING_SECURE,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
@ -406,15 +409,15 @@ class KNXModule:
auto_reconnect=True,
threaded=True,
)
if _conn_type == CONF_KNX_TUNNELING_TCP_SECURE:
knxkeys_file: str | None = (
self.hass.config.path(
STORAGE_DIR,
self.entry.data[CONF_KNX_KNXKEY_FILENAME],
)
if self.entry.data.get(CONF_KNX_KNXKEY_FILENAME) is not None
else None
knxkeys_file: str | None = (
self.hass.config.path(
STORAGE_DIR,
self.entry.data[CONF_KNX_KNXKEY_FILENAME],
)
if self.entry.data.get(CONF_KNX_KNXKEY_FILENAME) is not None
else None
)
if _conn_type == CONF_KNX_TUNNELING_TCP_SECURE:
return ConnectionConfig(
connection_type=ConnectionType.TUNNELING_TCP_SECURE,
gateway_ip=self.entry.data[CONF_HOST],
@ -431,6 +434,24 @@ class KNXModule:
auto_reconnect=True,
threaded=True,
)
if _conn_type == CONF_KNX_ROUTING_SECURE:
return ConnectionConfig(
connection_type=ConnectionType.ROUTING_SECURE,
individual_address=self.entry.data[CONF_KNX_INDIVIDUAL_ADDRESS],
multicast_group=self.entry.data[CONF_KNX_MCAST_GRP],
multicast_port=self.entry.data[CONF_KNX_MCAST_PORT],
local_ip=self.entry.data.get(CONF_KNX_LOCAL_IP),
secure_config=SecureConfig(
backbone_key=self.entry.data.get(CONF_KNX_ROUTING_BACKBONE_KEY),
latency_ms=self.entry.data.get(
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE
),
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
knxkeys_file_path=knxkeys_file,
),
auto_reconnect=True,
threaded=True,
)
return ConnectionConfig(
auto_reconnect=True,
threaded=True,

View File

@ -33,6 +33,9 @@ from .const import (
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_ROUTING_SECURE,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
@ -87,13 +90,13 @@ class KNXCommonFlow(ABC, FlowHandler):
def __init__(self, initial_data: KNXConfigEntryData) -> None:
"""Initialize KNXCommonFlow."""
self.initial_data = initial_data
self.new_entry_data = KNXConfigEntryData()
self._found_gateways: list[GatewayDescriptor] = []
self._found_tunnels: list[GatewayDescriptor] = []
self._selected_tunnel: GatewayDescriptor | None = None
self._tunneling_config: KNXConfigEntryData | None = None
@abstractmethod
def finish_flow(self, new_entry_data: KNXConfigEntryData, title: str) -> FlowResult:
def finish_flow(self, title: str) -> FlowResult:
"""Finish the flow."""
async def async_step_connection_type(
@ -119,11 +122,8 @@ class KNXCommonFlow(ABC, FlowHandler):
return await self.async_step_tunnel()
# Automatic connection type
entry_data = KNXConfigEntryData(connection_type=CONF_KNX_AUTOMATIC)
return self.finish_flow(
new_entry_data=entry_data,
title=CONF_KNX_AUTOMATIC.capitalize(),
)
self.new_entry_data = KNXConfigEntryData(connection_type=CONF_KNX_AUTOMATIC)
return self.finish_flow(title=CONF_KNX_AUTOMATIC.capitalize())
supported_connection_types = {
CONF_KNX_TUNNELING: CONF_KNX_TUNNELING.capitalize(),
@ -163,7 +163,7 @@ class KNXCommonFlow(ABC, FlowHandler):
if self._selected_tunnel.supports_tunnelling_tcp
else CONF_KNX_TUNNELING
)
self._tunneling_config = KNXConfigEntryData(
self.new_entry_data = KNXConfigEntryData(
host=self._selected_tunnel.ip_addr,
port=self._selected_tunnel.port,
route_back=False,
@ -171,13 +171,10 @@ class KNXCommonFlow(ABC, FlowHandler):
)
if connection_type == CONF_KNX_TUNNELING_TCP_SECURE:
return self.async_show_menu(
step_id="secure_tunneling",
step_id="secure_key_source",
menu_options=["secure_knxkeys", "secure_tunnel_manual"],
)
return self.finish_flow(
new_entry_data=self._tunneling_config,
title=f"Tunneling @ {self._selected_tunnel}",
)
return self.finish_flow(title=f"Tunneling @ {self._selected_tunnel}")
if not self._found_tunnels:
return await self.async_step_manual_tunnel()
@ -211,7 +208,7 @@ class KNXCommonFlow(ABC, FlowHandler):
if not errors:
connection_type = user_input[CONF_KNX_TUNNELING_TYPE]
self._tunneling_config = KNXConfigEntryData(
self.new_entry_data = KNXConfigEntryData(
host=_host,
port=user_input[CONF_PORT],
route_back=user_input[CONF_KNX_ROUTE_BACK],
@ -221,13 +218,10 @@ class KNXCommonFlow(ABC, FlowHandler):
if connection_type == CONF_KNX_TUNNELING_TCP_SECURE:
return self.async_show_menu(
step_id="secure_tunneling",
menu_options=["secure_knxkeys", "secure_tunnel_manual"],
step_id="secure_key_source",
menu_options=["secure_knxkeys", "secure_routing_manual"],
)
return self.finish_flow(
new_entry_data=self._tunneling_config,
title=f"Tunneling @ {_host}",
)
return self.finish_flow(title=f"Tunneling @ {_host}")
_reconfiguring_existing_tunnel = (
self.initial_data.get(CONF_KNX_CONNECTION_TYPE)
@ -290,20 +284,18 @@ class KNXCommonFlow(ABC, FlowHandler):
async def async_step_secure_tunnel_manual(
self, user_input: dict | None = None
) -> FlowResult:
"""Configure ip secure manually."""
"""Configure ip secure tunnelling manually."""
errors: dict = {}
if user_input is not None:
assert self._tunneling_config
entry_data = self._tunneling_config | KNXConfigEntryData(
self.new_entry_data |= KNXConfigEntryData(
connection_type=CONF_KNX_TUNNELING_TCP_SECURE,
device_authentication=user_input[CONF_KNX_SECURE_DEVICE_AUTHENTICATION],
user_id=user_input[CONF_KNX_SECURE_USER_ID],
user_password=user_input[CONF_KNX_SECURE_USER_PASSWORD],
)
return self.finish_flow(
new_entry_data=entry_data,
title=f"Secure Tunneling @ {self._tunneling_config[CONF_HOST]}",
title=f"Secure Tunneling @ {self.new_entry_data[CONF_HOST]}"
)
fields = {
@ -338,6 +330,60 @@ class KNXCommonFlow(ABC, FlowHandler):
errors=errors,
)
async def async_step_secure_routing_manual(
self, user_input: dict | None = None
) -> FlowResult:
"""Configure ip secure routing manually."""
errors: dict = {}
if user_input is not None:
try:
key_bytes = bytes.fromhex(user_input[CONF_KNX_ROUTING_BACKBONE_KEY])
if len(key_bytes) != 16:
raise ValueError
except ValueError:
errors[CONF_KNX_ROUTING_BACKBONE_KEY] = "invalid_backbone_key"
if not errors:
self.new_entry_data |= KNXConfigEntryData(
backbone_key=user_input[CONF_KNX_ROUTING_BACKBONE_KEY],
sync_latency_tolerance=user_input[
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE
],
)
return self.finish_flow(
title=f"Secure Routing as {self.new_entry_data[CONF_KNX_INDIVIDUAL_ADDRESS]}"
)
fields = {
vol.Required(
CONF_KNX_ROUTING_BACKBONE_KEY,
default=self.initial_data.get(CONF_KNX_ROUTING_BACKBONE_KEY),
): selector.TextSelector(
selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD),
),
vol.Required(
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
default=self.initial_data.get(CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE)
or 1000,
): vol.All(
selector.NumberSelector(
selector.NumberSelectorConfig(
min=400,
max=4000,
unit_of_measurement="ms",
mode=selector.NumberSelectorMode.BOX,
),
),
vol.Coerce(int),
),
}
return self.async_show_form(
step_id="secure_routing_manual",
data_schema=vol.Schema(fields),
errors=errors,
)
async def async_step_secure_knxkeys(
self, user_input: dict | None = None
) -> FlowResult:
@ -345,7 +391,6 @@ class KNXCommonFlow(ABC, FlowHandler):
errors = {}
if user_input is not None:
assert self._tunneling_config
storage_key = CONST_KNX_STORAGE_KEY + user_input[CONF_KNX_KNXKEY_FILENAME]
try:
await load_keyring(
@ -358,15 +403,20 @@ class KNXCommonFlow(ABC, FlowHandler):
errors[CONF_KNX_KNXKEY_PASSWORD] = "invalid_signature"
if not errors:
entry_data = self._tunneling_config | KNXConfigEntryData(
connection_type=CONF_KNX_TUNNELING_TCP_SECURE,
self.new_entry_data |= KNXConfigEntryData(
backbone_key=None,
sync_latency_tolerance=None,
knxkeys_filename=storage_key,
knxkeys_password=user_input[CONF_KNX_KNXKEY_PASSWORD],
)
return self.finish_flow(
new_entry_data=entry_data,
title=f"Secure Tunneling @ {self._tunneling_config[CONF_HOST]}",
)
if (
self.new_entry_data[CONF_KNX_CONNECTION_TYPE]
== CONF_KNX_ROUTING_SECURE
):
title = f"Secure Routing as {self.new_entry_data[CONF_KNX_INDIVIDUAL_ADDRESS]}"
else:
title = f"Secure Tunneling @ {self.new_entry_data[CONF_HOST]}"
return self.finish_flow(title=title)
fields = {
vol.Required(
@ -418,34 +468,46 @@ class KNXCommonFlow(ABC, FlowHandler):
errors[CONF_KNX_LOCAL_IP] = "invalid_ip_address"
if not errors:
entry_data = KNXConfigEntryData(
connection_type=CONF_KNX_ROUTING,
connection_type = (
CONF_KNX_ROUTING_SECURE
if user_input[CONF_KNX_ROUTING_SECURE]
else CONF_KNX_ROUTING
)
self.new_entry_data = KNXConfigEntryData(
connection_type=connection_type,
individual_address=_individual_address,
multicast_group=_multicast_group,
multicast_port=_multicast_port,
local_ip=_local_ip,
)
return self.finish_flow(
new_entry_data=entry_data,
title=f"Routing as {_individual_address}",
)
if connection_type == CONF_KNX_ROUTING_SECURE:
return self.async_show_menu(
step_id="secure_key_source",
menu_options=["secure_knxkeys", "secure_routing_manual"],
)
return self.finish_flow(title=f"Routing as {_individual_address}")
routers = [router for router in self._found_gateways if router.supports_routing]
if not routers:
errors["base"] = "no_router_discovered"
default_secure_routing_enable = any(
router for router in routers if router.routing_requires_secure
)
fields = {
vol.Required(
CONF_KNX_INDIVIDUAL_ADDRESS, default=_individual_address
): _IA_SELECTOR,
vol.Required(
CONF_KNX_ROUTING_SECURE, default=default_secure_routing_enable
): selector.BooleanSelector(),
vol.Required(CONF_KNX_MCAST_GRP, default=_multicast_group): _IP_SELECTOR,
vol.Required(CONF_KNX_MCAST_PORT, default=_multicast_port): _PORT_SELECTOR,
}
if self.show_advanced_options:
# Optional with default doesn't work properly in flow UI
fields[vol.Optional(CONF_KNX_LOCAL_IP)] = _IP_SELECTOR
if not any(
router for router in self._found_gateways if router.supports_routing
):
errors["base"] = "no_router_discovered"
return self.async_show_form(
step_id="routing", data_schema=vol.Schema(fields), errors=errors
)
@ -467,11 +529,11 @@ class KNXConfigFlow(KNXCommonFlow, ConfigFlow, domain=DOMAIN):
return KNXOptionsFlow(config_entry)
@callback
def finish_flow(self, new_entry_data: KNXConfigEntryData, title: str) -> FlowResult:
def finish_flow(self, title: str) -> FlowResult:
"""Create the ConfigEntry."""
return self.async_create_entry(
title=title,
data=DEFAULT_ENTRY_DATA | new_entry_data,
data=DEFAULT_ENTRY_DATA | self.new_entry_data,
)
async def async_step_user(self, user_input: dict | None = None) -> FlowResult:
@ -492,11 +554,9 @@ class KNXOptionsFlow(KNXCommonFlow, OptionsFlow):
super().__init__(initial_data=config_entry.data) # type: ignore[arg-type]
@callback
def finish_flow(
self, new_entry_data: KNXConfigEntryData, title: str | None
) -> FlowResult:
def finish_flow(self, title: str | None) -> FlowResult:
"""Update the ConfigEntry and finish the flow."""
new_data = DEFAULT_ENTRY_DATA | self.initial_data | new_entry_data
new_data = DEFAULT_ENTRY_DATA | self.initial_data | self.new_entry_data
self.hass.config_entries.async_update_entry(
self.config_entry,
data=new_data,
@ -518,13 +578,11 @@ class KNXOptionsFlow(KNXCommonFlow, OptionsFlow):
) -> FlowResult:
"""Manage KNX communication settings."""
if user_input is not None:
return self.finish_flow(
new_entry_data=KNXConfigEntryData(
state_updater=user_input[CONF_KNX_STATE_UPDATER],
rate_limit=user_input[CONF_KNX_RATE_LIMIT],
),
title=None,
self.new_entry_data = KNXConfigEntryData(
state_updater=user_input[CONF_KNX_STATE_UPDATER],
rate_limit=user_input[CONF_KNX_RATE_LIMIT],
)
return self.finish_flow(title=None)
data_schema = {
vol.Required(

View File

@ -30,6 +30,9 @@ CONF_KNX_INDIVIDUAL_ADDRESS: Final = "individual_address"
CONF_KNX_CONNECTION_TYPE: Final = "connection_type"
CONF_KNX_AUTOMATIC: Final = "automatic"
CONF_KNX_ROUTING: Final = "routing"
CONF_KNX_ROUTING_BACKBONE_KEY: Final = "backbone_key"
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: Final = "sync_latency_tolerance"
CONF_KNX_ROUTING_SECURE: Final = "routing_secure"
CONF_KNX_TUNNELING: Final = "tunneling"
CONF_KNX_TUNNELING_TCP: Final = "tunneling_tcp"
CONF_KNX_TUNNELING_TCP_SECURE: Final = "tunneling_tcp_secure"
@ -92,6 +95,8 @@ class KNXConfigEntryData(TypedDict, total=False):
device_authentication: str
knxkeys_filename: str
knxkeys_password: str
backbone_key: str | None
sync_latency_tolerance: int | None
class ColorTempModes(Enum):

View File

@ -13,12 +13,14 @@ from homeassistant.core import HomeAssistant
from . import CONFIG_SCHEMA
from .const import (
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_PASSWORD,
DOMAIN,
)
TO_REDACT = {
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,

View File

@ -29,11 +29,12 @@
"local_ip": "Leave blank to use auto-discovery."
}
},
"secure_tunneling": {
"secure_key_source": {
"description": "Select how you want to configure KNX/IP Secure.",
"menu_options": {
"secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys",
"secure_tunnel_manual": "Configure IP secure keys manually"
"secure_tunnel_manual": "Configure IP secure credentials manually",
"secure_routing_manual": "Configure IP secure backbone key manually"
}
},
"secure_knxkeys": {
@ -60,10 +61,22 @@
"device_authentication": "This is set in the 'IP' panel of the interface in ETS."
}
},
"secure_routing_manual": {
"description": "Please enter your IP secure information.",
"data": {
"backbone_key": "Backbone key",
"sync_latency_tolerance": "Network latency tolerance"
},
"data_description": {
"backbone_key": "Can be seen in the 'Security' report of an ETS project. Eg. '00112233445566778899AABBCCDDEEFF'",
"sync_latency_tolerance": "Default is 1000."
}
},
"routing": {
"description": "Please configure the routing options.",
"data": {
"individual_address": "Individual address",
"routing_secure": "Use KNX IP Secure",
"multicast_group": "Multicast group",
"multicast_port": "Multicast port",
"local_ip": "Local IP of Home Assistant"
@ -80,6 +93,7 @@
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_backbone_key": "Invalid backbone key. 32 hexadecimal numbers expected.",
"invalid_individual_address": "Value does not match pattern for KNX individual address.\n'area.line.device'",
"invalid_ip_address": "Invalid IPv4 address.",
"invalid_signature": "The password to decrypt the `.knxkeys` file is wrong.",
@ -134,11 +148,12 @@
"local_ip": "[%key:component::knx::config::step::manual_tunnel::data_description::local_ip%]"
}
},
"secure_tunneling": {
"description": "[%key:component::knx::config::step::secure_tunneling::description%]",
"secure_key_source": {
"description": "[%key:component::knx::config::step::secure_key_source::description%]",
"menu_options": {
"secure_knxkeys": "[%key:component::knx::config::step::secure_tunneling::menu_options::secure_knxkeys%]",
"secure_tunnel_manual": "[%key:component::knx::config::step::secure_tunneling::menu_options::secure_tunnel_manual%]"
"secure_knxkeys": "[%key:component::knx::config::step::secure_key_source::menu_options::secure_knxkeys%]",
"secure_tunnel_manual": "[%key:component::knx::config::step::secure_key_source::menu_options::secure_tunnel_manual%]",
"secure_routing_manual": "[%key:component::knx::config::step::secure_key_source::menu_options::secure_routing_manual%]"
}
},
"secure_knxkeys": {
@ -165,10 +180,22 @@
"device_authentication": "[%key:component::knx::config::step::secure_tunnel_manual::data_description::device_authentication%]"
}
},
"secure_routing_manual": {
"description": "[%key:component::knx::config::step::secure_routing_manual::description%]",
"data": {
"backbone_key": "[%key:component::knx::config::step::secure_routing_manual::data::backbone_key%]",
"sync_latency_tolerance": "[%key:component::knx::config::step::secure_routing_manual::data::sync_latency_tolerance%]"
},
"data_description": {
"backbone_key": "[%key:component::knx::config::step::secure_routing_manual::data_description::backbone_key%]",
"sync_latency_tolerance": "[%key:component::knx::config::step::secure_routing_manual::data_description::sync_latency_tolerance%]"
}
},
"routing": {
"description": "[%key:component::knx::config::step::routing::description%]",
"data": {
"individual_address": "[%key:component::knx::config::step::routing::data::individual_address%]",
"routing_secure": "[%key:component::knx::config::step::routing::data::routing_secure%]",
"multicast_group": "[%key:component::knx::config::step::routing::data::multicast_group%]",
"multicast_port": "[%key:component::knx::config::step::routing::data::multicast_port%]",
"local_ip": "[%key:component::knx::config::step::routing::data::local_ip%]"
@ -181,6 +208,7 @@
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_backbone_key": "[%key:component::knx::config::error::invalid_backbone_key%]",
"invalid_individual_address": "[%key:component::knx::config::error::invalid_individual_address%]",
"invalid_ip_address": "[%key:component::knx::config::error::invalid_ip_address%]",
"invalid_signature": "[%key:component::knx::config::error::invalid_signature%]",

View File

@ -7,6 +7,7 @@
"error": {
"cannot_connect": "Failed to connect",
"file_not_found": "The specified `.knxkeys` file was not found in the path config/.storage/knx/",
"invalid_backbone_key": "Invalid backbone key. 32 hexadecimal numbers expected.",
"invalid_individual_address": "Value does not match pattern for KNX individual address.\n'area.line.device'",
"invalid_ip_address": "Invalid IPv4 address.",
"invalid_signature": "The password to decrypt the `.knxkeys` file is wrong.",
@ -41,7 +42,8 @@
"individual_address": "Individual address",
"local_ip": "Local IP of Home Assistant",
"multicast_group": "Multicast group",
"multicast_port": "Multicast port"
"multicast_port": "Multicast port",
"routing_secure": "Use KNX IP Secure"
},
"data_description": {
"individual_address": "KNX address to be used by Home Assistant, e.g. `0.0.4`",
@ -49,6 +51,14 @@
},
"description": "Please configure the routing options."
},
"secure_key_source": {
"description": "Select how you want to configure KNX/IP Secure.",
"menu_options": {
"secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys",
"secure_routing_manual": "Configure IP secure backbone key manually",
"secure_tunnel_manual": "Configure IP secure credentials manually"
}
},
"secure_knxkeys": {
"data": {
"knxkeys_filename": "The filename of your `.knxkeys` file (including extension)",
@ -60,6 +70,17 @@
},
"description": "Please enter the information for your `.knxkeys` file."
},
"secure_routing_manual": {
"data": {
"backbone_key": "Backbone key",
"sync_latency_tolerance": "Network latency tolerance"
},
"data_description": {
"backbone_key": "Can be seen in the 'Security' report of an ETS project. Eg. '00112233445566778899AABBCCDDEEFF'",
"sync_latency_tolerance": "Default is 1000."
},
"description": "Please enter your IP secure information."
},
"secure_tunnel_manual": {
"data": {
"device_authentication": "Device authentication password",
@ -73,13 +94,6 @@
},
"description": "Please enter your IP secure information."
},
"secure_tunneling": {
"description": "Select how you want to configure KNX/IP Secure.",
"menu_options": {
"secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys",
"secure_tunnel_manual": "Configure IP secure keys manually"
}
},
"tunnel": {
"data": {
"gateway": "KNX Tunnel Connection"
@ -92,6 +106,7 @@
"error": {
"cannot_connect": "Failed to connect",
"file_not_found": "The specified `.knxkeys` file was not found in the path config/.storage/knx/",
"invalid_backbone_key": "Invalid backbone key. 32 hexadecimal numbers expected.",
"invalid_individual_address": "Value does not match pattern for KNX individual address.\n'area.line.device'",
"invalid_ip_address": "Invalid IPv4 address.",
"invalid_signature": "The password to decrypt the `.knxkeys` file is wrong.",
@ -142,7 +157,8 @@
"individual_address": "Individual address",
"local_ip": "Local IP of Home Assistant",
"multicast_group": "Multicast group",
"multicast_port": "Multicast port"
"multicast_port": "Multicast port",
"routing_secure": "Use KNX IP Secure"
},
"data_description": {
"individual_address": "KNX address to be used by Home Assistant, e.g. `0.0.4`",
@ -150,6 +166,14 @@
},
"description": "Please configure the routing options."
},
"secure_key_source": {
"description": "Select how you want to configure KNX/IP Secure.",
"menu_options": {
"secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys",
"secure_routing_manual": "Configure IP secure backbone key manually",
"secure_tunnel_manual": "Configure IP secure credentials manually"
}
},
"secure_knxkeys": {
"data": {
"knxkeys_filename": "The filename of your `.knxkeys` file (including extension)",
@ -161,6 +185,17 @@
},
"description": "Please enter the information for your `.knxkeys` file."
},
"secure_routing_manual": {
"data": {
"backbone_key": "Backbone key",
"sync_latency_tolerance": "Network latency tolerance"
},
"data_description": {
"backbone_key": "Can be seen in the 'Security' report of an ETS project. Eg. '00112233445566778899AABBCCDDEEFF'",
"sync_latency_tolerance": "Default is 1000."
},
"description": "Please enter your IP secure information."
},
"secure_tunnel_manual": {
"data": {
"device_authentication": "Device authentication password",
@ -174,13 +209,6 @@
},
"description": "Please enter your IP secure information."
},
"secure_tunneling": {
"description": "Select how you want to configure KNX/IP Secure.",
"menu_options": {
"secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys",
"secure_tunnel_manual": "Configure IP secure keys manually"
}
},
"tunnel": {
"data": {
"gateway": "KNX Tunnel Connection"

View File

@ -26,6 +26,9 @@ from homeassistant.components.knx.const import (
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_ROUTING_SECURE,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
@ -197,6 +200,162 @@ async def test_routing_setup_advanced(hass: HomeAssistant) -> None:
assert len(mock_setup_entry.mock_calls) == 1
async def test_routing_secure_manual_setup(hass: HomeAssistant) -> None:
"""Test routing secure setup with manual key config."""
with patch("xknx.io.gateway_scanner.GatewayScanner.scan") as gateways:
gateways.return_value = []
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING,
},
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.FORM
assert result2["step_id"] == "routing"
assert result2["errors"] == {"base": "no_router_discovered"}
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_MCAST_PORT: 3671,
CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123",
CONF_KNX_ROUTING_SECURE: True,
},
)
assert result3["type"] == FlowResultType.MENU
assert result3["step_id"] == "secure_key_source"
result4 = await hass.config_entries.flow.async_configure(
result3["flow_id"],
{"next_step_id": "secure_routing_manual"},
)
assert result4["type"] == FlowResultType.FORM
assert result4["step_id"] == "secure_routing_manual"
assert not result4["errors"]
result_invalid_key1 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
{
CONF_KNX_ROUTING_BACKBONE_KEY: "xxaacc44bbaacc44bbaacc44bbaaccyy", # invalid hex string
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000,
},
)
assert result_invalid_key1["type"] == FlowResultType.FORM
assert result_invalid_key1["step_id"] == "secure_routing_manual"
assert result_invalid_key1["errors"] == {"backbone_key": "invalid_backbone_key"}
result_invalid_key2 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
{
CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44", # invalid length
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000,
},
)
assert result_invalid_key2["type"] == FlowResultType.FORM
assert result_invalid_key2["step_id"] == "secure_routing_manual"
assert result_invalid_key2["errors"] == {"backbone_key": "invalid_backbone_key"}
with patch(
"homeassistant.components.knx.async_setup_entry",
return_value=True,
) as mock_setup_entry:
secure_routing_manual = await hass.config_entries.flow.async_configure(
result_invalid_key2["flow_id"],
{
CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44",
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000,
},
)
await hass.async_block_till_done()
assert secure_routing_manual["type"] == FlowResultType.CREATE_ENTRY
assert secure_routing_manual["title"] == "Secure Routing as 0.0.123"
assert secure_routing_manual["data"] == {
**DEFAULT_ENTRY_DATA,
CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING_SECURE,
CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44",
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000,
CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_routing_secure_keyfile(hass: HomeAssistant) -> None:
"""Test routing secure setup with keyfile."""
with patch("xknx.io.gateway_scanner.GatewayScanner.scan") as gateways:
gateways.return_value = []
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING,
},
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.FORM
assert result2["step_id"] == "routing"
assert result2["errors"] == {"base": "no_router_discovered"}
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_MCAST_PORT: 3671,
CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123",
CONF_KNX_ROUTING_SECURE: True,
},
)
assert result3["type"] == FlowResultType.MENU
assert result3["step_id"] == "secure_key_source"
result4 = await hass.config_entries.flow.async_configure(
result3["flow_id"],
{"next_step_id": "secure_knxkeys"},
)
assert result4["type"] == FlowResultType.FORM
assert result4["step_id"] == "secure_knxkeys"
assert not result4["errors"]
with patch(
"homeassistant.components.knx.async_setup_entry",
return_value=True,
) as mock_setup_entry, patch(
"homeassistant.components.knx.config_flow.load_keyring", return_value=True
):
routing_secure_knxkeys = await hass.config_entries.flow.async_configure(
result4["flow_id"],
{
CONF_KNX_KNXKEY_FILENAME: "testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
},
)
await hass.async_block_till_done()
assert routing_secure_knxkeys["type"] == FlowResultType.CREATE_ENTRY
assert routing_secure_knxkeys["title"] == "Secure Routing as 0.0.123"
assert routing_secure_knxkeys["data"] == {
**DEFAULT_ENTRY_DATA,
CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING_SECURE,
CONF_KNX_KNXKEY_FILENAME: "knx/testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
CONF_KNX_ROUTING_BACKBONE_KEY: None,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: None,
CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123",
}
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
"user_input,config_entry_data",
[
@ -506,7 +665,7 @@ async def test_form_with_automatic_connection_handling(hass: HomeAssistant) -> N
async def _get_menu_step(hass: HomeAssistant) -> FlowResult:
"""Return flow in secure_tunnellinn menu step."""
"""Return flow in secure_tunnelling menu step."""
gateway = _gateway_descriptor(
"192.168.0.1",
3675,
@ -538,7 +697,7 @@ async def _get_menu_step(hass: HomeAssistant) -> FlowResult:
)
await hass.async_block_till_done()
assert result3["type"] == FlowResultType.MENU
assert result3["step_id"] == "secure_tunneling"
assert result3["step_id"] == "secure_key_source"
return result3
@ -588,7 +747,7 @@ async def test_get_secure_menu_step_manual_tunnelling(
)
await hass.async_block_till_done()
assert result3["type"] == FlowResultType.MENU
assert result3["step_id"] == "secure_tunneling"
assert result3["step_id"] == "secure_key_source"
async def test_configure_secure_tunnel_manual(hass: HomeAssistant):
@ -665,6 +824,8 @@ async def test_configure_secure_knxkeys(hass: HomeAssistant):
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
CONF_KNX_KNXKEY_FILENAME: "knx/testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
CONF_KNX_ROUTING_BACKBONE_KEY: None,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: None,
CONF_HOST: "192.168.0.1",
CONF_PORT: 3675,
CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.240",

View File

@ -14,6 +14,7 @@ from homeassistant.components.knx.const import (
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
@ -107,6 +108,7 @@ async def test_diagnostic_redact(
CONF_KNX_KNXKEY_PASSWORD: "password",
CONF_KNX_SECURE_USER_PASSWORD: "user_password",
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: "device_authentication",
CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44",
},
)
knx: KNXTestKit = KNXTestKit(hass, mock_config_entry)
@ -128,6 +130,7 @@ async def test_diagnostic_redact(
"knxkeys_password": "**REDACTED**",
"user_password": "**REDACTED**",
"device_authentication": "**REDACTED**",
"backbone_key": "**REDACTED**",
},
"configuration_error": None,
"configuration_yaml": None,

View File

@ -23,6 +23,9 @@ from homeassistant.components.knx.const import (
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_ROUTING_SECURE,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
@ -167,6 +170,31 @@ from tests.common import MockConfigEntry
threaded=True,
),
),
(
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING_SECURE,
CONF_KNX_LOCAL_IP: "192.168.1.1",
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_INDIVIDUAL_ADDRESS: DEFAULT_ROUTING_IA,
CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44",
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000,
},
ConnectionConfig(
connection_type=ConnectionType.ROUTING_SECURE,
individual_address=DEFAULT_ROUTING_IA,
multicast_group=DEFAULT_MCAST_GRP,
multicast_port=DEFAULT_MCAST_PORT,
secure_config=SecureConfig(
backbone_key="bbaacc44bbaacc44bbaacc44bbaacc44",
latency_ms=2000,
),
local_ip="192.168.1.1",
threaded=True,
),
),
],
)
async def test_init_connection_handling(