diff --git a/esphome/core/helpers.cpp b/esphome/core/helpers.cpp index b4923c7af0..7d9b86fccd 100644 --- a/esphome/core/helpers.cpp +++ b/esphome/core/helpers.cpp @@ -645,7 +645,7 @@ void hsv_to_rgb(int hue, float saturation, float value, float &red, float &green } // System APIs -#if defined(USE_ESP8266) || defined(USE_RP2040) || defined(USE_HOST) +#if defined(USE_ESP8266) || defined(USE_RP2040) // ESP8266 doesn't have mutexes, but that shouldn't be an issue as it's single-core and non-preemptive OS. Mutex::Mutex() {} Mutex::~Mutex() {} @@ -658,6 +658,13 @@ Mutex::~Mutex() {} void Mutex::lock() { xSemaphoreTake(this->handle_, portMAX_DELAY); } bool Mutex::try_lock() { return xSemaphoreTake(this->handle_, 0) == pdTRUE; } void Mutex::unlock() { xSemaphoreGive(this->handle_); } +#elif defined(USE_HOST) +// Host platform uses std::mutex for proper thread synchronization +Mutex::Mutex() { handle_ = new std::mutex(); } +Mutex::~Mutex() { delete static_cast(handle_); } +void Mutex::lock() { static_cast(handle_)->lock(); } +bool Mutex::try_lock() { return static_cast(handle_)->try_lock(); } +void Mutex::unlock() { static_cast(handle_)->unlock(); } #endif #if defined(USE_ESP8266) diff --git a/esphome/core/helpers.h b/esphome/core/helpers.h index 362f3d1fa4..d92cf07702 100644 --- a/esphome/core/helpers.h +++ b/esphome/core/helpers.h @@ -32,6 +32,10 @@ #include #endif +#ifdef USE_HOST +#include +#endif + #define HOT __attribute__((hot)) #define ESPDEPRECATED(msg, when) __attribute__((deprecated(msg))) #define ESPHOME_ALWAYS_INLINE __attribute__((always_inline)) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index e0d2b70102..4c79f51b04 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -225,15 +225,25 @@ void HOT Scheduler::call() { // - Items execute in exact order they were deferred (FIFO guarantee) // - No deferred items exist in to_add_, so processing order doesn't affect correctness while (!this->defer_queue_.empty()) { - std::unique_ptr item; - { - LockGuard guard{this->lock_}; - if (this->defer_queue_.empty()) // Double-check with lock held - break; - item = std::move(this->defer_queue_.front()); - this->defer_queue_.pop_front(); + // IMPORTANT: The double-check pattern is REQUIRED for thread safety: + // 1. First check: !defer_queue_.empty() without lock (may become stale) + // 2. Acquire lock + // 3. Second check: defer_queue_.empty() with lock (authoritative) + // Between steps 1 and 2, another thread could have emptied the queue, + // so we must check again after acquiring the lock to avoid accessing an empty queue. + // Note: We use manual lock/unlock instead of RAII LockGuard to avoid creating + // unnecessary stack variables when the queue is empty after acquiring the lock. + this->lock_.lock(); + if (this->defer_queue_.empty()) { + this->lock_.unlock(); + break; } - // Skip if item was marked for removal or component failed + auto item = std::move(this->defer_queue_.front()); + this->defer_queue_.pop_front(); + this->lock_.unlock(); + + // Execute callback without holding lock to prevent deadlocks + // if the callback tries to call defer() again if (!this->should_skip_item_(item.get())) { this->execute_item_(item.get()); } @@ -312,8 +322,6 @@ void HOT Scheduler::call() { this->pop_raw_(); continue; } - App.set_current_component(item->component); - #ifdef ESPHOME_DEBUG_SCHEDULER const char *item_name = item->get_name(); ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")", diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/defer_fifo_simple.yaml index aede9a3cd0..a221256f6c 100644 --- a/tests/integration/fixtures/defer_fifo_simple.yaml +++ b/tests/integration/fixtures/defer_fifo_simple.yaml @@ -8,14 +8,18 @@ logger: api: services: - - service: run_defer_test + - service: test_set_timeout then: - lambda: |- - // Test 1: Test set_timeout with 0 delay (direct scheduler call) + // Test set_timeout with 0 delay (direct scheduler call) static int set_timeout_order = 0; static bool set_timeout_passed = true; - ESP_LOGD("defer_test", "Test 1: Testing set_timeout(0) for FIFO order..."); + // Reset for this test + set_timeout_order = 0; + set_timeout_passed = true; + + ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); for (int i = 0; i < 10; i++) { int expected = i; App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { @@ -28,68 +32,66 @@ api: if (set_timeout_order == 10) { if (set_timeout_passed) { - ESP_LOGI("defer_test", "✓ Test 1 PASSED - set_timeout(0) maintains FIFO order"); + ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); + id(test_result)->trigger("passed"); } else { - ESP_LOGE("defer_test", "✗ Test 1 FAILED - set_timeout(0) executed out of order"); + ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); + id(test_result)->trigger("failed"); } - - // Start Test 2 after Test 1 completes - App.scheduler.set_timeout((Component*)nullptr, nullptr, 100, []() { - // Test 2: Test defer() method (component method) - static int defer_order = 0; - static bool defer_passed = true; - - ESP_LOGD("defer_test", "Test 2: Testing defer() for FIFO order..."); - - // Create a test component class that exposes defer() - class TestComponent : public Component { - public: - void test_defer() { - for (int i = 0; i < 10; i++) { - int expected = i; - this->defer([expected]() { - ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); - if (defer_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); - defer_passed = false; - } - defer_order++; - - if (defer_order == 10) { - bool all_passed = set_timeout_passed && defer_passed; - if (defer_passed) { - ESP_LOGI("defer_test", "✓ Test 2 PASSED - defer() maintains FIFO order"); - if (all_passed) { - ESP_LOGI("defer_test", "✓ ALL TESTS PASSED - Both set_timeout(0) and defer() maintain FIFO order"); - } - } else { - ESP_LOGE("defer_test", "✗ Test 2 FAILED - defer() executed out of order"); - } - - // Fire test result events - if (all_passed) { - id(test_result)->trigger("passed"); - } else { - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); - } - }); - } - } - }; - - TestComponent test_component; - test_component.test_defer(); - - ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); - }); + id(test_complete)->trigger("test_finished"); } }); } ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + - service: test_defer + then: + - lambda: |- + // Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + // Reset for this test + defer_order = 0; + defer_passed = true; + + ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + event: - platform: template name: "Test Complete" diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml new file mode 100644 index 0000000000..6df475229b --- /dev/null +++ b/tests/integration/fixtures/defer_stress.yaml @@ -0,0 +1,38 @@ +esphome: + name: defer-stress-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [defer_stress_component] + +host: + +logger: + level: VERBOSE + +defer_stress_component: + id: defer_stress + +api: + services: + - service: run_stress_test + then: + - lambda: |- + id(defer_stress)->run_multi_thread_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/external_components/defer_stress_component/__init__.py b/tests/integration/fixtures/external_components/defer_stress_component/__init__.py new file mode 100644 index 0000000000..177e595f51 --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/__init__.py @@ -0,0 +1,19 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +defer_stress_component_ns = cg.esphome_ns.namespace("defer_stress_component") +DeferStressComponent = defer_stress_component_ns.class_( + "DeferStressComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(DeferStressComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp new file mode 100644 index 0000000000..3a97476067 --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp @@ -0,0 +1,74 @@ +#include "defer_stress_component.h" +#include "esphome/core/log.h" +#include +#include +#include +#include + +namespace esphome { +namespace defer_stress_component { + +static const char *const TAG = "defer_stress"; + +void DeferStressComponent::setup() { ESP_LOGCONFIG(TAG, "DeferStressComponent setup"); } + +void DeferStressComponent::run_multi_thread_test() { + // Use member variables instead of static to avoid issues + this->total_defers_ = 0; + this->executed_defers_ = 0; + static constexpr int NUM_THREADS = 10; + static constexpr int DEFERS_PER_THREAD = 100; + + ESP_LOGI(TAG, "Starting defer stress test - multi-threaded concurrent defers"); + + // Ensure we're starting clean + ESP_LOGI(TAG, "Initial counters: total=%d, executed=%d", this->total_defers_.load(), this->executed_defers_.load()); + + // Track start time + auto start_time = std::chrono::steady_clock::now(); + + // Create threads + std::vector threads; + + ESP_LOGI(TAG, "Creating %d threads, each will defer %d callbacks", NUM_THREADS, DEFERS_PER_THREAD); + + for (int i = 0; i < NUM_THREADS; i++) { + threads.emplace_back([this, i]() { + ESP_LOGV(TAG, "Thread %d starting", i); + // Each thread directly calls defer() without any locking + for (int j = 0; j < DEFERS_PER_THREAD; j++) { + int defer_id = this->total_defers_.fetch_add(1); + ESP_LOGV(TAG, "Thread %d calling defer for request %d", i, defer_id); + + // Capture this pointer safely for the lambda + auto *component = this; + + // Directly call defer() from this thread - no locking! + this->defer([component, i, j, defer_id]() { + component->executed_defers_.fetch_add(1); + ESP_LOGV(TAG, "Executed defer %d (thread %d, index %d)", defer_id, i, j); + }); + + ESP_LOGV(TAG, "Thread %d called defer for request %d successfully", i, defer_id); + + // Small random delay to increase contention + if (j % 10 == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + ESP_LOGV(TAG, "Thread %d finished", i); + }); + } + + // Wait for all threads to complete + for (auto &t : threads) { + t.join(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto thread_time = std::chrono::duration_cast(end_time - start_time).count(); + ESP_LOGI(TAG, "All threads finished in %lldms. Created %d defer requests", thread_time, this->total_defers_.load()); +} + +} // namespace defer_stress_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h new file mode 100644 index 0000000000..59b7565726 --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h @@ -0,0 +1,20 @@ +#pragma once + +#include "esphome/core/component.h" +#include + +namespace esphome { +namespace defer_stress_component { + +class DeferStressComponent : public Component { + public: + void setup() override; + void run_multi_thread_test(); + + private: + std::atomic total_defers_{0}; + std::atomic executed_defers_{0}; +}; + +} // namespace defer_stress_component +} // namespace esphome diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_defer_fifo_simple.py index 46d68db171..5bfe02329f 100644 --- a/tests/integration/test_defer_fifo_simple.py +++ b/tests/integration/test_defer_fifo_simple.py @@ -41,14 +41,19 @@ async def test_defer_fifo_simple( assert test_complete_entity is not None, "test_complete event not found" assert test_result_entity is not None, "test_result event not found" - # Find our test service - run_defer_test_service: UserService | None = None + # Find our test services + test_set_timeout_service: UserService | None = None + test_defer_service: UserService | None = None for service in services: - if service.name == "run_defer_test": - run_defer_test_service = service - break + if service.name == "test_set_timeout": + test_set_timeout_service = service + elif service.name == "test_defer": + test_defer_service = service - assert run_defer_test_service is not None, "run_defer_test service not found" + assert test_set_timeout_service is not None, ( + "test_set_timeout service not found" + ) + assert test_defer_service is not None, "test_defer service not found" # Get the event loop loop = asyncio.get_running_loop() @@ -74,15 +79,35 @@ async def test_defer_fifo_simple( client.subscribe_states(on_state) - # Call the run_defer_test service to start the test - client.execute_service(run_defer_test_service, {}) + # Test 1: Test set_timeout(0) + client.execute_service(test_set_timeout_service, {}) - # Wait for test completion with timeout + # Wait for first test completion try: - await asyncio.wait_for(test_complete_future, timeout=10.0) - test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + await asyncio.wait_for(test_complete_future, timeout=5.0) + test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) except asyncio.TimeoutError: - pytest.fail("Test did not complete within 10 seconds") + pytest.fail("Test set_timeout(0) did not complete within 5 seconds") + + assert test1_passed is True, ( + "set_timeout(0) FIFO test failed - items executed out of order" + ) + + # Reset futures for second test + test_complete_future = loop.create_future() + test_result_future = loop.create_future() + + # Test 2: Test defer() + client.execute_service(test_defer_service, {}) + + # Wait for second test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test defer() did not complete within 5 seconds") # Verify the test passed - assert test_passed is True, "FIFO test failed - items executed out of order" + assert test2_passed is True, ( + "defer() FIFO test failed - items executed out of order" + ) diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py new file mode 100644 index 0000000000..c11a3aec4a --- /dev/null +++ b/tests/integration/test_defer_stress.py @@ -0,0 +1,133 @@ +"""Stress test for defer() thread safety with multiple threads.""" + +import asyncio +from pathlib import Path +import re + +from aioesphomeapi import UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_stress( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() doesn't crash when called rapidly from multiple threads.""" + + # Get the absolute path to the external components directory + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + + # Replace the placeholder in the YAML config with the actual path + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[None] = loop.create_future() + + # Track executed defers and their order + executed_defers = set() + thread_executions = {} # thread_id -> list of indices in execution order + fifo_violations = [] + + def on_log_line(line: str) -> None: + # Track all executed defers with thread and index info + match = re.search(r"Executed defer (\d+) \(thread (\d+), index (\d+)\)", line) + if match: + defer_id = int(match.group(1)) + thread_id = int(match.group(2)) + index = int(match.group(3)) + + executed_defers.add(defer_id) + + # Track execution order per thread + if thread_id not in thread_executions: + thread_executions[thread_id] = [] + + # Check FIFO ordering within thread + if ( + thread_executions[thread_id] + and thread_executions[thread_id][-1] >= index + ): + fifo_violations.append( + f"Thread {thread_id}: index {index} executed after {thread_executions[thread_id][-1]}" + ) + + thread_executions[thread_id].append(index) + + # Check if we've executed all 1000 defers (0-999) + if len(executed_defers) == 1000 and not test_complete_future.done(): + test_complete_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "defer-stress-test" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test service + run_stress_test_service: UserService | None = None + for service in services: + if service.name == "run_stress_test": + run_stress_test_service = service + break + + assert run_stress_test_service is not None, "run_stress_test service not found" + + # Call the run_stress_test service to start the test + client.execute_service(run_stress_test_service, {}) + + # Wait for all defers to execute (should be quick) + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + except asyncio.TimeoutError: + # Report how many we got + pytest.fail( + f"Stress test timed out. Only {len(executed_defers)} of 1000 defers executed. " + f"Missing IDs: {sorted(set(range(1000)) - executed_defers)[:10]}..." + ) + + # Verify all defers executed + assert len(executed_defers) == 1000, ( + f"Expected 1000 defers, got {len(executed_defers)}" + ) + + # Verify we have all IDs from 0-999 + expected_ids = set(range(1000)) + missing_ids = expected_ids - executed_defers + assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" + + # Verify FIFO ordering was maintained within each thread + assert not fifo_violations, "FIFO ordering violations detected:\n" + "\n".join( + fifo_violations[:10] + ) + + # Verify each thread executed all its defers in order + for thread_id, indices in thread_executions.items(): + assert len(indices) == 100, ( + f"Thread {thread_id} executed {len(indices)} defers, expected 100" + ) + # Indices should be 0-99 in ascending order + assert indices == list(range(100)), ( + f"Thread {thread_id} executed indices out of order: {indices[:10]}..." + ) + + # If we got here without crashing and with proper ordering, the test passed + assert True, ( + "Test completed successfully - all 1000 defers executed with FIFO ordering preserved" + )