Fix bosch shc multi controller support (#127844)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
Vendetta01 2024-10-29 14:19:33 +01:00 committed by GitHub
parent 2c9ad9562e
commit 9bda3bd477
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 185 additions and 18 deletions

View File

@ -39,16 +39,21 @@ HOST_SCHEMA = vol.Schema(
)
def write_tls_asset(hass: HomeAssistant, filename: str, asset: bytes) -> None:
def write_tls_asset(
hass: HomeAssistant, folder: str, filename: str, asset: bytes
) -> None:
"""Write the tls assets to disk."""
makedirs(hass.config.path(DOMAIN), exist_ok=True)
with open(hass.config.path(DOMAIN, filename), "w", encoding="utf8") as file_handle:
makedirs(hass.config.path(DOMAIN, folder), exist_ok=True)
with open(
hass.config.path(DOMAIN, folder, filename), "w", encoding="utf8"
) as file_handle:
file_handle.write(asset.decode("utf-8"))
def create_credentials_and_validate(
hass: HomeAssistant,
host: str,
unique_id: str,
user_input: dict[str, Any],
zeroconf_instance: zeroconf.HaZeroconf,
) -> dict[str, Any] | None:
@ -57,13 +62,15 @@ def create_credentials_and_validate(
result = helper.register(host, "HomeAssistant")
if result is not None:
write_tls_asset(hass, CONF_SHC_CERT, result["cert"])
write_tls_asset(hass, CONF_SHC_KEY, result["key"])
# Save key/certificate pair for each registered host separately
# otherwise only the last registered host is accessible.
write_tls_asset(hass, unique_id, CONF_SHC_CERT, result["cert"])
write_tls_asset(hass, unique_id, CONF_SHC_KEY, result["key"])
session = SHCSession(
host,
hass.config.path(DOMAIN, CONF_SHC_CERT),
hass.config.path(DOMAIN, CONF_SHC_KEY),
hass.config.path(DOMAIN, unique_id, CONF_SHC_CERT),
hass.config.path(DOMAIN, unique_id, CONF_SHC_KEY),
True,
zeroconf_instance,
)
@ -143,11 +150,16 @@ class BoschSHCConfigFlow(ConfigFlow, domain=DOMAIN):
errors: dict[str, str] = {}
if user_input is not None:
zeroconf_instance = await zeroconf.async_get_instance(self.hass)
# unique_id uniquely identifies the registered controller and is used
# to save the key/certificate pair for each controller separately
unique_id = self.info["unique_id"]
assert unique_id
try:
result = await self.hass.async_add_executor_job(
create_credentials_and_validate,
self.hass,
self.host,
unique_id,
user_input,
zeroconf_instance,
)
@ -167,13 +179,18 @@ class BoschSHCConfigFlow(ConfigFlow, domain=DOMAIN):
else:
assert result
entry_data = {
CONF_SSL_CERTIFICATE: self.hass.config.path(DOMAIN, CONF_SHC_CERT),
CONF_SSL_KEY: self.hass.config.path(DOMAIN, CONF_SHC_KEY),
# Each host has its own key/certificate pair
CONF_SSL_CERTIFICATE: self.hass.config.path(
DOMAIN, unique_id, CONF_SHC_CERT
),
CONF_SSL_KEY: self.hass.config.path(
DOMAIN, unique_id, CONF_SHC_KEY
),
CONF_HOST: self.host,
CONF_TOKEN: result["token"],
CONF_HOSTNAME: result["token"].split(":", 1)[1],
}
existing_entry = await self.async_set_unique_id(self.info["unique_id"])
existing_entry = await self.async_set_unique_id(unique_id)
if existing_entry:
return self.async_update_reload_and_abort(
existing_entry,

View File

@ -99,8 +99,8 @@ async def test_form_user(hass: HomeAssistant) -> None:
assert result3["title"] == "shc012345"
assert result3["data"] == {
"host": "1.1.1.1",
"ssl_certificate": hass.config.path(DOMAIN, CONF_SHC_CERT),
"ssl_key": hass.config.path(DOMAIN, CONF_SHC_KEY),
"ssl_certificate": hass.config.path(DOMAIN, "test-mac", CONF_SHC_CERT),
"ssl_key": hass.config.path(DOMAIN, "test-mac", CONF_SHC_KEY),
"token": "abc:123",
"hostname": "123",
}
@ -549,8 +549,8 @@ async def test_zeroconf(hass: HomeAssistant) -> None:
assert result3["title"] == "shc012345"
assert result3["data"] == {
"host": "1.1.1.1",
"ssl_certificate": hass.config.path(DOMAIN, CONF_SHC_CERT),
"ssl_key": hass.config.path(DOMAIN, CONF_SHC_KEY),
"ssl_certificate": hass.config.path(DOMAIN, "test-mac", CONF_SHC_CERT),
"ssl_key": hass.config.path(DOMAIN, "test-mac", CONF_SHC_KEY),
"token": "abc:123",
"hostname": "123",
}
@ -708,6 +708,7 @@ async def test_reauth(hass: HomeAssistant) -> None:
async def test_tls_assets_writer(hass: HomeAssistant) -> None:
"""Test we write tls assets to correct location."""
unique_id = "test-mac"
assets = {
"token": "abc:123",
"cert": b"content_cert",
@ -719,14 +720,163 @@ async def test_tls_assets_writer(hass: HomeAssistant) -> None:
"homeassistant.components.bosch_shc.config_flow.open", mock_open()
) as mocked_file,
):
write_tls_asset(hass, CONF_SHC_CERT, assets["cert"])
write_tls_asset(hass, unique_id, CONF_SHC_CERT, assets["cert"])
mocked_file.assert_called_with(
hass.config.path(DOMAIN, CONF_SHC_CERT), "w", encoding="utf8"
hass.config.path(DOMAIN, unique_id, CONF_SHC_CERT), "w", encoding="utf8"
)
mocked_file().write.assert_called_with("content_cert")
write_tls_asset(hass, CONF_SHC_KEY, assets["key"])
write_tls_asset(hass, unique_id, CONF_SHC_KEY, assets["key"])
mocked_file.assert_called_with(
hass.config.path(DOMAIN, CONF_SHC_KEY), "w", encoding="utf8"
hass.config.path(DOMAIN, unique_id, CONF_SHC_KEY), "w", encoding="utf8"
)
mocked_file().write.assert_called_with("content_key")
@pytest.mark.usefixtures("mock_zeroconf")
async def test_register_multiple_controllers(hass: HomeAssistant) -> None:
"""Test register multiple controllers.
Each registered controller must get its own key/certificate pair,
which must not get overwritten when a new controller is added.
"""
controller_1 = {
"hostname": "shc111111",
"mac": "test-mac1",
"host": "1.1.1.1",
"register": {
"token": "abc:shc111111",
"cert": b"content_cert1",
"key": b"content_key1",
},
}
controller_2 = {
"hostname": "shc222222",
"mac": "test-mac2",
"host": "2.2.2.2",
"register": {
"token": "abc:shc222222",
"cert": b"content_cert2",
"key": b"content_key2",
},
}
# Set up controller 1
ctrl_1_result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with (
patch(
"boschshcpy.session.SHCSession.mdns_info",
return_value=SHCInformation,
),
patch(
"boschshcpy.information.SHCInformation.name",
new_callable=PropertyMock,
return_value=controller_1["hostname"],
),
patch(
"boschshcpy.information.SHCInformation.unique_id",
new_callable=PropertyMock,
return_value=controller_1["mac"],
),
):
ctrl_1_result2 = await hass.config_entries.flow.async_configure(
ctrl_1_result["flow_id"],
{"host": controller_1["host"]},
)
with (
patch(
"boschshcpy.register_client.SHCRegisterClient.register",
return_value=controller_1["register"],
),
patch("os.mkdir"),
patch("homeassistant.components.bosch_shc.config_flow.open"),
patch("boschshcpy.session.SHCSession.authenticate"),
patch(
"homeassistant.components.bosch_shc.async_setup_entry",
return_value=True,
),
):
ctrl_1_result3 = await hass.config_entries.flow.async_configure(
ctrl_1_result2["flow_id"],
{"password": "test"},
)
await hass.async_block_till_done()
assert ctrl_1_result3["type"] is FlowResultType.CREATE_ENTRY
assert ctrl_1_result3["title"] == "shc111111"
assert ctrl_1_result3["context"]["unique_id"] == controller_1["mac"]
assert ctrl_1_result3["data"] == {
"host": "1.1.1.1",
"ssl_certificate": hass.config.path(DOMAIN, controller_1["mac"], CONF_SHC_CERT),
"ssl_key": hass.config.path(DOMAIN, controller_1["mac"], CONF_SHC_KEY),
"token": "abc:shc111111",
"hostname": "shc111111",
}
# Set up controller 2
ctrl_2_result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with (
patch(
"boschshcpy.session.SHCSession.mdns_info",
return_value=SHCInformation,
),
patch(
"boschshcpy.information.SHCInformation.name",
new_callable=PropertyMock,
return_value=controller_2["hostname"],
),
patch(
"boschshcpy.information.SHCInformation.unique_id",
new_callable=PropertyMock,
return_value=controller_2["mac"],
),
):
ctrl_2_result2 = await hass.config_entries.flow.async_configure(
ctrl_2_result["flow_id"],
{"host": controller_2["host"]},
)
with (
patch(
"boschshcpy.register_client.SHCRegisterClient.register",
return_value=controller_2["register"],
),
patch("os.mkdir"),
patch("homeassistant.components.bosch_shc.config_flow.open"),
patch("boschshcpy.session.SHCSession.authenticate"),
patch(
"homeassistant.components.bosch_shc.async_setup_entry",
return_value=True,
),
):
ctrl_2_result3 = await hass.config_entries.flow.async_configure(
ctrl_2_result2["flow_id"],
{"password": "test"},
)
await hass.async_block_till_done()
assert ctrl_2_result3["type"] is FlowResultType.CREATE_ENTRY
assert ctrl_2_result3["title"] == "shc222222"
assert ctrl_2_result3["context"]["unique_id"] == controller_2["mac"]
assert ctrl_2_result3["data"] == {
"host": "2.2.2.2",
"ssl_certificate": hass.config.path(DOMAIN, controller_2["mac"], CONF_SHC_CERT),
"ssl_key": hass.config.path(DOMAIN, controller_2["mac"], CONF_SHC_KEY),
"token": "abc:shc222222",
"hostname": "shc222222",
}
# Check that each controller has its own key/certificate pair
assert (
ctrl_1_result3["data"]["ssl_certificate"]
!= ctrl_2_result3["data"]["ssl_certificate"]
)
assert ctrl_1_result3["data"]["ssl_key"] != ctrl_2_result3["data"]["ssl_key"]