mqtt_enqueue_message

This commit is contained in:
fvanroie 2024-04-12 03:46:10 +02:00
parent 75af4aed10
commit b49f82a425

View File

@ -64,8 +64,8 @@ int mqttQos = 0;
esp_mqtt_client_handle_t mqttClient; esp_mqtt_client_handle_t mqttClient;
static esp_mqtt_client_config_t mqtt_cfg; static esp_mqtt_client_config_t mqtt_cfg;
extern const uint8_t rootca_crt_bundle_start[] asm("_binary_data_cert_x509_crt_bundle_bin_start"); // extern const uint8_t rootca_crt_bundle_start[] asm("_binary_data_cert_x509_crt_bundle_bin_start");
extern const uint8_t rootca_crt_bundle_end[] asm("_binary_data_cert_x509_crt_bundle_bin_end"); // extern const uint8_t rootca_crt_bundle_end[] asm("_binary_data_cert_x509_crt_bundle_bin_end");
bool last_mqtt_state = false; bool last_mqtt_state = false;
bool current_mqtt_state = false; bool current_mqtt_state = false;
@ -104,7 +104,7 @@ void mqtt_run_scripts()
void mqtt_disconnected() void mqtt_disconnected()
{ {
current_mqtt_state = false; // now we are disconnected current_mqtt_state = false; // now we are disconnected
mqtt_run_scripts(); // mqtt_run_scripts();
mqtt_reconnect_counter++; mqtt_reconnect_counter++;
} }
@ -115,7 +115,7 @@ void mqtt_connected()
current_mqtt_state = true; // now we are connected current_mqtt_state = true; // now we are connected
LOG_VERBOSE(TAG_MQTT, F("%s"), current_mqtt_state ? PSTR(D_SERVICE_CONNECTED) : PSTR(D_SERVICE_DISCONNECTED)); LOG_VERBOSE(TAG_MQTT, F("%s"), current_mqtt_state ? PSTR(D_SERVICE_CONNECTED) : PSTR(D_SERVICE_DISCONNECTED));
} }
mqtt_run_scripts(); // mqtt_run_scripts();
} }
int mqttPublish(const char* topic, const char* payload, size_t len, bool retain) int mqttPublish(const char* topic, const char* payload, size_t len, bool retain)
@ -190,6 +190,36 @@ static inline size_t mqtt_msg_length(size_t len)
return (len / 64) * 64 + 64; return (len / 64) * 64 + 64;
} }
void mqtt_enqueue_message(const char* topic, const char* payload, size_t payload_len)
{
// Add new message to the queue
mqtt_message_t data;
size_t topic_len = strlen(topic);
data.topic = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(topic_len + 1));
data.payload = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(payload_len + 1));
if(!data.topic || !data.payload) {
LOG_ERROR(TAG_MQTT_RCV, D_ERROR_OUT_OF_MEMORY);
hasp_free(data.topic);
hasp_free(data.payload);
return;
}
memcpy(data.topic, topic, topic_len);
memcpy(data.payload, payload, payload_len);
{
size_t attempt = 0;
while(xQueueSend(queue, &data, (TickType_t)0) == errQUEUE_FULL && attempt < 100) {
delay(5);
attempt++;
};
if(attempt >= 100) {
LOG_ERROR(TAG_MQTT_RCV, D_ERROR_OUT_OF_MEMORY);
}
}
}
void mqtt_process_topic_payload(const char* topic, const char* payload, unsigned int length) void mqtt_process_topic_payload(const char* topic, const char* payload, unsigned int length)
{ {
if(gui_acquire(pdMS_TO_TICKS(30))) { if(gui_acquire(pdMS_TO_TICKS(30))) {
@ -198,30 +228,7 @@ void mqtt_process_topic_payload(const char* topic, const char* payload, unsigned
dispatch_topic_payload(topic, payload, length > 0, TAG_MQTT); dispatch_topic_payload(topic, payload, length > 0, TAG_MQTT);
gui_release(); gui_release();
} else { } else {
// Add new message to the queue mqtt_enqueue_message(topic, payload, length);
mqtt_message_t data;
size_t topic_len = strlen(topic);
size_t payload_len = length;
data.topic = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(topic_len + 1));
data.payload = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(payload_len + 1));
if(!data.topic || !data.payload) {
LOG_ERROR(TAG_MQTT_RCV, D_ERROR_OUT_OF_MEMORY);
hasp_free(data.topic);
hasp_free(data.payload);
return;
}
memcpy(data.topic, topic, topic_len);
memcpy(data.payload, payload, payload_len);
{
size_t attempt = 0;
while(xQueueSend(queue, &data, (TickType_t)0) == errQUEUE_FULL && attempt < 100) {
delay(5);
attempt++;
};
}
} }
} }
@ -465,8 +472,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
void mqttSetup() void mqttSetup()
{ {
queue = xQueueCreate(64, sizeof(mqtt_message_t)); queue = xQueueCreate(64, sizeof(mqtt_message_t));
//esp_crt_bundle_set(rootca_crt_bundle_start, rootca_crt_bundle_end-rootca_crt_bundle_start); // esp_crt_bundle_set(rootca_crt_bundle_start, rootca_crt_bundle_end-rootca_crt_bundle_start);
arduino_esp_crt_bundle_set(rootca_crt_bundle_start); // arduino_esp_crt_bundle_set(rootca_crt_bundle_start);
mqttStart(); mqttStart();
} }
@ -489,6 +496,7 @@ IRAM_ATTR void mqttLoop(void)
void mqttEvery5Seconds(bool networkIsConnected) void mqttEvery5Seconds(bool networkIsConnected)
{ {
mqtt_run_scripts();
// if(mqttEnabled && networkIsConnected && !mqttClientConnected) { // if(mqttEnabled && networkIsConnected && !mqttClientConnected) {
// LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING)); // LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING));
// mqttStart(); // mqttStart();