diff --git a/esphome/components/sx127x/sx127x.cpp b/esphome/components/sx127x/sx127x.cpp index e41efe098c..7f62ee2bd3 100644 --- a/esphome/components/sx127x/sx127x.cpp +++ b/esphome/components/sx127x/sx127x.cpp @@ -252,15 +252,17 @@ size_t SX127x::get_max_packet_size() { } } -void SX127x::transmit_packet(const std::vector &packet) { +SX127xError SX127x::transmit_packet(const std::vector &packet) { if (this->payload_length_ > 0 && this->payload_length_ != packet.size()) { ESP_LOGE(TAG, "Packet size does not match config"); - return; + return SX127xError::INVALID_PARAMS; } if (packet.empty() || packet.size() > this->get_max_packet_size()) { ESP_LOGE(TAG, "Packet size out of range"); - return; + return SX127xError::INVALID_PARAMS; } + + SX127xError ret = SX127xError::NONE; if (this->modulation_ == MOD_LORA) { this->set_mode_standby(); if (this->payload_length_ == 0) { @@ -278,11 +280,13 @@ void SX127x::transmit_packet(const std::vector &packet) { this->write_fifo_(packet); this->set_mode_tx(); } + // wait until transmit completes, typically the delay will be less than 100 ms uint32_t start = millis(); while (!this->dio0_pin_->digital_read()) { if (millis() - start > 4000) { ESP_LOGE(TAG, "Transmit packet failure"); + ret = SX127xError::TIMEOUT; break; } } @@ -291,6 +295,7 @@ void SX127x::transmit_packet(const std::vector &packet) { } else { this->set_mode_sleep(); } + return ret; } void SX127x::call_listeners_(const std::vector &packet, float rssi, float snr) { @@ -335,13 +340,7 @@ void SX127x::loop() { } void SX127x::run_image_cal() { - uint32_t start = millis(); - uint8_t mode = this->read_register_(REG_OP_MODE); - if ((mode & MODE_MASK) != MODE_STDBY) { - ESP_LOGE(TAG, "Need to be in standby for image cal"); - return; - } - if (mode & MOD_LORA) { + if (this->modulation_ == MOD_LORA) { this->set_mode_(MOD_FSK, MODE_SLEEP); this->set_mode_(MOD_FSK, MODE_STDBY); } @@ -350,13 +349,15 @@ void SX127x::run_image_cal() { } else { this->write_register_(REG_IMAGE_CAL, IMAGE_CAL_START); } + uint32_t start = millis(); while (this->read_register_(REG_IMAGE_CAL) & IMAGE_CAL_RUNNING) { if (millis() - start > 20) { ESP_LOGE(TAG, "Image cal failure"); + this->mark_failed(); break; } } - if (mode & MOD_LORA) { + if (this->modulation_ == MOD_LORA) { this->set_mode_(this->modulation_, MODE_SLEEP); this->set_mode_(this->modulation_, MODE_STDBY); } @@ -375,6 +376,7 @@ void SX127x::set_mode_(uint8_t modulation, uint8_t mode) { } if (millis() - start > 20) { ESP_LOGE(TAG, "Set mode failure"); + this->mark_failed(); break; } } diff --git a/esphome/components/sx127x/sx127x.h b/esphome/components/sx127x/sx127x.h index fe9f60e860..4cc7c9b6d3 100644 --- a/esphome/components/sx127x/sx127x.h +++ b/esphome/components/sx127x/sx127x.h @@ -34,6 +34,8 @@ enum SX127xBw : uint8_t { SX127X_BW_500_0, }; +enum class SX127xError { NONE = 0, TIMEOUT, INVALID_PARAMS }; + class SX127xListener { public: virtual void on_packet(const std::vector &packet, float rssi, float snr) = 0; @@ -79,7 +81,7 @@ class SX127x : public Component, void set_sync_value(const std::vector &sync_value) { this->sync_value_ = sync_value; } void run_image_cal(); void configure(); - void transmit_packet(const std::vector &packet); + SX127xError transmit_packet(const std::vector &packet); void register_listener(SX127xListener *listener) { this->listeners_.push_back(listener); } Trigger, float, float> *get_packet_trigger() const { return this->packet_trigger_; }; diff --git a/esphome/components/web_server/web_server.cpp b/esphome/components/web_server/web_server.cpp index 77c20b956b..20ff1a7c29 100644 --- a/esphome/components/web_server/web_server.cpp +++ b/esphome/components/web_server/web_server.cpp @@ -255,11 +255,7 @@ void DeferredUpdateEventSourceList::on_client_disconnect_(DeferredUpdateEventSou } #endif -WebServer::WebServer(web_server_base::WebServerBase *base) : base_(base) { -#ifdef USE_ESP32 - to_schedule_lock_ = xSemaphoreCreateMutex(); -#endif -} +WebServer::WebServer(web_server_base::WebServerBase *base) : base_(base) {} #ifdef USE_WEBSERVER_CSS_INCLUDE void WebServer::set_css_include(const char *css_include) { this->css_include_ = css_include; } @@ -308,30 +304,7 @@ void WebServer::setup() { // getting a lot of events this->set_interval(10000, [this]() { this->events_.try_send_nodefer("", "ping", millis(), 30000); }); } -void WebServer::loop() { -#ifdef USE_ESP32 - // Check atomic flag first to avoid taking semaphore when queue is empty - if (this->to_schedule_has_items_.load(std::memory_order_relaxed) && xSemaphoreTake(this->to_schedule_lock_, 0L)) { - std::function fn; - if (!to_schedule_.empty()) { - // scheduler execute things out of order which may lead to incorrect state - // this->defer(std::move(to_schedule_.front())); - // let's execute it directly from the loop - fn = std::move(to_schedule_.front()); - to_schedule_.pop_front(); - if (to_schedule_.empty()) { - this->to_schedule_has_items_.store(false, std::memory_order_relaxed); - } - } - xSemaphoreGive(this->to_schedule_lock_); - if (fn) { - fn(); - } - } -#endif - - this->events_.loop(); -} +void WebServer::loop() { this->events_.loop(); } void WebServer::dump_config() { ESP_LOGCONFIG(TAG, "Web Server:\n" @@ -526,13 +499,13 @@ void WebServer::handle_switch_request(AsyncWebServerRequest *request, const UrlM std::string data = this->switch_json(obj, obj->state, detail); request->send(200, "application/json", data.c_str()); } else if (match.method_equals("toggle")) { - this->schedule_([obj]() { obj->toggle(); }); + this->defer([obj]() { obj->toggle(); }); request->send(200); } else if (match.method_equals("turn_on")) { - this->schedule_([obj]() { obj->turn_on(); }); + this->defer([obj]() { obj->turn_on(); }); request->send(200); } else if (match.method_equals("turn_off")) { - this->schedule_([obj]() { obj->turn_off(); }); + this->defer([obj]() { obj->turn_off(); }); request->send(200); } else { request->send(404); @@ -568,7 +541,7 @@ void WebServer::handle_button_request(AsyncWebServerRequest *request, const UrlM std::string data = this->button_json(obj, detail); request->send(200, "application/json", data.c_str()); } else if (match.method_equals("press")) { - this->schedule_([obj]() { obj->press(); }); + this->defer([obj]() { obj->press(); }); request->send(200); return; } else { @@ -648,7 +621,7 @@ void WebServer::handle_fan_request(AsyncWebServerRequest *request, const UrlMatc std::string data = this->fan_json(obj, detail); request->send(200, "application/json", data.c_str()); } else if (match.method_equals("toggle")) { - this->schedule_([obj]() { obj->toggle().perform(); }); + this->defer([obj]() { obj->toggle().perform(); }); request->send(200); } else if (match.method_equals("turn_on") || match.method_equals("turn_off")) { auto call = match.method_equals("turn_on") ? obj->turn_on() : obj->turn_off(); @@ -680,7 +653,7 @@ void WebServer::handle_fan_request(AsyncWebServerRequest *request, const UrlMatc return; } } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); } else { request->send(404); @@ -729,7 +702,7 @@ void WebServer::handle_light_request(AsyncWebServerRequest *request, const UrlMa std::string data = this->light_json(obj, detail); request->send(200, "application/json", data.c_str()); } else if (match.method_equals("toggle")) { - this->schedule_([obj]() { obj->toggle().perform(); }); + this->defer([obj]() { obj->toggle().perform(); }); request->send(200); } else if (match.method_equals("turn_on")) { auto call = obj->turn_on(); @@ -786,7 +759,7 @@ void WebServer::handle_light_request(AsyncWebServerRequest *request, const UrlMa call.set_effect(effect); } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); } else if (match.method_equals("turn_off")) { auto call = obj->turn_off(); @@ -796,7 +769,7 @@ void WebServer::handle_light_request(AsyncWebServerRequest *request, const UrlMa call.set_transition_length(*transition * 1000); } } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); } else { request->send(404); @@ -881,7 +854,7 @@ void WebServer::handle_cover_request(AsyncWebServerRequest *request, const UrlMa } } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -939,7 +912,7 @@ void WebServer::handle_number_request(AsyncWebServerRequest *request, const UrlM call.set_value(*value); } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1014,7 +987,7 @@ void WebServer::handle_date_request(AsyncWebServerRequest *request, const UrlMat call.set_date(value); } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1073,7 +1046,7 @@ void WebServer::handle_time_request(AsyncWebServerRequest *request, const UrlMat call.set_time(value); } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1131,7 +1104,7 @@ void WebServer::handle_datetime_request(AsyncWebServerRequest *request, const Ur call.set_datetime(value); } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1248,7 +1221,7 @@ void WebServer::handle_select_request(AsyncWebServerRequest *request, const UrlM call.set_option(option.c_str()); // NOLINT } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1335,7 +1308,7 @@ void WebServer::handle_climate_request(AsyncWebServerRequest *request, const Url call.set_target_temperature(*target_temperature); } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1452,13 +1425,13 @@ void WebServer::handle_lock_request(AsyncWebServerRequest *request, const UrlMat std::string data = this->lock_json(obj, obj->state, detail); request->send(200, "application/json", data.c_str()); } else if (match.method_equals("lock")) { - this->schedule_([obj]() { obj->lock(); }); + this->defer([obj]() { obj->lock(); }); request->send(200); } else if (match.method_equals("unlock")) { - this->schedule_([obj]() { obj->unlock(); }); + this->defer([obj]() { obj->unlock(); }); request->send(200); } else if (match.method_equals("open")) { - this->schedule_([obj]() { obj->open(); }); + this->defer([obj]() { obj->open(); }); request->send(200); } else { request->send(404); @@ -1529,7 +1502,7 @@ void WebServer::handle_valve_request(AsyncWebServerRequest *request, const UrlMa } } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1594,7 +1567,7 @@ void WebServer::handle_alarm_control_panel_request(AsyncWebServerRequest *reques return; } - this->schedule_([call]() mutable { call.perform(); }); + this->defer([call]() mutable { call.perform(); }); request->send(200); return; } @@ -1695,7 +1668,7 @@ void WebServer::handle_update_request(AsyncWebServerRequest *request, const UrlM return; } - this->schedule_([obj]() mutable { obj->perform(); }); + this->defer([obj]() mutable { obj->perform(); }); request->send(200); return; } @@ -2072,17 +2045,6 @@ void WebServer::add_sorting_group(uint64_t group_id, const std::string &group_na } #endif -void WebServer::schedule_(std::function &&f) { -#ifdef USE_ESP32 - xSemaphoreTake(this->to_schedule_lock_, portMAX_DELAY); - to_schedule_.push_back(std::move(f)); - this->to_schedule_has_items_.store(true, std::memory_order_relaxed); - xSemaphoreGive(this->to_schedule_lock_); -#else - this->defer(std::move(f)); -#endif -} - } // namespace web_server } // namespace esphome #endif diff --git a/esphome/components/web_server/web_server.h b/esphome/components/web_server/web_server.h index 82b31ab656..0c15881d1e 100644 --- a/esphome/components/web_server/web_server.h +++ b/esphome/components/web_server/web_server.h @@ -14,12 +14,6 @@ #include #include #include -#ifdef USE_ESP32 -#include -#include -#include -#include -#endif #if USE_WEBSERVER_VERSION >= 2 extern const uint8_t ESPHOME_WEBSERVER_INDEX_HTML[] PROGMEM; @@ -504,7 +498,6 @@ class WebServer : public Controller, public Component, public AsyncWebHandler { protected: void add_sorting_info_(JsonObject &root, EntityBase *entity); - void schedule_(std::function &&f); web_server_base::WebServerBase *base_; #ifdef USE_ARDUINO DeferredUpdateEventSourceList events_; @@ -524,11 +517,6 @@ class WebServer : public Controller, public Component, public AsyncWebHandler { const char *js_include_{nullptr}; #endif bool expose_log_{true}; -#ifdef USE_ESP32 - std::deque> to_schedule_; - SemaphoreHandle_t to_schedule_lock_; - std::atomic to_schedule_has_items_{false}; -#endif }; } // namespace web_server diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 14c9768b3c..bcdeba291f 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -98,8 +98,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type return; } - const auto now = this->millis_(); - // Create and populate the scheduler item auto item = make_unique(); item->component = component; @@ -108,6 +106,19 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type item->callback = std::move(func); item->remove = false; +#if !defined(USE_ESP8266) && !defined(USE_RP2040) + // Special handling for defer() (delay = 0, type = TIMEOUT) + // ESP8266 and RP2040 are excluded because they don't need thread-safe defer handling + if (delay == 0 && type == SchedulerItem::TIMEOUT) { + // Put in defer queue for guaranteed FIFO execution + LockGuard guard{this->lock_}; + this->defer_queue_.push_back(std::move(item)); + return; + } +#endif + + const auto now = this->millis_(); + // Type-specific setup if (type == SchedulerItem::INTERVAL) { item->interval = delay; @@ -244,6 +255,35 @@ optional HOT Scheduler::next_schedule_in() { return item->next_execution_ - now; } void HOT Scheduler::call() { +#if !defined(USE_ESP8266) && !defined(USE_RP2040) + // Process defer queue first to guarantee FIFO execution order for deferred items. + // Previously, defer() used the heap which gave undefined order for equal timestamps, + // causing race conditions on multi-core systems (ESP32, BK7200). + // With the defer queue: + // - Deferred items (delay=0) go directly to defer_queue_ in set_timer_common_ + // - Items execute in exact order they were deferred (FIFO guarantee) + // - No deferred items exist in to_add_, so processing order doesn't affect correctness + // ESP8266 and RP2040 don't use this queue - they fall back to the heap-based approach + // (ESP8266: single-core, RP2040: empty mutex implementation). + while (!this->defer_queue_.empty()) { + // The outer check is done without a lock for performance. If the queue + // appears non-empty, we lock and process an item. We don't need to check + // empty() again inside the lock because only this thread can remove items. + std::unique_ptr item; + { + LockGuard lock(this->lock_); + item = std::move(this->defer_queue_.front()); + this->defer_queue_.pop_front(); + } + + // Execute callback without holding lock to prevent deadlocks + // if the callback tries to call defer() again + if (!this->should_skip_item_(item.get())) { + this->execute_item_(item.get()); + } + } +#endif + const auto now = this->millis_(); this->process_to_add(); @@ -317,8 +357,6 @@ void HOT Scheduler::call() { this->pop_raw_(); continue; } - App.set_current_component(item->component); - #ifdef ESPHOME_DEBUG_SCHEDULER const char *item_name = item->get_name(); ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")", @@ -329,13 +367,7 @@ void HOT Scheduler::call() { // Warning: During callback(), a lot of stuff can happen, including: // - timeouts/intervals get added, potentially invalidating vector pointers // - timeouts/intervals get cancelled - { - uint32_t now_ms = millis(); - WarnIfComponentBlockingGuard guard{item->component, now_ms}; - item->callback(); - // Call finish to ensure blocking time is properly calculated and reported - guard.finish(); - } + this->execute_item_(item.get()); } { @@ -394,6 +426,26 @@ void HOT Scheduler::pop_raw_() { std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); this->items_.pop_back(); } +// Helper function to check if item matches criteria for cancellation +bool HOT Scheduler::matches_item_(const std::unique_ptr &item, Component *component, + const char *name_cstr, SchedulerItem::Type type) { + if (item->component != component || item->type != type || item->remove) { + return false; + } + const char *item_name = item->get_name(); + return item_name != nullptr && strcmp(name_cstr, item_name) == 0; +} + +// Helper to execute a scheduler item +void HOT Scheduler::execute_item_(SchedulerItem *item) { + App.set_current_component(item->component); + + uint32_t now_ms = millis(); + WarnIfComponentBlockingGuard guard{item->component, now_ms}; + item->callback(); + guard.finish(); +} + // Common implementation for cancel operations bool HOT Scheduler::cancel_item_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type) { @@ -410,6 +462,39 @@ bool HOT Scheduler::cancel_item_(Component *component, bool is_static_string, co return this->cancel_item_locked_(component, name_cstr, type); } +// Helper to cancel items by name - must be called with lock held +bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_cstr, SchedulerItem::Type type) { + bool ret = false; + + // Check all containers for matching items +#if !defined(USE_ESP8266) && !defined(USE_RP2040) + // Only check defer_queue_ on platforms that have it + for (auto &item : this->defer_queue_) { + if (this->matches_item_(item, component, name_cstr, type)) { + item->remove = true; + ret = true; + } + } +#endif + + for (auto &item : this->items_) { + if (this->matches_item_(item, component, name_cstr, type)) { + item->remove = true; + ret = true; + this->to_remove_++; // Only track removals for heap items + } + } + + for (auto &item : this->to_add_) { + if (this->matches_item_(item, component, name_cstr, type)) { + item->remove = true; + ret = true; + } + } + + return ret; +} + uint64_t Scheduler::millis_() { // Get the current 32-bit millis value const uint32_t now = millis(); diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h index 6eceec151b..64ea4cf652 100644 --- a/esphome/core/scheduler.h +++ b/esphome/core/scheduler.h @@ -3,6 +3,7 @@ #include #include #include +#include #include "esphome/core/component.h" #include "esphome/core/helpers.h" @@ -145,6 +146,19 @@ class Scheduler { // Common implementation for cancel operations bool cancel_item_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type); + private: + // Helper functions for cancel operations + bool matches_item_(const std::unique_ptr &item, Component *component, const char *name_cstr, + SchedulerItem::Type type); + + // Helper to execute a scheduler item + void execute_item_(SchedulerItem *item); + + // Helper to check if item should be skipped + bool should_skip_item_(const SchedulerItem *item) const { + return item->remove || (item->component != nullptr && item->component->is_failed()); + } + bool empty_() { this->cleanup_(); return this->items_.empty(); @@ -153,6 +167,13 @@ class Scheduler { Mutex lock_; std::vector> items_; std::vector> to_add_; +#if !defined(USE_ESP8266) && !defined(USE_RP2040) + // ESP8266 and RP2040 don't need the defer queue because: + // ESP8266: Single-core with no preemptive multitasking + // RP2040: Currently has empty mutex implementation in ESPHome + // Both platforms save 40 bytes of RAM by excluding this + std::deque> defer_queue_; // FIFO queue for defer() calls +#endif uint32_t last_millis_{0}; uint16_t millis_major_{0}; uint32_t to_remove_{0}; diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/defer_fifo_simple.yaml new file mode 100644 index 0000000000..db24ebf601 --- /dev/null +++ b/tests/integration/fixtures/defer_fifo_simple.yaml @@ -0,0 +1,109 @@ +esphome: + name: defer-fifo-simple + +host: + +logger: + level: DEBUG + +api: + services: + - service: test_set_timeout + then: + - lambda: |- + // Test set_timeout with 0 delay (direct scheduler call) + static int set_timeout_order = 0; + static bool set_timeout_passed = true; + + // Reset for this test + set_timeout_order = 0; + set_timeout_passed = true; + + ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); + for (int i = 0; i < 10; i++) { + int expected = i; + App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { + ESP_LOGD("defer_test", "set_timeout(0) item %d executed, order %d", expected, set_timeout_order); + if (set_timeout_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in set_timeout: expected %d but got execution order %d", expected, set_timeout_order); + set_timeout_passed = false; + } + set_timeout_order++; + + if (set_timeout_order == 10) { + if (set_timeout_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + + ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + + - service: test_defer + then: + - lambda: |- + // Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + // Reset for this test + defer_order = 0; + defer_passed = true; + + ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + // Use a static instance so it doesn't go out of scope + static TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml new file mode 100644 index 0000000000..6df475229b --- /dev/null +++ b/tests/integration/fixtures/defer_stress.yaml @@ -0,0 +1,38 @@ +esphome: + name: defer-stress-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [defer_stress_component] + +host: + +logger: + level: VERBOSE + +defer_stress_component: + id: defer_stress + +api: + services: + - service: run_stress_test + then: + - lambda: |- + id(defer_stress)->run_multi_thread_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/external_components/defer_stress_component/__init__.py b/tests/integration/fixtures/external_components/defer_stress_component/__init__.py new file mode 100644 index 0000000000..177e595f51 --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/__init__.py @@ -0,0 +1,19 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +defer_stress_component_ns = cg.esphome_ns.namespace("defer_stress_component") +DeferStressComponent = defer_stress_component_ns.class_( + "DeferStressComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(DeferStressComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp new file mode 100644 index 0000000000..21ca45947e --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.cpp @@ -0,0 +1,75 @@ +#include "defer_stress_component.h" +#include "esphome/core/log.h" +#include +#include +#include +#include + +namespace esphome { +namespace defer_stress_component { + +static const char *const TAG = "defer_stress"; + +void DeferStressComponent::setup() { ESP_LOGCONFIG(TAG, "DeferStressComponent setup"); } + +void DeferStressComponent::run_multi_thread_test() { + // Use member variables instead of static to avoid issues + this->total_defers_ = 0; + this->executed_defers_ = 0; + static constexpr int NUM_THREADS = 10; + static constexpr int DEFERS_PER_THREAD = 100; + + ESP_LOGI(TAG, "Starting defer stress test - multi-threaded concurrent defers"); + + // Ensure we're starting clean + ESP_LOGI(TAG, "Initial counters: total=%d, executed=%d", this->total_defers_.load(), this->executed_defers_.load()); + + // Track start time + auto start_time = std::chrono::steady_clock::now(); + + // Create threads + std::vector threads; + + ESP_LOGI(TAG, "Creating %d threads, each will defer %d callbacks", NUM_THREADS, DEFERS_PER_THREAD); + + threads.reserve(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + threads.emplace_back([this, i]() { + ESP_LOGV(TAG, "Thread %d starting", i); + // Each thread directly calls defer() without any locking + for (int j = 0; j < DEFERS_PER_THREAD; j++) { + int defer_id = this->total_defers_.fetch_add(1); + ESP_LOGV(TAG, "Thread %d calling defer for request %d", i, defer_id); + + // Capture this pointer safely for the lambda + auto *component = this; + + // Directly call defer() from this thread - no locking! + this->defer([component, i, j, defer_id]() { + component->executed_defers_.fetch_add(1); + ESP_LOGV(TAG, "Executed defer %d (thread %d, index %d)", defer_id, i, j); + }); + + ESP_LOGV(TAG, "Thread %d called defer for request %d successfully", i, defer_id); + + // Small random delay to increase contention + if (j % 10 == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + ESP_LOGV(TAG, "Thread %d finished", i); + }); + } + + // Wait for all threads to complete + for (auto &t : threads) { + t.join(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto thread_time = std::chrono::duration_cast(end_time - start_time).count(); + ESP_LOGI(TAG, "All threads finished in %lldms. Created %d defer requests", thread_time, this->total_defers_.load()); +} + +} // namespace defer_stress_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h new file mode 100644 index 0000000000..59b7565726 --- /dev/null +++ b/tests/integration/fixtures/external_components/defer_stress_component/defer_stress_component.h @@ -0,0 +1,20 @@ +#pragma once + +#include "esphome/core/component.h" +#include + +namespace esphome { +namespace defer_stress_component { + +class DeferStressComponent : public Component { + public: + void setup() override; + void run_multi_thread_test(); + + private: + std::atomic total_defers_{0}; + std::atomic executed_defers_{0}; +}; + +} // namespace defer_stress_component +} // namespace esphome diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_defer_fifo_simple.py new file mode 100644 index 0000000000..5a62a45786 --- /dev/null +++ b/tests/integration/test_defer_fifo_simple.py @@ -0,0 +1,117 @@ +"""Simple test that defer() maintains FIFO order.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_fifo_simple( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() maintains FIFO order with a simple test.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "defer-fifo-simple" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test services + test_set_timeout_service: UserService | None = None + test_defer_service: UserService | None = None + for service in services: + if service.name == "test_set_timeout": + test_set_timeout_service = service + elif service.name == "test_defer": + test_defer_service = service + + assert test_set_timeout_service is not None, ( + "test_set_timeout service not found" + ) + assert test_defer_service is not None, "test_defer service not found" + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states + # (events are delivered as EventStates through subscribe_states) + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[bool] = loop.create_future() + + def on_state(state: EntityState) -> None: + if not isinstance(state, Event): + return + + if ( + state.key == test_complete_entity.key + and state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + return + + if state.key == test_result_entity.key and not test_result_future.done(): + if state.event_type == "passed": + test_result_future.set_result(True) + elif state.event_type == "failed": + test_result_future.set_result(False) + + client.subscribe_states(on_state) + + # Test 1: Test set_timeout(0) + client.execute_service(test_set_timeout_service, {}) + + # Wait for first test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test set_timeout(0) did not complete within 5 seconds") + + assert test1_passed is True, ( + "set_timeout(0) FIFO test failed - items executed out of order" + ) + + # Reset futures for second test + test_complete_future = loop.create_future() + test_result_future = loop.create_future() + + # Test 2: Test defer() + client.execute_service(test_defer_service, {}) + + # Wait for second test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test defer() did not complete within 5 seconds") + + # Verify the test passed + assert test2_passed is True, ( + "defer() FIFO test failed - items executed out of order" + ) diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py new file mode 100644 index 0000000000..f63ec8d25f --- /dev/null +++ b/tests/integration/test_defer_stress.py @@ -0,0 +1,137 @@ +"""Stress test for defer() thread safety with multiple threads.""" + +import asyncio +from pathlib import Path +import re + +from aioesphomeapi import UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_stress( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() doesn't crash when called rapidly from multiple threads.""" + + # Get the absolute path to the external components directory + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + + # Replace the placeholder in the YAML config with the actual path + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[None] = loop.create_future() + + # Track executed defers and their order + executed_defers: set[int] = set() + thread_executions: dict[ + int, list[int] + ] = {} # thread_id -> list of indices in execution order + fifo_violations: list[str] = [] + + def on_log_line(line: str) -> None: + # Track all executed defers with thread and index info + match = re.search(r"Executed defer (\d+) \(thread (\d+), index (\d+)\)", line) + if not match: + return + + defer_id = int(match.group(1)) + thread_id = int(match.group(2)) + index = int(match.group(3)) + + executed_defers.add(defer_id) + + # Track execution order per thread + if thread_id not in thread_executions: + thread_executions[thread_id] = [] + + # Check FIFO ordering within thread + if thread_executions[thread_id] and thread_executions[thread_id][-1] >= index: + fifo_violations.append( + f"Thread {thread_id}: index {index} executed after " + f"{thread_executions[thread_id][-1]}" + ) + + thread_executions[thread_id].append(index) + + # Check if we've executed all 1000 defers (0-999) + if len(executed_defers) == 1000 and not test_complete_future.done(): + test_complete_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "defer-stress-test" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test service + run_stress_test_service: UserService | None = None + for service in services: + if service.name == "run_stress_test": + run_stress_test_service = service + break + + assert run_stress_test_service is not None, "run_stress_test service not found" + + # Call the run_stress_test service to start the test + client.execute_service(run_stress_test_service, {}) + + # Wait for all defers to execute (should be quick) + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + except asyncio.TimeoutError: + # Report how many we got + pytest.fail( + f"Stress test timed out. Only {len(executed_defers)} of " + f"1000 defers executed. Missing IDs: " + f"{sorted(set(range(1000)) - executed_defers)[:10]}..." + ) + + # Verify all defers executed + assert len(executed_defers) == 1000, ( + f"Expected 1000 defers, got {len(executed_defers)}" + ) + + # Verify we have all IDs from 0-999 + expected_ids = set(range(1000)) + missing_ids = expected_ids - executed_defers + assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" + + # Verify FIFO ordering was maintained within each thread + assert not fifo_violations, "FIFO ordering violations detected:\n" + "\n".join( + fifo_violations[:10] + ) + + # Verify each thread executed all its defers in order + for thread_id, indices in thread_executions.items(): + assert len(indices) == 100, ( + f"Thread {thread_id} executed {len(indices)} defers, expected 100" + ) + # Indices should be 0-99 in ascending order + assert indices == list(range(100)), ( + f"Thread {thread_id} executed indices out of order: {indices[:10]}..." + ) + + # If we got here without crashing and with proper ordering, the test passed + assert True, ( + "Test completed successfully - all 1000 defers executed with " + "FIFO ordering preserved" + )