diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 49ad9706bc..4d99bdbbd6 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -90,19 +90,6 @@ APIConnection::~APIConnection() { #endif } -#ifdef HAS_PROTO_MESSAGE_DUMP -void APIConnection::log_batch_item_(const DeferredBatch::BatchItem &item) { - // Set log-only mode - this->flags_.log_only_mode = true; - - // Call the creator - it will create the message and log it via encode_message_to_buffer - item.creator(item.entity, this, std::numeric_limits::max(), true, item.message_type); - - // Clear log-only mode - this->flags_.log_only_mode = false; -} -#endif - void APIConnection::loop() { if (this->flags_.next_close) { // requested a disconnect @@ -154,15 +141,25 @@ void APIConnection::loop() { } } - // Process deferred batch if scheduled + // Process deferred batch if scheduled and timer has expired if (this->flags_.batch_scheduled && now - this->deferred_batch_.batch_start_time >= this->get_batch_delay_ms_()) { this->process_batch_(); } if (!this->list_entities_iterator_.completed()) { - this->list_entities_iterator_.advance(); + this->process_iterator_batch_(this->list_entities_iterator_); } else if (!this->initial_state_iterator_.completed()) { - this->initial_state_iterator_.advance(); + this->process_iterator_batch_(this->initial_state_iterator_); + + // If we've completed initial states, process any remaining and clear the flag + if (this->initial_state_iterator_.completed()) { + // Process any remaining batched messages immediately + if (!this->deferred_batch_.empty()) { + this->process_batch_(); + } + // Now that everything is sent, enable immediate sending for future state changes + this->flags_.should_try_send_immediately = true; + } } if (this->flags_.sent_ping) { @@ -300,8 +297,8 @@ uint16_t APIConnection::encode_message_to_buffer(ProtoMessage &msg, uint16_t mes #ifdef USE_BINARY_SENSOR bool APIConnection::send_binary_sensor_state(binary_sensor::BinarySensor *binary_sensor) { - return this->schedule_message_(binary_sensor, &APIConnection::try_send_binary_sensor_state, - BinarySensorStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(binary_sensor, &APIConnection::try_send_binary_sensor_state, + BinarySensorStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_binary_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -328,7 +325,7 @@ uint16_t APIConnection::try_send_binary_sensor_info(EntityBase *entity, APIConne #ifdef USE_COVER bool APIConnection::send_cover_state(cover::Cover *cover) { - return this->schedule_message_(cover, &APIConnection::try_send_cover_state, CoverStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(cover, &APIConnection::try_send_cover_state, CoverStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_cover_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -389,7 +386,7 @@ void APIConnection::cover_command(const CoverCommandRequest &msg) { #ifdef USE_FAN bool APIConnection::send_fan_state(fan::Fan *fan) { - return this->schedule_message_(fan, &APIConnection::try_send_fan_state, FanStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(fan, &APIConnection::try_send_fan_state, FanStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_fan_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -448,7 +445,7 @@ void APIConnection::fan_command(const FanCommandRequest &msg) { #ifdef USE_LIGHT bool APIConnection::send_light_state(light::LightState *light) { - return this->schedule_message_(light, &APIConnection::try_send_light_state, LightStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(light, &APIConnection::try_send_light_state, LightStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_light_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -540,7 +537,7 @@ void APIConnection::light_command(const LightCommandRequest &msg) { #ifdef USE_SENSOR bool APIConnection::send_sensor_state(sensor::Sensor *sensor) { - return this->schedule_message_(sensor, &APIConnection::try_send_sensor_state, SensorStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(sensor, &APIConnection::try_send_sensor_state, SensorStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -572,7 +569,7 @@ uint16_t APIConnection::try_send_sensor_info(EntityBase *entity, APIConnection * #ifdef USE_SWITCH bool APIConnection::send_switch_state(switch_::Switch *a_switch) { - return this->schedule_message_(a_switch, &APIConnection::try_send_switch_state, SwitchStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(a_switch, &APIConnection::try_send_switch_state, SwitchStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_switch_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -609,8 +606,8 @@ void APIConnection::switch_command(const SwitchCommandRequest &msg) { #ifdef USE_TEXT_SENSOR bool APIConnection::send_text_sensor_state(text_sensor::TextSensor *text_sensor) { - return this->schedule_message_(text_sensor, &APIConnection::try_send_text_sensor_state, - TextSensorStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(text_sensor, &APIConnection::try_send_text_sensor_state, + TextSensorStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_text_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -637,7 +634,7 @@ uint16_t APIConnection::try_send_text_sensor_info(EntityBase *entity, APIConnect #ifdef USE_CLIMATE bool APIConnection::send_climate_state(climate::Climate *climate) { - return this->schedule_message_(climate, &APIConnection::try_send_climate_state, ClimateStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(climate, &APIConnection::try_send_climate_state, ClimateStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_climate_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -737,7 +734,7 @@ void APIConnection::climate_command(const ClimateCommandRequest &msg) { #ifdef USE_NUMBER bool APIConnection::send_number_state(number::Number *number) { - return this->schedule_message_(number, &APIConnection::try_send_number_state, NumberStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(number, &APIConnection::try_send_number_state, NumberStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_number_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -777,7 +774,7 @@ void APIConnection::number_command(const NumberCommandRequest &msg) { #ifdef USE_DATETIME_DATE bool APIConnection::send_date_state(datetime::DateEntity *date) { - return this->schedule_message_(date, &APIConnection::try_send_date_state, DateStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(date, &APIConnection::try_send_date_state, DateStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_date_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -811,7 +808,7 @@ void APIConnection::date_command(const DateCommandRequest &msg) { #ifdef USE_DATETIME_TIME bool APIConnection::send_time_state(datetime::TimeEntity *time) { - return this->schedule_message_(time, &APIConnection::try_send_time_state, TimeStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(time, &APIConnection::try_send_time_state, TimeStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_time_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -845,8 +842,8 @@ void APIConnection::time_command(const TimeCommandRequest &msg) { #ifdef USE_DATETIME_DATETIME bool APIConnection::send_datetime_state(datetime::DateTimeEntity *datetime) { - return this->schedule_message_(datetime, &APIConnection::try_send_datetime_state, - DateTimeStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(datetime, &APIConnection::try_send_datetime_state, + DateTimeStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_datetime_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -881,7 +878,7 @@ void APIConnection::datetime_command(const DateTimeCommandRequest &msg) { #ifdef USE_TEXT bool APIConnection::send_text_state(text::Text *text) { - return this->schedule_message_(text, &APIConnection::try_send_text_state, TextStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(text, &APIConnection::try_send_text_state, TextStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_text_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -919,7 +916,7 @@ void APIConnection::text_command(const TextCommandRequest &msg) { #ifdef USE_SELECT bool APIConnection::send_select_state(select::Select *select) { - return this->schedule_message_(select, &APIConnection::try_send_select_state, SelectStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(select, &APIConnection::try_send_select_state, SelectStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_select_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -974,7 +971,7 @@ void esphome::api::APIConnection::button_command(const ButtonCommandRequest &msg #ifdef USE_LOCK bool APIConnection::send_lock_state(lock::Lock *a_lock) { - return this->schedule_message_(a_lock, &APIConnection::try_send_lock_state, LockStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(a_lock, &APIConnection::try_send_lock_state, LockStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_lock_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, @@ -1018,7 +1015,7 @@ void APIConnection::lock_command(const LockCommandRequest &msg) { #ifdef USE_VALVE bool APIConnection::send_valve_state(valve::Valve *valve) { - return this->schedule_message_(valve, &APIConnection::try_send_valve_state, ValveStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(valve, &APIConnection::try_send_valve_state, ValveStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_valve_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -1058,8 +1055,8 @@ void APIConnection::valve_command(const ValveCommandRequest &msg) { #ifdef USE_MEDIA_PLAYER bool APIConnection::send_media_player_state(media_player::MediaPlayer *media_player) { - return this->schedule_message_(media_player, &APIConnection::try_send_media_player_state, - MediaPlayerStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(media_player, &APIConnection::try_send_media_player_state, + MediaPlayerStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_media_player_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -1320,8 +1317,8 @@ void APIConnection::voice_assistant_set_configuration(const VoiceAssistantSetCon #ifdef USE_ALARM_CONTROL_PANEL bool APIConnection::send_alarm_control_panel_state(alarm_control_panel::AlarmControlPanel *a_alarm_control_panel) { - return this->schedule_message_(a_alarm_control_panel, &APIConnection::try_send_alarm_control_panel_state, - AlarmControlPanelStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(a_alarm_control_panel, &APIConnection::try_send_alarm_control_panel_state, + AlarmControlPanelStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_alarm_control_panel_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -1404,7 +1401,7 @@ uint16_t APIConnection::try_send_event_info(EntityBase *entity, APIConnection *c #ifdef USE_UPDATE bool APIConnection::send_update_state(update::UpdateEntity *update) { - return this->schedule_message_(update, &APIConnection::try_send_update_state, UpdateStateResponse::MESSAGE_TYPE); + return this->send_message_smart_(update, &APIConnection::try_send_update_state, UpdateStateResponse::MESSAGE_TYPE); } uint16_t APIConnection::try_send_update_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single) { @@ -1751,11 +1748,16 @@ void APIConnection::process_batch_() { if (payload_size > 0 && this->send_buffer(ProtoWriteBuffer{&this->parent_->get_shared_buffer_ref()}, item.message_type)) { - this->deferred_batch_.clear(); +#ifdef HAS_PROTO_MESSAGE_DUMP + // Log messages after send attempt for VV debugging + // It's safe to use the buffer for logging at this point regardless of send result + this->log_batch_item_(item); +#endif + this->clear_batch_(); } else if (payload_size == 0) { // Message too large ESP_LOGW(TAG, "Message too large to send: type=%u", item.message_type); - this->deferred_batch_.clear(); + this->clear_batch_(); } return; } @@ -1864,7 +1866,7 @@ void APIConnection::process_batch_() { this->schedule_batch_(); } else { // All items processed - this->deferred_batch_.clear(); + this->clear_batch_(); } } diff --git a/esphome/components/api/api_connection.h b/esphome/components/api/api_connection.h index 151369aa70..8922aab94a 100644 --- a/esphome/components/api/api_connection.h +++ b/esphome/components/api/api_connection.h @@ -18,6 +18,8 @@ namespace api { // Keepalive timeout in milliseconds static constexpr uint32_t KEEPALIVE_TIMEOUT_MS = 60000; +// Maximum number of entities to process in a single batch during initial state/info sending +static constexpr size_t MAX_INITIAL_PER_BATCH = 20; class APIConnection : public APIServerConnection { public: @@ -296,6 +298,20 @@ class APIConnection : public APIServerConnection { static uint16_t encode_message_to_buffer(ProtoMessage &msg, uint16_t message_type, APIConnection *conn, uint32_t remaining_size, bool is_single); + // Helper method to process multiple entities from an iterator in a batch + template void process_iterator_batch_(Iterator &iterator) { + size_t initial_size = this->deferred_batch_.size(); + while (!iterator.completed() && (this->deferred_batch_.size() - initial_size) < MAX_INITIAL_PER_BATCH) { + iterator.advance(); + } + + // If the batch is full, process it immediately + // Note: iterator.advance() already calls schedule_batch_() via schedule_message_() + if (this->deferred_batch_.size() >= MAX_INITIAL_PER_BATCH) { + this->process_batch_(); + } + } + #ifdef USE_BINARY_SENSOR static uint16_t try_send_binary_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size, bool is_single); @@ -582,7 +598,8 @@ class APIConnection : public APIServerConnection { uint8_t service_call_subscription : 1; uint8_t next_close : 1; uint8_t batch_scheduled : 1; - uint8_t batch_first_message : 1; // For batch buffer allocation + uint8_t batch_first_message : 1; // For batch buffer allocation + uint8_t should_try_send_immediately : 1; // True after initial states are sent #ifdef HAS_PROTO_MESSAGE_DUMP uint8_t log_only_mode : 1; #endif @@ -609,11 +626,50 @@ class APIConnection : public APIServerConnection { bool schedule_batch_(); void process_batch_(); + void clear_batch_() { + this->deferred_batch_.clear(); + this->flags_.batch_scheduled = false; + } #ifdef HAS_PROTO_MESSAGE_DUMP - void log_batch_item_(const DeferredBatch::BatchItem &item); + // Helper to log a proto message from a MessageCreator object + void log_proto_message_(EntityBase *entity, const MessageCreator &creator, uint16_t message_type) { + this->flags_.log_only_mode = true; + creator(entity, this, MAX_PACKET_SIZE, true, message_type); + this->flags_.log_only_mode = false; + } + + void log_batch_item_(const DeferredBatch::BatchItem &item) { + // Use the helper to log the message + this->log_proto_message_(item.entity, item.creator, item.message_type); + } #endif + // Helper method to send a message either immediately or via batching + bool send_message_smart_(EntityBase *entity, MessageCreatorPtr creator, uint16_t message_type) { + // Try to send immediately if: + // 1. We should try to send immediately (should_try_send_immediately = true) + // 2. Batch delay is 0 (user has opted in to immediate sending) + // 3. Buffer has space available + if (this->flags_.should_try_send_immediately && this->get_batch_delay_ms_() == 0 && + this->helper_->can_write_without_blocking()) { + // Now actually encode and send + if (creator(entity, this, MAX_PACKET_SIZE, true) && + this->send_buffer(ProtoWriteBuffer{&this->parent_->get_shared_buffer_ref()}, message_type)) { +#ifdef HAS_PROTO_MESSAGE_DUMP + // Log the message in verbose mode + this->log_proto_message_(entity, MessageCreator(creator), message_type); +#endif + return true; + } + + // If immediate send failed, fall through to batching + } + + // Fall back to scheduled batching + return this->schedule_message_(entity, creator, message_type); + } + // Helper function to schedule a deferred message with known message type bool schedule_message_(EntityBase *entity, MessageCreator creator, uint16_t message_type) { this->deferred_batch_.add_item(entity, std::move(creator), message_type); diff --git a/tests/integration/fixtures/batch_delay_zero_rapid_transitions.yaml b/tests/integration/fixtures/batch_delay_zero_rapid_transitions.yaml new file mode 100644 index 0000000000..32cacfaa79 --- /dev/null +++ b/tests/integration/fixtures/batch_delay_zero_rapid_transitions.yaml @@ -0,0 +1,43 @@ +esphome: + name: rapid-transitions-test +host: +api: + batch_delay: 0ms # Enable immediate sending for rapid transitions +logger: + level: DEBUG + +# Add a sensor that updates frequently to trigger lambda evaluations +sensor: + - platform: template + name: "Update Trigger" + id: update_trigger + lambda: |- + return 0; + update_interval: 10ms + internal: true + +# Simulate an IR remote binary sensor with rapid ON/OFF transitions +binary_sensor: + - platform: template + name: "Simulated IR Remote Button" + id: ir_remote_button + lambda: |- + // Simulate rapid button presses every ~100ms + // Each "press" is ON for ~30ms then OFF + uint32_t now = millis(); + uint32_t press_cycle = now % 100; // 100ms cycle + + // ON for first 30ms of each cycle + if (press_cycle < 30) { + // Only log state change + if (!id(ir_remote_button).state) { + ESP_LOGD("test", "Button ON at %u", now); + } + return true; + } else { + // Only log state change + if (id(ir_remote_button).state) { + ESP_LOGD("test", "Button OFF at %u", now); + } + return false; + } diff --git a/tests/integration/test_batch_delay_zero_rapid_transitions.py b/tests/integration/test_batch_delay_zero_rapid_transitions.py new file mode 100644 index 0000000000..f17319dddf --- /dev/null +++ b/tests/integration/test_batch_delay_zero_rapid_transitions.py @@ -0,0 +1,58 @@ +"""Integration test for API batch_delay: 0 with rapid state transitions.""" + +from __future__ import annotations + +import asyncio +import time + +from aioesphomeapi import BinarySensorInfo, BinarySensorState, EntityState +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_batch_delay_zero_rapid_transitions( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that rapid binary sensor transitions are preserved with batch_delay: 0ms.""" + async with run_compiled(yaml_config), api_client_connected() as client: + # Track state changes + state_changes: list[tuple[bool, float]] = [] + + def on_state(state: EntityState) -> None: + """Track state changes with timestamps.""" + if isinstance(state, BinarySensorState): + state_changes.append((state.state, time.monotonic())) + + # Subscribe to state changes + client.subscribe_states(on_state) + + # Wait for entity info + entity_info, _ = await client.list_entities_services() + binary_sensors = [e for e in entity_info if isinstance(e, BinarySensorInfo)] + assert len(binary_sensors) == 1, "Expected 1 binary sensor" + + # Collect states for 2 seconds + await asyncio.sleep(2.1) + + # Count ON->OFF transitions + on_off_count = 0 + for i in range(1, len(state_changes)): + if state_changes[i - 1][0] and not state_changes[i][0]: # ON to OFF + on_off_count += 1 + + # With batch_delay: 0, we should capture rapid transitions + # The test timing can be variable in CI, so we're being conservative + # We mainly want to verify that we capture multiple rapid transitions + assert on_off_count >= 5, ( + f"Expected at least 5 ON->OFF transitions with batch_delay: 0ms, got {on_off_count}. " + "Rapid transitions may have been lost." + ) + + # Also verify that state changes are happening frequently + assert len(state_changes) >= 10, ( + f"Expected at least 10 state changes, got {len(state_changes)}" + ) diff --git a/tests/integration/test_host_mode_empty_string_options.py b/tests/integration/test_host_mode_empty_string_options.py index d2df839a75..16399dcfb8 100644 --- a/tests/integration/test_host_mode_empty_string_options.py +++ b/tests/integration/test_host_mode_empty_string_options.py @@ -74,37 +74,41 @@ async def test_host_mode_empty_string_options( # If we got here without protobuf decoding errors, the fix is working # The bug would have caused "Invalid protobuf message" errors with trailing bytes - # Also verify we can interact with the select entities - # Subscribe to state changes + # Also verify we can receive state updates for select entities + # This ensures empty strings work properly in state messages too states: dict[int, EntityState] = {} - state_change_future: asyncio.Future[None] = loop.create_future() + states_received_future: asyncio.Future[None] = loop.create_future() + expected_select_keys = {empty_first.key, empty_middle.key, empty_last.key} + received_select_keys = set() def on_state(state: EntityState) -> None: """Track state changes.""" states[state.key] = state - # When we receive the state change for our select, resolve the future - if state.key == empty_first.key and not state_change_future.done(): - state_change_future.set_result(None) + # Track which select entities we've received states for + if state.key in expected_select_keys: + received_select_keys.add(state.key) + # Once we have all select states, we're done + if ( + received_select_keys == expected_select_keys + and not states_received_future.done() + ): + states_received_future.set_result(None) client.subscribe_states(on_state) - # Try setting a select to an empty string option - # This further tests that empty strings are handled correctly - client.select_command(empty_first.key, "") - - # Wait for state update with timeout + # Wait for initial states with timeout try: - await asyncio.wait_for(state_change_future, timeout=5.0) + await asyncio.wait_for(states_received_future, timeout=5.0) except asyncio.TimeoutError: pytest.fail( - "Did not receive state update after setting select to empty string" + f"Did not receive states for all select entities. " + f"Expected keys: {expected_select_keys}, Received: {received_select_keys}" ) - # Verify the state was set to empty string + # Verify we received states for all select entities assert empty_first.key in states - select_state = states[empty_first.key] - assert hasattr(select_state, "state") - assert select_state.state == "" + assert empty_middle.key in states + assert empty_last.key in states - # The test passes if no protobuf decoding errors occurred - # With the bug, we would have gotten "Invalid protobuf message" errors + # The main test is that we got here without protobuf errors + # The select entities with empty string options were properly encoded diff --git a/tests/integration/test_host_mode_fan_preset.py b/tests/integration/test_host_mode_fan_preset.py index 1d956a7290..d18b9f08ad 100644 --- a/tests/integration/test_host_mode_fan_preset.py +++ b/tests/integration/test_host_mode_fan_preset.py @@ -46,14 +46,22 @@ async def test_host_mode_fan_preset( # Subscribe to states states: dict[int, FanState] = {} state_event = asyncio.Event() + initial_states_received = set() def on_state(state: FanState) -> None: if isinstance(state, FanState): states[state.key] = state + initial_states_received.add(state.key) state_event.set() client.subscribe_states(on_state) + # Wait for initial states to be received for all fans + expected_fan_keys = {fan.key for fan in fans} + while initial_states_received != expected_fan_keys: + state_event.clear() + await asyncio.wait_for(state_event.wait(), timeout=2.0) + # Test 1: Turn on fan without speed or preset - should set speed to 100% state_event.clear() client.fan_command( diff --git a/tests/integration/test_host_mode_many_entities.py b/tests/integration/test_host_mode_many_entities.py index d5622e6fa4..005728b8c6 100644 --- a/tests/integration/test_host_mode_many_entities.py +++ b/tests/integration/test_host_mode_many_entities.py @@ -22,36 +22,51 @@ async def test_host_mode_many_entities( async with run_compiled(yaml_config), api_client_connected() as client: # Subscribe to state changes states: dict[int, EntityState] = {} - entity_count_future: asyncio.Future[int] = loop.create_future() + sensor_count_future: asyncio.Future[int] = loop.create_future() def on_state(state: EntityState) -> None: states[state.key] = state - # When we have received states from a good number of entities, resolve the future - if len(states) >= 50 and not entity_count_future.done(): - entity_count_future.set_result(len(states)) + # Count sensor states specifically + sensor_states = [ + s + for s in states.values() + if hasattr(s, "state") and isinstance(s.state, float) + ] + # When we have received states from at least 50 sensors, resolve the future + if len(sensor_states) >= 50 and not sensor_count_future.done(): + sensor_count_future.set_result(len(sensor_states)) client.subscribe_states(on_state) - # Wait for states from at least 50 entities with timeout + # Wait for states from at least 50 sensors with timeout try: - entity_count = await asyncio.wait_for(entity_count_future, timeout=10.0) + sensor_count = await asyncio.wait_for(sensor_count_future, timeout=10.0) except asyncio.TimeoutError: + sensor_states = [ + s + for s in states.values() + if hasattr(s, "state") and isinstance(s.state, float) + ] pytest.fail( - f"Did not receive states from at least 50 entities within 10 seconds. " - f"Received {len(states)} states: {list(states.keys())}" + f"Did not receive states from at least 50 sensors within 10 seconds. " + f"Received {len(sensor_states)} sensor states out of {len(states)} total states" ) # Verify we received a good number of entity states - assert entity_count >= 50, f"Expected at least 50 entities, got {entity_count}" - assert len(states) >= 50, f"Expected at least 50 states, got {len(states)}" + assert len(states) >= 50, ( + f"Expected at least 50 total states, got {len(states)}" + ) - # Verify we have different entity types by checking some expected values + # Verify we have the expected sensor states sensor_states = [ s for s in states.values() if hasattr(s, "state") and isinstance(s.state, float) ] + assert sensor_count >= 50, ( + f"Expected at least 50 sensor states, got {sensor_count}" + ) assert len(sensor_states) >= 50, ( f"Expected at least 50 sensor states, got {len(sensor_states)}" )