Add dynamic encryption key support to the ESPHome integration (#148746)

Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Marcel van der Veldt 2025-07-30 09:54:00 +02:00 committed by GitHub
parent 2ee82e1d6f
commit f66e83f33e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 918 additions and 5 deletions

View File

@ -51,6 +51,7 @@ from .const import (
DOMAIN,
)
from .dashboard import async_get_or_create_dashboard_manager, async_set_dashboard_info
from .encryption_key_storage import async_get_encryption_key_storage
from .entry_data import ESPHomeConfigEntry
from .manager import async_replace_device
@ -159,7 +160,10 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle reauthorization flow."""
errors = {}
if await self._retrieve_encryption_key_from_dashboard():
if (
await self._retrieve_encryption_key_from_storage()
or await self._retrieve_encryption_key_from_dashboard()
):
error = await self.fetch_device_info()
if error is None:
return await self._async_authenticate_or_add()
@ -226,9 +230,12 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
response = await self.fetch_device_info()
self._noise_psk = None
# Try to retrieve an existing key from dashboard or storage.
if (
self._device_name
and await self._retrieve_encryption_key_from_dashboard()
) or (
self._device_mac and await self._retrieve_encryption_key_from_storage()
):
response = await self.fetch_device_info()
@ -284,6 +291,7 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
self._name = discovery_info.properties.get("friendly_name", device_name)
self._host = discovery_info.host
self._port = discovery_info.port
self._device_mac = mac_address
self._noise_required = bool(discovery_info.properties.get("api_encryption"))
# Check if already configured
@ -772,6 +780,26 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
self._noise_psk = noise_psk
return True
async def _retrieve_encryption_key_from_storage(self) -> bool:
"""Try to retrieve the encryption key from storage.
Return boolean if a key was retrieved.
"""
# Try to get MAC address from current flow state or reauth entry
mac_address = self._device_mac
if mac_address is None and self._reauth_entry is not None:
# In reauth flow, get MAC from the existing entry's unique_id
mac_address = self._reauth_entry.unique_id
assert mac_address is not None
storage = await async_get_encryption_key_storage(self.hass)
if stored_key := await storage.async_get_key(mac_address):
self._noise_psk = stored_key
return True
return False
@staticmethod
@callback
def async_get_options_flow(

View File

@ -0,0 +1,94 @@
"""Encryption key storage for ESPHome devices."""
from __future__ import annotations
import logging
from typing import TypedDict
from homeassistant.core import HomeAssistant
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.singleton import singleton
from homeassistant.helpers.storage import Store
from homeassistant.util.hass_dict import HassKey
_LOGGER = logging.getLogger(__name__)
ENCRYPTION_KEY_STORAGE_VERSION = 1
ENCRYPTION_KEY_STORAGE_KEY = "esphome.encryption_keys"
class EncryptionKeyData(TypedDict):
"""Encryption key storage data."""
keys: dict[str, str] # MAC address -> base64 encoded key
KEY_ENCRYPTION_STORAGE: HassKey[ESPHomeEncryptionKeyStorage] = HassKey(
"esphome_encryption_key_storage"
)
class ESPHomeEncryptionKeyStorage:
"""Storage for ESPHome encryption keys."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the encryption key storage."""
self.hass = hass
self._store = Store[EncryptionKeyData](
hass,
ENCRYPTION_KEY_STORAGE_VERSION,
ENCRYPTION_KEY_STORAGE_KEY,
encoder=JSONEncoder,
)
self._data: EncryptionKeyData | None = None
async def async_load(self) -> None:
"""Load encryption keys from storage."""
if self._data is None:
data = await self._store.async_load()
self._data = data or {"keys": {}}
async def async_save(self) -> None:
"""Save encryption keys to storage."""
if self._data is not None:
await self._store.async_save(self._data)
async def async_get_key(self, mac_address: str) -> str | None:
"""Get encryption key for a MAC address."""
await self.async_load()
assert self._data is not None
return self._data["keys"].get(mac_address.lower())
async def async_store_key(self, mac_address: str, key: str) -> None:
"""Store encryption key for a MAC address."""
await self.async_load()
assert self._data is not None
self._data["keys"][mac_address.lower()] = key
await self.async_save()
_LOGGER.debug(
"Stored encryption key for device with MAC %s",
mac_address,
)
async def async_remove_key(self, mac_address: str) -> None:
"""Remove encryption key for a MAC address."""
await self.async_load()
assert self._data is not None
lower_mac_address = mac_address.lower()
if lower_mac_address in self._data["keys"]:
del self._data["keys"][lower_mac_address]
await self.async_save()
_LOGGER.debug(
"Removed encryption key for device with MAC %s",
mac_address,
)
@singleton(KEY_ENCRYPTION_STORAGE, async_=True)
async def async_get_encryption_key_storage(
hass: HomeAssistant,
) -> ESPHomeEncryptionKeyStorage:
"""Get the encryption key storage instance."""
storage = ESPHomeEncryptionKeyStorage(hass)
await storage.async_load()
return storage

View File

@ -3,8 +3,10 @@
from __future__ import annotations
import asyncio
import base64
from functools import partial
import logging
import secrets
from typing import TYPE_CHECKING, Any, NamedTuple
from aioesphomeapi import (
@ -68,6 +70,7 @@ from .const import (
CONF_ALLOW_SERVICE_CALLS,
CONF_BLUETOOTH_MAC_ADDRESS,
CONF_DEVICE_NAME,
CONF_NOISE_PSK,
CONF_SUBSCRIBE_LOGS,
DEFAULT_ALLOW_SERVICE_CALLS,
DEFAULT_URL,
@ -78,6 +81,7 @@ from .const import (
)
from .dashboard import async_get_dashboard
from .domain_data import DomainData
from .encryption_key_storage import async_get_encryption_key_storage
# Import config flow so that it's added to the registry
from .entry_data import ESPHomeConfigEntry, RuntimeEntryData
@ -85,9 +89,7 @@ from .entry_data import ESPHomeConfigEntry, RuntimeEntryData
DEVICE_CONFLICT_ISSUE_FORMAT = "device_conflict-{}"
if TYPE_CHECKING:
from aioesphomeapi.api_pb2 import ( # type: ignore[attr-defined]
SubscribeLogsResponse,
)
from aioesphomeapi.api_pb2 import SubscribeLogsResponse # type: ignore[attr-defined] # noqa: I001
_LOGGER = logging.getLogger(__name__)
@ -515,6 +517,8 @@ class ESPHomeManager:
assert api_version is not None, "API version must be set"
entry_data.async_on_connect(device_info, api_version)
await self._handle_dynamic_encryption_key(device_info)
if device_info.name:
reconnect_logic.name = device_info.name
@ -618,6 +622,7 @@ class ESPHomeManager:
),
):
return
if isinstance(err, InvalidEncryptionKeyAPIError):
if (
(received_name := err.received_name)
@ -648,6 +653,93 @@ class ESPHomeManager:
return
self.entry.async_start_reauth(self.hass)
async def _handle_dynamic_encryption_key(
self, device_info: EsphomeDeviceInfo
) -> None:
"""Handle dynamic encryption keys.
If a device reports it supports encryption, but we connected without a key,
we need to generate and store one.
"""
noise_psk: str | None = self.entry.data.get(CONF_NOISE_PSK)
if noise_psk:
# we're already connected with a noise PSK - nothing to do
return
if not device_info.api_encryption_supported:
# device does not support encryption - nothing to do
return
# Connected to device without key and the device supports encryption
storage = await async_get_encryption_key_storage(self.hass)
# First check if we have a key in storage for this device
from_storage: bool = False
if self.entry.unique_id and (
stored_key := await storage.async_get_key(self.entry.unique_id)
):
_LOGGER.debug(
"Retrieved encryption key from storage for device %s",
self.entry.unique_id,
)
# Use the stored key
new_key = stored_key.encode()
new_key_str = stored_key
from_storage = True
else:
# No stored key found, generate a new one
_LOGGER.debug(
"Generating new encryption key for device %s", self.entry.unique_id
)
new_key = base64.b64encode(secrets.token_bytes(32))
new_key_str = new_key.decode()
try:
# Store the key on the device using the existing connection
result = await self.cli.noise_encryption_set_key(new_key)
except APIConnectionError as ex:
_LOGGER.error(
"Connection error while storing encryption key for device %s (%s): %s",
self.entry.data.get(CONF_DEVICE_NAME, self.host),
self.entry.unique_id,
ex,
)
return
else:
if not result:
_LOGGER.error(
"Failed to set dynamic encryption key on device %s (%s)",
self.entry.data.get(CONF_DEVICE_NAME, self.host),
self.entry.unique_id,
)
return
# Key stored successfully on device
assert self.entry.unique_id is not None
# Only store in storage if it was newly generated
if not from_storage:
await storage.async_store_key(self.entry.unique_id, new_key_str)
# Always update config entry
self.hass.config_entries.async_update_entry(
self.entry,
data={**self.entry.data, CONF_NOISE_PSK: new_key_str},
)
if from_storage:
_LOGGER.info(
"Set encryption key from storage on device %s (%s)",
self.entry.data.get(CONF_DEVICE_NAME, self.host),
self.entry.unique_id,
)
else:
_LOGGER.info(
"Generated and stored encryption key for device %s (%s)",
self.entry.data.get(CONF_DEVICE_NAME, self.host),
self.entry.unique_id,
)
@callback
def _async_handle_logging_changed(self, _event: Event) -> None:
"""Handle when the logging level changes."""

View File

@ -27,6 +27,9 @@ from homeassistant.components.esphome.const import (
DEFAULT_NEW_CONFIG_ALLOW_ALLOW_SERVICE_CALLS,
DOMAIN,
)
from homeassistant.components.esphome.encryption_key_storage import (
ENCRYPTION_KEY_STORAGE_KEY,
)
from homeassistant.config_entries import SOURCE_IGNORE, ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT
from homeassistant.core import HomeAssistant
@ -41,6 +44,118 @@ from .conftest import MockGenericDeviceEntryType
from tests.common import MockConfigEntry
async def test_retrieve_encryption_key_from_storage_with_device_mac(
hass: HomeAssistant,
mock_client: APIClient,
hass_storage: dict[str, Any],
) -> None:
"""Test key successfully retrieved from storage."""
# Mock the encryption key storage
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {"keys": {"11:22:33:44:55:aa": VALID_NOISE_PSK}},
}
mock_client.device_info.side_effect = [
RequiresEncryptionAPIError,
InvalidEncryptionKeyAPIError("Wrong key", "test", "11:22:33:44:55:AA"),
DeviceInfo(
uses_password=False,
name="test",
mac_address="11:22:33:44:55:AA",
),
]
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
data={CONF_HOST: "127.0.0.1", CONF_PORT: 6053},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_HOST: "127.0.0.1",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_NOISE_PSK: VALID_NOISE_PSK,
CONF_DEVICE_NAME: "test",
}
assert mock_client.noise_psk == VALID_NOISE_PSK
async def test_reauth_fixed_from_from_storage(
hass: HomeAssistant,
mock_client: APIClient,
hass_storage: dict[str, Any],
) -> None:
"""Test reauth fixed automatically via storage."""
# Mock the encryption key storage
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {"keys": {"11:22:33:44:55:aa": VALID_NOISE_PSK}},
}
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "127.0.0.1",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test",
},
unique_id="11:22:33:44:55:aa",
)
entry.add_to_hass(hass)
mock_client.device_info.return_value = DeviceInfo(
uses_password=False, name="test", mac_address="11:22:33:44:55:aa"
)
result = await entry.start_reauth_flow(hass)
assert result["type"] is FlowResultType.ABORT, result
assert result["reason"] == "reauth_successful"
assert entry.data[CONF_NOISE_PSK] == VALID_NOISE_PSK
async def test_retrieve_encryption_key_from_storage_no_key_found(
hass: HomeAssistant,
mock_client: APIClient,
) -> None:
"""Test _retrieve_encryption_key_from_storage when no key is found."""
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "127.0.0.1",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test",
},
unique_id="11:22:33:44:55:aa",
)
entry.add_to_hass(hass)
mock_client.device_info.return_value = DeviceInfo(
uses_password=False, name="test", mac_address="11:22:33:44:55:aa"
)
result = await entry.start_reauth_flow(hass)
assert result["type"] is FlowResultType.FORM, result
assert result["step_id"] == "reauth_confirm"
assert CONF_NOISE_PSK not in entry.data
INVALID_NOISE_PSK = "lSYBYEjQI1bVL8s2Vask4YytGMj1f1epNtmoim2yuTM="
WRONG_NOISE_PSK = "GP+ciK+nVfTQ/gcz6uOdS+oKEdJgesU+jeu8Ssj2how="

