Add auto-reconnect to Paho MQTT Client

This commit is contained in:
fvanroie 2022-10-12 14:31:10 +02:00
parent b370733012
commit 56ae45dae2
5 changed files with 197 additions and 98 deletions

View File

@ -13,11 +13,11 @@
#endif
#if defined(WINDOWS) || defined(POSIX)
#define HASP_RANDOM(x) rand() * x
#define HASP_RANDOM(x) rand() % x
#elif defined(ARDUINO)
#define HASP_RANDOM(x) random(x)
#else
#define HASP_RANDOM(x) random() * x
#define HASP_RANDOM(x) random() % x
#endif
#if defined(WINDOWS) || defined(POSIX)

View File

@ -60,7 +60,7 @@ void BindStdHandlesToConsole()
HANDLE hStdout = CreateFile("CONOUT$", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
HANDLE hStdin = CreateFile("CONIN$", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
SetStdHandle(STD_OUTPUT_HANDLE, hStdout);
SetStdHandle(STD_ERROR_HANDLE, hStdout);
@ -123,18 +123,18 @@ void setup()
// hal_setup();
guiSetup();
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__);
dispatchSetup(); // for hasp and oobe
haspSetup();
#if HASP_USE_MQTT > 0
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__);
mqttSetup(); // Hasp must be running
mqttStart();
#endif
#if HASP_USE_GPIO > 0
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__);
gpioSetup();
#endif
@ -143,7 +143,7 @@ void setup()
#endif
mainLastLoopTime = millis(); // - 1000; // reset loop counter
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MAIN, "%s %d", __FILE__, __LINE__);
// delay(250);
}
@ -188,6 +188,10 @@ void loop()
haspDevice.loop_5s();
gpioEvery5Seconds();
#if defined(HASP_USE_MQTT)
mqttEvery5Seconds(true);
#endif
#if defined(HASP_USE_CUSTOM)
custom_every_5seconds();
#endif
@ -204,20 +208,23 @@ void loop()
// delay(6);
}
void usage(const char* progName)
void usage(const char* progName, const char* version)
{
std::cout << progName << " [options]" << std::endl
std::cout << "\n\n"
<< progName << " " << version << " [options]" << std::endl
<< std::endl
<< "Options:" << std::endl
<< " -h | --help Print this help" << std::endl
<< " -n | --name Plate hostname used in the mqtt topic"
<< " -? | --help Print this help" << std::endl
<< " -w | --width Width of the window" << std::endl
<< " -h | --height Height of the window" << std::endl
<< " --mqttname MQTT device name topic (default: computer hostname)" << std::endl
<< " --mqtthost MQTT broker hostname or IP address" << std::endl
<< " --mqttport MQTT broker port (default: 1883)" << std::endl
<< " --mqttuser MQTT username" << std::endl
<< " --mqttpass MQTT password" << std::endl
<< " --mqttgroup MQTT groupname (default: plates)" << std::endl
<< std::endl
// << " -b | --broker Mqtt broker name or ip address" << std::endl
// << " -P | --port Mqtt broker port (default: 1883)" << std::endl
// << " -u | --user Mqtt username (optional)" << std::endl
// << " -p | --pass Mqtt password (optional)" << std::endl
// << " -t | --topic Base topic of the mqtt messages (default: hasp)" << std::endl
// << " -g | --group Group topic of on which to accept incoming messages (default: plates)"
// << std::endl
// << " -f | --fullscreen Open the application fullscreen" << std::endl
// << " -v | --verbose Verbosity level" << std::endl
@ -263,28 +270,71 @@ int main(int argc, char* argv[])
for(count = 0; count < argc; count++)
std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush;
StaticJsonDocument<1024> settings;
for(count = 0; count < argc; count++) {
if(argv[count][0] == '-') {
if(strncmp(argv[count], "--help", 6) == 0 || strncmp(argv[count], "-h", 2) == 0) {
if(strncmp(argv[count], "--help", 6) == 0 || strncmp(argv[count], "-?", 2) == 0) {
showhelp = true;
}
if(strncmp(argv[count], "--width", 7) == 0 || strncmp(argv[count], "-x", 2) == 0) {
if(strncmp(argv[count], "--width", 7) == 0 || strncmp(argv[count], "-w", 2) == 0) {
int w = atoi(argv[count + 1]);
if(w > 0) tft_width = w;
}
if(strncmp(argv[count], "--height", 8) == 0 || strncmp(argv[count], "-y", 2) == 0) {
if(strncmp(argv[count], "--height", 8) == 0 || strncmp(argv[count], "-h", 2) == 0) {
int h = atoi(argv[count + 1]);
if(h > 0) tft_height = h;
}
if(strncmp(argv[count], "--name", 6) == 0 || strncmp(argv[count], "-n", 2) == 0) {
if(strncmp(argv[count], "--mqttname", 10) == 0 || strncmp(argv[count], "-n", 2) == 0) {
std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush;
fflush(stdout);
if(count + 1 < argc) {
haspDevice.set_hostname(argv[count + 1]);
settings["mqtt"]["name"] = argv[count + 1];
} else {
showhelp = true;
}
}
if(strncmp(argv[count], "--mqtthost", 10) == 0) {
std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush;
fflush(stdout);
if(count + 1 < argc) {
settings["mqtt"]["host"] = argv[count + 1];
} else {
showhelp = true;
}
}
if(strncmp(argv[count], "--mqttport", 10) == 0) {
std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush;
fflush(stdout);
if(count + 1 < argc) {
settings["mqtt"]["port"] = atoi(argv[count + 1]);
} else {
showhelp = true;
}
}
if(strncmp(argv[count], "--mqttuser", 10) == 0) {
std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush;
fflush(stdout);
if(count + 1 < argc) {
settings["mqtt"]["user"] = argv[count + 1];
} else {
showhelp = true;
}
}
if(strncmp(argv[count], "--mqttpass", 10) == 0) {
std::cout << " argv[" << count << "] " << argv[count] << std::endl << std::flush;
fflush(stdout);
if(count + 1 < argc) {
settings["mqtt"]["pass"] = argv[count + 1];
} else {
showhelp = true;
}
@ -293,10 +343,10 @@ int main(int argc, char* argv[])
}
if(showhelp) {
usage("openHASP");
usage("openHASP", haspDevice.get_version());
#if defined(WINDOWS)
WriteConsole(std_out, "bye\n", 3, NULL, NULL);
WriteConsole(std_out, "bye\n\n", 3, NULL, NULL);
std::cout << std::endl << std::flush;
fflush(stdout);
FreeConsole();
@ -305,6 +355,11 @@ int main(int argc, char* argv[])
return 0;
}
char buffer[2048];
serializeJson(settings, buffer, sizeof(buffer));
std::cout << buffer << std::endl << std::flush;
fflush(stdout);
mqttSetConfig(settings["mqtt"]);
// printf("%s %d\n", __FILE__, __LINE__);
// fflush(stdout);
@ -320,6 +375,14 @@ int main(int argc, char* argv[])
}
LOG_TRACE(TAG_MAIN, "main loop completed");
#if defined(WINDOWS)
WriteConsole(std_out, "bye\n\n", 3, NULL, NULL);
std::cout << std::endl << std::flush;
fflush(stdout);
FreeConsole();
exit(0);
#endif
return 0;
}

View File

@ -32,8 +32,8 @@ void mqtt_get_info(JsonDocument& doc);
#if HASP_USE_CONFIG > 0
bool mqttGetConfig(const JsonObject& settings);
bool mqttSetConfig(const JsonObject& settings);
#endif
bool mqttSetConfig(const JsonObject& settings);
#ifndef MQTT_PREFIX
#define MQTT_PREFIX "hasp"

View File

@ -108,15 +108,7 @@ int mqtt_send_discovery(const char* payload, size_t len)
// Receive incoming messages
static void mqtt_message_cb(const char* topic, byte* payload, unsigned int length)
{ // Handle incoming commands from MQTT
// if(length + 1 >= mqttClient.getBufferSize()) {
// mqttFailedCount++;
// LOG_ERROR(TAG_MQTT_RCV, F(D_MQTT_PAYLOAD_TOO_LONG), (uint32_t)length);
// return;
// } else {
mqttReceiveCount++;
// payload[length] = '\0';
// }
LOG_TRACE(TAG_MQTT_RCV, F("%s = %s"), topic, (char*)payload);
if(topic == strstr(topic, mqttNodeTopic)) { // startsWith mqttNodeTopic

View File

@ -8,6 +8,13 @@
#if HASP_USE_MQTT > 0
#ifdef HASP_USE_PAHO
const char FP_CONFIG_HOST[] PROGMEM = "host";
const char FP_CONFIG_PORT[] PROGMEM = "port";
const char FP_CONFIG_NAME[] PROGMEM = "name";
const char FP_CONFIG_USER[] PROGMEM = "user";
const char FP_CONFIG_PASS[] PROGMEM = "pass";
const char FP_CONFIG_GROUP[] PROGMEM = "group";
/*******************************************************************************
* Copyright (c) 2012, 2020 IBM Corp.
*
@ -64,7 +71,7 @@ uint32_t mqttReceiveCount;
uint32_t mqttFailedCount;
std::string mqttServer = MQTT_HOSTNAME;
std::string mqttUser = MQTT_USERNAME;
std::string mqttUsername = MQTT_USERNAME;
std::string mqttPassword = MQTT_PASSWORD;
std::string mqttGroupName = MQTT_GROUPNAME;
uint16_t mqttPort = MQTT_PORT;
@ -81,11 +88,7 @@ int mqttPublish(const char* topic, const char* payload, size_t len, bool retain)
void connlost(void* context, char* cause)
{
printf("\nConnection lost\n");
if(cause) printf(" cause: %s\n", cause);
printf("Reconnecting\n");
mqttStart();
LOG_WARNING(TAG_MQTT, F(D_MQTT_DISCONNECTED) ": %s", cause);
}
// Receive incoming messages
@ -224,7 +227,6 @@ bool mqttIsConnected()
int mqtt_send_state(const __FlashStringHelper* subtopic, const char* payload)
{
char tmp_topic[mqttNodeTopic.length() + 20];
// printf(("%s" MQTT_TOPIC_STATE "/%s\n"), mqttNodeTopic, subtopic);
snprintf_P(tmp_topic, sizeof(tmp_topic), ("%s" MQTT_TOPIC_STATE "/%s"), mqttNodeTopic.c_str(), subtopic);
return mqttPublish(tmp_topic, payload, strlen(payload), false);
}
@ -232,8 +234,6 @@ int mqtt_send_state(const __FlashStringHelper* subtopic, const char* payload)
int mqtt_send_discovery(const char* payload, size_t len)
{
char tmp_topic[128];
// snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR(MQTT_PREFIX "/" MQTT_TOPIC_DISCOVERY
// "/%s"),haspDevice.get_hardware_id());
snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR(MQTT_PREFIX "/" MQTT_TOPIC_DISCOVERY));
return mqttPublish(tmp_topic, payload, len, false);
}
@ -290,27 +290,28 @@ static void onConnect(void* context)
void mqttStart()
{
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
int rc;
int ch;
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
if((rc = MQTTClient_create(&mqtt_client, mqttServer.c_str(), haspDevice.get_hostname(), MQTTCLIENT_PERSISTENCE_NONE,
NULL)) != MQTTCLIENT_SUCCESS) {
printf("Failed to create client, return code %d\n", rc);
LOG_ERROR(TAG_MQTT, "Failed to create client, return code %d", rc);
rc = EXIT_FAILURE;
return;
}
// if((rc = MQTTClient_setCallbacks(mqtt_client, mqtt_client, connlost, msgarrvd, NULL)) != MQTTCLIENT_SUCCESS) {
// printf("Failed to set callbacks, return code %d\n", rc);
// rc = EXIT_FAILURE;
// return;
// }
if((rc = MQTTClient_setCallbacks(mqtt_client, mqtt_client, connlost, mqtt_message_arrived, NULL)) !=
MQTTCLIENT_SUCCESS) {
LOG_ERROR(TAG_MQTT, "Failed to set callbacks, return code %d", rc);
rc = EXIT_FAILURE;
return;
}
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
mqttEnabled = mqttServer.length() > 0 && mqttPort > 0;
if(mqttEnabled) {
@ -322,15 +323,15 @@ void mqttStart()
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.connectTimeout = 2; // seconds
conn_opts.retryInterval = 0; // no retry
conn_opts.connectTimeout = 2; // seconds
conn_opts.retryInterval = 15; // 0 = no retry
conn_opts.username = mqttUser.c_str();
conn_opts.username = mqttUsername.c_str();
conn_opts.password = mqttPassword.c_str();
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
if((rc = MQTTClient_connect(mqtt_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
printf("Failed to connect, return code %d\n", rc);
LOG_ERROR(TAG_MQTT, "Failed to connect, return code %d", rc);
rc = EXIT_FAILURE;
// goto destroy_exit;
} else {
@ -338,20 +339,10 @@ void mqttStart()
}
} else {
rc = EXIT_FAILURE;
printf("Mqtt server not configured\n");
LOG_WARNING(TAG_MQTT, "Mqtt server not configured");
}
printf("%s %d\n", __FILE__, __LINE__);
// while (!subscribed && !finished)
// #if defined(_WIN32)
// Sleep(100);
// #else
// usleep(10000L);
// #endif
// if (finished)
// goto exit;
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
}
void mqttStop()
@ -361,44 +352,30 @@ void mqttStop()
// disc_opts.onSuccess = onDisconnect;
// disc_opts.onFailure = onDisconnectFailure;
if((rc = MQTTClient_disconnect(mqtt_client, 1000)) != MQTTCLIENT_SUCCESS) {
printf("Failed to disconnect, return code %d\n", rc);
LOG_ERROR(TAG_MQTT, "Failed to disconnect, return code %d", rc);
rc = EXIT_FAILURE;
// goto destroy_exit;
}
// while (!disc_finished)
// {
// #if defined(_WIN32)
// Sleep(100);
// #else
// usleep(10000L);
// #endif
// }
// destroy_exit:
// MQTTClient_destroy(&client);
// exit:
// return rc;
}
void mqttSetup()
{
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
mqttNodeTopic = MQTT_PREFIX;
mqttNodeTopic += "/";
mqttNodeTopic += haspDevice.get_hostname();
mqttNodeTopic += "/";
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
mqttGroupTopic = MQTT_PREFIX;
mqttGroupTopic += "/";
mqttGroupTopic += mqttGroupName;
mqttGroupTopic += "/";
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
mqttLwtTopic = mqttNodeTopic;
mqttLwtTopic += MQTT_TOPIC_LWT;
printf("%s %d\n", __FILE__, __LINE__);
LOG_DEBUG(TAG_MQTT, "%s %d", __FILE__, __LINE__);
}
IRAM_ATTR void mqttLoop()
@ -411,28 +388,95 @@ IRAM_ATTR void mqttLoop()
if(rc == MQTTCLIENT_SUCCESS && message) mqtt_message_arrived(mqtt_client, topicName, topicLen, message);
};
void mqttEvery5Seconds(bool wifiIsConnected){};
void mqttEvery5Seconds(bool wifiIsConnected)
{
if(!mqttIsConnected()) {
LOG_WARNING(TAG_MQTT, F(D_MQTT_RECONNECTING));
mqttStart();
}
};
void mqtt_get_info(JsonDocument& doc)
{
char mqttClientId[64];
JsonObject info = doc.createNestedObject(F("MQTT"));
info[F(D_INFO_SERVER)] = mqttServer;
info[F(D_INFO_USERNAME)] = mqttUser;
info[F(D_INFO_CLIENTID)] = haspDevice.get_hostname();
if(mqttIsConnected()) { // Check MQTT connection
info[F(D_INFO_STATUS)] = F(D_SERVICE_CONNECTED);
} else {
info[F(D_INFO_STATUS)] = F("<font color='red'><b>" D_SERVICE_DISCONNECTED "</b></font>, return code: ");
// +String(mqttClient.returnCode());
}
JsonObject info = doc.createNestedObject(F("MQTT"));
info[F(D_INFO_SERVER)] = mqttServer;
info[F(D_INFO_USERNAME)] = mqttUsername;
info[F(D_INFO_CLIENTID)] = haspDevice.get_hostname();
info[F(D_INFO_STATUS)] = mqttIsConnected() ? F(D_SERVICE_CONNECTED) : F(D_SERVICE_DISCONNECTED);
info[F(D_INFO_RECEIVED)] = mqttReceiveCount;
info[F(D_INFO_PUBLISHED)] = mqttPublishCount;
info[F(D_INFO_FAILED)] = mqttFailedCount;
}
/** Set MQTT Configuration.
*
* Read the settings from json and sets the application variables.
*
* @note: data pixel should be formated to uint32_t RGBA. Imagemagick requirements.
*
* @param[in] settings JsonObject with the config settings.
**/
bool mqttSetConfig(const JsonObject& settings)
{
// configOutput(settings, TAG_MQTT);
bool changed = false;
// changed |= configSet(mqttPort, settings[FPSTR(FP_CONFIG_PORT)], F("mqttPort"));
changed |= mqttPort != settings[FPSTR(FP_CONFIG_PORT)];
mqttPort = settings[FPSTR(FP_CONFIG_PORT)];
if(!settings[FPSTR(FP_CONFIG_NAME)].isNull()) {
LOG_VERBOSE(TAG_MQTT, "%s => %s", FP_CONFIG_NAME, settings[FPSTR(FP_CONFIG_NAME)].as<const char*>());
changed |= strcmp(haspDevice.get_hostname(), settings[FPSTR(FP_CONFIG_NAME)]) != 0;
// strncpy(mqttNodeName, settings[FPSTR(FP_CONFIG_NAME)], sizeof(mqttNodeName));
haspDevice.set_hostname(settings[FPSTR(FP_CONFIG_NAME)].as<const char*>());
}
// Prefill node name
// if(strlen(haspDevice.get_hostname()) == 0) {
// char mqttNodeName[64];
// std::string mac = halGetMacAddress(3, "");
// mac.toLowerCase();
// snprintf_P(mqttNodeName, sizeof(mqttNodeName), PSTR(D_MQTT_DEFAULT_NAME), mac.c_str());
// haspDevice.set_hostname(mqttNodeName);
// changed = true;
// }
if(!settings[FPSTR(FP_CONFIG_GROUP)].isNull()) {
changed |= mqttGroupName != settings[FPSTR(FP_CONFIG_GROUP)];
mqttGroupName = settings[FPSTR(FP_CONFIG_GROUP)].as<std::string>();
}
if(mqttGroupName.length() == 0) {
mqttGroupName = "plates";
changed = true;
}
if(!settings[FPSTR(FP_CONFIG_HOST)].isNull()) {
LOG_VERBOSE(TAG_MQTT, "%s => %s", FP_CONFIG_HOST, settings[FPSTR(FP_CONFIG_HOST)].as<const char*>());
changed |= mqttServer != settings[FPSTR(FP_CONFIG_HOST)];
mqttServer = settings[FPSTR(FP_CONFIG_HOST)].as<const char*>();
}
if(!settings[FPSTR(FP_CONFIG_USER)].isNull()) {
changed |= mqttUsername != settings[FPSTR(FP_CONFIG_USER)];
mqttUsername = settings[FPSTR(FP_CONFIG_USER)].as<const char*>();
}
if(!settings[FPSTR(FP_CONFIG_PASS)].isNull() &&
settings[FPSTR(FP_CONFIG_PASS)].as<std::string>() != D_PASSWORD_MASK) {
changed |= mqttPassword != settings[FPSTR(FP_CONFIG_PASS)];
mqttPassword = settings[FPSTR(FP_CONFIG_PASS)].as<const char*>();
}
mqttNodeTopic = MQTT_PREFIX;
mqttNodeTopic += haspDevice.get_hostname();
mqttGroupTopic = MQTT_PREFIX;
mqttGroupTopic += mqttGroupName;
return changed;
}
#endif // HASP_USE_PAHO
#endif // USE_MQTT