Merge branch 'dev' into scheduler_copy

J. Nick Koston 2025-06-28 15:48:11 -05:00 committed by GitHub
commit 1296165fce
14 changed files with 451 additions and 283 deletions


@ -94,6 +94,19 @@ APIConnection::~APIConnection() {
#endif
}
#ifdef HAS_PROTO_MESSAGE_DUMP
void APIConnection::log_batch_item_(const DeferredBatch::BatchItem &item) {
// Set log-only mode
this->log_only_mode_ = true;
// Call the creator - it will create the message and log it via encode_message_to_buffer
item.creator(item.entity, this, std::numeric_limits<uint16_t>::max(), true, item.message_type);
// Clear log-only mode
this->log_only_mode_ = false;
}
#endif
void APIConnection::loop() {
if (this->next_close_) {
// requested a disconnect
@ -249,6 +262,14 @@ void APIConnection::on_disconnect_response(const DisconnectResponse &value) {
// including header and footer overhead. Returns 0 if the message doesn't fit.
uint16_t APIConnection::encode_message_to_buffer(ProtoMessage &msg, uint16_t message_type, APIConnection *conn,
uint32_t remaining_size, bool is_single) {
#ifdef HAS_PROTO_MESSAGE_DUMP
// If in log-only mode, just log and return
if (conn->log_only_mode_) {
conn->log_send_message_(msg.message_name(), msg.dump());
return 1; // Return non-zero to indicate "success" for logging
}
#endif
// Calculate size
uint32_t calculated_size = 0;
msg.calculate_size(calculated_size);
@ -276,11 +297,6 @@ uint16_t APIConnection::encode_message_to_buffer(ProtoMessage &msg, uint16_t mes
// Encode directly into buffer
msg.encode(buffer);
#ifdef HAS_PROTO_MESSAGE_DUMP
// Log the message for VV debugging
conn->log_send_message_(msg.message_name(), msg.dump());
#endif
// Calculate actual encoded size (not including header that was already added)
size_t actual_payload_size = shared_buf.size() - size_before_encode;
@ -1891,6 +1907,15 @@ void APIConnection::process_batch_() {
}
}
#ifdef HAS_PROTO_MESSAGE_DUMP
// Log messages after send attempt for VV debugging
// It's safe to use the buffer for logging at this point regardless of send result
for (size_t i = 0; i < items_processed; i++) {
const auto &item = this->deferred_batch_.items[i];
this->log_batch_item_(item);
}
#endif
// Handle remaining items more efficiently
if (items_processed < this->deferred_batch_.items.size()) {
// Remove processed items from the beginning
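The change above replays each batch item's creator in a log-only pass after the send attempt, instead of dumping messages inside the encode path. Below is a standalone sketch of that flag-guarded pattern; FakeConn, encode_or_log and the lambda batch are made-up stand-ins, not the actual APIConnection, encode_message_to_buffer or DeferredBatch::BatchItem.

#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>
#include <vector>

struct FakeConn {
  bool log_only_mode{false};

  // Stand-in for encode_message_to_buffer(): log-only early return vs. real encode.
  uint16_t encode_or_log(const std::string &name, const std::string &dump) {
    if (this->log_only_mode) {
      std::printf("[VV] send_message %s: %s\n", name.c_str(), dump.c_str());
      return 1;  // non-zero sentinel: "handled"
    }
    // ... real path: calculate size, write header, encode into the shared buffer ...
    return 0;
  }
};

int main() {
  FakeConn conn;
  std::vector<std::function<uint16_t(FakeConn &)>> batch = {
      [](FakeConn &c) { return c.encode_or_log("ListEntitiesDoneResponse", "{}"); },
  };

  // 1) Send pass: flag off, creators encode into the shared buffer.
  for (auto &creator : batch)
    creator(conn);

  // 2) Logging pass (the equivalent of log_batch_item_): flag on, creators only log,
  //    so the shared buffer is no longer needed while dump() runs.
  conn.log_only_mode = true;
  for (auto &creator : batch)
    creator(conn);
  conn.log_only_mode = false;
  return 0;
}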


@ -470,6 +470,10 @@ class APIConnection : public APIServerConnection {
bool sent_ping_{false};
bool service_call_subscription_{false};
bool next_close_ = false;
#ifdef HAS_PROTO_MESSAGE_DUMP
// When true, encode_message_to_buffer will only log, not encode
bool log_only_mode_{false};
#endif
uint8_t ping_retries_{0};
// 8 bytes used, no padding needed
@ -627,6 +631,10 @@ class APIConnection : public APIServerConnection {
// State for batch buffer allocation
bool batch_first_message_{false};
#ifdef HAS_PROTO_MESSAGE_DUMP
void log_batch_item_(const DeferredBatch::BatchItem &item);
#endif
// Helper function to schedule a deferred message with known message type
bool schedule_message_(EntityBase *entity, MessageCreator creator, uint16_t message_type) {
this->deferred_batch_.add_item(entity, std::move(creator), message_type);


@ -1,7 +1,6 @@
#ifdef USE_ESP32
#include "ble.h"
#include "ble_event_pool.h"
#include "esphome/core/application.h"
#include "esphome/core/helpers.h"


@ -12,8 +12,8 @@
#include "esphome/core/helpers.h"
#include "ble_event.h"
#include "ble_event_pool.h"
#include "queue.h"
#include "esphome/core/lock_free_queue.h"
#include "esphome/core/event_pool.h"
#ifdef USE_ESP32
@ -148,8 +148,8 @@ class ESP32BLE : public Component {
std::vector<BLEStatusEventHandler *> ble_status_event_handlers_;
BLEComponentState state_{BLE_COMPONENT_STATE_OFF};
LockFreeQueue<BLEEvent, MAX_BLE_QUEUE_SIZE> ble_events_;
BLEEventPool<MAX_BLE_QUEUE_SIZE> ble_event_pool_;
esphome::LockFreeQueue<BLEEvent, MAX_BLE_QUEUE_SIZE> ble_events_;
esphome::EventPool<BLEEvent, MAX_BLE_QUEUE_SIZE> ble_event_pool_;
BLEAdvertising *advertising_{};
esp_ble_io_cap_t io_cap_{ESP_IO_CAP_NONE};
uint32_t advertising_cycle_time_{};


@ -134,13 +134,13 @@ class BLEEvent {
}
// Destructor to clean up heap allocations
~BLEEvent() { this->cleanup_heap_data(); }
~BLEEvent() { this->release(); }
// Default constructor for pre-allocation in pool
BLEEvent() : type_(GAP) {}
// Clean up any heap-allocated data
void cleanup_heap_data() {
// Invoked on return to EventPool - clean up any heap-allocated data
void release() {
if (this->type_ == GAP) {
return;
}
@ -161,19 +161,19 @@ class BLEEvent {
// Load new event data for reuse (replaces previous event data)
void load_gap_event(esp_gap_ble_cb_event_t e, esp_ble_gap_cb_param_t *p) {
this->cleanup_heap_data();
this->release();
this->type_ = GAP;
this->init_gap_data_(e, p);
}
void load_gattc_event(esp_gattc_cb_event_t e, esp_gatt_if_t i, esp_ble_gattc_cb_param_t *p) {
this->cleanup_heap_data();
this->release();
this->type_ = GATTC;
this->init_gattc_data_(e, i, p);
}
void load_gatts_event(esp_gatts_cb_event_t e, esp_gatt_if_t i, esp_ble_gatts_cb_param_t *p) {
this->cleanup_heap_data();
this->release();
this->type_ = GATTS;
this->init_gatts_data_(e, i, p);
}
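Taken together with the esphome::EventPool and esphome::LockFreeQueue members declared in ble.h above, the load_*/release() API is used roughly as follows. This is a sketch with illustrative names (BleEventPlumbing, enqueue_gap_event, drain, QUEUE_SIZE), not the actual ESP32BLE call sites, and it assumes an ESP32 build where these headers are compiled.

#include <cstdint>
#include <esp_gap_ble_api.h>
#include "ble_event.h"
#include "esphome/core/event_pool.h"
#include "esphome/core/lock_free_queue.h"

class BleEventPlumbing {
 public:
  // Producer side (BLE task): pool -> load -> queue.
  void enqueue_gap_event(esp_gap_ble_cb_event_t e, esp_ble_gap_cb_param_t *p) {
    esphome::esp32_ble::BLEEvent *ev = this->pool_.allocate();
    if (ev == nullptr) {
      this->queue_.increment_dropped_count();  // pool exhausted, drop the event
      return;
    }
    ev->load_gap_event(e, p);    // release()s any previous payload before reuse
    if (!this->queue_.push(ev))  // queue full: hand the event straight back
      this->pool_.release(ev);
  }

  // Consumer side (main loop()): queue -> process -> pool.
  void drain() {
    esphome::esp32_ble::BLEEvent *ev;
    while ((ev = this->queue_.pop()) != nullptr) {
      // ... dispatch to GAP/GATTC/GATTS handlers ...
      this->pool_.release(ev);  // calls ev->release() and returns it to the free list
    }
  }

 protected:
  static constexpr uint8_t QUEUE_SIZE = 64;  // placeholder size for illustration
  esphome::EventPool<esphome::esp32_ble::BLEEvent, QUEUE_SIZE> pool_;
  esphome::LockFreeQueue<esphome::esp32_ble::BLEEvent, QUEUE_SIZE> queue_;
};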


@ -1,72 +0,0 @@
#pragma once
#ifdef USE_ESP32
#include <atomic>
#include <cstddef>
#include "ble_event.h"
#include "queue.h"
#include "esphome/core/helpers.h"
namespace esphome {
namespace esp32_ble {
// BLE Event Pool - On-demand pool of BLEEvent objects to avoid heap fragmentation
// Events are allocated on first use and reused thereafter, growing to peak usage
template<uint8_t SIZE> class BLEEventPool {
public:
BLEEventPool() : total_created_(0) {}
~BLEEventPool() {
// Clean up any remaining events in the free list
BLEEvent *event;
while ((event = this->free_list_.pop()) != nullptr) {
delete event;
}
}
// Allocate an event from the pool
// Returns nullptr if pool is full
BLEEvent *allocate() {
// Try to get from free list first
BLEEvent *event = this->free_list_.pop();
if (event != nullptr)
return event;
// Need to create a new event
if (this->total_created_ >= SIZE) {
// Pool is at capacity
return nullptr;
}
// Use internal RAM for better performance
RAMAllocator<BLEEvent> allocator(RAMAllocator<BLEEvent>::ALLOC_INTERNAL);
event = allocator.allocate(1);
if (event == nullptr) {
// Memory allocation failed
return nullptr;
}
// Placement new to construct the object
new (event) BLEEvent();
this->total_created_++;
return event;
}
// Return an event to the pool for reuse
void release(BLEEvent *event) {
if (event != nullptr) {
this->free_list_.push(event);
}
}
private:
LockFreeQueue<BLEEvent, SIZE> free_list_; // Free events ready for reuse
uint8_t total_created_; // Total events created (high water mark)
};
} // namespace esp32_ble
} // namespace esphome
#endif


@ -1,85 +0,0 @@
#pragma once
#ifdef USE_ESP32
#include <atomic>
#include <cstddef>
/*
* BLE events come in from a separate Task (thread) in the ESP32 stack. Rather
* than using mutex-based locking, this lock-free queue allows the BLE
* task to enqueue events without blocking. The main loop() then processes
* these events at a safer time.
*
* This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer.
* The BLE task is the only producer, and the main loop() is the only consumer.
*/
namespace esphome {
namespace esp32_ble {
template<class T, uint8_t SIZE> class LockFreeQueue {
public:
LockFreeQueue() : head_(0), tail_(0), dropped_count_(0) {}
bool push(T *element) {
if (element == nullptr)
return false;
uint8_t current_tail = tail_.load(std::memory_order_relaxed);
uint8_t next_tail = (current_tail + 1) % SIZE;
if (next_tail == head_.load(std::memory_order_acquire)) {
// Buffer full
dropped_count_.fetch_add(1, std::memory_order_relaxed);
return false;
}
buffer_[current_tail] = element;
tail_.store(next_tail, std::memory_order_release);
return true;
}
T *pop() {
uint8_t current_head = head_.load(std::memory_order_relaxed);
if (current_head == tail_.load(std::memory_order_acquire)) {
return nullptr; // Empty
}
T *element = buffer_[current_head];
head_.store((current_head + 1) % SIZE, std::memory_order_release);
return element;
}
size_t size() const {
uint8_t tail = tail_.load(std::memory_order_acquire);
uint8_t head = head_.load(std::memory_order_acquire);
return (tail - head + SIZE) % SIZE;
}
uint16_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }
void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }
bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }
bool full() const {
uint8_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) % SIZE;
return next_tail == head_.load(std::memory_order_acquire);
}
protected:
T *buffer_[SIZE];
// Atomic: written by producer (push/increment), read+reset by consumer (get_and_reset)
std::atomic<uint16_t> dropped_count_; // 65535 max - more than enough for drop tracking
// Atomic: written by consumer (pop), read by producer (push) to check if full
std::atomic<uint8_t> head_;
// Atomic: written by producer (push), read by consumer (pop) to check if empty
std::atomic<uint8_t> tail_;
};
} // namespace esp32_ble
} // namespace esphome
#endif


@ -11,7 +11,7 @@ namespace internal {
/// Wrapper class for memory using big endian data layout, transparently converting it to native order.
template<typename T> class BigEndianLayout {
public:
constexpr14 operator T() { return convert_big_endian(val_); }
constexpr operator T() { return convert_big_endian(val_); }
private:
T val_;
@ -20,7 +20,7 @@ template<typename T> class BigEndianLayout {
/// Wrapper class for memory using little endian data layout, transparently converting it to native order.
template<typename T> class LittleEndianLayout {
public:
constexpr14 operator T() { return convert_little_endian(val_); }
constexpr operator T() { return convert_little_endian(val_); }
private:
T val_;

esphome/core/event_pool.h (new file, 81 lines)

@ -0,0 +1,81 @@
#pragma once
#if defined(USE_ESP32) || defined(USE_LIBRETINY)
#include <atomic>
#include <cstddef>
#include "esphome/core/helpers.h"
#include "esphome/core/lock_free_queue.h"
namespace esphome {
// Event Pool - On-demand pool of objects to avoid heap fragmentation
// Events are allocated on first use and reused thereafter, growing to peak usage
// @tparam T The type of objects managed by the pool (must have a release() method)
// @tparam SIZE The maximum number of objects in the pool (1-255, limited by uint8_t)
template<class T, uint8_t SIZE> class EventPool {
public:
EventPool() : total_created_(0) {}
~EventPool() {
// Clean up any remaining events in the free list
// IMPORTANT: This destructor assumes no concurrent access. The EventPool must not
// be destroyed while any thread might still call allocate() or release().
// In practice, this is typically ensured by destroying the pool only during
// component shutdown when all producer/consumer threads have been stopped.
T *event;
RAMAllocator<T> allocator(RAMAllocator<T>::ALLOC_INTERNAL);
while ((event = this->free_list_.pop()) != nullptr) {
// Call destructor
event->~T();
// Deallocate using RAMAllocator
allocator.deallocate(event, 1);
}
}
// Allocate an event from the pool
// Returns nullptr if pool is full
T *allocate() {
// Try to get from free list first
T *event = this->free_list_.pop();
if (event != nullptr)
return event;
// Need to create a new event
if (this->total_created_ >= SIZE) {
// Pool is at capacity
return nullptr;
}
// Use internal RAM for better performance
RAMAllocator<T> allocator(RAMAllocator<T>::ALLOC_INTERNAL);
event = allocator.allocate(1);
if (event == nullptr) {
// Memory allocation failed
return nullptr;
}
// Placement new to construct the object
new (event) T();
this->total_created_++;
return event;
}
// Return an event to the pool for reuse
void release(T *event) {
if (event != nullptr) {
// Clean up the event's allocated memory
event->release();
this->free_list_.push(event);
}
}
private:
LockFreeQueue<T, SIZE> free_list_; // Free events ready for reuse
uint8_t total_created_; // Total events created (high water mark, max 255)
};
} // namespace esphome
#endif // defined(USE_ESP32) || defined(USE_LIBRETINY)
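A minimal usage sketch for this pool follows; MyEvent and the 16-slot size are made up for illustration, and it assumes an ESP32/LibreTiny build where event_pool.h is compiled. The only requirements on T are that it is default-constructible and has a release() method.

#include <string>
#include "esphome/core/event_pool.h"

struct MyEvent {
  std::string payload;
  void release() { this->payload.clear(); }  // drop per-event heap data on recycle
};

esphome::EventPool<MyEvent, 16> pool;  // at most 16 live events

void produce() {
  MyEvent *ev = pool.allocate();  // reuses a freed event or creates one; nullptr if 16 are live
  if (ev == nullptr)
    return;  // pool exhausted; caller decides whether to drop or retry
  ev->payload = "hello";
  // ... hand ev to a consumer, e.g. through a LockFreeQueue ...
}

void consume(MyEvent *ev) {
  // ... use ev->payload ...
  pool.release(ev);  // calls ev->release() and puts it back on the free list
}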


@ -76,23 +76,8 @@ static const uint16_t CRC16_1021_BE_LUT_H[] = {0x0000, 0x1231, 0x2462, 0x3653, 0
0x9188, 0x83b9, 0xb5ea, 0xa7db, 0xd94c, 0xcb7d, 0xfd2e, 0xef1f};
#endif
// STL backports
#if _GLIBCXX_RELEASE < 8
std::string to_string(int value) { return str_snprintf("%d", 32, value); } // NOLINT
std::string to_string(long value) { return str_snprintf("%ld", 32, value); } // NOLINT
std::string to_string(long long value) { return str_snprintf("%lld", 32, value); } // NOLINT
std::string to_string(unsigned value) { return str_snprintf("%u", 32, value); } // NOLINT
std::string to_string(unsigned long value) { return str_snprintf("%lu", 32, value); } // NOLINT
std::string to_string(unsigned long long value) { return str_snprintf("%llu", 32, value); } // NOLINT
std::string to_string(float value) { return str_snprintf("%f", 32, value); }
std::string to_string(double value) { return str_snprintf("%f", 32, value); }
std::string to_string(long double value) { return str_snprintf("%Lf", 32, value); }
#endif
// Mathematics
float lerp(float completion, float start, float end) { return start + (end - start) * completion; }
uint8_t crc8(const uint8_t *data, uint8_t len) {
uint8_t crc = 0;


@ -37,89 +37,18 @@
#define ESPHOME_ALWAYS_INLINE __attribute__((always_inline))
#define PACKED __attribute__((packed))
// Various functions can be constexpr in C++14, but not in C++11 (because their body isn't just a return statement).
// Define a substitute constexpr keyword for those functions, until we can drop C++11 support.
#if __cplusplus >= 201402L
#define constexpr14 constexpr
#else
#define constexpr14 inline // constexpr implies inline
#endif
namespace esphome {
/// @name STL backports
///@{
// Backports for various STL features we like to use. Pull in the STL implementation wherever available, to avoid
// ambiguity and to provide a uniform API.
// std::to_string() from C++11, available from libstdc++/g++ 8
// See https://github.com/espressif/esp-idf/issues/1445
#if _GLIBCXX_RELEASE >= 8
// Keep "using" even after the removal of our backports, to avoid breaking existing code.
using std::to_string;
#else
std::string to_string(int value); // NOLINT
std::string to_string(long value); // NOLINT
std::string to_string(long long value); // NOLINT
std::string to_string(unsigned value); // NOLINT
std::string to_string(unsigned long value); // NOLINT
std::string to_string(unsigned long long value); // NOLINT
std::string to_string(float value);
std::string to_string(double value);
std::string to_string(long double value);
#endif
// std::is_trivially_copyable from C++11, implemented in libstdc++/g++ 5.1 (but minor releases can't be detected)
#if _GLIBCXX_RELEASE >= 6
using std::is_trivially_copyable;
#else
// Implementing this is impossible without compiler intrinsics, so don't bother. Invalid usage will be detected on
// other variants that use a newer compiler anyway.
// NOLINTNEXTLINE(readability-identifier-naming)
template<typename T> struct is_trivially_copyable : public std::integral_constant<bool, true> {};
#endif
// std::make_unique() from C++14
#if __cpp_lib_make_unique >= 201304
using std::make_unique;
#else
template<typename T, typename... Args> std::unique_ptr<T> make_unique(Args &&...args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
#endif
// std::enable_if_t from C++14
#if __cplusplus >= 201402L
using std::enable_if_t;
#else
template<bool B, class T = void> using enable_if_t = typename std::enable_if<B, T>::type;
#endif
// std::clamp from C++17
#if __cpp_lib_clamp >= 201603
using std::clamp;
#else
template<typename T, typename Compare> constexpr const T &clamp(const T &v, const T &lo, const T &hi, Compare comp) {
return comp(v, lo) ? lo : comp(hi, v) ? hi : v;
}
template<typename T> constexpr const T &clamp(const T &v, const T &lo, const T &hi) {
return clamp(v, lo, hi, std::less<T>{});
}
#endif
// std::is_invocable from C++17
#if __cpp_lib_is_invocable >= 201703
using std::is_invocable;
#else
// https://stackoverflow.com/a/37161919/8924614
template<class T, class... Args> struct is_invocable { // NOLINT(readability-identifier-naming)
template<class U> static auto test(U *p) -> decltype((*p)(std::declval<Args>()...), void(), std::true_type());
template<class U> static auto test(...) -> decltype(std::false_type());
static constexpr auto value = decltype(test<T>(nullptr))::value; // NOLINT
};
#endif
// std::bit_cast from C++20
#if __cpp_lib_bit_cast >= 201806
using std::bit_cast;
#else
@ -134,31 +63,29 @@ To bit_cast(const From &src) {
return dst;
}
#endif
using std::lerp;
// std::byteswap from C++23
template<typename T> constexpr14 T byteswap(T n) {
template<typename T> constexpr T byteswap(T n) {
T m;
for (size_t i = 0; i < sizeof(T); i++)
reinterpret_cast<uint8_t *>(&m)[i] = reinterpret_cast<uint8_t *>(&n)[sizeof(T) - 1 - i];
return m;
}
template<> constexpr14 uint8_t byteswap(uint8_t n) { return n; }
template<> constexpr14 uint16_t byteswap(uint16_t n) { return __builtin_bswap16(n); }
template<> constexpr14 uint32_t byteswap(uint32_t n) { return __builtin_bswap32(n); }
template<> constexpr14 uint64_t byteswap(uint64_t n) { return __builtin_bswap64(n); }
template<> constexpr14 int8_t byteswap(int8_t n) { return n; }
template<> constexpr14 int16_t byteswap(int16_t n) { return __builtin_bswap16(n); }
template<> constexpr14 int32_t byteswap(int32_t n) { return __builtin_bswap32(n); }
template<> constexpr14 int64_t byteswap(int64_t n) { return __builtin_bswap64(n); }
template<> constexpr uint8_t byteswap(uint8_t n) { return n; }
template<> constexpr uint16_t byteswap(uint16_t n) { return __builtin_bswap16(n); }
template<> constexpr uint32_t byteswap(uint32_t n) { return __builtin_bswap32(n); }
template<> constexpr uint64_t byteswap(uint64_t n) { return __builtin_bswap64(n); }
template<> constexpr int8_t byteswap(int8_t n) { return n; }
template<> constexpr int16_t byteswap(int16_t n) { return __builtin_bswap16(n); }
template<> constexpr int32_t byteswap(int32_t n) { return __builtin_bswap32(n); }
template<> constexpr int64_t byteswap(int64_t n) { return __builtin_bswap64(n); }
///@}
/// @name Mathematics
///@{
/// Linearly interpolate between \p start and \p end by \p completion (between 0 and 1).
float lerp(float completion, float start, float end);
/// Remap \p value from the range (\p min, \p max) to (\p min_out, \p max_out).
template<typename T, typename U> T remap(U value, U min, U max, T min_out, T max_out) {
return (value - min) * (max_out - min_out) / (max - min) + min_out;
@ -203,8 +130,7 @@ constexpr uint32_t encode_uint32(uint8_t byte1, uint8_t byte2, uint8_t byte3, ui
}
/// Encode a value from its constituent bytes (from most to least significant) in an array with length sizeof(T).
template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0>
constexpr14 T encode_value(const uint8_t *bytes) {
template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0> constexpr T encode_value(const uint8_t *bytes) {
T val = 0;
for (size_t i = 0; i < sizeof(T); i++) {
val <<= 8;
@ -214,12 +140,12 @@ constexpr14 T encode_value(const uint8_t *bytes) {
}
/// Encode a value from its constituent bytes (from most to least significant) in an std::array with length sizeof(T).
template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0>
constexpr14 T encode_value(const std::array<uint8_t, sizeof(T)> bytes) {
constexpr T encode_value(const std::array<uint8_t, sizeof(T)> bytes) {
return encode_value<T>(bytes.data());
}
/// Decode a value into its constituent bytes (from most to least significant).
template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0>
constexpr14 std::array<uint8_t, sizeof(T)> decode_value(T val) {
constexpr std::array<uint8_t, sizeof(T)> decode_value(T val) {
std::array<uint8_t, sizeof(T)> ret{};
for (size_t i = sizeof(T); i > 0; i--) {
ret[i - 1] = val & 0xFF;
@ -246,7 +172,7 @@ inline uint32_t reverse_bits(uint32_t x) {
}
/// Convert a value between host byte order and big endian (most significant byte first) order.
template<typename T> constexpr14 T convert_big_endian(T val) {
template<typename T> constexpr T convert_big_endian(T val) {
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return byteswap(val);
#else
@ -255,7 +181,7 @@ template<typename T> constexpr14 T convert_big_endian(T val) {
}
/// Convert a value between host byte order and little endian (least significant byte first) order.
template<typename T> constexpr14 T convert_little_endian(T val) {
template<typename T> constexpr T convert_little_endian(T val) {
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return val;
#else
@ -276,9 +202,6 @@ bool str_startswith(const std::string &str, const std::string &start);
/// Check whether a string ends with a value.
bool str_endswith(const std::string &str, const std::string &end);
/// Convert the value to a string (added as extra overload so that to_string() can be used on all stringifiable types).
inline std::string to_string(const std::string &val) { return val; }
/// Truncate a string to a specific length.
std::string str_truncate(const std::string &str, size_t length);
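With the constexpr14 shim gone, the remaining helpers are ordinary constexpr functions, so the byteswap specializations above can be checked at compile time. A quick illustration, not part of the diff:

#include <cstdint>
#include "esphome/core/helpers.h"

// These now compile as constant expressions without the constexpr14 macro.
static_assert(esphome::byteswap<uint16_t>(0x1234) == 0x3412, "16-bit byte swap");
static_assert(esphome::byteswap<uint32_t>(0x11223344u) == 0x44332211u, "32-bit byte swap");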


@ -0,0 +1,132 @@
#pragma once
#if defined(USE_ESP32) || defined(USE_LIBRETINY)
#include <atomic>
#include <cstddef>
#if defined(USE_ESP32)
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#elif defined(USE_LIBRETINY)
#include <FreeRTOS.h>
#include <task.h>
#endif
/*
* Lock-free queue for single-producer single-consumer scenarios.
* This allows one thread to push items and another to pop them without
* blocking each other.
*
* This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer.
* Available on platforms with FreeRTOS support (ESP32, LibreTiny).
*
* Common use cases:
* - BLE events: BLE task produces, main loop consumes
* - MQTT messages: main task produces, MQTT thread consumes
*
* @tparam T The element type; the queue stores and returns raw T* pointers
* @tparam SIZE The maximum number of elements (1-255, limited by uint8_t indices)
*/
namespace esphome {
template<class T, uint8_t SIZE> class LockFreeQueue {
public:
LockFreeQueue() : head_(0), tail_(0), dropped_count_(0), task_to_notify_(nullptr) {}
bool push(T *element) {
if (element == nullptr)
return false;
uint8_t current_tail = tail_.load(std::memory_order_relaxed);
uint8_t next_tail = (current_tail + 1) % SIZE;
// Read head before incrementing tail
uint8_t head_before = head_.load(std::memory_order_acquire);
if (next_tail == head_before) {
// Buffer full
dropped_count_.fetch_add(1, std::memory_order_relaxed);
return false;
}
// Check if queue was empty before push
bool was_empty = (current_tail == head_before);
buffer_[current_tail] = element;
tail_.store(next_tail, std::memory_order_release);
// Notify optimization: only notify if we need to
if (task_to_notify_ != nullptr) {
if (was_empty) {
// Queue was empty - consumer might be going to sleep, must notify
xTaskNotifyGive(task_to_notify_);
} else {
// Queue wasn't empty - check if consumer has caught up to previous tail
uint8_t head_after = head_.load(std::memory_order_acquire);
if (head_after == current_tail) {
// Consumer just caught up to where tail was - might go to sleep, must notify
// Note: There's a benign race here - between reading head_after and calling
// xTaskNotifyGive(), the consumer could advance further. This would result
// in an unnecessary wake-up, but is harmless and extremely rare in practice.
xTaskNotifyGive(task_to_notify_);
}
// Otherwise: consumer is still behind, no need to notify
}
}
return true;
}
T *pop() {
uint8_t current_head = head_.load(std::memory_order_relaxed);
if (current_head == tail_.load(std::memory_order_acquire)) {
return nullptr; // Empty
}
T *element = buffer_[current_head];
head_.store((current_head + 1) % SIZE, std::memory_order_release);
return element;
}
size_t size() const {
uint8_t tail = tail_.load(std::memory_order_acquire);
uint8_t head = head_.load(std::memory_order_acquire);
return (tail - head + SIZE) % SIZE;
}
uint16_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }
void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }
bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }
bool full() const {
uint8_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) % SIZE;
return next_tail == head_.load(std::memory_order_acquire);
}
// Set the FreeRTOS task handle to notify when items are pushed to the queue
// This enables efficient wake-up of a consumer task that's waiting for data
// @param task The FreeRTOS task handle to notify, or nullptr to disable notifications
void set_task_to_notify(TaskHandle_t task) { task_to_notify_ = task; }
protected:
T *buffer_[SIZE];
// Atomic: written by producer (push/increment), read+reset by consumer (get_and_reset)
std::atomic<uint16_t> dropped_count_; // 65535 max - more than enough for drop tracking
// Atomic: written by consumer (pop), read by producer (push) to check if full
// Using uint8_t limits queue size to 255 elements but saves memory and ensures
// atomic operations are efficient on all platforms
std::atomic<uint8_t> head_;
// Atomic: written by producer (push), read by consumer (pop) to check if empty
std::atomic<uint8_t> tail_;
// Task handle for notification (optional)
TaskHandle_t task_to_notify_;
};
} // namespace esphome
#endif // defined(USE_ESP32) || defined(USE_LIBRETINY)
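A usage sketch for the notification hook above: the consumer registers its task handle, blocks on ulTaskNotifyTake(), and drains the queue whenever push() wakes it. Message, the 32-slot size and consumer_task are illustrative names, and the ESP32-style FreeRTOS includes assume a build where this header is available.

#include <cstdio>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include "esphome/core/lock_free_queue.h"

struct Message {
  int id;
};

static esphome::LockFreeQueue<Message, 32> queue;

// Consumer task (created elsewhere with xTaskCreate): register for notifications,
// then sleep until the producer wakes us.
void consumer_task(void *param) {
  queue.set_task_to_notify(xTaskGetCurrentTaskHandle());
  for (;;) {
    ulTaskNotifyTake(pdTRUE, portMAX_DELAY);  // woken by xTaskNotifyGive() in push()
    Message *msg;
    while ((msg = queue.pop()) != nullptr) {
      std::printf("got message %d\n", msg->id);
      // ownership of *msg returns to the producer's allocator/pool here
    }
  }
}

// Producer (a different task): push never blocks; false means the item was dropped.
bool send(Message *msg) { return queue.push(msg); }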


@ -0,0 +1,89 @@
esphome:
name: vv-logging-test
host:
api:
logger:
level: VERY_VERBOSE
# Enable VV logging for API components where the issue occurs
logs:
api.connection: VERY_VERBOSE
api.service: VERY_VERBOSE
api.proto: VERY_VERBOSE
sensor: VERY_VERBOSE
# Create many sensors that update frequently to generate API traffic
# This will cause many messages to be batched and sent, triggering the
# code path where VV logging could cause buffer corruption
sensor:
- platform: template
name: "Test Sensor 1"
lambda: 'return millis() / 1000.0;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 2"
lambda: 'return (millis() / 1000.0) + 10;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 3"
lambda: 'return (millis() / 1000.0) + 20;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 4"
lambda: 'return (millis() / 1000.0) + 30;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 5"
lambda: 'return (millis() / 1000.0) + 40;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 6"
lambda: 'return (millis() / 1000.0) + 50;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 7"
lambda: 'return (millis() / 1000.0) + 60;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 8"
lambda: 'return (millis() / 1000.0) + 70;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 9"
lambda: 'return (millis() / 1000.0) + 80;'
update_interval: 50ms
unit_of_measurement: "s"
- platform: template
name: "Test Sensor 10"
lambda: 'return (millis() / 1000.0) + 90;'
update_interval: 50ms
unit_of_measurement: "s"
# Add some binary sensors too for variety
binary_sensor:
- platform: template
name: "Test Binary 1"
lambda: 'return (millis() / 1000) % 2 == 0;'
- platform: template
name: "Test Binary 2"
lambda: 'return (millis() / 1000) % 3 == 0;'


@ -0,0 +1,83 @@
"""Integration test for API with VERY_VERBOSE logging to verify no buffer corruption."""
from __future__ import annotations
import asyncio
from typing import Any
from aioesphomeapi import LogLevel
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_api_vv_logging(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that VERY_VERBOSE logging doesn't cause buffer corruption with API messages."""
# Track that we're receiving VV log messages and sensor updates
vv_logs_received = 0
sensor_updates_received = 0
errors_detected = []
def on_log(msg: Any) -> None:
"""Capture log messages."""
nonlocal vv_logs_received
# msg is a SubscribeLogsResponse object with 'message' attribute
# The message field is always bytes
message_text = msg.message.decode("utf-8", errors="replace")
# Only count VV logs specifically
if "[VV]" in message_text:
vv_logs_received += 1
# Check for assertion or error messages
if "assert" in message_text.lower() or "error" in message_text.lower():
errors_detected.append(message_text)
# Write, compile and run the ESPHome device
async with run_compiled(yaml_config), api_client_connected() as client:
# Subscribe to VERY_VERBOSE logs - this enables the code path that could cause corruption
client.subscribe_logs(on_log, log_level=LogLevel.LOG_LEVEL_VERY_VERBOSE)
# Wait for device to be ready
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "vv-logging-test"
# Subscribe to sensor states
states = {}
def on_state(state):
nonlocal sensor_updates_received
sensor_updates_received += 1
states[state.key] = state
client.subscribe_states(on_state)
# List entities to find our test sensors
entity_info, _ = await client.list_entities_services()
# Count sensors
sensor_count = sum(1 for e in entity_info if hasattr(e, "unit_of_measurement"))
assert sensor_count >= 10, f"Expected at least 10 sensors, got {sensor_count}"
# Wait for sensor updates to flow with VV logging active
# The sensors update every 50ms, so we should get many updates
await asyncio.sleep(0.25)
# Verify we received both VV logs and sensor updates
assert vv_logs_received > 0, "Expected to receive VERY_VERBOSE log messages"
assert sensor_updates_received > 10, (
f"Expected many sensor updates, got {sensor_updates_received}"
)
# Check for any errors
if errors_detected:
pytest.fail(f"Errors detected during test: {errors_detected}")
# The test passes if we didn't hit any assertions or buffer corruption