From f66e83f33ebe382696edc612c3580c2c3b156942 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 30 Jul 2025 09:54:00 +0200 Subject: [PATCH] Add dynamic encryption key support to the ESPHome integration (#148746) Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com> Co-authored-by: J. Nick Koston --- .../components/esphome/config_flow.py | 30 +- .../esphome/encryption_key_storage.py | 94 ++++ homeassistant/components/esphome/manager.py | 98 +++- tests/components/esphome/test_config_flow.py | 115 +++++ .../esphome/test_dynamic_encryption.py | 102 ++++ tests/components/esphome/test_manager.py | 484 +++++++++++++++++- 6 files changed, 918 insertions(+), 5 deletions(-) create mode 100644 homeassistant/components/esphome/encryption_key_storage.py create mode 100644 tests/components/esphome/test_dynamic_encryption.py diff --git a/homeassistant/components/esphome/config_flow.py b/homeassistant/components/esphome/config_flow.py index 75408246e78..dc0e9b8e1b1 100644 --- a/homeassistant/components/esphome/config_flow.py +++ b/homeassistant/components/esphome/config_flow.py @@ -51,6 +51,7 @@ from .const import ( DOMAIN, ) from .dashboard import async_get_or_create_dashboard_manager, async_set_dashboard_info +from .encryption_key_storage import async_get_encryption_key_storage from .entry_data import ESPHomeConfigEntry from .manager import async_replace_device @@ -159,7 +160,10 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN): """Handle reauthorization flow.""" errors = {} - if await self._retrieve_encryption_key_from_dashboard(): + if ( + await self._retrieve_encryption_key_from_storage() + or await self._retrieve_encryption_key_from_dashboard() + ): error = await self.fetch_device_info() if error is None: return await self._async_authenticate_or_add() @@ -226,9 +230,12 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN): response = await self.fetch_device_info() self._noise_psk = None + # Try to retrieve an existing key from dashboard or storage. if ( self._device_name and await self._retrieve_encryption_key_from_dashboard() + ) or ( + self._device_mac and await self._retrieve_encryption_key_from_storage() ): response = await self.fetch_device_info() @@ -284,6 +291,7 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN): self._name = discovery_info.properties.get("friendly_name", device_name) self._host = discovery_info.host self._port = discovery_info.port + self._device_mac = mac_address self._noise_required = bool(discovery_info.properties.get("api_encryption")) # Check if already configured @@ -772,6 +780,26 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN): self._noise_psk = noise_psk return True + async def _retrieve_encryption_key_from_storage(self) -> bool: + """Try to retrieve the encryption key from storage. + + Return boolean if a key was retrieved. + """ + # Try to get MAC address from current flow state or reauth entry + mac_address = self._device_mac + if mac_address is None and self._reauth_entry is not None: + # In reauth flow, get MAC from the existing entry's unique_id + mac_address = self._reauth_entry.unique_id + + assert mac_address is not None + + storage = await async_get_encryption_key_storage(self.hass) + if stored_key := await storage.async_get_key(mac_address): + self._noise_psk = stored_key + return True + + return False + @staticmethod @callback def async_get_options_flow( diff --git a/homeassistant/components/esphome/encryption_key_storage.py b/homeassistant/components/esphome/encryption_key_storage.py new file mode 100644 index 00000000000..e4b5ef41c2e --- /dev/null +++ b/homeassistant/components/esphome/encryption_key_storage.py @@ -0,0 +1,94 @@ +"""Encryption key storage for ESPHome devices.""" + +from __future__ import annotations + +import logging +from typing import TypedDict + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.json import JSONEncoder +from homeassistant.helpers.singleton import singleton +from homeassistant.helpers.storage import Store +from homeassistant.util.hass_dict import HassKey + +_LOGGER = logging.getLogger(__name__) + +ENCRYPTION_KEY_STORAGE_VERSION = 1 +ENCRYPTION_KEY_STORAGE_KEY = "esphome.encryption_keys" + + +class EncryptionKeyData(TypedDict): + """Encryption key storage data.""" + + keys: dict[str, str] # MAC address -> base64 encoded key + + +KEY_ENCRYPTION_STORAGE: HassKey[ESPHomeEncryptionKeyStorage] = HassKey( + "esphome_encryption_key_storage" +) + + +class ESPHomeEncryptionKeyStorage: + """Storage for ESPHome encryption keys.""" + + def __init__(self, hass: HomeAssistant) -> None: + """Initialize the encryption key storage.""" + self.hass = hass + self._store = Store[EncryptionKeyData]( + hass, + ENCRYPTION_KEY_STORAGE_VERSION, + ENCRYPTION_KEY_STORAGE_KEY, + encoder=JSONEncoder, + ) + self._data: EncryptionKeyData | None = None + + async def async_load(self) -> None: + """Load encryption keys from storage.""" + if self._data is None: + data = await self._store.async_load() + self._data = data or {"keys": {}} + + async def async_save(self) -> None: + """Save encryption keys to storage.""" + if self._data is not None: + await self._store.async_save(self._data) + + async def async_get_key(self, mac_address: str) -> str | None: + """Get encryption key for a MAC address.""" + await self.async_load() + assert self._data is not None + return self._data["keys"].get(mac_address.lower()) + + async def async_store_key(self, mac_address: str, key: str) -> None: + """Store encryption key for a MAC address.""" + await self.async_load() + assert self._data is not None + self._data["keys"][mac_address.lower()] = key + await self.async_save() + _LOGGER.debug( + "Stored encryption key for device with MAC %s", + mac_address, + ) + + async def async_remove_key(self, mac_address: str) -> None: + """Remove encryption key for a MAC address.""" + await self.async_load() + assert self._data is not None + lower_mac_address = mac_address.lower() + if lower_mac_address in self._data["keys"]: + del self._data["keys"][lower_mac_address] + await self.async_save() + _LOGGER.debug( + "Removed encryption key for device with MAC %s", + mac_address, + ) + + +@singleton(KEY_ENCRYPTION_STORAGE, async_=True) +async def async_get_encryption_key_storage( + hass: HomeAssistant, +) -> ESPHomeEncryptionKeyStorage: + """Get the encryption key storage instance.""" + storage = ESPHomeEncryptionKeyStorage(hass) + await storage.async_load() + return storage diff --git a/homeassistant/components/esphome/manager.py b/homeassistant/components/esphome/manager.py index 5e9e11171af..4d5de77b1e0 100644 --- a/homeassistant/components/esphome/manager.py +++ b/homeassistant/components/esphome/manager.py @@ -3,8 +3,10 @@ from __future__ import annotations import asyncio +import base64 from functools import partial import logging +import secrets from typing import TYPE_CHECKING, Any, NamedTuple from aioesphomeapi import ( @@ -68,6 +70,7 @@ from .const import ( CONF_ALLOW_SERVICE_CALLS, CONF_BLUETOOTH_MAC_ADDRESS, CONF_DEVICE_NAME, + CONF_NOISE_PSK, CONF_SUBSCRIBE_LOGS, DEFAULT_ALLOW_SERVICE_CALLS, DEFAULT_URL, @@ -78,6 +81,7 @@ from .const import ( ) from .dashboard import async_get_dashboard from .domain_data import DomainData +from .encryption_key_storage import async_get_encryption_key_storage # Import config flow so that it's added to the registry from .entry_data import ESPHomeConfigEntry, RuntimeEntryData @@ -85,9 +89,7 @@ from .entry_data import ESPHomeConfigEntry, RuntimeEntryData DEVICE_CONFLICT_ISSUE_FORMAT = "device_conflict-{}" if TYPE_CHECKING: - from aioesphomeapi.api_pb2 import ( # type: ignore[attr-defined] - SubscribeLogsResponse, - ) + from aioesphomeapi.api_pb2 import SubscribeLogsResponse # type: ignore[attr-defined] # noqa: I001 _LOGGER = logging.getLogger(__name__) @@ -515,6 +517,8 @@ class ESPHomeManager: assert api_version is not None, "API version must be set" entry_data.async_on_connect(device_info, api_version) + await self._handle_dynamic_encryption_key(device_info) + if device_info.name: reconnect_logic.name = device_info.name @@ -618,6 +622,7 @@ class ESPHomeManager: ), ): return + if isinstance(err, InvalidEncryptionKeyAPIError): if ( (received_name := err.received_name) @@ -648,6 +653,93 @@ class ESPHomeManager: return self.entry.async_start_reauth(self.hass) + async def _handle_dynamic_encryption_key( + self, device_info: EsphomeDeviceInfo + ) -> None: + """Handle dynamic encryption keys. + + If a device reports it supports encryption, but we connected without a key, + we need to generate and store one. + """ + noise_psk: str | None = self.entry.data.get(CONF_NOISE_PSK) + if noise_psk: + # we're already connected with a noise PSK - nothing to do + return + + if not device_info.api_encryption_supported: + # device does not support encryption - nothing to do + return + + # Connected to device without key and the device supports encryption + storage = await async_get_encryption_key_storage(self.hass) + + # First check if we have a key in storage for this device + from_storage: bool = False + if self.entry.unique_id and ( + stored_key := await storage.async_get_key(self.entry.unique_id) + ): + _LOGGER.debug( + "Retrieved encryption key from storage for device %s", + self.entry.unique_id, + ) + # Use the stored key + new_key = stored_key.encode() + new_key_str = stored_key + from_storage = True + else: + # No stored key found, generate a new one + _LOGGER.debug( + "Generating new encryption key for device %s", self.entry.unique_id + ) + new_key = base64.b64encode(secrets.token_bytes(32)) + new_key_str = new_key.decode() + + try: + # Store the key on the device using the existing connection + result = await self.cli.noise_encryption_set_key(new_key) + except APIConnectionError as ex: + _LOGGER.error( + "Connection error while storing encryption key for device %s (%s): %s", + self.entry.data.get(CONF_DEVICE_NAME, self.host), + self.entry.unique_id, + ex, + ) + return + else: + if not result: + _LOGGER.error( + "Failed to set dynamic encryption key on device %s (%s)", + self.entry.data.get(CONF_DEVICE_NAME, self.host), + self.entry.unique_id, + ) + return + + # Key stored successfully on device + assert self.entry.unique_id is not None + + # Only store in storage if it was newly generated + if not from_storage: + await storage.async_store_key(self.entry.unique_id, new_key_str) + + # Always update config entry + self.hass.config_entries.async_update_entry( + self.entry, + data={**self.entry.data, CONF_NOISE_PSK: new_key_str}, + ) + + if from_storage: + _LOGGER.info( + "Set encryption key from storage on device %s (%s)", + self.entry.data.get(CONF_DEVICE_NAME, self.host), + self.entry.unique_id, + ) + else: + _LOGGER.info( + "Generated and stored encryption key for device %s (%s)", + self.entry.data.get(CONF_DEVICE_NAME, self.host), + self.entry.unique_id, + ) + @callback def _async_handle_logging_changed(self, _event: Event) -> None: """Handle when the logging level changes.""" diff --git a/tests/components/esphome/test_config_flow.py b/tests/components/esphome/test_config_flow.py index 3f0148262e4..d76991a984c 100644 --- a/tests/components/esphome/test_config_flow.py +++ b/tests/components/esphome/test_config_flow.py @@ -27,6 +27,9 @@ from homeassistant.components.esphome.const import ( DEFAULT_NEW_CONFIG_ALLOW_ALLOW_SERVICE_CALLS, DOMAIN, ) +from homeassistant.components.esphome.encryption_key_storage import ( + ENCRYPTION_KEY_STORAGE_KEY, +) from homeassistant.config_entries import SOURCE_IGNORE, ConfigFlowResult from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT from homeassistant.core import HomeAssistant @@ -41,6 +44,118 @@ from .conftest import MockGenericDeviceEntryType from tests.common import MockConfigEntry + +async def test_retrieve_encryption_key_from_storage_with_device_mac( + hass: HomeAssistant, + mock_client: APIClient, + hass_storage: dict[str, Any], +) -> None: + """Test key successfully retrieved from storage.""" + + # Mock the encryption key storage + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": {"keys": {"11:22:33:44:55:aa": VALID_NOISE_PSK}}, + } + + mock_client.device_info.side_effect = [ + RequiresEncryptionAPIError, + InvalidEncryptionKeyAPIError("Wrong key", "test", "11:22:33:44:55:AA"), + DeviceInfo( + uses_password=False, + name="test", + mac_address="11:22:33:44:55:AA", + ), + ] + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_USER}, + data={CONF_HOST: "127.0.0.1", CONF_PORT: 6053}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["data"] == { + CONF_HOST: "127.0.0.1", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_NOISE_PSK: VALID_NOISE_PSK, + CONF_DEVICE_NAME: "test", + } + + assert mock_client.noise_psk == VALID_NOISE_PSK + + +async def test_reauth_fixed_from_from_storage( + hass: HomeAssistant, + mock_client: APIClient, + hass_storage: dict[str, Any], +) -> None: + """Test reauth fixed automatically via storage.""" + + # Mock the encryption key storage + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": {"keys": {"11:22:33:44:55:aa": VALID_NOISE_PSK}}, + } + + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "127.0.0.1", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test", + }, + unique_id="11:22:33:44:55:aa", + ) + entry.add_to_hass(hass) + + mock_client.device_info.return_value = DeviceInfo( + uses_password=False, name="test", mac_address="11:22:33:44:55:aa" + ) + + result = await entry.start_reauth_flow(hass) + + assert result["type"] is FlowResultType.ABORT, result + assert result["reason"] == "reauth_successful" + assert entry.data[CONF_NOISE_PSK] == VALID_NOISE_PSK + + +async def test_retrieve_encryption_key_from_storage_no_key_found( + hass: HomeAssistant, + mock_client: APIClient, +) -> None: + """Test _retrieve_encryption_key_from_storage when no key is found.""" + + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "127.0.0.1", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test", + }, + unique_id="11:22:33:44:55:aa", + ) + entry.add_to_hass(hass) + + mock_client.device_info.return_value = DeviceInfo( + uses_password=False, name="test", mac_address="11:22:33:44:55:aa" + ) + + result = await entry.start_reauth_flow(hass) + + assert result["type"] is FlowResultType.FORM, result + assert result["step_id"] == "reauth_confirm" + assert CONF_NOISE_PSK not in entry.data + + INVALID_NOISE_PSK = "lSYBYEjQI1bVL8s2Vask4YytGMj1f1epNtmoim2yuTM=" WRONG_NOISE_PSK = "GP+ciK+nVfTQ/gcz6uOdS+oKEdJgesU+jeu8Ssj2how=" diff --git a/tests/components/esphome/test_dynamic_encryption.py b/tests/components/esphome/test_dynamic_encryption.py new file mode 100644 index 00000000000..cbdcc35aea2 --- /dev/null +++ b/tests/components/esphome/test_dynamic_encryption.py @@ -0,0 +1,102 @@ +"""Tests for ESPHome dynamic encryption key generation.""" + +from __future__ import annotations + +import base64 + +from homeassistant.components.esphome.encryption_key_storage import ( + ESPHomeEncryptionKeyStorage, + async_get_encryption_key_storage, +) +from homeassistant.core import HomeAssistant + + +async def test_dynamic_encryption_key_generation_mock(hass: HomeAssistant) -> None: + """Test that encryption key generation works with mocked storage.""" + storage = await async_get_encryption_key_storage(hass) + + # Store a key + mac_address = "11:22:33:44:55:aa" + test_key = base64.b64encode(b"test_key_32_bytes_long_exactly!").decode() + + await storage.async_store_key(mac_address, test_key) + + # Retrieve a key + retrieved_key = await storage.async_get_key(mac_address) + assert retrieved_key == test_key + + +async def test_encryption_key_storage_remove_key(hass: HomeAssistant) -> None: + """Test ESPHomeEncryptionKeyStorage async_remove_key method.""" + # Create storage instance + storage = ESPHomeEncryptionKeyStorage(hass) + + # Test removing a key that exists + mac_address = "11:22:33:44:55:aa" + test_key = "test_encryption_key_32_bytes_long" + + # First store a key + await storage.async_store_key(mac_address, test_key) + + # Verify key exists + retrieved_key = await storage.async_get_key(mac_address) + assert retrieved_key == test_key + + # Remove the key + await storage.async_remove_key(mac_address) + + # Verify key no longer exists + retrieved_key = await storage.async_get_key(mac_address) + assert retrieved_key is None + + # Test removing a key that doesn't exist (should not raise an error) + non_existent_mac = "aa:bb:cc:dd:ee:ff" + await storage.async_remove_key(non_existent_mac) # Should not raise + + # Test case insensitive removal + upper_mac = "22:33:44:55:66:77" + await storage.async_store_key(upper_mac, test_key) + + # Remove using lowercase MAC address + await storage.async_remove_key(upper_mac.lower()) + + # Verify key was removed + retrieved_key = await storage.async_get_key(upper_mac) + assert retrieved_key is None + + +async def test_encryption_key_basic_storage( + hass: HomeAssistant, +) -> None: + """Test basic encryption key storage functionality.""" + storage = await async_get_encryption_key_storage(hass) + mac_address = "11:22:33:44:55:aa" + key = "test_encryption_key_32_bytes_long" + + # Store key + await storage.async_store_key(mac_address, key) + + # Retrieve key + retrieved_key = await storage.async_get_key(mac_address) + assert retrieved_key == key + + +async def test_retrieve_key_from_storage( + hass: HomeAssistant, +) -> None: + """Test config flow can retrieve encryption key from storage for new device.""" + # Test that the encryption key storage integration works with config flow + storage = await async_get_encryption_key_storage(hass) + mac_address = "11:22:33:44:55:aa" + stored_key = "test_encryption_key_32_bytes_long" + + # Store encryption key for a device + await storage.async_store_key(mac_address, stored_key) + + # Verify the key can be retrieved (simulating config flow behavior) + retrieved_key = await storage.async_get_key(mac_address) + assert retrieved_key == stored_key + + # Test case insensitive retrieval (since config flows might use different case) + retrieved_key_upper = await storage.async_get_key(mac_address.upper()) + assert retrieved_key_upper == stored_key diff --git a/tests/components/esphome/test_manager.py b/tests/components/esphome/test_manager.py index 318ccde221f..8d2dd211869 100644 --- a/tests/components/esphome/test_manager.py +++ b/tests/components/esphome/test_manager.py @@ -1,8 +1,10 @@ """Test ESPHome manager.""" import asyncio +import base64 import logging -from unittest.mock import AsyncMock, Mock, call +from typing import Any +from unittest.mock import AsyncMock, Mock, call, patch from aioesphomeapi import ( APIClient, @@ -27,11 +29,15 @@ from homeassistant.components.esphome.const import ( CONF_ALLOW_SERVICE_CALLS, CONF_BLUETOOTH_MAC_ADDRESS, CONF_DEVICE_NAME, + CONF_NOISE_PSK, CONF_SUBSCRIBE_LOGS, DOMAIN, STABLE_BLE_URL_VERSION, STABLE_BLE_VERSION_STR, ) +from homeassistant.components.esphome.encryption_key_storage import ( + ENCRYPTION_KEY_STORAGE_KEY, +) from homeassistant.components.esphome.manager import DEVICE_CONFLICT_ISSUE_FORMAT from homeassistant.components.tag import DOMAIN as TAG_DOMAIN from homeassistant.const import ( @@ -1788,3 +1794,479 @@ async def test_sub_device_references_main_device_area( ) assert sub_device_3 is not None assert sub_device_3.suggested_area == "Bedroom" + + +@patch("homeassistant.components.esphome.manager.secrets.token_bytes") +async def test_dynamic_encryption_key_generation( + mock_token_bytes: Mock, + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, + hass_storage: dict[str, Any], +) -> None: + """Test that a device without a key in storage gets a new one generated.""" + mac_address = "11:22:33:44:55:aa" + test_key_bytes = b"test_key_32_bytes_long_exactly!" + mock_token_bytes.return_value = test_key_bytes + expected_key = base64.b64encode(test_key_bytes).decode() + + # Create entry without noise PSK + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Mock the client methods + mock_client.noise_encryption_set_key = AsyncMock(return_value=True) + + # Set up device with encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # Force reconnect to trigger key generation + await device.mock_disconnect(True) + await device.mock_connect() + + # Verify the key was generated and set + mock_token_bytes.assert_called_once_with(32) + mock_client.noise_encryption_set_key.assert_called_once() + + # Verify config entry was updated + assert entry.data[CONF_NOISE_PSK] == expected_key + + +async def test_manager_retrieves_key_from_storage_on_reconnect( + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, + hass_storage: dict[str, Any], +) -> None: + """Test that manager retrieves encryption key from storage during reconnect.""" + mac_address = "11:22:33:44:55:aa" + test_key = base64.b64encode(b"existing_key_32_bytes_long!!!").decode() + + # Set up storage with existing key + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": {"keys": {mac_address: test_key}}, + } + + # Create entry without noise PSK (will be loaded from storage) + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Mock the client methods + mock_client.noise_encryption_set_key = AsyncMock(return_value=True) + + # Set up device with encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # Force reconnect to trigger key retrieval from storage + await device.mock_disconnect(True) + await device.mock_connect() + + # Verify noise_encryption_set_key was called with the stored key + mock_client.noise_encryption_set_key.assert_called_once_with(test_key.encode()) + + # Verify config entry was updated with key from storage + assert entry.data[CONF_NOISE_PSK] == test_key + + +async def test_manager_handle_dynamic_encryption_key_guard_clauses( + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, +) -> None: + """Test _handle_dynamic_encryption_key guard clauses and early returns.""" + # Test guard clause - no unique_id + entry_no_id = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=None, # No unique ID - should not generate key + ) + entry_no_id.add_to_hass(hass) + + # Set up device without unique ID + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry_no_id, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": "11:22:33:44:55:aa", + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # noise_encryption_set_key should not be called when no unique_id + mock_client.noise_encryption_set_key = AsyncMock() + await device.mock_disconnect(True) + await device.mock_connect() + + mock_client.noise_encryption_set_key.assert_not_called() + + +async def test_manager_handle_dynamic_encryption_key_edge_cases( + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, +) -> None: + """Test _handle_dynamic_encryption_key edge cases for better coverage.""" + mac_address = "11:22:33:44:55:aa" + + # Test device without encryption support + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Set up device without encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": False, # No encryption support + }, + ) + + # noise_encryption_set_key should not be called when encryption not supported + mock_client.noise_encryption_set_key = AsyncMock() + await device.mock_disconnect(True) + await device.mock_connect() + + mock_client.noise_encryption_set_key.assert_not_called() + + +@patch("homeassistant.components.esphome.manager.secrets.token_bytes") +async def test_manager_dynamic_encryption_key_generation_flow( + mock_token_bytes: Mock, + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, + hass_storage: dict[str, Any], +) -> None: + """Test the complete dynamic encryption key generation flow.""" + mac_address = "11:22:33:44:55:aa" + test_key_bytes = b"test_key_32_bytes_long_exactly!" + mock_token_bytes.return_value = test_key_bytes + expected_key = base64.b64encode(test_key_bytes).decode() + + # Initialize empty storage + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": { + "keys": {} # No existing keys + }, + } + + # Create entry without noise PSK + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Mock the client methods + mock_client.noise_encryption_set_key = AsyncMock(return_value=True) + + # Set up device with encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # Force reconnect to trigger key generation + await device.mock_disconnect(True) + await device.mock_connect() + + # Verify the complete flow + mock_token_bytes.assert_called_once_with(32) + mock_client.noise_encryption_set_key.assert_called_once() + assert entry.data[CONF_NOISE_PSK] == expected_key + + # Verify key was stored in hass_storage + assert ( + hass_storage[ENCRYPTION_KEY_STORAGE_KEY]["data"]["keys"][mac_address] + == expected_key + ) + + +@patch("homeassistant.components.esphome.manager.secrets.token_bytes") +async def test_manager_handle_dynamic_encryption_key_no_existing_key( + mock_token_bytes: Mock, + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, + hass_storage: dict[str, Any], +) -> None: + """Test _handle_dynamic_encryption_key when no existing key is found.""" + mac_address = "11:22:33:44:55:aa" + test_key_bytes = b"test_key_32_bytes_long_exactly!" + mock_token_bytes.return_value = test_key_bytes + expected_key = base64.b64encode(test_key_bytes).decode() + + # Initialize empty storage + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": { + "keys": {} # No existing keys + }, + } + + # Create entry without noise PSK + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Mock the client methods + mock_client.noise_encryption_set_key = AsyncMock(return_value=True) + + # Set up device with encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # Force reconnect to trigger key generation + await device.mock_disconnect(True) + await device.mock_connect() + + # Verify key generation flow + mock_token_bytes.assert_called_once_with(32) + mock_client.noise_encryption_set_key.assert_called_once() + + # Verify config entry was updated + assert entry.data[CONF_NOISE_PSK] == expected_key + + # Verify key was stored + assert ( + hass_storage[ENCRYPTION_KEY_STORAGE_KEY]["data"]["keys"][mac_address] + == expected_key + ) + + +@patch("homeassistant.components.esphome.manager.secrets.token_bytes") +async def test_manager_handle_dynamic_encryption_key_device_set_key_fails( + mock_token_bytes: Mock, + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, + hass_storage: dict[str, Any], +) -> None: + """Test _handle_dynamic_encryption_key when noise_encryption_set_key returns False.""" + mac_address = "11:22:33:44:55:aa" + test_key_bytes = b"test_key_32_bytes_long_exactly!" + mock_token_bytes.return_value = test_key_bytes + + # Initialize empty storage + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": { + "keys": {} # No existing keys + }, + } + + # Create entry without noise PSK + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Mock the client methods - set_key returns False + mock_client.noise_encryption_set_key = AsyncMock(return_value=False) + + # Set up device with encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # Reset mocks since initial connection already happened + mock_token_bytes.reset_mock() + mock_client.noise_encryption_set_key.reset_mock() + + # Force reconnect to trigger key generation + await device.mock_disconnect(True) + await device.mock_connect() + + # Verify key generation was attempted with the expected key + mock_token_bytes.assert_called_once_with(32) + mock_client.noise_encryption_set_key.assert_called_once_with( + base64.b64encode(test_key_bytes) + ) + + # Verify config entry was NOT updated since set_key failed + assert CONF_NOISE_PSK not in entry.data + + +@patch("homeassistant.components.esphome.manager.secrets.token_bytes") +async def test_manager_handle_dynamic_encryption_key_connection_error( + mock_token_bytes: Mock, + hass: HomeAssistant, + mock_client: APIClient, + mock_esphome_device: MockESPHomeDeviceType, + hass_storage: dict[str, Any], +) -> None: + """Test _handle_dynamic_encryption_key when noise_encryption_set_key raises APIConnectionError.""" + mac_address = "11:22:33:44:55:aa" + test_key_bytes = b"test_key_32_bytes_long_exactly!" + mock_token_bytes.return_value = test_key_bytes + + # Initialize empty storage + hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = { + "version": 1, + "minor_version": 1, + "key": ENCRYPTION_KEY_STORAGE_KEY, + "data": { + "keys": {} # No existing keys + }, + } + + # Create entry without noise PSK + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: "192.168.1.100", + CONF_PORT: 6053, + CONF_PASSWORD: "", + CONF_DEVICE_NAME: "test-device", + }, + unique_id=mac_address, + ) + entry.add_to_hass(hass) + + # Mock the client methods - set_key raises APIConnectionError + mock_client.noise_encryption_set_key = AsyncMock( + side_effect=APIConnectionError("Connection failed") + ) + + # Set up device with encryption support + device = await mock_esphome_device( + mock_client=mock_client, + entry=entry, + device_info={ + "uses_password": False, + "name": "test-device", + "mac_address": mac_address, + "esphome_version": "2023.12.0", + "api_encryption_supported": True, + }, + ) + + # Force reconnect to trigger key generation + await device.mock_disconnect(True) + await device.mock_connect() + + # Verify key generation was attempted twice (once during setup, once during reconnect) + # This is expected because the first attempt failed with connection error + assert mock_token_bytes.call_count == 2 + mock_token_bytes.assert_called_with(32) + assert mock_client.noise_encryption_set_key.call_count == 2 + + # Verify config entry was NOT updated since connection error occurred + assert CONF_NOISE_PSK not in entry.data + + # Verify key was NOT stored due to connection error + assert mac_address not in hass_storage[ENCRYPTION_KEY_STORAGE_KEY]["data"]["keys"]