mirror of
https://github.com/esphome/esphome.git
synced 2025-07-28 22:26:36 +00:00
Fix MQTT blocking main loop for multiple seconds at a time (#8325)
Co-authored-by: patagona <patagonahn@gmail.com> Co-authored-by: J. Nick Koston <nick@koston.org> Co-authored-by: J. Nick Koston <nick+github@koston.org>
This commit is contained in:
parent
b743577ebe
commit
971bbd088c
@ -6,6 +6,7 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include "esphome/core/helpers.h"
|
#include "esphome/core/helpers.h"
|
||||||
#include "esphome/core/log.h"
|
#include "esphome/core/log.h"
|
||||||
|
#include "esphome/core/application.h"
|
||||||
|
|
||||||
namespace esphome {
|
namespace esphome {
|
||||||
namespace mqtt {
|
namespace mqtt {
|
||||||
@ -100,9 +101,24 @@ bool MQTTBackendESP32::initialize_() {
|
|||||||
handler_.reset(mqtt_client);
|
handler_.reset(mqtt_client);
|
||||||
is_initalized_ = true;
|
is_initalized_ = true;
|
||||||
esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this);
|
esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this);
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
// Create the task only after MQTT client is initialized successfully
|
||||||
|
// Use larger stack size when TLS is enabled
|
||||||
|
size_t stack_size = this->ca_certificate_.has_value() ? TASK_STACK_SIZE_TLS : TASK_STACK_SIZE;
|
||||||
|
xTaskCreate(esphome_mqtt_task, "esphome_mqtt", stack_size, (void *) this, TASK_PRIORITY, &this->task_handle_);
|
||||||
|
if (this->task_handle_ == nullptr) {
|
||||||
|
ESP_LOGE(TAG, "Failed to create MQTT task");
|
||||||
|
// Clean up MQTT client since we can't start the async task
|
||||||
|
handler_.reset();
|
||||||
|
is_initalized_ = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Set the task handle so the queue can notify it
|
||||||
|
this->mqtt_queue_.set_task_to_notify(this->task_handle_);
|
||||||
|
#endif
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
ESP_LOGE(TAG, "Failed to initialize IDF-MQTT");
|
ESP_LOGE(TAG, "Failed to init client");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,6 +131,26 @@ void MQTTBackendESP32::loop() {
|
|||||||
mqtt_event_handler_(event);
|
mqtt_event_handler_(event);
|
||||||
mqtt_events_.pop();
|
mqtt_events_.pop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
// Periodically log dropped messages to avoid blocking during spikes.
|
||||||
|
// During high load, many messages can be dropped in quick succession.
|
||||||
|
// Logging each drop immediately would flood the logs and potentially
|
||||||
|
// cause more drops if MQTT logging is enabled (cascade effect).
|
||||||
|
// Instead, we accumulate the count and log a summary periodically.
|
||||||
|
// IMPORTANT: Don't move this to the scheduler - if drops are due to memory
|
||||||
|
// pressure, the scheduler's heap allocations would make things worse.
|
||||||
|
uint32_t now = App.get_loop_component_start_time();
|
||||||
|
// Handle rollover: (now - last_time) works correctly with unsigned arithmetic
|
||||||
|
// even when now < last_time due to rollover
|
||||||
|
if ((now - this->last_dropped_log_time_) >= DROP_LOG_INTERVAL_MS) {
|
||||||
|
uint16_t dropped = this->mqtt_queue_.get_and_reset_dropped_count();
|
||||||
|
if (dropped > 0) {
|
||||||
|
ESP_LOGW(TAG, "Dropped %u messages (%us)", dropped, DROP_LOG_INTERVAL_MS / 1000);
|
||||||
|
}
|
||||||
|
this->last_dropped_log_time_ = now;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTBackendESP32::mqtt_event_handler_(const Event &event) {
|
void MQTTBackendESP32::mqtt_event_handler_(const Event &event) {
|
||||||
@ -188,6 +224,86 @@ void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t b
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
void MQTTBackendESP32::esphome_mqtt_task(void *params) {
|
||||||
|
MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// Wait for notification indefinitely
|
||||||
|
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
|
||||||
|
|
||||||
|
// Process all queued items
|
||||||
|
struct QueueElement *elem;
|
||||||
|
while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
|
||||||
|
if (this_mqtt->is_connected_) {
|
||||||
|
switch (elem->type) {
|
||||||
|
case MQTT_QUEUE_TYPE_SUBSCRIBE:
|
||||||
|
esp_mqtt_client_subscribe(this_mqtt->handler_.get(), elem->topic, elem->qos);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MQTT_QUEUE_TYPE_UNSUBSCRIBE:
|
||||||
|
esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), elem->topic);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MQTT_QUEUE_TYPE_PUBLISH:
|
||||||
|
esp_mqtt_client_publish(this_mqtt->handler_.get(), elem->topic, elem->payload, elem->payload_len, elem->qos,
|
||||||
|
elem->retain);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
ESP_LOGE(TAG, "Invalid operation type from MQTT queue");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this_mqtt->mqtt_event_pool_.release(elem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up any remaining items in the queue
|
||||||
|
struct QueueElement *elem;
|
||||||
|
while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
|
||||||
|
this_mqtt->mqtt_event_pool_.release(elem);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: EventPool destructor will clean up the pool itself
|
||||||
|
// Task will delete itself
|
||||||
|
vTaskDelete(nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue one MQTT operation (subscribe/unsubscribe/publish) for the background
// task. Returns false — and counts a dropped message — when the pool is
// exhausted or the topic/payload buffers cannot be allocated.
bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
                                size_t len) {
  auto *elem = this->mqtt_event_pool_.allocate();
  if (elem == nullptr) {
    // Pool/queue is full. Do not log here: with MQTT logging enabled, each
    // dropped message could emit a log line that is itself published over
    // MQTT, causing more drops and more logs in a feedback loop that can end
    // in a watchdog reset. The accumulated drop count is reported
    // periodically from loop() instead.
    this->mqtt_queue_.increment_dropped_count();
    return false;
  }

  elem->type = type;
  elem->qos = qos;
  elem->retain = retain;

  // Copy topic and payload into pool-owned buffers (RAMAllocator).
  if (!elem->set_data(topic, payload, len)) {
    // Allocation failed: hand the element back and count the drop silently —
    // logging under memory pressure would only make things worse.
    this->mqtt_event_pool_.release(elem);
    this->mqtt_queue_.increment_dropped_count();
    return false;
  }

  // Push cannot fail: the element came from the same fixed-size pool that
  // backs the queue.
  this->mqtt_queue_.push(elem);
  return true;
}
|
||||||
|
#endif // USE_MQTT_IDF_ENQUEUE
|
||||||
|
|
||||||
} // namespace mqtt
|
} // namespace mqtt
|
||||||
} // namespace esphome
|
} // namespace esphome
|
||||||
#endif // USE_ESP32
|
#endif // USE_ESP32
|
||||||
|
@ -6,9 +6,14 @@
|
|||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <cstring>
|
||||||
#include <mqtt_client.h>
|
#include <mqtt_client.h>
|
||||||
|
#include <freertos/FreeRTOS.h>
|
||||||
|
#include <freertos/task.h>
|
||||||
#include "esphome/components/network/ip_address.h"
|
#include "esphome/components/network/ip_address.h"
|
||||||
#include "esphome/core/helpers.h"
|
#include "esphome/core/helpers.h"
|
||||||
|
#include "esphome/core/lock_free_queue.h"
|
||||||
|
#include "esphome/core/event_pool.h"
|
||||||
|
|
||||||
namespace esphome {
|
namespace esphome {
|
||||||
namespace mqtt {
|
namespace mqtt {
|
||||||
@ -42,9 +47,79 @@ struct Event {
|
|||||||
error_handle(*event.error_handle) {}
|
error_handle(*event.error_handle) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Operation codes for requests handed from the main loop to the MQTT task.
// Stored in a 2-bit field of QueueElement, so values must stay in [0, 3].
enum MqttQueueTypeT : uint8_t {
  MQTT_QUEUE_TYPE_NONE = 0,
  MQTT_QUEUE_TYPE_SUBSCRIBE,
  MQTT_QUEUE_TYPE_UNSUBSCRIBE,
  MQTT_QUEUE_TYPE_PUBLISH,
};
|
||||||
|
|
||||||
|
struct QueueElement {
|
||||||
|
char *topic;
|
||||||
|
char *payload;
|
||||||
|
uint16_t payload_len; // MQTT max payload is 64KiB
|
||||||
|
uint8_t type : 2;
|
||||||
|
uint8_t qos : 2; // QoS only needs values 0-2
|
||||||
|
uint8_t retain : 1;
|
||||||
|
uint8_t reserved : 3; // Reserved for future use
|
||||||
|
|
||||||
|
QueueElement() : topic(nullptr), payload(nullptr), payload_len(0), qos(0), retain(0), reserved(0) {}
|
||||||
|
|
||||||
|
// Helper to set topic/payload (uses RAMAllocator)
|
||||||
|
bool set_data(const char *topic_str, const char *payload_data, size_t len) {
|
||||||
|
// Check payload size limit (MQTT max is 64KiB)
|
||||||
|
if (len > std::numeric_limits<uint16_t>::max()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use RAMAllocator with default flags (tries external RAM first, falls back to internal)
|
||||||
|
RAMAllocator<char> allocator;
|
||||||
|
|
||||||
|
// Allocate and copy topic
|
||||||
|
size_t topic_len = strlen(topic_str) + 1;
|
||||||
|
topic = allocator.allocate(topic_len);
|
||||||
|
if (!topic)
|
||||||
|
return false;
|
||||||
|
memcpy(topic, topic_str, topic_len);
|
||||||
|
|
||||||
|
if (payload_data && len) {
|
||||||
|
payload = allocator.allocate(len);
|
||||||
|
if (!payload) {
|
||||||
|
allocator.deallocate(topic, topic_len);
|
||||||
|
topic = nullptr;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
memcpy(payload, payload_data, len);
|
||||||
|
payload_len = static_cast<uint16_t>(len);
|
||||||
|
} else {
|
||||||
|
payload = nullptr;
|
||||||
|
payload_len = 0;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper to release (uses RAMAllocator)
|
||||||
|
void release() {
|
||||||
|
RAMAllocator<char> allocator;
|
||||||
|
if (topic) {
|
||||||
|
allocator.deallocate(topic, strlen(topic) + 1);
|
||||||
|
topic = nullptr;
|
||||||
|
}
|
||||||
|
if (payload) {
|
||||||
|
allocator.deallocate(payload, payload_len);
|
||||||
|
payload = nullptr;
|
||||||
|
}
|
||||||
|
payload_len = 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
class MQTTBackendESP32 final : public MQTTBackend {
|
class MQTTBackendESP32 final : public MQTTBackend {
|
||||||
public:
|
public:
|
||||||
static const size_t MQTT_BUFFER_SIZE = 4096;
|
static const size_t MQTT_BUFFER_SIZE = 4096;
|
||||||
|
static const size_t TASK_STACK_SIZE = 2048;
|
||||||
|
static const size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations
|
||||||
|
static const ssize_t TASK_PRIORITY = 5;
|
||||||
|
static const uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360
|
||||||
|
|
||||||
void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
|
void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
|
||||||
void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
|
void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
|
||||||
@ -105,15 +180,23 @@ class MQTTBackendESP32 final : public MQTTBackend {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool subscribe(const char *topic, uint8_t qos) final {
|
bool subscribe(const char *topic, uint8_t qos) final {
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
return enqueue_(MQTT_QUEUE_TYPE_SUBSCRIBE, topic, qos);
|
||||||
|
#else
|
||||||
return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1;
|
return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
bool unsubscribe(const char *topic) final {
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
return enqueue_(MQTT_QUEUE_TYPE_UNSUBSCRIBE, topic);
|
||||||
|
#else
|
||||||
|
return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
bool unsubscribe(const char *topic) final { return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1; }
|
|
||||||
|
|
||||||
bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
|
bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
|
||||||
#if defined(USE_MQTT_IDF_ENQUEUE)
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
// use the non-blocking version
|
return enqueue_(MQTT_QUEUE_TYPE_PUBLISH, topic, qos, retain, payload, length);
|
||||||
// it can delay sending a couple of seconds but won't block
|
|
||||||
return esp_mqtt_client_enqueue(handler_.get(), topic, payload, length, qos, retain, true) != -1;
|
|
||||||
#else
|
#else
|
||||||
// might block for several seconds, either due to network timeout (10s)
|
// might block for several seconds, either due to network timeout (10s)
|
||||||
// or if publishing payloads longer than internal buffer (due to message fragmentation)
|
// or if publishing payloads longer than internal buffer (due to message fragmentation)
|
||||||
@ -129,6 +212,12 @@ class MQTTBackendESP32 final : public MQTTBackend {
|
|||||||
void set_cl_key(const std::string &key) { cl_key_ = key; }
|
void set_cl_key(const std::string &key) { cl_key_ = key; }
|
||||||
void set_skip_cert_cn_check(bool skip_check) { skip_cert_cn_check_ = skip_check; }
|
void set_skip_cert_cn_check(bool skip_check) { skip_cert_cn_check_ = skip_check; }
|
||||||
|
|
||||||
|
// No destructor needed: ESPHome components live for the entire device runtime.
|
||||||
|
// The MQTT task and queue will run until the device reboots or loses power,
|
||||||
|
// at which point the entire process terminates and FreeRTOS cleans up all tasks.
|
||||||
|
// Implementing a destructor would add complexity and potential race conditions
|
||||||
|
// for a scenario that never occurs in practice.
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool initialize_();
|
bool initialize_();
|
||||||
void mqtt_event_handler_(const Event &event);
|
void mqtt_event_handler_(const Event &event);
|
||||||
@ -160,6 +249,14 @@ class MQTTBackendESP32 final : public MQTTBackend {
|
|||||||
optional<std::string> cl_certificate_;
|
optional<std::string> cl_certificate_;
|
||||||
optional<std::string> cl_key_;
|
optional<std::string> cl_key_;
|
||||||
bool skip_cert_cn_check_{false};
|
bool skip_cert_cn_check_{false};
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
static void esphome_mqtt_task(void *params);
|
||||||
|
EventPool<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_event_pool_;
|
||||||
|
LockFreeQueue<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_queue_;
|
||||||
|
TaskHandle_t task_handle_{nullptr};
|
||||||
|
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL,
|
||||||
|
size_t len = 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
// callbacks
|
// callbacks
|
||||||
CallbackManager<on_connect_callback_t> on_connect_;
|
CallbackManager<on_connect_callback_t> on_connect_;
|
||||||
@ -169,6 +266,11 @@ class MQTTBackendESP32 final : public MQTTBackend {
|
|||||||
CallbackManager<on_message_callback_t> on_message_;
|
CallbackManager<on_message_callback_t> on_message_;
|
||||||
CallbackManager<on_publish_user_callback_t> on_publish_;
|
CallbackManager<on_publish_user_callback_t> on_publish_;
|
||||||
std::queue<Event> mqtt_events_;
|
std::queue<Event> mqtt_events_;
|
||||||
|
|
||||||
|
#if defined(USE_MQTT_IDF_ENQUEUE)
|
||||||
|
uint32_t last_dropped_log_time_{0};
|
||||||
|
static constexpr uint32_t DROP_LOG_INTERVAL_MS = 10000; // Log every 10 seconds
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace mqtt
|
} // namespace mqtt
|
||||||
|
Loading…
x
Reference in New Issue
Block a user