This commit is contained in:
J. Nick Koston
2025-06-22 13:29:41 +02:00
parent e6d7639209
commit 61c29213a7
3 changed files with 192 additions and 18 deletions

View File

@@ -6,7 +6,7 @@ from pathlib import Path
import voluptuous as vol
from esphome import automation
from esphome import automation, core
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
@@ -483,17 +483,19 @@ async def to_code(config: ConfigType) -> None:
area_hashes: dict[int, str] = {}
area_ids: set[str] = set()
device_hashes: dict[int, str] = {}
area_conf: dict[str, str] | str | None
area_conf: dict[str, str | core.ID] | str | None
if area_conf := config.get(CONF_AREA):
if isinstance(area_conf, dict):
# New way: structured area configuration
area_id_str = area_conf[CONF_ID]
area_var = cg.new_Pvariable(area_id_str)
area_id = fnv1a_32bit_hash(area_id_str)
area_id: core.ID = area_conf[CONF_ID]
area_id_str: str = area_id.id
area_var = cg.new_Pvariable(area_id)
area_id_hash = fnv1a_32bit_hash(area_id_str)
area_name = area_conf[CONF_NAME]
else:
# Old way: string-based area (deprecated)
area_slug = slugify(area_conf)
area_id: core.ID = cv.declare_id(Area)
area_id_str = area_slug
_LOGGER.warning(
"Using 'area' as a string is deprecated. Please use the new format:\n"
@@ -504,19 +506,19 @@ async def to_code(config: ConfigType) -> None:
area_conf,
)
# Create a synthetic area for backwards compatibility
area_var = cg.Pvariable(area_slug, Area)
area_id = fnv1a_32bit_hash(area_conf)
area_var = cg.Pvariable(area_id)
area_id_hash = fnv1a_32bit_hash(area_conf)
area_name = area_conf
# Common setup for both ways
area_hashes[area_id] = area_name
area_hashes[area_id_hash] = area_name
area_ids.add(area_id_str)
cg.add(area_var.set_area_id(area_id))
cg.add(area_var.set_area_id(area_id_hash))
cg.add(area_var.set_name(area_name))
cg.add(cg.App.register_area(area_var))
# Process devices and areas
devices: list[dict[str, str]]
devices: list[dict[str, str | core.ID]]
if not (devices := config[CONF_DEVICES]):
return
@@ -528,11 +530,11 @@ async def to_code(config: ConfigType) -> None:
areas: list[dict[str, str]]
if areas := config[CONF_AREAS]:
for area_conf in areas:
area_id = area_conf[CONF_ID]
area_ids.add(area_id)
area_id: core.ID = area_conf[CONF_ID]
area_ids.add(area_id.id)
area = cg.new_Pvariable(area_id)
area_id_hash = fnv1a_32bit_hash(area_id)
area_name = area_conf[CONF_NAME]
area_id_hash = fnv1a_32bit_hash(area_id.id)
area_name: str = area_conf[CONF_NAME]
_verify_no_collisions(area_hashes, area_id, area_id_hash, CONF_AREAS)
area_hashes[area_id_hash] = area_name
cg.add(area.set_area_id(area_id_hash))
@@ -542,7 +544,7 @@ async def to_code(config: ConfigType) -> None:
# Process devices
for dev_conf in devices:
device_id = dev_conf[CONF_ID]
device_id_hash = fnv1a_32bit_hash(device_id)
device_id_hash = fnv1a_32bit_hash(device_id.id)
device_name = dev_conf[CONF_NAME]
_verify_no_collisions(device_hashes, device_id, device_id_hash, CONF_DEVICES)
device_hashes[device_id_hash] = device_name
@@ -552,10 +554,10 @@ async def to_code(config: ConfigType) -> None:
if CONF_AREA_ID in dev_conf:
# Get the area variable and use its area_id
area_id = dev_conf[CONF_AREA_ID]
area_id_hash = fnv1a_32bit_hash(area_id)
if area_id not in area_ids:
area_id_hash = fnv1a_32bit_hash(area_id.id)
if area_id.id not in area_ids:
raise vol.Invalid(
f"Device '{device_name}' has an area_id '{area_id}' that does not exist.",
f"Device '{device_name}' has an area_id '{area_id.id}' that does not exist.",
path=[CONF_DEVICES, dev_conf[CONF_ID], CONF_AREA_ID],
)
cg.add(dev.set_area_id(area_id_hash))

View File

