From ca70f17b3b282ee505da73e9e3ebc383ed31b6ee Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 08:33:24 -0500 Subject: [PATCH 01/14] make test race safe --- .../fixtures/defer_fifo_simple.yaml | 171 ++++++++++-------- tests/integration/test_defer_fifo_simple.py | 70 ++++--- 2 files changed, 132 insertions(+), 109 deletions(-) diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/defer_fifo_simple.yaml index 29c1a2bf38..aede9a3cd0 100644 --- a/tests/integration/fixtures/defer_fifo_simple.yaml +++ b/tests/integration/fixtures/defer_fifo_simple.yaml @@ -1,81 +1,5 @@ esphome: name: defer-fifo-simple - on_boot: - - lambda: |- - // Test 1: Test set_timeout with 0 delay (direct scheduler call) - static int set_timeout_order = 0; - static bool set_timeout_passed = true; - - ESP_LOGD("defer_test", "Test 1: Testing set_timeout(0) for FIFO order..."); - for (int i = 0; i < 10; i++) { - int expected = i; - App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { - ESP_LOGD("defer_test", "set_timeout(0) item %d executed, order %d", expected, set_timeout_order); - if (set_timeout_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in set_timeout: expected %d but got execution order %d", expected, set_timeout_order); - set_timeout_passed = false; - } - set_timeout_order++; - - if (set_timeout_order == 10) { - if (set_timeout_passed) { - ESP_LOGI("defer_test", "✓ Test 1 PASSED - set_timeout(0) maintains FIFO order"); - } else { - ESP_LOGE("defer_test", "✗ Test 1 FAILED - set_timeout(0) executed out of order"); - } - - // Start Test 2 after Test 1 completes - App.scheduler.set_timeout((Component*)nullptr, nullptr, 100, []() { - // Test 2: Test defer() method (component method) - static int defer_order = 0; - static bool defer_passed = true; - - ESP_LOGD("defer_test", "Test 2: Testing defer() for FIFO order..."); - - // Create a test component class that exposes defer() - class TestComponent : 
public Component { - public: - void test_defer() { - for (int i = 0; i < 10; i++) { - int expected = i; - this->defer([expected]() { - ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); - if (defer_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); - defer_passed = false; - } - defer_order++; - - if (defer_order == 10) { - bool all_passed = set_timeout_passed && defer_passed; - if (defer_passed) { - ESP_LOGI("defer_test", "✓ Test 2 PASSED - defer() maintains FIFO order"); - if (all_passed) { - ESP_LOGI("defer_test", "✓ ALL TESTS PASSED - Both set_timeout(0) and defer() maintain FIFO order"); - } - } else { - ESP_LOGE("defer_test", "✗ Test 2 FAILED - defer() executed out of order"); - } - - // Publish test results - id(test_complete)->publish_state(true); - id(test_passed)->publish_state(all_passed); - } - }); - } - } - }; - - TestComponent test_component; - test_component.test_defer(); - - ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); - }); - } - }); - } - - ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); host: @@ -83,11 +7,100 @@ logger: level: DEBUG api: + services: + - service: run_defer_test + then: + - lambda: |- + // Test 1: Test set_timeout with 0 delay (direct scheduler call) + static int set_timeout_order = 0; + static bool set_timeout_passed = true; -binary_sensor: + ESP_LOGD("defer_test", "Test 1: Testing set_timeout(0) for FIFO order..."); + for (int i = 0; i < 10; i++) { + int expected = i; + App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { + ESP_LOGD("defer_test", "set_timeout(0) item %d executed, order %d", expected, set_timeout_order); + if (set_timeout_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in set_timeout: expected %d but got execution order %d", expected, set_timeout_order); + set_timeout_passed = 
false; + } + set_timeout_order++; + + if (set_timeout_order == 10) { + if (set_timeout_passed) { + ESP_LOGI("defer_test", "✓ Test 1 PASSED - set_timeout(0) maintains FIFO order"); + } else { + ESP_LOGE("defer_test", "✗ Test 1 FAILED - set_timeout(0) executed out of order"); + } + + // Start Test 2 after Test 1 completes + App.scheduler.set_timeout((Component*)nullptr, nullptr, 100, []() { + // Test 2: Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + ESP_LOGD("defer_test", "Test 2: Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + bool all_passed = set_timeout_passed && defer_passed; + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test 2 PASSED - defer() maintains FIFO order"); + if (all_passed) { + ESP_LOGI("defer_test", "✓ ALL TESTS PASSED - Both set_timeout(0) and defer() maintain FIFO order"); + } + } else { + ESP_LOGE("defer_test", "✗ Test 2 FAILED - defer() executed out of order"); + } + + // Fire test result events + if (all_passed) { + id(test_result)->trigger("passed"); + } else { + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + }); + } + }); + } + + ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + +event: - platform: template name: 
"Test Complete" id: test_complete + device_class: button + event_types: + - "test_finished" - platform: template - name: "Test Passed" - id: test_passed + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_defer_fifo_simple.py index 95a14e64b7..46d68db171 100644 --- a/tests/integration/test_defer_fifo_simple.py +++ b/tests/integration/test_defer_fifo_simple.py @@ -2,7 +2,7 @@ import asyncio -from aioesphomeapi import BinarySensorInfo, BinarySensorState, EntityState +from aioesphomeapi import EntityState, Event, EventInfo, UserService import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -22,57 +22,67 @@ async def test_defer_fifo_simple( assert device_info is not None assert device_info.name == "defer-fifo-simple" - # List entities to get the keys - entity_info, _ = await asyncio.wait_for( + # List entities and services + entity_info, services = await asyncio.wait_for( client.list_entities_services(), timeout=5.0 ) # Find our test entities - test_complete_entity: BinarySensorInfo | None = None - test_passed_entity: BinarySensorInfo | None = None + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None for entity in entity_info: - if isinstance(entity, BinarySensorInfo): + if isinstance(entity, EventInfo): if entity.object_id == "test_complete": test_complete_entity = entity - elif entity.object_id == "test_passed": - test_passed_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity - assert test_complete_entity is not None, "test_complete sensor not found" - assert test_passed_entity is not None, "test_passed sensor not found" + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test service + run_defer_test_service: UserService | 
None = None + for service in services: + if service.name == "run_defer_test": + run_defer_test_service = service + break + + assert run_defer_test_service is not None, "run_defer_test service not found" # Get the event loop loop = asyncio.get_running_loop() - # Subscribe to state changes - states: dict[int, EntityState] = {} - test_complete_future: asyncio.Future[BinarySensorState] = loop.create_future() - test_passed_future: asyncio.Future[BinarySensorState] = loop.create_future() + # Subscribe to states (events are delivered as EventStates through subscribe_states) + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[bool] = loop.create_future() def on_state(state: EntityState) -> None: - states[state.key] = state - # Check if this is our test_complete binary sensor - if isinstance(state, BinarySensorState): + if isinstance(state, Event): if state.key == test_complete_entity.key: - if state.state and not test_complete_future.done(): - test_complete_future.set_result(state) - elif state.key == test_passed_entity.key: - if not test_passed_future.done(): - test_passed_future.set_result(state) + if ( + state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + elif state.key == test_result_entity.key: + if not test_result_future.done(): + if state.event_type == "passed": + test_result_future.set_result(True) + elif state.event_type == "failed": + test_result_future.set_result(False) client.subscribe_states(on_state) + # Call the run_defer_test service to start the test + client.execute_service(run_defer_test_service, {}) + # Wait for test completion with timeout try: await asyncio.wait_for(test_complete_future, timeout=10.0) - test_passed_state = await asyncio.wait_for(test_passed_future, timeout=1.0) + test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) except asyncio.TimeoutError: - pytest.fail( - f"Test did not complete within 10 
seconds. " - f"Received states: {list(states.values())}" - ) + pytest.fail("Test did not complete within 10 seconds") # Verify the test passed - assert test_passed_state.state is True, ( - "FIFO test failed - items executed out of order" - ) + assert test_passed is True, "FIFO test failed - items executed out of order" From cd2b50c27f72671f24cf39dbc84c7baee188b639 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 08:49:12 -0500 Subject: [PATCH 02/14] stress test --- .../fixtures/defer_fifo_simple.yaml | 116 ++++++------ tests/integration/fixtures/defer_stress.yaml | 77 ++++++++ .../api_buffer_test_component/__init__.py | 35 ++++ .../api_buffer_test_component.cpp | 166 ++++++++++++++++++ .../api_buffer_test_component.h | 52 ++++++ tests/integration/test_defer_fifo_simple.py | 51 ++++-- tests/integration/test_defer_stress.py | 90 ++++++++++ 7 files changed, 517 insertions(+), 70 deletions(-) create mode 100644 tests/integration/fixtures/defer_stress.yaml create mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp create mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h create mode 100644 tests/integration/test_defer_stress.py diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/defer_fifo_simple.yaml index aede9a3cd0..a221256f6c 100644 --- a/tests/integration/fixtures/defer_fifo_simple.yaml +++ b/tests/integration/fixtures/defer_fifo_simple.yaml @@ -8,14 +8,18 @@ logger: api: services: - - service: run_defer_test + - service: test_set_timeout then: - lambda: |- - // Test 1: Test set_timeout with 0 delay (direct scheduler call) + // Test set_timeout with 0 delay (direct scheduler call) static int set_timeout_order = 0; static bool set_timeout_passed = true; - ESP_LOGD("defer_test", 
"Test 1: Testing set_timeout(0) for FIFO order..."); + // Reset for this test + set_timeout_order = 0; + set_timeout_passed = true; + + ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); for (int i = 0; i < 10; i++) { int expected = i; App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { @@ -28,68 +32,66 @@ api: if (set_timeout_order == 10) { if (set_timeout_passed) { - ESP_LOGI("defer_test", "✓ Test 1 PASSED - set_timeout(0) maintains FIFO order"); + ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); + id(test_result)->trigger("passed"); } else { - ESP_LOGE("defer_test", "✗ Test 1 FAILED - set_timeout(0) executed out of order"); + ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); + id(test_result)->trigger("failed"); } - - // Start Test 2 after Test 1 completes - App.scheduler.set_timeout((Component*)nullptr, nullptr, 100, []() { - // Test 2: Test defer() method (component method) - static int defer_order = 0; - static bool defer_passed = true; - - ESP_LOGD("defer_test", "Test 2: Testing defer() for FIFO order..."); - - // Create a test component class that exposes defer() - class TestComponent : public Component { - public: - void test_defer() { - for (int i = 0; i < 10; i++) { - int expected = i; - this->defer([expected]() { - ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); - if (defer_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); - defer_passed = false; - } - defer_order++; - - if (defer_order == 10) { - bool all_passed = set_timeout_passed && defer_passed; - if (defer_passed) { - ESP_LOGI("defer_test", "✓ Test 2 PASSED - defer() maintains FIFO order"); - if (all_passed) { - ESP_LOGI("defer_test", "✓ ALL TESTS PASSED - Both set_timeout(0) and defer() maintain FIFO order"); - } - } else { - ESP_LOGE("defer_test", "✗ Test 2 FAILED - defer() 
executed out of order"); - } - - // Fire test result events - if (all_passed) { - id(test_result)->trigger("passed"); - } else { - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); - } - }); - } - } - }; - - TestComponent test_component; - test_component.test_defer(); - - ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); - }); + id(test_complete)->trigger("test_finished"); } }); } ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + - service: test_defer + then: + - lambda: |- + // Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + // Reset for this test + defer_order = 0; + defer_passed = true; + + ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + event: - platform: template name: "Test Complete" diff --git a/tests/integration/fixtures/defer_stress.yaml 
b/tests/integration/fixtures/defer_stress.yaml new file mode 100644 index 0000000000..867d40ab53 --- /dev/null +++ b/tests/integration/fixtures/defer_stress.yaml @@ -0,0 +1,77 @@ +esphome: + name: defer-stress-test + +host: + +logger: + level: DEBUG + +api: + services: + - service: run_stress_test + then: + - lambda: |- + static int total_defers = 0; + static int executed_defers = 0; + + ESP_LOGI("stress", "Starting defer stress test - rapid sequential defers"); + + // Reset counters + total_defers = 0; + executed_defers = 0; + + // Create a temporary component to access defer() + class TestComponent : public Component { + public: + void run_test() { + // Rapidly defer many callbacks to stress the defer mechanism + for (int batch = 0; batch < 10; batch++) { + for (int i = 0; i < 100; i++) { + int expected_id = total_defers; + this->defer([expected_id]() { + executed_defers++; + ESP_LOGV("stress", "Defer %d executed", expected_id); + }); + total_defers++; + } + // Brief yield to let other work happen + delay(1); + } + } + }; + + TestComponent test_comp; + test_comp.run_test(); + + ESP_LOGI("stress", "Scheduled %d defers", total_defers); + + // Give the main loop time to process all defers + App.scheduler.set_timeout((Component*)nullptr, nullptr, 500, []() { + ESP_LOGI("stress", "Test complete. 
Defers scheduled: %d, executed: %d", total_defers, executed_defers); + + // We should have executed all defers without crashing + if (executed_defers == total_defers && total_defers == 1000) { + ESP_LOGI("stress", "✓ Stress test PASSED - All %d defers executed", total_defers); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("stress", "✗ Stress test FAILED - Expected 1000 executed, got %d", executed_defers); + id(test_result)->trigger("failed"); + } + + id(test_complete)->trigger("test_finished"); + }); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py b/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py new file mode 100644 index 0000000000..9263e1e084 --- /dev/null +++ b/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py @@ -0,0 +1,35 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +CODEOWNERS = ["@test"] +AUTO_LOAD = ["api"] + +api_buffer_test_component_ns = cg.esphome_ns.namespace("api_buffer_test_component") +APIBufferTestComponent = api_buffer_test_component_ns.class_( + "APIBufferTestComponent", cg.Component +) + +CONF_FILL_SIZE = "fill_size" +CONF_FILL_COUNT = "fill_count" +CONF_AUTO_FILL_DELAY = "auto_fill_delay" + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(APIBufferTestComponent), + cv.Optional(CONF_FILL_SIZE, default=2048): cv.int_range(min=1, max=16384), + cv.Optional(CONF_FILL_COUNT, default=200): cv.int_range(min=1, max=1000), + cv.Optional( + CONF_AUTO_FILL_DELAY, default="2s" + ): cv.positive_time_period_milliseconds, + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + 
var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) + + cg.add(var.set_fill_size(config[CONF_FILL_SIZE])) + cg.add(var.set_fill_count(config[CONF_FILL_COUNT])) + cg.add(var.set_auto_fill_delay(config[CONF_AUTO_FILL_DELAY])) diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp new file mode 100644 index 0000000000..34be504d8e --- /dev/null +++ b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp @@ -0,0 +1,166 @@ +#include "api_buffer_test_component.h" +#include "esphome/core/application.h" + +namespace esphome { +namespace api_buffer_test_component { + +APIBufferTestComponent *global_api_buffer_test_component = + nullptr; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +void APIBufferTestComponent::setup() { + ESP_LOGD(TAG, "API Buffer Test Component setup"); + this->last_fill_time_ = millis(); + global_api_buffer_test_component = this; + + // For testing, we'll get the API connection through a hack + // In a real implementation, this would be done properly through the API + App.scheduler.set_timeout(this, "get_api_connection", 500, [this]() { + auto *api_server = api::global_api_server; + if (api_server != nullptr) { + // This is a hack - in production code, use proper API subscription + // For testing, we'll assume there's only one connection + ESP_LOGD(TAG, "Looking for API connection to subscribe to"); + } + }); +} + +void APIBufferTestComponent::loop() { + // Check if API server is ready and has connections + auto *api_server = api::global_api_server; + if (api_server == nullptr || !api_server->is_connected()) { + return; + } + + // Try to get an API connection if we don't have one + if (this->api_connection_ == nullptr && !this->tried_subscribe_) { + this->tried_subscribe_ = true; + ESP_LOGD(TAG, 
"API server is connected, buffer test component ready"); + // For testing, we'll work with the fact that send_message is available + // through the global API server's connection management + } + + uint32_t now = millis(); + + // Auto-fill buffer after delay if configured + if (this->auto_fill_delay_ > 0 && !this->buffer_filled_ && api_server->is_connected()) { + if (now - this->last_fill_time_ > this->auto_fill_delay_) { + ESP_LOGD(TAG, "Auto-filling buffer after %u ms delay", this->auto_fill_delay_); + // For the test, we'll generate heavy log traffic instead + this->generate_heavy_traffic(); + this->buffer_filled_ = true; + + // Keep generating traffic for 5 seconds + this->should_keep_full_ = true; + this->keep_full_until_ = now + 5000; + } + } + + // Keep buffer full if requested + if (this->should_keep_full_ && now < this->keep_full_until_) { + // Generate more traffic to keep buffer full + this->generate_traffic_burst(); + } else if (this->should_keep_full_ && now >= this->keep_full_until_) { + this->should_keep_full_ = false; + ESP_LOGD(TAG, "Stopped keeping buffer full"); + } +} + +void APIBufferTestComponent::subscribe_api_connection(api::APIConnection *api_connection) { + if (this->api_connection_ != nullptr) { + ESP_LOGE(TAG, "Already subscribed to an API connection"); + return; + } + this->api_connection_ = api_connection; + ESP_LOGD(TAG, "Subscribed to API connection"); +} + +void APIBufferTestComponent::unsubscribe_api_connection(api::APIConnection *api_connection) { + if (this->api_connection_ != api_connection) { + return; + } + this->api_connection_ = nullptr; + ESP_LOGD(TAG, "Unsubscribed from API connection"); +} + +void APIBufferTestComponent::fill_buffer() { + if (this->api_connection_ == nullptr) { + ESP_LOGW(TAG, "No API connection available to fill buffer"); + return; + } + + ESP_LOGD(TAG, "Filling transmit buffer with %zu messages of %zu bytes each", this->fill_count_, this->fill_size_); + + // Create a large text sensor state response to 
fill the buffer + api::TextSensorStateResponse resp; + resp.key = 0x12345678; // Dummy key + resp.state = std::string(this->fill_size_, 'X'); // Large payload + resp.missing_state = false; + + // Send many messages rapidly to fill the transmit buffer + size_t sent_count = 0; + size_t failed_count = 0; + + for (size_t i = 0; i < this->fill_count_; i++) { + // Modify the string slightly each time + resp.state[0] = 'A' + (i % 26); + + // Send message directly without batching + bool sent = this->api_connection_->send_message(resp); + + if (!sent) { + failed_count++; + ESP_LOGV(TAG, "Message %zu failed to send - buffer likely full", i); + } else { + sent_count++; + } + + // Log progress + if (i % 50 == 0) { + ESP_LOGD(TAG, "Progress: %zu/%zu messages, %zu failed", i, this->fill_count_, failed_count); + } + } + + ESP_LOGD(TAG, "Buffer fill complete: %zu sent, %zu failed", sent_count, failed_count); + this->last_fill_time_ = millis(); +} + +void APIBufferTestComponent::generate_heavy_traffic() { + ESP_LOGD(TAG, "Generating heavy traffic to fill transmit buffer"); + + // Generate many large log messages rapidly + // These will be sent over the API if log subscription is active + std::string large_log(this->fill_size_, 'X'); + + for (size_t i = 0; i < this->fill_count_; i++) { + // Modify the string to ensure each message is unique + large_log[0] = 'A' + (i % 26); + + // Use VERY_VERBOSE level to ensure it's sent when subscribed + ESP_LOGVV(TAG, "Buffer fill #%zu: %s", i, large_log.c_str()); + + // Progress logging at higher level + if (i % 50 == 0) { + ESP_LOGD(TAG, "Traffic generation progress: %zu/%zu", i, this->fill_count_); + } + } + + ESP_LOGD(TAG, "Heavy traffic generation complete"); +} + +void APIBufferTestComponent::generate_traffic_burst() { + // Generate a burst of medium-sized messages to keep buffer topped up + std::string medium_log(512, 'K'); + + for (int i = 0; i < 5; i++) { + medium_log[0] = '0' + (i % 10); + ESP_LOGVV(TAG, "Keep-full burst #%d: %s", i, 
medium_log.c_str()); + } +} + +void APIBufferTestComponent::keep_buffer_full() { + // Deprecated - use generate_traffic_burst instead + this->generate_traffic_burst(); +} + +} // namespace api_buffer_test_component +} // namespace esphome \ No newline at end of file diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h new file mode 100644 index 0000000000..122f01b6c9 --- /dev/null +++ b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h @@ -0,0 +1,52 @@ +#pragma once + +#include "esphome/core/component.h" +#include "esphome/core/log.h" +#include "esphome/components/api/api_server.h" +#include "esphome/components/api/api_connection.h" +#include "esphome/components/api/api_pb2.h" + +namespace esphome { +namespace api_buffer_test_component { + +static const char *const TAG = "api_buffer_test"; + +class APIBufferTestComponent : public Component { + public: + void setup() override; + void loop() override; + + float get_setup_priority() const override { return setup_priority::AFTER_CONNECTION; } + + // Subscribe to API connection (like bluetooth_proxy) + void subscribe_api_connection(api::APIConnection *api_connection); + void unsubscribe_api_connection(api::APIConnection *api_connection); + + // Test methods + void fill_buffer(); + void keep_buffer_full(); + void generate_heavy_traffic(); + void generate_traffic_burst(); + + // Configuration + void set_fill_size(size_t size) { this->fill_size_ = size; } + void set_fill_count(size_t count) { this->fill_count_ = count; } + void set_auto_fill_delay(uint32_t delay) { this->auto_fill_delay_ = delay; } + + protected: + api::APIConnection *api_connection_{nullptr}; + size_t fill_size_{2048}; + size_t fill_count_{200}; + uint32_t auto_fill_delay_{2000}; + uint32_t last_fill_time_{0}; + bool buffer_filled_{false}; + 
bool should_keep_full_{false}; + uint32_t keep_full_until_{0}; + bool tried_subscribe_{false}; +}; + +extern APIBufferTestComponent + *global_api_buffer_test_component; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +} // namespace api_buffer_test_component +} // namespace esphome \ No newline at end of file diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_defer_fifo_simple.py index 46d68db171..5bfe02329f 100644 --- a/tests/integration/test_defer_fifo_simple.py +++ b/tests/integration/test_defer_fifo_simple.py @@ -41,14 +41,19 @@ async def test_defer_fifo_simple( assert test_complete_entity is not None, "test_complete event not found" assert test_result_entity is not None, "test_result event not found" - # Find our test service - run_defer_test_service: UserService | None = None + # Find our test services + test_set_timeout_service: UserService | None = None + test_defer_service: UserService | None = None for service in services: - if service.name == "run_defer_test": - run_defer_test_service = service - break + if service.name == "test_set_timeout": + test_set_timeout_service = service + elif service.name == "test_defer": + test_defer_service = service - assert run_defer_test_service is not None, "run_defer_test service not found" + assert test_set_timeout_service is not None, ( + "test_set_timeout service not found" + ) + assert test_defer_service is not None, "test_defer service not found" # Get the event loop loop = asyncio.get_running_loop() @@ -74,15 +79,35 @@ async def test_defer_fifo_simple( client.subscribe_states(on_state) - # Call the run_defer_test service to start the test - client.execute_service(run_defer_test_service, {}) + # Test 1: Test set_timeout(0) + client.execute_service(test_set_timeout_service, {}) - # Wait for test completion with timeout + # Wait for first test completion try: - await asyncio.wait_for(test_complete_future, timeout=10.0) - test_passed = await 
asyncio.wait_for(test_result_future, timeout=1.0) + await asyncio.wait_for(test_complete_future, timeout=5.0) + test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) except asyncio.TimeoutError: - pytest.fail("Test did not complete within 10 seconds") + pytest.fail("Test set_timeout(0) did not complete within 5 seconds") + + assert test1_passed is True, ( + "set_timeout(0) FIFO test failed - items executed out of order" + ) + + # Reset futures for second test + test_complete_future = loop.create_future() + test_result_future = loop.create_future() + + # Test 2: Test defer() + client.execute_service(test_defer_service, {}) + + # Wait for second test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test defer() did not complete within 5 seconds") # Verify the test passed - assert test_passed is True, "FIFO test failed - items executed out of order" + assert test2_passed is True, ( + "defer() FIFO test failed - items executed out of order" + ) diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py new file mode 100644 index 0000000000..e9d6c48664 --- /dev/null +++ b/tests/integration/test_defer_stress.py @@ -0,0 +1,90 @@ +"""Stress test for defer() thread safety with multiple threads.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_stress( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() doesn't crash when called rapidly from multiple threads.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert 
device_info is not None + assert device_info.name == "defer-stress-test" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test service + run_stress_test_service: UserService | None = None + for service in services: + if service.name == "run_stress_test": + run_stress_test_service = service + break + + assert run_stress_test_service is not None, "run_stress_test service not found" + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states (events are delivered as EventStates through subscribe_states) + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[bool] = loop.create_future() + + def on_state(state: EntityState) -> None: + if isinstance(state, Event): + if state.key == test_complete_entity.key: + if ( + state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + elif state.key == test_result_entity.key: + if not test_result_future.done(): + if state.event_type == "passed": + test_result_future.set_result(True) + elif state.event_type == "failed": + test_result_future.set_result(False) + + client.subscribe_states(on_state) + + # Call the run_stress_test service to start the test + client.execute_service(run_stress_test_service, {}) + + # Wait for test completion with a longer timeout (threads run for 100ms + processing time) + try: + await 
asyncio.wait_for(test_complete_future, timeout=10.0) + test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Stress test did not complete within 10 seconds") + + # Verify the test passed + assert test_passed is True, ( + "Stress test failed - defer() crashed or failed under thread pressure" + ) From 0665fcea9e023fa6cd6cafec001d928fe1d90246 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 08:49:35 -0500 Subject: [PATCH 03/14] stress test --- .../api_buffer_test_component/__init__.py | 35 ---- .../api_buffer_test_component.cpp | 166 ------------------ .../api_buffer_test_component.h | 52 ------ 3 files changed, 253 deletions(-) delete mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py delete mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp delete mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py b/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py deleted file mode 100644 index 9263e1e084..0000000000 --- a/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py +++ /dev/null @@ -1,35 +0,0 @@ -import esphome.codegen as cg -import esphome.config_validation as cv -from esphome.const import CONF_ID - -CODEOWNERS = ["@test"] -AUTO_LOAD = ["api"] - -api_buffer_test_component_ns = cg.esphome_ns.namespace("api_buffer_test_component") -APIBufferTestComponent = api_buffer_test_component_ns.class_( - "APIBufferTestComponent", cg.Component -) - -CONF_FILL_SIZE = "fill_size" -CONF_FILL_COUNT = "fill_count" -CONF_AUTO_FILL_DELAY = "auto_fill_delay" - -CONFIG_SCHEMA = cv.Schema( - { - cv.GenerateID(): cv.declare_id(APIBufferTestComponent), - cv.Optional(CONF_FILL_SIZE, 
default=2048): cv.int_range(min=1, max=16384), - cv.Optional(CONF_FILL_COUNT, default=200): cv.int_range(min=1, max=1000), - cv.Optional( - CONF_AUTO_FILL_DELAY, default="2s" - ): cv.positive_time_period_milliseconds, - } -).extend(cv.COMPONENT_SCHEMA) - - -async def to_code(config): - var = cg.new_Pvariable(config[CONF_ID]) - await cg.register_component(var, config) - - cg.add(var.set_fill_size(config[CONF_FILL_SIZE])) - cg.add(var.set_fill_count(config[CONF_FILL_COUNT])) - cg.add(var.set_auto_fill_delay(config[CONF_AUTO_FILL_DELAY])) diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp deleted file mode 100644 index 34be504d8e..0000000000 --- a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp +++ /dev/null @@ -1,166 +0,0 @@ -#include "api_buffer_test_component.h" -#include "esphome/core/application.h" - -namespace esphome { -namespace api_buffer_test_component { - -APIBufferTestComponent *global_api_buffer_test_component = - nullptr; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) - -void APIBufferTestComponent::setup() { - ESP_LOGD(TAG, "API Buffer Test Component setup"); - this->last_fill_time_ = millis(); - global_api_buffer_test_component = this; - - // For testing, we'll get the API connection through a hack - // In a real implementation, this would be done properly through the API - App.scheduler.set_timeout(this, "get_api_connection", 500, [this]() { - auto *api_server = api::global_api_server; - if (api_server != nullptr) { - // This is a hack - in production code, use proper API subscription - // For testing, we'll assume there's only one connection - ESP_LOGD(TAG, "Looking for API connection to subscribe to"); - } - }); -} - -void APIBufferTestComponent::loop() { - // Check if API server is ready and has connections - 
auto *api_server = api::global_api_server; - if (api_server == nullptr || !api_server->is_connected()) { - return; - } - - // Try to get an API connection if we don't have one - if (this->api_connection_ == nullptr && !this->tried_subscribe_) { - this->tried_subscribe_ = true; - ESP_LOGD(TAG, "API server is connected, buffer test component ready"); - // For testing, we'll work with the fact that send_message is available - // through the global API server's connection management - } - - uint32_t now = millis(); - - // Auto-fill buffer after delay if configured - if (this->auto_fill_delay_ > 0 && !this->buffer_filled_ && api_server->is_connected()) { - if (now - this->last_fill_time_ > this->auto_fill_delay_) { - ESP_LOGD(TAG, "Auto-filling buffer after %u ms delay", this->auto_fill_delay_); - // For the test, we'll generate heavy log traffic instead - this->generate_heavy_traffic(); - this->buffer_filled_ = true; - - // Keep generating traffic for 5 seconds - this->should_keep_full_ = true; - this->keep_full_until_ = now + 5000; - } - } - - // Keep buffer full if requested - if (this->should_keep_full_ && now < this->keep_full_until_) { - // Generate more traffic to keep buffer full - this->generate_traffic_burst(); - } else if (this->should_keep_full_ && now >= this->keep_full_until_) { - this->should_keep_full_ = false; - ESP_LOGD(TAG, "Stopped keeping buffer full"); - } -} - -void APIBufferTestComponent::subscribe_api_connection(api::APIConnection *api_connection) { - if (this->api_connection_ != nullptr) { - ESP_LOGE(TAG, "Already subscribed to an API connection"); - return; - } - this->api_connection_ = api_connection; - ESP_LOGD(TAG, "Subscribed to API connection"); -} - -void APIBufferTestComponent::unsubscribe_api_connection(api::APIConnection *api_connection) { - if (this->api_connection_ != api_connection) { - return; - } - this->api_connection_ = nullptr; - ESP_LOGD(TAG, "Unsubscribed from API connection"); -} - -void 
APIBufferTestComponent::fill_buffer() { - if (this->api_connection_ == nullptr) { - ESP_LOGW(TAG, "No API connection available to fill buffer"); - return; - } - - ESP_LOGD(TAG, "Filling transmit buffer with %zu messages of %zu bytes each", this->fill_count_, this->fill_size_); - - // Create a large text sensor state response to fill the buffer - api::TextSensorStateResponse resp; - resp.key = 0x12345678; // Dummy key - resp.state = std::string(this->fill_size_, 'X'); // Large payload - resp.missing_state = false; - - // Send many messages rapidly to fill the transmit buffer - size_t sent_count = 0; - size_t failed_count = 0; - - for (size_t i = 0; i < this->fill_count_; i++) { - // Modify the string slightly each time - resp.state[0] = 'A' + (i % 26); - - // Send message directly without batching - bool sent = this->api_connection_->send_message(resp); - - if (!sent) { - failed_count++; - ESP_LOGV(TAG, "Message %zu failed to send - buffer likely full", i); - } else { - sent_count++; - } - - // Log progress - if (i % 50 == 0) { - ESP_LOGD(TAG, "Progress: %zu/%zu messages, %zu failed", i, this->fill_count_, failed_count); - } - } - - ESP_LOGD(TAG, "Buffer fill complete: %zu sent, %zu failed", sent_count, failed_count); - this->last_fill_time_ = millis(); -} - -void APIBufferTestComponent::generate_heavy_traffic() { - ESP_LOGD(TAG, "Generating heavy traffic to fill transmit buffer"); - - // Generate many large log messages rapidly - // These will be sent over the API if log subscription is active - std::string large_log(this->fill_size_, 'X'); - - for (size_t i = 0; i < this->fill_count_; i++) { - // Modify the string to ensure each message is unique - large_log[0] = 'A' + (i % 26); - - // Use VERY_VERBOSE level to ensure it's sent when subscribed - ESP_LOGVV(TAG, "Buffer fill #%zu: %s", i, large_log.c_str()); - - // Progress logging at higher level - if (i % 50 == 0) { - ESP_LOGD(TAG, "Traffic generation progress: %zu/%zu", i, this->fill_count_); - } - } - - 
ESP_LOGD(TAG, "Heavy traffic generation complete"); -} - -void APIBufferTestComponent::generate_traffic_burst() { - // Generate a burst of medium-sized messages to keep buffer topped up - std::string medium_log(512, 'K'); - - for (int i = 0; i < 5; i++) { - medium_log[0] = '0' + (i % 10); - ESP_LOGVV(TAG, "Keep-full burst #%d: %s", i, medium_log.c_str()); - } -} - -void APIBufferTestComponent::keep_buffer_full() { - // Deprecated - use generate_traffic_burst instead - this->generate_traffic_burst(); -} - -} // namespace api_buffer_test_component -} // namespace esphome \ No newline at end of file diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h deleted file mode 100644 index 122f01b6c9..0000000000 --- a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include "esphome/core/component.h" -#include "esphome/core/log.h" -#include "esphome/components/api/api_server.h" -#include "esphome/components/api/api_connection.h" -#include "esphome/components/api/api_pb2.h" - -namespace esphome { -namespace api_buffer_test_component { - -static const char *const TAG = "api_buffer_test"; - -class APIBufferTestComponent : public Component { - public: - void setup() override; - void loop() override; - - float get_setup_priority() const override { return setup_priority::AFTER_CONNECTION; } - - // Subscribe to API connection (like bluetooth_proxy) - void subscribe_api_connection(api::APIConnection *api_connection); - void unsubscribe_api_connection(api::APIConnection *api_connection); - - // Test methods - void fill_buffer(); - void keep_buffer_full(); - void generate_heavy_traffic(); - void generate_traffic_burst(); - - // Configuration - void set_fill_size(size_t size) { this->fill_size_ = size; } - void 
set_fill_count(size_t count) { this->fill_count_ = count; } - void set_auto_fill_delay(uint32_t delay) { this->auto_fill_delay_ = delay; } - - protected: - api::APIConnection *api_connection_{nullptr}; - size_t fill_size_{2048}; - size_t fill_count_{200}; - uint32_t auto_fill_delay_{2000}; - uint32_t last_fill_time_{0}; - bool buffer_filled_{false}; - bool should_keep_full_{false}; - uint32_t keep_full_until_{0}; - bool tried_subscribe_{false}; -}; - -extern APIBufferTestComponent - *global_api_buffer_test_component; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) - -} // namespace api_buffer_test_component -} // namespace esphome \ No newline at end of file From f7ca26eef887ad7e4d1fab7efbaf4e40f9c5f5f7 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 08:59:15 -0500 Subject: [PATCH 04/14] stress --- tests/integration/fixtures/defer_stress.yaml | 69 ++++++-------------- tests/integration/test_defer_stress.py | 15 ++++- 2 files changed, 33 insertions(+), 51 deletions(-) diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml index 867d40ab53..9400c33f11 100644 --- a/tests/integration/fixtures/defer_stress.yaml +++ b/tests/integration/fixtures/defer_stress.yaml @@ -1,65 +1,36 @@ esphome: name: defer-stress-test +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [defer_stress_component] + host: logger: level: DEBUG +defer_stress_component: + id: defer_stress + api: services: - service: run_stress_test then: - lambda: |- - static int total_defers = 0; - static int executed_defers = 0; - - ESP_LOGI("stress", "Starting defer stress test - rapid sequential defers"); - - // Reset counters - total_defers = 0; - executed_defers = 0; - - // Create a temporary component to access defer() - class TestComponent : public Component { - public: - void run_test() { - // Rapidly defer many callbacks to stress the defer mechanism - for (int batch = 0; batch < 
10; batch++) { - for (int i = 0; i < 100; i++) { - int expected_id = total_defers; - this->defer([expected_id]() { - executed_defers++; - ESP_LOGV("stress", "Defer %d executed", expected_id); - }); - total_defers++; - } - // Brief yield to let other work happen - delay(1); - } - } - }; - - TestComponent test_comp; - test_comp.run_test(); - - ESP_LOGI("stress", "Scheduled %d defers", total_defers); - - // Give the main loop time to process all defers - App.scheduler.set_timeout((Component*)nullptr, nullptr, 500, []() { - ESP_LOGI("stress", "Test complete. Defers scheduled: %d, executed: %d", total_defers, executed_defers); - - // We should have executed all defers without crashing - if (executed_defers == total_defers && total_defers == 1000) { - ESP_LOGI("stress", "✓ Stress test PASSED - All %d defers executed", total_defers); - id(test_result)->trigger("passed"); - } else { - ESP_LOGE("stress", "✗ Stress test FAILED - Expected 1000 executed, got %d", executed_defers); - id(test_result)->trigger("failed"); - } - - id(test_complete)->trigger("test_finished"); - }); + id(defer_stress)->run_multi_thread_test(); + - wait_until: + lambda: |- + return id(defer_stress)->is_test_complete(); + - lambda: |- + if (id(defer_stress)->is_test_passed()) { + id(test_result)->trigger("passed"); + } else { + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); event: - platform: template diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py index e9d6c48664..ed0ae74a08 100644 --- a/tests/integration/test_defer_stress.py +++ b/tests/integration/test_defer_stress.py @@ -1,6 +1,7 @@ """Stress test for defer() thread safety with multiple threads.""" import asyncio +from pathlib import Path from aioesphomeapi import EntityState, Event, EventInfo, UserService import pytest @@ -16,6 +17,16 @@ async def test_defer_stress( ) -> None: """Test that defer() doesn't crash when called rapidly from multiple threads.""" + # 
Get the absolute path to the external components directory + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + + # Replace the placeholder in the YAML config with the actual path + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + async with run_compiled(yaml_config), api_client_connected() as client: # Verify we can connect device_info = await client.device_info() @@ -79,10 +90,10 @@ async def test_defer_stress( # Wait for test completion with a longer timeout (threads run for 100ms + processing time) try: - await asyncio.wait_for(test_complete_future, timeout=10.0) + await asyncio.wait_for(test_complete_future, timeout=15.0) test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) except asyncio.TimeoutError: - pytest.fail("Stress test did not complete within 10 seconds") + pytest.fail("Stress test did not complete within 15 seconds") # Verify the test passed assert test_passed is True, ( From 71f78e3a8176c60e5fd4955c9a378d4600701317 Mon Sep 17 00:00:00 2001 From: "J. 
Nick Koston" Date: Fri, 4 Jul 2025 10:00:25 -0500 Subject: [PATCH 05/14] fixes --- esphome/core/scheduler.cpp | 15 +++-- tests/integration/conftest.py | 22 +++++++ tests/integration/fixtures/defer_stress.yaml | 12 +--- tests/integration/test_defer_stress.py | 61 ++++++-------------- 4 files changed, 48 insertions(+), 62 deletions(-) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index e0d2b70102..285354b262 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -225,14 +225,13 @@ void HOT Scheduler::call() { // - Items execute in exact order they were deferred (FIFO guarantee) // - No deferred items exist in to_add_, so processing order doesn't affect correctness while (!this->defer_queue_.empty()) { - std::unique_ptr item; - { - LockGuard guard{this->lock_}; - if (this->defer_queue_.empty()) // Double-check with lock held - break; - item = std::move(this->defer_queue_.front()); - this->defer_queue_.pop_front(); - } + this->lock_.lock(); + if (this->defer_queue_.empty()) // Double-check with lock held + this->lock_.unlock(); + break; + auto item = std::move(this->defer_queue_.front()); + this->defer_queue_.pop_front(); + this->lock_.unlock(); // Skip if item was marked for removal or component failed if (!this->should_skip_item_(item.get())) { this->execute_item_(item.get()); diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8f5f77ca52..56f2eb0a54 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -474,6 +474,14 @@ async def run_binary_and_wait_for_port( if process.returncode is not None: error_msg += f"\nProcess exited with code: {process.returncode}" + # Check for common signals + if process.returncode < 0: + sig = -process.returncode + try: + sig_name = signal.Signals(sig).name + error_msg += f" (killed by signal {sig_name})" + except ValueError: + error_msg += f" (killed by signal {sig})" # Include any output collected so far if stdout_lines: @@ -501,6 
+509,20 @@ async def run_binary_and_wait_for_port( if controller_transport is not None: controller_transport.close() + # Log the exit code if process already exited + if process.returncode is not None: + print(f"\nProcess exited with code: {process.returncode}", file=sys.stderr) + if process.returncode < 0: + sig = -process.returncode + try: + sig_name = signal.Signals(sig).name + print( + f"Process was killed by signal {sig_name} ({sig})", + file=sys.stderr, + ) + except ValueError: + print(f"Process was killed by signal {sig}", file=sys.stderr) + # Cleanup: terminate the process gracefully if process.returncode is None: # Send SIGINT (Ctrl+C) for graceful shutdown diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml index 9400c33f11..6df475229b 100644 --- a/tests/integration/fixtures/defer_stress.yaml +++ b/tests/integration/fixtures/defer_stress.yaml @@ -10,7 +10,7 @@ external_components: host: logger: - level: DEBUG + level: VERBOSE defer_stress_component: id: defer_stress @@ -21,16 +21,6 @@ api: then: - lambda: |- id(defer_stress)->run_multi_thread_test(); - - wait_until: - lambda: |- - return id(defer_stress)->is_test_complete(); - - lambda: |- - if (id(defer_stress)->is_test_passed()) { - id(test_result)->trigger("passed"); - } else { - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); event: - platform: template diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py index ed0ae74a08..6dd9f15623 100644 --- a/tests/integration/test_defer_stress.py +++ b/tests/integration/test_defer_stress.py @@ -3,7 +3,7 @@ import asyncio from pathlib import Path -from aioesphomeapi import EntityState, Event, EventInfo, UserService +from aioesphomeapi import UserService import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -27,7 +27,21 @@ async def test_defer_stress( "EXTERNAL_COMPONENT_PATH", external_components_path ) - 
async with run_compiled(yaml_config), api_client_connected() as client: + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[bool] = loop.create_future() + + def on_log_line(line: str) -> None: + if not test_complete_future.done(): + if "✓ Stress test PASSED" in line: + test_complete_future.set_result(True) + elif "✗ Stress test FAILED" in line: + test_complete_future.set_result(False) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): # Verify we can connect device_info = await client.device_info() assert device_info is not None @@ -38,20 +52,6 @@ async def test_defer_stress( client.list_entities_services(), timeout=5.0 ) - # Find our test entities - test_complete_entity: EventInfo | None = None - test_result_entity: EventInfo | None = None - - for entity in entity_info: - if isinstance(entity, EventInfo): - if entity.object_id == "test_complete": - test_complete_entity = entity - elif entity.object_id == "test_result": - test_result_entity = entity - - assert test_complete_entity is not None, "test_complete event not found" - assert test_result_entity is not None, "test_result event not found" - # Find our test service run_stress_test_service: UserService | None = None for service in services: @@ -61,37 +61,12 @@ async def test_defer_stress( assert run_stress_test_service is not None, "run_stress_test service not found" - # Get the event loop - loop = asyncio.get_running_loop() - - # Subscribe to states (events are delivered as EventStates through subscribe_states) - test_complete_future: asyncio.Future[bool] = loop.create_future() - test_result_future: asyncio.Future[bool] = loop.create_future() - - def on_state(state: EntityState) -> None: - if isinstance(state, Event): - if state.key == test_complete_entity.key: - if ( - state.event_type == "test_finished" - and not test_complete_future.done() - ): - 
test_complete_future.set_result(True) - elif state.key == test_result_entity.key: - if not test_result_future.done(): - if state.event_type == "passed": - test_result_future.set_result(True) - elif state.event_type == "failed": - test_result_future.set_result(False) - - client.subscribe_states(on_state) - # Call the run_stress_test service to start the test client.execute_service(run_stress_test_service, {}) - # Wait for test completion with a longer timeout (threads run for 100ms + processing time) + # Wait for test completion try: - await asyncio.wait_for(test_complete_future, timeout=15.0) - test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + test_passed = await asyncio.wait_for(test_complete_future, timeout=15.0) except asyncio.TimeoutError: pytest.fail("Stress test did not complete within 15 seconds") From 46495995929f7a2d06a260d68685ee9e6a35d48d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:01:00 -0500 Subject: [PATCH 06/14] fixes --- esphome/core/scheduler.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 285354b262..2086f5e3dd 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -226,9 +226,10 @@ void HOT Scheduler::call() { // - No deferred items exist in to_add_, so processing order doesn't affect correctness while (!this->defer_queue_.empty()) { this->lock_.lock(); - if (this->defer_queue_.empty()) // Double-check with lock held + if (this->defer_queue_.empty()) { // Double-check with lock held this->lock_.unlock(); - break; + break; + } auto item = std::move(this->defer_queue_.front()); this->defer_queue_.pop_front(); this->lock_.unlock(); From 37578f3e22257e2cf6ba65bd6c1def88ba738ca3 Mon Sep 17 00:00:00 2001 From: "J. 
Nick Koston" Date: Fri, 4 Jul 2025 10:11:19 -0500 Subject: [PATCH 07/14] fixes --- esphome/core/scheduler.cpp | 3 +- tests/integration/test_defer_stress.py | 48 +++++++++++++++++++------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 2086f5e3dd..1ebcc6339e 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -226,13 +226,14 @@ void HOT Scheduler::call() { // - No deferred items exist in to_add_, so processing order doesn't affect correctness while (!this->defer_queue_.empty()) { this->lock_.lock(); - if (this->defer_queue_.empty()) { // Double-check with lock held + if (this->defer_queue_.empty()) { this->lock_.unlock(); break; } auto item = std::move(this->defer_queue_.front()); this->defer_queue_.pop_front(); this->lock_.unlock(); + // Skip if item was marked for removal or component failed if (!this->should_skip_item_(item.get())) { this->execute_item_(item.get()); diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py index 6dd9f15623..5e061e4651 100644 --- a/tests/integration/test_defer_stress.py +++ b/tests/integration/test_defer_stress.py @@ -2,6 +2,7 @@ import asyncio from pathlib import Path +import re from aioesphomeapi import UserService import pytest @@ -29,14 +30,25 @@ async def test_defer_stress( # Create a future to signal test completion loop = asyncio.get_event_loop() - test_complete_future: asyncio.Future[bool] = loop.create_future() + test_complete_future: asyncio.Future[None] = loop.create_future() + + # Track executed defers + executed_defers = set() def on_log_line(line: str) -> None: - if not test_complete_future.done(): - if "✓ Stress test PASSED" in line: - test_complete_future.set_result(True) - elif "✗ Stress test FAILED" in line: - test_complete_future.set_result(False) + # Track all executed defers + match = re.search(r"Executed defer (\d+)", line) + if match: + defer_id = int(match.group(1)) + 
executed_defers.add(defer_id) + + # Check if we've executed all 1000 defers (0-999) + if ( + defer_id == 999 + and len(executed_defers) == 1000 + and not test_complete_future.done() + ): + test_complete_future.set_result(None) async with ( run_compiled(yaml_config, line_callback=on_log_line), @@ -64,13 +76,25 @@ async def test_defer_stress( # Call the run_stress_test service to start the test client.execute_service(run_stress_test_service, {}) - # Wait for test completion + # Wait for all defers to execute (should be quick) try: - test_passed = await asyncio.wait_for(test_complete_future, timeout=15.0) + await asyncio.wait_for(test_complete_future, timeout=5.0) except asyncio.TimeoutError: - pytest.fail("Stress test did not complete within 15 seconds") + # Report how many we got + pytest.fail( + f"Stress test timed out. Only {len(executed_defers)} of 1000 defers executed. " + f"Missing IDs: {sorted(set(range(1000)) - executed_defers)[:10]}..." + ) - # Verify the test passed - assert test_passed is True, ( - "Stress test failed - defer() crashed or failed under thread pressure" + # Verify all defers executed + assert len(executed_defers) == 1000, ( + f"Expected 1000 defers, got {len(executed_defers)}" ) + + # Verify we have all IDs from 0-999 + expected_ids = set(range(1000)) + missing_ids = expected_ids - executed_defers + assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" + + # If we got here without crashing, the test passed + assert True, "Test completed successfully - all 1000 defers executed in order" From 9c09a271f218f59510d68632b7eea8022a51f65a Mon Sep 17 00:00:00 2001 From: "J. 
Nick Koston" Date: Fri, 4 Jul 2025 10:14:54 -0500 Subject: [PATCH 08/14] tweaks --- esphome/core/scheduler.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 1ebcc6339e..09bb784de8 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -313,8 +313,6 @@ void HOT Scheduler::call() { this->pop_raw_(); continue; } - App.set_current_component(item->component); - #ifdef ESPHOME_DEBUG_SCHEDULER const char *item_name = item->get_name(); ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")", From e4c0f18ee3a78049742c8764e4c141e6a72cee22 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:17:41 -0500 Subject: [PATCH 09/14] fixes --- .../defer_stress_component/__init__.py | 19 ++++ .../defer_stress_component.cpp | 86 +++++++++++++++++++ .../defer_stress_component.h | 28 ++++++ 3 files changed, 133 insertions(+) create mode 100644 tests/integration/fixtures/external_components/defer_stress_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp create mode 100644 tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h diff --git a/tests/integration/fixtures/external_components/defer_stress_component/__init__.py b/tests/integration/fixtures/external_components/defer_stress_component/__init__.py new file mode 100644 index 0000000000..177e595f51 --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/__init__.py @@ -0,0 +1,19 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +defer_stress_component_ns = cg.esphome_ns.namespace("defer_stress_component") +DeferStressComponent = defer_stress_component_ns.class_( + "DeferStressComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): 
cv.declare_id(DeferStressComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp new file mode 100644 index 0000000000..e5f3471dfb --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp @@ -0,0 +1,86 @@ +#include "defer_stress_component.h" +#include "esphome/core/log.h" +#include +#include +#include +#include + +namespace esphome { +namespace defer_stress_component { + +static const char *const TAG = "defer_stress"; + +void DeferStressComponent::setup() { ESP_LOGCONFIG(TAG, "DeferStressComponent setup"); } + +void DeferStressComponent::run_multi_thread_test() { + // Use member variables instead of static to avoid issues + this->total_defers_ = 0; + this->executed_defers_ = 0; + static constexpr int NUM_THREADS = 10; + static constexpr int DEFERS_PER_THREAD = 100; + + ESP_LOGI(TAG, "Starting defer stress test - multi-threaded concurrent defers"); + + // Ensure we're starting clean + ESP_LOGI(TAG, "Initial counters: total=%d, executed=%d", this->total_defers_.load(), this->executed_defers_.load()); + + // Track start time + auto start_time = std::chrono::steady_clock::now(); + + // Create threads + std::vector threads; + + ESP_LOGI(TAG, "Creating %d threads, each will defer %d callbacks", NUM_THREADS, DEFERS_PER_THREAD); + + for (int i = 0; i < NUM_THREADS; i++) { + threads.emplace_back([this, i]() { + ESP_LOGV(TAG, "Thread %d starting", i); + // Each thread directly calls defer() without any locking + for (int j = 0; j < DEFERS_PER_THREAD; j++) { + int defer_id = this->total_defers_.fetch_add(1); + ESP_LOGV(TAG, "Thread %d calling defer for request %d", i, defer_id); + + // Capture this pointer 
safely for the lambda + auto *component = this; + + // Directly call defer() from this thread - no locking! + this->defer([component, defer_id]() { + component->executed_defers_.fetch_add(1); + ESP_LOGV(TAG, "Executed defer %d", defer_id); + }); + + ESP_LOGV(TAG, "Thread %d called defer for request %d successfully", i, defer_id); + + // Small random delay to increase contention + if (j % 10 == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + ESP_LOGV(TAG, "Thread %d finished", i); + }); + } + + // Wait for all threads to complete + for (auto &t : threads) { + t.join(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto thread_time = std::chrono::duration_cast(end_time - start_time).count(); + ESP_LOGI(TAG, "All threads finished in %lldms. Created %d defer requests", thread_time, this->total_defers_.load()); + + // Store the final values for checking + this->expected_total_ = NUM_THREADS * DEFERS_PER_THREAD; + this->test_complete_ = true; +} + +int DeferStressComponent::get_total_defers() { return this->total_defers_.load(); } + +int DeferStressComponent::get_executed_defers() { return this->executed_defers_.load(); } + +bool DeferStressComponent::is_test_complete() { return this->test_complete_; } + +int DeferStressComponent::get_expected_total() { return this->expected_total_; } + +} // namespace defer_stress_component +} // namespace esphome \ No newline at end of file diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h new file mode 100644 index 0000000000..5ddcc4086a --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h @@ -0,0 +1,28 @@ +#pragma once + +#include "esphome/core/component.h" +#include + +namespace esphome { +namespace defer_stress_component { + +class DeferStressComponent : public Component { + 
public: + void setup() override; + void run_multi_thread_test(); + + // Getters for test status + int get_total_defers(); + int get_executed_defers(); + bool is_test_complete(); + int get_expected_total(); + + private: + std::atomic<int> total_defers_{0}; + std::atomic<int> executed_defers_{0}; + bool test_complete_{false}; + int expected_total_{0}; +}; + +} // namespace defer_stress_component +} // namespace esphome \ No newline at end of file From aaff086aeb496e1132cd71f9ecfe38abbb2aff0d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:24:04 -0500 Subject: [PATCH 10/14] there was no locking on host! --- esphome/core/helpers.cpp | 10 +++++++++- esphome/core/helpers.h | 4 ++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/esphome/core/helpers.cpp b/esphome/core/helpers.cpp index b4923c7af0..daa03fa41d 100644 --- a/esphome/core/helpers.cpp +++ b/esphome/core/helpers.cpp @@ -645,8 +645,9 @@ void hsv_to_rgb(int hue, float saturation, float value, float &red, float &green } // System APIs -#if defined(USE_ESP8266) || defined(USE_RP2040) || defined(USE_HOST) +#if defined(USE_ESP8266) || defined(USE_RP2040) // ESP8266 doesn't have mutexes, but that shouldn't be an issue as it's single-core and non-preemptive OS. 
+// RP2040 support is currently limited to single-core mode Mutex::Mutex() {} Mutex::~Mutex() {} void Mutex::lock() {} @@ -658,6 +659,13 @@ Mutex::~Mutex() {} void Mutex::lock() { xSemaphoreTake(this->handle_, portMAX_DELAY); } bool Mutex::try_lock() { return xSemaphoreTake(this->handle_, 0) == pdTRUE; } void Mutex::unlock() { xSemaphoreGive(this->handle_); } +#elif defined(USE_HOST) +// Host platform uses std::mutex for proper thread synchronization +Mutex::Mutex() { handle_ = new std::mutex(); } +Mutex::~Mutex() { delete static_cast<std::mutex *>(handle_); } +void Mutex::lock() { static_cast<std::mutex *>(handle_)->lock(); } +bool Mutex::try_lock() { return static_cast<std::mutex *>(handle_)->try_lock(); } +void Mutex::unlock() { static_cast<std::mutex *>(handle_)->unlock(); } #endif #if defined(USE_ESP8266) diff --git a/esphome/core/helpers.h b/esphome/core/helpers.h index 362f3d1fa4..d92cf07702 100644 --- a/esphome/core/helpers.h +++ b/esphome/core/helpers.h @@ -32,6 +32,10 @@ #include #endif +#ifdef USE_HOST +#include <mutex> +#endif + #define HOT __attribute__((hot)) #define ESPDEPRECATED(msg, when) __attribute__((deprecated(msg))) #define ESPHOME_ALWAYS_INLINE __attribute__((always_inline)) From bc2adb6b5aca060df0dd42b4830b60a427cc5594 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:25:31 -0500 Subject: [PATCH 11/14] there was no locking on host! 
--- .../defer_stress_component/defer_stress_component.cpp | 2 +- .../defer_stress_component/defer_stress_component.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp index e5f3471dfb..c49c7db21a 100644 --- a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp @@ -83,4 +83,4 @@ bool DeferStressComponent::is_test_complete() { return this->test_complete_; } int DeferStressComponent::get_expected_total() { return this->expected_total_; } } // namespace defer_stress_component -} // namespace esphome \ No newline at end of file +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h index 5ddcc4086a..4d60c3b484 100644 --- a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h @@ -25,4 +25,4 @@ class DeferStressComponent : public Component { }; } // namespace defer_stress_component -} // namespace esphome \ No newline at end of file +} // namespace esphome From 729b2b287343da06c9355459d7bf0e2159e74034 Mon Sep 17 00:00:00 2001 From: "J. 
Nick Koston" Date: Fri, 4 Jul 2025 10:35:29 -0500 Subject: [PATCH 12/14] remove debug --- esphome/core/helpers.cpp | 1 - tests/integration/conftest.py | 22 ---------------------- 2 files changed, 23 deletions(-) diff --git a/esphome/core/helpers.cpp b/esphome/core/helpers.cpp index daa03fa41d..7d9b86fccd 100644 --- a/esphome/core/helpers.cpp +++ b/esphome/core/helpers.cpp @@ -647,7 +647,6 @@ void hsv_to_rgb(int hue, float saturation, float value, float &red, float &green // System APIs #if defined(USE_ESP8266) || defined(USE_RP2040) // ESP8266 doesn't have mutexes, but that shouldn't be an issue as it's single-core and non-preemptive OS. -// RP2040 support is currently limited to single-core mode Mutex::Mutex() {} Mutex::~Mutex() {} void Mutex::lock() {} diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 56f2eb0a54..8f5f77ca52 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -474,14 +474,6 @@ async def run_binary_and_wait_for_port( if process.returncode is not None: error_msg += f"\nProcess exited with code: {process.returncode}" - # Check for common signals - if process.returncode < 0: - sig = -process.returncode - try: - sig_name = signal.Signals(sig).name - error_msg += f" (killed by signal {sig_name})" - except ValueError: - error_msg += f" (killed by signal {sig})" # Include any output collected so far if stdout_lines: @@ -509,20 +501,6 @@ async def run_binary_and_wait_for_port( if controller_transport is not None: controller_transport.close() - # Log the exit code if process already exited - if process.returncode is not None: - print(f"\nProcess exited with code: {process.returncode}", file=sys.stderr) - if process.returncode < 0: - sig = -process.returncode - try: - sig_name = signal.Signals(sig).name - print( - f"Process was killed by signal {sig_name} ({sig})", - file=sys.stderr, - ) - except ValueError: - print(f"Process was killed by signal {sig}", file=sys.stderr) - # Cleanup: terminate the 
process gracefully if process.returncode is None: # Send SIGINT (Ctrl+C) for graceful shutdown From 3df434fd55a197c9cd9f37378a3351f32fcff5be Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:41:59 -0500 Subject: [PATCH 13/14] improve test --- .../defer_stress_component.cpp | 16 +----- .../defer_stress_component.h | 8 --- tests/integration/test_defer_stress.py | 51 +++++++++++++++---- 3 files changed, 44 insertions(+), 31 deletions(-) diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp index c49c7db21a..3a97476067 100644 --- a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp @@ -44,9 +44,9 @@ void DeferStressComponent::run_multi_thread_test() { auto *component = this; // Directly call defer() from this thread - no locking! - this->defer([component, defer_id]() { + this->defer([component, i, j, defer_id]() { component->executed_defers_.fetch_add(1); - ESP_LOGV(TAG, "Executed defer %d", defer_id); + ESP_LOGV(TAG, "Executed defer %d (thread %d, index %d)", defer_id, i, j); }); ESP_LOGV(TAG, "Thread %d called defer for request %d successfully", i, defer_id); @@ -68,19 +68,7 @@ void DeferStressComponent::run_multi_thread_test() { auto end_time = std::chrono::steady_clock::now(); auto thread_time = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); ESP_LOGI(TAG, "All threads finished in %lldms. 
Created %d defer requests", thread_time, this->total_defers_.load()); - - // Store the final values for checking - this->expected_total_ = NUM_THREADS * DEFERS_PER_THREAD; - this->test_complete_ = true; } -int DeferStressComponent::get_total_defers() { return this->total_defers_.load(); } - -int DeferStressComponent::get_executed_defers() { return this->executed_defers_.load(); } - -bool DeferStressComponent::is_test_complete() { return this->test_complete_; } - -int DeferStressComponent::get_expected_total() { return this->expected_total_; } - } // namespace defer_stress_component } // namespace esphome diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h index 4d60c3b484..59b7565726 100644 --- a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h @@ -11,17 +11,9 @@ class DeferStressComponent : public Component { void setup() override; void run_multi_thread_test(); - // Getters for test status - int get_total_defers(); - int get_executed_defers(); - bool is_test_complete(); - int get_expected_total(); - private: std::atomic<int> total_defers_{0}; std::atomic<int> executed_defers_{0}; - bool test_complete_{false}; - int expected_total_{0}; }; } // namespace defer_stress_component diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py index 5e061e4651..c11a3aec4a 100644 --- a/tests/integration/test_defer_stress.py +++ b/tests/integration/test_defer_stress.py @@ -32,22 +32,38 @@ async def test_defer_stress( loop = asyncio.get_event_loop() test_complete_future: asyncio.Future[None] = loop.create_future() - # Track executed defers + # Track executed defers and their order executed_defers = set() + thread_executions = {} # thread_id -> list of indices 
in execution order + fifo_violations = [] def on_log_line(line: str) -> None: - # Track all executed defers - match = re.search(r"Executed defer (\d+)", line) + # Track all executed defers with thread and index info + match = re.search(r"Executed defer (\d+) \(thread (\d+), index (\d+)\)", line) if match: defer_id = int(match.group(1)) + thread_id = int(match.group(2)) + index = int(match.group(3)) + executed_defers.add(defer_id) - # Check if we've executed all 1000 defers (0-999) + # Track execution order per thread + if thread_id not in thread_executions: + thread_executions[thread_id] = [] + + # Check FIFO ordering within thread if ( - defer_id == 999 - and len(executed_defers) == 1000 - and not test_complete_future.done() + thread_executions[thread_id] + and thread_executions[thread_id][-1] >= index ): + fifo_violations.append( + f"Thread {thread_id}: index {index} executed after {thread_executions[thread_id][-1]}" + ) + + thread_executions[thread_id].append(index) + + # Check if we've executed all 1000 defers (0-999) + if len(executed_defers) == 1000 and not test_complete_future.done(): test_complete_future.set_result(None) async with ( @@ -96,5 +112,22 @@ async def test_defer_stress( missing_ids = expected_ids - executed_defers assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" - # If we got here without crashing, the test passed - assert True, "Test completed successfully - all 1000 defers executed in order" + # Verify FIFO ordering was maintained within each thread + assert not fifo_violations, "FIFO ordering violations detected:\n" + "\n".join( + fifo_violations[:10] + ) + + # Verify each thread executed all its defers in order + for thread_id, indices in thread_executions.items(): + assert len(indices) == 100, ( + f"Thread {thread_id} executed {len(indices)} defers, expected 100" + ) + # Indices should be 0-99 in ascending order + assert indices == list(range(100)), ( + f"Thread {thread_id} executed indices out of order: 
{indices[:10]}..." + ) + + # If we got here without crashing and with proper ordering, the test passed + assert True, ( + "Test completed successfully - all 1000 defers executed with FIFO ordering preserved" + ) From 71e06ea1b6b9b7f228c6999c1c19b7314399f083 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:45:47 -0500 Subject: [PATCH 14/14] cleanup --- esphome/core/scheduler.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 09bb784de8..4c79f51b04 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -225,6 +225,14 @@ void HOT Scheduler::call() { // - Items execute in exact order they were deferred (FIFO guarantee) // - No deferred items exist in to_add_, so processing order doesn't affect correctness while (!this->defer_queue_.empty()) { + // IMPORTANT: The double-check pattern is REQUIRED for thread safety: + // 1. First check: !defer_queue_.empty() without lock (may become stale) + // 2. Acquire lock + // 3. Second check: defer_queue_.empty() with lock (authoritative) + // Between steps 1 and 2, another thread could have emptied the queue, + // so we must check again after acquiring the lock to avoid accessing an empty queue. + // Note: We use manual lock/unlock instead of RAII LockGuard to avoid creating + // unnecessary stack variables when the queue is empty after acquiring the lock. this->lock_.lock(); if (this->defer_queue_.empty()) { this->lock_.unlock(); @@ -234,7 +242,8 @@ void HOT Scheduler::call() { this->defer_queue_.pop_front(); this->lock_.unlock(); - // Skip if item was marked for removal or component failed + // Execute callback without holding lock to prevent deadlocks + // if the callback tries to call defer() again if (!this->should_skip_item_(item.get())) { this->execute_item_(item.get()); }