Mirror of https://github.com/esphome/esphome.git (synced 2025-07-28 14:16:40 +00:00)

Make BLE queue lock free (#9088)

This commit is contained in:
parent 39f6f9b0dc
commit 70d66062d6
esphome/components/esp32_ble/esp32_ble.cpp

@@ -23,9 +23,6 @@ namespace esp32_ble {
 
 static const char *const TAG = "esp32_ble";
 
-// Maximum size of the BLE event queue
-static constexpr size_t MAX_BLE_QUEUE_SIZE = SCAN_RESULT_BUFFER_SIZE * 2;
-
 static RAMAllocator<BLEEvent> EVENT_ALLOCATOR(  // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
     RAMAllocator<BLEEvent>::ALLOW_FAILURE | RAMAllocator<BLEEvent>::ALLOC_INTERNAL);
 
@@ -360,21 +357,38 @@ void ESP32BLE::loop() {
   if (this->advertising_ != nullptr) {
     this->advertising_->loop();
   }
+
+  // Log dropped events periodically
+  size_t dropped = this->ble_events_.get_and_reset_dropped_count();
+  if (dropped > 0) {
+    ESP_LOGW(TAG, "Dropped %zu BLE events due to buffer overflow", dropped);
+  }
 }
 
 template<typename... Args> void enqueue_ble_event(Args... args) {
-  if (global_ble->ble_events_.size() >= MAX_BLE_QUEUE_SIZE) {
-    ESP_LOGD(TAG, "Event queue full (%zu), dropping event", MAX_BLE_QUEUE_SIZE);
+  // Check if queue is full before allocating
+  if (global_ble->ble_events_.full()) {
+    // Queue is full, drop the event
+    global_ble->ble_events_.increment_dropped_count();
     return;
   }
 
   BLEEvent *new_event = EVENT_ALLOCATOR.allocate(1);
   if (new_event == nullptr) {
     // Memory too fragmented to allocate new event. Can only drop it until memory comes back
+    global_ble->ble_events_.increment_dropped_count();
     return;
   }
   new (new_event) BLEEvent(args...);
-  global_ble->ble_events_.push(new_event);
+
+  // Push the event - since we're the only producer and we checked full() above,
+  // this should always succeed unless we have a bug
+  if (!global_ble->ble_events_.push(new_event)) {
+    // This should not happen in SPSC queue with single producer
+    ESP_LOGE(TAG, "BLE queue push failed unexpectedly");
+    new_event->~BLEEvent();
+    EVENT_ALLOCATOR.deallocate(new_event, 1);
+  }
 }  // NOLINT(clang-analyzer-unix.Malloc)
 
 // Explicit template instantiations for the friend function
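Note on the consumer side: events are allocated from EVENT_ALLOCATOR and constructed with placement new, so whoever pops them must run the destructor and return the storage. The sketch below shows that drain path; process_ble_event_() is a hypothetical stand-in for the component's actual GAP/GATT dispatch and is not code from this commit.

// Hypothetical consumer-side sketch (illustration only, not part of this diff).
// loop() pops events pushed by the BLE task, dispatches them, then releases
// the storage obtained from EVENT_ALLOCATOR in enqueue_ble_event().
void ESP32BLE::loop() {
  BLEEvent *event;
  while ((event = this->ble_events_.pop()) != nullptr) {
    this->process_ble_event_(event);  // hypothetical dispatch helper
    event->~BLEEvent();               // matches the placement new in enqueue_ble_event()
    EVENT_ALLOCATOR.deallocate(event, 1);
  }
  // ... advertising loop and dropped-event logging as shown in the hunk above ...
}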
esphome/components/esp32_ble/esp32_ble.h

@@ -30,6 +30,9 @@ static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 32;
 static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 20;
 #endif
 
+// Maximum size of the BLE event queue - must be power of 2 for lock-free queue
+static constexpr size_t MAX_BLE_QUEUE_SIZE = 64;
+
 uint64_t ble_addr_to_uint64(const esp_bd_addr_t address);
 
 // NOLINTNEXTLINE(modernize-use-using)
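A word on the "must be power of 2" comment: with a power-of-two capacity, the ring-buffer wrap-around reduces to a cheap bitmask, and the % SIZE in the queue below compiles down to that mask when SIZE is a compile-time power of two. A generic illustration, not code from this commit, with an illustrative QUEUE_CAPACITY constant:

// Generic illustration: for a power-of-two capacity, (i + 1) % capacity equals
// (i + 1) & (capacity - 1), so the compiler can avoid an integer division.
#include <cstddef>

constexpr size_t QUEUE_CAPACITY = 64;  // power of two, mirroring MAX_BLE_QUEUE_SIZE above
constexpr size_t next_index(size_t i) { return (i + 1) & (QUEUE_CAPACITY - 1); }

static_assert(next_index(63) == 0, "wraps to the start at the capacity");
static_assert(next_index(10) == 11, "advances by one otherwise");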
@@ -144,7 +147,7 @@ class ESP32BLE : public Component {
   std::vector<BLEStatusEventHandler *> ble_status_event_handlers_;
   BLEComponentState state_{BLE_COMPONENT_STATE_OFF};
 
-  Queue<BLEEvent> ble_events_;
+  LockFreeQueue<BLEEvent, MAX_BLE_QUEUE_SIZE> ble_events_;
   BLEAdvertising *advertising_{};
   esp_ble_io_cap_t io_cap_{ESP_IO_CAP_NONE};
   uint32_t advertising_cycle_time_{};
esphome/components/esp32_ble/queue.h

@@ -2,63 +2,78 @@
 
 #ifdef USE_ESP32
 
-#include <mutex>
-#include <queue>
+#include <atomic>
+#include <cstddef>
 
-#include <freertos/FreeRTOS.h>
-#include <freertos/semphr.h>
-
 /*
  * BLE events come in from a separate Task (thread) in the ESP32 stack. Rather
- * than trying to deal with various locking strategies, all incoming GAP and GATT
- * events will simply be placed on a semaphore guarded queue. The next time the
- * component runs loop(), these events are popped off the queue and handed at
- * this safer time.
+ * than using mutex-based locking, this lock-free queue allows the BLE
+ * task to enqueue events without blocking. The main loop() then processes
+ * these events at a safer time.
+ *
+ * This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer.
+ * The BLE task is the only producer, and the main loop() is the only consumer.
  */
 
 namespace esphome {
 namespace esp32_ble {
 
-template<class T> class Queue {
+template<class T, size_t SIZE> class LockFreeQueue {
  public:
-  Queue() { m_ = xSemaphoreCreateMutex(); }
+  LockFreeQueue() : head_(0), tail_(0), dropped_count_(0) {}
 
-  void push(T *element) {
+  bool push(T *element) {
     if (element == nullptr)
-      return;
-    // It is not called from main loop. Thus it won't block main thread.
-    xSemaphoreTake(m_, portMAX_DELAY);
-    q_.push(element);
-    xSemaphoreGive(m_);
+      return false;
+
+    size_t current_tail = tail_.load(std::memory_order_relaxed);
+    size_t next_tail = (current_tail + 1) % SIZE;
+
+    if (next_tail == head_.load(std::memory_order_acquire)) {
+      // Buffer full
+      dropped_count_.fetch_add(1, std::memory_order_relaxed);
+      return false;
+    }
+
+    buffer_[current_tail] = element;
+    tail_.store(next_tail, std::memory_order_release);
+    return true;
   }
 
   T *pop() {
-    T *element = nullptr;
+    size_t current_head = head_.load(std::memory_order_relaxed);
 
-    if (xSemaphoreTake(m_, 5L / portTICK_PERIOD_MS)) {
-      if (!q_.empty()) {
-        element = q_.front();
-        q_.pop();
-      }
-      xSemaphoreGive(m_);
+    if (current_head == tail_.load(std::memory_order_acquire)) {
+      return nullptr;  // Empty
     }
+
+    T *element = buffer_[current_head];
+    head_.store((current_head + 1) % SIZE, std::memory_order_release);
     return element;
   }
 
   size_t size() const {
-    // Lock-free size check. While std::queue::size() is not thread-safe, we intentionally
-    // avoid locking here to prevent blocking the BLE callback thread. The size is only
-    // used to decide whether to drop incoming events when the queue is near capacity.
-    // With a queue limit of 40-64 events and normal processing, dropping events should
-    // be extremely rare. When it does approach capacity, being off by 1-2 events is
-    // acceptable to avoid blocking the BLE stack's time-sensitive callbacks.
-    // Trade-off: We prefer occasional dropped events over potential BLE stack delays.
-    return q_.size();
+    size_t tail = tail_.load(std::memory_order_acquire);
+    size_t head = head_.load(std::memory_order_acquire);
+    return (tail - head + SIZE) % SIZE;
+  }
+
+  size_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }
+
+  void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }
+
+  bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }
+
+  bool full() const {
+    size_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) % SIZE;
+    return next_tail == head_.load(std::memory_order_acquire);
   }
 
  protected:
-  std::queue<T *> q_;
-  SemaphoreHandle_t m_;
+  T *buffer_[SIZE];
+  std::atomic<size_t> head_;
+  std::atomic<size_t> tail_;
+  std::atomic<size_t> dropped_count_;
 };
 
 }  // namespace esp32_ble
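To make the SPSC contract concrete, here is a host-side usage sketch: one producer thread pushes heap-allocated items and one consumer thread pops and frees them. It assumes the LockFreeQueue<T, SIZE> template from the queue.h hunk above is in scope (the real header is guarded by USE_ESP32); it is an illustration, not a test from the ESPHome tree.

// Illustrative SPSC usage of LockFreeQueue (assumes the template above is in scope).
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>

using esphome::esp32_ble::LockFreeQueue;

int main() {
  LockFreeQueue<int, 64> queue;  // one slot stays empty so full() can be told apart from empty()
  std::atomic<bool> done{false};

  // Single producer: owns each item until push() succeeds; a false return means it was dropped.
  std::thread producer([&] {
    for (int i = 0; i < 1000; i++) {
      int *item = new int(i);
      if (!queue.push(item)) {
        delete item;
      }
    }
    done.store(true);
  });

  // Single consumer: drains until the producer is done and the queue is empty.
  std::thread consumer([&] {
    size_t received = 0;
    while (!done.load() || !queue.empty()) {
      int *item = queue.pop();
      if (item != nullptr) {
        received++;
        delete item;
      }
    }
    std::printf("received %zu items, dropped %zu\n", received, queue.get_and_reset_dropped_count());
  });

  producer.join();
  consumer.join();
  return 0;
}

Note that push() records the drop internally when the ring is full, which is why the consumer can report the dropped count without any extra bookkeeping on the producer side.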