View File

@ -0,0 +1,102 @@
"""Tests for ESPHome dynamic encryption key generation."""
from __future__ import annotations
import base64
from homeassistant.components.esphome.encryption_key_storage import (
ESPHomeEncryptionKeyStorage,
async_get_encryption_key_storage,
)
from homeassistant.core import HomeAssistant
async def test_dynamic_encryption_key_generation_mock(hass: HomeAssistant) -> None:
"""Test that encryption key generation works with mocked storage."""
storage = await async_get_encryption_key_storage(hass)
# Store a key
mac_address = "11:22:33:44:55:aa"
test_key = base64.b64encode(b"test_key_32_bytes_long_exactly!").decode()
await storage.async_store_key(mac_address, test_key)
# Retrieve a key
retrieved_key = await storage.async_get_key(mac_address)
assert retrieved_key == test_key
async def test_encryption_key_storage_remove_key(hass: HomeAssistant) -> None:
"""Test ESPHomeEncryptionKeyStorage async_remove_key method."""
# Create storage instance
storage = ESPHomeEncryptionKeyStorage(hass)
# Test removing a key that exists
mac_address = "11:22:33:44:55:aa"
test_key = "test_encryption_key_32_bytes_long"
# First store a key
await storage.async_store_key(mac_address, test_key)
# Verify key exists
retrieved_key = await storage.async_get_key(mac_address)
assert retrieved_key == test_key
# Remove the key
await storage.async_remove_key(mac_address)
# Verify key no longer exists
retrieved_key = await storage.async_get_key(mac_address)
assert retrieved_key is None
# Test removing a key that doesn't exist (should not raise an error)
non_existent_mac = "aa:bb:cc:dd:ee:ff"
await storage.async_remove_key(non_existent_mac) # Should not raise
# Test case insensitive removal
upper_mac = "22:33:44:55:66:77"
await storage.async_store_key(upper_mac, test_key)
# Remove using lowercase MAC address
await storage.async_remove_key(upper_mac.lower())
# Verify key was removed
retrieved_key = await storage.async_get_key(upper_mac)
assert retrieved_key is None
async def test_encryption_key_basic_storage(
hass: HomeAssistant,
) -> None:
"""Test basic encryption key storage functionality."""
storage = await async_get_encryption_key_storage(hass)
mac_address = "11:22:33:44:55:aa"
key = "test_encryption_key_32_bytes_long"
# Store key
await storage.async_store_key(mac_address, key)
# Retrieve key
retrieved_key = await storage.async_get_key(mac_address)
assert retrieved_key == key
async def test_retrieve_key_from_storage(
hass: HomeAssistant,
) -> None:
"""Test config flow can retrieve encryption key from storage for new device."""
# Test that the encryption key storage integration works with config flow
storage = await async_get_encryption_key_storage(hass)
mac_address = "11:22:33:44:55:aa"
stored_key = "test_encryption_key_32_bytes_long"
# Store encryption key for a device
await storage.async_store_key(mac_address, stored_key)
# Verify the key can be retrieved (simulating config flow behavior)
retrieved_key = await storage.async_get_key(mac_address)
assert retrieved_key == stored_key
# Test case insensitive retrieval (since config flows might use different case)
retrieved_key_upper = await storage.async_get_key(mac_address.upper())
assert retrieved_key_upper == stored_key