@@ -0,0 +1,56 @@
esphome:
name: areas-devices-test
# Define top-level area
area:
id: living_room_area
name: Living Room
# Define additional areas
areas:
- id: bedroom_area
name: Bedroom
- id: kitchen_area
name: Kitchen
# Define devices with area assignments
devices:
- id: light_controller_device
name: Light Controller
area_id: living_room_area # Uses top-level area
- id: temp_sensor_device
name: Temperature Sensor
area_id: bedroom_area
- id: motion_detector_device
name: Motion Detector
area_id: living_room_area # Reuses top-level area
- id: smart_switch_device
name: Smart Switch
area_id: kitchen_area
host:
api:
logger:
# Sensors assigned to different devices
sensor:
- platform: template
name: Light Controller Sensor
device_id: light_controller_device
lambda: return 1.0;
update_interval: 0.1s
- platform: template
name: Temperature Sensor Reading
device_id: temp_sensor_device
lambda: return 2.0;
update_interval: 0.1s
- platform: template
name: Motion Detector Status
device_id: motion_detector_device
lambda: return 3.0;
update_interval: 0.1s
- platform: template
name: Smart Switch Power
device_id: smart_switch_device
lambda: return 4.0;
update_interval: 0.1s

View File

@@ -0,0 +1,116 @@
"""Integration test for areas and devices feature."""
from __future__ import annotations
import asyncio
from aioesphomeapi import EntityState
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_areas_and_devices(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test areas and devices configuration with entity mapping."""
async with run_compiled(yaml_config), api_client_connected() as client:
# Get device info which includes areas and devices
device_info = await client.device_info()
assert device_info is not None
# Verify areas are reported
areas = device_info.areas
assert len(areas) >= 2, f"Expected at least 2 areas, got {len(areas)}"
# Find our specific areas
main_area = next((a for a in areas if a.name == "Living Room"), None)
bedroom_area = next((a for a in areas if a.name == "Bedroom"), None)
kitchen_area = next((a for a in areas if a.name == "Kitchen"), None)
assert main_area is not None, "Living Room area not found"
assert bedroom_area is not None, "Bedroom area not found"
assert kitchen_area is not None, "Kitchen area not found"
# Verify devices are reported
devices = device_info.devices
assert len(devices) >= 4, f"Expected at least 4 devices, got {len(devices)}"
# Find our specific devices
light_controller = next(
(d for d in devices if d.name == "Light Controller"), None
)
temp_sensor = next((d for d in devices if d.name == "Temperature Sensor"), None)
motion_detector = next(
(d for d in devices if d.name == "Motion Detector"), None
)
smart_switch = next((d for d in devices if d.name == "Smart Switch"), None)
assert light_controller is not None, "Light Controller device not found"
assert temp_sensor is not None, "Temperature Sensor device not found"
assert motion_detector is not None, "Motion Detector device not found"
assert smart_switch is not None, "Smart Switch device not found"
# Verify device area assignments
assert light_controller.area_id == main_area.area_id, (
"Light Controller should be in Living Room"
)
assert temp_sensor.area_id == bedroom_area.area_id, (
"Temperature Sensor should be in Bedroom"
)
assert motion_detector.area_id == main_area.area_id, (
"Motion Detector should be in Living Room"
)
assert smart_switch.area_id == kitchen_area.area_id, (
"Smart Switch should be in Kitchen"
)
# Get entity list to verify device_id mapping
entities = await client.list_entities_services()
# Collect sensor entities
sensor_entities = [e for e in entities[0] if hasattr(e, "device_id")]
assert len(sensor_entities) >= 4, (
f"Expected at least 4 sensor entities, got {len(sensor_entities)}"
)
# Subscribe to states to get sensor values
loop = asyncio.get_running_loop()
states: dict[int, EntityState] = {}
states_future: asyncio.Future[bool] = loop.create_future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# Check if we have all expected sensor states
if len(states) >= 4 and not states_future.done():
states_future.set_result(True)
client.subscribe_states(on_state)
# Wait for sensor states
try:
await asyncio.wait_for(states_future, timeout=10.0)
except asyncio.TimeoutError:
pytest.fail(
f"Did not receive all sensor states within 10 seconds. "
f"Received {len(states)} states"
)
# Verify we have sensor entities with proper device_id assignments
device_id_mapping = {
"Light Controller Sensor": light_controller.device_id,
"Temperature Sensor Reading": temp_sensor.device_id,
"Motion Detector Status": motion_detector.device_id,
"Smart Switch Power": smart_switch.device_id,
}
for entity in sensor_entities:
if entity.name in device_id_mapping:
expected_device_id = device_id_mapping[entity.name]
assert entity.device_id == expected_device_id, (
f"{entity.name} has device_id {entity.device_id}, "
f"expected {expected_device_id}"
)