Fix flaky test_api_conditional_memory and improve integration test patterns (#9379)

This commit is contained in:
J. Nick Koston 2025-07-07 18:08:21 -05:00 committed by GitHub
parent 7150f2806f
commit a72905191a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 341 additions and 234 deletions

View File

@ -78,3 +78,268 @@ pytest -s tests/integration/test_host_mode_basic.py
- Each test gets its own temporary directory and unique port - Each test gets its own temporary directory and unique port
- Port allocation minimizes race conditions by holding the socket until just before ESPHome starts - Port allocation minimizes race conditions by holding the socket until just before ESPHome starts
- Output from ESPHome processes is displayed for debugging - Output from ESPHome processes is displayed for debugging
## Integration Test Writing Guide
### Test Patterns and Best Practices
#### 1. Test File Naming Convention
- Use descriptive names: `test_{category}_{feature}.py`
- Common categories: `host_mode`, `api`, `scheduler`, `light`, `areas_and_devices`
- Examples:
- `test_host_mode_basic.py` - Basic host mode functionality
- `test_api_message_batching.py` - API message batching
- `test_scheduler_stress.py` - Scheduler stress testing
#### 2. Essential Imports
```python
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from aioesphomeapi import EntityState, SensorState
from .types import APIClientConnectedFactory, RunCompiledFunction
```
#### 3. Common Test Patterns
##### Basic Entity Test
```python
@pytest.mark.asyncio
async def test_my_sensor(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test sensor functionality."""
async with run_compiled(yaml_config), api_client_connected() as client:
# Get entity list
entities, services = await client.list_entities_services()
# Find specific entity
sensor = next((e for e in entities if e.object_id == "my_sensor"), None)
assert sensor is not None
```
##### State Subscription Pattern
```python
# Track state changes with futures
loop = asyncio.get_running_loop()
states: dict[int, EntityState] = {}
state_future: asyncio.Future[EntityState] = loop.create_future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# Check for specific condition using isinstance
if isinstance(state, SensorState) and state.state == expected_value:
if not state_future.done():
state_future.set_result(state)
client.subscribe_states(on_state)
# Wait for state with timeout
try:
result = await asyncio.wait_for(state_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(f"Expected state not received. Got: {list(states.values())}")
```
##### Service Execution Pattern
```python
# Find and execute service
entities, services = await client.list_entities_services()
my_service = next((s for s in services if s.name == "my_service"), None)
assert my_service is not None
# Execute with parameters
client.execute_service(my_service, {"param1": "value1", "param2": 42})
```
##### Multiple Entity Tracking
```python
# For tests with many entities
loop = asyncio.get_running_loop()
entity_count = 50
received_states: set[int] = set()
all_states_future: asyncio.Future[bool] = loop.create_future()
def on_state(state: EntityState) -> None:
received_states.add(state.key)
if len(received_states) >= entity_count and not all_states_future.done():
all_states_future.set_result(True)
client.subscribe_states(on_state)
await asyncio.wait_for(all_states_future, timeout=10.0)
```
#### 4. YAML Fixture Guidelines
##### Naming Convention
- Match test function name: `test_my_feature``fixtures/my_feature.yaml`
- Note: Remove `test_` prefix for fixture filename
##### Basic Structure
```yaml
esphome:
name: test-name # Use kebab-case
# Optional: areas, devices, platformio_options
host: # Always use host platform for integration tests
api: # Port injected automatically
logger:
level: DEBUG # Optional: Set log level
# Component configurations
sensor:
- platform: template
name: "My Sensor"
id: my_sensor
lambda: return 42.0;
update_interval: 0.1s # Fast updates for testing
```
##### Advanced Features
```yaml
# External components for custom test code
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH # Replaced by test framework
components: [my_test_component]
# Areas and devices
esphome:
name: test-device
areas:
- id: living_room
name: "Living Room"
- id: kitchen
name: "Kitchen"
parent_id: living_room
devices:
- id: my_device
name: "Test Device"
area_id: living_room
# API services
api:
services:
- service: test_service
variables:
my_param: string
then:
- logger.log:
format: "Service called with: %s"
args: [my_param.c_str()]
```
#### 5. Testing Complex Scenarios
##### External Components
Create C++ components in `fixtures/external_components/` for:
- Stress testing
- Custom entity behaviors
- Scheduler testing
- Memory management tests
##### Log Line Monitoring
```python
log_lines: list[str] = []
def on_log_line(line: str) -> None:
log_lines.append(line)
if "expected message" in line:
# Handle specific log messages
async with run_compiled(yaml_config, line_callback=on_log_line):
# Test implementation
```
Example using futures for specific log patterns:
```python
import re
loop = asyncio.get_running_loop()
connected_future = loop.create_future()
service_future = loop.create_future()
# Patterns to match
connected_pattern = re.compile(r"Client .* connected from")
service_pattern = re.compile(r"Service called")
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not connected_future.done() and connected_pattern.search(line):
connected_future.set_result(True)
elif not service_future.done() and service_pattern.search(line):
service_future.set_result(True)
async with run_compiled(yaml_config, line_callback=check_output):
async with api_client_connected() as client:
# Wait for specific log message
await asyncio.wait_for(connected_future, timeout=5.0)
# Do test actions...
# Wait for service log
await asyncio.wait_for(service_future, timeout=5.0)
```
**Note**: Tests that monitor log messages typically have fewer race conditions compared to state-based testing, making them more reliable. However, be aware that the host platform currently does not have a thread-safe logger, so logging from threads will not work correctly.
##### Timeout Handling
```python
# Always use timeouts for async operations
try:
result = await asyncio.wait_for(some_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail("Operation timed out - check test expectations")
```
#### 6. Common Assertions
```python
# Device info
assert device_info.name == "expected-name"
assert device_info.compilation_time is not None
# Entity properties
assert sensor.accuracy_decimals == 2
assert sensor.state_class == 1 # measurement
assert sensor.force_update is True
# Service availability
assert len(services) > 0
assert any(s.name == "expected_service" for s in services)
# State values
assert state.state == expected_value
assert state.missing_state is False
```
#### 7. Debugging Tips
- Use `pytest -s` to see ESPHome output during tests
- Add descriptive failure messages to assertions
- Use `pytest.fail()` with detailed error info for timeouts
- Check `log_lines` for compilation or runtime errors
- Enable debug logging in YAML fixtures when needed
#### 8. Performance Considerations
- Use short update intervals (0.1s) for faster tests
- Set reasonable timeouts (5-10s for most operations)
- Batch multiple assertions when possible
- Clean up resources properly using context managers
#### 9. Test Categories
- **Basic Tests**: Minimal functionality verification
- **Entity Tests**: Sensor, switch, light behavior
- **API Tests**: Message batching, services, events
- **Scheduler Tests**: Timing, defer operations, stress
- **Memory Tests**: Conditional compilation, optimization
- **Integration Tests**: Areas, devices, complex interactions

View File

@ -2,14 +2,10 @@ esphome:
name: api-conditional-memory-test name: api-conditional-memory-test
host: host:
api: api:
batch_delay: 0ms
actions: actions:
- action: test_simple_service - action: test_simple_service
then: then:
- logger.log: "Simple service called" - logger.log: "Simple service called"
- binary_sensor.template.publish:
id: service_called_sensor
state: ON
- action: test_service_with_args - action: test_service_with_args
variables: variables:
arg_string: string arg_string: string
@ -20,53 +16,14 @@ api:
- logger.log: - logger.log:
format: "Service called with: %s, %d, %d, %.2f" format: "Service called with: %s, %d, %d, %.2f"
args: [arg_string.c_str(), arg_int, arg_bool, arg_float] args: [arg_string.c_str(), arg_int, arg_bool, arg_float]
- sensor.template.publish:
id: service_arg_sensor
state: !lambda 'return arg_float;'
on_client_connected: on_client_connected:
- logger.log: - logger.log:
format: "Client %s connected from %s" format: "Client %s connected from %s"
args: [client_info.c_str(), client_address.c_str()] args: [client_info.c_str(), client_address.c_str()]
- binary_sensor.template.publish:
id: client_connected
state: ON
- text_sensor.template.publish:
id: last_client_info
state: !lambda 'return client_info;'
on_client_disconnected: on_client_disconnected:
- logger.log: - logger.log:
format: "Client %s disconnected from %s" format: "Client %s disconnected from %s"
args: [client_info.c_str(), client_address.c_str()] args: [client_info.c_str(), client_address.c_str()]
- binary_sensor.template.publish:
id: client_connected
state: OFF
- binary_sensor.template.publish:
id: client_disconnected_event
state: ON
logger: logger:
level: DEBUG level: DEBUG
binary_sensor:
- platform: template
name: "Client Connected"
id: client_connected
device_class: connectivity
- platform: template
name: "Client Disconnected Event"
id: client_disconnected_event
- platform: template
name: "Service Called"
id: service_called_sensor
sensor:
- platform: template
name: "Service Argument Value"
id: service_arg_sensor
unit_of_measurement: ""
accuracy_decimals: 2
text_sensor:
- platform: template
name: "Last Client Info"
id: last_client_info

View File

@ -3,15 +3,9 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import re
from aioesphomeapi import ( from aioesphomeapi import UserService, UserServiceArgType
BinarySensorInfo,
EntityState,
SensorInfo,
TextSensorInfo,
UserService,
UserServiceArgType,
)
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -25,50 +19,45 @@ async def test_api_conditional_memory(
) -> None: ) -> None:
"""Test API triggers and services work correctly with conditional compilation.""" """Test API triggers and services work correctly with conditional compilation."""
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
# Keep ESPHome process running throughout the test
async with run_compiled(yaml_config): # Track log messages
# First connection connected_future = loop.create_future()
disconnected_future = loop.create_future()
service_simple_future = loop.create_future()
service_args_future = loop.create_future()
# Patterns to match in logs
connected_pattern = re.compile(r"Client .* connected from")
disconnected_pattern = re.compile(r"Client .* disconnected from")
service_simple_pattern = re.compile(r"Simple service called")
service_args_pattern = re.compile(
r"Service called with: test_string, 123, 1, 42\.50"
)
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not connected_future.done() and connected_pattern.search(line):
connected_future.set_result(True)
elif not disconnected_future.done() and disconnected_pattern.search(line):
disconnected_future.set_result(True)
elif not service_simple_future.done() and service_simple_pattern.search(line):
service_simple_future.set_result(True)
elif not service_args_future.done() and service_args_pattern.search(line):
service_args_future.set_result(True)
# Run with log monitoring
async with run_compiled(yaml_config, line_callback=check_output):
async with api_client_connected() as client: async with api_client_connected() as client:
# Verify device info # Verify device info
device_info = await client.device_info() device_info = await client.device_info()
assert device_info is not None assert device_info is not None
assert device_info.name == "api-conditional-memory-test" assert device_info.name == "api-conditional-memory-test"
# List entities and services # Wait for connection log
entity_info, services = await asyncio.wait_for( await asyncio.wait_for(connected_future, timeout=5.0)
client.list_entities_services(), timeout=5.0
)
# Find our entities # List services
client_connected: BinarySensorInfo | None = None _, services = await client.list_entities_services()
client_disconnected_event: BinarySensorInfo | None = None
service_called_sensor: BinarySensorInfo | None = None
service_arg_sensor: SensorInfo | None = None
last_client_info: TextSensorInfo | None = None
for entity in entity_info:
if isinstance(entity, BinarySensorInfo):
if entity.object_id == "client_connected":
client_connected = entity
elif entity.object_id == "client_disconnected_event":
client_disconnected_event = entity
elif entity.object_id == "service_called":
service_called_sensor = entity
elif isinstance(entity, SensorInfo):
if entity.object_id == "service_argument_value":
service_arg_sensor = entity
elif isinstance(entity, TextSensorInfo):
if entity.object_id == "last_client_info":
last_client_info = entity
# Verify all entities exist
assert client_connected is not None, "client_connected sensor not found"
assert client_disconnected_event is not None, (
"client_disconnected_event sensor not found"
)
assert service_called_sensor is not None, "service_called sensor not found"
assert service_arg_sensor is not None, "service_arg_sensor not found"
assert last_client_info is not None, "last_client_info sensor not found"
# Verify services exist # Verify services exist
assert len(services) == 2, f"Expected 2 services, found {len(services)}" assert len(services) == 2, f"Expected 2 services, found {len(services)}"
@ -98,66 +87,11 @@ async def test_api_conditional_memory(
assert arg_types["arg_bool"] == UserServiceArgType.BOOL assert arg_types["arg_bool"] == UserServiceArgType.BOOL
assert arg_types["arg_float"] == UserServiceArgType.FLOAT assert arg_types["arg_float"] == UserServiceArgType.FLOAT
# Track state changes
states: dict[int, EntityState] = {}
states_future: asyncio.Future[None] = loop.create_future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# Check if we have initial states for connection sensors
if (
client_connected.key in states
and last_client_info.key in states
and not states_future.done()
):
states_future.set_result(None)
client.subscribe_states(on_state)
# Wait for initial states
await asyncio.wait_for(states_future, timeout=5.0)
# Verify on_client_connected trigger fired
connected_state = states.get(client_connected.key)
assert connected_state is not None
assert connected_state.state is True, "Client should be connected"
# Verify client info was captured
client_info_state = states.get(last_client_info.key)
assert client_info_state is not None
assert isinstance(client_info_state.state, str)
assert len(client_info_state.state) > 0, "Client info should not be empty"
# Test simple service
service_future: asyncio.Future[None] = loop.create_future()
def check_service_called(state: EntityState) -> None:
if state.key == service_called_sensor.key and state.state is True:
if not service_future.done():
service_future.set_result(None)
# Update callback to check for service execution
client.subscribe_states(check_service_called)
# Call simple service # Call simple service
client.execute_service(simple_service, {}) client.execute_service(simple_service, {})
# Wait for service to execute # Wait for service log
await asyncio.wait_for(service_future, timeout=5.0) await asyncio.wait_for(service_simple_future, timeout=5.0)
# Test service with arguments
arg_future: asyncio.Future[None] = loop.create_future()
expected_float = 42.5
def check_arg_sensor(state: EntityState) -> None:
if (
state.key == service_arg_sensor.key
and abs(state.state - expected_float) < 0.01
):
if not arg_future.done():
arg_future.set_result(None)
client.subscribe_states(check_arg_sensor)
# Call service with arguments # Call service with arguments
client.execute_service( client.execute_service(
@ -166,43 +100,12 @@ async def test_api_conditional_memory(
"arg_string": "test_string", "arg_string": "test_string",
"arg_int": 123, "arg_int": 123,
"arg_bool": True, "arg_bool": True,
"arg_float": expected_float, "arg_float": 42.5,
}, },
) )
# Wait for service with args to execute # Wait for service with args log
await asyncio.wait_for(arg_future, timeout=5.0) await asyncio.wait_for(service_args_future, timeout=5.0)
# After disconnecting first client, reconnect and verify triggers work # Client disconnected here, wait for disconnect log
async with api_client_connected() as client2: await asyncio.wait_for(disconnected_future, timeout=5.0)
# Subscribe to states with new client
states2: dict[int, EntityState] = {}
states_ready_future: asyncio.Future[None] = loop.create_future()
def on_state2(state: EntityState) -> None:
states2[state.key] = state
# Check if we have received both required states
if (
client_connected.key in states2
and client_disconnected_event.key in states2
and not states_ready_future.done()
):
states_ready_future.set_result(None)
client2.subscribe_states(on_state2)
# Wait for both connected and disconnected event states
await asyncio.wait_for(states_ready_future, timeout=5.0)
# Verify client is connected again (on_client_connected fired)
assert states2[client_connected.key].state is True, (
"Client should be reconnected"
)
# The client_disconnected_event should be ON from when we disconnected
# (it was set ON by on_client_disconnected trigger)
disconnected_state = states2.get(client_disconnected_event.key)
assert disconnected_state is not None
assert disconnected_state.state is True, (
"Disconnect event should be ON from previous disconnect"
)

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import asyncio import asyncio
from typing import Any from typing import Any
from aioesphomeapi import LogLevel from aioesphomeapi import LogLevel, SensorInfo
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -63,7 +63,7 @@ async def test_api_vv_logging(
entity_info, _ = await client.list_entities_services() entity_info, _ = await client.list_entities_services()
# Count sensors # Count sensors
sensor_count = sum(1 for e in entity_info if hasattr(e, "unit_of_measurement")) sensor_count = sum(1 for e in entity_info if isinstance(e, SensorInfo))
assert sensor_count >= 10, f"Expected at least 10 sensors, got {sensor_count}" assert sensor_count >= 10, f"Expected at least 10 sensors, got {sensor_count}"
# Wait for sensor updates to flow with VV logging active # Wait for sensor updates to flow with VV logging active

View File

@ -76,8 +76,8 @@ async def test_areas_and_devices(
# Get entity list to verify device_id mapping # Get entity list to verify device_id mapping
entities = await client.list_entities_services() entities = await client.list_entities_services()
# Collect sensor entities # Collect sensor entities (all entities have device_id)
sensor_entities = [e for e in entities[0] if hasattr(e, "device_id")] sensor_entities = entities[0]
assert len(sensor_entities) >= 4, ( assert len(sensor_entities) >= 4, (
f"Expected at least 4 sensor entities, got {len(sensor_entities)}" f"Expected at least 4 sensor entities, got {len(sensor_entities)}"
) )

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
from aioesphomeapi import EntityState from aioesphomeapi import BinarySensorState, EntityState, SensorState, TextSensorState
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -40,28 +40,22 @@ async def test_device_id_in_state(
entity_device_mapping: dict[int, int] = {} entity_device_mapping: dict[int, int] = {}
for entity in all_entities: for entity in all_entities:
if hasattr(entity, "name") and hasattr(entity, "key"): # All entities have name and key attributes
if entity.name == "Temperature": if entity.name == "Temperature":
entity_device_mapping[entity.key] = device_ids[ entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
"Temperature Monitor" elif entity.name == "Humidity":
] entity_device_mapping[entity.key] = device_ids["Humidity Monitor"]
elif entity.name == "Humidity": elif entity.name == "Motion Detected":
entity_device_mapping[entity.key] = device_ids["Humidity Monitor"] entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
elif entity.name == "Motion Detected": elif entity.name == "Temperature Monitor Power":
entity_device_mapping[entity.key] = device_ids["Motion Sensor"] entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
elif entity.name == "Temperature Monitor Power": elif entity.name == "Temperature Status":
entity_device_mapping[entity.key] = device_ids[ entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
"Temperature Monitor" elif entity.name == "Motion Light":
] entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
elif entity.name == "Temperature Status": elif entity.name == "No Device Sensor":
entity_device_mapping[entity.key] = device_ids[ # Entity without device_id should have device_id 0
"Temperature Monitor" entity_device_mapping[entity.key] = 0
]
elif entity.name == "Motion Light":
entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
elif entity.name == "No Device Sensor":
# Entity without device_id should have device_id 0
entity_device_mapping[entity.key] = 0
assert len(entity_device_mapping) >= 6, ( assert len(entity_device_mapping) >= 6, (
f"Expected at least 6 mapped entities, got {len(entity_device_mapping)}" f"Expected at least 6 mapped entities, got {len(entity_device_mapping)}"
@ -111,7 +105,7 @@ async def test_device_id_in_state(
( (
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") if isinstance(s, SensorState)
and isinstance(s.state, float) and isinstance(s.state, float)
and s.device_id != 0 and s.device_id != 0
), ),
@ -122,11 +116,7 @@ async def test_device_id_in_state(
# Find a binary sensor state # Find a binary sensor state
binary_sensor_state = next( binary_sensor_state = next(
( (s for s in states.values() if isinstance(s, BinarySensorState)),
s
for s in states.values()
if hasattr(s, "state") and isinstance(s.state, bool)
),
None, None,
) )
assert binary_sensor_state is not None, "No binary sensor state found" assert binary_sensor_state is not None, "No binary sensor state found"
@ -136,11 +126,7 @@ async def test_device_id_in_state(
# Find a text sensor state # Find a text sensor state
text_sensor_state = next( text_sensor_state = next(
( (s for s in states.values() if isinstance(s, TextSensorState)),
s
for s in states.values()
if hasattr(s, "state") and isinstance(s.state, str)
),
None, None,
) )
assert text_sensor_state is not None, "No text sensor state found" assert text_sensor_state is not None, "No text sensor state found"

View File

@ -51,9 +51,6 @@ async def test_entity_icon(
entity = entity_map[entity_name] entity = entity_map[entity_name]
# Check icon field # Check icon field
assert hasattr(entity, "icon"), (
f"{entity_name}: Entity should have icon attribute"
)
assert entity.icon == expected_icon, ( assert entity.icon == expected_icon, (
f"{entity_name}: icon mismatch - " f"{entity_name}: icon mismatch - "
f"expected '{expected_icon}', got '{entity.icon}'" f"expected '{expected_icon}', got '{entity.icon}'"
@ -67,9 +64,6 @@ async def test_entity_icon(
entity = entity_map[entity_name] entity = entity_map[entity_name]
# Check icon field is empty # Check icon field is empty
assert hasattr(entity, "icon"), (
f"{entity_name}: Entity should have icon attribute"
)
assert entity.icon == "", ( assert entity.icon == "", (
f"{entity_name}: icon should be empty string for entities without icons, " f"{entity_name}: icon should be empty string for entities without icons, "
f"got '{entity.icon}'" f"got '{entity.icon}'"

View File

@ -25,8 +25,8 @@ async def test_host_mode_entity_fields(
# Create a map of entity names to entity info # Create a map of entity names to entity info
entity_map = {} entity_map = {}
for entity in entities[0]: for entity in entities[0]:
if hasattr(entity, "name"): # All entities should have a name attribute
entity_map[entity.name] = entity entity_map[entity.name] = entity
# Test entities that should be visible via API (non-internal) # Test entities that should be visible via API (non-internal)
visible_test_cases = [ visible_test_cases = [

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
from aioesphomeapi import EntityState from aioesphomeapi import EntityState, SensorState
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -30,7 +30,7 @@ async def test_host_mode_many_entities(
sensor_states = [ sensor_states = [
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") and isinstance(s.state, float) if isinstance(s, SensorState) and isinstance(s.state, float)
] ]
# When we have received states from at least 50 sensors, resolve the future # When we have received states from at least 50 sensors, resolve the future
if len(sensor_states) >= 50 and not sensor_count_future.done(): if len(sensor_states) >= 50 and not sensor_count_future.done():
@ -45,7 +45,7 @@ async def test_host_mode_many_entities(
sensor_states = [ sensor_states = [
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") and isinstance(s.state, float) if isinstance(s, SensorState) and isinstance(s.state, float)
] ]
pytest.fail( pytest.fail(
f"Did not receive states from at least 50 sensors within 10 seconds. " f"Did not receive states from at least 50 sensors within 10 seconds. "
@ -61,7 +61,7 @@ async def test_host_mode_many_entities(
sensor_states = [ sensor_states = [
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") and isinstance(s.state, float) if isinstance(s, SensorState) and isinstance(s.state, float)
] ]
assert sensor_count >= 50, ( assert sensor_count >= 50, (

View File

@ -19,16 +19,17 @@ async def test_host_mode_with_sensor(
) -> None: ) -> None:
"""Test Host mode with a sensor component.""" """Test Host mode with a sensor component."""
# Write, compile and run the ESPHome device, then connect to API # Write, compile and run the ESPHome device, then connect to API
loop = asyncio.get_running_loop()
async with run_compiled(yaml_config), api_client_connected() as client: async with run_compiled(yaml_config), api_client_connected() as client:
# Subscribe to state changes # Subscribe to state changes
states: dict[int, EntityState] = {} states: dict[int, EntityState] = {}
sensor_future: asyncio.Future[EntityState] = asyncio.Future() sensor_future: asyncio.Future[EntityState] = loop.create_future()
def on_state(state: EntityState) -> None: def on_state(state: EntityState) -> None:
states[state.key] = state states[state.key] = state
# If this is our sensor with value 42.0, resolve the future # If this is our sensor with value 42.0, resolve the future
if ( if (
hasattr(state, "state") isinstance(state, aioesphomeapi.SensorState)
and state.state == 42.0 and state.state == 42.0
and not sensor_future.done() and not sensor_future.done()
): ):

View File

@ -7,6 +7,7 @@ including RGB, color temperature, effects, transitions, and flash.
import asyncio import asyncio
from typing import Any from typing import Any
from aioesphomeapi import LightState
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -76,7 +77,7 @@ async def test_light_calls(
client.light_command(key=rgbcw_light.key, white=0.6) client.light_command(key=rgbcw_light.key, white=0.6)
state = await wait_for_state_change(rgbcw_light.key) state = await wait_for_state_change(rgbcw_light.key)
# White might need more tolerance or might not be directly settable # White might need more tolerance or might not be directly settable
if hasattr(state, "white"): if isinstance(state, LightState) and state.white is not None:
assert state.white == pytest.approx(0.6, abs=0.1) assert state.white == pytest.approx(0.6, abs=0.1)
# Test 8: color_temperature only # Test 8: color_temperature only