Add ZHA support for Bosch Twinguard and siren install QR codes (#107460)

* Enable Bosch Outdoor Siren and Bosch Twinguard QR Codes

These devices contain inside their QR code device specific link keys instead of installation codes. Normally, the link key is generated from the installation code, but in this case we can directly pass the provided link key from QR code to zigpy application controller.

* Replace ZHA deprecated permit_with_key by permit_with_link_key

Convert installation code directly to link key

* Update tests

* formatting
This commit is contained in:
Paul Strawder 2024-01-31 18:02:34 +01:00 committed by GitHub
parent 3b7ec8ed2c
commit f4a2d7c612
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 78 additions and 47 deletions

View File

@ -318,7 +318,7 @@ class LogMixin:
return self.log(logging.ERROR, msg, *args, **kwargs) return self.log(logging.ERROR, msg, *args, **kwargs)
def convert_install_code(value: str) -> bytes: def convert_install_code(value: str) -> zigpy.types.KeyData:
"""Convert string to install code bytes and validate length.""" """Convert string to install code bytes and validate length."""
try: try:
@ -329,10 +329,11 @@ def convert_install_code(value: str) -> bytes:
if len(code) != 18: # 16 byte code + 2 crc bytes if len(code) != 18: # 16 byte code + 2 crc bytes
raise vol.Invalid("invalid length of the install code") raise vol.Invalid("invalid length of the install code")
if zigpy.util.convert_install_code(code) is None: link_key = zigpy.util.convert_install_code(code)
if link_key is None:
raise vol.Invalid("invalid install code") raise vol.Invalid("invalid install code")
return code return link_key
QR_CODES = ( QR_CODES = (
@ -360,13 +361,13 @@ QR_CODES = (
[0-9a-fA-F]{34} [0-9a-fA-F]{34}
([0-9a-fA-F]{16}) # IEEE address ([0-9a-fA-F]{16}) # IEEE address
DLK DLK
([0-9a-fA-F]{36}) # install code ([0-9a-fA-F]{36}|[0-9a-fA-F]{32}) # install code / link key
$ $
""", """,
) )
def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]: def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, zigpy.types.KeyData]:
"""Try to parse the QR code. """Try to parse the QR code.
if successful, return a tuple of a EUI64 address and install code. if successful, return a tuple of a EUI64 address and install code.
@ -379,10 +380,16 @@ def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]:
ieee_hex = binascii.unhexlify(match[1]) ieee_hex = binascii.unhexlify(match[1])
ieee = zigpy.types.EUI64(ieee_hex[::-1]) ieee = zigpy.types.EUI64(ieee_hex[::-1])
# Bosch supplies (A) device specific link key (DSLK) or (A) install code + crc
if "RB01SG" in code_pattern and len(match[2]) == 32:
link_key_hex = binascii.unhexlify(match[2])
link_key = zigpy.types.KeyData(link_key_hex)
return ieee, link_key
install_code = match[2] install_code = match[2]
# install_code sanity check # install_code sanity check
install_code = convert_install_code(install_code) link_key = convert_install_code(install_code)
return ieee, install_code return ieee, link_key
raise vol.Invalid(f"couldn't convert qr code: {qr_code}") raise vol.Invalid(f"couldn't convert qr code: {qr_code}")

View File

