From fbc963f9e1add8376400960b139ddc5ea5682eee Mon Sep 17 00:00:00 2001 From: fvanroie <15969459+fvanroie@users.noreply.github.com> Date: Mon, 17 Jan 2022 23:55:52 +0100 Subject: [PATCH] Move MQTT keep alive to separate task #174 --- src/mqtt/hasp_mqtt_pubsubclient.cpp | 270 ++++++++++++++++++++-------- 1 file changed, 200 insertions(+), 70 deletions(-) diff --git a/src/mqtt/hasp_mqtt_pubsubclient.cpp b/src/mqtt/hasp_mqtt_pubsubclient.cpp index 646e09a5..691ca675 100644 --- a/src/mqtt/hasp_mqtt_pubsubclient.cpp +++ b/src/mqtt/hasp_mqtt_pubsubclient.cpp @@ -41,6 +41,9 @@ EthernetClient mqttNetworkClient; #include "hasp_config.h" #include "../hasp/hasp_dispatch.h" +#include "freertos/FreeRTOS.h" + +SemaphoreHandle_t sema_MQTT_KeepAlive; // #ifdef USE_CONFIG_OVERRIDE // #include "user_config_override.h" @@ -62,20 +65,132 @@ char mqttGroupName[16] = MQTT_GROUPNAME; uint16_t mqttPort = MQTT_PORT; PubSubClient mqttClient(mqttNetworkClient); +//// MQTT keep alive task functions ///////////////////////////////////////////////// +bool mqtt_send_lwt(bool online) +{ + char tmp_payload[8]; + char tmp_topic[strlen(mqttNodeTopic) + 4]; + + strncpy(tmp_topic, mqttNodeTopic, sizeof(tmp_topic)); + strncat_P(tmp_topic, PSTR(MQTT_TOPIC_LWT), sizeof(tmp_topic)); + + size_t len = snprintf_P(tmp_payload, sizeof(tmp_payload), online ? PSTR("online") : PSTR("offline")); + bool res = mqttPublish(tmp_topic, tmp_payload, len, true); + return res; +} + +static void mqttSubscribeTo(const char* topic) +{ + if(mqttClient.subscribe(topic)) { + LOG_VERBOSE(TAG_MQTT, F(D_BULLET D_MQTT_SUBSCRIBED), topic); + } else { + LOG_ERROR(TAG_MQTT, F(D_MQTT_NOT_SUBSCRIBED), topic); + } +} + +void mqttSubscribeAllTopics() +{ + // Subscribe to our incoming topics + char topic[64]; + snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_COMMAND "/#"), mqttGroupTopic); + mqttSubscribeTo(topic); + snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_COMMAND "/#"), mqttNodeTopic); + mqttSubscribeTo(topic); + snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CONFIG "/#"), mqttGroupTopic); + mqttSubscribeTo(topic); + snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CONFIG "/#"), mqttNodeTopic); + mqttSubscribeTo(topic); + +#if defined(HASP_USE_CUSTOM) + snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CUSTOM "/#"), mqttGroupTopic); + mqttSubscribeTo(topic); + snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CUSTOM "/#"), mqttNodeTopic); + mqttSubscribeTo(topic); +#endif + +#ifdef HASP_USE_BROADCAST + snprintf_P(topic, sizeof(topic), PSTR(MQTT_PREFIX "/" MQTT_TOPIC_BROADCAST "/" MQTT_TOPIC_COMMAND "/#")); + mqttSubscribeTo(topic); +#endif + + /* Home Assistant auto-configuration */ +#ifdef HASP_USE_HA + if(mqttHAautodiscover) { + char topic[64]; + snprintf_P(topic, sizeof(topic), PSTR("hass/status")); + mqttSubscribeTo(topic); + snprintf_P(topic, sizeof(topic), PSTR("homeassistant/status")); + mqttSubscribeTo(topic); + } +#endif + + // Force any subscribed clients to toggle offline/online when we first connect to + // make sure we get a full panel refresh at power on. Sending offline, + // "online" will be sent by the mqttStatusTopic subscription action. + mqtt_send_lwt(true); +} + +void mqttLogDisconnectReason() +{ + char buffer[64]; + + switch(mqttClient.state()) { + case MQTT_CONNECTION_TIMEOUT: + LOG_WARNING(TAG_MQTT, F("Connection timeout")); + break; + case MQTT_CONNECTION_LOST: + LOG_WARNING(TAG_MQTT, F("Connection lost")); + break; + case MQTT_CONNECT_FAILED: + LOG_WARNING(TAG_MQTT, F("Connection failed")); + break; + case MQTT_DISCONNECTED: + LOG_WARNING(TAG_MQTT, F(D_MQTT_DISCONNECTED)); + break; + case MQTT_CONNECTED: + break; + case MQTT_CONNECT_BAD_PROTOCOL: + LOG_WARNING(TAG_MQTT, F("MQTT version not suported")); + break; + case MQTT_CONNECT_BAD_CLIENT_ID: + LOG_WARNING(TAG_MQTT, F("Client ID rejected")); + break; + case MQTT_CONNECT_UNAVAILABLE: + LOG_WARNING(TAG_MQTT, F("Server unavailable")); + break; + case MQTT_CONNECT_BAD_CREDENTIALS: + LOG_WARNING(TAG_MQTT, F("Bad credentials")); + break; + case MQTT_CONNECT_UNAUTHORIZED: + LOG_WARNING(TAG_MQTT, F("Unauthorized")); + break; + default: + LOG_WARNING(TAG_MQTT, F("Unknown failure")); + } +} + +//// Arduino task functions /////////////////////////////////////////////////////// + int mqttPublish(const char* topic, const char* payload, size_t len, bool retain) { if(!mqttEnabled) return MQTT_ERR_DISABLED; - if(!mqttClient.connected()) { - mqttFailedCount++; - return MQTT_ERR_NO_CONN; - } + // if(xSemaphoreTake(sema_MQTT_KeepAlive, (TickType_t)2) == pdTRUE) + { - if(mqttClient.beginPublish(topic, len, retain)) { - mqttPublishCount++; - mqttClient.write((uint8_t*)payload, len); - mqttClient.endPublish(); - return MQTT_ERR_OK; + if(!mqttClient.connected()) { + // xSemaphoreGive(sema_MQTT_KeepAlive); + mqttFailedCount++; + return MQTT_ERR_NO_CONN; + } + + if(mqttClient.beginPublish(topic, len, retain)) { + mqttPublishCount++; + mqttClient.write((uint8_t*)payload, len); + mqttClient.endPublish(); + // xSemaphoreGive(sema_MQTT_KeepAlive); + return MQTT_ERR_OK; + } } mqttFailedCount++; @@ -95,19 +210,6 @@ bool mqttIsConnected() return mqttEnabled && mqttClient.connected(); } -bool mqtt_send_lwt(bool online) -{ - char tmp_payload[8]; - char tmp_topic[strlen(mqttNodeTopic) + 4]; - - strncpy(tmp_topic, mqttNodeTopic, sizeof(tmp_topic)); - strncat_P(tmp_topic, PSTR(MQTT_TOPIC_LWT), sizeof(tmp_topic)); - - size_t len = snprintf_P(tmp_payload, sizeof(tmp_payload), online ? PSTR("online") : PSTR("offline")); - bool res = mqttPublish(tmp_topic, tmp_payload, len, true); - return res; -} - int mqtt_send_object_state(uint8_t pageid, uint8_t btnid, const char* payload) { char tmp_topic[strlen(mqttNodeTopic) + 16]; @@ -206,15 +308,6 @@ static void mqtt_message_cb(char* topic, byte* payload, unsigned int length) } } -static void mqttSubscribeTo(const char* topic) -{ - if(mqttClient.subscribe(topic)) { - LOG_VERBOSE(TAG_MQTT, F(D_BULLET D_MQTT_SUBSCRIBED), topic); - } else { - LOG_ERROR(TAG_MQTT, F(D_MQTT_NOT_SUBSCRIBED), topic); - } -} - void mqttStart() { char buffer[64]; @@ -244,41 +337,7 @@ void mqttStart() if(!mqttClient.connect(mqttClientId, mqttUsername, mqttPassword, buffer, 0, true, lastWillPayload, true)) { // Retry until we give up and restart after connectTimeout seconds mqttReconnectCount++; - - switch(mqttClient.state()) { - case MQTT_CONNECTION_TIMEOUT: - LOG_WARNING(TAG_MQTT, F("Connection timeout")); - break; - case MQTT_CONNECTION_LOST: - LOG_WARNING(TAG_MQTT, F("Connection lost")); - break; - case MQTT_CONNECT_FAILED: - LOG_WARNING(TAG_MQTT, F("Connection failed")); - break; - case MQTT_DISCONNECTED: - snprintf_P(buffer, sizeof(buffer), PSTR(D_MQTT_DISCONNECTED)); - break; - case MQTT_CONNECTED: - break; - case MQTT_CONNECT_BAD_PROTOCOL: - LOG_WARNING(TAG_MQTT, F("MQTT version not suported")); - break; - case MQTT_CONNECT_BAD_CLIENT_ID: - LOG_WARNING(TAG_MQTT, F("Client ID rejected")); - break; - case MQTT_CONNECT_UNAVAILABLE: - LOG_WARNING(TAG_MQTT, F("Server unavailable")); - break; - case MQTT_CONNECT_BAD_CREDENTIALS: - LOG_WARNING(TAG_MQTT, F("Bad credentials")); - break; - case MQTT_CONNECT_UNAUTHORIZED: - LOG_WARNING(TAG_MQTT, F("Unauthorized")); - break; - default: - LOG_WARNING(TAG_MQTT, F("Unknown failure")); - } - + mqttLogDisconnectReason(); if(mqttReconnectCount > 20) { LOG_ERROR(TAG_MQTT, F("Retry count exceeded, rebooting...")); dispatch_reboot(false); @@ -336,8 +395,73 @@ void mqttStart() dispatch_current_state(TAG_MQTT); } +/* + Important to not set vTaskDelay to less then 10. Errors begin to develop with the MQTT and network connection. + makes the initial wifi/mqtt connection and works to keeps those connections open. +*/ + +///// TASK connect fuction +void connectToMQTT() +{ + if(!mqttEnabled) return; + + char buffer[64]; + char mqttClientId[64]; + char lastWillPayload[8]; + + mqttClient.setServer(mqttServer, mqttPort); + // mqttClient.setSocketTimeout(10); //in seconds + + /* Construct unique Client ID*/ + { + String mac = halGetMacAddress(3, ""); + mac.toLowerCase(); + memset(mqttClientId, 0, sizeof(mqttClientId)); + snprintf_P(mqttClientId, sizeof(mqttClientId), PSTR(D_MQTT_DEFAULT_NAME), mac.c_str()); + // LOG_INFO(TAG_MQTT, mqttClientId); + } + + // Attempt to connect and set LWT and Clean Session + snprintf_P(buffer, sizeof(buffer), PSTR("%s" MQTT_TOPIC_LWT), mqttNodeTopic); // lastWillTopic + snprintf_P(lastWillPayload, sizeof(lastWillPayload), PSTR("offline")); // lastWillPayload + + while(!mqttIsConnected()) { + if(!mqttClient.connect(mqttClientId, mqttUsername, mqttPassword, buffer, 0, true, lastWillPayload, true)) { + mqttLogDisconnectReason(); + } else { + LOG_INFO(TAG_MQTT, F(D_MQTT_CONNECTED), mqttServer, mqttClientId); + mqttSubscribeAllTopics(); + dispatch_current_state(TAG_MQTT); + } + vTaskDelay(5000); + } +} + +///// TASK keep alive fuction +void MQTTkeepalive(void* pvParameters) +{ + // setting must be set before a mqtt connection is made + mqttClient.setKeepAlive(90); // setting keep alive to 90 seconds makes for a very reliable connection, must be set + // before the 1st connection is made. + for(;;) { + // check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more realible than + // just a single check + // LOG_INFO(TAG_MQTT, F("MQTT keep alive found MQTT status %s WiFi status %s"), + // String(mqttNetworkClient.connected()).c_str(), String(WiFi.status()).c_str()); + if((WiFi.status() == WL_CONNECTED) && mqttEnabled && !mqttNetworkClient.connected()) { + connectToMQTT(); + } + vTaskDelay(500); // task runs approx every 250 mS + } + vTaskDelete(NULL); +} + void mqttSetup() { + sema_MQTT_KeepAlive = xSemaphoreCreateBinary(); + xSemaphoreGive(sema_MQTT_KeepAlive); + xTaskCreatePinnedToCore(MQTTkeepalive, "MQTTkeepalive", 10000, NULL, 3, NULL, ARDUINO_RUNNING_CORE); + mqttEnabled = strlen(mqttServer) > 0 && mqttPort > 0; if(mqttEnabled) { mqttClient.setServer(mqttServer, mqttPort); @@ -354,15 +478,21 @@ void mqttSetup() IRAM_ATTR void mqttLoop(void) { + // Moved from task + + // xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY); + // whiles MQTTlient.loop() is running no other mqtt operations should be in process mqttClient.loop(); + // xSemaphoreGive(sema_MQTT_KeepAlive); } void mqttEvery5Seconds(bool networkIsConnected) { - if(mqttEnabled && networkIsConnected && !mqttClient.connected()) { - LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING)); - mqttStart(); - } + // Moved to task + // if(mqttEnabled && networkIsConnected && !mqttClient.connected()) { + // LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING)); + // mqttStart(); + // } } void mqttStop()