View File

@ -1,8 +1,10 @@
"""Test ESPHome manager."""
import asyncio
import base64
import logging
from unittest.mock import AsyncMock, Mock, call
from typing import Any
from unittest.mock import AsyncMock, Mock, call, patch
from aioesphomeapi import (
APIClient,
@ -27,11 +29,15 @@ from homeassistant.components.esphome.const import (
CONF_ALLOW_SERVICE_CALLS,
CONF_BLUETOOTH_MAC_ADDRESS,
CONF_DEVICE_NAME,
CONF_NOISE_PSK,
CONF_SUBSCRIBE_LOGS,
DOMAIN,
STABLE_BLE_URL_VERSION,
STABLE_BLE_VERSION_STR,
)
from homeassistant.components.esphome.encryption_key_storage import (
ENCRYPTION_KEY_STORAGE_KEY,
)
from homeassistant.components.esphome.manager import DEVICE_CONFLICT_ISSUE_FORMAT
from homeassistant.components.tag import DOMAIN as TAG_DOMAIN
from homeassistant.const import (
@ -1788,3 +1794,479 @@ async def test_sub_device_references_main_device_area(
)
assert sub_device_3 is not None
assert sub_device_3.suggested_area == "Bedroom"
@patch("homeassistant.components.esphome.manager.secrets.token_bytes")
async def test_dynamic_encryption_key_generation(
mock_token_bytes: Mock,
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
hass_storage: dict[str, Any],
) -> None:
"""Test that a device without a key in storage gets a new one generated."""
mac_address = "11:22:33:44:55:aa"
test_key_bytes = b"test_key_32_bytes_long_exactly!"
mock_token_bytes.return_value = test_key_bytes
expected_key = base64.b64encode(test_key_bytes).decode()
# Create entry without noise PSK
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Mock the client methods
mock_client.noise_encryption_set_key = AsyncMock(return_value=True)
# Set up device with encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# Force reconnect to trigger key generation
await device.mock_disconnect(True)
await device.mock_connect()
# Verify the key was generated and set
mock_token_bytes.assert_called_once_with(32)
mock_client.noise_encryption_set_key.assert_called_once()
# Verify config entry was updated
assert entry.data[CONF_NOISE_PSK] == expected_key
async def test_manager_retrieves_key_from_storage_on_reconnect(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
hass_storage: dict[str, Any],
) -> None:
"""Test that manager retrieves encryption key from storage during reconnect."""
mac_address = "11:22:33:44:55:aa"
test_key = base64.b64encode(b"existing_key_32_bytes_long!!!").decode()
# Set up storage with existing key
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {"keys": {mac_address: test_key}},
}
# Create entry without noise PSK (will be loaded from storage)
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Mock the client methods
mock_client.noise_encryption_set_key = AsyncMock(return_value=True)
# Set up device with encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# Force reconnect to trigger key retrieval from storage
await device.mock_disconnect(True)
await device.mock_connect()
# Verify noise_encryption_set_key was called with the stored key
mock_client.noise_encryption_set_key.assert_called_once_with(test_key.encode())
# Verify config entry was updated with key from storage
assert entry.data[CONF_NOISE_PSK] == test_key
async def test_manager_handle_dynamic_encryption_key_guard_clauses(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""Test _handle_dynamic_encryption_key guard clauses and early returns."""
# Test guard clause - no unique_id
entry_no_id = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=None, # No unique ID - should not generate key
)
entry_no_id.add_to_hass(hass)
# Set up device without unique ID
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry_no_id,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": "11:22:33:44:55:aa",
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# noise_encryption_set_key should not be called when no unique_id
mock_client.noise_encryption_set_key = AsyncMock()
await device.mock_disconnect(True)
await device.mock_connect()
mock_client.noise_encryption_set_key.assert_not_called()
async def test_manager_handle_dynamic_encryption_key_edge_cases(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""Test _handle_dynamic_encryption_key edge cases for better coverage."""
mac_address = "11:22:33:44:55:aa"
# Test device without encryption support
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Set up device without encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": False, # No encryption support
},
)
# noise_encryption_set_key should not be called when encryption not supported
mock_client.noise_encryption_set_key = AsyncMock()
await device.mock_disconnect(True)
await device.mock_connect()
mock_client.noise_encryption_set_key.assert_not_called()
@patch("homeassistant.components.esphome.manager.secrets.token_bytes")
async def test_manager_dynamic_encryption_key_generation_flow(
mock_token_bytes: Mock,
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
hass_storage: dict[str, Any],
) -> None:
"""Test the complete dynamic encryption key generation flow."""
mac_address = "11:22:33:44:55:aa"
test_key_bytes = b"test_key_32_bytes_long_exactly!"
mock_token_bytes.return_value = test_key_bytes
expected_key = base64.b64encode(test_key_bytes).decode()
# Initialize empty storage
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {
"keys": {} # No existing keys
},
}
# Create entry without noise PSK
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Mock the client methods
mock_client.noise_encryption_set_key = AsyncMock(return_value=True)
# Set up device with encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# Force reconnect to trigger key generation
await device.mock_disconnect(True)
await device.mock_connect()
# Verify the complete flow
mock_token_bytes.assert_called_once_with(32)
mock_client.noise_encryption_set_key.assert_called_once()
assert entry.data[CONF_NOISE_PSK] == expected_key
# Verify key was stored in hass_storage
assert (
hass_storage[ENCRYPTION_KEY_STORAGE_KEY]["data"]["keys"][mac_address]
== expected_key
)
@patch("homeassistant.components.esphome.manager.secrets.token_bytes")
async def test_manager_handle_dynamic_encryption_key_no_existing_key(
mock_token_bytes: Mock,
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
hass_storage: dict[str, Any],
) -> None:
"""Test _handle_dynamic_encryption_key when no existing key is found."""
mac_address = "11:22:33:44:55:aa"
test_key_bytes = b"test_key_32_bytes_long_exactly!"
mock_token_bytes.return_value = test_key_bytes
expected_key = base64.b64encode(test_key_bytes).decode()
# Initialize empty storage
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {
"keys": {} # No existing keys
},
}
# Create entry without noise PSK
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Mock the client methods
mock_client.noise_encryption_set_key = AsyncMock(return_value=True)
# Set up device with encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# Force reconnect to trigger key generation
await device.mock_disconnect(True)
await device.mock_connect()
# Verify key generation flow
mock_token_bytes.assert_called_once_with(32)
mock_client.noise_encryption_set_key.assert_called_once()
# Verify config entry was updated
assert entry.data[CONF_NOISE_PSK] == expected_key
# Verify key was stored
assert (
hass_storage[ENCRYPTION_KEY_STORAGE_KEY]["data"]["keys"][mac_address]
== expected_key
)
@patch("homeassistant.components.esphome.manager.secrets.token_bytes")
async def test_manager_handle_dynamic_encryption_key_device_set_key_fails(
mock_token_bytes: Mock,
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
hass_storage: dict[str, Any],
) -> None:
"""Test _handle_dynamic_encryption_key when noise_encryption_set_key returns False."""
mac_address = "11:22:33:44:55:aa"
test_key_bytes = b"test_key_32_bytes_long_exactly!"
mock_token_bytes.return_value = test_key_bytes
# Initialize empty storage
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {
"keys": {} # No existing keys
},
}
# Create entry without noise PSK
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Mock the client methods - set_key returns False
mock_client.noise_encryption_set_key = AsyncMock(return_value=False)
# Set up device with encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# Reset mocks since initial connection already happened
mock_token_bytes.reset_mock()
mock_client.noise_encryption_set_key.reset_mock()
# Force reconnect to trigger key generation
await device.mock_disconnect(True)
await device.mock_connect()
# Verify key generation was attempted with the expected key
mock_token_bytes.assert_called_once_with(32)
mock_client.noise_encryption_set_key.assert_called_once_with(
base64.b64encode(test_key_bytes)
)
# Verify config entry was NOT updated since set_key failed
assert CONF_NOISE_PSK not in entry.data
@patch("homeassistant.components.esphome.manager.secrets.token_bytes")
async def test_manager_handle_dynamic_encryption_key_connection_error(
mock_token_bytes: Mock,
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
hass_storage: dict[str, Any],
) -> None:
"""Test _handle_dynamic_encryption_key when noise_encryption_set_key raises APIConnectionError."""
mac_address = "11:22:33:44:55:aa"
test_key_bytes = b"test_key_32_bytes_long_exactly!"
mock_token_bytes.return_value = test_key_bytes
# Initialize empty storage
hass_storage[ENCRYPTION_KEY_STORAGE_KEY] = {
"version": 1,
"minor_version": 1,
"key": ENCRYPTION_KEY_STORAGE_KEY,
"data": {
"keys": {} # No existing keys
},
}
# Create entry without noise PSK
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "192.168.1.100",
CONF_PORT: 6053,
CONF_PASSWORD: "",
CONF_DEVICE_NAME: "test-device",
},
unique_id=mac_address,
)
entry.add_to_hass(hass)
# Mock the client methods - set_key raises APIConnectionError
mock_client.noise_encryption_set_key = AsyncMock(
side_effect=APIConnectionError("Connection failed")
)
# Set up device with encryption support
device = await mock_esphome_device(
mock_client=mock_client,
entry=entry,
device_info={
"uses_password": False,
"name": "test-device",
"mac_address": mac_address,
"esphome_version": "2023.12.0",
"api_encryption_supported": True,
},
)
# Force reconnect to trigger key generation
await device.mock_disconnect(True)
await device.mock_connect()
# Verify key generation was attempted twice (once during setup, once during reconnect)
# This is expected because the first attempt failed with connection error
assert mock_token_bytes.call_count == 2
mock_token_bytes.assert_called_with(32)
assert mock_client.noise_encryption_set_key.call_count == 2
# Verify config entry was NOT updated since connection error occurred
assert CONF_NOISE_PSK not in entry.data
# Verify key was NOT stored due to connection error
assert mac_address not in hass_storage[ENCRYPTION_KEY_STORAGE_KEY]["data"]["keys"]