diff --git a/homeassistant/components/zha/core/helpers.py b/homeassistant/components/zha/core/helpers.py index 6f0167827e8..5506ffb8289 100644 --- a/homeassistant/components/zha/core/helpers.py +++ b/homeassistant/components/zha/core/helpers.py @@ -318,7 +318,7 @@ class LogMixin: return self.log(logging.ERROR, msg, *args, **kwargs) -def convert_install_code(value: str) -> bytes: +def convert_install_code(value: str) -> zigpy.types.KeyData: """Convert string to install code bytes and validate length.""" try: @@ -329,10 +329,11 @@ def convert_install_code(value: str) -> bytes: if len(code) != 18: # 16 byte code + 2 crc bytes raise vol.Invalid("invalid length of the install code") - if zigpy.util.convert_install_code(code) is None: + link_key = zigpy.util.convert_install_code(code) + if link_key is None: raise vol.Invalid("invalid install code") - return code + return link_key QR_CODES = ( @@ -360,13 +361,13 @@ QR_CODES = ( [0-9a-fA-F]{34} ([0-9a-fA-F]{16}) # IEEE address DLK - ([0-9a-fA-F]{36}) # install code + ([0-9a-fA-F]{36}|[0-9a-fA-F]{32}) # install code / link key $ """, ) -def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]: +def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, zigpy.types.KeyData]: """Try to parse the QR code. if successful, return a tuple of a EUI64 address and install code. @@ -379,10 +380,16 @@ def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]: ieee_hex = binascii.unhexlify(match[1]) ieee = zigpy.types.EUI64(ieee_hex[::-1]) + + # Bosch supplies (A) device specific link key (DSLK) or (A) install code + crc + if "RB01SG" in code_pattern and len(match[2]) == 32: + link_key_hex = binascii.unhexlify(match[2]) + link_key = zigpy.types.KeyData(link_key_hex) + return ieee, link_key install_code = match[2] # install_code sanity check - install_code = convert_install_code(install_code) - return ieee, install_code + link_key = convert_install_code(install_code) + return ieee, link_key raise vol.Invalid(f"couldn't convert qr code: {qr_code}") diff --git a/homeassistant/components/zha/websocket_api.py b/homeassistant/components/zha/websocket_api.py index 447aa5efd0f..e3e67ea0e41 100644 --- a/homeassistant/components/zha/websocket_api.py +++ b/homeassistant/components/zha/websocket_api.py @@ -9,7 +9,7 @@ import voluptuous as vol import zigpy.backups from zigpy.config import CONF_DEVICE from zigpy.config.validators import cv_boolean -from zigpy.types.named import EUI64 +from zigpy.types.named import EUI64, KeyData from zigpy.zcl.clusters.security import IasAce import zigpy.zdo.types as zdo_types @@ -328,19 +328,19 @@ async def websocket_permit_devices( connection.subscriptions[msg["id"]] = async_cleanup zha_gateway.async_enable_debug_mode() src_ieee: EUI64 - code: bytes + link_key: KeyData if ATTR_SOURCE_IEEE in msg: src_ieee = msg[ATTR_SOURCE_IEEE] - code = msg[ATTR_INSTALL_CODE] - _LOGGER.debug("Allowing join for %s device with install code", src_ieee) - await zha_gateway.application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + link_key = msg[ATTR_INSTALL_CODE] + _LOGGER.debug("Allowing join for %s device with link key", src_ieee) + await zha_gateway.application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) elif ATTR_QR_CODE in msg: - src_ieee, code = msg[ATTR_QR_CODE] - _LOGGER.debug("Allowing join for %s device with install code", src_ieee) - await zha_gateway.application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + src_ieee, link_key = msg[ATTR_QR_CODE] + _LOGGER.debug("Allowing join for %s device with link key", src_ieee) + await zha_gateway.application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) else: await zha_gateway.application_controller.permit(time_s=duration, node=ieee) @@ -1249,21 +1249,21 @@ def async_load_api(hass: HomeAssistant) -> None: duration: int = service.data[ATTR_DURATION] ieee: EUI64 | None = service.data.get(ATTR_IEEE) src_ieee: EUI64 - code: bytes + link_key: KeyData if ATTR_SOURCE_IEEE in service.data: src_ieee = service.data[ATTR_SOURCE_IEEE] - code = service.data[ATTR_INSTALL_CODE] - _LOGGER.info("Allowing join for %s device with install code", src_ieee) - await application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + link_key = service.data[ATTR_INSTALL_CODE] + _LOGGER.info("Allowing join for %s device with link key", src_ieee) + await application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) return if ATTR_QR_CODE in service.data: - src_ieee, code = service.data[ATTR_QR_CODE] - _LOGGER.info("Allowing join for %s device with install code", src_ieee) - await application_controller.permit_with_key( - time_s=duration, node=src_ieee, code=code + src_ieee, link_key = service.data[ATTR_QR_CODE] + _LOGGER.info("Allowing join for %s device with link key", src_ieee) + await application_controller.permit_with_link_key( + time_s=duration, node=src_ieee, link_key=link_key ) return diff --git a/tests/components/zha/test_websocket_api.py b/tests/components/zha/test_websocket_api.py index 44006ea6ca1..bafea7e1965 100644 --- a/tests/components/zha/test_websocket_api.py +++ b/tests/components/zha/test_websocket_api.py @@ -13,6 +13,7 @@ import zigpy.backups import zigpy.profiles.zha import zigpy.types from zigpy.types.named import EUI64 +import zigpy.util import zigpy.zcl.clusters.general as general from zigpy.zcl.clusters.general import Groups import zigpy.zcl.clusters.security as security @@ -528,7 +529,7 @@ async def test_permit_ha12( assert app_controller.permit.await_count == 1 assert app_controller.permit.await_args[1]["time_s"] == duration assert app_controller.permit.await_args[1]["node"] == node - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 IC_TEST_PARAMS = ( @@ -538,7 +539,9 @@ IC_TEST_PARAMS = ( ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051", }, zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ( { @@ -546,7 +549,9 @@ IC_TEST_PARAMS = ( ATTR_INSTALL_CODE: "52797BF4A5084DAA8E1712B61741CA024051", }, zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ) @@ -566,10 +571,10 @@ async def test_permit_with_install_code( DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) ) assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 1 - assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 - assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee - assert app_controller.permit_with_key.await_args[1]["code"] == code + assert app_controller.permit_with_link_key.call_count == 1 + assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60 + assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee + assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code IC_FAIL_PARAMS = ( @@ -621,19 +626,23 @@ async def test_permit_with_install_code_fail( DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) ) assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 IC_QR_CODE_TEST_PARAMS = ( ( {ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024051"}, zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ( {ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051"}, zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), ), ( { @@ -643,7 +652,22 @@ IC_QR_CODE_TEST_PARAMS = ( ) }, zigpy.types.EUI64.convert("04:CF:8C:DF:3C:3C:3C:3C"), - unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), + zigpy.util.convert_install_code( + unhexlify("52797BF4A5084DAA8E1712B61741CA024051") + ), + ), + ( + { + ATTR_QR_CODE: ( + "RB01SG" + "0D836591B3CC0010000000000000000000" + "000D6F0019107BB1" + "DLK" + "E4636CB6C41617C3E08F7325FFBFE1F9" + ) + }, + zigpy.types.EUI64.convert("00:0D:6F:00:19:10:7B:B1"), + zigpy.types.KeyData.convert("E4:63:6C:B6:C4:16:17:C3:E0:8F:73:25:FF:BF:E1:F9"), ), ) @@ -663,10 +687,10 @@ async def test_permit_with_qr_code( DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) ) assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 1 - assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 - assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee - assert app_controller.permit_with_key.await_args[1]["code"] == code + assert app_controller.permit_with_link_key.call_count == 1 + assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60 + assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee + assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code @pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS) @@ -685,10 +709,10 @@ async def test_ws_permit_with_qr_code( assert msg["success"] assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 1 - assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 - assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee - assert app_controller.permit_with_key.await_args[1]["code"] == code + assert app_controller.permit_with_link_key.call_count == 1 + assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60 + assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee + assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code @pytest.mark.parametrize("params", IC_FAIL_PARAMS) @@ -707,7 +731,7 @@ async def test_ws_permit_with_install_code_fail( assert msg["success"] is False assert app_controller.permit.await_count == 0 - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 @pytest.mark.parametrize( @@ -744,7 +768,7 @@ async def test_ws_permit_ha12( assert app_controller.permit.await_count == 1 assert app_controller.permit.await_args[1]["time_s"] == duration assert app_controller.permit.await_args[1]["node"] == node - assert app_controller.permit_with_key.call_count == 0 + assert app_controller.permit_with_link_key.call_count == 0 async def test_get_network_settings(