From a72905191a2c8e97573840757fa7699d6d7df9e3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 7 Jul 2025 18:08:21 -0500 Subject: [PATCH] Fix flaky test_api_conditional_memory and improve integration test patterns (#9379) --- tests/integration/README.md | 265 ++++++++++++++++++ .../fixtures/api_conditional_memory.yaml | 43 --- .../test_api_conditional_memory.py | 179 +++--------- tests/integration/test_api_vv_logging.py | 4 +- tests/integration/test_areas_and_devices.py | 4 +- tests/integration/test_device_id_in_state.py | 54 ++-- tests/integration/test_entity_icon.py | 6 - .../test_host_mode_entity_fields.py | 4 +- .../test_host_mode_many_entities.py | 8 +- tests/integration/test_host_mode_sensor.py | 5 +- tests/integration/test_light_calls.py | 3 +- 11 files changed, 341 insertions(+), 234 deletions(-) diff --git a/tests/integration/README.md b/tests/integration/README.md index 26bd5a00ee..8fce81bb80 100644 --- a/tests/integration/README.md +++ b/tests/integration/README.md @@ -78,3 +78,268 @@ pytest -s tests/integration/test_host_mode_basic.py - Each test gets its own temporary directory and unique port - Port allocation minimizes race conditions by holding the socket until just before ESPHome starts - Output from ESPHome processes is displayed for debugging + +## Integration Test Writing Guide + +### Test Patterns and Best Practices + +#### 1. Test File Naming Convention +- Use descriptive names: `test_{category}_{feature}.py` +- Common categories: `host_mode`, `api`, `scheduler`, `light`, `areas_and_devices` +- Examples: + - `test_host_mode_basic.py` - Basic host mode functionality + - `test_api_message_batching.py` - API message batching + - `test_scheduler_stress.py` - Scheduler stress testing + +#### 2. Essential Imports +```python +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest +from aioesphomeapi import EntityState, SensorState + +from .types import APIClientConnectedFactory, RunCompiledFunction +``` + +#### 3. Common Test Patterns + +##### Basic Entity Test +```python +@pytest.mark.asyncio +async def test_my_sensor( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test sensor functionality.""" + async with run_compiled(yaml_config), api_client_connected() as client: + # Get entity list + entities, services = await client.list_entities_services() + + # Find specific entity + sensor = next((e for e in entities if e.object_id == "my_sensor"), None) + assert sensor is not None +``` + +##### State Subscription Pattern +```python +# Track state changes with futures +loop = asyncio.get_running_loop() +states: dict[int, EntityState] = {} +state_future: asyncio.Future[EntityState] = loop.create_future() + +def on_state(state: EntityState) -> None: + states[state.key] = state + # Check for specific condition using isinstance + if isinstance(state, SensorState) and state.state == expected_value: + if not state_future.done(): + state_future.set_result(state) + +client.subscribe_states(on_state) + +# Wait for state with timeout +try: + result = await asyncio.wait_for(state_future, timeout=5.0) +except asyncio.TimeoutError: + pytest.fail(f"Expected state not received. Got: {list(states.values())}") +``` + +##### Service Execution Pattern +```python +# Find and execute service +entities, services = await client.list_entities_services() +my_service = next((s for s in services if s.name == "my_service"), None) +assert my_service is not None + +# Execute with parameters +client.execute_service(my_service, {"param1": "value1", "param2": 42}) +``` + +##### Multiple Entity Tracking +```python +# For tests with many entities +loop = asyncio.get_running_loop() +entity_count = 50 +received_states: set[int] = set() +all_states_future: asyncio.Future[bool] = loop.create_future() + +def on_state(state: EntityState) -> None: + received_states.add(state.key) + if len(received_states) >= entity_count and not all_states_future.done(): + all_states_future.set_result(True) + +client.subscribe_states(on_state) +await asyncio.wait_for(all_states_future, timeout=10.0) +``` + +#### 4. YAML Fixture Guidelines + +##### Naming Convention +- Match test function name: `test_my_feature` → `fixtures/my_feature.yaml` +- Note: Remove `test_` prefix for fixture filename + +##### Basic Structure +```yaml +esphome: + name: test-name # Use kebab-case + # Optional: areas, devices, platformio_options + +host: # Always use host platform for integration tests +api: # Port injected automatically +logger: + level: DEBUG # Optional: Set log level + +# Component configurations +sensor: + - platform: template + name: "My Sensor" + id: my_sensor + lambda: return 42.0; + update_interval: 0.1s # Fast updates for testing +``` + +##### Advanced Features +```yaml +# External components for custom test code +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH # Replaced by test framework + components: [my_test_component] + +# Areas and devices +esphome: + name: test-device + areas: + - id: living_room + name: "Living Room" + - id: kitchen + name: "Kitchen" + parent_id: living_room + devices: + - id: my_device + name: "Test Device" + area_id: living_room + +# API services +api: + services: + - service: test_service + variables: + my_param: string + then: + - logger.log: + format: "Service called with: %s" + args: [my_param.c_str()] +``` + +#### 5. Testing Complex Scenarios + +##### External Components +Create C++ components in `fixtures/external_components/` for: +- Stress testing +- Custom entity behaviors +- Scheduler testing +- Memory management tests + +##### Log Line Monitoring +```python +log_lines: list[str] = [] + +def on_log_line(line: str) -> None: + log_lines.append(line) + if "expected message" in line: + # Handle specific log messages + +async with run_compiled(yaml_config, line_callback=on_log_line): + # Test implementation +``` + +Example using futures for specific log patterns: +```python +import re + +loop = asyncio.get_running_loop() +connected_future = loop.create_future() +service_future = loop.create_future() + +# Patterns to match +connected_pattern = re.compile(r"Client .* connected from") +service_pattern = re.compile(r"Service called") + +def check_output(line: str) -> None: + """Check log output for expected messages.""" + if not connected_future.done() and connected_pattern.search(line): + connected_future.set_result(True) + elif not service_future.done() and service_pattern.search(line): + service_future.set_result(True) + +async with run_compiled(yaml_config, line_callback=check_output): + async with api_client_connected() as client: + # Wait for specific log message + await asyncio.wait_for(connected_future, timeout=5.0) + + # Do test actions... + + # Wait for service log + await asyncio.wait_for(service_future, timeout=5.0) +``` + +**Note**: Tests that monitor log messages typically have fewer race conditions compared to state-based testing, making them more reliable. However, be aware that the host platform currently does not have a thread-safe logger, so logging from threads will not work correctly. + +##### Timeout Handling +```python +# Always use timeouts for async operations +try: + result = await asyncio.wait_for(some_future, timeout=5.0) +except asyncio.TimeoutError: + pytest.fail("Operation timed out - check test expectations") +``` + +#### 6. Common Assertions + +```python +# Device info +assert device_info.name == "expected-name" +assert device_info.compilation_time is not None + +# Entity properties +assert sensor.accuracy_decimals == 2 +assert sensor.state_class == 1 # measurement +assert sensor.force_update is True + +# Service availability +assert len(services) > 0 +assert any(s.name == "expected_service" for s in services) + +# State values +assert state.state == expected_value +assert state.missing_state is False +``` + +#### 7. Debugging Tips + +- Use `pytest -s` to see ESPHome output during tests +- Add descriptive failure messages to assertions +- Use `pytest.fail()` with detailed error info for timeouts +- Check `log_lines` for compilation or runtime errors +- Enable debug logging in YAML fixtures when needed + +#### 8. Performance Considerations + +- Use short update intervals (0.1s) for faster tests +- Set reasonable timeouts (5-10s for most operations) +- Batch multiple assertions when possible +- Clean up resources properly using context managers + +#### 9. Test Categories + +- **Basic Tests**: Minimal functionality verification +- **Entity Tests**: Sensor, switch, light behavior +- **API Tests**: Message batching, services, events +- **Scheduler Tests**: Timing, defer operations, stress +- **Memory Tests**: Conditional compilation, optimization +- **Integration Tests**: Areas, devices, complex interactions diff --git a/tests/integration/fixtures/api_conditional_memory.yaml b/tests/integration/fixtures/api_conditional_memory.yaml index 49412c3bfe..22e8ed79d6 100644 --- a/tests/integration/fixtures/api_conditional_memory.yaml +++ b/tests/integration/fixtures/api_conditional_memory.yaml @@ -2,14 +2,10 @@ esphome: name: api-conditional-memory-test host: api: - batch_delay: 0ms actions: - action: test_simple_service then: - logger.log: "Simple service called" - - binary_sensor.template.publish: - id: service_called_sensor - state: ON - action: test_service_with_args variables: arg_string: string @@ -20,53 +16,14 @@ api: - logger.log: format: "Service called with: %s, %d, %d, %.2f" args: [arg_string.c_str(), arg_int, arg_bool, arg_float] - - sensor.template.publish: - id: service_arg_sensor - state: !lambda 'return arg_float;' on_client_connected: - logger.log: format: "Client %s connected from %s" args: [client_info.c_str(), client_address.c_str()] - - binary_sensor.template.publish: - id: client_connected - state: ON - - text_sensor.template.publish: - id: last_client_info - state: !lambda 'return client_info;' on_client_disconnected: - logger.log: format: "Client %s disconnected from %s" args: [client_info.c_str(), client_address.c_str()] - - binary_sensor.template.publish: - id: client_connected - state: OFF - - binary_sensor.template.publish: - id: client_disconnected_event - state: ON logger: level: DEBUG - -binary_sensor: - - platform: template - name: "Client Connected" - id: client_connected - device_class: connectivity - - platform: template - name: "Client Disconnected Event" - id: client_disconnected_event - - platform: template - name: "Service Called" - id: service_called_sensor - -sensor: - - platform: template - name: "Service Argument Value" - id: service_arg_sensor - unit_of_measurement: "" - accuracy_decimals: 2 - -text_sensor: - - platform: template - name: "Last Client Info" - id: last_client_info diff --git a/tests/integration/test_api_conditional_memory.py b/tests/integration/test_api_conditional_memory.py index 8048624f70..cfa32c431d 100644 --- a/tests/integration/test_api_conditional_memory.py +++ b/tests/integration/test_api_conditional_memory.py @@ -3,15 +3,9 @@ from __future__ import annotations import asyncio +import re -from aioesphomeapi import ( - BinarySensorInfo, - EntityState, - SensorInfo, - TextSensorInfo, - UserService, - UserServiceArgType, -) +from aioesphomeapi import UserService, UserServiceArgType import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -25,50 +19,45 @@ async def test_api_conditional_memory( ) -> None: """Test API triggers and services work correctly with conditional compilation.""" loop = asyncio.get_running_loop() - # Keep ESPHome process running throughout the test - async with run_compiled(yaml_config): - # First connection + + # Track log messages + connected_future = loop.create_future() + disconnected_future = loop.create_future() + service_simple_future = loop.create_future() + service_args_future = loop.create_future() + + # Patterns to match in logs + connected_pattern = re.compile(r"Client .* connected from") + disconnected_pattern = re.compile(r"Client .* disconnected from") + service_simple_pattern = re.compile(r"Simple service called") + service_args_pattern = re.compile( + r"Service called with: test_string, 123, 1, 42\.50" + ) + + def check_output(line: str) -> None: + """Check log output for expected messages.""" + if not connected_future.done() and connected_pattern.search(line): + connected_future.set_result(True) + elif not disconnected_future.done() and disconnected_pattern.search(line): + disconnected_future.set_result(True) + elif not service_simple_future.done() and service_simple_pattern.search(line): + service_simple_future.set_result(True) + elif not service_args_future.done() and service_args_pattern.search(line): + service_args_future.set_result(True) + + # Run with log monitoring + async with run_compiled(yaml_config, line_callback=check_output): async with api_client_connected() as client: # Verify device info device_info = await client.device_info() assert device_info is not None assert device_info.name == "api-conditional-memory-test" - # List entities and services - entity_info, services = await asyncio.wait_for( - client.list_entities_services(), timeout=5.0 - ) + # Wait for connection log + await asyncio.wait_for(connected_future, timeout=5.0) - # Find our entities - client_connected: BinarySensorInfo | None = None - client_disconnected_event: BinarySensorInfo | None = None - service_called_sensor: BinarySensorInfo | None = None - service_arg_sensor: SensorInfo | None = None - last_client_info: TextSensorInfo | None = None - - for entity in entity_info: - if isinstance(entity, BinarySensorInfo): - if entity.object_id == "client_connected": - client_connected = entity - elif entity.object_id == "client_disconnected_event": - client_disconnected_event = entity - elif entity.object_id == "service_called": - service_called_sensor = entity - elif isinstance(entity, SensorInfo): - if entity.object_id == "service_argument_value": - service_arg_sensor = entity - elif isinstance(entity, TextSensorInfo): - if entity.object_id == "last_client_info": - last_client_info = entity - - # Verify all entities exist - assert client_connected is not None, "client_connected sensor not found" - assert client_disconnected_event is not None, ( - "client_disconnected_event sensor not found" - ) - assert service_called_sensor is not None, "service_called sensor not found" - assert service_arg_sensor is not None, "service_arg_sensor not found" - assert last_client_info is not None, "last_client_info sensor not found" + # List services + _, services = await client.list_entities_services() # Verify services exist assert len(services) == 2, f"Expected 2 services, found {len(services)}" @@ -98,66 +87,11 @@ async def test_api_conditional_memory( assert arg_types["arg_bool"] == UserServiceArgType.BOOL assert arg_types["arg_float"] == UserServiceArgType.FLOAT - # Track state changes - states: dict[int, EntityState] = {} - states_future: asyncio.Future[None] = loop.create_future() - - def on_state(state: EntityState) -> None: - states[state.key] = state - # Check if we have initial states for connection sensors - if ( - client_connected.key in states - and last_client_info.key in states - and not states_future.done() - ): - states_future.set_result(None) - - client.subscribe_states(on_state) - - # Wait for initial states - await asyncio.wait_for(states_future, timeout=5.0) - - # Verify on_client_connected trigger fired - connected_state = states.get(client_connected.key) - assert connected_state is not None - assert connected_state.state is True, "Client should be connected" - - # Verify client info was captured - client_info_state = states.get(last_client_info.key) - assert client_info_state is not None - assert isinstance(client_info_state.state, str) - assert len(client_info_state.state) > 0, "Client info should not be empty" - - # Test simple service - service_future: asyncio.Future[None] = loop.create_future() - - def check_service_called(state: EntityState) -> None: - if state.key == service_called_sensor.key and state.state is True: - if not service_future.done(): - service_future.set_result(None) - - # Update callback to check for service execution - client.subscribe_states(check_service_called) - # Call simple service client.execute_service(simple_service, {}) - # Wait for service to execute - await asyncio.wait_for(service_future, timeout=5.0) - - # Test service with arguments - arg_future: asyncio.Future[None] = loop.create_future() - expected_float = 42.5 - - def check_arg_sensor(state: EntityState) -> None: - if ( - state.key == service_arg_sensor.key - and abs(state.state - expected_float) < 0.01 - ): - if not arg_future.done(): - arg_future.set_result(None) - - client.subscribe_states(check_arg_sensor) + # Wait for service log + await asyncio.wait_for(service_simple_future, timeout=5.0) # Call service with arguments client.execute_service( @@ -166,43 +100,12 @@ async def test_api_conditional_memory( "arg_string": "test_string", "arg_int": 123, "arg_bool": True, - "arg_float": expected_float, + "arg_float": 42.5, }, ) - # Wait for service with args to execute - await asyncio.wait_for(arg_future, timeout=5.0) + # Wait for service with args log + await asyncio.wait_for(service_args_future, timeout=5.0) - # After disconnecting first client, reconnect and verify triggers work - async with api_client_connected() as client2: - # Subscribe to states with new client - states2: dict[int, EntityState] = {} - states_ready_future: asyncio.Future[None] = loop.create_future() - - def on_state2(state: EntityState) -> None: - states2[state.key] = state - # Check if we have received both required states - if ( - client_connected.key in states2 - and client_disconnected_event.key in states2 - and not states_ready_future.done() - ): - states_ready_future.set_result(None) - - client2.subscribe_states(on_state2) - - # Wait for both connected and disconnected event states - await asyncio.wait_for(states_ready_future, timeout=5.0) - - # Verify client is connected again (on_client_connected fired) - assert states2[client_connected.key].state is True, ( - "Client should be reconnected" - ) - - # The client_disconnected_event should be ON from when we disconnected - # (it was set ON by on_client_disconnected trigger) - disconnected_state = states2.get(client_disconnected_event.key) - assert disconnected_state is not None - assert disconnected_state.state is True, ( - "Disconnect event should be ON from previous disconnect" - ) + # Client disconnected here, wait for disconnect log + await asyncio.wait_for(disconnected_future, timeout=5.0) diff --git a/tests/integration/test_api_vv_logging.py b/tests/integration/test_api_vv_logging.py index 19aab2001c..fcbdd341ae 100644 --- a/tests/integration/test_api_vv_logging.py +++ b/tests/integration/test_api_vv_logging.py @@ -5,7 +5,7 @@ from __future__ import annotations import asyncio from typing import Any -from aioesphomeapi import LogLevel +from aioesphomeapi import LogLevel, SensorInfo import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -63,7 +63,7 @@ async def test_api_vv_logging( entity_info, _ = await client.list_entities_services() # Count sensors - sensor_count = sum(1 for e in entity_info if hasattr(e, "unit_of_measurement")) + sensor_count = sum(1 for e in entity_info if isinstance(e, SensorInfo)) assert sensor_count >= 10, f"Expected at least 10 sensors, got {sensor_count}" # Wait for sensor updates to flow with VV logging active diff --git a/tests/integration/test_areas_and_devices.py b/tests/integration/test_areas_and_devices.py index 4ce55a30a7..4184255724 100644 --- a/tests/integration/test_areas_and_devices.py +++ b/tests/integration/test_areas_and_devices.py @@ -76,8 +76,8 @@ async def test_areas_and_devices( # Get entity list to verify device_id mapping entities = await client.list_entities_services() - # Collect sensor entities - sensor_entities = [e for e in entities[0] if hasattr(e, "device_id")] + # Collect sensor entities (all entities have device_id) + sensor_entities = entities[0] assert len(sensor_entities) >= 4, ( f"Expected at least 4 sensor entities, got {len(sensor_entities)}" ) diff --git a/tests/integration/test_device_id_in_state.py b/tests/integration/test_device_id_in_state.py index 3c5181595f..eaa91ec92e 100644 --- a/tests/integration/test_device_id_in_state.py +++ b/tests/integration/test_device_id_in_state.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio -from aioesphomeapi import EntityState +from aioesphomeapi import BinarySensorState, EntityState, SensorState, TextSensorState import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -40,28 +40,22 @@ async def test_device_id_in_state( entity_device_mapping: dict[int, int] = {} for entity in all_entities: - if hasattr(entity, "name") and hasattr(entity, "key"): - if entity.name == "Temperature": - entity_device_mapping[entity.key] = device_ids[ - "Temperature Monitor" - ] - elif entity.name == "Humidity": - entity_device_mapping[entity.key] = device_ids["Humidity Monitor"] - elif entity.name == "Motion Detected": - entity_device_mapping[entity.key] = device_ids["Motion Sensor"] - elif entity.name == "Temperature Monitor Power": - entity_device_mapping[entity.key] = device_ids[ - "Temperature Monitor" - ] - elif entity.name == "Temperature Status": - entity_device_mapping[entity.key] = device_ids[ - "Temperature Monitor" - ] - elif entity.name == "Motion Light": - entity_device_mapping[entity.key] = device_ids["Motion Sensor"] - elif entity.name == "No Device Sensor": - # Entity without device_id should have device_id 0 - entity_device_mapping[entity.key] = 0 + # All entities have name and key attributes + if entity.name == "Temperature": + entity_device_mapping[entity.key] = device_ids["Temperature Monitor"] + elif entity.name == "Humidity": + entity_device_mapping[entity.key] = device_ids["Humidity Monitor"] + elif entity.name == "Motion Detected": + entity_device_mapping[entity.key] = device_ids["Motion Sensor"] + elif entity.name == "Temperature Monitor Power": + entity_device_mapping[entity.key] = device_ids["Temperature Monitor"] + elif entity.name == "Temperature Status": + entity_device_mapping[entity.key] = device_ids["Temperature Monitor"] + elif entity.name == "Motion Light": + entity_device_mapping[entity.key] = device_ids["Motion Sensor"] + elif entity.name == "No Device Sensor": + # Entity without device_id should have device_id 0 + entity_device_mapping[entity.key] = 0 assert len(entity_device_mapping) >= 6, ( f"Expected at least 6 mapped entities, got {len(entity_device_mapping)}" @@ -111,7 +105,7 @@ async def test_device_id_in_state( ( s for s in states.values() - if hasattr(s, "state") + if isinstance(s, SensorState) and isinstance(s.state, float) and s.device_id != 0 ), @@ -122,11 +116,7 @@ async def test_device_id_in_state( # Find a binary sensor state binary_sensor_state = next( - ( - s - for s in states.values() - if hasattr(s, "state") and isinstance(s.state, bool) - ), + (s for s in states.values() if isinstance(s, BinarySensorState)), None, ) assert binary_sensor_state is not None, "No binary sensor state found" @@ -136,11 +126,7 @@ async def test_device_id_in_state( # Find a text sensor state text_sensor_state = next( - ( - s - for s in states.values() - if hasattr(s, "state") and isinstance(s.state, str) - ), + (s for s in states.values() if isinstance(s, TextSensorState)), None, ) assert text_sensor_state is not None, "No text sensor state found" diff --git a/tests/integration/test_entity_icon.py b/tests/integration/test_entity_icon.py index 56e266b486..aec7168165 100644 --- a/tests/integration/test_entity_icon.py +++ b/tests/integration/test_entity_icon.py @@ -51,9 +51,6 @@ async def test_entity_icon( entity = entity_map[entity_name] # Check icon field - assert hasattr(entity, "icon"), ( - f"{entity_name}: Entity should have icon attribute" - ) assert entity.icon == expected_icon, ( f"{entity_name}: icon mismatch - " f"expected '{expected_icon}', got '{entity.icon}'" @@ -67,9 +64,6 @@ async def test_entity_icon( entity = entity_map[entity_name] # Check icon field is empty - assert hasattr(entity, "icon"), ( - f"{entity_name}: Entity should have icon attribute" - ) assert entity.icon == "", ( f"{entity_name}: icon should be empty string for entities without icons, " f"got '{entity.icon}'" diff --git a/tests/integration/test_host_mode_entity_fields.py b/tests/integration/test_host_mode_entity_fields.py index cf3fa6916a..b9fa3e9746 100644 --- a/tests/integration/test_host_mode_entity_fields.py +++ b/tests/integration/test_host_mode_entity_fields.py @@ -25,8 +25,8 @@ async def test_host_mode_entity_fields( # Create a map of entity names to entity info entity_map = {} for entity in entities[0]: - if hasattr(entity, "name"): - entity_map[entity.name] = entity + # All entities should have a name attribute + entity_map[entity.name] = entity # Test entities that should be visible via API (non-internal) visible_test_cases = [ diff --git a/tests/integration/test_host_mode_many_entities.py b/tests/integration/test_host_mode_many_entities.py index 005728b8c6..19d1ee315f 100644 --- a/tests/integration/test_host_mode_many_entities.py +++ b/tests/integration/test_host_mode_many_entities.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio -from aioesphomeapi import EntityState +from aioesphomeapi import EntityState, SensorState import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -30,7 +30,7 @@ async def test_host_mode_many_entities( sensor_states = [ s for s in states.values() - if hasattr(s, "state") and isinstance(s.state, float) + if isinstance(s, SensorState) and isinstance(s.state, float) ] # When we have received states from at least 50 sensors, resolve the future if len(sensor_states) >= 50 and not sensor_count_future.done(): @@ -45,7 +45,7 @@ async def test_host_mode_many_entities( sensor_states = [ s for s in states.values() - if hasattr(s, "state") and isinstance(s.state, float) + if isinstance(s, SensorState) and isinstance(s.state, float) ] pytest.fail( f"Did not receive states from at least 50 sensors within 10 seconds. " @@ -61,7 +61,7 @@ async def test_host_mode_many_entities( sensor_states = [ s for s in states.values() - if hasattr(s, "state") and isinstance(s.state, float) + if isinstance(s, SensorState) and isinstance(s.state, float) ] assert sensor_count >= 50, ( diff --git a/tests/integration/test_host_mode_sensor.py b/tests/integration/test_host_mode_sensor.py index 049f7db619..8c1e9f5d51 100644 --- a/tests/integration/test_host_mode_sensor.py +++ b/tests/integration/test_host_mode_sensor.py @@ -19,16 +19,17 @@ async def test_host_mode_with_sensor( ) -> None: """Test Host mode with a sensor component.""" # Write, compile and run the ESPHome device, then connect to API + loop = asyncio.get_running_loop() async with run_compiled(yaml_config), api_client_connected() as client: # Subscribe to state changes states: dict[int, EntityState] = {} - sensor_future: asyncio.Future[EntityState] = asyncio.Future() + sensor_future: asyncio.Future[EntityState] = loop.create_future() def on_state(state: EntityState) -> None: states[state.key] = state # If this is our sensor with value 42.0, resolve the future if ( - hasattr(state, "state") + isinstance(state, aioesphomeapi.SensorState) and state.state == 42.0 and not sensor_future.done() ): diff --git a/tests/integration/test_light_calls.py b/tests/integration/test_light_calls.py index 8ecb77fb99..1c56bbbf9e 100644 --- a/tests/integration/test_light_calls.py +++ b/tests/integration/test_light_calls.py @@ -7,6 +7,7 @@ including RGB, color temperature, effects, transitions, and flash. import asyncio from typing import Any +from aioesphomeapi import LightState import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -76,7 +77,7 @@ async def test_light_calls( client.light_command(key=rgbcw_light.key, white=0.6) state = await wait_for_state_change(rgbcw_light.key) # White might need more tolerance or might not be directly settable - if hasattr(state, "white"): + if isinstance(state, LightState) and state.white is not None: assert state.white == pytest.approx(0.6, abs=0.1) # Test 8: color_temperature only