From bbf6432606ad8773110a10e7bf5e2a985e2d3547 Mon Sep 17 00:00:00 2001 From: fvanroie Date: Mon, 21 Nov 2022 02:18:53 +0100 Subject: [PATCH] Add mqtt_on.cmd and mqtt_off.cmd #174 --- src/mqtt/hasp_mqtt_esp.cpp | 77 +++++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/src/mqtt/hasp_mqtt_esp.cpp b/src/mqtt/hasp_mqtt_esp.cpp index 2aae3fd3..19573a0f 100644 --- a/src/mqtt/hasp_mqtt_esp.cpp +++ b/src/mqtt/hasp_mqtt_esp.cpp @@ -31,9 +31,8 @@ char mqttLwtTopic[28]; char mqttNodeTopic[24]; char mqttClientId[64]; char mqttGroupTopic[24]; -bool mqttEnabled = false; -bool mqttClientConnected = false; -bool mqttHAautodiscover = true; +bool mqttEnabled = false; +bool mqttHAautodiscover = true; uint32_t mqttPublishCount; uint32_t mqttReceiveCount; uint32_t mqttFailedCount; @@ -50,18 +49,63 @@ static esp_mqtt_client_config_t mqtt_cfg; extern const uint8_t rootca_crt_bundle_start[] asm("_binary_data_cert_x509_crt_bundle_bin_start"); extern const uint8_t rootca_crt_bundle_end[] asm("_binary_data_cert_x509_crt_bundle_bin_end"); +bool last_mqtt_state = false; +bool current_mqtt_state = false; +uint16_t mqtt_reconnect_counter = 0; + +void mqtt_run_scripts() +{ + if(last_mqtt_state != current_mqtt_state) { + mqtt_message_t data; + snprintf(data.topic, sizeof(data.topic), "run"); + + if(current_mqtt_state) { + snprintf(data.payload, sizeof(data.payload), "L:/mqtt_on.cmd"); + // networkStart(); + } else { + snprintf(data.payload, sizeof(data.payload), "L:/mqtt_off.cmd"); + // networkStop(); + } + + size_t attempt = 0; + while(xQueueSend(queue, &data, (TickType_t)0) == errQUEUE_FULL && attempt < 100) { + vTaskDelay(5 / portTICK_PERIOD_MS); + attempt++; + }; + + last_mqtt_state = current_mqtt_state; + } +} + +void mqtt_disconnected() +{ + current_mqtt_state = false; // now we are disconnected + mqtt_run_scripts(); + mqtt_reconnect_counter++; +} + +void mqtt_connected() +{ + if(!current_mqtt_state) { + mqtt_reconnect_counter = 0; + current_mqtt_state = true; // now we are connected + LOG_VERBOSE(TAG_MQTT, F("%s"), current_mqtt_state ? PSTR(D_SERVICE_CONNECTED) : PSTR(D_SERVICE_DISCONNECTED)); + } + mqtt_run_scripts(); +} + int mqttPublish(const char* topic, const char* payload, size_t len, bool retain) { if(!mqttEnabled) return MQTT_ERR_DISABLED; // Write directly to the client, don't use the buffer - if(mqttClientConnected && esp_mqtt_client_publish(mqttClient, topic, payload, len, 0, retain) != ESP_FAIL) { + if(current_mqtt_state && esp_mqtt_client_publish(mqttClient, topic, payload, len, 0, retain) != ESP_FAIL) { mqttPublishCount++; return MQTT_ERR_OK; } mqttFailedCount++; - return mqttClientConnected ? MQTT_ERR_PUB_FAIL : MQTT_ERR_NO_CONN; + return current_mqtt_state ? MQTT_ERR_PUB_FAIL : MQTT_ERR_NO_CONN; } int mqttPublish(const char* topic, const char* payload, bool retain) @@ -74,7 +118,7 @@ int mqttPublish(const char* topic, const char* payload, bool retain) bool mqttIsConnected() { - return mqttEnabled && mqttClientConnected; + return mqttEnabled && current_mqtt_state; } bool mqtt_send_lwt(bool online) @@ -274,7 +318,7 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) int msg_id; switch(event->event_id) { case MQTT_EVENT_DISCONNECTED: - mqttClientConnected = false; + mqtt_disconnected(); LOG_WARNING(TAG_MQTT, F(D_MQTT_DISCONNECTED)); break; case MQTT_EVENT_BEFORE_CONNECT: @@ -282,7 +326,7 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_CONNECTED: LOG_INFO(TAG_MQTT, F(D_SERVICE_STARTED)); - mqttClientConnected = true; + mqtt_connected(); onMqttConnect(event->client); break; case MQTT_EVENT_SUBSCRIBED: @@ -292,7 +336,7 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) onMqttData(event); break; case MQTT_EVENT_ERROR: { - mqttClientConnected = false; + mqtt_disconnected(); esp_mqtt_error_codes_t* error_handle = event->error_handle; switch(error_handle->error_type) { case MQTT_ERROR_TYPE_TCP_TRANSPORT: @@ -344,7 +388,7 @@ IRAM_ATTR void mqttLoop(void) // mqttClient.loop(); mqtt_message_t data; while(xQueueReceive(queue, &data, (TickType_t)0)) { - LOG_INFO(TAG_MQTT, F("[%s] Received data from queue == %s\n"), pcTaskGetTaskName(NULL), data.topic); + LOG_DEBUG(TAG_MQTT, F("[%s] Received data from queue == %s\n"), pcTaskGetTaskName(NULL), data.topic); size_t length = strlen(data.payload); dispatch_topic_payload(data.topic, data.payload, length > 0, TAG_MQTT); delay(1); @@ -433,7 +477,7 @@ void mqttStop() } if(mqttClient != NULL) { - if(mqttClientConnected) { + if(current_mqtt_state) { LOG_TRACE(TAG_MQTT, F(D_MQTT_DISCONNECTING)); } mqtt_send_lwt(false); @@ -442,8 +486,7 @@ void mqttStop() esp_mqtt_set_config(mqttClient, &mqtt_cfg); esp_err_t err = esp_mqtt_client_disconnect(mqttClient); if(err == ESP_OK) { - // mqttClient = NULL; - mqttClientConnected = false; + mqtt_disconnected(); LOG_INFO(TAG_MQTT, F(D_MQTT_DISCONNECTED)); } else { LOG_ERROR(TAG_MQTT, F(D_MQTT_FAILED " %d"), err); @@ -488,10 +531,10 @@ void mqtt_get_info(JsonDocument& doc) // break; // } // info[F(D_INFO_STATUS)] = buffer; - info[F(D_INFO_STATUS)] = !mqttEnabled ? D_SERVICE_DISABLED - : !mqttClient ? D_SERVICE_STOPPED - : mqttClientConnected ? D_SERVICE_CONNECTED - : D_SERVICE_DISCONNECTED; + info[F(D_INFO_STATUS)] = !mqttEnabled ? D_SERVICE_DISABLED + : !mqttClient ? D_SERVICE_STOPPED + : current_mqtt_state ? D_SERVICE_CONNECTED + : D_SERVICE_DISCONNECTED; info[F(D_INFO_RECEIVED)] = mqttReceiveCount; info[F(D_INFO_PUBLISHED)] = mqttPublishCount;