@ -9,7 +9,7 @@ import voluptuous as vol
import zigpy.backups import zigpy.backups
from zigpy.config import CONF_DEVICE from zigpy.config import CONF_DEVICE
from zigpy.config.validators import cv_boolean from zigpy.config.validators import cv_boolean
from zigpy.types.named import EUI64 from zigpy.types.named import EUI64, KeyData
from zigpy.zcl.clusters.security import IasAce from zigpy.zcl.clusters.security import IasAce
import zigpy.zdo.types as zdo_types import zigpy.zdo.types as zdo_types
@ -328,19 +328,19 @@ async def websocket_permit_devices(
connection.subscriptions[msg["id"]] = async_cleanup connection.subscriptions[msg["id"]] = async_cleanup
zha_gateway.async_enable_debug_mode() zha_gateway.async_enable_debug_mode()
src_ieee: EUI64 src_ieee: EUI64
code: bytes link_key: KeyData
if ATTR_SOURCE_IEEE in msg: if ATTR_SOURCE_IEEE in msg:
src_ieee = msg[ATTR_SOURCE_IEEE] src_ieee = msg[ATTR_SOURCE_IEEE]
code = msg[ATTR_INSTALL_CODE] link_key = msg[ATTR_INSTALL_CODE]
_LOGGER.debug("Allowing join for %s device with install code", src_ieee) _LOGGER.debug("Allowing join for %s device with link key", src_ieee)
await zha_gateway.application_controller.permit_with_key( await zha_gateway.application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, code=code time_s=duration, node=src_ieee, link_key=link_key
) )
elif ATTR_QR_CODE in msg: elif ATTR_QR_CODE in msg:
src_ieee, code = msg[ATTR_QR_CODE] src_ieee, link_key = msg[ATTR_QR_CODE]
_LOGGER.debug("Allowing join for %s device with install code", src_ieee) _LOGGER.debug("Allowing join for %s device with link key", src_ieee)
await zha_gateway.application_controller.permit_with_key( await zha_gateway.application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, code=code time_s=duration, node=src_ieee, link_key=link_key
) )
else: else:
await zha_gateway.application_controller.permit(time_s=duration, node=ieee) await zha_gateway.application_controller.permit(time_s=duration, node=ieee)
@ -1249,21 +1249,21 @@ def async_load_api(hass: HomeAssistant) -> None:
duration: int = service.data[ATTR_DURATION] duration: int = service.data[ATTR_DURATION]
ieee: EUI64 | None = service.data.get(ATTR_IEEE) ieee: EUI64 | None = service.data.get(ATTR_IEEE)
src_ieee: EUI64 src_ieee: EUI64
code: bytes link_key: KeyData
if ATTR_SOURCE_IEEE in service.data: if ATTR_SOURCE_IEEE in service.data:
src_ieee = service.data[ATTR_SOURCE_IEEE] src_ieee = service.data[ATTR_SOURCE_IEEE]
code = service.data[ATTR_INSTALL_CODE] link_key = service.data[ATTR_INSTALL_CODE]
_LOGGER.info("Allowing join for %s device with install code", src_ieee) _LOGGER.info("Allowing join for %s device with link key", src_ieee)
await application_controller.permit_with_key( await application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, code=code time_s=duration, node=src_ieee, link_key=link_key
) )
return return
if ATTR_QR_CODE in service.data: if ATTR_QR_CODE in service.data:
src_ieee, code = service.data[ATTR_QR_CODE] src_ieee, link_key = service.data[ATTR_QR_CODE]
_LOGGER.info("Allowing join for %s device with install code", src_ieee) _LOGGER.info("Allowing join for %s device with link key", src_ieee)
await application_controller.permit_with_key( await application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, code=code time_s=duration, node=src_ieee, link_key=link_key
) )
return return

View File

