MQTT cleanup & reduction

This commit is contained in:
Blaz Kristan 2024-03-16 15:26:52 +01:00
parent 7b366d49d2
commit 52a1b0453c
3 changed files with 200 additions and 203 deletions

View File

@ -172,7 +172,9 @@ void updateInterfaces(uint8_t callMode)
espalexaDevice->setColor(col[0], col[1], col[2]); espalexaDevice->setColor(col[0], col[1], col[2]);
} }
#endif #endif
doPublishMqtt = true; #ifndef WLED_DISABLE_MQTT
publishMqtt();
#endif
} }
@ -180,9 +182,6 @@ void handleTransitions()
{ {
//handle still pending interface update //handle still pending interface update
updateInterfaces(interfaceUpdateCallMode); updateInterfaces(interfaceUpdateCallMode);
#ifndef WLED_DISABLE_MQTT
if (doPublishMqtt) publishMqtt();
#endif
if (transitionActive && strip.getTransition() > 0) { if (transitionActive && strip.getTransition() > 0) {
float tper = (millis() - transitionStartTime)/(float)strip.getTransition(); float tper = (millis() - transitionStartTime)/(float)strip.getTransition();

View File

@ -1,198 +1,197 @@
#include "wled.h" #include "wled.h"
/* /*
* MQTT communication protocol for home automation * MQTT communication protocol for home automation
*/ */
#ifdef WLED_ENABLE_MQTT #ifdef WLED_ENABLE_MQTT
#define MQTT_KEEP_ALIVE_TIME 60 // contact the MQTT broker every 60 seconds #define MQTT_KEEP_ALIVE_TIME 60 // contact the MQTT broker every 60 seconds
void parseMQTTBriPayload(char* payload) void parseMQTTBriPayload(char* payload)
{ {
if (strstr(payload, "ON") || strstr(payload, "on") || strstr(payload, "true")) {bri = briLast; stateUpdated(CALL_MODE_DIRECT_CHANGE);} if (strstr(payload, "ON") || strstr(payload, "on") || strstr(payload, "true")) {bri = briLast; stateUpdated(CALL_MODE_DIRECT_CHANGE);}
else if (strstr(payload, "T" ) || strstr(payload, "t" )) {toggleOnOff(); stateUpdated(CALL_MODE_DIRECT_CHANGE);} else if (strstr(payload, "T" ) || strstr(payload, "t" )) {toggleOnOff(); stateUpdated(CALL_MODE_DIRECT_CHANGE);}
else { else {
uint8_t in = strtoul(payload, NULL, 10); uint8_t in = strtoul(payload, NULL, 10);
if (in == 0 && bri > 0) briLast = bri; if (in == 0 && bri > 0) briLast = bri;
bri = in; bri = in;
stateUpdated(CALL_MODE_DIRECT_CHANGE); stateUpdated(CALL_MODE_DIRECT_CHANGE);
} }
} }
void onMqttConnect(bool sessionPresent) void onMqttConnect(bool sessionPresent)
{ {
//(re)subscribe to required topics //(re)subscribe to required topics
char subuf[38]; char subuf[38];
if (mqttDeviceTopic[0] != 0) { if (mqttDeviceTopic[0] != 0) {
strlcpy(subuf, mqttDeviceTopic, 33); strlcpy(subuf, mqttDeviceTopic, 33);
mqtt->subscribe(subuf, 0); mqtt->subscribe(subuf, 0);
strcat_P(subuf, PSTR("/col")); strcat_P(subuf, PSTR("/col"));
mqtt->subscribe(subuf, 0); mqtt->subscribe(subuf, 0);
strlcpy(subuf, mqttDeviceTopic, 33); strlcpy(subuf, mqttDeviceTopic, 33);
strcat_P(subuf, PSTR("/api")); strcat_P(subuf, PSTR("/api"));
mqtt->subscribe(subuf, 0); mqtt->subscribe(subuf, 0);
} }
if (mqttGroupTopic[0] != 0) { if (mqttGroupTopic[0] != 0) {
strlcpy(subuf, mqttGroupTopic, 33); strlcpy(subuf, mqttGroupTopic, 33);
mqtt->subscribe(subuf, 0); mqtt->subscribe(subuf, 0);
strcat_P(subuf, PSTR("/col")); strcat_P(subuf, PSTR("/col"));
mqtt->subscribe(subuf, 0); mqtt->subscribe(subuf, 0);
strlcpy(subuf, mqttGroupTopic, 33); strlcpy(subuf, mqttGroupTopic, 33);
strcat_P(subuf, PSTR("/api")); strcat_P(subuf, PSTR("/api"));
mqtt->subscribe(subuf, 0); mqtt->subscribe(subuf, 0);
} }
usermods.onMqttConnect(sessionPresent); usermods.onMqttConnect(sessionPresent);
doPublishMqtt = true; DEBUG_PRINTLN(F("MQTT ready"));
DEBUG_PRINTLN(F("MQTT ready")); publishMqtt();
} }
void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
static char *payloadStr; static char *payloadStr;
DEBUG_PRINT(F("MQTT msg: ")); DEBUG_PRINT(F("MQTT msg: "));
DEBUG_PRINTLN(topic); DEBUG_PRINTLN(topic);
// paranoia check to avoid npe if no payload // paranoia check to avoid npe if no payload
if (payload==nullptr) { if (payload==nullptr) {
DEBUG_PRINTLN(F("no payload -> leave")); DEBUG_PRINTLN(F("no payload -> leave"));
return; return;
} }
if (index == 0) { // start (1st partial packet or the only packet) if (index == 0) { // start (1st partial packet or the only packet)
if (payloadStr) delete[] payloadStr; // fail-safe: release buffer if (payloadStr) delete[] payloadStr; // fail-safe: release buffer
payloadStr = new char[total+1]; // allocate new buffer payloadStr = new char[total+1]; // allocate new buffer
} }
if (payloadStr == nullptr) return; // buffer not allocated if (payloadStr == nullptr) return; // buffer not allocated
// copy (partial) packet to buffer and 0-terminate it if it is last packet // copy (partial) packet to buffer and 0-terminate it if it is last packet
char* buff = payloadStr + index; char* buff = payloadStr + index;
memcpy(buff, payload, len); memcpy(buff, payload, len);
if (index + len >= total) { // at end if (index + len >= total) { // at end
payloadStr[total] = '\0'; // terminate c style string payloadStr[total] = '\0'; // terminate c style string
} else { } else {
DEBUG_PRINTLN(F("Partial packet received.")); DEBUG_PRINTLN(F("MQTT partial packet received."));
return; // process next packet return; // process next packet
} }
DEBUG_PRINTLN(payloadStr); DEBUG_PRINTLN(payloadStr);
size_t topicPrefixLen = strlen(mqttDeviceTopic); size_t topicPrefixLen = strlen(mqttDeviceTopic);
if (strncmp(topic, mqttDeviceTopic, topicPrefixLen) == 0) { if (strncmp(topic, mqttDeviceTopic, topicPrefixLen) == 0) {
topic += topicPrefixLen; topic += topicPrefixLen;
} else { } else {
topicPrefixLen = strlen(mqttGroupTopic); topicPrefixLen = strlen(mqttGroupTopic);
if (strncmp(topic, mqttGroupTopic, topicPrefixLen) == 0) { if (strncmp(topic, mqttGroupTopic, topicPrefixLen) == 0) {
topic += topicPrefixLen; topic += topicPrefixLen;
} else { } else {
// Non-Wled Topic used here. Probably a usermod subscribed to this topic. // Non-Wled Topic used here. Probably a usermod subscribed to this topic.
usermods.onMqttMessage(topic, payloadStr); usermods.onMqttMessage(topic, payloadStr);
delete[] payloadStr; delete[] payloadStr;
payloadStr = nullptr; payloadStr = nullptr;
return; return;
} }
} }
//Prefix is stripped from the topic at this point //Prefix is stripped from the topic at this point
if (strcmp_P(topic, PSTR("/col")) == 0) { if (strcmp_P(topic, PSTR("/col")) == 0) {
colorFromDecOrHexString(col, payloadStr); colorFromDecOrHexString(col, payloadStr);
colorUpdated(CALL_MODE_DIRECT_CHANGE); colorUpdated(CALL_MODE_DIRECT_CHANGE);
} else if (strcmp_P(topic, PSTR("/api")) == 0) { } else if (strcmp_P(topic, PSTR("/api")) == 0) {
if (!requestJSONBufferLock(15)) { if (!requestJSONBufferLock(15)) {
delete[] payloadStr; delete[] payloadStr;
payloadStr = nullptr; payloadStr = nullptr;
return; return;
} }
if (payloadStr[0] == '{') { //JSON API if (payloadStr[0] == '{') { //JSON API
deserializeJson(*pDoc, payloadStr); deserializeJson(*pDoc, payloadStr);
deserializeState(pDoc->as<JsonObject>()); deserializeState(pDoc->as<JsonObject>());
} else { //HTTP API } else { //HTTP API
String apireq = "win"; apireq += '&'; // reduce flash string usage String apireq = "win"; apireq += '&'; // reduce flash string usage
apireq += payloadStr; apireq += payloadStr;
handleSet(nullptr, apireq); handleSet(nullptr, apireq);
} }
releaseJSONBufferLock(); releaseJSONBufferLock();
} else if (strlen(topic) != 0) { } else if (strlen(topic) != 0) {
// non standard topic, check with usermods // non standard topic, check with usermods
usermods.onMqttMessage(topic, payloadStr); usermods.onMqttMessage(topic, payloadStr);
} else { } else {
// topmost topic (just wled/MAC) // topmost topic (just wled/MAC)
parseMQTTBriPayload(payloadStr); parseMQTTBriPayload(payloadStr);
} }
delete[] payloadStr; delete[] payloadStr;
payloadStr = nullptr; payloadStr = nullptr;
} }
void publishMqtt() void publishMqtt()
{ {
doPublishMqtt = false; if (!WLED_MQTT_CONNECTED) return;
if (!WLED_MQTT_CONNECTED) return; DEBUG_PRINTLN(F("Publish MQTT"));
DEBUG_PRINTLN(F("Publish MQTT"));
#ifndef USERMOD_SMARTNEST
#ifndef USERMOD_SMARTNEST char s[10];
char s[10]; char subuf[48];
char subuf[38];
sprintf_P(s, PSTR("%u"), bri);
sprintf_P(s, PSTR("%u"), bri); strlcpy(subuf, mqttDeviceTopic, 33);
strlcpy(subuf, mqttDeviceTopic, 33); strcat_P(subuf, PSTR("/g"));
strcat_P(subuf, PSTR("/g")); mqtt->publish(subuf, 0, retainMqttMsg, s); // optionally retain message (#2263)
mqtt->publish(subuf, 0, retainMqttMsg, s); // optionally retain message (#2263)
sprintf_P(s, PSTR("#%06X"), (col[3] << 24) | (col[0] << 16) | (col[1] << 8) | (col[2]));
sprintf_P(s, PSTR("#%06X"), (col[3] << 24) | (col[0] << 16) | (col[1] << 8) | (col[2])); strlcpy(subuf, mqttDeviceTopic, 33);
strlcpy(subuf, mqttDeviceTopic, 33); strcat_P(subuf, PSTR("/c"));
strcat_P(subuf, PSTR("/c")); mqtt->publish(subuf, 0, retainMqttMsg, s); // optionally retain message (#2263)
mqtt->publish(subuf, 0, retainMqttMsg, s); // optionally retain message (#2263)
strlcpy(subuf, mqttDeviceTopic, 33);
strlcpy(subuf, mqttDeviceTopic, 33); strcat_P(subuf, PSTR("/status"));
strcat_P(subuf, PSTR("/status")); mqtt->publish(subuf, 0, true, "online"); // retain message for a LWT
mqtt->publish(subuf, 0, true, "online"); // retain message for a LWT
char apires[1024]; // allocating 1024 bytes from stack can be risky
char apires[1024]; // allocating 1024 bytes from stack can be risky XML_response(nullptr, apires);
XML_response(nullptr, apires); strlcpy(subuf, mqttDeviceTopic, 33);
strlcpy(subuf, mqttDeviceTopic, 33); strcat_P(subuf, PSTR("/v"));
strcat_P(subuf, PSTR("/v")); mqtt->publish(subuf, 0, retainMqttMsg, apires); // optionally retain message (#2263)
mqtt->publish(subuf, 0, retainMqttMsg, apires); // optionally retain message (#2263) #endif
#endif }
}
//HA autodiscovery was removed in favor of the native integration in HA v0.102.0
//HA autodiscovery was removed in favor of the native integration in HA v0.102.0
bool initMqtt()
bool initMqtt() {
{ if (!mqttEnabled || mqttServer[0] == 0 || !WLED_CONNECTED) return false;
if (!mqttEnabled || mqttServer[0] == 0 || !WLED_CONNECTED) return false;
if (mqtt == nullptr) {
if (mqtt == nullptr) { mqtt = new AsyncMqttClient();
mqtt = new AsyncMqttClient(); mqtt->onMessage(onMqttMessage);
mqtt->onMessage(onMqttMessage); mqtt->onConnect(onMqttConnect);
mqtt->onConnect(onMqttConnect); }
} if (mqtt->connected()) return true;
if (mqtt->connected()) return true;
DEBUG_PRINTLN(F("Reconnecting MQTT"));
DEBUG_PRINTLN(F("Reconnecting MQTT")); IPAddress mqttIP;
IPAddress mqttIP; if (mqttIP.fromString(mqttServer)) //see if server is IP or domain
if (mqttIP.fromString(mqttServer)) //see if server is IP or domain {
{ mqtt->setServer(mqttIP, mqttPort);
mqtt->setServer(mqttIP, mqttPort); } else {
} else { mqtt->setServer(mqttServer, mqttPort);
mqtt->setServer(mqttServer, mqttPort); }
} mqtt->setClientId(mqttClientID);
mqtt->setClientId(mqttClientID); if (mqttUser[0] && mqttPass[0]) mqtt->setCredentials(mqttUser, mqttPass);
if (mqttUser[0] && mqttPass[0]) mqtt->setCredentials(mqttUser, mqttPass);
#ifndef USERMOD_SMARTNEST
#ifndef USERMOD_SMARTNEST strlcpy(mqttStatusTopic, mqttDeviceTopic, 33);
strlcpy(mqttStatusTopic, mqttDeviceTopic, 33); strcat_P(mqttStatusTopic, PSTR("/status"));
strcat_P(mqttStatusTopic, PSTR("/status")); mqtt->setWill(mqttStatusTopic, 0, true, "offline"); // LWT message
mqtt->setWill(mqttStatusTopic, 0, true, "offline"); // LWT message #endif
#endif mqtt->setKeepAlive(MQTT_KEEP_ALIVE_TIME);
mqtt->setKeepAlive(MQTT_KEEP_ALIVE_TIME); mqtt->connect();
mqtt->connect(); return true;
return true; }
} #endif
#endif

1
wled00/wled.h Executable file → Normal file
View File

@ -705,7 +705,6 @@ WLED_GLOBAL byte optionType;
WLED_GLOBAL bool doSerializeConfig _INIT(false); // flag to initiate saving of config WLED_GLOBAL bool doSerializeConfig _INIT(false); // flag to initiate saving of config
WLED_GLOBAL bool doReboot _INIT(false); // flag to initiate reboot from async handlers WLED_GLOBAL bool doReboot _INIT(false); // flag to initiate reboot from async handlers
WLED_GLOBAL bool doPublishMqtt _INIT(false);
// status led // status led
#if defined(STATUSLED) #if defined(STATUSLED)