mirror of
https://github.com/esphome/esphome.git
synced 2025-07-28 14:16:40 +00:00
Fix scheduler rollover detection with concurrent task calls (#9624)
This commit is contained in:
parent
7f807e08b1
commit
dfa8c8c77f
@ -14,6 +14,8 @@ namespace esphome {
|
|||||||
static const char *const TAG = "scheduler";
|
static const char *const TAG = "scheduler";
|
||||||
|
|
||||||
static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
|
static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
|
||||||
|
// Half the 32-bit range - used to detect rollovers vs normal time progression
|
||||||
|
static const uint32_t HALF_MAX_UINT32 = 0x80000000UL;
|
||||||
|
|
||||||
// Uncomment to debug scheduler
|
// Uncomment to debug scheduler
|
||||||
// #define ESPHOME_DEBUG_SCHEDULER
|
// #define ESPHOME_DEBUG_SCHEDULER
|
||||||
@ -91,7 +93,8 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
const auto now = this->millis_64_(millis());
|
// Get fresh timestamp for new timer/interval - ensures accurate scheduling
|
||||||
|
const auto now = this->millis_64_(millis()); // Fresh millis() call
|
||||||
|
|
||||||
// Type-specific setup
|
// Type-specific setup
|
||||||
if (type == SchedulerItem::INTERVAL) {
|
if (type == SchedulerItem::INTERVAL) {
|
||||||
@ -220,7 +223,8 @@ optional<uint32_t> HOT Scheduler::next_schedule_in(uint32_t now) {
|
|||||||
if (this->empty_())
|
if (this->empty_())
|
||||||
return {};
|
return {};
|
||||||
auto &item = this->items_[0];
|
auto &item = this->items_[0];
|
||||||
const auto now_64 = this->millis_64_(now);
|
// Convert the fresh timestamp from caller (usually Application::loop()) to 64-bit
|
||||||
|
const auto now_64 = this->millis_64_(now); // 'now' from parameter - fresh from caller
|
||||||
if (item->next_execution_ < now_64)
|
if (item->next_execution_ < now_64)
|
||||||
return 0;
|
return 0;
|
||||||
return item->next_execution_ - now_64;
|
return item->next_execution_ - now_64;
|
||||||
@ -259,7 +263,8 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
const auto now_64 = this->millis_64_(now);
|
// Convert the fresh timestamp from main loop to 64-bit for scheduler operations
|
||||||
|
const auto now_64 = this->millis_64_(now); // 'now' from parameter - fresh from Application::loop()
|
||||||
this->process_to_add();
|
this->process_to_add();
|
||||||
|
|
||||||
#ifdef ESPHOME_DEBUG_SCHEDULER
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
@ -268,8 +273,13 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
if (now_64 - last_print > 2000) {
|
if (now_64 - last_print > 2000) {
|
||||||
last_print = now_64;
|
last_print = now_64;
|
||||||
std::vector<std::unique_ptr<SchedulerItem>> old_items;
|
std::vector<std::unique_ptr<SchedulerItem>> old_items;
|
||||||
|
#if !defined(USE_ESP8266) && !defined(USE_RP2040)
|
||||||
|
ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%u, %" PRIu32 ")", this->items_.size(), now_64,
|
||||||
|
this->millis_major_, this->last_millis_.load(std::memory_order_relaxed));
|
||||||
|
#else
|
||||||
ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%u, %" PRIu32 ")", this->items_.size(), now_64,
|
ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%u, %" PRIu32 ")", this->items_.size(), now_64,
|
||||||
this->millis_major_, this->last_millis_);
|
this->millis_major_, this->last_millis_);
|
||||||
|
#endif
|
||||||
while (!this->empty_()) {
|
while (!this->empty_()) {
|
||||||
std::unique_ptr<SchedulerItem> item;
|
std::unique_ptr<SchedulerItem> item;
|
||||||
{
|
{
|
||||||
@ -483,16 +493,70 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
|
|||||||
}
|
}
|
||||||
|
|
||||||
uint64_t Scheduler::millis_64_(uint32_t now) {
|
uint64_t Scheduler::millis_64_(uint32_t now) {
|
||||||
// Check for rollover by comparing with last value
|
// THREAD SAFETY NOTE:
|
||||||
if (now < this->last_millis_) {
|
// This function can be called from multiple threads simultaneously on ESP32/LibreTiny.
|
||||||
// Detected rollover (happens every ~49.7 days)
|
// On single-threaded platforms (ESP8266, RP2040), atomics are not needed.
|
||||||
|
//
|
||||||
|
// IMPORTANT: Always pass fresh millis() values to this function. The implementation
|
||||||
|
// handles out-of-order timestamps between threads, but minimizing time differences
|
||||||
|
// helps maintain accuracy.
|
||||||
|
//
|
||||||
|
// The implementation handles the 32-bit rollover (every 49.7 days) by:
|
||||||
|
// 1. Using a lock when detecting rollover to ensure atomic update
|
||||||
|
// 2. Restricting normal updates to forward movement within the same epoch
|
||||||
|
// This prevents race conditions at the rollover boundary without requiring
|
||||||
|
// 64-bit atomics or locking on every call.
|
||||||
|
|
||||||
|
#if !defined(USE_ESP8266) && !defined(USE_RP2040)
|
||||||
|
// Multi-threaded platforms: Need to handle rollover carefully
|
||||||
|
uint32_t last = this->last_millis_.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
|
// If we might be near a rollover (large backwards jump), take the lock for the entire operation
|
||||||
|
// This ensures rollover detection and last_millis_ update are atomic together
|
||||||
|
if (now < last && (last - now) > HALF_MAX_UINT32) {
|
||||||
|
// Potential rollover - need lock for atomic rollover detection + update
|
||||||
|
LockGuard guard{this->lock_};
|
||||||
|
// Re-read with lock held
|
||||||
|
last = this->last_millis_.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
|
if (now < last && (last - now) > HALF_MAX_UINT32) {
|
||||||
|
// True rollover detected (happens every ~49.7 days)
|
||||||
|
this->millis_major_++;
|
||||||
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
|
ESP_LOGD(TAG, "Detected true 32-bit rollover at %" PRIu32 "ms (was %" PRIu32 ")", now, last);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
// Update last_millis_ while holding lock to prevent races
|
||||||
|
this->last_millis_.store(now, std::memory_order_relaxed);
|
||||||
|
} else {
|
||||||
|
// Normal case: Try lock-free update, but only allow forward movement within same epoch
|
||||||
|
// This prevents accidentally moving backwards across a rollover boundary
|
||||||
|
while (now > last && (now - last) < HALF_MAX_UINT32) {
|
||||||
|
if (this->last_millis_.compare_exchange_weak(last, now, std::memory_order_relaxed)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// last is automatically updated by compare_exchange_weak if it fails
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
// Single-threaded platforms: No atomics needed
|
||||||
|
uint32_t last = this->last_millis_;
|
||||||
|
|
||||||
|
// Check for rollover
|
||||||
|
if (now < last && (last - now) > HALF_MAX_UINT32) {
|
||||||
this->millis_major_++;
|
this->millis_major_++;
|
||||||
#ifdef ESPHOME_DEBUG_SCHEDULER
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
ESP_LOGD(TAG, "Incrementing scheduler major at %" PRIu64 "ms",
|
ESP_LOGD(TAG, "Detected true 32-bit rollover at %" PRIu32 "ms (was %" PRIu32 ")", now, last);
|
||||||
now + (static_cast<uint64_t>(this->millis_major_) << 32));
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
this->last_millis_ = now;
|
|
||||||
|
// Only update if time moved forward
|
||||||
|
if (now > last) {
|
||||||
|
this->last_millis_ = now;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// Combine major (high 32 bits) and now (low 32 bits) into 64-bit time
|
// Combine major (high 32 bits) and now (low 32 bits) into 64-bit time
|
||||||
return now + (static_cast<uint64_t>(this->millis_major_) << 32);
|
return now + (static_cast<uint64_t>(this->millis_major_) << 32);
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,9 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#if !defined(USE_ESP8266) && !defined(USE_RP2040)
|
||||||
|
#include <atomic>
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "esphome/core/component.h"
|
#include "esphome/core/component.h"
|
||||||
#include "esphome/core/helpers.h"
|
#include "esphome/core/helpers.h"
|
||||||
@ -52,8 +55,12 @@ class Scheduler {
|
|||||||
std::function<RetryResult(uint8_t)> func, float backoff_increase_factor = 1.0f);
|
std::function<RetryResult(uint8_t)> func, float backoff_increase_factor = 1.0f);
|
||||||
bool cancel_retry(Component *component, const std::string &name);
|
bool cancel_retry(Component *component, const std::string &name);
|
||||||
|
|
||||||
|
// Calculate when the next scheduled item should run
|
||||||
|
// @param now Fresh timestamp from millis() - must not be stale/cached
|
||||||
optional<uint32_t> next_schedule_in(uint32_t now);
|
optional<uint32_t> next_schedule_in(uint32_t now);
|
||||||
|
|
||||||
|
// Execute all scheduled items that are ready
|
||||||
|
// @param now Fresh timestamp from millis() - must not be stale/cached
|
||||||
void call(uint32_t now);
|
void call(uint32_t now);
|
||||||
|
|
||||||
void process_to_add();
|
void process_to_add();
|
||||||
@ -203,7 +210,15 @@ class Scheduler {
|
|||||||
// Both platforms save 40 bytes of RAM by excluding this
|
// Both platforms save 40 bytes of RAM by excluding this
|
||||||
std::deque<std::unique_ptr<SchedulerItem>> defer_queue_; // FIFO queue for defer() calls
|
std::deque<std::unique_ptr<SchedulerItem>> defer_queue_; // FIFO queue for defer() calls
|
||||||
#endif
|
#endif
|
||||||
|
#if !defined(USE_ESP8266) && !defined(USE_RP2040)
|
||||||
|
// Multi-threaded platforms: last_millis_ needs atomic for lock-free updates
|
||||||
|
std::atomic<uint32_t> last_millis_{0};
|
||||||
|
#else
|
||||||
|
// Single-threaded platforms: no atomics needed
|
||||||
uint32_t last_millis_{0};
|
uint32_t last_millis_{0};
|
||||||
|
#endif
|
||||||
|
// millis_major_ is protected by lock when incrementing, volatile ensures
|
||||||
|
// reads outside the lock see fresh values (not cached in registers)
|
||||||
uint16_t millis_major_{0};
|
uint16_t millis_major_{0};
|
||||||
uint32_t to_remove_{0};
|
uint32_t to_remove_{0};
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user