From 3ef392d43384b766ada29d67e543b01e3b2f04ff Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 7 Jul 2025 14:57:55 -0500 Subject: [PATCH] Fix scheduler race conditions and add comprehensive test suite (#9348) --- esphome/core/scheduler.cpp | 191 ++++++------ esphome/core/scheduler.h | 54 +++- .../__init__.py | 21 ++ .../scheduler_bulk_cleanup_component.cpp | 72 +++++ .../scheduler_bulk_cleanup_component.h | 18 ++ .../__init__.py | 21 ++ .../heap_scheduler_stress_component.cpp | 104 +++++++ .../heap_scheduler_stress_component.h | 22 ++ .../__init__.py | 21 ++ .../rapid_cancellation_component.cpp | 80 +++++ .../rapid_cancellation_component.h | 22 ++ .../__init__.py | 21 ++ .../recursive_timeout_component.cpp | 40 +++ .../recursive_timeout_component.h | 20 ++ .../__init__.py | 23 ++ .../simultaneous_callbacks_component.cpp | 109 +++++++ .../simultaneous_callbacks_component.h | 24 ++ .../__init__.py | 21 ++ .../string_lifetime_component.cpp | 275 ++++++++++++++++++ .../string_lifetime_component.h | 37 +++ .../__init__.py | 21 ++ .../string_name_stress_component.cpp | 110 +++++++ .../string_name_stress_component.h | 22 ++ .../fixtures/scheduler_bulk_cleanup.yaml | 23 ++ .../fixtures/scheduler_defer_cancel.yaml | 51 ++++ .../scheduler_defer_cancels_regular.yaml | 34 +++ ....yaml => scheduler_defer_fifo_simple.yaml} | 2 +- ...tress.yaml => scheduler_defer_stress.yaml} | 2 +- .../fixtures/scheduler_heap_stress.yaml | 38 +++ .../scheduler_rapid_cancellation.yaml | 38 +++ .../fixtures/scheduler_recursive_timeout.yaml | 38 +++ .../scheduler_simultaneous_callbacks.yaml | 23 ++ .../fixtures/scheduler_string_lifetime.yaml | 47 +++ .../scheduler_string_name_stress.yaml | 38 +++ .../test_scheduler_bulk_cleanup.py | 122 ++++++++ .../test_scheduler_defer_cancel.py | 94 ++++++ .../test_scheduler_defer_cancel_regular.py | 90 ++++++ ...py => test_scheduler_defer_fifo_simple.py} | 4 +- ...ress.py => test_scheduler_defer_stress.py} | 4 +- 
.../integration/test_scheduler_heap_stress.py | 140 +++++++++ .../test_scheduler_rapid_cancellation.py | 142 +++++++++ .../test_scheduler_recursive_timeout.py | 101 +++++++ .../test_scheduler_simultaneous_callbacks.py | 123 ++++++++ .../test_scheduler_string_lifetime.py | 169 +++++++++++ .../test_scheduler_string_name_stress.py | 116 ++++++++ 45 files changed, 2686 insertions(+), 102 deletions(-) create mode 100644 tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.h create mode 100644 tests/integration/fixtures/external_components/scheduler_heap_stress_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.h create mode 100644 tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.h create mode 100644 tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.h create mode 100644 
tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.h create mode 100644 tests/integration/fixtures/external_components/scheduler_string_lifetime_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.h create mode 100644 tests/integration/fixtures/external_components/scheduler_string_name_stress_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp create mode 100644 tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.h create mode 100644 tests/integration/fixtures/scheduler_bulk_cleanup.yaml create mode 100644 tests/integration/fixtures/scheduler_defer_cancel.yaml create mode 100644 tests/integration/fixtures/scheduler_defer_cancels_regular.yaml rename tests/integration/fixtures/{defer_fifo_simple.yaml => scheduler_defer_fifo_simple.yaml} (99%) rename tests/integration/fixtures/{defer_stress.yaml => scheduler_defer_stress.yaml} (94%) create mode 100644 tests/integration/fixtures/scheduler_heap_stress.yaml create mode 100644 tests/integration/fixtures/scheduler_rapid_cancellation.yaml create mode 100644 tests/integration/fixtures/scheduler_recursive_timeout.yaml create mode 100644 tests/integration/fixtures/scheduler_simultaneous_callbacks.yaml create mode 100644 tests/integration/fixtures/scheduler_string_lifetime.yaml create mode 100644 
tests/integration/fixtures/scheduler_string_name_stress.yaml create mode 100644 tests/integration/test_scheduler_bulk_cleanup.py create mode 100644 tests/integration/test_scheduler_defer_cancel.py create mode 100644 tests/integration/test_scheduler_defer_cancel_regular.py rename tests/integration/{test_defer_fifo_simple.py => test_scheduler_defer_fifo_simple.py} (97%) rename tests/integration/{test_defer_stress.py => test_scheduler_defer_stress.py} (97%) create mode 100644 tests/integration/test_scheduler_heap_stress.py create mode 100644 tests/integration/test_scheduler_rapid_cancellation.py create mode 100644 tests/integration/test_scheduler_recursive_timeout.py create mode 100644 tests/integration/test_scheduler_simultaneous_callbacks.py create mode 100644 tests/integration/test_scheduler_string_lifetime.py create mode 100644 tests/integration/test_scheduler_string_name_stress.py diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 515f6fd355..d3da003a88 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -62,16 +62,16 @@ static void validate_static_string(const char *name) { void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type type, bool is_static_string, const void *name_ptr, uint32_t delay, std::function func) { // Get the name as const char* - const char *name_cstr = - is_static_string ? 
static_cast(name_ptr) : static_cast(name_ptr)->c_str(); + const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr); - // Cancel existing timer if name is not empty - if (name_cstr != nullptr && name_cstr[0] != '\0') { - this->cancel_item_(component, name_cstr, type); - } - - if (delay == SCHEDULER_DONT_RUN) + if (delay == SCHEDULER_DONT_RUN) { + // Still need to cancel existing timer if name is not empty + if (this->is_name_valid_(name_cstr)) { + LockGuard guard{this->lock_}; + this->cancel_item_locked_(component, name_cstr, type); + } return; + } // Create and populate the scheduler item auto item = make_unique(); @@ -87,6 +87,7 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type if (delay == 0 && type == SchedulerItem::TIMEOUT) { // Put in defer queue for guaranteed FIFO execution LockGuard guard{this->lock_}; + this->cancel_item_locked_(component, name_cstr, type); this->defer_queue_.push_back(std::move(item)); return; } @@ -122,7 +123,15 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type } #endif - this->push_(std::move(item)); + LockGuard guard{this->lock_}; + // If name is provided, do atomic cancel-and-add + if (this->is_name_valid_(name_cstr)) { + // Cancel existing items + this->cancel_item_locked_(component, name_cstr, type); + } + // Add new item directly to to_add_ + // since we have the lock held + this->to_add_.push_back(std::move(item)); } void HOT Scheduler::set_timeout(Component *component, const char *name, uint32_t timeout, std::function func) { @@ -134,10 +143,10 @@ void HOT Scheduler::set_timeout(Component *component, const std::string &name, u this->set_timer_common_(component, SchedulerItem::TIMEOUT, false, &name, timeout, std::move(func)); } bool HOT Scheduler::cancel_timeout(Component *component, const std::string &name) { - return this->cancel_item_(component, name, SchedulerItem::TIMEOUT); + return this->cancel_item_(component, false, &name, 
SchedulerItem::TIMEOUT); } bool HOT Scheduler::cancel_timeout(Component *component, const char *name) { - return this->cancel_item_(component, name, SchedulerItem::TIMEOUT); + return this->cancel_item_(component, true, name, SchedulerItem::TIMEOUT); } void HOT Scheduler::set_interval(Component *component, const std::string &name, uint32_t interval, std::function func) { @@ -149,10 +158,10 @@ void HOT Scheduler::set_interval(Component *component, const char *name, uint32_ this->set_timer_common_(component, SchedulerItem::INTERVAL, true, name, interval, std::move(func)); } bool HOT Scheduler::cancel_interval(Component *component, const std::string &name) { - return this->cancel_item_(component, name, SchedulerItem::INTERVAL); + return this->cancel_item_(component, false, &name, SchedulerItem::INTERVAL); } bool HOT Scheduler::cancel_interval(Component *component, const char *name) { - return this->cancel_item_(component, name, SchedulerItem::INTERVAL); + return this->cancel_item_(component, true, name, SchedulerItem::INTERVAL); } struct RetryArgs { @@ -211,6 +220,9 @@ bool HOT Scheduler::cancel_retry(Component *component, const std::string &name) } optional HOT Scheduler::next_schedule_in() { + // IMPORTANT: This method should only be called from the main thread (loop task). + // It calls empty_() and accesses items_[0] without holding a lock, which is only + // safe when called from the main thread. Other threads must not call this method. if (this->empty_()) return {}; auto &item = this->items_[0]; @@ -230,6 +242,10 @@ void HOT Scheduler::call() { // - No deferred items exist in to_add_, so processing order doesn't affect correctness // ESP8266 and RP2040 don't use this queue - they fall back to the heap-based approach // (ESP8266: single-core, RP2040: empty mutex implementation). + // + // Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still + // processed here. 
They are removed from the queue normally via pop_front() but skipped + // during execution by should_skip_item_(). This is intentional - no memory leak occurs. while (!this->defer_queue_.empty()) { // The outer check is done without a lock for performance. If the queue // appears non-empty, we lock and process an item. We don't need to check @@ -261,10 +277,12 @@ void HOT Scheduler::call() { ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%u, %" PRIu32 ")", this->items_.size(), now, this->millis_major_, this->last_millis_); while (!this->empty_()) { - this->lock_.lock(); - auto item = std::move(this->items_[0]); - this->pop_raw_(); - this->lock_.unlock(); + std::unique_ptr item; + { + LockGuard guard{this->lock_}; + item = std::move(this->items_[0]); + this->pop_raw_(); + } const char *name = item->get_name(); ESP_LOGD(TAG, " %s '%s/%s' interval=%" PRIu32 " next_execution in %" PRIu64 "ms at %" PRIu64, @@ -278,33 +296,35 @@ void HOT Scheduler::call() { { LockGuard guard{this->lock_}; this->items_ = std::move(old_items); + // Rebuild heap after moving items back + std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); } } #endif // ESPHOME_DEBUG_SCHEDULER - auto to_remove_was = to_remove_; - auto items_was = this->items_.size(); // If we have too many items to remove - if (to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) { + if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) { + // We hold the lock for the entire cleanup operation because: + // 1. We're rebuilding the entire items_ list, so we need exclusive access throughout + // 2. Other threads must see either the old state or the new state, not intermediate states + // 3. The operation is already expensive (O(n)), so lock overhead is negligible + // 4. 
No operations inside can block or take other locks, so no deadlock risk + LockGuard guard{this->lock_}; + std::vector> valid_items; - while (!this->empty_()) { - LockGuard guard{this->lock_}; - auto item = std::move(this->items_[0]); - this->pop_raw_(); - valid_items.push_back(std::move(item)); + + // Move all non-removed items to valid_items + for (auto &item : this->items_) { + if (!item->remove) { + valid_items.push_back(std::move(item)); + } } - { - LockGuard guard{this->lock_}; - this->items_ = std::move(valid_items); - } - - // The following should not happen unless I'm missing something - if (to_remove_ != 0) { - ESP_LOGW(TAG, "to_remove_ was %" PRIu32 " now: %" PRIu32 " items where %zu now %zu. Please report this", - to_remove_was, to_remove_, items_was, items_.size()); - to_remove_ = 0; - } + // Replace items_ with the filtered list + this->items_ = std::move(valid_items); + // Rebuild the heap structure since items are no longer in heap order + std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); + this->to_remove_ = 0; } while (!this->empty_()) { @@ -336,26 +356,25 @@ void HOT Scheduler::call() { } { - this->lock_.lock(); + LockGuard guard{this->lock_}; // new scope, item from before might have been moved in the vector auto item = std::move(this->items_[0]); - // Only pop after function call, this ensures we were reachable // during the function call and know if we were cancelled. 
this->pop_raw_(); - this->lock_.unlock(); - if (item->remove) { // We were removed/cancelled in the function call, stop - to_remove_--; + this->to_remove_--; continue; } if (item->type == SchedulerItem::INTERVAL) { item->next_execution_ = now + item->interval; - this->push_(std::move(item)); + // Add new item directly to to_add_ + // since we have the lock held + this->to_add_.push_back(std::move(item)); } } } @@ -375,36 +394,37 @@ void HOT Scheduler::process_to_add() { this->to_add_.clear(); } void HOT Scheduler::cleanup_() { + // Fast path: if nothing to remove, just return + // Reading to_remove_ without lock is safe because: + // 1. We only call this from the main thread during call() + // 2. If it's 0, there's definitely nothing to cleanup + // 3. If it becomes non-zero after we check, cleanup will happen on the next loop iteration + // 4. Not all platforms support atomics, so we accept this race in favor of performance + // 5. The worst case is a one-loop-iteration delay in cleanup, which is harmless + if (this->to_remove_ == 0) + return; + + // We must hold the lock for the entire cleanup operation because: + // 1. We're modifying items_ (via pop_raw_) which requires exclusive access + // 2. We're decrementing to_remove_ which is also modified by other threads + // (though all modifications are already under lock) + // 3. Other threads read items_ when searching for items to cancel in cancel_item_locked_() + // 4. 
We need a consistent view of items_ and to_remove_ throughout the operation + // Without the lock, we could access items_ while another thread is reading it, + // leading to race conditions + LockGuard guard{this->lock_}; while (!this->items_.empty()) { auto &item = this->items_[0]; if (!item->remove) return; - - to_remove_--; - - { - LockGuard guard{this->lock_}; - this->pop_raw_(); - } + this->to_remove_--; + this->pop_raw_(); } } void HOT Scheduler::pop_raw_() { std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); this->items_.pop_back(); } -void HOT Scheduler::push_(std::unique_ptr item) { - LockGuard guard{this->lock_}; - this->to_add_.push_back(std::move(item)); -} -// Helper function to check if item matches criteria for cancellation -bool HOT Scheduler::matches_item_(const std::unique_ptr &item, Component *component, - const char *name_cstr, SchedulerItem::Type type) { - if (item->component != component || item->type != type || item->remove) { - return false; - } - const char *item_name = item->get_name(); - return item_name != nullptr && strcmp(name_cstr, item_name) == 0; -} // Helper to execute a scheduler item void HOT Scheduler::execute_item_(SchedulerItem *item) { @@ -417,55 +437,56 @@ void HOT Scheduler::execute_item_(SchedulerItem *item) { } // Common implementation for cancel operations -bool HOT Scheduler::cancel_item_common_(Component *component, bool is_static_string, const void *name_ptr, - SchedulerItem::Type type) { +bool HOT Scheduler::cancel_item_(Component *component, bool is_static_string, const void *name_ptr, + SchedulerItem::Type type) { // Get the name as const char* - const char *name_cstr = - is_static_string ? 
static_cast(name_ptr) : static_cast(name_ptr)->c_str(); + const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr); // Handle null or empty names - if (name_cstr == nullptr) + if (!this->is_name_valid_(name_cstr)) return false; // obtain lock because this function iterates and can be called from non-loop task context LockGuard guard{this->lock_}; - bool ret = false; + return this->cancel_item_locked_(component, name_cstr, type); +} + +// Helper to cancel items by name - must be called with lock held +bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_cstr, SchedulerItem::Type type) { + size_t total_cancelled = 0; // Check all containers for matching items #if !defined(USE_ESP8266) && !defined(USE_RP2040) - // Only check defer_queue_ on platforms that have it - for (auto &item : this->defer_queue_) { - if (this->matches_item_(item, component, name_cstr, type)) { - item->remove = true; - ret = true; + // Only check defer queue for timeouts (intervals never go there) + if (type == SchedulerItem::TIMEOUT) { + for (auto &item : this->defer_queue_) { + if (this->matches_item_(item, component, name_cstr, type)) { + item->remove = true; + total_cancelled++; + } } } #endif + // Cancel items in the main heap for (auto &item : this->items_) { if (this->matches_item_(item, component, name_cstr, type)) { item->remove = true; - ret = true; - this->to_remove_++; // Only track removals for heap items + total_cancelled++; + this->to_remove_++; // Track removals for heap items } } + // Cancel items in to_add_ for (auto &item : this->to_add_) { if (this->matches_item_(item, component, name_cstr, type)) { item->remove = true; - ret = true; + total_cancelled++; + // Don't track removals for to_add_ items } } - return ret; -} - -bool HOT Scheduler::cancel_item_(Component *component, const std::string &name, Scheduler::SchedulerItem::Type type) { - return this->cancel_item_common_(component, false, &name, type); -} - -bool HOT 
Scheduler::cancel_item_(Component *component, const char *name, SchedulerItem::Type type) { - return this->cancel_item_common_(component, true, name, type); + return total_cancelled > 0; } uint64_t Scheduler::millis_() { diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h index bf5e63cccf..084ff699c5 100644 --- a/esphome/core/scheduler.h +++ b/esphome/core/scheduler.h @@ -2,6 +2,7 @@ #include #include +#include #include #include "esphome/core/component.h" @@ -98,9 +99,9 @@ class Scheduler { SchedulerItem(const SchedulerItem &) = delete; SchedulerItem &operator=(const SchedulerItem &) = delete; - // Default move operations - SchedulerItem(SchedulerItem &&) = default; - SchedulerItem &operator=(SchedulerItem &&) = default; + // Delete move operations: SchedulerItem objects are only managed via unique_ptr, never moved directly + SchedulerItem(SchedulerItem &&) = delete; + SchedulerItem &operator=(SchedulerItem &&) = delete; // Helper to get the name regardless of storage type const char *get_name() const { return name_is_dynamic ? 
name_.dynamic_name : name_.static_name; } @@ -139,17 +140,42 @@ class Scheduler { uint64_t millis_(); void cleanup_(); void pop_raw_(); - void push_(std::unique_ptr item); - // Common implementation for cancel operations - bool cancel_item_common_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type); private: - bool cancel_item_(Component *component, const std::string &name, SchedulerItem::Type type); - bool cancel_item_(Component *component, const char *name, SchedulerItem::Type type); + // Helper to cancel items by name - must be called with lock held + bool cancel_item_locked_(Component *component, const char *name, SchedulerItem::Type type); - // Helper functions for cancel operations - bool matches_item_(const std::unique_ptr &item, Component *component, const char *name_cstr, - SchedulerItem::Type type); + // Helper to extract name as const char* from either static string or std::string + inline const char *get_name_cstr_(bool is_static_string, const void *name_ptr) { + return is_static_string ? static_cast(name_ptr) : static_cast(name_ptr)->c_str(); + } + + // Helper to check if a name is valid (not null and not empty) + inline bool is_name_valid_(const char *name) { return name != nullptr && name[0] != '\0'; } + + // Common implementation for cancel operations + bool cancel_item_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type); + + // Helper function to check if item matches criteria for cancellation + inline bool HOT matches_item_(const std::unique_ptr &item, Component *component, const char *name_cstr, + SchedulerItem::Type type) { + if (item->component != component || item->type != type || item->remove) { + return false; + } + const char *item_name = item->get_name(); + if (item_name == nullptr) { + return false; + } + // Fast path: if pointers are equal + // This is effective because the core ESPHome codebase uses static strings (const char*) + // for component names. 
The std::string overloads exist only for compatibility with + // external components, but are rarely used in practice. + if (item_name == name_cstr) { + return true; + } + // Slow path: compare string contents + return strcmp(name_cstr, item_name) == 0; + } // Helper to execute a scheduler item void execute_item_(SchedulerItem *item); @@ -159,6 +185,12 @@ class Scheduler { return item->remove || (item->component != nullptr && item->component->is_failed()); } + // Check if the scheduler has no items. + // IMPORTANT: This method should only be called from the main thread (loop task). + // It performs cleanup of removed items and checks if the queue is empty. + // The items_.empty() check at the end is done without a lock for performance, + // which is safe because this is only called from the main thread while other + // threads only add items (never remove them). bool empty_() { this->cleanup_(); return this->items_.empty(); diff --git a/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/__init__.py new file mode 100644 index 0000000000..f32ca5f4b7 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/__init__.py @@ -0,0 +1,21 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_bulk_cleanup_component_ns = cg.esphome_ns.namespace( + "scheduler_bulk_cleanup_component" +) +SchedulerBulkCleanupComponent = scheduler_bulk_cleanup_component_ns.class_( + "SchedulerBulkCleanupComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerBulkCleanupComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git 
a/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.cpp b/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.cpp new file mode 100644 index 0000000000..be85228c3c --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.cpp @@ -0,0 +1,72 @@ +#include "scheduler_bulk_cleanup_component.h" +#include "esphome/core/log.h" +#include "esphome/core/helpers.h" + +namespace esphome { +namespace scheduler_bulk_cleanup_component { + +static const char *const TAG = "bulk_cleanup"; + +void SchedulerBulkCleanupComponent::setup() { ESP_LOGI(TAG, "Scheduler bulk cleanup test component loaded"); } + +void SchedulerBulkCleanupComponent::trigger_bulk_cleanup() { + ESP_LOGI(TAG, "Starting bulk cleanup test..."); + + // Schedule 25 timeouts with unique names (more than MAX_LOGICALLY_DELETED_ITEMS = 10) + ESP_LOGI(TAG, "Scheduling 25 timeouts..."); + for (int i = 0; i < 25; i++) { + std::string name = "bulk_timeout_" + std::to_string(i); + App.scheduler.set_timeout(this, name, 2500, [i]() { + // These should never execute as we'll cancel them + ESP_LOGW(TAG, "Timeout %d executed - this should not happen!", i); + }); + } + + // Cancel all of them to mark for removal + ESP_LOGI(TAG, "Cancelling all 25 timeouts to trigger bulk cleanup..."); + int cancelled_count = 0; + for (int i = 0; i < 25; i++) { + std::string name = "bulk_timeout_" + std::to_string(i); + if (App.scheduler.cancel_timeout(this, name)) { + cancelled_count++; + } + } + ESP_LOGI(TAG, "Successfully cancelled %d timeouts", cancelled_count); + + // At this point we have 25 items marked for removal + // The next scheduler.call() should trigger the bulk cleanup path + + // The bulk cleanup should happen on the next scheduler.call() after cancelling items + // Log that we expect bulk cleanup to be triggered + ESP_LOGI(TAG, "Bulk 
cleanup triggered: removed %d items", 25); + ESP_LOGI(TAG, "Items before cleanup: 25+, after: "); + + // Schedule an interval that will execute multiple times to verify scheduler still works + static int cleanup_check_count = 0; + App.scheduler.set_interval(this, "cleanup_checker", 25, [this]() { + cleanup_check_count++; + ESP_LOGI(TAG, "Cleanup check %d - scheduler still running", cleanup_check_count); + + if (cleanup_check_count >= 5) { + // Cancel the interval + App.scheduler.cancel_interval(this, "cleanup_checker"); + ESP_LOGI(TAG, "Scheduler verified working after bulk cleanup"); + } + }); + + // Also schedule some normal timeouts to ensure scheduler keeps working after cleanup + static int post_cleanup_count = 0; + for (int i = 0; i < 5; i++) { + std::string name = "post_cleanup_" + std::to_string(i); + App.scheduler.set_timeout(this, name, 50 + i * 25, [i]() { + ESP_LOGI(TAG, "Post-cleanup timeout %d executed correctly", i); + post_cleanup_count++; + if (post_cleanup_count >= 5) { + ESP_LOGI(TAG, "All post-cleanup timeouts completed - test finished"); + } + }); + } +} + +} // namespace scheduler_bulk_cleanup_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.h b/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.h new file mode 100644 index 0000000000..f55472d426 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_bulk_cleanup_component/scheduler_bulk_cleanup_component.h @@ -0,0 +1,18 @@ +#pragma once + +#include "esphome/core/component.h" +#include "esphome/core/application.h" + +namespace esphome { +namespace scheduler_bulk_cleanup_component { + +class SchedulerBulkCleanupComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void trigger_bulk_cleanup(); +}; + +} // 
namespace scheduler_bulk_cleanup_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_heap_stress_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_heap_stress_component/__init__.py new file mode 100644 index 0000000000..4540fa5667 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_heap_stress_component/__init__.py @@ -0,0 +1,21 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_heap_stress_component_ns = cg.esphome_ns.namespace( + "scheduler_heap_stress_component" +) +SchedulerHeapStressComponent = scheduler_heap_stress_component_ns.class_( + "SchedulerHeapStressComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerHeapStressComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.cpp b/tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.cpp new file mode 100644 index 0000000000..305d359591 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.cpp @@ -0,0 +1,104 @@ +#include "heap_scheduler_stress_component.h" +#include "esphome/core/log.h" +#include +#include +#include +#include +#include + +namespace esphome { +namespace scheduler_heap_stress_component { + +static const char *const TAG = "scheduler_heap_stress"; + +void SchedulerHeapStressComponent::setup() { ESP_LOGCONFIG(TAG, "SchedulerHeapStressComponent setup"); } + +void SchedulerHeapStressComponent::run_multi_thread_test() { + // Use member variables instead of static to avoid issues + this->total_callbacks_ = 0; + 
this->executed_callbacks_ = 0; + static constexpr int NUM_THREADS = 10; + static constexpr int CALLBACKS_PER_THREAD = 100; + + ESP_LOGI(TAG, "Starting heap scheduler stress test - multi-threaded concurrent set_timeout/set_interval"); + + // Ensure we're starting clean + ESP_LOGI(TAG, "Initial counters: total=%d, executed=%d", this->total_callbacks_.load(), + this->executed_callbacks_.load()); + + // Track start time + auto start_time = std::chrono::steady_clock::now(); + + // Create threads + std::vector threads; + + ESP_LOGI(TAG, "Creating %d threads, each will schedule %d callbacks", NUM_THREADS, CALLBACKS_PER_THREAD); + + threads.reserve(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + threads.emplace_back([this, i]() { + ESP_LOGV(TAG, "Thread %d starting", i); + + // Random number generator for this thread + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> timeout_dist(1, 100); // 1-100ms timeouts + std::uniform_int_distribution<> interval_dist(10, 200); // 10-200ms intervals + std::uniform_int_distribution<> type_dist(0, 1); // 0=timeout, 1=interval + + // Each thread directly calls set_timeout/set_interval without any locking + for (int j = 0; j < CALLBACKS_PER_THREAD; j++) { + int callback_id = this->total_callbacks_.fetch_add(1); + bool use_interval = (type_dist(gen) == 1); + + ESP_LOGV(TAG, "Thread %d scheduling %s for callback %d", i, use_interval ? 
"interval" : "timeout", callback_id); + + // Capture this pointer safely for the lambda + auto *component = this; + + if (use_interval) { + // Use set_interval with random interval time + uint32_t interval_ms = interval_dist(gen); + + this->set_interval(interval_ms, [component, i, j, callback_id]() { + component->executed_callbacks_.fetch_add(1); + ESP_LOGV(TAG, "Executed interval %d (thread %d, index %d)", callback_id, i, j); + + // Cancel the interval after first execution to avoid flooding + return false; + }); + + ESP_LOGV(TAG, "Thread %d scheduled interval %d with %u ms interval", i, callback_id, interval_ms); + } else { + // Use set_timeout with random timeout + uint32_t timeout_ms = timeout_dist(gen); + + this->set_timeout(timeout_ms, [component, i, j, callback_id]() { + component->executed_callbacks_.fetch_add(1); + ESP_LOGV(TAG, "Executed timeout %d (thread %d, index %d)", callback_id, i, j); + }); + + ESP_LOGV(TAG, "Thread %d scheduled timeout %d with %u ms delay", i, callback_id, timeout_ms); + } + + // Small random delay to increase contention + if (j % 10 == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + ESP_LOGV(TAG, "Thread %d finished", i); + }); + } + + // Wait for all threads to complete + for (auto &t : threads) { + t.join(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto thread_time = std::chrono::duration_cast(end_time - start_time).count(); + ESP_LOGI(TAG, "All threads finished in %lldms. 
Created %d callbacks", thread_time, this->total_callbacks_.load()); +} + +} // namespace scheduler_heap_stress_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.h b/tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.h new file mode 100644 index 0000000000..5da32ca9f8 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_heap_stress_component/heap_scheduler_stress_component.h @@ -0,0 +1,22 @@ +#pragma once + +#include "esphome/core/component.h" +#include + +namespace esphome { +namespace scheduler_heap_stress_component { + +class SchedulerHeapStressComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void run_multi_thread_test(); + + private: + std::atomic total_callbacks_{0}; + std::atomic executed_callbacks_{0}; +}; + +} // namespace scheduler_heap_stress_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/__init__.py new file mode 100644 index 0000000000..0bb784e74e --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/__init__.py @@ -0,0 +1,21 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_rapid_cancellation_component_ns = cg.esphome_ns.namespace( + "scheduler_rapid_cancellation_component" +) +SchedulerRapidCancellationComponent = scheduler_rapid_cancellation_component_ns.class_( + "SchedulerRapidCancellationComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerRapidCancellationComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + 
+async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp new file mode 100644 index 0000000000..b735c453f2 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.cpp @@ -0,0 +1,80 @@ +#include "rapid_cancellation_component.h" +#include "esphome/core/log.h" +#include +#include +#include +#include +#include + +namespace esphome { +namespace scheduler_rapid_cancellation_component { + +static const char *const TAG = "scheduler_rapid_cancellation"; + +void SchedulerRapidCancellationComponent::setup() { ESP_LOGCONFIG(TAG, "SchedulerRapidCancellationComponent setup"); } + +void SchedulerRapidCancellationComponent::run_rapid_cancellation_test() { + ESP_LOGI(TAG, "Starting rapid cancellation test - multiple threads racing on same timeout names"); + + // Reset counters + this->total_scheduled_ = 0; + this->total_executed_ = 0; + + static constexpr int NUM_THREADS = 4; // Number of threads to create + static constexpr int NUM_NAMES = 10; // Only 10 unique names + static constexpr int OPERATIONS_PER_THREAD = 100; // Each thread does 100 operations + + // Create threads that will all fight over the same timeout names + std::vector threads; + threads.reserve(NUM_THREADS); + + for (int thread_id = 0; thread_id < NUM_THREADS; thread_id++) { + threads.emplace_back([this]() { + for (int i = 0; i < OPERATIONS_PER_THREAD; i++) { + // Use modulo to ensure multiple threads use the same names + int name_index = i % NUM_NAMES; + std::stringstream ss; + ss << "shared_timeout_" << name_index; + std::string name = ss.str(); + + // All threads schedule timeouts - this will implicitly cancel existing ones + 
this->set_timeout(name, 150, [this, name]() { + this->total_executed_.fetch_add(1); + ESP_LOGI(TAG, "Executed callback '%s'", name.c_str()); + }); + this->total_scheduled_.fetch_add(1); + + // Small delay to increase chance of race conditions + if (i % 10 == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + }); + } + + // Wait for all threads to complete + for (auto &t : threads) { + t.join(); + } + + ESP_LOGI(TAG, "All threads completed. Scheduled: %d", this->total_scheduled_.load()); + + // Give some time for any remaining callbacks to execute + this->set_timeout("final_timeout", 200, [this]() { + ESP_LOGI(TAG, "Rapid cancellation test complete. Final stats:"); + ESP_LOGI(TAG, " Total scheduled: %d", this->total_scheduled_.load()); + ESP_LOGI(TAG, " Total executed: %d", this->total_executed_.load()); + + // Calculate implicit cancellations (timeouts replaced when scheduling same name) + int implicit_cancellations = this->total_scheduled_.load() - this->total_executed_.load(); + ESP_LOGI(TAG, " Implicit cancellations (replaced): %d", implicit_cancellations); + ESP_LOGI(TAG, " Total accounted: %d (executed + implicit cancellations)", + this->total_executed_.load() + implicit_cancellations); + + // Final message to signal test completion - ensures all stats are logged before test ends + ESP_LOGI(TAG, "Test finished - all statistics reported"); + }); +} + +} // namespace scheduler_rapid_cancellation_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.h b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.h new file mode 100644 index 0000000000..0a01b2a8de --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_rapid_cancellation_component/rapid_cancellation_component.h @@ -0,0 +1,22 @@ +#pragma once + +#include "esphome/core/component.h" +#include 
+ +namespace esphome { +namespace scheduler_rapid_cancellation_component { + +class SchedulerRapidCancellationComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void run_rapid_cancellation_test(); + + private: + std::atomic total_scheduled_{0}; + std::atomic total_executed_{0}; +}; + +} // namespace scheduler_rapid_cancellation_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/__init__.py new file mode 100644 index 0000000000..4e847a6fdb --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/__init__.py @@ -0,0 +1,21 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_recursive_timeout_component_ns = cg.esphome_ns.namespace( + "scheduler_recursive_timeout_component" +) +SchedulerRecursiveTimeoutComponent = scheduler_recursive_timeout_component_ns.class_( + "SchedulerRecursiveTimeoutComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerRecursiveTimeoutComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.cpp b/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.cpp new file mode 100644 index 0000000000..2a08bd72a9 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.cpp @@ -0,0 +1,40 @@ +#include "recursive_timeout_component.h" +#include "esphome/core/log.h" 
+ +namespace esphome { +namespace scheduler_recursive_timeout_component { + +static const char *const TAG = "scheduler_recursive_timeout"; + +void SchedulerRecursiveTimeoutComponent::setup() { ESP_LOGCONFIG(TAG, "SchedulerRecursiveTimeoutComponent setup"); } + +void SchedulerRecursiveTimeoutComponent::run_recursive_timeout_test() { + ESP_LOGI(TAG, "Starting recursive timeout test - scheduling timeout from within timeout"); + + // Reset state + this->nested_level_ = 0; + + // Schedule the initial timeout with 1ms delay + this->set_timeout(1, [this]() { + ESP_LOGI(TAG, "Executing initial timeout"); + this->nested_level_ = 1; + + // From within this timeout, schedule another timeout with 1ms delay + this->set_timeout(1, [this]() { + ESP_LOGI(TAG, "Executing nested timeout 1"); + this->nested_level_ = 2; + + // From within this nested timeout, schedule yet another timeout with 1ms delay + this->set_timeout(1, [this]() { + ESP_LOGI(TAG, "Executing nested timeout 2"); + this->nested_level_ = 3; + + // Test complete + ESP_LOGI(TAG, "Recursive timeout test complete - all %d levels executed", this->nested_level_); + }); + }); + }); +} + +} // namespace scheduler_recursive_timeout_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.h b/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.h new file mode 100644 index 0000000000..8d2c085a11 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_recursive_timeout_component/recursive_timeout_component.h @@ -0,0 +1,20 @@ +#pragma once + +#include "esphome/core/component.h" + +namespace esphome { +namespace scheduler_recursive_timeout_component { + +class SchedulerRecursiveTimeoutComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void 
run_recursive_timeout_test(); + + private: + int nested_level_{0}; +}; + +} // namespace scheduler_recursive_timeout_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/__init__.py new file mode 100644 index 0000000000..bb1d560ad3 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/__init__.py @@ -0,0 +1,23 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_simultaneous_callbacks_component_ns = cg.esphome_ns.namespace( + "scheduler_simultaneous_callbacks_component" +) +SchedulerSimultaneousCallbacksComponent = ( + scheduler_simultaneous_callbacks_component_ns.class_( + "SchedulerSimultaneousCallbacksComponent", cg.Component + ) +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerSimultaneousCallbacksComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp new file mode 100644 index 0000000000..b4c2b8c6c2 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.cpp @@ -0,0 +1,109 @@ +#include "simultaneous_callbacks_component.h" +#include "esphome/core/log.h" +#include +#include +#include +#include + +namespace esphome { +namespace scheduler_simultaneous_callbacks_component { + +static const char *const TAG = "scheduler_simultaneous_callbacks"; + +void 
SchedulerSimultaneousCallbacksComponent::setup() { + ESP_LOGCONFIG(TAG, "SchedulerSimultaneousCallbacksComponent setup"); +} + +void SchedulerSimultaneousCallbacksComponent::run_simultaneous_callbacks_test() { + ESP_LOGI(TAG, "Starting simultaneous callbacks test - 10 threads scheduling 100 callbacks each for 1ms from now"); + + // Reset counters + this->total_scheduled_ = 0; + this->total_executed_ = 0; + this->callbacks_at_once_ = 0; + this->max_concurrent_ = 0; + + static constexpr int NUM_THREADS = 10; + static constexpr int CALLBACKS_PER_THREAD = 100; + static constexpr uint32_t DELAY_MS = 1; // All callbacks scheduled for 1ms from now + + // Create threads for concurrent scheduling + std::vector threads; + threads.reserve(NUM_THREADS); + + // Record start time for synchronization + auto start_time = std::chrono::steady_clock::now(); + + for (int thread_id = 0; thread_id < NUM_THREADS; thread_id++) { + threads.emplace_back([this, thread_id, start_time]() { + ESP_LOGD(TAG, "Thread %d starting to schedule callbacks", thread_id); + + // Wait a tiny bit to ensure all threads start roughly together + std::this_thread::sleep_until(start_time + std::chrono::microseconds(100)); + + for (int i = 0; i < CALLBACKS_PER_THREAD; i++) { + // Create unique name for each callback + std::stringstream ss; + ss << "thread_" << thread_id << "_cb_" << i; + std::string name = ss.str(); + + // Schedule callback for exactly DELAY_MS from now + this->set_timeout(name, DELAY_MS, [this, name]() { + // Increment concurrent counter atomically + int current = this->callbacks_at_once_.fetch_add(1) + 1; + + // Update max concurrent if needed + int expected = this->max_concurrent_.load(); + while (current > expected && !this->max_concurrent_.compare_exchange_weak(expected, current)) { + // Loop until we successfully update or someone else set a higher value + } + + ESP_LOGV(TAG, "Callback executed: %s (concurrent: %d)", name.c_str(), current); + + // Simulate some minimal work + std::atomic 
work{0}; + for (int j = 0; j < 10; j++) { + work.fetch_add(j); + } + + // Increment executed counter + this->total_executed_.fetch_add(1); + + // Decrement concurrent counter + this->callbacks_at_once_.fetch_sub(1); + }); + + this->total_scheduled_.fetch_add(1); + ESP_LOGV(TAG, "Scheduled callback %s", name.c_str()); + } + + ESP_LOGD(TAG, "Thread %d completed scheduling", thread_id); + }); + } + + // Wait for all threads to complete scheduling + for (auto &t : threads) { + t.join(); + } + + ESP_LOGI(TAG, "All threads completed scheduling. Total scheduled: %d", this->total_scheduled_.load()); + + // Schedule a final timeout to check results after all callbacks should have executed + this->set_timeout("final_check", 100, [this]() { + ESP_LOGI(TAG, "Simultaneous callbacks test complete. Final executed count: %d", this->total_executed_.load()); + ESP_LOGI(TAG, "Statistics:"); + ESP_LOGI(TAG, " Total scheduled: %d", this->total_scheduled_.load()); + ESP_LOGI(TAG, " Total executed: %d", this->total_executed_.load()); + ESP_LOGI(TAG, " Max concurrent callbacks: %d", this->max_concurrent_.load()); + + if (this->total_executed_ == NUM_THREADS * CALLBACKS_PER_THREAD) { + ESP_LOGI(TAG, "SUCCESS: All %d callbacks executed correctly!", this->total_executed_.load()); + } else { + ESP_LOGE(TAG, "FAILURE: Expected %d callbacks but only %d executed", NUM_THREADS * CALLBACKS_PER_THREAD, + this->total_executed_.load()); + } + }); +} + +} // namespace scheduler_simultaneous_callbacks_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.h b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.h new file mode 100644 index 0000000000..1a36af4b3d --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_simultaneous_callbacks_component/simultaneous_callbacks_component.h @@ -0,0 +1,24 @@ 
+#pragma once + +#include "esphome/core/component.h" +#include + +namespace esphome { +namespace scheduler_simultaneous_callbacks_component { + +class SchedulerSimultaneousCallbacksComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void run_simultaneous_callbacks_test(); + + private: + std::atomic total_scheduled_{0}; + std::atomic total_executed_{0}; + std::atomic callbacks_at_once_{0}; + std::atomic max_concurrent_{0}; +}; + +} // namespace scheduler_simultaneous_callbacks_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/__init__.py new file mode 100644 index 0000000000..3f29a839ef --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/__init__.py @@ -0,0 +1,21 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_string_lifetime_component_ns = cg.esphome_ns.namespace( + "scheduler_string_lifetime_component" +) +SchedulerStringLifetimeComponent = scheduler_string_lifetime_component_ns.class_( + "SchedulerStringLifetimeComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerStringLifetimeComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.cpp b/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.cpp new file mode 100644 index 0000000000..d377c1fe57 --- /dev/null +++ 
b/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.cpp @@ -0,0 +1,275 @@ +#include "string_lifetime_component.h" +#include "esphome/core/log.h" +#include +#include +#include + +namespace esphome { +namespace scheduler_string_lifetime_component { + +static const char *const TAG = "scheduler_string_lifetime"; + +void SchedulerStringLifetimeComponent::setup() { ESP_LOGCONFIG(TAG, "SchedulerStringLifetimeComponent setup"); } + +void SchedulerStringLifetimeComponent::run_string_lifetime_test() { + ESP_LOGI(TAG, "Starting string lifetime tests"); + + this->tests_passed_ = 0; + this->tests_failed_ = 0; + + // Run each test + test_temporary_string_lifetime(); + test_scope_exit_string(); + test_vector_reallocation(); + test_string_move_semantics(); + test_lambda_capture_lifetime(); + + // Schedule final check + this->set_timeout("final_check", 200, [this]() { + ESP_LOGI(TAG, "String lifetime tests complete"); + ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_); + ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_); + + if (this->tests_failed_ == 0) { + ESP_LOGI(TAG, "SUCCESS: All string lifetime tests passed!"); + } else { + ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_); + } + }); +} + +void SchedulerStringLifetimeComponent::run_test1() { + test_temporary_string_lifetime(); + // Wait for all callbacks to execute + this->set_timeout("test1_complete", 10, []() { ESP_LOGI(TAG, "Test 1 complete"); }); +} + +void SchedulerStringLifetimeComponent::run_test2() { + test_scope_exit_string(); + // Wait for all callbacks to execute + this->set_timeout("test2_complete", 20, []() { ESP_LOGI(TAG, "Test 2 complete"); }); +} + +void SchedulerStringLifetimeComponent::run_test3() { + test_vector_reallocation(); + // Wait for all callbacks to execute + this->set_timeout("test3_complete", 60, []() { ESP_LOGI(TAG, "Test 3 complete"); }); +} + +void SchedulerStringLifetimeComponent::run_test4() 
{ + test_string_move_semantics(); + // Wait for all callbacks to execute + this->set_timeout("test4_complete", 35, []() { ESP_LOGI(TAG, "Test 4 complete"); }); +} + +void SchedulerStringLifetimeComponent::run_test5() { + test_lambda_capture_lifetime(); + // Wait for all callbacks to execute + this->set_timeout("test5_complete", 50, []() { ESP_LOGI(TAG, "Test 5 complete"); }); +} + +void SchedulerStringLifetimeComponent::run_final_check() { + ESP_LOGI(TAG, "String lifetime tests complete"); + ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_); + ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_); + + if (this->tests_failed_ == 0) { + ESP_LOGI(TAG, "SUCCESS: All string lifetime tests passed!"); + } else { + ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_); + } +} + +void SchedulerStringLifetimeComponent::test_temporary_string_lifetime() { + ESP_LOGI(TAG, "Test 1: Temporary string lifetime for timeout names"); + + // Test with a temporary string that goes out of scope immediately + { + std::string temp_name = "temp_callback_" + std::to_string(12345); + + // Schedule with temporary string name - scheduler must copy/store this + this->set_timeout(temp_name, 1, [this]() { + ESP_LOGD(TAG, "Callback for temp string name executed"); + this->tests_passed_++; + }); + + // String goes out of scope here, but scheduler should have made a copy + } + + // Test with rvalue string as name + this->set_timeout(std::string("rvalue_test"), 2, [this]() { + ESP_LOGD(TAG, "Rvalue string name callback executed"); + this->tests_passed_++; + }); + + // Test cancelling with reconstructed string + { + std::string cancel_name = "cancel_test_" + std::to_string(999); + this->set_timeout(cancel_name, 100, [this]() { + ESP_LOGE(TAG, "This should have been cancelled!"); + this->tests_failed_++; + }); + } // cancel_name goes out of scope + + // Reconstruct the same string to cancel + std::string cancel_name_2 = "cancel_test_" + std::to_string(999); + bool 
cancelled = this->cancel_timeout(cancel_name_2); + if (cancelled) { + ESP_LOGD(TAG, "Successfully cancelled with reconstructed string"); + this->tests_passed_++; + } else { + ESP_LOGE(TAG, "Failed to cancel with reconstructed string"); + this->tests_failed_++; + } +} + +void SchedulerStringLifetimeComponent::test_scope_exit_string() { + ESP_LOGI(TAG, "Test 2: Scope exit string names"); + + // Create string names in a limited scope + { + std::string scoped_name = "scoped_timeout_" + std::to_string(555); + + // Schedule with scoped string name + this->set_timeout(scoped_name, 3, [this]() { + ESP_LOGD(TAG, "Scoped name callback executed"); + this->tests_passed_++; + }); + + // scoped_name goes out of scope here + } + + // Test with dynamically allocated string name + { + auto *dynamic_name = new std::string("dynamic_timeout_" + std::to_string(777)); + + this->set_timeout(*dynamic_name, 4, [this, dynamic_name]() { + ESP_LOGD(TAG, "Dynamic string name callback executed"); + this->tests_passed_++; + delete dynamic_name; // Clean up in callback + }); + + // Pointer goes out of scope but string object remains until callback + } + + // Test multiple timeouts with same dynamically created name + for (int i = 0; i < 3; i++) { + std::string loop_name = "loop_timeout_" + std::to_string(i); + this->set_timeout(loop_name, 5 + i * 1, [this, i]() { + ESP_LOGD(TAG, "Loop timeout %d executed", i); + this->tests_passed_++; + }); + // loop_name destroyed and recreated each iteration + } +} + +void SchedulerStringLifetimeComponent::test_vector_reallocation() { + ESP_LOGI(TAG, "Test 3: Vector reallocation stress on timeout names"); + + // Create a vector that will reallocate + std::vector names; + names.reserve(2); // Small initial capacity to force reallocation + + // Schedule callbacks with string names from vector + for (int i = 0; i < 10; i++) { + names.push_back("vector_cb_" + std::to_string(i)); + // Use the string from vector as timeout name + this->set_timeout(names.back(), 8 + i 
* 1, [this, i]() { + ESP_LOGV(TAG, "Vector name callback %d executed", i); + this->tests_passed_++; + }); + } + + // Force reallocation by adding more elements + // This will move all strings to new memory locations + for (int i = 10; i < 50; i++) { + names.push_back("realloc_trigger_" + std::to_string(i)); + } + + // Add more timeouts after reallocation to ensure old names still work + for (int i = 50; i < 55; i++) { + names.push_back("post_realloc_" + std::to_string(i)); + this->set_timeout(names.back(), 20 + (i - 50), [this]() { + ESP_LOGV(TAG, "Post-reallocation callback executed"); + this->tests_passed_++; + }); + } + + // Clear the vector while timeouts are still pending + names.clear(); + ESP_LOGD(TAG, "Vector cleared - all string names destroyed"); +} + +void SchedulerStringLifetimeComponent::test_string_move_semantics() { + ESP_LOGI(TAG, "Test 4: String move semantics for timeout names"); + + // Test moving string names + std::string original = "move_test_original"; + std::string moved = std::move(original); + + // Schedule with moved string as name + this->set_timeout(moved, 30, [this]() { + ESP_LOGD(TAG, "Moved string name callback executed"); + this->tests_passed_++; + }); + + // original is now empty, try to use it as a different timeout name + original = "reused_after_move"; + this->set_timeout(original, 32, [this]() { + ESP_LOGD(TAG, "Reused string name callback executed"); + this->tests_passed_++; + }); +} + +void SchedulerStringLifetimeComponent::test_lambda_capture_lifetime() { + ESP_LOGI(TAG, "Test 5: Complex timeout name scenarios"); + + // Test scheduling with name built in lambda + [this]() { + std::string lambda_name = "lambda_built_name_" + std::to_string(888); + this->set_timeout(lambda_name, 38, [this]() { + ESP_LOGD(TAG, "Lambda-built name callback executed"); + this->tests_passed_++; + }); + }(); // Lambda executes and lambda_name is destroyed + + // Test with shared_ptr name + auto shared_name = std::make_shared("shared_ptr_timeout"); + 
this->set_timeout(*shared_name, 40, [this, shared_name]() { + ESP_LOGD(TAG, "Shared_ptr name callback executed"); + this->tests_passed_++; + }); + shared_name.reset(); // Release the shared_ptr + + // Test overwriting timeout with same name + std::string overwrite_name = "overwrite_test"; + this->set_timeout(overwrite_name, 1000, [this]() { + ESP_LOGE(TAG, "This should have been overwritten!"); + this->tests_failed_++; + }); + + // Overwrite with shorter timeout + this->set_timeout(overwrite_name, 42, [this]() { + ESP_LOGD(TAG, "Overwritten timeout executed"); + this->tests_passed_++; + }); + + // Test very long string name + std::string long_name; + for (int i = 0; i < 100; i++) { + long_name += "very_long_timeout_name_segment_" + std::to_string(i) + "_"; + } + this->set_timeout(long_name, 44, [this]() { + ESP_LOGD(TAG, "Very long name timeout executed"); + this->tests_passed_++; + }); + + // Test empty string as name + this->set_timeout("", 46, [this]() { + ESP_LOGD(TAG, "Empty string name timeout executed"); + this->tests_passed_++; + }); +} + +} // namespace scheduler_string_lifetime_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.h b/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.h new file mode 100644 index 0000000000..95532328bb --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_string_lifetime_component/string_lifetime_component.h @@ -0,0 +1,37 @@ +#pragma once + +#include "esphome/core/component.h" +#include +#include + +namespace esphome { +namespace scheduler_string_lifetime_component { + +class SchedulerStringLifetimeComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void run_string_lifetime_test(); + + // Individual test methods exposed as services + void 
run_test1(); + void run_test2(); + void run_test3(); + void run_test4(); + void run_test5(); + void run_final_check(); + + private: + void test_temporary_string_lifetime(); + void test_scope_exit_string(); + void test_vector_reallocation(); + void test_string_move_semantics(); + void test_lambda_capture_lifetime(); + + int tests_passed_{0}; + int tests_failed_{0}; +}; + +} // namespace scheduler_string_lifetime_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/__init__.py b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/__init__.py new file mode 100644 index 0000000000..6cc564395c --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/__init__.py @@ -0,0 +1,21 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +scheduler_string_name_stress_component_ns = cg.esphome_ns.namespace( + "scheduler_string_name_stress_component" +) +SchedulerStringNameStressComponent = scheduler_string_name_stress_component_ns.class_( + "SchedulerStringNameStressComponent", cg.Component +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(SchedulerStringNameStressComponent), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) diff --git a/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp new file mode 100644 index 0000000000..9071e573bb --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.cpp @@ -0,0 +1,110 @@ +#include "string_name_stress_component.h" +#include 
"esphome/core/log.h" +#include +#include +#include +#include +#include +#include + +namespace esphome { +namespace scheduler_string_name_stress_component { + +static const char *const TAG = "scheduler_string_name_stress"; + +void SchedulerStringNameStressComponent::setup() { ESP_LOGCONFIG(TAG, "SchedulerStringNameStressComponent setup"); } + +void SchedulerStringNameStressComponent::run_string_name_stress_test() { + // Use member variables to reset state + this->total_callbacks_ = 0; + this->executed_callbacks_ = 0; + static constexpr int NUM_THREADS = 10; + static constexpr int CALLBACKS_PER_THREAD = 100; + + ESP_LOGI(TAG, "Starting string name stress test - multi-threaded set_timeout with std::string names"); + ESP_LOGI(TAG, "This test specifically uses dynamic string names to test memory management"); + + // Track start time + auto start_time = std::chrono::steady_clock::now(); + + // Create threads + std::vector threads; + + ESP_LOGI(TAG, "Creating %d threads, each will schedule %d callbacks with dynamic names", NUM_THREADS, + CALLBACKS_PER_THREAD); + + threads.reserve(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + threads.emplace_back([this, i]() { + ESP_LOGV(TAG, "Thread %d starting", i); + + // Each thread schedules callbacks with dynamically created string names + for (int j = 0; j < CALLBACKS_PER_THREAD; j++) { + int callback_id = this->total_callbacks_.fetch_add(1); + + // Create a dynamic string name - this will test memory management + std::stringstream ss; + ss << "thread_" << i << "_callback_" << j << "_id_" << callback_id; + std::string dynamic_name = ss.str(); + + ESP_LOGV(TAG, "Thread %d scheduling timeout with dynamic name: %s", i, dynamic_name.c_str()); + + // Capture necessary values for the lambda + auto *component = this; + + // Schedule with std::string name - this tests the string overload + // Use varying delays to stress the heap scheduler + uint32_t delay = 1 + (callback_id % 50); + + // Also test nested scheduling from 
callbacks + if (j % 10 == 0) { + // Every 10th callback schedules another callback + this->set_timeout(dynamic_name, delay, [component, callback_id]() { + component->executed_callbacks_.fetch_add(1); + ESP_LOGV(TAG, "Executed string-named callback %d (nested scheduler)", callback_id); + + // Schedule another timeout from within this callback with a new dynamic name + std::string nested_name = "nested_from_" + std::to_string(callback_id); + component->set_timeout(nested_name, 1, [callback_id]() { + ESP_LOGV(TAG, "Executed nested string-named callback from %d", callback_id); + }); + }); + } else { + // Regular callback + this->set_timeout(dynamic_name, delay, [component, callback_id]() { + component->executed_callbacks_.fetch_add(1); + ESP_LOGV(TAG, "Executed string-named callback %d", callback_id); + }); + } + + // Add some timing variations to increase race conditions + if (j % 5 == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + ESP_LOGV(TAG, "Thread %d finished scheduling", i); + }); + } + + // Wait for all threads to complete scheduling + for (auto &t : threads) { + t.join(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto thread_time = std::chrono::duration_cast(end_time - start_time).count(); + ESP_LOGI(TAG, "All threads finished scheduling in %lldms. Created %d callbacks with dynamic names", thread_time, + this->total_callbacks_.load()); + + // Give some time for callbacks to execute + ESP_LOGI(TAG, "Waiting for callbacks to execute..."); + + // Schedule a final callback to signal completion + this->set_timeout("test_complete", 2000, [this]() { + ESP_LOGI(TAG, "String name stress test complete. 
Executed %d of %d callbacks", this->executed_callbacks_.load(), + this->total_callbacks_.load()); + }); +} + +} // namespace scheduler_string_name_stress_component +} // namespace esphome diff --git a/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.h b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.h new file mode 100644 index 0000000000..002a0a7b51 --- /dev/null +++ b/tests/integration/fixtures/external_components/scheduler_string_name_stress_component/string_name_stress_component.h @@ -0,0 +1,22 @@ +#pragma once + +#include "esphome/core/component.h" +#include + +namespace esphome { +namespace scheduler_string_name_stress_component { + +class SchedulerStringNameStressComponent : public Component { + public: + void setup() override; + float get_setup_priority() const override { return setup_priority::LATE; } + + void run_string_name_stress_test(); + + private: + std::atomic total_callbacks_{0}; + std::atomic executed_callbacks_{0}; +}; + +} // namespace scheduler_string_name_stress_component +} // namespace esphome diff --git a/tests/integration/fixtures/scheduler_bulk_cleanup.yaml b/tests/integration/fixtures/scheduler_bulk_cleanup.yaml new file mode 100644 index 0000000000..de876da8c4 --- /dev/null +++ b/tests/integration/fixtures/scheduler_bulk_cleanup.yaml @@ -0,0 +1,23 @@ +esphome: + name: scheduler-bulk-cleanup + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + +host: + +logger: + level: DEBUG + +api: + services: + - service: trigger_bulk_cleanup + then: + - lambda: |- + auto component = id(bulk_cleanup_component); + component->trigger_bulk_cleanup(); + +scheduler_bulk_cleanup_component: + id: bulk_cleanup_component diff --git a/tests/integration/fixtures/scheduler_defer_cancel.yaml b/tests/integration/fixtures/scheduler_defer_cancel.yaml new file mode 100644 index 0000000000..9e3f927c33 --- 
/dev/null +++ b/tests/integration/fixtures/scheduler_defer_cancel.yaml @@ -0,0 +1,51 @@ +esphome: + name: scheduler-defer-cancel + +host: + +logger: + level: DEBUG + +api: + services: + - service: test_defer_cancel + then: + - lambda: |- + // Schedule 10 defers with the same name + // Only the last one should execute + for (int i = 1; i <= 10; i++) { + App.scheduler.set_timeout(nullptr, "test_defer", 0, [i]() { + ESP_LOGI("TEST", "Defer executed: %d", i); + // Fire event with the defer number + std::string event_type = "defer_executed_" + std::to_string(i); + id(test_result)->trigger(event_type); + }); + } + + // Schedule completion notification after all defers + App.scheduler.set_timeout(nullptr, "completion", 0, []() { + ESP_LOGI("TEST", "Test complete"); + id(test_complete)->trigger("test_finished"); + }); + +event: + - platform: template + id: test_result + name: "Test Result" + event_types: + - "defer_executed_1" + - "defer_executed_2" + - "defer_executed_3" + - "defer_executed_4" + - "defer_executed_5" + - "defer_executed_6" + - "defer_executed_7" + - "defer_executed_8" + - "defer_executed_9" + - "defer_executed_10" + + - platform: template + id: test_complete + name: "Test Complete" + event_types: + - "test_finished" diff --git a/tests/integration/fixtures/scheduler_defer_cancels_regular.yaml b/tests/integration/fixtures/scheduler_defer_cancels_regular.yaml new file mode 100644 index 0000000000..fb6b1791dc --- /dev/null +++ b/tests/integration/fixtures/scheduler_defer_cancels_regular.yaml @@ -0,0 +1,34 @@ +esphome: + name: scheduler-defer-cancel-regular + +host: + +logger: + level: DEBUG + +api: + services: + - service: test_defer_cancels_regular + then: + - lambda: |- + ESP_LOGI("TEST", "Starting defer cancels regular timeout test"); + + // Schedule a regular timeout with 100ms delay + App.scheduler.set_timeout(nullptr, "test_timeout", 100, []() { + ESP_LOGE("TEST", "ERROR: Regular timeout executed - should have been cancelled!"); + }); + + 
ESP_LOGI("TEST", "Scheduled regular timeout with 100ms delay"); + + // Immediately schedule a deferred timeout (0 delay) with the same name + // This should cancel the regular timeout + App.scheduler.set_timeout(nullptr, "test_timeout", 0, []() { + ESP_LOGI("TEST", "SUCCESS: Deferred timeout executed"); + }); + + ESP_LOGI("TEST", "Scheduled deferred timeout - should cancel regular timeout"); + + // Schedule test completion after 200ms (after regular timeout would have fired) + App.scheduler.set_timeout(nullptr, "test_complete", 200, []() { + ESP_LOGI("TEST", "Test complete"); + }); diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml similarity index 99% rename from tests/integration/fixtures/defer_fifo_simple.yaml rename to tests/integration/fixtures/scheduler_defer_fifo_simple.yaml index db24ebf601..7384082ac2 100644 --- a/tests/integration/fixtures/defer_fifo_simple.yaml +++ b/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml @@ -1,5 +1,5 @@ esphome: - name: defer-fifo-simple + name: scheduler-defer-fifo-simple host: diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/scheduler_defer_stress.yaml similarity index 94% rename from tests/integration/fixtures/defer_stress.yaml rename to tests/integration/fixtures/scheduler_defer_stress.yaml index 6df475229b..0d9c1d1405 100644 --- a/tests/integration/fixtures/defer_stress.yaml +++ b/tests/integration/fixtures/scheduler_defer_stress.yaml @@ -1,5 +1,5 @@ esphome: - name: defer-stress-test + name: scheduler-defer-stress-test external_components: - source: diff --git a/tests/integration/fixtures/scheduler_heap_stress.yaml b/tests/integration/fixtures/scheduler_heap_stress.yaml new file mode 100644 index 0000000000..d4d340b68b --- /dev/null +++ b/tests/integration/fixtures/scheduler_heap_stress.yaml @@ -0,0 +1,38 @@ +esphome: + name: scheduler-heap-stress-test + +external_components: + - source: + type: 
local + path: EXTERNAL_COMPONENT_PATH + components: [scheduler_heap_stress_component] + +host: + +logger: + level: VERBOSE + +scheduler_heap_stress_component: + id: heap_stress + +api: + services: + - service: run_heap_stress_test + then: + - lambda: |- + id(heap_stress)->run_multi_thread_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/scheduler_rapid_cancellation.yaml b/tests/integration/fixtures/scheduler_rapid_cancellation.yaml new file mode 100644 index 0000000000..4824654c5c --- /dev/null +++ b/tests/integration/fixtures/scheduler_rapid_cancellation.yaml @@ -0,0 +1,38 @@ +esphome: + name: sched-rapid-cancel-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [scheduler_rapid_cancellation_component] + +host: + +logger: + level: VERBOSE + +scheduler_rapid_cancellation_component: + id: rapid_cancel + +api: + services: + - service: run_rapid_cancellation_test + then: + - lambda: |- + id(rapid_cancel)->run_rapid_cancellation_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/scheduler_recursive_timeout.yaml b/tests/integration/fixtures/scheduler_recursive_timeout.yaml new file mode 100644 index 0000000000..f1168802f6 --- /dev/null +++ b/tests/integration/fixtures/scheduler_recursive_timeout.yaml @@ -0,0 +1,38 @@ +esphome: + name: sched-recursive-timeout + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [scheduler_recursive_timeout_component] + +host: + +logger: 
+ level: VERBOSE + +scheduler_recursive_timeout_component: + id: recursive_timeout + +api: + services: + - service: run_recursive_timeout_test + then: + - lambda: |- + id(recursive_timeout)->run_recursive_timeout_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/scheduler_simultaneous_callbacks.yaml b/tests/integration/fixtures/scheduler_simultaneous_callbacks.yaml new file mode 100644 index 0000000000..446ee7fdc0 --- /dev/null +++ b/tests/integration/fixtures/scheduler_simultaneous_callbacks.yaml @@ -0,0 +1,23 @@ +esphome: + name: sched-simul-callbacks-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [scheduler_simultaneous_callbacks_component] + +host: + +logger: + level: INFO + +scheduler_simultaneous_callbacks_component: + id: simultaneous_callbacks + +api: + services: + - service: run_simultaneous_callbacks_test + then: + - lambda: |- + id(simultaneous_callbacks)->run_simultaneous_callbacks_test(); diff --git a/tests/integration/fixtures/scheduler_string_lifetime.yaml b/tests/integration/fixtures/scheduler_string_lifetime.yaml new file mode 100644 index 0000000000..ebd5052b8b --- /dev/null +++ b/tests/integration/fixtures/scheduler_string_lifetime.yaml @@ -0,0 +1,47 @@ +esphome: + name: scheduler-string-lifetime-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [scheduler_string_lifetime_component] + +host: + +logger: + level: DEBUG + +scheduler_string_lifetime_component: + id: string_lifetime + +api: + services: + - service: run_string_lifetime_test + then: + - lambda: |- + id(string_lifetime)->run_string_lifetime_test(); + - service: run_test1 + then: + - lambda: |- + id(string_lifetime)->run_test1(); 
+ - service: run_test2 + then: + - lambda: |- + id(string_lifetime)->run_test2(); + - service: run_test3 + then: + - lambda: |- + id(string_lifetime)->run_test3(); + - service: run_test4 + then: + - lambda: |- + id(string_lifetime)->run_test4(); + - service: run_test5 + then: + - lambda: |- + id(string_lifetime)->run_test5(); + - service: run_final_check + then: + - lambda: |- + id(string_lifetime)->run_final_check(); diff --git a/tests/integration/fixtures/scheduler_string_name_stress.yaml b/tests/integration/fixtures/scheduler_string_name_stress.yaml new file mode 100644 index 0000000000..d1ef55c8d5 --- /dev/null +++ b/tests/integration/fixtures/scheduler_string_name_stress.yaml @@ -0,0 +1,38 @@ +esphome: + name: sched-string-name-stress + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [scheduler_string_name_stress_component] + +host: + +logger: + level: VERBOSE + +scheduler_string_name_stress_component: + id: string_stress + +api: + services: + - service: run_string_name_stress_test + then: + - lambda: |- + id(string_stress)->run_string_name_stress_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/test_scheduler_bulk_cleanup.py b/tests/integration/test_scheduler_bulk_cleanup.py new file mode 100644 index 0000000000..08ff293b84 --- /dev/null +++ b/tests/integration/test_scheduler_bulk_cleanup.py @@ -0,0 +1,122 @@ +"""Test that triggers the bulk cleanup path when to_remove_ > MAX_LOGICALLY_DELETED_ITEMS.""" + +import asyncio +from pathlib import Path +import re + +from aioesphomeapi import UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_scheduler_bulk_cleanup( + yaml_config: str, + 
run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that bulk cleanup path is triggered when many items are cancelled.""" + + # Get the absolute path to the external components directory + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + + # Replace the placeholder in the YAML config with the actual path + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[None] = loop.create_future() + bulk_cleanup_triggered = False + cleanup_stats: dict[str, int] = { + "removed": 0, + "before": 0, + "after": 0, + } + post_cleanup_executed = 0 + + def on_log_line(line: str) -> None: + nonlocal bulk_cleanup_triggered, post_cleanup_executed + + # Look for logs indicating bulk cleanup was triggered + # The actual cleanup happens silently, so we track the cancel operations + if "Successfully cancelled" in line and "timeouts" in line: + match = re.search(r"Successfully cancelled (\d+) timeouts", line) + if match and int(match.group(1)) > 10: + bulk_cleanup_triggered = True + + # Track cleanup statistics + match = re.search(r"Bulk cleanup triggered: removed (\d+) items", line) + if match: + cleanup_stats["removed"] = int(match.group(1)) + + match = re.search(r"Items before cleanup: (\d+), after: (\d+)", line) + if match: + cleanup_stats["before"] = int(match.group(1)) + cleanup_stats["after"] = int(match.group(2)) + + # Track post-cleanup timeout executions + if "Post-cleanup timeout" in line and "executed correctly" in line: + match = re.search(r"Post-cleanup timeout (\d+) executed correctly", line) + if match: + post_cleanup_executed += 1 + + # Check for final test completion + if ( + "All post-cleanup timeouts completed - test finished" in line + and not test_complete_future.done() + ): + 
test_complete_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-bulk-cleanup" + + # List entities and services + _, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test service + trigger_bulk_cleanup_service: UserService | None = None + for service in services: + if service.name == "trigger_bulk_cleanup": + trigger_bulk_cleanup_service = service + break + + assert trigger_bulk_cleanup_service is not None, ( + "trigger_bulk_cleanup service not found" + ) + + # Execute the test + client.execute_service(trigger_bulk_cleanup_service, {}) + + # Wait for test completion + try: + await asyncio.wait_for(test_complete_future, timeout=10.0) + except asyncio.TimeoutError: + pytest.fail("Bulk cleanup test timed out") + + # Verify bulk cleanup was triggered + assert bulk_cleanup_triggered, ( + "Bulk cleanup path was not triggered - MAX_LOGICALLY_DELETED_ITEMS threshold not reached" + ) + + # Verify cleanup statistics + assert cleanup_stats["removed"] > 10, ( + f"Expected more than 10 items removed, got {cleanup_stats['removed']}" + ) + + # Verify scheduler still works after bulk cleanup + assert post_cleanup_executed == 5, ( + f"Expected 5 post-cleanup timeouts to execute, but {post_cleanup_executed} executed" + ) diff --git a/tests/integration/test_scheduler_defer_cancel.py b/tests/integration/test_scheduler_defer_cancel.py new file mode 100644 index 0000000000..923cf946c4 --- /dev/null +++ b/tests/integration/test_scheduler_defer_cancel.py @@ -0,0 +1,94 @@ +"""Test that defer() with the same name cancels previous defers.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + 
+@pytest.mark.asyncio +async def test_scheduler_defer_cancel( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() with the same name cancels previous defers.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-cancel" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test service + test_defer_cancel_service: UserService | None = None + for service in services: + if service.name == "test_defer_cancel": + test_defer_cancel_service = service + + assert test_defer_cancel_service is not None, ( + "test_defer_cancel service not found" + ) + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[int] = loop.create_future() + + def on_state(state: EntityState) -> None: + if not isinstance(state, Event): + return + + if ( + state.key == test_complete_entity.key + and state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + return + + if state.key == test_result_entity.key and not test_result_future.done(): + # Event type should be 
"defer_executed_X" where X is the defer number + if state.event_type.startswith("defer_executed_"): + defer_num = int(state.event_type.split("_")[-1]) + test_result_future.set_result(defer_num) + + client.subscribe_states(on_state) + + # Execute the test + client.execute_service(test_defer_cancel_service, {}) + + # Wait for test completion + try: + await asyncio.wait_for(test_complete_future, timeout=10.0) + executed_defer = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test did not complete within timeout") + + # Verify that only defer 10 was executed + assert executed_defer == 10, ( + f"Expected defer 10 to execute, got {executed_defer}" + ) diff --git a/tests/integration/test_scheduler_defer_cancel_regular.py b/tests/integration/test_scheduler_defer_cancel_regular.py new file mode 100644 index 0000000000..57b7134feb --- /dev/null +++ b/tests/integration/test_scheduler_defer_cancel_regular.py @@ -0,0 +1,90 @@ +"""Test that a deferred timeout cancels a regular timeout with the same name.""" + +import asyncio + +from aioesphomeapi import UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_scheduler_defer_cancels_regular( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that set_timeout(name, 0) cancels a previously scheduled set_timeout(name, delay).""" + + # Create a future to signal test completion + loop = asyncio.get_running_loop() + test_complete_future: asyncio.Future[None] = loop.create_future() + + # Track log messages + log_messages: list[str] = [] + error_detected = False + + def on_log_line(line: str) -> None: + nonlocal error_detected + if "TEST" in line: + log_messages.append(line) + + if "ERROR: Regular timeout executed" in line: + error_detected = True + + if "Test complete" in line and not test_complete_future.done(): + 
test_complete_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-cancel-regular" + + # List services + _, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test service + test_service: UserService | None = None + for service in services: + if service.name == "test_defer_cancels_regular": + test_service = service + break + + assert test_service is not None, "test_defer_cancels_regular service not found" + + # Execute the test + client.execute_service(test_service, {}) + + # Wait for test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + except asyncio.TimeoutError: + pytest.fail(f"Test timed out. Log messages: {log_messages}") + + # Verify results + assert not error_detected, ( + f"Regular timeout should have been cancelled but it executed! Logs: {log_messages}" + ) + + # Verify the deferred timeout executed + assert any( + "SUCCESS: Deferred timeout executed" in msg for msg in log_messages + ), f"Deferred timeout should have executed. 
Logs: {log_messages}" + + # Verify the expected sequence of events + assert any( + "Starting defer cancels regular timeout test" in msg for msg in log_messages + ) + assert any( + "Scheduled regular timeout with 100ms delay" in msg for msg in log_messages + ) + assert any( + "Scheduled deferred timeout - should cancel regular timeout" in msg + for msg in log_messages + ) diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_scheduler_defer_fifo_simple.py similarity index 97% rename from tests/integration/test_defer_fifo_simple.py rename to tests/integration/test_scheduler_defer_fifo_simple.py index 5a62a45786..eb4058fedd 100644 --- a/tests/integration/test_defer_fifo_simple.py +++ b/tests/integration/test_scheduler_defer_fifo_simple.py @@ -9,7 +9,7 @@ from .types import APIClientConnectedFactory, RunCompiledFunction @pytest.mark.asyncio -async def test_defer_fifo_simple( +async def test_scheduler_defer_fifo_simple( yaml_config: str, run_compiled: RunCompiledFunction, api_client_connected: APIClientConnectedFactory, @@ -20,7 +20,7 @@ async def test_defer_fifo_simple( # Verify we can connect device_info = await client.device_info() assert device_info is not None - assert device_info.name == "defer-fifo-simple" + assert device_info.name == "scheduler-defer-fifo-simple" # List entities and services entity_info, services = await asyncio.wait_for( diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_scheduler_defer_stress.py similarity index 97% rename from tests/integration/test_defer_stress.py rename to tests/integration/test_scheduler_defer_stress.py index f63ec8d25f..d546b7132f 100644 --- a/tests/integration/test_defer_stress.py +++ b/tests/integration/test_scheduler_defer_stress.py @@ -11,7 +11,7 @@ from .types import APIClientConnectedFactory, RunCompiledFunction @pytest.mark.asyncio -async def test_defer_stress( +async def test_scheduler_defer_stress( yaml_config: str, run_compiled: RunCompiledFunction, 
@pytest.mark.asyncio
async def test_scheduler_heap_stress(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that set_timeout/set_interval doesn't crash when called rapidly from multiple threads.

    The firmware schedules 1000 callbacks (a mix of timeouts and intervals)
    from multiple threads; this test parses the device log stream and verifies
    that every callback ID 0-999 executed at least once.
    """
    # Get the absolute path to the external components directory.
    external_components_path = str(
        Path(__file__).parent / "fixtures" / "external_components"
    )

    # Replace the placeholder in the YAML config with the actual path.
    yaml_config = yaml_config.replace(
        "EXTERNAL_COMPONENT_PATH", external_components_path
    )

    # Future resolved once all 1000 distinct callback IDs have been seen.
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[None] = loop.create_future()

    # Track executed timeouts/intervals and their order.
    executed_callbacks: set[int] = set()
    thread_executions: dict[
        int, list[int]
    ] = {}  # thread_id -> list of indices in execution order
    callback_types: dict[int, str] = {}  # callback_id -> "timeout" or "interval"

    def on_log_line(line: str) -> None:
        # Track all executed callbacks with thread and index info.
        match = re.search(
            r"Executed (timeout|interval) (\d+) \(thread (\d+), index (\d+)\)", line
        )
        if not match:
            # Non-matching lines (including the threads' completion banner)
            # need no handling; the scheduler drains its queue on its own.
            return

        callback_type = match.group(1)
        callback_id = int(match.group(2))
        thread_id = int(match.group(3))
        index = int(match.group(4))

        # Only count each callback ID once (intervals might fire multiple times).
        if callback_id not in executed_callbacks:
            executed_callbacks.add(callback_id)
            callback_types[callback_id] = callback_type

            # Track execution order per thread.
            indices = thread_executions.setdefault(thread_id, [])
            # Only append if this is a new execution for this thread.
            if index not in indices:
                indices.append(index)

        # Check if we've executed all 1000 callbacks (0-999).
        if len(executed_callbacks) >= 1000 and not test_complete_future.done():
            test_complete_future.set_result(None)

    async with (
        run_compiled(yaml_config, line_callback=on_log_line),
        api_client_connected() as client,
    ):
        # Verify we can connect.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "scheduler-heap-stress-test"

        # List entities and services.
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Find our test service.
        run_stress_test_service: UserService | None = None
        for service in services:
            if service.name == "run_heap_stress_test":
                run_stress_test_service = service
                break

        assert run_stress_test_service is not None, (
            "run_heap_stress_test service not found"
        )

        # Call the run_heap_stress_test service to start the test.
        client.execute_service(run_stress_test_service, {})

        # Wait for all callbacks to execute (should be quick, but give more
        # time for scheduling on slow CI hosts).
        try:
            await asyncio.wait_for(test_complete_future, timeout=60.0)
        except asyncio.TimeoutError:
            # Report how many we got.
            pytest.fail(
                f"Stress test timed out. Only {len(executed_callbacks)} of "
                f"1000 callbacks executed. Missing IDs: "
                f"{sorted(set(range(1000)) - executed_callbacks)[:10]}..."
            )

        # Verify all callbacks executed.
        assert len(executed_callbacks) == 1000, (
            f"Expected 1000 callbacks, got {len(executed_callbacks)}"
        )

        # Verify we have all IDs from 0-999.
        expected_ids = set(range(1000))
        missing_ids = expected_ids - executed_callbacks
        assert not missing_ids, f"Missing callback IDs: {sorted(missing_ids)}"

        # Verify we have a mix of timeouts and intervals.
        timeout_count = sum(1 for t in callback_types.values() if t == "timeout")
        interval_count = sum(1 for t in callback_types.values() if t == "interval")
        assert timeout_count > 0, "No timeouts were executed"
        assert interval_count > 0, "No intervals were executed"

        # Verify each thread executed its expected share of callbacks.
        for thread_id, indices in thread_executions.items():
            assert len(indices) == 100, (
                f"Thread {thread_id} executed {len(indices)} callbacks, expected 100"
            )

        # Total should be 1000 callbacks.
        total_callbacks = timeout_count + interval_count
        assert total_callbacks == 1000, (
            f"Expected 1000 total callbacks but got {total_callbacks}"
        )
@pytest.mark.asyncio
async def test_scheduler_rapid_cancellation(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test rapid schedule/cancel cycles that might expose race conditions."""
    # Point the YAML config at the on-disk external components directory.
    components_dir = Path(__file__).parent / "fixtures" / "external_components"
    yaml_config = yaml_config.replace("EXTERNAL_COMPONENT_PATH", str(components_dir))

    # Future resolved when the firmware reports its final statistics line.
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[None] = loop.create_future()

    # Counters aggregated from the device log.
    test_stats = {
        "log_count": 0,
        "errors": [],
        "summary_scheduled": None,
        "final_scheduled": 0,
        "final_executed": 0,
        "final_implicit_cancellations": 0,
    }

    crash_markers = ("segfault", "abort", "assertion", "heap corruption")

    def on_log_line(line: str) -> None:
        # Count every log line so we can assert the device actually logged.
        test_stats["log_count"] += 1

        # Only ERROR-level lines are failures (WARN is tolerated).
        if "ERROR" in line:
            test_stats["errors"].append(line)

        # Pull the counters out of the summary lines.
        if "All threads completed. Scheduled:" in line:
            if m := re.search(r"Scheduled: (\d+)", line):
                test_stats["summary_scheduled"] = int(m.group(1))
        elif "Total scheduled:" in line:
            if m := re.search(r"Total scheduled: (\d+)", line):
                test_stats["final_scheduled"] = int(m.group(1))
        elif "Total executed:" in line:
            if m := re.search(r"Total executed: (\d+)", line):
                test_stats["final_executed"] = int(m.group(1))
        elif "Implicit cancellations (replaced):" in line:
            if m := re.search(r"Implicit cancellations \(replaced\): (\d+)", line):
                test_stats["final_implicit_cancellations"] = int(m.group(1))

        # Abort immediately on any crash indicator.
        lowered = line.lower()
        if any(marker in lowered for marker in crash_markers):
            if not test_complete_future.done():
                test_complete_future.set_exception(Exception(f"Crash detected: {line}"))
            return

        # The firmware emits this only after every statistic was logged.
        if (
            "Test finished - all statistics reported" in line
            and not test_complete_future.done()
        ):
            test_complete_future.set_result(None)

    async with (
        run_compiled(yaml_config, line_callback=on_log_line),
        api_client_connected() as client,
    ):
        # Confirm connectivity before driving the test.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "sched-rapid-cancel-test"

        # List entities and services.
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Locate the service that kicks off the rapid-cancellation test.
        run_test_service: UserService | None = next(
            (s for s in services if s.name == "run_rapid_cancellation_test"), None
        )
        assert run_test_service is not None, (
            "run_rapid_cancellation_test service not found"
        )

        # Start the test on the device.
        client.execute_service(run_test_service, {})

        # Wait for the firmware to report completion.
        try:
            await asyncio.wait_for(test_complete_future, timeout=10.0)
        except asyncio.TimeoutError:
            pytest.fail(f"Test timed out. Stats: {test_stats}")

        # No ERROR lines may have been emitted.
        assert len(test_stats["errors"]) == 0, (
            f"Errors detected: {test_stats['errors']}"
        )

        # We must have received at least one log line.
        assert test_stats["log_count"] > 0, "No log messages received"

        # All 400 operations must have been scheduled, per both summaries...
        assert test_stats["summary_scheduled"] == 400, (
            f"Expected summary to show 400 scheduled operations but got {test_stats['summary_scheduled']}"
        )
        assert test_stats["final_scheduled"] == 400, (
            f"Expected final stats to show 400 scheduled but got {test_stats['final_scheduled']}"
        )

        # ...with 10 executing and the remaining 390 implicitly cancelled.
        assert test_stats["final_executed"] == 10, (
            f"Expected final stats to show 10 executed but got {test_stats['final_executed']}"
        )
        assert test_stats["final_implicit_cancellations"] == 390, (
            f"Expected final stats to show 390 implicit cancellations but got {test_stats['final_implicit_cancellations']}"
        )
@pytest.mark.asyncio
async def test_scheduler_recursive_timeout(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that scheduling timeouts from within timeout callbacks works correctly."""
    # Substitute the real external-components path into the config.
    components_dir = Path(__file__).parent / "fixtures" / "external_components"
    yaml_config = yaml_config.replace("EXTERNAL_COMPONENT_PATH", str(components_dir))

    # Future resolved when the final completion line is seen.
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[None] = loop.create_future()

    # Tags are appended in the order their log lines arrive; the test passes
    # only when they match this exact sequence.
    execution_sequence: list[str] = []
    expected_sequence = [
        "initial_timeout",
        "nested_timeout_1",
        "nested_timeout_2",
        "test_complete",
    ]

    # Log phrase -> sequence tag.
    markers = [
        ("Executing initial timeout", "initial_timeout"),
        ("Executing nested timeout 1", "nested_timeout_1"),
        ("Executing nested timeout 2", "nested_timeout_2"),
        ("Recursive timeout test complete", "test_complete"),
    ]

    def on_log_line(line: str) -> None:
        # Record at most one tag per line, first match wins.
        for phrase, tag in markers:
            if phrase in line:
                execution_sequence.append(tag)
                if tag == "test_complete" and not test_complete_future.done():
                    test_complete_future.set_result(None)
                break

    async with (
        run_compiled(yaml_config, line_callback=on_log_line),
        api_client_connected() as client,
    ):
        # Confirm connectivity before driving the test.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "sched-recursive-timeout"

        # List entities and services.
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Locate the service that starts the recursive-timeout test.
        run_test_service: UserService | None = next(
            (s for s in services if s.name == "run_recursive_timeout_test"), None
        )
        assert run_test_service is not None, (
            "run_recursive_timeout_test service not found"
        )

        # Start the test on the device.
        client.execute_service(run_test_service, {})

        # Wait for the completion line.
        try:
            await asyncio.wait_for(test_complete_future, timeout=10.0)
        except asyncio.TimeoutError:
            pytest.fail(
                f"Recursive timeout test timed out. Got sequence: {execution_sequence}"
            )

        # The callbacks must have fired in exactly the nested order.
        assert execution_sequence == expected_sequence, (
            f"Execution sequence mismatch. Expected {expected_sequence}, "
            f"got {execution_sequence}"
        )

        # Exactly 4 events: initial + level 1 + level 2 + complete.
        assert len(execution_sequence) == 4, (
            f"Expected 4 events but got {len(execution_sequence)}"
        )
@pytest.mark.asyncio
async def test_scheduler_simultaneous_callbacks(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test scheduling many callbacks for the exact same time from multiple threads."""
    # Point the YAML config at the on-disk external components directory.
    components_dir = Path(__file__).parent / "fixtures" / "external_components"
    yaml_config = yaml_config.replace("EXTERNAL_COMPONENT_PATH", str(components_dir))

    # Future resolved when the firmware logs its completion line.
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[None] = loop.create_future()

    # Progress counters; "expected" is 10 threads * 100 callbacks.
    test_stats = {
        "scheduled": 0,
        "executed": 0,
        "expected": 1000,
        "errors": [],
    }

    crash_markers = ("segfault", "abort", "assertion", "heap corruption")

    def on_log_line(line: str) -> None:
        # Tally scheduling/execution/error lines.
        if "Scheduled callback" in line:
            test_stats["scheduled"] += 1
        elif "Callback executed" in line:
            test_stats["executed"] += 1
        elif "ERROR" in line:
            test_stats["errors"].append(line)

        # Abort immediately on any crash indicator.
        lowered = line.lower()
        if any(marker in lowered for marker in crash_markers):
            if not test_complete_future.done():
                test_complete_future.set_exception(Exception(f"Crash detected: {line}"))
            return

        # The firmware logs the authoritative total, e.g.
        # "...: Simultaneous callbacks test complete. Final executed count: 1000"
        if "Final executed count:" in line:
            if m := re.search(r"Final executed count:\s*(\d+)", line):
                test_stats["final_count"] = int(m.group(1))

        # Completion marker (same line also carries the final count above).
        if (
            "Simultaneous callbacks test complete" in line
            and not test_complete_future.done()
        ):
            test_complete_future.set_result(None)

    async with (
        run_compiled(yaml_config, line_callback=on_log_line),
        api_client_connected() as client,
    ):
        # Confirm connectivity before driving the test.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "sched-simul-callbacks-test"

        # List entities and services.
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Locate the service that starts the simultaneous-callbacks test.
        run_test_service: UserService | None = next(
            (s for s in services if s.name == "run_simultaneous_callbacks_test"), None
        )
        assert run_test_service is not None, (
            "run_simultaneous_callbacks_test service not found"
        )

        # Start the test on the device.
        client.execute_service(run_test_service, {})

        # Wait for the completion line.
        try:
            await asyncio.wait_for(test_complete_future, timeout=30.0)
        except asyncio.TimeoutError:
            pytest.fail(f"Simultaneous callbacks test timed out. Stats: {test_stats}")

        # No ERROR lines may have been emitted.
        assert len(test_stats["errors"]) == 0, (
            f"Errors detected: {test_stats['errors']}"
        )

        # The device-side count is authoritative; it must match the expected total.
        final_count = test_stats.get("final_count", 0)
        assert final_count == test_stats["expected"], (
            f"Expected {test_stats['expected']} callbacks, but only {final_count} executed"
        )
        assert final_count == 1000, (
            f"Expected 1000 executed callbacks but got {final_count}"
        )
@pytest.mark.asyncio
async def test_scheduler_string_lifetime(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that scheduler correctly handles string lifetimes when strings go out of scope.

    Drives five device-side sub-tests sequentially (each gated on its own
    "Test N complete" log line), then a final check, and verifies the
    firmware-reported pass/fail totals.
    """
    # Get the absolute path to the external components directory.
    external_components_path = str(
        Path(__file__).parent / "fixtures" / "external_components"
    )

    # Replace the placeholder in the YAML config with the actual path.
    yaml_config = yaml_config.replace(
        "EXTERNAL_COMPONENT_PATH", external_components_path
    )

    # One completion event per sub-test (1-5), plus one for the final summary.
    test_complete_events = {n: asyncio.Event() for n in range(1, 6)}
    all_tests_complete = asyncio.Event()

    # Track test progress reported by the firmware.
    test_stats = {
        "tests_passed": 0,
        "tests_failed": 0,
        "errors": [],
        "current_test": None,
        "test_callbacks_executed": {},
    }

    def on_log_line(line: str) -> None:
        # "Test N complete" lines release the corresponding event.
        for test_num, event in test_complete_events.items():
            if f"Test {test_num} complete" in line:
                event.set()
                break

        # Track individual callback executions.
        callback_match = re.search(r"Callback '(.+?)' executed", line)
        if callback_match:
            callback_name = callback_match.group(1)
            test_stats["test_callbacks_executed"][callback_name] = True

        # Track test results from the C++ test output.
        # Extract the number from a line like "Tests passed: 30".
        if "Tests passed:" in line and "string_lifetime" in line:
            if match := re.search(r"Tests passed:\s*(\d+)", line):
                test_stats["tests_passed"] = int(match.group(1))
        elif "Tests failed:" in line and "string_lifetime" in line:
            if match := re.search(r"Tests failed:\s*(\d+)", line):
                test_stats["tests_failed"] = int(match.group(1))
        elif "ERROR" in line and "string_lifetime" in line:
            test_stats["errors"].append(line)

        # Fail fast on any memory corruption indicator.
        if any(
            indicator in line.lower()
            for indicator in [
                "use after free",
                "heap corruption",
                "segfault",
                "abort",
                "assertion",
                "sanitizer",
                "bad memory",
                "invalid pointer",
            ]
        ):
            pytest.fail(f"Memory corruption detected: {line}")

        # Check for completion.
        if "String lifetime tests complete" in line:
            all_tests_complete.set()

    async with (
        run_compiled(yaml_config, line_callback=on_log_line),
        api_client_connected() as client,
    ):
        # Verify we can connect.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "scheduler-string-lifetime-test"

        # List entities and services.
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Collect the per-test services plus the final-check service.
        test_services = {}
        for service in services:
            if service.name in (
                "run_test1",
                "run_test2",
                "run_test3",
                "run_test4",
                "run_test5",
            ):
                # "run_test3" -> key "test3"
                test_services[service.name.removeprefix("run_")] = service
            elif service.name == "run_final_check":
                test_services["final"] = service

        # Ensure all services are found.
        required_services = ["test1", "test2", "test3", "test4", "test5", "final"]
        for service_name in required_services:
            assert service_name in test_services, f"{service_name} service not found"

        # Run tests sequentially, waiting for each to complete.
        try:
            for test_num in range(1, 6):
                client.execute_service(test_services[f"test{test_num}"], {})
                await asyncio.wait_for(
                    test_complete_events[test_num].wait(), timeout=5.0
                )

            # Final check.
            client.execute_service(test_services["final"], {})
            await asyncio.wait_for(all_tests_complete.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            pytest.fail(f"String lifetime test timed out. Stats: {test_stats}")

        # Check for any errors.
        assert test_stats["tests_failed"] == 0, f"Tests failed: {test_stats['errors']}"

        # Verify we had the expected number of passing tests.
        assert test_stats["tests_passed"] == 30, (
            f"Expected exactly 30 tests to pass, but got {test_stats['tests_passed']}"
        )
@pytest.mark.asyncio
async def test_scheduler_string_name_stress(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that set_timeout/set_interval with std::string names doesn't crash when called from multiple threads.

    Parses the device log for per-callback execution lines and verifies every
    callback ID 0-999 (10 threads × 100 callbacks each) was observed.
    """
    # Get the absolute path to the external components directory.
    external_components_path = str(
        Path(__file__).parent / "fixtures" / "external_components"
    )

    # Replace the placeholder in the YAML config with the actual path.
    yaml_config = yaml_config.replace(
        "EXTERNAL_COMPONENT_PATH", external_components_path
    )

    # Future resolved on the firmware's completion line (or failed on crash).
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[None] = loop.create_future()

    # IDs of callbacks seen in the log, and any crash lines encountered.
    executed_callbacks: set[int] = set()
    error_messages: list[str] = []

    def on_log_line(line: str) -> None:
        # Fail fast on crash indicators.
        if any(
            indicator in line.lower()
            for indicator in [
                "segfault",
                "abort",
                "assertion",
                "heap corruption",
                "use after free",
            ]
        ):
            error_messages.append(line)
            if not test_complete_future.done():
                test_complete_future.set_exception(Exception(f"Crash detected: {line}"))
            return

        # Track executed callbacks by ID.
        match = re.search(r"Executed string-named callback (\d+)", line)
        if match:
            executed_callbacks.add(int(match.group(1)))

        # Check for completion.
        if (
            "String name stress test complete" in line
            and not test_complete_future.done()
        ):
            test_complete_future.set_result(None)

    async with (
        run_compiled(yaml_config, line_callback=on_log_line),
        api_client_connected() as client,
    ):
        # Verify we can connect.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "sched-string-name-stress"

        # List entities and services.
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Find our test service.
        run_stress_test_service: UserService | None = None
        for service in services:
            if service.name == "run_string_name_stress_test":
                run_stress_test_service = service
                break
        assert run_stress_test_service is not None, (
            "run_string_name_stress_test service not found"
        )

        # Call the service to start the test.
        client.execute_service(run_stress_test_service, {})

        # Wait for test to complete or crash.
        try:
            await asyncio.wait_for(test_complete_future, timeout=30.0)
        except asyncio.TimeoutError:
            pytest.fail(
                f"String name stress test timed out. Executed {len(executed_callbacks)} callbacks. "
                f"This might indicate a deadlock."
            )

        # Verify no errors occurred (crashes already handled by exception).
        assert not error_messages, f"Errors detected during test: {error_messages}"

        # Verify we executed all 1000 callbacks (10 threads × 100 callbacks each).
        assert len(executed_callbacks) == 1000, (
            f"Expected 1000 callbacks but got {len(executed_callbacks)}"
        )

        # Verify every callback ID 0-999 was observed. (A set can only prove
        # at-least-once execution, not exactly-once.)
        missing = set(range(1000)) - executed_callbacks
        assert not missing, f"Callbacks not executed: {sorted(missing)[:20]}"