From 59512141384706e5623d6978cfe911997de8839d Mon Sep 17 00:00:00 2001 From: fvanroie <15969459+fvanroie@users.noreply.github.com> Date: Sat, 20 Feb 2021 01:20:25 +0100 Subject: [PATCH] Switch paho from async to single thread sync client --- ...mqtt_paho.cpp => hasp_mqtt_paho_async.cpp} | 15 +- src/mqtt/hasp_mqtt_paho_single.cpp | 371 ++++++++++++++++++ user_setups/win32/emulator_64bits.ini | 8 +- 3 files changed, 391 insertions(+), 3 deletions(-) rename src/mqtt/{hasp_mqtt_paho.cpp => hasp_mqtt_paho_async.cpp} (97%) create mode 100644 src/mqtt/hasp_mqtt_paho_single.cpp diff --git a/src/mqtt/hasp_mqtt_paho.cpp b/src/mqtt/hasp_mqtt_paho_async.cpp similarity index 97% rename from src/mqtt/hasp_mqtt_paho.cpp rename to src/mqtt/hasp_mqtt_paho_async.cpp index d33fa022..af3aab4e 100644 --- a/src/mqtt/hasp_mqtt_paho.cpp +++ b/src/mqtt/hasp_mqtt_paho_async.cpp @@ -1,11 +1,13 @@ /* MIT License - Copyright (c) 2020 Francis Van Roie For full license information read the LICENSE file in the project folder */ +/* Multi threaded asynchronous paho client */ + #include #include "hasp_conf.h" -#if HASP_USE_MQTT > 0 +#if HASP_USE_MQTT_ASYNC > 0 #ifdef USE_PAHO /******************************************************************************* @@ -29,6 +31,7 @@ #include #include #include +#include #include "MQTTAsync.h" @@ -90,6 +93,9 @@ bool mqttHAautodiscover = true; #define LWT_TOPIC "LWT" +std::recursive_mutex dispatch_mtx; +std::recursive_mutex publish_mtx; + char mqttServer[16] = MQTT_HOST; char mqttUser[23] = MQTT_USER; char mqttPassword[32] = MQTT_PASSW; @@ -137,7 +143,9 @@ static void mqtt_message_cb(char* topic, char* payload, unsigned int length) // Group topic topic += strlen(mqttGroupTopic); // shorten topic + dispatch_mtx.lock(); dispatch_topic_payload(topic, (const char*)payload); + dispatch_mtx.unlock(); return; } else if(topic == strstr_P(topic, PSTR("homeassistant/status"))) { // HA discovery topic @@ -170,7 +178,9 @@ static void mqtt_message_cb(char* topic, char* payload, unsigned int length) // LOG_TRACE(TAG_MQTT, F("ignoring LWT = online")); } } else { + dispatch_mtx.lock(); dispatch_topic_payload(topic, (const char*)payload); + dispatch_mtx.unlock(); } } @@ -303,9 +313,12 @@ static bool mqttPublish(const char* topic, const char* payload, size_t len, bool pubmsg.payloadlen = (int)strlen(payload); pubmsg.qos = QOS; pubmsg.retained = 0; + dispatch_mtx.lock(); if((rc = MQTTAsync_sendMessage(mqtt_client, topic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { + dispatch_mtx.unlock(); LOG_ERROR(TAG_MQTT_PUB, F(D_MQTT_FAILED " %s => %s"), topic, payload); } else { + dispatch_mtx.unlock(); LOG_TRACE(TAG_MQTT_PUB, F("%s => %s"), topic, payload); return true; } diff --git a/src/mqtt/hasp_mqtt_paho_single.cpp b/src/mqtt/hasp_mqtt_paho_single.cpp new file mode 100644 index 00000000..c4e86946 --- /dev/null +++ b/src/mqtt/hasp_mqtt_paho_single.cpp @@ -0,0 +1,371 @@ +/* MIT License - Copyright (c) 2020 Francis Van Roie + For full license information read the LICENSE file in the project folder */ + +/* Single threaded synchronous paho client */ + +#include + +#include "hasp_conf.h" + +#if HASP_USE_MQTT > 0 +#ifdef USE_PAHO + +/******************************************************************************* + * Copyright (c) 2012, 2020 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + +#include +#include +#include +#include +#include + +#include "MQTTClient.h" + +#include "hasp_mqtt.h" // functions to implement here + +#include "hasp/hasp_dispatch.h" // for dispatch_topic_payload +#include "hasp_debug.h" // for logging + +#if !defined(_WIN32) +#include +#else +#include +#endif + +#if defined(_WRS_KERNEL) +#include +#endif + +#define ADDRESS "10.4.0.5:1883" +#define CLIENTID "test1123" +#define TOPIC "hasp/plate35/" +#define QOS 1 +#define TIMEOUT 1000L + +const char* mqttNodeTopic = "hasp/plate35/"; +const char* mqttGroupTopic = "hasp/plates/"; +// char mqttNodeTopic[24]; +// char mqttGroupTopic[24]; +bool mqttEnabled = false; +bool mqttHAautodiscover = true; + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// These defaults may be overwritten with values saved by the web interface +#ifndef MQTT_HOST +#define MQTT_HOST ""; +#endif + +#ifndef MQTT_PORT +#define MQTT_PORT 1883; +#endif + +#ifndef MQTT_USER +#define MQTT_USER ""; +#endif + +#ifndef MQTT_PASSW +#define MQTT_PASSW ""; +#endif +#ifndef MQTT_NODENAME +#define MQTT_NODENAME ""; +#endif +#ifndef MQTT_GROUPNAME +#define MQTT_GROUPNAME ""; +#endif + +#ifndef MQTT_PREFIX +#define MQTT_PREFIX "hasp" +#endif + +#define LWT_TOPIC "LWT" + +char mqttServer[16] = MQTT_HOST; +char mqttUser[23] = MQTT_USER; +char mqttPassword[32] = MQTT_PASSW; +// char mqttNodeName[16] = MQTT_NODENAME; +char mqttGroupName[16] = MQTT_GROUPNAME; +uint16_t mqttPort = MQTT_PORT; + +MQTTClient mqtt_client; + +int disc_finished = 0; +int subscribed = 0; +int connected = 0; + +static bool mqttPublish(const char* topic, const char* payload, size_t len, bool retain); + +/* ===== Paho event callbacks ===== */ + +void connlost(void* context, char* cause) +{ + printf("\nConnection lost\n"); + if(cause) printf(" cause: %s\n", cause); + + printf("Reconnecting\n"); + mqttStart(); +} + +// Receive incoming messages +static void mqtt_message_cb(char* topic, char* payload, unsigned int length) +{ // Handle incoming commands from MQTT + if(length + 1 >= MQTT_MAX_PACKET_SIZE) { + LOG_ERROR(TAG_MQTT_RCV, F("Payload too long (%d bytes)"), length); + return; + } else { + payload[length] = '\0'; + } + + LOG_TRACE(TAG_MQTT_RCV, F("%s = %s"), topic, (char*)payload); + + if(topic == strstr(topic, mqttNodeTopic)) { // startsWith mqttNodeTopic + + // Node topic + topic += strlen(mqttNodeTopic); // shorten topic + + } else if(topic == strstr(topic, mqttGroupTopic)) { // startsWith mqttGroupTopic + + // Group topic + topic += strlen(mqttGroupTopic); // shorten topic + dispatch_topic_payload(topic, (const char*)payload); + return; + + } else if(topic == strstr_P(topic, PSTR("homeassistant/status"))) { // HA discovery topic + if(mqttHAautodiscover && !strcasecmp_P((char*)payload, PSTR("online"))) { + // dispatch_current_state(); + // mqtt_ha_register_auto_discovery(); + } + return; + + } else { + // Other topic + LOG_ERROR(TAG_MQTT, F(D_MQTT_INVALID_TOPIC)); + return; + } + + // catch a dangling LWT from a previous connection if it appears + if(!strcmp_P(topic, PSTR(LWT_TOPIC))) { // endsWith LWT + if(!strcasecmp_P((char*)payload, PSTR("offline"))) { + { + char msg[8]; + char tmp_topic[strlen(mqttNodeTopic) + 8]; + snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR("%s" LWT_TOPIC), mqttNodeTopic); + snprintf_P(msg, sizeof(msg), PSTR("online")); + + // /*bool res =*/mqttClient.publish(tmp_topic, msg, true); + mqttPublish(tmp_topic, msg, strlen(msg), true); + } + + } else { + // LOG_TRACE(TAG_MQTT, F("ignoring LWT = online")); + } + } else { + dispatch_topic_payload(topic, (const char*)payload); + } +} + +int msgarrvd(void* context, char* topicName, int topicLen, MQTTClient_message* message) +{ + // printf("MQT RCV >> "); + // printf("%s => %.*s (%d)\n", topicName, message->payloadlen, (char *)message->payload, message->payloadlen); + + char msg[message->payloadlen + 1]; + memcpy(msg, (char*)message->payload, message->payloadlen); + msg[message->payloadlen] = '\0'; + + mqtt_message_cb(topicName, (char*)message->payload, message->payloadlen); + + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; // the message was received properly +} + +void mqtt_subscribe(void* context, const char* topic) +{ + MQTTClient client = (MQTTClient)context; + int rc; + + printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", topic, CLIENTID, QOS); + if((rc = MQTTClient_subscribe(client, topic, QOS)) != MQTTCLIENT_SUCCESS) { + printf("Failed to start subscribe, return code %d\n", rc); + } +} + +/* ===== Local HASP MQTT functions ===== */ + +bool mqttPublish(const char* topic, const char* payload, size_t len, bool retain) +{ + if(mqttIsConnected()) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + MQTTClient_deliveryToken token; + + pubmsg.payload = (char*)payload; + pubmsg.payloadlen = len; // (int)strlen(payload); + pubmsg.qos = QOS; + pubmsg.retained = retain; + + MQTTClient_publishMessage(mqtt_client, topic, &pubmsg, &token); + int rc = MQTTClient_waitForCompletion(mqtt_client, token, TIMEOUT); + + if(rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR(TAG_MQTT_PUB, F(D_MQTT_FAILED " '%s' => %s"), topic, payload); + } else { + LOG_TRACE(TAG_MQTT_PUB, F("'%s' => %s OK"), topic, payload); + return true; + } + } else { + LOG_ERROR(TAG_MQTT, F(D_MQTT_NOT_CONNECTED)); + } + return false; +} + +// static bool mqttPublish(const char* topic, const char* payload, bool retain) +// { +// return mqttPublish(topic, payload, strlen(payload), retain); +// } + +/* ===== Public HASP MQTT functions ===== */ + +bool mqttIsConnected() +{ + return connected == 1; +} + +void mqtt_send_state(const __FlashStringHelper* subtopic, const char* payload) +{ + char tmp_topic[strlen(mqttNodeTopic) + 20]; + // printf(("%sstate/%s\n"), mqttNodeTopic, subtopic); + snprintf_P(tmp_topic, sizeof(tmp_topic), ("%sstate/%s"), mqttNodeTopic, subtopic); + mqttPublish(tmp_topic, payload, strlen(payload), false); +} + +void mqtt_send_object_state(uint8_t pageid, uint8_t btnid, char* payload) +{ + char tmp_topic[strlen(mqttNodeTopic) + 20]; + snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR("%sstate/p%ub%u"), mqttNodeTopic, pageid, btnid); + mqttPublish(tmp_topic, payload, strlen(payload), false); +} + +static void onConnect(void* context) +{ + MQTTClient client = (MQTTClient)context; + connected = 1; + + printf("Successful connection\n"); + + mqtt_subscribe(mqtt_client, TOPIC "command/#"); + mqtt_subscribe(mqtt_client, TOPIC "command"); + mqtt_subscribe(mqtt_client, TOPIC "light"); + mqtt_subscribe(mqtt_client, TOPIC "dim"); + + mqttPublish(TOPIC LWT_TOPIC, "online", 6, false); + + mqtt_send_object_state(0, 0, "connected"); + std::cout << std::endl; +} + +void mqttStart() +{ + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer; + int rc; + int ch; + + if((rc = MQTTClient_create(&mqtt_client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != + MQTTCLIENT_SUCCESS) { + printf("Failed to create client, return code %d\n", rc); + rc = EXIT_FAILURE; + return; + } + + // if((rc = MQTTClient_setCallbacks(mqtt_client, mqtt_client, connlost, msgarrvd, NULL)) != MQTTCLIENT_SUCCESS) { + // printf("Failed to set callbacks, return code %d\n", rc); + // rc = EXIT_FAILURE; + // return; + // } + + conn_opts.will = &will_opts; + conn_opts.will->message = "offline"; + conn_opts.will->qos = 1; + conn_opts.will->retained = 0; + conn_opts.will->topicName = "hasp/plate35/LWT"; + + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + // conn_opts.username = ""; + // conn_opts.password = ""; + + if((rc = MQTTClient_connect(mqtt_client, &conn_opts)) != MQTTCLIENT_SUCCESS) { + printf("Failed to start connect, return code %d\n", rc); + rc = EXIT_FAILURE; + // goto destroy_exit; + } else { + onConnect(&mqtt_client); + } + + // while (!subscribed && !finished) + // #if defined(_WIN32) + // Sleep(100); + // #else + // usleep(10000L); + // #endif + + // if (finished) + // goto exit; +} + +void mqttStop() +{ + int rc; + // MQTTClient_disconnectOptions disc_opts = MQTTClient_disconnectOptions_initializer; + // disc_opts.onSuccess = onDisconnect; + // disc_opts.onFailure = onDisconnectFailure; + if((rc = MQTTClient_disconnect(mqtt_client, 1000)) != MQTTCLIENT_SUCCESS) { + printf("Failed to start disconnect, return code %d\n", rc); + rc = EXIT_FAILURE; + // goto destroy_exit; + } + // while (!disc_finished) + // { + // #if defined(_WIN32) + // Sleep(100); + // #else + // usleep(10000L); + // #endif + // } + + // destroy_exit: + // MQTTClient_destroy(&client); + // exit: + // return rc; +} + +void mqttSetup(){}; + +char* topicName; +int topicLen; +MQTTClient_message* message; +void mqttLoop() +{ + + int rc = MQTTClient_receive(mqtt_client, &topicName, &topicLen, &message, 4); + if(rc == MQTTCLIENT_SUCCESS && message) msgarrvd(mqtt_client, topicName, topicLen, message); +}; + +void mqttEvery5Seconds(bool wifiIsConnected){}; + +#endif // USE_PAHO +#endif // USE_MQTT \ No newline at end of file diff --git a/user_setups/win32/emulator_64bits.ini b/user_setups/win32/emulator_64bits.ini index 2c24a6e8..67cf5d38 100644 --- a/user_setups/win32/emulator_64bits.ini +++ b/user_setups/win32/emulator_64bits.ini @@ -87,10 +87,14 @@ src_filter = -<*.h> +<../hal/sdl2> +<../.pio/libdeps/emulator_64bits/paho/src/*.c> - -<../.pio/libdeps/emulator_64bits/paho/src/MQTTClient.c> + +<../.pio/libdeps/emulator_64bits/paho/src/MQTTClient.c> + -<../.pio/libdeps/emulator_64bits/paho/src/MQTTAsync.c> + -<../.pio/libdeps/emulator_64bits/paho/src/MQTTAsyncUtils.c> -<../.pio/libdeps/emulator_64bits/paho/src/MQTTVersion.c> -<../.pio/libdeps/emulator_64bits/paho/src/SSLSocket.c> - - + + + - + - - - -