From 56ae45dae251566e38fc72e42395282d62e12a13 Mon Sep 17 00:00:00 2001 From: fvanroie Date: Wed, 12 Oct 2022 14:31:10 +0200 Subject: [PATCH] Add auto-reconnect to Paho MQTT Client --- include/hasp_macro.h | 4 +- src/main_sdl2.cpp | 103 +++++++++++++---- src/mqtt/hasp_mqtt.h | 2 +- src/mqtt/hasp_mqtt_esp.cpp | 8 -- src/mqtt/hasp_mqtt_paho_single.cpp | 178 ++++++++++++++++++----------- 5 files changed, 197 insertions(+), 98 deletions(-) diff --git a/include/hasp_macro.h b/include/hasp_macro.h index f06eba80..f1f937b9 100644 --- a/include/hasp_macro.h +++ b/include/hasp_macro.h @@ -13,11 +13,11 @@ #endif #if defined(WINDOWS) || defined(POSIX) -#define HASP_RANDOM(x) rand() * x +#define HASP_RANDOM(x) rand() % x #elif defined(ARDUINO) #define HASP_RANDOM(x) random(x) #else -#define HASP_RANDOM(x) random() * x +#define HASP_RANDOM(x) random() % x #endif #if defined(WINDOWS) || defined(POSIX) diff --git a/src/main_sdl2.cpp b/src/main_sdl2.cpp index 19fc6269..d2942333 100644 --- a/src/main_sdl2.cpp +++ b/src/main_sdl2.cpp @@ -60,7 +60,7 @@ void BindStdHandlesToConsole() HANDLE hStdout = CreateFile("CONOUT$", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); HANDLE hStdin = CreateFile("CONIN$", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, - OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); + OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); SetStdHandle(STD_OUTPUT_HANDLE, hStdout); SetStdHandle(STD_ERROR_HANDLE, hStdout); @@ -123,18 +123,18 @@ void setup() // hal_setup(); guiSetup(); - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__); dispatchSetup(); // for hasp and oobe haspSetup(); #if HASP_USE_MQTT > 0 - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__); mqttSetup(); // Hasp must be running mqttStart(); #endif #if HASP_USE_GPIO > 0 - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__); gpioSetup(); #endif @@ -143,7 +143,7 @@ void setup() #endif mainLastLoopTime = millis(); // - 1000; // reset loop counter - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__); // delay(250); } @@ -188,6 +188,10 @@ void loop() haspDevice.loop_5s(); gpioEvery5Seconds(); +#if defined(HASP_USE_MQTT) + mqttEvery5Seconds(true); +#endif + #if defined(HASP_USE_CUSTOM) custom_every_5seconds(); #endif @@ -204,20 +208,23 @@ void loop() // delay(6); } -void usage(const char* progName) +void usage(const char* progName, const char* version) { - std::cout << progName << " [options]" << std::endl + std::cout << "\n\n" + << progName << " " << version << " [options]" << std::endl << std::endl << "Options:" << std::endl - << " -h | --help Print this help" << std::endl - << " -n | --name Plate hostname used in the mqtt topic" + << " -? | --help Print this help" << std::endl + << " -w | --width Width of the window" << std::endl + << " -h | --height Height of the window" << std::endl + << " --mqttname MQTT device name topic (default: computer hostname)" << std::endl + << " --mqtthost MQTT broker hostname or IP address" << std::endl + << " --mqttport MQTT broker port (default: 1883)" << std::endl + << " --mqttuser MQTT username" << std::endl + << " --mqttpass MQTT password" << std::endl + << " --mqttgroup MQTT groupname (default: plates)" << std::endl << std::endl - // << " -b | --broker Mqtt broker name or ip address" << std::endl - // << " -P | --port Mqtt broker port (default: 1883)" << std::endl - // << " -u | --user Mqtt username (optional)" << std::endl - // << " -p | --pass Mqtt password (optional)" << std::endl // << " -t | --topic Base topic of the mqtt messages (default: hasp)" << std::endl - // << " -g | --group Group topic of on which to accept incoming messages (default: plates)" // << std::endl // << " -f | --fullscreen Open the application fullscreen" << std::endl // << " -v | --verbose Verbosity level" << std::endl @@ -263,28 +270,71 @@ int main(int argc, char* argv[]) for(count = 0; count < argc; count++) std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush; + StaticJsonDocument<1024> settings; + for(count = 0; count < argc; count++) { if(argv[count][0] == '-') { - if(strncmp(argv[count], "--help", 6) == 0 || strncmp(argv[count], "-h", 2) == 0) { + if(strncmp(argv[count], "--help", 6) == 0 || strncmp(argv[count], "-?", 2) == 0) { showhelp = true; } - if(strncmp(argv[count], "--width", 7) == 0 || strncmp(argv[count], "-x", 2) == 0) { + if(strncmp(argv[count], "--width", 7) == 0 || strncmp(argv[count], "-w", 2) == 0) { int w = atoi(argv[count + 1]); if(w > 0) tft_width = w; } - if(strncmp(argv[count], "--height", 8) == 0 || strncmp(argv[count], "-y", 2) == 0) { + if(strncmp(argv[count], "--height", 8) == 0 || strncmp(argv[count], "-h", 2) == 0) { int h = atoi(argv[count + 1]); if(h > 0) tft_height = h; } - if(strncmp(argv[count], "--name", 6) == 0 || strncmp(argv[count], "-n", 2) == 0) { + if(strncmp(argv[count], "--mqttname", 10) == 0 || strncmp(argv[count], "-n", 2) == 0) { std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush; fflush(stdout); if(count + 1 < argc) { haspDevice.set_hostname(argv[count + 1]); + settings["mqtt"]["name"] = argv[count + 1]; + } else { + showhelp = true; + } + } + + if(strncmp(argv[count], "--mqtthost", 10) == 0) { + std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush; + fflush(stdout); + if(count + 1 < argc) { + settings["mqtt"]["host"] = argv[count + 1]; + } else { + showhelp = true; + } + } + + if(strncmp(argv[count], "--mqttport", 10) == 0) { + std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush; + fflush(stdout); + if(count + 1 < argc) { + settings["mqtt"]["port"] = atoi(argv[count + 1]); + } else { + showhelp = true; + } + } + + if(strncmp(argv[count], "--mqttuser", 10) == 0) { + std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush; + fflush(stdout); + if(count + 1 < argc) { + settings["mqtt"]["user"] = argv[count + 1]; + } else { + showhelp = true; + } + } + + if(strncmp(argv[count], "--mqttpass", 10) == 0) { + std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush; + fflush(stdout); + if(count + 1 < argc) { + settings["mqtt"]["pass"] = argv[count + 1]; } else { showhelp = true; } @@ -293,10 +343,10 @@ int main(int argc, char* argv[]) } if(showhelp) { - usage("openHASP"); + usage("openHASP", haspDevice.get_version()); #if defined(WINDOWS) - WriteConsole(std_out, "bye\n", 3, NULL, NULL); + WriteConsole(std_out, "bye\n\n", 3, NULL, NULL); std::cout << std::endl << std::flush; fflush(stdout); FreeConsole(); @@ -305,6 +355,11 @@ int main(int argc, char* argv[]) return 0; } + char buffer[2048]; + serializeJson(settings, buffer, sizeof(buffer)); + std::cout << buffer << std::endl << std::flush; + fflush(stdout); + mqttSetConfig(settings["mqtt"]); // printf("%s %d\n", __FILE__, __LINE__); // fflush(stdout); @@ -320,6 +375,14 @@ int main(int argc, char* argv[]) } LOG_TRACE(TAG_MAIN, "main loop completed"); +#if defined(WINDOWS) + WriteConsole(std_out, "bye\n\n", 3, NULL, NULL); + std::cout << std::endl << std::flush; + fflush(stdout); + FreeConsole(); + exit(0); +#endif + return 0; } diff --git a/src/mqtt/hasp_mqtt.h b/src/mqtt/hasp_mqtt.h index 31d2e07f..3b6627a6 100644 --- a/src/mqtt/hasp_mqtt.h +++ b/src/mqtt/hasp_mqtt.h @@ -32,8 +32,8 @@ void mqtt_get_info(JsonDocument& doc); #if HASP_USE_CONFIG > 0 bool mqttGetConfig(const JsonObject& settings); -bool mqttSetConfig(const JsonObject& settings); #endif +bool mqttSetConfig(const JsonObject& settings); #ifndef MQTT_PREFIX #define MQTT_PREFIX "hasp" diff --git a/src/mqtt/hasp_mqtt_esp.cpp b/src/mqtt/hasp_mqtt_esp.cpp index b1d95e90..bf5292ac 100644 --- a/src/mqtt/hasp_mqtt_esp.cpp +++ b/src/mqtt/hasp_mqtt_esp.cpp @@ -108,15 +108,7 @@ int mqtt_send_discovery(const char* payload, size_t len) // Receive incoming messages static void mqtt_message_cb(const char* topic, byte* payload, unsigned int length) { // Handle incoming commands from MQTT - // if(length + 1 >= mqttClient.getBufferSize()) { - // mqttFailedCount++; - // LOG_ERROR(TAG_MQTT_RCV, F(D_MQTT_PAYLOAD_TOO_LONG), (uint32_t)length); - // return; - // } else { mqttReceiveCount++; - // payload[length] = '\0'; - // } - LOG_TRACE(TAG_MQTT_RCV, F("%s = %s"), topic, (char*)payload); if(topic == strstr(topic, mqttNodeTopic)) { // startsWith mqttNodeTopic diff --git a/src/mqtt/hasp_mqtt_paho_single.cpp b/src/mqtt/hasp_mqtt_paho_single.cpp index bf75f42b..419f09f2 100644 --- a/src/mqtt/hasp_mqtt_paho_single.cpp +++ b/src/mqtt/hasp_mqtt_paho_single.cpp @@ -8,6 +8,13 @@ #if HASP_USE_MQTT > 0 #ifdef HASP_USE_PAHO +const char FP_CONFIG_HOST[] PROGMEM = "host"; +const char FP_CONFIG_PORT[] PROGMEM = "port"; +const char FP_CONFIG_NAME[] PROGMEM = "name"; +const char FP_CONFIG_USER[] PROGMEM = "user"; +const char FP_CONFIG_PASS[] PROGMEM = "pass"; +const char FP_CONFIG_GROUP[] PROGMEM = "group"; + /******************************************************************************* * Copyright (c) 2012, 2020 IBM Corp. * @@ -64,7 +71,7 @@ uint32_t mqttReceiveCount; uint32_t mqttFailedCount; std::string mqttServer = MQTT_HOSTNAME; -std::string mqttUser = MQTT_USERNAME; +std::string mqttUsername = MQTT_USERNAME; std::string mqttPassword = MQTT_PASSWORD; std::string mqttGroupName = MQTT_GROUPNAME; uint16_t mqttPort = MQTT_PORT; @@ -81,11 +88,7 @@ int mqttPublish(const char* topic, const char* payload, size_t len, bool retain) void connlost(void* context, char* cause) { - printf("\nConnection lost\n"); - if(cause) printf(" cause: %s\n", cause); - - printf("Reconnecting\n"); - mqttStart(); + LOG_WARNING(TAG_MQTT, F(D_MQTT_DISCONNECTED) ": %s", cause); } // Receive incoming messages @@ -224,7 +227,6 @@ bool mqttIsConnected() int mqtt_send_state(const __FlashStringHelper* subtopic, const char* payload) { char tmp_topic[mqttNodeTopic.length() + 20]; - // printf(("%s" MQTT_TOPIC_STATE "/%s\n"), mqttNodeTopic, subtopic); snprintf_P(tmp_topic, sizeof(tmp_topic), ("%s" MQTT_TOPIC_STATE "/%s"), mqttNodeTopic.c_str(), subtopic); return mqttPublish(tmp_topic, payload, strlen(payload), false); } @@ -232,8 +234,6 @@ int mqtt_send_state(const __FlashStringHelper* subtopic, const char* payload) int mqtt_send_discovery(const char* payload, size_t len) { char tmp_topic[128]; - // snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR(MQTT_PREFIX "/" MQTT_TOPIC_DISCOVERY - // "/%s"),haspDevice.get_hardware_id()); snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR(MQTT_PREFIX "/" MQTT_TOPIC_DISCOVERY)); return mqttPublish(tmp_topic, payload, len, false); } @@ -290,27 +290,28 @@ static void onConnect(void* context) void mqttStart() { - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer; int rc; int ch; - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); if((rc = MQTTClient_create(&mqtt_client, mqttServer.c_str(), haspDevice.get_hostname(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { - printf("Failed to create client, return code %d\n", rc); + LOG_ERROR(TAG_MQTT, "Failed to create client, return code %d", rc); rc = EXIT_FAILURE; return; } - // if((rc = MQTTClient_setCallbacks(mqtt_client, mqtt_client, connlost, msgarrvd, NULL)) != MQTTCLIENT_SUCCESS) { - // printf("Failed to set callbacks, return code %d\n", rc); - // rc = EXIT_FAILURE; - // return; - // } + if((rc = MQTTClient_setCallbacks(mqtt_client, mqtt_client, connlost, mqtt_message_arrived, NULL)) != + MQTTCLIENT_SUCCESS) { + LOG_ERROR(TAG_MQTT, "Failed to set callbacks, return code %d", rc); + rc = EXIT_FAILURE; + return; + } - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); mqttEnabled = mqttServer.length() > 0 && mqttPort > 0; if(mqttEnabled) { @@ -322,15 +323,15 @@ void mqttStart() conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; - conn_opts.connectTimeout = 2; // seconds - conn_opts.retryInterval = 0; // no retry + conn_opts.connectTimeout = 2; // seconds + conn_opts.retryInterval = 15; // 0 = no retry - conn_opts.username = mqttUser.c_str(); + conn_opts.username = mqttUsername.c_str(); conn_opts.password = mqttPassword.c_str(); - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); if((rc = MQTTClient_connect(mqtt_client, &conn_opts)) != MQTTCLIENT_SUCCESS) { - printf("Failed to connect, return code %d\n", rc); + LOG_ERROR(TAG_MQTT, "Failed to connect, return code %d", rc); rc = EXIT_FAILURE; // goto destroy_exit; } else { @@ -338,20 +339,10 @@ void mqttStart() } } else { rc = EXIT_FAILURE; - printf("Mqtt server not configured\n"); + LOG_WARNING(TAG_MQTT, "Mqtt server not configured"); } - printf("%s %d\n", __FILE__, __LINE__); - - // while (!subscribed && !finished) - // #if defined(_WIN32) - // Sleep(100); - // #else - // usleep(10000L); - // #endif - - // if (finished) - // goto exit; + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); } void mqttStop() @@ -361,44 +352,30 @@ void mqttStop() // disc_opts.onSuccess = onDisconnect; // disc_opts.onFailure = onDisconnectFailure; if((rc = MQTTClient_disconnect(mqtt_client, 1000)) != MQTTCLIENT_SUCCESS) { - printf("Failed to disconnect, return code %d\n", rc); + LOG_ERROR(TAG_MQTT, "Failed to disconnect, return code %d", rc); rc = EXIT_FAILURE; - // goto destroy_exit; } - // while (!disc_finished) - // { - // #if defined(_WIN32) - // Sleep(100); - // #else - // usleep(10000L); - // #endif - // } - - // destroy_exit: - // MQTTClient_destroy(&client); - // exit: - // return rc; } void mqttSetup() { - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); mqttNodeTopic = MQTT_PREFIX; mqttNodeTopic += "/"; mqttNodeTopic += haspDevice.get_hostname(); mqttNodeTopic += "/"; - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); mqttGroupTopic = MQTT_PREFIX; mqttGroupTopic += "/"; mqttGroupTopic += mqttGroupName; mqttGroupTopic += "/"; - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); mqttLwtTopic = mqttNodeTopic; mqttLwtTopic += MQTT_TOPIC_LWT; - printf("%s %d\n", __FILE__, __LINE__); + LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__); } IRAM_ATTR void mqttLoop() @@ -411,28 +388,95 @@ IRAM_ATTR void mqttLoop() if(rc == MQTTCLIENT_SUCCESS && message) mqtt_message_arrived(mqtt_client, topicName, topicLen, message); }; -void mqttEvery5Seconds(bool wifiIsConnected){}; +void mqttEvery5Seconds(bool wifiIsConnected) +{ + if(!mqttIsConnected()) { + LOG_WARNING(TAG_MQTT, F(D_MQTT_RECONNECTING)); + mqttStart(); + } +}; void mqtt_get_info(JsonDocument& doc) { char mqttClientId[64]; - JsonObject info = doc.createNestedObject(F("MQTT")); - info[F(D_INFO_SERVER)] = mqttServer; - info[F(D_INFO_USERNAME)] = mqttUser; - info[F(D_INFO_CLIENTID)] = haspDevice.get_hostname(); - - if(mqttIsConnected()) { // Check MQTT connection - info[F(D_INFO_STATUS)] = F(D_SERVICE_CONNECTED); - } else { - info[F(D_INFO_STATUS)] = F("" D_SERVICE_DISCONNECTED ", return code: "); - // +String(mqttClient.returnCode()); - } - + JsonObject info = doc.createNestedObject(F("MQTT")); + info[F(D_INFO_SERVER)] = mqttServer; + info[F(D_INFO_USERNAME)] = mqttUsername; + info[F(D_INFO_CLIENTID)] = haspDevice.get_hostname(); + info[F(D_INFO_STATUS)] = mqttIsConnected() ? F(D_SERVICE_CONNECTED) : F(D_SERVICE_DISCONNECTED); info[F(D_INFO_RECEIVED)] = mqttReceiveCount; info[F(D_INFO_PUBLISHED)] = mqttPublishCount; info[F(D_INFO_FAILED)] = mqttFailedCount; } +/** Set MQTT Configuration. + * + * Read the settings from json and sets the application variables. + * + * @note: data pixel should be formated to uint32_t RGBA. Imagemagick requirements. + * + * @param[in] settings JsonObject with the config settings. + **/ +bool mqttSetConfig(const JsonObject& settings) +{ + // configOutput(settings, TAG_MQTT); + bool changed = false; + + // changed |= configSet(mqttPort, settings[FPSTR(FP_CONFIG_PORT)], F("mqttPort")); + changed |= mqttPort != settings[FPSTR(FP_CONFIG_PORT)]; + mqttPort = settings[FPSTR(FP_CONFIG_PORT)]; + + if(!settings[FPSTR(FP_CONFIG_NAME)].isNull()) { + LOG_VERBOSE(TAG_MQTT, "%s => %s", FP_CONFIG_NAME, settings[FPSTR(FP_CONFIG_NAME)].as()); + changed |= strcmp(haspDevice.get_hostname(), settings[FPSTR(FP_CONFIG_NAME)]) != 0; + // strncpy(mqttNodeName, settings[FPSTR(FP_CONFIG_NAME)], sizeof(mqttNodeName)); + haspDevice.set_hostname(settings[FPSTR(FP_CONFIG_NAME)].as()); + } + // Prefill node name + // if(strlen(haspDevice.get_hostname()) == 0) { + // char mqttNodeName[64]; + // std::string mac = halGetMacAddress(3, ""); + // mac.toLowerCase(); + // snprintf_P(mqttNodeName, sizeof(mqttNodeName), PSTR(D_MQTT_DEFAULT_NAME), mac.c_str()); + // haspDevice.set_hostname(mqttNodeName); + // changed = true; + // } + + if(!settings[FPSTR(FP_CONFIG_GROUP)].isNull()) { + changed |= mqttGroupName != settings[FPSTR(FP_CONFIG_GROUP)]; + mqttGroupName = settings[FPSTR(FP_CONFIG_GROUP)].as(); + } + + if(mqttGroupName.length() == 0) { + mqttGroupName = "plates"; + changed = true; + } + + if(!settings[FPSTR(FP_CONFIG_HOST)].isNull()) { + LOG_VERBOSE(TAG_MQTT, "%s => %s", FP_CONFIG_HOST, settings[FPSTR(FP_CONFIG_HOST)].as()); + changed |= mqttServer != settings[FPSTR(FP_CONFIG_HOST)]; + mqttServer = settings[FPSTR(FP_CONFIG_HOST)].as(); + } + + if(!settings[FPSTR(FP_CONFIG_USER)].isNull()) { + changed |= mqttUsername != settings[FPSTR(FP_CONFIG_USER)]; + mqttUsername = settings[FPSTR(FP_CONFIG_USER)].as(); + } + + if(!settings[FPSTR(FP_CONFIG_PASS)].isNull() && + settings[FPSTR(FP_CONFIG_PASS)].as() != D_PASSWORD_MASK) { + changed |= mqttPassword != settings[FPSTR(FP_CONFIG_PASS)]; + mqttPassword = settings[FPSTR(FP_CONFIG_PASS)].as(); + } + + mqttNodeTopic = MQTT_PREFIX; + mqttNodeTopic += haspDevice.get_hostname(); + mqttGroupTopic = MQTT_PREFIX; + mqttGroupTopic += mqttGroupName; + + return changed; +} + #endif // HASP_USE_PAHO #endif // USE_MQTT