@ -13,6 +13,7 @@ import zigpy.backups
import zigpy.profiles.zha import zigpy.profiles.zha
import zigpy.types import zigpy.types
from zigpy.types.named import EUI64 from zigpy.types.named import EUI64
import zigpy.util
import zigpy.zcl.clusters.general as general import zigpy.zcl.clusters.general as general
from zigpy.zcl.clusters.general import Groups from zigpy.zcl.clusters.general import Groups
import zigpy.zcl.clusters.security as security import zigpy.zcl.clusters.security as security
@ -528,7 +529,7 @@ async def test_permit_ha12(
assert app_controller.permit.await_count == 1 assert app_controller.permit.await_count == 1
assert app_controller.permit.await_args[1]["time_s"] == duration assert app_controller.permit.await_args[1]["time_s"] == duration
assert app_controller.permit.await_args[1]["node"] == node assert app_controller.permit.await_args[1]["node"] == node
assert app_controller.permit_with_key.call_count == 0 assert app_controller.permit_with_link_key.call_count == 0
IC_TEST_PARAMS = ( IC_TEST_PARAMS = (
@ -538,7 +539,9 @@ IC_TEST_PARAMS = (
ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051", ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051",
}, },
zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE), zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
), ),
( (
{ {
@ -546,7 +549,9 @@ IC_TEST_PARAMS = (
ATTR_INSTALL_CODE: "52797BF4A5084DAA8E1712B61741CA024051", ATTR_INSTALL_CODE: "52797BF4A5084DAA8E1712B61741CA024051",
}, },
zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE), zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
), ),
) )
@ -566,10 +571,10 @@ async def test_permit_with_install_code(
DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id)
) )
assert app_controller.permit.await_count == 0 assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 1 assert app_controller.permit_with_link_key.call_count == 1
assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_key.await_args[1]["code"] == code assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code
IC_FAIL_PARAMS = ( IC_FAIL_PARAMS = (
@ -621,19 +626,23 @@ async def test_permit_with_install_code_fail(
DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id)
) )
assert app_controller.permit.await_count == 0 assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 0 assert app_controller.permit_with_link_key.call_count == 0
IC_QR_CODE_TEST_PARAMS = ( IC_QR_CODE_TEST_PARAMS = (
( (
{ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024051"}, {ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024051"},
zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"), zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
), ),
( (
{ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051"}, {ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051"},
zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"), zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
), ),
( (
{ {
@ -643,7 +652,22 @@ IC_QR_CODE_TEST_PARAMS = (
) )
}, },
zigpy.types.EUI64.convert("04:CF:8C:DF:3C:3C:3C:3C"), zigpy.types.EUI64.convert("04:CF:8C:DF:3C:3C:3C:3C"),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"), zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
),
(
{
ATTR_QR_CODE: (
"RB01SG"
"0D836591B3CC0010000000000000000000"
"000D6F0019107BB1"
"DLK"
"E4636CB6C41617C3E08F7325FFBFE1F9"
)
},
zigpy.types.EUI64.convert("00:0D:6F:00:19:10:7B:B1"),
zigpy.types.KeyData.convert("E4:63:6C:B6:C4:16:17:C3:E0:8F:73:25:FF:BF:E1:F9"),
), ),
) )
@ -663,10 +687,10 @@ async def test_permit_with_qr_code(
DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id) DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id)
) )
assert app_controller.permit.await_count == 0 assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 1 assert app_controller.permit_with_link_key.call_count == 1
assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_key.await_args[1]["code"] == code assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code
@pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS) @pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS)
@ -685,10 +709,10 @@ async def test_ws_permit_with_qr_code(
assert msg["success"] assert msg["success"]
assert app_controller.permit.await_count == 0 assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 1 assert app_controller.permit_with_link_key.call_count == 1
assert app_controller.permit_with_key.await_args[1]["time_s"] == 60 assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_key.await_args[1]["code"] == code assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code
@pytest.mark.parametrize("params", IC_FAIL_PARAMS) @pytest.mark.parametrize("params", IC_FAIL_PARAMS)
@ -707,7 +731,7 @@ async def test_ws_permit_with_install_code_fail(
assert msg["success"] is False assert msg["success"] is False
assert app_controller.permit.await_count == 0 assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 0 assert app_controller.permit_with_link_key.call_count == 0
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -744,7 +768,7 @@ async def test_ws_permit_ha12(
assert app_controller.permit.await_count == 1 assert app_controller.permit.await_count == 1
assert app_controller.permit.await_args[1]["time_s"] == duration assert app_controller.permit.await_args[1]["time_s"] == duration
assert app_controller.permit.await_args[1]["node"] == node assert app_controller.permit.await_args[1]["node"] == node
assert app_controller.permit_with_key.call_count == 0 assert app_controller.permit_with_link_key.call_count == 0
async def test_get_network_settings( async def test_get_network_settings(