Move MQTT keep alive to separate task #174

This commit is contained in:
fvanroie 2022-01-17 23:55:52 +01:00
parent bbd4ba8d40
commit fbc963f9e1

View File

@ -41,6 +41,9 @@ EthernetClient mqttNetworkClient;
#include "hasp_config.h"
#include "../hasp/hasp_dispatch.h"
#include "freertos/FreeRTOS.h"
SemaphoreHandle_t sema_MQTT_KeepAlive;
// #ifdef USE_CONFIG_OVERRIDE
// #include "user_config_override.h"
@ -62,20 +65,132 @@ char mqttGroupName[16] = MQTT_GROUPNAME;
uint16_t mqttPort = MQTT_PORT;
PubSubClient mqttClient(mqttNetworkClient);
//// MQTT keep alive task functions /////////////////////////////////////////////////
bool mqtt_send_lwt(bool online)
{
char tmp_payload[8];
char tmp_topic[strlen(mqttNodeTopic) + 4];
strncpy(tmp_topic, mqttNodeTopic, sizeof(tmp_topic));
strncat_P(tmp_topic, PSTR(MQTT_TOPIC_LWT), sizeof(tmp_topic));
size_t len = snprintf_P(tmp_payload, sizeof(tmp_payload), online ? PSTR("online") : PSTR("offline"));
bool res = mqttPublish(tmp_topic, tmp_payload, len, true);
return res;
}
static void mqttSubscribeTo(const char* topic)
{
if(mqttClient.subscribe(topic)) {
LOG_VERBOSE(TAG_MQTT, F(D_BULLET D_MQTT_SUBSCRIBED), topic);
} else {
LOG_ERROR(TAG_MQTT, F(D_MQTT_NOT_SUBSCRIBED), topic);
}
}
void mqttSubscribeAllTopics()
{
// Subscribe to our incoming topics
char topic[64];
snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_COMMAND "/#"), mqttGroupTopic);
mqttSubscribeTo(topic);
snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_COMMAND "/#"), mqttNodeTopic);
mqttSubscribeTo(topic);
snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CONFIG "/#"), mqttGroupTopic);
mqttSubscribeTo(topic);
snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CONFIG "/#"), mqttNodeTopic);
mqttSubscribeTo(topic);
#if defined(HASP_USE_CUSTOM)
snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CUSTOM "/#"), mqttGroupTopic);
mqttSubscribeTo(topic);
snprintf_P(topic, sizeof(topic), PSTR("%s" MQTT_TOPIC_CUSTOM "/#"), mqttNodeTopic);
mqttSubscribeTo(topic);
#endif
#ifdef HASP_USE_BROADCAST
snprintf_P(topic, sizeof(topic), PSTR(MQTT_PREFIX "/" MQTT_TOPIC_BROADCAST "/" MQTT_TOPIC_COMMAND "/#"));
mqttSubscribeTo(topic);
#endif
/* Home Assistant auto-configuration */
#ifdef HASP_USE_HA
if(mqttHAautodiscover) {
char topic[64];
snprintf_P(topic, sizeof(topic), PSTR("hass/status"));
mqttSubscribeTo(topic);
snprintf_P(topic, sizeof(topic), PSTR("homeassistant/status"));
mqttSubscribeTo(topic);
}
#endif
// Force any subscribed clients to toggle offline/online when we first connect to
// make sure we get a full panel refresh at power on. Sending offline,
// "online" will be sent by the mqttStatusTopic subscription action.
mqtt_send_lwt(true);
}
void mqttLogDisconnectReason()
{
char buffer[64];
switch(mqttClient.state()) {
case MQTT_CONNECTION_TIMEOUT:
LOG_WARNING(TAG_MQTT, F("Connection timeout"));
break;
case MQTT_CONNECTION_LOST:
LOG_WARNING(TAG_MQTT, F("Connection lost"));
break;
case MQTT_CONNECT_FAILED:
LOG_WARNING(TAG_MQTT, F("Connection failed"));
break;
case MQTT_DISCONNECTED:
LOG_WARNING(TAG_MQTT, F(D_MQTT_DISCONNECTED));
break;
case MQTT_CONNECTED:
break;
case MQTT_CONNECT_BAD_PROTOCOL:
LOG_WARNING(TAG_MQTT, F("MQTT version not suported"));
break;
case MQTT_CONNECT_BAD_CLIENT_ID:
LOG_WARNING(TAG_MQTT, F("Client ID rejected"));
break;
case MQTT_CONNECT_UNAVAILABLE:
LOG_WARNING(TAG_MQTT, F("Server unavailable"));
break;
case MQTT_CONNECT_BAD_CREDENTIALS:
LOG_WARNING(TAG_MQTT, F("Bad credentials"));
break;
case MQTT_CONNECT_UNAUTHORIZED:
LOG_WARNING(TAG_MQTT, F("Unauthorized"));
break;
default:
LOG_WARNING(TAG_MQTT, F("Unknown failure"));
}
}
//// Arduino task functions ///////////////////////////////////////////////////////
int mqttPublish(const char* topic, const char* payload, size_t len, bool retain)
{
if(!mqttEnabled) return MQTT_ERR_DISABLED;
if(!mqttClient.connected()) {
mqttFailedCount++;
return MQTT_ERR_NO_CONN;
}
// if(xSemaphoreTake(sema_MQTT_KeepAlive, (TickType_t)2) == pdTRUE)
{
if(mqttClient.beginPublish(topic, len, retain)) {
mqttPublishCount++;
mqttClient.write((uint8_t*)payload, len);
mqttClient.endPublish();
return MQTT_ERR_OK;
if(!mqttClient.connected()) {
// xSemaphoreGive(sema_MQTT_KeepAlive);
mqttFailedCount++;
return MQTT_ERR_NO_CONN;
}
if(mqttClient.beginPublish(topic, len, retain)) {
mqttPublishCount++;
mqttClient.write((uint8_t*)payload, len);
mqttClient.endPublish();
// xSemaphoreGive(sema_MQTT_KeepAlive);
return MQTT_ERR_OK;
}
}
mqttFailedCount++;
@ -95,19 +210,6 @@ bool mqttIsConnected()
return mqttEnabled && mqttClient.connected();
}
bool mqtt_send_lwt(bool online)
{
char tmp_payload[8];
char tmp_topic[strlen(mqttNodeTopic) + 4];
strncpy(tmp_topic, mqttNodeTopic, sizeof(tmp_topic));
strncat_P(tmp_topic, PSTR(MQTT_TOPIC_LWT), sizeof(tmp_topic));
size_t len = snprintf_P(tmp_payload, sizeof(tmp_payload), online ? PSTR("online") : PSTR("offline"));
bool res = mqttPublish(tmp_topic, tmp_payload, len, true);
return res;
}
int mqtt_send_object_state(uint8_t pageid, uint8_t btnid, const char* payload)
{
char tmp_topic[strlen(mqttNodeTopic) + 16];
@ -206,15 +308,6 @@ static void mqtt_message_cb(char* topic, byte* payload, unsigned int length)
}
}
static void mqttSubscribeTo(const char* topic)
{
if(mqttClient.subscribe(topic)) {
LOG_VERBOSE(TAG_MQTT, F(D_BULLET D_MQTT_SUBSCRIBED), topic);
} else {
LOG_ERROR(TAG_MQTT, F(D_MQTT_NOT_SUBSCRIBED), topic);
}
}
void mqttStart()
{
char buffer[64];
@ -244,41 +337,7 @@ void mqttStart()
if(!mqttClient.connect(mqttClientId, mqttUsername, mqttPassword, buffer, 0, true, lastWillPayload, true)) {
// Retry until we give up and restart after connectTimeout seconds
mqttReconnectCount++;
switch(mqttClient.state()) {
case MQTT_CONNECTION_TIMEOUT:
LOG_WARNING(TAG_MQTT, F("Connection timeout"));
break;
case MQTT_CONNECTION_LOST:
LOG_WARNING(TAG_MQTT, F("Connection lost"));
break;
case MQTT_CONNECT_FAILED:
LOG_WARNING(TAG_MQTT, F("Connection failed"));
break;
case MQTT_DISCONNECTED:
snprintf_P(buffer, sizeof(buffer), PSTR(D_MQTT_DISCONNECTED));
break;
case MQTT_CONNECTED:
break;
case MQTT_CONNECT_BAD_PROTOCOL:
LOG_WARNING(TAG_MQTT, F("MQTT version not suported"));
break;
case MQTT_CONNECT_BAD_CLIENT_ID:
LOG_WARNING(TAG_MQTT, F("Client ID rejected"));
break;
case MQTT_CONNECT_UNAVAILABLE:
LOG_WARNING(TAG_MQTT, F("Server unavailable"));
break;
case MQTT_CONNECT_BAD_CREDENTIALS:
LOG_WARNING(TAG_MQTT, F("Bad credentials"));
break;
case MQTT_CONNECT_UNAUTHORIZED:
LOG_WARNING(TAG_MQTT, F("Unauthorized"));
break;
default:
LOG_WARNING(TAG_MQTT, F("Unknown failure"));
}
mqttLogDisconnectReason();
if(mqttReconnectCount > 20) {
LOG_ERROR(TAG_MQTT, F("Retry count exceeded, rebooting..."));
dispatch_reboot(false);
@ -336,8 +395,73 @@ void mqttStart()
dispatch_current_state(TAG_MQTT);
}
/*
Important to not set vTaskDelay to less then 10. Errors begin to develop with the MQTT and network connection.
makes the initial wifi/mqtt connection and works to keeps those connections open.
*/
///// TASK connect fuction
void connectToMQTT()
{
if(!mqttEnabled) return;
char buffer[64];
char mqttClientId[64];
char lastWillPayload[8];
mqttClient.setServer(mqttServer, mqttPort);
// mqttClient.setSocketTimeout(10); //in seconds
/* Construct unique Client ID*/
{
String mac = halGetMacAddress(3, "");
mac.toLowerCase();
memset(mqttClientId, 0, sizeof(mqttClientId));
snprintf_P(mqttClientId, sizeof(mqttClientId), PSTR(D_MQTT_DEFAULT_NAME), mac.c_str());
// LOG_INFO(TAG_MQTT, mqttClientId);
}
// Attempt to connect and set LWT and Clean Session
snprintf_P(buffer, sizeof(buffer), PSTR("%s" MQTT_TOPIC_LWT), mqttNodeTopic); // lastWillTopic
snprintf_P(lastWillPayload, sizeof(lastWillPayload), PSTR("offline")); // lastWillPayload
while(!mqttIsConnected()) {
if(!mqttClient.connect(mqttClientId, mqttUsername, mqttPassword, buffer, 0, true, lastWillPayload, true)) {
mqttLogDisconnectReason();
} else {
LOG_INFO(TAG_MQTT, F(D_MQTT_CONNECTED), mqttServer, mqttClientId);
mqttSubscribeAllTopics();
dispatch_current_state(TAG_MQTT);
}
vTaskDelay(5000);
}
}
///// TASK keep alive fuction
void MQTTkeepalive(void* pvParameters)
{
// setting must be set before a mqtt connection is made
mqttClient.setKeepAlive(90); // setting keep alive to 90 seconds makes for a very reliable connection, must be set
// before the 1st connection is made.
for(;;) {
// check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more realible than
// just a single check
// LOG_INFO(TAG_MQTT, F("MQTT keep alive found MQTT status %s WiFi status %s"),
// String(mqttNetworkClient.connected()).c_str(), String(WiFi.status()).c_str());
if((WiFi.status() == WL_CONNECTED) && mqttEnabled && !mqttNetworkClient.connected()) {
connectToMQTT();
}
vTaskDelay(500); // task runs approx every 250 mS
}
vTaskDelete(NULL);
}
void mqttSetup()
{
sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
xSemaphoreGive(sema_MQTT_KeepAlive);
xTaskCreatePinnedToCore(MQTTkeepalive, "MQTTkeepalive", 10000, NULL, 3, NULL, ARDUINO_RUNNING_CORE);
mqttEnabled = strlen(mqttServer) > 0 && mqttPort > 0;
if(mqttEnabled) {
mqttClient.setServer(mqttServer, mqttPort);
@ -354,15 +478,21 @@ void mqttSetup()
IRAM_ATTR void mqttLoop(void)
{
// Moved from task
// xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY);
// whiles MQTTlient.loop() is running no other mqtt operations should be in process
mqttClient.loop();
// xSemaphoreGive(sema_MQTT_KeepAlive);
}
void mqttEvery5Seconds(bool networkIsConnected)
{
if(mqttEnabled && networkIsConnected && !mqttClient.connected()) {
LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING));
mqttStart();
}
// Moved to task
// if(mqttEnabled && networkIsConnected && !mqttClient.connected()) {
// LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING));
// mqttStart();
// }
}
void mqttStop()