Improve native app mqtt client

This commit is contained in:
fvanroie 2021-04-25 04:34:04 +02:00
parent 034faa78b7
commit c582df6550
3 changed files with 72 additions and 72 deletions

View File

@ -266,15 +266,15 @@ int main(int argc, char* argv[])
// printf("%s %d\n", __FILE__, __LINE__);
// fflush(stdout);
printf("pre setup\n");
setup();
printf("to loop\n");
LOG_NOTICE(TAG_MAIN, "pre setup");
setup();
LOG_TRACE(TAG_MAIN, "loop started");
while(isRunning) {
loop();
// std::cout << "HSetup OK\n";
}
printf("endrunning\n");
LOG_TRACE(TAG_MAIN, "main loop completed");
return 0;
}

View File

@ -17,8 +17,8 @@
#define RETAINED true
// extern char mqttNodeName[16];
extern char mqttNodeTopic[24];
extern char mqttGroupTopic[24];
extern std::string mqttNodeTopic;
extern std::string mqttGroupTopic;
extern bool mqttEnabled;
extern bool mqttHAautodiscover;

View File

@ -6,6 +6,7 @@
#include <stdint.h>
#include "hasp_conf.h"
#include "hasplib.h"
#if HASP_USE_MQTT > 0
#ifdef USE_PAHO
@ -50,23 +51,22 @@
#include <OsWrapper.h>
#endif
#define ADDRESS "10.1.0.208:1883"
#define CLIENTID "test1123"
#define TOPIC "hasp/plate35/"
// #define ADDRESS "10.1.0.208:1883"
// #define CLIENTID "test1123"
// #define TOPIC "hasp/plate35/"
#define QOS 1
#define TIMEOUT 1000L
char mqttNodeTopic[24] = "hasp/plate35/";
const char* mqttGroupTopic = "hasp/plates/";
// char mqttNodeTopic[24];
// char mqttGroupTopic[24];
std::string mqttNodeTopic;
std::string mqttGroupTopic;
std::string mqttLwtTopic;
bool mqttEnabled = false;
bool mqttHAautodiscover = true;
////////////////////////////////////////////////////////////////////////////////////////////////////
// These defaults may be overwritten with values saved by the web interface
#ifndef MQTT_HOST
#define MQTT_HOST "";
#define MQTT_HOST "10.1.0.208";
#endif
#ifndef MQTT_PORT
@ -74,11 +74,11 @@ bool mqttHAautodiscover = true;
#endif
#ifndef MQTT_USER
#define MQTT_USER "";
#define MQTT_USER "hasp";
#endif
#ifndef MQTT_PASSW
#define MQTT_PASSW "";
#define MQTT_PASSW "hasp";
#endif
#ifndef MQTT_NODENAME
#define MQTT_NODENAME "";
@ -93,12 +93,11 @@ bool mqttHAautodiscover = true;
#define LWT_TOPIC "LWT"
char mqttServer[16] = MQTT_HOST;
char mqttUser[23] = MQTT_USER;
char mqttPassword[32] = MQTT_PASSW;
// char mqttNodeName[16] = MQTT_NODENAME;
char mqttGroupName[16] = MQTT_GROUPNAME;
uint16_t mqttPort = MQTT_PORT;
std::string mqttServer = MQTT_HOST;
std::string mqttUser = MQTT_USER;
std::string mqttPassword = MQTT_PASSW;
std::string mqttGroupName = MQTT_GROUPNAME;
uint16_t mqttPort = MQTT_PORT;
MQTTClient mqtt_client;
@ -131,15 +130,15 @@ static void mqtt_message_cb(char* topic, char* payload, size_t length)
LOG_TRACE(TAG_MQTT_RCV, F("%s = %s"), topic, (char*)payload);
if(topic == strstr(topic, mqttNodeTopic)) { // startsWith mqttNodeTopic
if(topic == strstr(topic, mqttNodeTopic.c_str())) { // startsWith mqttNodeTopic
// Node topic
topic += strlen(mqttNodeTopic); // shorten topic
topic += mqttNodeTopic.length(); // shorten topic
} else if(topic == strstr(topic, mqttGroupTopic)) { // startsWith mqttGroupTopic
} else if(topic == strstr(topic, mqttGroupTopic.c_str())) { // startsWith mqttGroupTopic
// Group topic
topic += strlen(mqttGroupTopic); // shorten topic
topic += mqttGroupTopic.length(); // shorten topic
dispatch_topic_payload(topic, (const char*)payload);
return;
@ -163,12 +162,8 @@ static void mqtt_message_cb(char* topic, char* payload, size_t length)
if(!strcasecmp_P((char*)payload, PSTR("offline"))) {
{
char msg[8];
char tmp_topic[strlen(mqttNodeTopic) + 8];
snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR("%s" LWT_TOPIC), mqttNodeTopic);
snprintf_P(msg, sizeof(msg), PSTR("online"));
// /*bool res =*/mqttClient.publish(tmp_topic, msg, true);
mqttPublish(tmp_topic, msg, strlen(msg), true);
mqttPublish(mqttLwtTopic.c_str(), msg, strlen(msg), true);
}
} else {
@ -179,11 +174,8 @@ static void mqtt_message_cb(char* topic, char* payload, size_t length)
}
}
int msgarrvd(void* context, char* topicName, int topicLen, MQTTClient_message* message)
int mqtt_message_arrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
{
// printf("MQT RCV >> ");
// printf("%s => %.*s (%d)\n", topicName, message->payloadlen, (char *)message->payload, message->payloadlen);
char msg[message->payloadlen + 1];
memcpy(msg, (char*)message->payload, message->payloadlen);
msg[message->payloadlen] = '\0';
@ -200,10 +192,10 @@ void mqtt_subscribe(void* context, const char* topic)
MQTTClient client = (MQTTClient)context;
int rc;
printf("Subscribing to topic %s\n", topic);
//\nfor client %s using QoS%d\n\n", topic, CLIENTID, QOS);
if((rc = MQTTClient_subscribe(client, topic, QOS)) != MQTTCLIENT_SUCCESS) {
printf("Failed to start subscribe, return code %d\n", rc);
LOG_WARNING(TAG_MQTT, D_BULLET D_MQTT_NOT_SUBSCRIBED, topic); // error code rc
} else {
LOG_VERBOSE(TAG_MQTT, D_BULLET D_MQTT_SUBSCRIBED, topic);
}
}
@ -222,7 +214,7 @@ int mqttPublish(const char* topic, const char* payload, size_t len, bool retain)
pubmsg.retained = retain;
MQTTClient_publishMessage(mqtt_client, topic, &pubmsg, &token);
int rc = MQTTClient_waitForCompletion(mqtt_client, token, TIMEOUT);
int rc = MQTTClient_waitForCompletion(mqtt_client, token, TIMEOUT); // time to wait in milliseconds
if(rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR(TAG_MQTT_PUB, F(D_MQTT_FAILED " '%s' => %s"), topic, payload);
@ -233,23 +225,18 @@ int mqttPublish(const char* topic, const char* payload, size_t len, bool retain)
}
}
// static bool mqttPublish(const char* topic, const char* payload, bool retain)
// {
// return mqttPublish(topic, payload, strlen(payload), retain);
// }
/* ===== Public HASP MQTT functions ===== */
bool mqttIsConnected()
{
return connected == 1;
return MQTTClient_isConnected(mqtt_client);
}
int mqtt_send_state(const __FlashStringHelper* subtopic, const char* payload)
{
char tmp_topic[strlen(mqttNodeTopic) + 20];
char tmp_topic[mqttNodeTopic.length() + 20];
// printf(("%sstate/%s\n"), mqttNodeTopic, subtopic);
snprintf_P(tmp_topic, sizeof(tmp_topic), ("%sstate/%s"), mqttNodeTopic, subtopic);
snprintf_P(tmp_topic, sizeof(tmp_topic), ("%sstate/%s"), mqttNodeTopic.c_str(), subtopic);
return mqttPublish(tmp_topic, payload, strlen(payload), false);
}
@ -262,8 +249,8 @@ int mqtt_send_discovery(const char* payload)
int mqtt_send_object_state(uint8_t pageid, uint8_t btnid, const char* payload)
{
char tmp_topic[strlen(mqttNodeTopic) + 20];
snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR("%sstate/p%ub%u"), mqttNodeTopic, pageid, btnid);
char tmp_topic[mqttNodeTopic.length() + 20];
snprintf_P(tmp_topic, sizeof(tmp_topic), PSTR("%sstate/p%ub%u"), mqttNodeTopic.c_str(), pageid, btnid);
return mqttPublish(tmp_topic, payload, strlen(payload), false);
}
@ -272,24 +259,23 @@ static void onConnect(void* context)
MQTTClient client = (MQTTClient)context;
connected = 1;
printf("Successful connection\n");
LOG_VERBOSE(TAG_MQTT, "Successful connection");
mqtt_subscribe(mqtt_client, TOPIC "command/#");
mqtt_subscribe(mqtt_client, TOPIC "config/#");
// mqtt_subscribe(mqtt_client, TOPIC "command");
mqtt_subscribe(mqtt_client, TOPIC "light/#");
mqtt_subscribe(mqtt_client, TOPIC "brightness/#");
mqtt_subscribe(mqtt_client, "hass/status");
std::string topic;
topic = mqttNodeTopic + "command/#";
mqtt_subscribe(mqtt_client, topic.c_str());
topic = mqttNodeTopic + "config/#";
mqtt_subscribe(mqtt_client, topic.c_str());
/* Home Assistant auto-configuration */
#ifdef HASP_USE_HA
if(mqttHAautodiscover) mqtt_subscribe(mqtt_client, "homeassistant/status");
topic = "homeassistant/status";
mqtt_subscribe(mqtt_client, topic.c_str());
#endif
mqttPublish(TOPIC LWT_TOPIC, "online", 6, true);
mqtt_send_object_state(0, 0, "connected");
std::cout << std::endl;
mqttPublish(mqttLwtTopic.c_str(), "online", 6, true);
}
void mqttStart()
@ -299,8 +285,8 @@ void mqttStart()
int rc;
int ch;
if((rc = MQTTClient_create(&mqtt_client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) !=
MQTTCLIENT_SUCCESS) {
if((rc = MQTTClient_create(&mqtt_client, mqttServer.c_str(), haspDevice.get_hostname(), MQTTCLIENT_PERSISTENCE_NONE,
NULL)) != MQTTCLIENT_SUCCESS) {
printf("Failed to create client, return code %d\n", rc);
rc = EXIT_FAILURE;
return;
@ -316,13 +302,13 @@ void mqttStart()
conn_opts.will->message = "offline";
conn_opts.will->qos = 1;
conn_opts.will->retained = 1;
conn_opts.will->topicName = "hasp/plate35/LWT";
conn_opts.will->topicName = mqttLwtTopic.c_str();
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = "hasp";
conn_opts.password = "hasp";
conn_opts.username = mqttUser.c_str();
conn_opts.password = mqttPassword.c_str();
if((rc = MQTTClient_connect(mqtt_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
printf("Failed to start connect, return code %d\n", rc);
@ -369,16 +355,30 @@ void mqttStop()
// return rc;
}
void mqttSetup(){};
void mqttSetup()
{
mqttNodeTopic = MQTT_PREFIX;
mqttNodeTopic += "/";
mqttNodeTopic += haspDevice.get_hostname();
mqttNodeTopic += "/";
mqttGroupTopic = MQTT_PREFIX;
mqttGroupTopic += "/";
mqttGroupTopic += mqttGroupName;
mqttGroupTopic += "/";
mqttLwtTopic = mqttNodeTopic;
mqttLwtTopic += LWT_TOPIC;
}
char* topicName;
int topicLen;
MQTTClient_message* message;
void mqttLoop()
{
int topicLen;
char* topicName; // Freed by msgarrvd
MQTTClient_message* message; // Freed by msgarrvd
int rc = MQTTClient_receive(mqtt_client, &topicName, &topicLen, &message, 4);
if(rc == MQTTCLIENT_SUCCESS && message) msgarrvd(mqtt_client, topicName, topicLen, message);
if(rc == MQTTCLIENT_SUCCESS && message) mqtt_message_arrived(mqtt_client, topicName, topicLen, message);
};
void mqttEvery5Seconds(bool wifiIsConnected){};