mirror of
https://github.com/esphome/esphome.git
synced 2025-07-30 07:06:38 +00:00
pool for scheduler
This commit is contained in:
parent
226d465f6a
commit
0824c21841
@ -76,8 +76,22 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get fresh timestamp BEFORE taking lock - millis_64_ may need to acquire lock itself
|
||||||
|
const uint64_t now = this->millis_64_(millis());
|
||||||
|
|
||||||
|
// Take lock early to protect scheduler_item_pool_ access
|
||||||
|
LockGuard guard{this->lock_};
|
||||||
|
|
||||||
// Create and populate the scheduler item
|
// Create and populate the scheduler item
|
||||||
auto item = make_unique<SchedulerItem>();
|
std::unique_ptr<SchedulerItem> item;
|
||||||
|
if (!this->scheduler_item_pool_.empty()) {
|
||||||
|
// Reuse from pool
|
||||||
|
item = std::move(this->scheduler_item_pool_.back());
|
||||||
|
this->scheduler_item_pool_.pop_back();
|
||||||
|
} else {
|
||||||
|
// Allocate new if pool is empty
|
||||||
|
item = make_unique<SchedulerItem>();
|
||||||
|
}
|
||||||
item->component = component;
|
item->component = component;
|
||||||
item->set_name(name_cstr, !is_static_string);
|
item->set_name(name_cstr, !is_static_string);
|
||||||
item->type = type;
|
item->type = type;
|
||||||
@ -90,16 +104,12 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
|
|||||||
// Single-core platforms don't need thread-safe defer handling
|
// Single-core platforms don't need thread-safe defer handling
|
||||||
if (delay == 0 && type == SchedulerItem::TIMEOUT) {
|
if (delay == 0 && type == SchedulerItem::TIMEOUT) {
|
||||||
// Put in defer queue for guaranteed FIFO execution
|
// Put in defer queue for guaranteed FIFO execution
|
||||||
LockGuard guard{this->lock_};
|
|
||||||
this->cancel_item_locked_(component, name_cstr, type);
|
this->cancel_item_locked_(component, name_cstr, type);
|
||||||
this->defer_queue_.push_back(std::move(item));
|
this->defer_queue_.push_back(std::move(item));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#endif /* not ESPHOME_THREAD_SINGLE */
|
#endif /* not ESPHOME_THREAD_SINGLE */
|
||||||
|
|
||||||
// Get fresh timestamp for new timer/interval - ensures accurate scheduling
|
|
||||||
const auto now = this->millis_64_(millis()); // Fresh millis() call
|
|
||||||
|
|
||||||
// Type-specific setup
|
// Type-specific setup
|
||||||
if (type == SchedulerItem::INTERVAL) {
|
if (type == SchedulerItem::INTERVAL) {
|
||||||
item->interval = delay;
|
item->interval = delay;
|
||||||
@ -131,8 +141,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
|
|||||||
}
|
}
|
||||||
#endif /* ESPHOME_DEBUG_SCHEDULER */
|
#endif /* ESPHOME_DEBUG_SCHEDULER */
|
||||||
|
|
||||||
LockGuard guard{this->lock_};
|
|
||||||
|
|
||||||
// For retries, check if there's a cancelled timeout first
|
// For retries, check if there's a cancelled timeout first
|
||||||
if (is_retry && name_cstr != nullptr && type == SchedulerItem::TIMEOUT &&
|
if (is_retry && name_cstr != nullptr && type == SchedulerItem::TIMEOUT &&
|
||||||
(has_cancelled_timeout_in_container_(this->items_, component, name_cstr, /* match_retry= */ true) ||
|
(has_cancelled_timeout_in_container_(this->items_, component, name_cstr, /* match_retry= */ true) ||
|
||||||
@ -322,11 +330,11 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
|
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
|
||||||
const auto last_dbg = this->last_millis_.load(std::memory_order_relaxed);
|
const auto last_dbg = this->last_millis_.load(std::memory_order_relaxed);
|
||||||
const auto major_dbg = this->millis_major_.load(std::memory_order_relaxed);
|
const auto major_dbg = this->millis_major_.load(std::memory_order_relaxed);
|
||||||
ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(), now_64,
|
ESP_LOGD(TAG, "Items: count=%zu, pool=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(),
|
||||||
major_dbg, last_dbg);
|
this->scheduler_item_pool_.size(), now_64, major_dbg, last_dbg);
|
||||||
#else /* not ESPHOME_THREAD_MULTI_ATOMICS */
|
#else /* not ESPHOME_THREAD_MULTI_ATOMICS */
|
||||||
ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(), now_64,
|
ESP_LOGD(TAG, "Items: count=%zu, pool=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(),
|
||||||
this->millis_major_, this->last_millis_);
|
this->scheduler_item_pool_.size(), now_64, this->millis_major_, this->last_millis_);
|
||||||
#endif /* else ESPHOME_THREAD_MULTI_ATOMICS */
|
#endif /* else ESPHOME_THREAD_MULTI_ATOMICS */
|
||||||
// Cleanup before debug output
|
// Cleanup before debug output
|
||||||
this->cleanup_();
|
this->cleanup_();
|
||||||
@ -358,6 +366,7 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
|
|
||||||
// If we have too many items to remove
|
// If we have too many items to remove
|
||||||
if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
|
if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
|
||||||
|
// ESP_LOGD(TAG, "Starting cleanup of %u removed items", this->to_remove_);
|
||||||
// We hold the lock for the entire cleanup operation because:
|
// We hold the lock for the entire cleanup operation because:
|
||||||
// 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
|
// 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
|
||||||
// 2. Other threads must see either the old state or the new state, not intermediate states
|
// 2. Other threads must see either the old state or the new state, not intermediate states
|
||||||
@ -367,10 +376,13 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
|
|
||||||
std::vector<std::unique_ptr<SchedulerItem>> valid_items;
|
std::vector<std::unique_ptr<SchedulerItem>> valid_items;
|
||||||
|
|
||||||
// Move all non-removed items to valid_items
|
// Move all non-removed items to valid_items, recycle removed ones
|
||||||
for (auto &item : this->items_) {
|
for (auto &item : this->items_) {
|
||||||
if (!item->remove) {
|
if (!item->remove) {
|
||||||
valid_items.push_back(std::move(item));
|
valid_items.push_back(std::move(item));
|
||||||
|
} else {
|
||||||
|
// Recycle removed items
|
||||||
|
this->recycle_item_(std::move(item));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -378,6 +390,7 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
this->items_ = std::move(valid_items);
|
this->items_ = std::move(valid_items);
|
||||||
// Rebuild the heap structure since items are no longer in heap order
|
// Rebuild the heap structure since items are no longer in heap order
|
||||||
std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
|
std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
|
||||||
|
// ESP_LOGD(TAG, "Cleanup complete - pool size now: %zu", this->scheduler_item_pool_.size());
|
||||||
this->to_remove_ = 0;
|
this->to_remove_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -431,6 +444,9 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
// Add new item directly to to_add_
|
// Add new item directly to to_add_
|
||||||
// since we have the lock held
|
// since we have the lock held
|
||||||
this->to_add_.push_back(std::move(item));
|
this->to_add_.push_back(std::move(item));
|
||||||
|
} else {
|
||||||
|
// Timeout completed - recycle it
|
||||||
|
this->recycle_item_(std::move(item));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -480,6 +496,10 @@ size_t HOT Scheduler::cleanup_() {
|
|||||||
}
|
}
|
||||||
void HOT Scheduler::pop_raw_() {
|
void HOT Scheduler::pop_raw_() {
|
||||||
std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
|
std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
|
||||||
|
|
||||||
|
// Instead of destroying, recycle the item
|
||||||
|
this->recycle_item_(std::move(this->items_.back()));
|
||||||
|
|
||||||
this->items_.pop_back();
|
this->items_.pop_back();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -514,18 +534,14 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
|
|||||||
|
|
||||||
// Check all containers for matching items
|
// Check all containers for matching items
|
||||||
#ifndef ESPHOME_THREAD_SINGLE
|
#ifndef ESPHOME_THREAD_SINGLE
|
||||||
// Only check defer queue for timeouts (intervals never go there)
|
// Cancel and immediately recycle items in defer queue
|
||||||
if (type == SchedulerItem::TIMEOUT) {
|
if (type == SchedulerItem::TIMEOUT) {
|
||||||
for (auto &item : this->defer_queue_) {
|
total_cancelled +=
|
||||||
if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
|
this->cancel_and_recycle_from_container_(this->defer_queue_, component, name_cstr, type, match_retry);
|
||||||
item->remove = true;
|
|
||||||
total_cancelled++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endif /* not ESPHOME_THREAD_SINGLE */
|
#endif /* not ESPHOME_THREAD_SINGLE */
|
||||||
|
|
||||||
// Cancel items in the main heap
|
// Cancel items in the main heap (can't recycle immediately due to heap structure)
|
||||||
for (auto &item : this->items_) {
|
for (auto &item : this->items_) {
|
||||||
if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
|
if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
|
||||||
item->remove = true;
|
item->remove = true;
|
||||||
@ -534,14 +550,8 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel items in to_add_
|
// Cancel and immediately recycle items in to_add_ since they're not in heap yet
|
||||||
for (auto &item : this->to_add_) {
|
total_cancelled += this->cancel_and_recycle_from_container_(this->to_add_, component, name_cstr, type, match_retry);
|
||||||
if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
|
|
||||||
item->remove = true;
|
|
||||||
total_cancelled++;
|
|
||||||
// Don't track removals for to_add_ items
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return total_cancelled > 0;
|
return total_cancelled > 0;
|
||||||
}
|
}
|
||||||
@ -709,4 +719,22 @@ bool HOT Scheduler::SchedulerItem::cmp(const std::unique_ptr<SchedulerItem> &a,
|
|||||||
return a->next_execution_ > b->next_execution_;
|
return a->next_execution_ > b->next_execution_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Scheduler::recycle_item_(std::unique_ptr<SchedulerItem> item) {
|
||||||
|
if (!item)
|
||||||
|
return;
|
||||||
|
|
||||||
|
static constexpr size_t MAX_POOL_SIZE = 16;
|
||||||
|
if (this->scheduler_item_pool_.size() < MAX_POOL_SIZE) {
|
||||||
|
// Clear callback to release captured resources
|
||||||
|
item->callback = nullptr;
|
||||||
|
// Clear dynamic name if any
|
||||||
|
item->clear_dynamic_name();
|
||||||
|
this->scheduler_item_pool_.push_back(std::move(item));
|
||||||
|
// ESP_LOGD(TAG, "Recycled SchedulerItem to pool (pool size now: %zu)", this->scheduler_item_pool_.size());
|
||||||
|
} else {
|
||||||
|
// ESP_LOGD(TAG, "Discarding SchedulerItem (pool full at %zu items)", this->scheduler_item_pool_.size());
|
||||||
|
// unique_ptr will delete the item when it goes out of scope
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace esphome
|
} // namespace esphome
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#include <array>
|
||||||
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
|
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#endif
|
#endif
|
||||||
@ -117,11 +118,7 @@ class Scheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Destructor to clean up dynamic names
|
// Destructor to clean up dynamic names
|
||||||
~SchedulerItem() {
|
~SchedulerItem() { clear_dynamic_name(); }
|
||||||
if (name_is_dynamic) {
|
|
||||||
delete[] name_.dynamic_name;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete copy operations to prevent accidental copies
|
// Delete copy operations to prevent accidental copies
|
||||||
SchedulerItem(const SchedulerItem &) = delete;
|
SchedulerItem(const SchedulerItem &) = delete;
|
||||||
@ -134,13 +131,19 @@ class Scheduler {
|
|||||||
// Helper to get the name regardless of storage type
|
// Helper to get the name regardless of storage type
|
||||||
const char *get_name() const { return name_is_dynamic ? name_.dynamic_name : name_.static_name; }
|
const char *get_name() const { return name_is_dynamic ? name_.dynamic_name : name_.static_name; }
|
||||||
|
|
||||||
|
// Helper to clear dynamic name if allocated
|
||||||
|
void clear_dynamic_name() {
|
||||||
|
if (name_is_dynamic && name_.dynamic_name) {
|
||||||
|
delete[] name_.dynamic_name;
|
||||||
|
name_.dynamic_name = nullptr;
|
||||||
|
name_is_dynamic = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Helper to set name with proper ownership
|
// Helper to set name with proper ownership
|
||||||
void set_name(const char *name, bool make_copy = false) {
|
void set_name(const char *name, bool make_copy = false) {
|
||||||
// Clean up old dynamic name if any
|
// Clean up old dynamic name if any
|
||||||
if (name_is_dynamic && name_.dynamic_name) {
|
clear_dynamic_name();
|
||||||
delete[] name_.dynamic_name;
|
|
||||||
name_is_dynamic = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!name) {
|
if (!name) {
|
||||||
// nullptr case - no name provided
|
// nullptr case - no name provided
|
||||||
@ -219,6 +222,9 @@ class Scheduler {
|
|||||||
return item->remove || (item->component != nullptr && item->component->is_failed());
|
return item->remove || (item->component != nullptr && item->component->is_failed());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper to recycle a SchedulerItem
|
||||||
|
void recycle_item_(std::unique_ptr<SchedulerItem> item);
|
||||||
|
|
||||||
// Template helper to check if any item in a container matches our criteria
|
// Template helper to check if any item in a container matches our criteria
|
||||||
template<typename Container>
|
template<typename Container>
|
||||||
bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
|
bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
|
||||||
@ -232,6 +238,24 @@ class Scheduler {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Template helper to cancel and recycle items from a container
|
||||||
|
template<typename Container>
|
||||||
|
size_t cancel_and_recycle_from_container_(Container &container, Component *component, const char *name_cstr,
|
||||||
|
SchedulerItem::Type type, bool match_retry) {
|
||||||
|
size_t cancelled = 0;
|
||||||
|
for (auto it = container.begin(); it != container.end();) {
|
||||||
|
if (this->matches_item_(*it, component, name_cstr, type, match_retry)) {
|
||||||
|
// Recycle the cancelled item immediately
|
||||||
|
this->recycle_item_(std::move(*it));
|
||||||
|
it = container.erase(it);
|
||||||
|
cancelled++;
|
||||||
|
} else {
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cancelled;
|
||||||
|
}
|
||||||
|
|
||||||
Mutex lock_;
|
Mutex lock_;
|
||||||
std::vector<std::unique_ptr<SchedulerItem>> items_;
|
std::vector<std::unique_ptr<SchedulerItem>> items_;
|
||||||
std::vector<std::unique_ptr<SchedulerItem>> to_add_;
|
std::vector<std::unique_ptr<SchedulerItem>> to_add_;
|
||||||
@ -241,6 +265,9 @@ class Scheduler {
|
|||||||
#endif /* ESPHOME_THREAD_SINGLE */
|
#endif /* ESPHOME_THREAD_SINGLE */
|
||||||
uint32_t to_remove_{0};
|
uint32_t to_remove_{0};
|
||||||
|
|
||||||
|
// Memory pool for recycling SchedulerItem objects
|
||||||
|
std::vector<std::unique_ptr<SchedulerItem>> scheduler_item_pool_;
|
||||||
|
|
||||||
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
|
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
|
||||||
/*
|
/*
|
||||||
* Multi-threaded platforms with atomic support: last_millis_ needs atomic for lock-free updates
|
* Multi-threaded platforms with atomic support: last_millis_ needs atomic for lock-free updates
|
||||||
|
Loading…
x
Reference in New Issue
Block a user