From f4a2d7c61254d251b7c0dbe61b989f9e86737984 Mon Sep 17 00:00:00 2001 From: Paul Strawder Date: Wed, 31 Jan 2024 18:02:34 +0100 Subject: [PATCH] Add ZHA support for Bosch Twinguard and siren install QR codes (#107460) * Enable Bosch Outdoor Siren and Bosch Twinguard QR Codes These devices contain inside their QR code device specific link keys instead of installation codes. Normally, the link key is generated from the installation code, but in this case we can directly pass the provided link key from QR code to zigpy application controller. * Replace ZHA deprecated permit_with_key by permit_with_link_key Convert installation code directly to link key * Update tests * formatting --- homeassistant/components/zha/core/helpers.py | 21 ++++-- homeassistant/components/zha/websocket_api.py | 38 +++++------ tests/components/zha/test_websocket_api.py | 66 +++++++++++++------ 3 files changed, 78 insertions(+), 47 deletions(-) diff --git a/homeassistant/components/zha/core/helpers.py b/homeassistant/components/zha/core/helpers.py index 6f0167827e8..5506ffb8289 100644 --- a/homeassistant/components/zha/core/helpers.py +++ b/homeassistant/components/zha/core/helpers.py @@ -318,7 +318,7 @@ class LogMixin: return self.log(logging.ERROR, msg, *args, **kwargs) -def convert_install_code(value: str) -> bytes: +def convert_install_code(value: str) -> zigpy.types.KeyData: """Convert string to install code bytes and validate length.""" try: @@ -329,10 +329,11 @@ def convert_install_code(value: str) -> bytes: if len(code) != 18: # 16 byte code + 2 crc bytes raise vol.Invalid("invalid length of the install code") - if zigpy.util.convert_install_code(code) is None: + link_key = zigpy.util.convert_install_code(code) + if link_key is None: raise vol.Invalid("invalid install code") - return code + return link_key QR_CODES = ( @@ -360,13 +361,13 @@ QR_CODES = ( [0-9a-fA-F]{34} ([0-9a-fA-F]{16}) # IEEE address DLK - ([0-9a-fA-F]{36}) # install code + ([0-9a-fA-F]{36}|[0-9a-fA-F]{32}) # install code / link key $ """, ) -def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]: +def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, zigpy.types.KeyData]: """Try to parse the QR code. if successful, return a tuple of a EUI64 address and install code. @@ -379,10 +380,16 @@ def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]: ieee_hex = binascii.unhexlify(match[1]) ieee = zigpy.types.EUI64(ieee_hex[::-1]) + + # Bosch supplies (A) device specific link key (DSLK) or (A) install code + crc + if "RB01SG" in code_pattern and len(match[2]) == 32: + link_key_hex = binascii.unhexlify(match[2]) + link_key = zigpy.types.KeyData(link_key_hex) + return ieee, link_key install_code = match[2] # install_code sanity check - install_code = convert_install_code(install_code) - return ieee, install_code + link_key = convert_install_code(install_code) + return ieee, link_key raise vol.Invalid(f"couldn't convert qr code: {qr_code}") diff --git a/homeassistant/components/zha/websocket_api.py b/homeassistant/components/zha/websocket_api.py index 447aa5efd0f..e3e67ea0e41 100644 --- a/homeassistant/components/zha/websocket_api.py +++ b/homeassistant/components/zha/websocket_api.py @@ -9,7 +9,7 @@ import voluptuous as vol import zigpy.backups from zigpy.config import CONF_DEVICE from zigpy.config.validators import cv_boolean -from zigpy.types.named import EUI64 +from zigpy.types.named import EUI64, KeyData from zigpy.zcl.clusters.security import IasAce import zigpy.zdo.types as zdo_types @@ -328,19 +328,19 @@ async def websocket_permit_devices( connection.subscriptions[msg["id"]] = async_cleanup zha_gateway.async_enable_debug_mode() src_ieee: EUI64 - code: bytes + link_key: KeyData if ATTR_SOURCE_IEEE in msg: src_ieee = msg[ATTR_SOURCE_IEEE] - code = msg[ATTR_INSTALL_CODE] - _LOGGER.debug("Allowing join for %s device with install code", src_ieee) - await zha_gateway.application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + link_key = msg[ATTR_INSTALL_CODE] + _LOGGER.debug("Allowing join for %s device with link key", src_ieee) + await zha_gateway.application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) elif ATTR_QR_CODE in msg: - src_ieee, code = msg[ATTR_QR_CODE] - _LOGGER.debug("Allowing join for %s device with install code", src_ieee) - await zha_gateway.application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + src_ieee, link_key = msg[ATTR_QR_CODE] + _LOGGER.debug("Allowing join for %s device with link key", src_ieee) + await zha_gateway.application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) else: await zha_gateway.application_controller.permit(time_s=duration, node=ieee) @@ -1249,21 +1249,21 @@ def async_load_api(hass: HomeAssistant) -> None: duration: int = service.data[ATTR_DURATION] ieee: EUI64 | None = service.data.get(ATTR_IEEE) src_ieee: EUI64 - code: bytes + link_key: KeyData if ATTR_SOURCE_IEEE in service.data: src_ieee = service.data[ATTR_SOURCE_IEEE] - code = service.data[ATTR_INSTALL_CODE] - _LOGGER.info("Allowing join for %s device with install code", src_ieee) - await application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + link_key = service.data[ATTR_INSTALL_CODE] + _LOGGER.info("Allowing join for %s device with link key", src_ieee) + await application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) return if ATTR_QR_CODE in service.data: - src_ieee, code = service.data[ATTR_QR_CODE] - _LOGGER.info("Allowing join for %s device with install code", src_ieee) - await application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + src_ieee, link_key = service.data[ATTR_QR_CODE] + _LOGGER.info("Allowing join for %s device with link key", src_ieee) + await application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) return diff --git a/tests/components/zha/test_websocket_api.py b/tests/components/zha/test_websocket_api.py index 44006ea6ca1..bafea7e1965 100644 --- a/tests/components/zha/test_websocket_api.py +++ b/tests/components/zha/test_websocket_api.py @@ -13,6 +13,7 @@ import zigpy.backups import zigpy.profiles.zha import zigpy.types from zigpy.types.named import EUI64 +import zigpy.util import zigpy.zcl.clusters.general as general from zigpy.zcl.clusters.general import Groups import zigpy.zcl.clusters.security as security @@ -528,7 +529,7 @@ async def test_permit_ha12( assert app_controller.permit.await_count == 1 assert app_controller.permit.await_args[1]["time_s"] == duration assert app_controller.permit.await_args[1]["node"] == node - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 IC_TEST_PARAMS = ( @@ -538,7 +539,9 @@ IC_TEST_PARAMS = ( ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051", }, zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ( { @@ -546,7 +549,9 @@ IC_TEST_PARAMS = ( ATTR_INSTALL_CODE: "52797BF4A5084DAA8E1712B61741CA024051", }, zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ) @@ -566,10 +571,10 @@ async def test_permit_with_install_code( DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) ) assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 1 - assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 - assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee - assert app_controller.permit_with_key.await_args[1]["code"] == code + assert app_controller.permit_with_link_key.call_count == 1 + assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60 + assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee + assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code IC_FAIL_PARAMS = ( @@ -621,19 +626,23 @@ async def test_permit_with_install_code_fail( DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) ) assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 IC_QR_CODE_TEST_PARAMS = ( ( {ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024051"}, zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ( {ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051"}, zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ( { @@ -643,7 +652,22 @@ IC_QR_CODE_TEST_PARAMS = ( ) }, zigpy.types.EUI64.convert("04:CF:8C:DF:3C:3C:3C:3C"), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), + ), + ( + { + ATTR_QR_CODE: ( + "RB01SG" + "0D836591B3CC0010000000000000000000" + "000D6F0019107BB1" + "DLK" + "E4636CB6C41617C3E08F7325FFBFE1F9" + ) + }, + zigpy.types.EUI64.convert("00:0D:6F:00:19:10:7B:B1"), + zigpy.types.KeyData.convert("E4:63:6C:B6:C4:16:17:C3:E0:8F:73:25:FF:BF:E1:F9"), ), ) @@ -663,10 +687,10 @@ async def test_permit_with_qr_code( DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) ) assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 1 - assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 - assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee - assert app_controller.permit_with_key.await_args[1]["code"] == code + assert app_controller.permit_with_link_key.call_count == 1 + assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60 + assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee + assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code @pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS) @@ -685,10 +709,10 @@ async def test_ws_permit_with_qr_code( assert msg["success"] assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 1 - assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 - assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee - assert app_controller.permit_with_key.await_args[1]["code"] == code + assert app_controller.permit_with_link_key.call_count == 1 + assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60 + assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee + assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code @pytest.mark.parametrize("params", IC_FAIL_PARAMS) @@ -707,7 +731,7 @@ async def test_ws_permit_with_install_code_fail( assert msg["success"] is False assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 @pytest.mark.parametrize( @@ -744,7 +768,7 @@ async def test_ws_permit_ha12( assert app_controller.permit.await_count == 1 assert app_controller.permit.await_args[1]["time_s"] == duration assert app_controller.permit.await_args[1]["node"] == node - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 async def test_get_network_settings(