diff --git a/esphome/components/mqtt/mqtt_backend_esp32.cpp b/esphome/components/mqtt/mqtt_backend_esp32.cpp index 64dc27d84b..62b153e676 100644 --- a/esphome/components/mqtt/mqtt_backend_esp32.cpp +++ b/esphome/components/mqtt/mqtt_backend_esp32.cpp @@ -6,6 +6,7 @@ #include #include "esphome/core/helpers.h" #include "esphome/core/log.h" +#include "esphome/core/application.h" namespace esphome { namespace mqtt { @@ -100,9 +101,24 @@ bool MQTTBackendESP32::initialize_() { handler_.reset(mqtt_client); is_initalized_ = true; esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this); +#if defined(USE_MQTT_IDF_ENQUEUE) + // Create the task only after MQTT client is initialized successfully + // Use larger stack size when TLS is enabled + size_t stack_size = this->ca_certificate_.has_value() ? TASK_STACK_SIZE_TLS : TASK_STACK_SIZE; + xTaskCreate(esphome_mqtt_task, "esphome_mqtt", stack_size, (void *) this, TASK_PRIORITY, &this->task_handle_); + if (this->task_handle_ == nullptr) { + ESP_LOGE(TAG, "Failed to create MQTT task"); + // Clean up MQTT client since we can't start the async task + handler_.reset(); + is_initalized_ = false; + return false; + } + // Set the task handle so the queue can notify it + this->mqtt_queue_.set_task_to_notify(this->task_handle_); +#endif return true; } else { - ESP_LOGE(TAG, "Failed to initialize IDF-MQTT"); + ESP_LOGE(TAG, "Failed to init client"); return false; } } @@ -115,6 +131,26 @@ void MQTTBackendESP32::loop() { mqtt_event_handler_(event); mqtt_events_.pop(); } + +#if defined(USE_MQTT_IDF_ENQUEUE) + // Periodically log dropped messages to avoid blocking during spikes. + // During high load, many messages can be dropped in quick succession. + // Logging each drop immediately would flood the logs and potentially + // cause more drops if MQTT logging is enabled (cascade effect). + // Instead, we accumulate the count and log a summary periodically. + // IMPORTANT: Don't move this to the scheduler - if drops are due to memory + // pressure, the scheduler's heap allocations would make things worse. + uint32_t now = App.get_loop_component_start_time(); + // Handle rollover: (now - last_time) works correctly with unsigned arithmetic + // even when now < last_time due to rollover + if ((now - this->last_dropped_log_time_) >= DROP_LOG_INTERVAL_MS) { + uint16_t dropped = this->mqtt_queue_.get_and_reset_dropped_count(); + if (dropped > 0) { + ESP_LOGW(TAG, "Dropped %u messages (%us)", dropped, DROP_LOG_INTERVAL_MS / 1000); + } + this->last_dropped_log_time_ = now; + } +#endif } void MQTTBackendESP32::mqtt_event_handler_(const Event &event) { @@ -188,6 +224,86 @@ void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t b } } +#if defined(USE_MQTT_IDF_ENQUEUE) +void MQTTBackendESP32::esphome_mqtt_task(void *params) { + MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params; + + while (true) { + // Wait for notification indefinitely + ulTaskNotifyTake(pdTRUE, portMAX_DELAY); + + // Process all queued items + struct QueueElement *elem; + while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) { + if (this_mqtt->is_connected_) { + switch (elem->type) { + case MQTT_QUEUE_TYPE_SUBSCRIBE: + esp_mqtt_client_subscribe(this_mqtt->handler_.get(), elem->topic, elem->qos); + break; + + case MQTT_QUEUE_TYPE_UNSUBSCRIBE: + esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), elem->topic); + break; + + case MQTT_QUEUE_TYPE_PUBLISH: + esp_mqtt_client_publish(this_mqtt->handler_.get(), elem->topic, elem->payload, elem->payload_len, elem->qos, + elem->retain); + break; + + default: + ESP_LOGE(TAG, "Invalid operation type from MQTT queue"); + break; + } + } + this_mqtt->mqtt_event_pool_.release(elem); + } + } + + // Clean up any remaining items in the queue + struct QueueElement *elem; + while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) { + this_mqtt->mqtt_event_pool_.release(elem); + } + + // Note: EventPool destructor will clean up the pool itself + // Task will delete itself + vTaskDelete(nullptr); +} + +bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload, + size_t len) { + auto *elem = this->mqtt_event_pool_.allocate(); + + if (!elem) { + // Queue is full - increment counter but don't log immediately. + // Logging here can cause a cascade effect: if MQTT logging is enabled, + // each dropped message would generate a log message, which could itself + // be sent via MQTT, causing more drops and more logs in a feedback loop + // that eventually triggers a watchdog reset. Instead, we log periodically + // in loop() to prevent blocking the event loop during spikes. + this->mqtt_queue_.increment_dropped_count(); + return false; + } + + elem->type = type; + elem->qos = qos; + elem->retain = retain; + + // Use the helper to allocate and copy data + if (!elem->set_data(topic, payload, len)) { + // Allocation failed, return elem to pool + this->mqtt_event_pool_.release(elem); + // Increment counter without logging to avoid cascade effect during memory pressure + this->mqtt_queue_.increment_dropped_count(); + return false; + } + + // Push to queue - always succeeds since we allocated from the pool + this->mqtt_queue_.push(elem); + return true; +} +#endif // USE_MQTT_IDF_ENQUEUE + } // namespace mqtt } // namespace esphome #endif // USE_ESP32 diff --git a/esphome/components/mqtt/mqtt_backend_esp32.h b/esphome/components/mqtt/mqtt_backend_esp32.h index 9054702115..57286a24b2 100644 --- a/esphome/components/mqtt/mqtt_backend_esp32.h +++ b/esphome/components/mqtt/mqtt_backend_esp32.h @@ -6,9 +6,14 @@ #include #include +#include #include +#include +#include #include "esphome/components/network/ip_address.h" #include "esphome/core/helpers.h" +#include "esphome/core/lock_free_queue.h" +#include "esphome/core/event_pool.h" namespace esphome { namespace mqtt { @@ -42,9 +47,79 @@ struct Event { error_handle(*event.error_handle) {} }; +enum MqttQueueTypeT : uint8_t { + MQTT_QUEUE_TYPE_NONE = 0, + MQTT_QUEUE_TYPE_SUBSCRIBE, + MQTT_QUEUE_TYPE_UNSUBSCRIBE, + MQTT_QUEUE_TYPE_PUBLISH, +}; + +struct QueueElement { + char *topic; + char *payload; + uint16_t payload_len; // MQTT max payload is 64KiB + uint8_t type : 2; + uint8_t qos : 2; // QoS only needs values 0-2 + uint8_t retain : 1; + uint8_t reserved : 3; // Reserved for future use + + QueueElement() : topic(nullptr), payload(nullptr), payload_len(0), qos(0), retain(0), reserved(0) {} + + // Helper to set topic/payload (uses RAMAllocator) + bool set_data(const char *topic_str, const char *payload_data, size_t len) { + // Check payload size limit (MQTT max is 64KiB) + if (len > std::numeric_limits::max()) { + return false; + } + + // Use RAMAllocator with default flags (tries external RAM first, falls back to internal) + RAMAllocator allocator; + + // Allocate and copy topic + size_t topic_len = strlen(topic_str) + 1; + topic = allocator.allocate(topic_len); + if (!topic) + return false; + memcpy(topic, topic_str, topic_len); + + if (payload_data && len) { + payload = allocator.allocate(len); + if (!payload) { + allocator.deallocate(topic, topic_len); + topic = nullptr; + return false; + } + memcpy(payload, payload_data, len); + payload_len = static_cast(len); + } else { + payload = nullptr; + payload_len = 0; + } + return true; + } + + // Helper to release (uses RAMAllocator) + void release() { + RAMAllocator allocator; + if (topic) { + allocator.deallocate(topic, strlen(topic) + 1); + topic = nullptr; + } + if (payload) { + allocator.deallocate(payload, payload_len); + payload = nullptr; + } + payload_len = 0; + } +}; + class MQTTBackendESP32 final : public MQTTBackend { public: static const size_t MQTT_BUFFER_SIZE = 4096; + static const size_t TASK_STACK_SIZE = 2048; + static const size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations + static const ssize_t TASK_PRIORITY = 5; + static const uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360 void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; } void set_client_id(const char *client_id) final { this->client_id_ = client_id; } @@ -105,15 +180,23 @@ class MQTTBackendESP32 final : public MQTTBackend { } bool subscribe(const char *topic, uint8_t qos) final { +#if defined(USE_MQTT_IDF_ENQUEUE) + return enqueue_(MQTT_QUEUE_TYPE_SUBSCRIBE, topic, qos); +#else return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1; +#endif + } + bool unsubscribe(const char *topic) final { +#if defined(USE_MQTT_IDF_ENQUEUE) + return enqueue_(MQTT_QUEUE_TYPE_UNSUBSCRIBE, topic); +#else + return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1; +#endif } - bool unsubscribe(const char *topic) final { return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1; } bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final { #if defined(USE_MQTT_IDF_ENQUEUE) - // use the non-blocking version - // it can delay sending a couple of seconds but won't block - return esp_mqtt_client_enqueue(handler_.get(), topic, payload, length, qos, retain, true) != -1; + return enqueue_(MQTT_QUEUE_TYPE_PUBLISH, topic, qos, retain, payload, length); #else // might block for several seconds, either due to network timeout (10s) // or if publishing payloads longer than internal buffer (due to message fragmentation) @@ -129,6 +212,12 @@ class MQTTBackendESP32 final : public MQTTBackend { void set_cl_key(const std::string &key) { cl_key_ = key; } void set_skip_cert_cn_check(bool skip_check) { skip_cert_cn_check_ = skip_check; } + // No destructor needed: ESPHome components live for the entire device runtime. + // The MQTT task and queue will run until the device reboots or loses power, + // at which point the entire process terminates and FreeRTOS cleans up all tasks. + // Implementing a destructor would add complexity and potential race conditions + // for a scenario that never occurs in practice. + protected: bool initialize_(); void mqtt_event_handler_(const Event &event); @@ -160,6 +249,14 @@ class MQTTBackendESP32 final : public MQTTBackend { optional cl_certificate_; optional cl_key_; bool skip_cert_cn_check_{false}; +#if defined(USE_MQTT_IDF_ENQUEUE) + static void esphome_mqtt_task(void *params); + EventPool mqtt_event_pool_; + LockFreeQueue mqtt_queue_; + TaskHandle_t task_handle_{nullptr}; + bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL, + size_t len = 0); +#endif // callbacks CallbackManager on_connect_; @@ -169,6 +266,11 @@ class MQTTBackendESP32 final : public MQTTBackend { CallbackManager on_message_; CallbackManager on_publish_; std::queue mqtt_events_; + +#if defined(USE_MQTT_IDF_ENQUEUE) + uint32_t last_dropped_log_time_{0}; + static constexpr uint32_t DROP_LOG_INTERVAL_MS = 10000; // Log every 10 seconds +#endif }; } // namespace mqtt