Refactored rules `Subscribe` using LList allowing full message size and enabled by default

This commit is contained in:
Theo Arends 2024-01-22 16:57:59 +01:00
parent 1ffbbc914d
commit 10885fe60d
8 changed files with 123 additions and 131 deletions

View File

@ -27,8 +27,9 @@ All notable changes to this project will be documented in this file.
- Command ``TimedPower`` from erasing all timers to showing remaining timers - Command ``TimedPower`` from erasing all timers to showing remaining timers
- ESP8266 platform update from 2024.01.00 to 2024.01.01 (#20539) - ESP8266 platform update from 2024.01.00 to 2024.01.01 (#20539)
- ESP8266 Framework (Arduino Core) from v2.7.5 to v2.7.6 (#20539) - ESP8266 Framework (Arduino Core) from v2.7.5 to v2.7.6 (#20539)
- Refactor Pio filesystem download script (#20544) - Refactored Pio filesystem download script (#20544)
- Command ``TimedPower`` refactored from String to LList - Command ``TimedPower`` refactored from String to LList
- Refactored rules ``Subscribe`` using LList allowing full message size and enabled by default
### Fixed ### Fixed
- Scripter memory leak in `>w x` (#20473) - Scripter memory leak in `>w x` (#20473)

View File

@ -164,12 +164,13 @@ The latter links can be used for OTA upgrades too like ``OtaUrl https://ota.tasm
- ESP8266 Framework (Arduino Core) from v2.7.4.9 to v2.7.6 [#20539](https://github.com/arendst/Tasmota/issues/20539) - ESP8266 Framework (Arduino Core) from v2.7.4.9 to v2.7.6 [#20539](https://github.com/arendst/Tasmota/issues/20539)
- ESP32 platform update from 2023.11.01 to 2024.01.01 [#20473](https://github.com/arendst/Tasmota/issues/20473) - ESP32 platform update from 2023.11.01 to 2024.01.01 [#20473](https://github.com/arendst/Tasmota/issues/20473)
- Renamed button "Consoles" to "Tools" - Renamed button "Consoles" to "Tools"
- Refactored rule ``Subscribe`` using LList allowing full message size and enabled by default
- Support syslog updates every sleep or every second if `#define SYSLOG_UPDATE_SECOND` [#20260](https://github.com/arendst/Tasmota/issues/20260) - Support syslog updates every sleep or every second if `#define SYSLOG_UPDATE_SECOND` [#20260](https://github.com/arendst/Tasmota/issues/20260)
- Web file upload response on upload error [#20340](https://github.com/arendst/Tasmota/issues/20340) - Web file upload response on upload error [#20340](https://github.com/arendst/Tasmota/issues/20340)
- Header `Host` is now collected by Webserver [#20446](https://github.com/arendst/Tasmota/issues/20446) - Header `Host` is now collected by Webserver [#20446](https://github.com/arendst/Tasmota/issues/20446)
- Webcam tweaks [#20451](https://github.com/arendst/Tasmota/issues/20451) - Webcam tweaks [#20451](https://github.com/arendst/Tasmota/issues/20451)
- IP stack compatible with new Core3 IPv6 implementation [#20509](https://github.com/arendst/Tasmota/issues/20509) - IP stack compatible with new Core3 IPv6 implementation [#20509](https://github.com/arendst/Tasmota/issues/20509)
- Refactor Pio filesystem download script [#20544](https://github.com/arendst/Tasmota/issues/20544) - Refactored Pio filesystem download script [#20544](https://github.com/arendst/Tasmota/issues/20544)
### Fixed ### Fixed
- CVE-2021-36603 Cross Site Scripting (XSS) vulnerability [#12221](https://github.com/arendst/Tasmota/issues/12221) - CVE-2021-36603 Cross Site Scripting (XSS) vulnerability [#12221](https://github.com/arendst/Tasmota/issues/12221)

View File

@ -497,7 +497,7 @@
// -- Rules or Script ---------------------------- // -- Rules or Script ----------------------------
// Select none or only one of the below defines USE_RULES or USE_SCRIPT // Select none or only one of the below defines USE_RULES or USE_SCRIPT
#define USE_RULES // Add support for rules (+13k code, +768 bytes mem) #define USE_RULES // Add support for rules (+13k code, +768 bytes mem)
// #define SUPPORT_MQTT_EVENT // Support trigger event with MQTT subscriptions (+3k7 code) #define SUPPORT_MQTT_EVENT // Support trigger event with MQTT subscriptions (+1k8 code)
// #define USE_EXPRESSION // Add support for expression evaluation in rules (+3k3 code) // #define USE_EXPRESSION // Add support for expression evaluation in rules (+3k3 code)
// #define SUPPORT_IF_STATEMENT // Add support for IF statement in rules (+3k3) // #define SUPPORT_IF_STATEMENT // Add support for IF statement in rules (+3k3)
// #define USER_RULE1 "<Any rule1 data>" // Add rule1 data saved at initial firmware load or when command reset is executed // #define USER_RULE1 "<Any rule1 data>" // Add rule1 data saved at initial firmware load or when command reset is executed

View File

@ -579,6 +579,7 @@ bool IsNumeric(const char* value) {
char* Trim(char* p) { char* Trim(char* p) {
// Remove leading and trailing tab, \n, \v, \f, \r and space // Remove leading and trailing tab, \n, \v, \f, \r and space
if (p == nullptr) { return p; }
if (*p != '\0') { if (*p != '\0') {
while ((*p != '\0') && isspace(*p)) { p++; } // Trim leading spaces while ((*p != '\0') && isspace(*p)) { p++; } // Trim leading spaces
char* q = p + strlen(p) -1; char* q = p + strlen(p) -1;

View File

@ -251,6 +251,10 @@ void CmndWifiTest(void)
#endif // not defined FIRMWARE_MINIMAL_ONLY #endif // not defined FIRMWARE_MINIMAL_ONLY
void ResponseCmnd(void) {
Response_P(PSTR("{\"%s\":"), XdrvMailbox.command);
}
void ResponseCmndNumber(int value) { void ResponseCmndNumber(int value) {
Response_P(S_JSON_COMMAND_NVALUE, XdrvMailbox.command, value); Response_P(S_JSON_COMMAND_NVALUE, XdrvMailbox.command, value);
} }
@ -696,7 +700,7 @@ void ResetTimedCmnd(const char *command) {
void ShowTimedCmnd(const char *command) { void ShowTimedCmnd(const char *command) {
bool found = false; bool found = false;
uint32_t now = millis(); uint32_t now = millis();
Response_P(PSTR("{\"%s\":"), XdrvMailbox.command); ResponseCmnd(); // {"TimedPower":
for (auto &elem : timed_cmnd) { for (auto &elem : timed_cmnd) {
if (strncmp(command, elem.command, strlen(command)) == 0) { // StartsWith if (strncmp(command, elem.command, strlen(command)) == 0) { // StartsWith
ResponseAppend_P(PSTR("%s{\"" D_JSON_REMAINING "\":%d,\"" D_JSON_COMMAND "\":\"%s\"}"), ResponseAppend_P(PSTR("%s{\"" D_JSON_REMAINING "\":%d,\"" D_JSON_COMMAND "\":\"%s\"}"),

View File

@ -959,7 +959,7 @@ void EnergyCommandCalSetResponse(uint32_t cal_type) {
void EnergyCommandCalResponse(uint32_t cal_type) { void EnergyCommandCalResponse(uint32_t cal_type) {
Energy->command_code = cal_type; // Is XxxCal command too Energy->command_code = cal_type; // Is XxxCal command too
if (XnrgCall(FUNC_COMMAND)) { // XxxCal if (XnrgCall(FUNC_COMMAND)) { // XxxCal
Response_P(PSTR("{\"%s\":"), XdrvMailbox.command); ResponseCmnd();
EnergyCommandCalSetResponse(cal_type); EnergyCommandCalSetResponse(cal_type);
} }
} }

View File

@ -1178,7 +1178,7 @@ void EnergyCommandCalSetResponse(uint32_t cal_type) {
void EnergyCommandCalResponse(uint32_t cal_type) { void EnergyCommandCalResponse(uint32_t cal_type) {
Energy->command_code = cal_type; // Is XxxCal command too Energy->command_code = cal_type; // Is XxxCal command too
if (XnrgCall(FUNC_COMMAND)) { // XxxCal if (XnrgCall(FUNC_COMMAND)) { // XxxCal
Response_P(PSTR("{\"%s\":"), XdrvMailbox.command); ResponseCmnd();
EnergyCommandCalSetResponse(cal_type); EnergyCommandCalSetResponse(cal_type);
} }
} }

View File

@ -167,16 +167,6 @@ void (* const RulesCommand[])(void) PROGMEM = {
#endif #endif
}; };
#ifdef SUPPORT_MQTT_EVENT
#include <LinkedList.h> // Import LinkedList library
typedef struct {
String Event;
String Topic;
String Key;
} MQTT_Subscription;
LinkedList<MQTT_Subscription> subscriptions;
#endif // SUPPORT_MQTT_EVENT
struct RULES { struct RULES {
String event_value; String event_value;
unsigned long timer[MAX_RULE_TIMERS] = { 0 }; unsigned long timer[MAX_RULE_TIMERS] = { 0 };
@ -1156,6 +1146,14 @@ void RulesSetPower(void)
} }
#ifdef SUPPORT_MQTT_EVENT #ifdef SUPPORT_MQTT_EVENT
typedef struct {
char* event;
char* topic;
char* key;
} MQTT_Subscription;
LList<MQTT_Subscription> subscriptions;
/********************************************************************************************/ /********************************************************************************************/
/* /*
* Rules: Process received MQTT message. * Rules: Process received MQTT message.
@ -1167,68 +1165,90 @@ void RulesSetPower(void)
* false - The message is not in our list. * false - The message is not in our list.
*/ */
bool RulesMqttData(void) { bool RulesMqttData(void) {
if ((XdrvMailbox.data_len < 1) || (XdrvMailbox.data_len > RULE_MAX_MQTT_EVENTSZ)) { /*
return false; XdrvMailbox.topic = topic;
XdrvMailbox.index = strlen(topic);
XdrvMailbox.data = (char*)data;
XdrvMailbox.data_len = data_len;
*/
if (XdrvMailbox.data_len < 1) {
return false; // Process unchanged data
} }
bool serviced = false; bool serviced = false;
String sTopic = XdrvMailbox.topic; String buData = XdrvMailbox.data; // Could be very long SENSOR message
String buData = XdrvMailbox.data;
//AddLog(LOG_LEVEL_DEBUG, PSTR("RUL: MQTT Topic %s, Event %s"), XdrvMailbox.topic, XdrvMailbox.data);
MQTT_Subscription event_item;
//Looking for matched topic
char json_event[RULE_MAX_MQTT_EVENTSZ +32]; // Add chars for {"Event":{"<item.Event>": .. }
for (uint32_t index = 0; index < subscriptions.size(); index++) {
String sData = buData; // Looking for matched topic
for (auto &event_item : subscriptions) {
char stopic[strlen(event_item.topic)+2];
strcpy(stopic, event_item.topic);
strcat(stopic, "/");
if ((strcmp(XdrvMailbox.topic, event_item.topic) == 0) || // Equal
(strncmp(XdrvMailbox.topic, stopic, strlen(XdrvMailbox.topic)) == 0)) { // StartsWith
event_item = subscriptions.get(index); // This topic is subscribed by us, so serve it
//AddLog(LOG_LEVEL_DEBUG, PSTR("RUL: Match MQTT message Topic %s with subscription topic %s"), sTopic.c_str(), event_item.Topic.c_str());
if ((sTopic == event_item.Topic) || sTopic.startsWith(event_item.Topic+"/")) {
//This topic is subscribed by us, so serve it
serviced = true; serviced = true;
String value; String value;
if (event_item.Key.length() == 0) { //If did not specify Key if (strlen(event_item.key) == 0) { // If did not specify Key
value = sData; value = buData;
} else { //If specified Key, need to parse Key/Value from JSON data } else { // If specified Key, need to parse Key/Value from JSON data
String sData = buData;
JsonParser parser((char*)sData.c_str()); JsonParser parser((char*)sData.c_str());
JsonParserObject jsonData = parser.getRootObject(); JsonParserObject jsonData = parser.getRootObject();
if (!jsonData) break; // Failed to parse JSON data, ignore this message.
String key1 = event_item.Key; String key1 = event_item.key;
String key2; String key2;
if (!jsonData) break; //Failed to parse JSON data, ignore this message.
int dot; int dot;
if ((dot = key1.indexOf('.')) > 0) { if ((dot = key1.indexOf('.')) > 0) {
key2 = key1.substring(dot+1); key2 = key1.substring(dot+1);
key1 = key1.substring(0, dot); key1 = key1.substring(0, dot);
JsonParserToken value_tok = jsonData[key1.c_str()].getObject()[key2.c_str()]; JsonParserToken value_tok = jsonData[key1.c_str()].getObject()[key2.c_str()];
if (!value_tok) break; //Failed to get the key/value, ignore this message. if (!value_tok) break; // Failed to get the key/value, ignore this message.
value = value_tok.getStr(); value = value_tok.getStr();
// if (!jsonData[key1][key2].success()) break; //Failed to get the key/value, ignore this message. // if (!jsonData[key1][key2].success()) break; //Failed to get the key/value, ignore this message.
// value = (const char *)jsonData[key1][key2]; // value = (const char *)jsonData[key1][key2];
} else { } else {
JsonParserToken value_tok = jsonData[key1.c_str()]; JsonParserToken value_tok = jsonData[key1.c_str()];
if (!value_tok) break; //Failed to get the key/value, ignore this message. if (!value_tok) break; // Failed to get the key/value, ignore this message.
value = value_tok.getStr(); value = value_tok.getStr();
// if (!jsonData[key1].success()) break; // if (!jsonData[key1].success()) break;
// value = (const char *)jsonData[key1]; // value = (const char *)jsonData[key1];
} }
} }
value.trim(); value.trim();
/*
//Create an new event. Cannot directly call RulesProcessEvent().
snprintf_P(Rules.event_data, sizeof(Rules.event_data), PSTR("%s=%s"), event_item.Event.c_str(), value.c_str());
// 20230107 Superseded by the following code
*/
bool quotes = (value[0] != '{'); bool quotes = (value[0] != '{');
snprintf_P(json_event, sizeof(json_event), PSTR("{\"Event\":{\"%s\":%s%s%s}}"), event_item.Event.c_str(), (quotes)?"\"":"", value.c_str(), (quotes)?"\"":""); Response_P(PSTR("{\"Event\":{\"%s\":%s%s%s}}"), event_item.event, (quotes)?"\"":"", value.c_str(), (quotes)?"\"":"");
RulesProcessEvent(json_event); RulesProcessEvent(ResponseData());
} }
} }
return serviced; return serviced;
} }
bool RuleUnsubscribe(const char* event) {
UpperCase((char*)event, event);
bool do_all = (strcmp(event, "*") == 0); // Wildcard
//Search all subscriptions
for (auto &index : subscriptions) {
if (do_all || // All
(strcmp(event, index.event) == 0)) { // Equal
//If find exists one, remove it.
char stopic[strlen(index.topic)+3];
strcpy(stopic, index.topic);
strcat(stopic, "/#");
MqttUnsubscribe(stopic);
free(index.key);
free(index.topic);
free(index.event);
subscriptions.remove(&index);
if (!do_all) {
return true;
}
}
}
return do_all;
}
/********************************************************************************************/ /********************************************************************************************/
/* /*
* Subscribe a MQTT topic (with or without key) and assign an event name to it * Subscribe a MQTT topic (with or without key) and assign an event name to it
@ -1245,77 +1265,58 @@ bool RulesMqttData(void) {
* Return: * Return:
* A string include subscribed event, topic and key. * A string include subscribed event, topic and key.
*/ */
void CmndSubscribe(void) void CmndSubscribe(void) {
{
MQTT_Subscription subscription_item;
String events;
if (XdrvMailbox.data_len > 0) { if (XdrvMailbox.data_len > 0) {
char parameters[XdrvMailbox.data_len+1]; char* event = Trim(strtok(XdrvMailbox.data, ","));
memcpy(parameters, XdrvMailbox.data, XdrvMailbox.data_len); char* topic = Trim(strtok(nullptr, ","));
parameters[XdrvMailbox.data_len] = '\0'; char* key = Trim(strtok(nullptr, ","));
String event_name, topic, key;
char * pos = strtok(parameters, ","); if (event && topic) {
if (pos) { RuleUnsubscribe(event);
event_name = Trim(pos);
pos = strtok(nullptr, ","); // Add "/#" to the topic
if (pos) { uint32_t slen = strlen(topic);
topic = Trim(pos); char stopic[slen +3];
pos = strtok(nullptr, ","); strcpy(stopic, topic);
if (pos) { if (stopic[slen-1] != '#') {
key = Trim(pos); if (stopic[slen-1] == '/') {
} strcat(stopic, "#");
}
}
//AddLog(LOG_LEVEL_DEBUG, PSTR("RUL: Subscribe command with parameters: %s, %s, %s."), event_name.c_str(), topic.c_str(), key.c_str());
event_name.toUpperCase();
if (event_name.length() > 0 && topic.length() > 0) {
//Search all subscriptions
for (uint32_t index=0; index < subscriptions.size(); index++) {
if (subscriptions.get(index).Event.equals(event_name)) {
//If find exists one, remove it.
String stopic = subscriptions.get(index).Topic + "/#";
MqttUnsubscribe(stopic.c_str());
subscriptions.remove(index);
break;
}
}
//Add "/#" to the topic
if (!topic.endsWith("#")) {
if (topic.endsWith("/")) {
topic.concat("#");
} else { } else {
topic.concat("/#"); strcat(stopic, "/#");
} }
} }
//AddLog(LOG_LEVEL_DEBUG, PSTR("RUL: New topic: %s."), topic.c_str());
//MQTT Subscribe
subscription_item.Event = event_name;
subscription_item.Topic = topic.substring(0, topic.length() - 2); //Remove "/#" so easy to match
subscription_item.Key = key;
subscriptions.add(subscription_item);
if (2 == XdrvMailbox.index) { if (!key) { key = EmptyStr; }
topic = subscription_item.Topic; // Do not append "/#""
}
MqttSubscribe(topic.c_str());
events.concat(event_name + "," + topic // MQTT Subscribe
+ (key.length()>0 ? "," : "") char* hevent = (char*)malloc(strlen(event) +1);
+ key); char* htopic = (char*)malloc(strlen(stopic) -1); // Remove "/#"
} else { char* hkey = (char*)malloc(strlen(key) +1);
events = D_JSON_WRONG_PARAMETERS; if (hevent && htopic && hkey) {
} strcpy(hevent, event);
} else { strlcpy(htopic, stopic, strlen(stopic)-1); // Remove "/#" so easy to match
//If did not specify the event name, list all subscribed event strcpy(hkey, key);
for (uint32_t index=0; index < subscriptions.size(); index++) { MQTT_Subscription &subscription_item = subscriptions.addToLast();
subscription_item = subscriptions.get(index); subscription_item.event = hevent;
events.concat(subscription_item.Event + "," + subscription_item.Topic subscription_item.topic = htopic;
+ (subscription_item.Key.length()>0 ? "," : "") subscription_item.key = hkey;
+ subscription_item.Key + "; "); char* ftopic = (2 == XdrvMailbox.index)?htopic:stopic; // Subscribe2
MqttSubscribe(ftopic);
ResponseCmnd(); // {"Subscribe":
ResponseAppend_P(PSTR("\"%s,%s%s%s\"}"), hevent, ftopic, (strlen(hkey))?",":"", EscapeJSONString(hkey).c_str());
}
} }
return; // {"Error"}
} }
ResponseCmndChar(events.c_str()); // If did not specify the event name, list all subscribed event
bool found = false;
ResponseCmnd(); // {"Subscribe":
for (auto &items : subscriptions) {
ResponseAppend_P(PSTR("%s%s,%s%s%s"),
(found) ? "; " : "\"", items.event, items.topic, (strlen(items.key))?",":"", EscapeJSONString(items.key).c_str());
found = true;
}
ResponseAppend_P((found) ? PSTR("\"}") : PSTR("\"" D_JSON_EMPTY "\"}"));
} }
/********************************************************************************************/ /********************************************************************************************/
@ -1329,32 +1330,16 @@ void CmndSubscribe(void)
* Return: * Return:
* list all the events unsubscribed. * list all the events unsubscribed.
*/ */
void CmndUnsubscribe(void) void CmndUnsubscribe(void) {
{
MQTT_Subscription subscription_item;
String events;
if (XdrvMailbox.data_len > 0) { if (XdrvMailbox.data_len > 0) {
for (uint32_t index = 0; index < subscriptions.size(); index++) { char* event = Trim(XdrvMailbox.data);
subscription_item = subscriptions.get(index); if (RuleUnsubscribe(event)) {
if (subscription_item.Event.equalsIgnoreCase(XdrvMailbox.data)) { ResponseCmndChar(event);
String stopic = subscription_item.Topic + "/#";
MqttUnsubscribe(stopic.c_str());
events = subscription_item.Event;
subscriptions.remove(index);
break;
}
}
} else {
// If did not specify the event name, unsubscribe all event
String stopic;
while (subscriptions.size() > 0) {
events.concat(subscriptions.get(0).Event + "; ");
stopic = subscriptions.get(0).Topic + "/#";
MqttUnsubscribe(stopic.c_str());
subscriptions.remove(0);
} }
return; // {"Error"}
} }
ResponseCmndChar(events.c_str()); RuleUnsubscribe("*");
ResponseCmndDone();
} }
#endif // SUPPORT_MQTT_EVENT #endif // SUPPORT_MQTT_EVENT