diff --git a/esphome/components/sx127x/sx127x.cpp b/esphome/components/sx127x/sx127x.cpp index e41efe098c..7f62ee2bd3 100644 --- a/esphome/components/sx127x/sx127x.cpp +++ b/esphome/components/sx127x/sx127x.cpp @@ -252,15 +252,17 @@ size_t SX127x::get_max_packet_size() { } } -void SX127x::transmit_packet(const std::vector &packet) { +SX127xError SX127x::transmit_packet(const std::vector &packet) { if (this->payload_length_ > 0 && this->payload_length_ != packet.size()) { ESP_LOGE(TAG, "Packet size does not match config"); - return; + return SX127xError::INVALID_PARAMS; } if (packet.empty() || packet.size() > this->get_max_packet_size()) { ESP_LOGE(TAG, "Packet size out of range"); - return; + return SX127xError::INVALID_PARAMS; } + + SX127xError ret = SX127xError::NONE; if (this->modulation_ == MOD_LORA) { this->set_mode_standby(); if (this->payload_length_ == 0) { @@ -278,11 +280,13 @@ void SX127x::transmit_packet(const std::vector &packet) { this->write_fifo_(packet); this->set_mode_tx(); } + // wait until transmit completes, typically the delay will be less than 100 ms uint32_t start = millis(); while (!this->dio0_pin_->digital_read()) { if (millis() - start > 4000) { ESP_LOGE(TAG, "Transmit packet failure"); + ret = SX127xError::TIMEOUT; break; } } @@ -291,6 +295,7 @@ void SX127x::transmit_packet(const std::vector &packet) { } else { this->set_mode_sleep(); } + return ret; } void SX127x::call_listeners_(const std::vector &packet, float rssi, float snr) { @@ -335,13 +340,7 @@ void SX127x::loop() { } void SX127x::run_image_cal() { - uint32_t start = millis(); - uint8_t mode = this->read_register_(REG_OP_MODE); - if ((mode & MODE_MASK) != MODE_STDBY) { - ESP_LOGE(TAG, "Need to be in standby for image cal"); - return; - } - if (mode & MOD_LORA) { + if (this->modulation_ == MOD_LORA) { this->set_mode_(MOD_FSK, MODE_SLEEP); this->set_mode_(MOD_FSK, MODE_STDBY); } @@ -350,13 +349,15 @@ void SX127x::run_image_cal() { } else { this->write_register_(REG_IMAGE_CAL, IMAGE_CAL_START); } + uint32_t start = millis(); while (this->read_register_(REG_IMAGE_CAL) & IMAGE_CAL_RUNNING) { if (millis() - start > 20) { ESP_LOGE(TAG, "Image cal failure"); + this->mark_failed(); break; } } - if (mode & MOD_LORA) { + if (this->modulation_ == MOD_LORA) { this->set_mode_(this->modulation_, MODE_SLEEP); this->set_mode_(this->modulation_, MODE_STDBY); } @@ -375,6 +376,7 @@ void SX127x::set_mode_(uint8_t modulation, uint8_t mode) { } if (millis() - start > 20) { ESP_LOGE(TAG, "Set mode failure"); + this->mark_failed(); break; } } diff --git a/esphome/components/sx127x/sx127x.h b/esphome/components/sx127x/sx127x.h index fe9f60e860..4cc7c9b6d3 100644 --- a/esphome/components/sx127x/sx127x.h +++ b/esphome/components/sx127x/sx127x.h @@ -34,6 +34,8 @@ enum SX127xBw : uint8_t { SX127X_BW_500_0, }; +enum class SX127xError { NONE = 0, TIMEOUT, INVALID_PARAMS }; + class SX127xListener { public: virtual void on_packet(const std::vector &packet, float rssi, float snr) = 0; @@ -79,7 +81,7 @@ class SX127x : public Component, void set_sync_value(const std::vector &sync_value) { this->sync_value_ = sync_value; } void run_image_cal(); void configure(); - void transmit_packet(const std::vector &packet); + SX127xError transmit_packet(const std::vector &packet); void register_listener(SX127xListener *listener) { this->listeners_.push_back(listener); } Trigger, float, float> *get_packet_trigger() const { return this->packet_trigger_; }; diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 515f6fd355..0c4a4ff230 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -62,16 +62,16 @@ static void validate_static_string(const char *name) { void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type type, bool is_static_string, const void *name_ptr, uint32_t delay, std::function func) { // Get the name as const char* - const char *name_cstr = - is_static_string ? static_cast(name_ptr) : static_cast(name_ptr)->c_str(); + const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr); - // Cancel existing timer if name is not empty - if (name_cstr != nullptr && name_cstr[0] != '\0') { - this->cancel_item_(component, name_cstr, type); - } - - if (delay == SCHEDULER_DONT_RUN) + if (delay == SCHEDULER_DONT_RUN) { + // Still need to cancel existing timer if name is not empty + if (name_cstr != nullptr && name_cstr[0] != '\0') { + LockGuard guard{this->lock_}; + this->cancel_item_locked_(component, name_cstr, type, delay == 0 && type == SchedulerItem::TIMEOUT); + } return; + } // Create and populate the scheduler item auto item = make_unique(); @@ -87,6 +87,7 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type if (delay == 0 && type == SchedulerItem::TIMEOUT) { // Put in defer queue for guaranteed FIFO execution LockGuard guard{this->lock_}; + this->cancel_item_locked_(component, name_cstr, type, true); this->defer_queue_.push_back(std::move(item)); return; } @@ -122,7 +123,15 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type } #endif - this->push_(std::move(item)); + LockGuard guard{this->lock_}; + // If name is provided, do atomic cancel-and-add + if (name_cstr != nullptr && name_cstr[0] != '\0') { + // Cancel existing items + this->cancel_item_locked_(component, name_cstr, type, false); + } + // Add new item directly to to_add_ + // since we have the lock held + this->to_add_.push_back(std::move(item)); } void HOT Scheduler::set_timeout(Component *component, const char *name, uint32_t timeout, std::function func) { @@ -134,10 +143,10 @@ void HOT Scheduler::set_timeout(Component *component, const std::string &name, u this->set_timer_common_(component, SchedulerItem::TIMEOUT, false, &name, timeout, std::move(func)); } bool HOT Scheduler::cancel_timeout(Component *component, const std::string &name) { - return this->cancel_item_(component, name, SchedulerItem::TIMEOUT); + return this->cancel_item_(component, false, &name, SchedulerItem::TIMEOUT); } bool HOT Scheduler::cancel_timeout(Component *component, const char *name) { - return this->cancel_item_(component, name, SchedulerItem::TIMEOUT); + return this->cancel_item_(component, true, name, SchedulerItem::TIMEOUT); } void HOT Scheduler::set_interval(Component *component, const std::string &name, uint32_t interval, std::function func) { @@ -149,10 +158,10 @@ void HOT Scheduler::set_interval(Component *component, const char *name, uint32_ this->set_timer_common_(component, SchedulerItem::INTERVAL, true, name, interval, std::move(func)); } bool HOT Scheduler::cancel_interval(Component *component, const std::string &name) { - return this->cancel_item_(component, name, SchedulerItem::INTERVAL); + return this->cancel_item_(component, false, &name, SchedulerItem::INTERVAL); } bool HOT Scheduler::cancel_interval(Component *component, const char *name) { - return this->cancel_item_(component, name, SchedulerItem::INTERVAL); + return this->cancel_item_(component, true, name, SchedulerItem::INTERVAL); } struct RetryArgs { @@ -282,10 +291,10 @@ void HOT Scheduler::call() { } #endif // ESPHOME_DEBUG_SCHEDULER - auto to_remove_was = to_remove_; + auto to_remove_was = this->to_remove_; auto items_was = this->items_.size(); // If we have too many items to remove - if (to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) { + if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) { std::vector> valid_items; while (!this->empty_()) { LockGuard guard{this->lock_}; @@ -300,10 +309,10 @@ void HOT Scheduler::call() { } // The following should not happen unless I'm missing something - if (to_remove_ != 0) { + if (this->to_remove_ != 0) { ESP_LOGW(TAG, "to_remove_ was %" PRIu32 " now: %" PRIu32 " items where %zu now %zu. Please report this", to_remove_was, to_remove_, items_was, items_.size()); - to_remove_ = 0; + this->to_remove_ = 0; } } @@ -336,26 +345,25 @@ void HOT Scheduler::call() { } { - this->lock_.lock(); + LockGuard guard{this->lock_}; // new scope, item from before might have been moved in the vector auto item = std::move(this->items_[0]); - // Only pop after function call, this ensures we were reachable // during the function call and know if we were cancelled. this->pop_raw_(); - this->lock_.unlock(); - if (item->remove) { // We were removed/cancelled in the function call, stop - to_remove_--; + this->to_remove_--; continue; } if (item->type == SchedulerItem::INTERVAL) { item->next_execution_ = now + item->interval; - this->push_(std::move(item)); + // Add new item directly to to_add_ + // since we have the lock held + this->to_add_.push_back(std::move(item)); } } } @@ -380,7 +388,7 @@ void HOT Scheduler::cleanup_() { if (!item->remove) return; - to_remove_--; + this->to_remove_--; { LockGuard guard{this->lock_}; @@ -392,19 +400,6 @@ void HOT Scheduler::pop_raw_() { std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); this->items_.pop_back(); } -void HOT Scheduler::push_(std::unique_ptr item) { - LockGuard guard{this->lock_}; - this->to_add_.push_back(std::move(item)); -} -// Helper function to check if item matches criteria for cancellation -bool HOT Scheduler::matches_item_(const std::unique_ptr &item, Component *component, - const char *name_cstr, SchedulerItem::Type type) { - if (item->component != component || item->type != type || item->remove) { - return false; - } - const char *item_name = item->get_name(); - return item_name != nullptr && strcmp(name_cstr, item_name) == 0; -} // Helper to execute a scheduler item void HOT Scheduler::execute_item_(SchedulerItem *item) { @@ -417,11 +412,10 @@ void HOT Scheduler::execute_item_(SchedulerItem *item) { } // Common implementation for cancel operations -bool HOT Scheduler::cancel_item_common_(Component *component, bool is_static_string, const void *name_ptr, - SchedulerItem::Type type) { +bool HOT Scheduler::cancel_item_(Component *component, bool is_static_string, const void *name_ptr, + SchedulerItem::Type type) { // Get the name as const char* - const char *name_cstr = - is_static_string ? static_cast(name_ptr) : static_cast(name_ptr)->c_str(); + const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr); // Handle null or empty names if (name_cstr == nullptr) @@ -429,43 +423,49 @@ bool HOT Scheduler::cancel_item_common_(Component *component, bool is_static_str // obtain lock because this function iterates and can be called from non-loop task context LockGuard guard{this->lock_}; - bool ret = false; + return this->cancel_item_locked_(component, name_cstr, type, false); +} + +// Helper to cancel items by name - must be called with lock held +bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_cstr, SchedulerItem::Type type, + bool defer_only) { + size_t total_cancelled = 0; // Check all containers for matching items #if !defined(USE_ESP8266) && !defined(USE_RP2040) - // Only check defer_queue_ on platforms that have it - for (auto &item : this->defer_queue_) { - if (this->matches_item_(item, component, name_cstr, type)) { - item->remove = true; - ret = true; + // Only check defer queue for timeouts (intervals never go there) + if (type == SchedulerItem::TIMEOUT) { + for (auto &item : this->defer_queue_) { + if (this->matches_item_(item, component, name_cstr, type)) { + item->remove = true; + total_cancelled++; + } + } + if (defer_only) { + return total_cancelled > 0; } } #endif + // Cancel items in the main heap for (auto &item : this->items_) { if (this->matches_item_(item, component, name_cstr, type)) { item->remove = true; - ret = true; - this->to_remove_++; // Only track removals for heap items + total_cancelled++; + this->to_remove_++; // Track removals for heap items } } + // Cancel items in to_add_ for (auto &item : this->to_add_) { if (this->matches_item_(item, component, name_cstr, type)) { item->remove = true; - ret = true; + total_cancelled++; + // Don't track removals for to_add_ items } } - return ret; -} - -bool HOT Scheduler::cancel_item_(Component *component, const std::string &name, Scheduler::SchedulerItem::Type type) { - return this->cancel_item_common_(component, false, &name, type); -} - -bool HOT Scheduler::cancel_item_(Component *component, const char *name, SchedulerItem::Type type) { - return this->cancel_item_common_(component, true, name, type); + return total_cancelled > 0; } uint64_t Scheduler::millis_() { diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h index bf5e63cccf..f3f78d39af 100644 --- a/esphome/core/scheduler.h +++ b/esphome/core/scheduler.h @@ -2,6 +2,7 @@ #include #include +#include #include #include "esphome/core/component.h" @@ -139,17 +140,36 @@ class Scheduler { uint64_t millis_(); void cleanup_(); void pop_raw_(); - void push_(std::unique_ptr item); - // Common implementation for cancel operations - bool cancel_item_common_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type); private: - bool cancel_item_(Component *component, const std::string &name, SchedulerItem::Type type); - bool cancel_item_(Component *component, const char *name, SchedulerItem::Type type); + // Helper to cancel items by name - must be called with lock held + bool cancel_item_locked_(Component *component, const char *name, SchedulerItem::Type type, bool defer_only); - // Helper functions for cancel operations - bool matches_item_(const std::unique_ptr &item, Component *component, const char *name_cstr, - SchedulerItem::Type type); + // Helper to extract name as const char* from either static string or std::string + inline const char *get_name_cstr_(bool is_static_string, const void *name_ptr) { + return is_static_string ? static_cast(name_ptr) : static_cast(name_ptr)->c_str(); + } + + // Common implementation for cancel operations + bool cancel_item_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type); + + // Helper function to check if item matches criteria for cancellation + inline bool HOT matches_item_(const std::unique_ptr &item, Component *component, const char *name_cstr, + SchedulerItem::Type type) { + if (item->component != component || item->type != type || item->remove) { + return false; + } + const char *item_name = item->get_name(); + if (item_name == nullptr) { + return false; + } + // Fast path: if pointers are equal (common with string deduplication) + if (item_name == name_cstr) { + return true; + } + // Slow path: compare string contents + return strcmp(name_cstr, item_name) == 0; + } // Helper to execute a scheduler item void execute_item_(SchedulerItem *item); diff --git a/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp index dcc9367390..cd4e019882 100644 --- a/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp +++ b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp @@ -29,7 +29,7 @@ void SchedulerRapidCancellationComponent::run_rapid_cancellation_test() { threads.reserve(NUM_THREADS); for (int thread_id = 0; thread_id < NUM_THREADS; thread_id++) { - threads.emplace_back([this, thread_id]() { + threads.emplace_back([this]() { for (int i = 0; i < OPERATIONS_PER_THREAD; i++) { // Use modulo to ensure multiple threads use the same names int name_index = i % NUM_NAMES; diff --git a/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp index e8cef41bd0..b4c2b8c6c2 100644 --- a/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp +++ b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp @@ -48,7 +48,7 @@ void SchedulerSimultaneousCallbacksComponent::run_simultaneous_callbacks_test() std::string name = ss.str(); // Schedule callback for exactly DELAY_MS from now - this->set_timeout(name, DELAY_MS, [this, thread_id, i, name]() { + this->set_timeout(name, DELAY_MS, [this, name]() { // Increment concurrent counter atomically int current = this->callbacks_at_once_.fetch_add(1) + 1; diff --git a/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp index e20745b7cc..9071e573bb 100644 --- a/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp +++ b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp @@ -59,19 +59,19 @@ void SchedulerStringNameStressComponent::run_string_name_stress_test() { // Also test nested scheduling from callbacks if (j % 10 == 0) { // Every 10th callback schedules another callback - this->set_timeout(dynamic_name, delay, [component, i, j, callback_id]() { + this->set_timeout(dynamic_name, delay, [component, callback_id]() { component->executed_callbacks_.fetch_add(1); ESP_LOGV(TAG, "Executed string-named callback %d (nested scheduler)", callback_id); // Schedule another timeout from within this callback with a new dynamic name std::string nested_name = "nested_from_" + std::to_string(callback_id); - component->set_timeout(nested_name, 1, [component, callback_id]() { + component->set_timeout(nested_name, 1, [callback_id]() { ESP_LOGV(TAG, "Executed nested string-named callback from %d", callback_id); }); }); } else { // Regular callback - this->set_timeout(dynamic_name, delay, [component, i, j, callback_id]() { + this->set_timeout(dynamic_name, delay, [component, callback_id]() { component->executed_callbacks_.fetch_add(1); ESP_LOGV(TAG, "Executed string-named callback %d", callback_id); }); diff --git a/tests/integration/fixtures/scheduler_defer_cancel.yaml b/tests/integration/fixtures/scheduler_defer_cancel.yaml new file mode 100644 index 0000000000..9e3f927c33 --- /dev/null +++ b/tests/integration/fixtures/scheduler_defer_cancel.yaml @@ -0,0 +1,51 @@ +esphome: + name: scheduler-defer-cancel + +host: + +logger: + level: DEBUG + +api: + services: + - service: test_defer_cancel + then: + - lambda: |- + // Schedule 10 defers with the same name + // Only the last one should execute + for (int i = 1; i <= 10; i++) { + App.scheduler.set_timeout(nullptr, "test_defer", 0, [i]() { + ESP_LOGI("TEST", "Defer executed: %d", i); + // Fire event with the defer number + std::string event_type = "defer_executed_" + std::to_string(i); + id(test_result)->trigger(event_type); + }); + } + + // Schedule completion notification after all defers + App.scheduler.set_timeout(nullptr, "completion", 0, []() { + ESP_LOGI("TEST", "Test complete"); + id(test_complete)->trigger("test_finished"); + }); + +event: + - platform: template + id: test_result + name: "Test Result" + event_types: + - "defer_executed_1" + - "defer_executed_2" + - "defer_executed_3" + - "defer_executed_4" + - "defer_executed_5" + - "defer_executed_6" + - "defer_executed_7" + - "defer_executed_8" + - "defer_executed_9" + - "defer_executed_10" + + - platform: template + id: test_complete + name: "Test Complete" + event_types: + - "test_finished" diff --git a/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml b/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml new file mode 100644 index 0000000000..7384082ac2 --- /dev/null +++ b/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml @@ -0,0 +1,109 @@ +esphome: + name: scheduler-defer-fifo-simple + +host: + +logger: + level: DEBUG + +api: + services: + - service: test_set_timeout + then: + - lambda: |- + // Test set_timeout with 0 delay (direct scheduler call) + static int set_timeout_order = 0; + static bool set_timeout_passed = true; + + // Reset for this test + set_timeout_order = 0; + set_timeout_passed = true; + + ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); + for (int i = 0; i < 10; i++) { + int expected = i; + App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { + ESP_LOGD("defer_test", "set_timeout(0) item %d executed, order %d", expected, set_timeout_order); + if (set_timeout_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in set_timeout: expected %d but got execution order %d", expected, set_timeout_order); + set_timeout_passed = false; + } + set_timeout_order++; + + if (set_timeout_order == 10) { + if (set_timeout_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + + ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + + - service: test_defer + then: + - lambda: |- + // Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + // Reset for this test + defer_order = 0; + defer_passed = true; + + ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + // Use a static instance so it doesn't go out of scope + static TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/scheduler_defer_stress.yaml b/tests/integration/fixtures/scheduler_defer_stress.yaml new file mode 100644 index 0000000000..0d9c1d1405 --- /dev/null +++ b/tests/integration/fixtures/scheduler_defer_stress.yaml @@ -0,0 +1,38 @@ +esphome: + name: scheduler-defer-stress-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [defer_stress_component] + +host: + +logger: + level: VERBOSE + +defer_stress_component: + id: defer_stress + +api: + services: + - service: run_stress_test + then: + - lambda: |- + id(defer_stress)->run_multi_thread_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/test_scheduler_defer_cancel.py b/tests/integration/test_scheduler_defer_cancel.py new file mode 100644 index 0000000000..923cf946c4 --- /dev/null +++ b/tests/integration/test_scheduler_defer_cancel.py @@ -0,0 +1,94 @@ +"""Test that defer() with the same name cancels previous defers.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_scheduler_defer_cancel( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() with the same name cancels previous defers.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-cancel" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test service + test_defer_cancel_service: UserService | None = None + for service in services: + if service.name == "test_defer_cancel": + test_defer_cancel_service = service + + assert test_defer_cancel_service is not None, ( + "test_defer_cancel service not found" + ) + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[int] = loop.create_future() + + def on_state(state: EntityState) -> None: + if not isinstance(state, Event): + return + + if ( + state.key == test_complete_entity.key + and state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + return + + if state.key == test_result_entity.key and not test_result_future.done(): + # Event type should be "defer_executed_X" where X is the defer number + if state.event_type.startswith("defer_executed_"): + defer_num = int(state.event_type.split("_")[-1]) + test_result_future.set_result(defer_num) + + client.subscribe_states(on_state) + + # Execute the test + client.execute_service(test_defer_cancel_service, {}) + + # Wait for test completion + try: + await asyncio.wait_for(test_complete_future, timeout=10.0) + executed_defer = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test did not complete within timeout") + + # Verify that only defer 10 was executed + assert executed_defer == 10, ( + f"Expected defer 10 to execute, got {executed_defer}" + ) diff --git a/tests/integration/test_scheduler_defer_fifo_simple.py b/tests/integration/test_scheduler_defer_fifo_simple.py new file mode 100644 index 0000000000..eb4058fedd --- /dev/null +++ b/tests/integration/test_scheduler_defer_fifo_simple.py @@ -0,0 +1,117 @@ +"""Simple test that defer() maintains FIFO order.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_scheduler_defer_fifo_simple( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() maintains FIFO order with a simple test.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-fifo-simple" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test services + test_set_timeout_service: UserService | None = None + test_defer_service: UserService | None = None + for service in services: + if service.name == "test_set_timeout": + test_set_timeout_service = service + elif service.name == "test_defer": + test_defer_service = service + + assert test_set_timeout_service is not None, ( + "test_set_timeout service not found" + ) + assert test_defer_service is not None, "test_defer service not found" + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states + # (events are delivered as EventStates through subscribe_states) + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[bool] = loop.create_future() + + def on_state(state: EntityState) -> None: + if not isinstance(state, Event): + return + + if ( + state.key == test_complete_entity.key + and state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + return + + if state.key == test_result_entity.key and not test_result_future.done(): + if state.event_type == "passed": + test_result_future.set_result(True) + elif state.event_type == "failed": + test_result_future.set_result(False) + + client.subscribe_states(on_state) + + # Test 1: Test set_timeout(0) + client.execute_service(test_set_timeout_service, {}) + + # Wait for first test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test set_timeout(0) did not complete within 5 seconds") + + assert test1_passed is True, ( + "set_timeout(0) FIFO test failed - items executed out of order" + ) + + # Reset futures for second test + test_complete_future = loop.create_future() + test_result_future = loop.create_future() + + # Test 2: Test defer() + client.execute_service(test_defer_service, {}) + + # Wait for second test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test defer() did not complete within 5 seconds") + + # Verify the test passed + assert test2_passed is True, ( + "defer() FIFO test failed - items executed out of order" + ) diff --git a/tests/integration/test_scheduler_defer_stress.py b/tests/integration/test_scheduler_defer_stress.py new file mode 100644 index 0000000000..d546b7132f --- /dev/null +++ b/tests/integration/test_scheduler_defer_stress.py @@ -0,0 +1,137 @@ +"""Stress test for defer() thread safety with multiple threads.""" + +import asyncio +from pathlib import Path +import re + +from aioesphomeapi import UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_scheduler_defer_stress( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() doesn't crash when called rapidly from multiple threads.""" + + # Get the absolute path to the external components directory + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + + # Replace the placeholder in the YAML config with the actual path + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[None] = loop.create_future() + + # Track executed defers and their order + executed_defers: set[int] = set() + thread_executions: dict[ + int, list[int] + ] = {} # thread_id -> list of indices in execution order + fifo_violations: list[str] = [] + + def on_log_line(line: str) -> None: + # Track all executed defers with thread and index info + match = re.search(r"Executed defer (\d+) \(thread (\d+), index (\d+)\)", line) + if not match: + return + + defer_id = int(match.group(1)) + thread_id = int(match.group(2)) + index = int(match.group(3)) + + executed_defers.add(defer_id) + + # Track execution order per thread + if thread_id not in thread_executions: + thread_executions[thread_id] = [] + + # Check FIFO ordering within thread + if thread_executions[thread_id] and thread_executions[thread_id][-1] >= index: + fifo_violations.append( + f"Thread {thread_id}: index {index} executed after " + f"{thread_executions[thread_id][-1]}" + ) + + thread_executions[thread_id].append(index) + + # Check if we've executed all 1000 defers (0-999) + if len(executed_defers) == 1000 and not test_complete_future.done(): + test_complete_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-stress-test" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test service + run_stress_test_service: UserService | None = None + for service in services: + if service.name == "run_stress_test": + run_stress_test_service = service + break + + assert run_stress_test_service is not None, "run_stress_test service not found" + + # Call the run_stress_test service to start the test + client.execute_service(run_stress_test_service, {}) + + # Wait for all defers to execute (should be quick) + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + except asyncio.TimeoutError: + # Report how many we got + pytest.fail( + f"Stress test timed out. Only {len(executed_defers)} of " + f"1000 defers executed. Missing IDs: " + f"{sorted(set(range(1000)) - executed_defers)[:10]}..." + ) + + # Verify all defers executed + assert len(executed_defers) == 1000, ( + f"Expected 1000 defers, got {len(executed_defers)}" + ) + + # Verify we have all IDs from 0-999 + expected_ids = set(range(1000)) + missing_ids = expected_ids - executed_defers + assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" + + # Verify FIFO ordering was maintained within each thread + assert not fifo_violations, "FIFO ordering violations detected:\n" + "\n".join( + fifo_violations[:10] + ) + + # Verify each thread executed all its defers in order + for thread_id, indices in thread_executions.items(): + assert len(indices) == 100, ( + f"Thread {thread_id} executed {len(indices)} defers, expected 100" + ) + # Indices should be 0-99 in ascending order + assert indices == list(range(100)), ( + f"Thread {thread_id} executed indices out of order: {indices[:10]}..." + ) + + # If we got here without crashing and with proper ordering, the test passed + assert True, ( + "Test completed successfully - all 1000 defers executed with " + "FIFO ordering preserved" + )