diff --git a/tasmota/xdrv_02_mqtt_1_file.ino b/tasmota/xdrv_02_mqtt_1_file.ino index ec1a9e698..035dd6ef6 100644 --- a/tasmota/xdrv_02_mqtt_1_file.ino +++ b/tasmota/xdrv_02_mqtt_1_file.ino @@ -24,6 +24,8 @@ * MQTT file transfer * * Supports both binary and base64 encoded binary data transfer + * + * See tools/mqtt-file for python ota-upload and settings-upload and download examples \*********************************************************************************************/ #include @@ -41,32 +43,13 @@ struct FMQTT { String file_md5; // MQTT received file md5 (32 chars) uint16_t topic_size; // MQTT topic length with terminating uint8_t file_id = 0; // MQTT unique file id during upload/download + bool file_binary = false; // MQTT binary file transfer } FMqtt; -/* - The download chunk size is the data size before it is encoded to base64. - It is smaller than the upload chunksize as it is bound by MESSZ - The download buffer with length MESSZ (1042) contains - - Payload ({"Id":117,"Data":""}) -*/ const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""} -const uint32_t mqtt_file_chuck_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2; - -uint32_t FileUploadChunckSize(void) { -/* - The upload chunk size is the data size of the payload. - It can be larger than the download chunksize which is bound by MESSZ - The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains - - Header of 5 bytes (MQTT_MAX_HEADER_SIZE) - - Topic string terminated with a zero (stat/demo/FILEUPLOAD) - - Payload ({"Id":116,"Data":""}) or () -*/ - const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE - return MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1; -} uint32_t MqttFileUploadValidate(uint32_t rcv_id) { - if (XdrvMailbox.grpflg) { return 5; } + if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported if ((0 == FMqtt.file_id) && (rcv_id > 0) && (FMqtt.file_size > 0) && (FMqtt.file_type > 0)) { FMqtt.file_buffer = nullptr; // Init upload buffer @@ -92,7 +75,8 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) { if (UPL_TASMOTA == FMqtt.file_type) { if (Update.begin(FMqtt.file_size)) { FMqtt.file_buffer = &FMqtt.file_id; // Dummy buffer -// TasmotaGlobal.blinkstate = true; // Stay lit + TasmotaGlobal.blinks = 201; + TasmotaGlobal.blinkstate = true; // Stay lit SettingsSave(1); // Free flash for OTA update } } @@ -114,11 +98,12 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) { ResponseCmndChar(PSTR(D_JSON_STARTED)); MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD } - else if ((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) { + else if (((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) || (0 == XdrvMailbox.payload)) { // Error receiving data if (UPL_TASMOTA == FMqtt.file_type) { Update.end(true); + TasmotaGlobal.blinkstate = false; // Turn led off } else if (UPL_SETTINGS == FMqtt.file_type) { SettingsBufferFree(); @@ -128,6 +113,36 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) { return 0; // No error } +void MqttFileValidate(uint32_t error) { + if (error) { + FMqtt.file_buffer = nullptr; + + TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging + + if (4 == error) { + ResponseCmndChar(PSTR(D_JSON_ABORTED)); + } else { + char error_txt[20]; + snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error); + ResponseCmndChar(error_txt); + } + } +} + +void MqttFilePublish(void) { + if (!FMqtt.file_buffer) { + TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging + FMqtt.file_id = 0; + FMqtt.file_size = 0; + FMqtt.file_type = 0; + FMqtt.file_binary = false; + FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory + FMqtt.file_password = nullptr; + } + MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); + ResponseClear(); +} + void CmndFileUpload(void) { /* Upload bytes chunks of data either base64 encoded or binary with MD5 hash @@ -152,13 +167,12 @@ void CmndFileUpload(void) { */ const char* base64_data = nullptr; uint32_t rcv_id = 0; - char* dataBuf = (char*)XdrvMailbox.data; bool binary_data = (XdrvMailbox.index > 199); // Check for raw data if (!binary_data) { - if (strlen(dataBuf) > 8) { // Workaround exception if empty JSON like {} - Needs checks - JsonParser parser((char*) dataBuf); + if (strlen(XdrvMailbox.data) > 8) { // Workaround exception if empty JSON like {} - Needs checks + JsonParser parser((char*) XdrvMailbox.data); JsonParserObject root = parser.getRootObject(); if (root) { JsonParserToken val = root[PSTR("ID")]; @@ -178,17 +192,7 @@ void CmndFileUpload(void) { } else { rcv_id = FMqtt.file_id; } - - uint32_t error = MqttFileUploadValidate(rcv_id); - if (error) { - FMqtt.file_buffer = nullptr; - - TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging - - char error_txt[20]; - snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error); - ResponseCmndChar(error_txt); - } + MqttFileValidate(MqttFileUploadValidate(rcv_id)); if (FMqtt.file_buffer) { if ((FMqtt.file_pos < FMqtt.file_size) && (binary_data || base64_data)) { @@ -232,7 +236,16 @@ void CmndFileUpload(void) { if ((FMqtt.file_pos < FMqtt.file_size) || (FMqtt.file_md5.length() != 32)) { TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging - uint32_t chunk_size = FileUploadChunckSize(); + /* + The upload chunk size is the data size of the payload. + It can be larger than the download chunksize which is bound by MESSZ + The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains + - Header of 5 bytes (MQTT_MAX_HEADER_SIZE) + - Topic string terminated with a zero (stat/demo/FILEUPLOAD) + - Payload ({"Id":116,"Data":""}) or () + */ + const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE + uint32_t chunk_size = MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1; if (!binary_data) { chunk_size = (((chunk_size - FileTransferHeaderSize) / 4) * 3) -2; // Calculate base64 chunk size } @@ -248,16 +261,17 @@ void CmndFileUpload(void) { if (UPL_TASMOTA == FMqtt.file_type) { if (!Update.end(true)) { + TasmotaGlobal.blinkstate = false; // Turn led off ResponseCmndFailed(); } else { - TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update + TasmotaGlobal.restart_flag = 2; // Restart to load new firmware } } else if (UPL_SETTINGS == FMqtt.file_type) { if (!SettingsConfigRestore()) { ResponseCmndFailed(); } else { - TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update + TasmotaGlobal.restart_flag = 2; // Restart to load new settings } } @@ -266,95 +280,137 @@ void CmndFileUpload(void) { } } - if (!FMqtt.file_buffer) { - TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging - FMqtt.file_id = 0; - FMqtt.file_size = 0; - FMqtt.file_type = 0; - FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory - FMqtt.file_password = nullptr; + MqttFilePublish(); +} + +uint32_t MqttFileDownloadValidate(void) { + if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported + + if ((0 == FMqtt.file_id) && (FMqtt.file_type > 0)) { + FMqtt.file_buffer = nullptr; // Init upload buffer + + if (!FMqtt.file_password || (strcmp(FMqtt.file_password, SettingsText(SET_MQTT_PWD)) != 0)) { + return 1; // Invalid password + } + + FMqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255 + + // Init file_buffer + if (UPL_SETTINGS == FMqtt.file_type) { + uint32_t len = SettingsConfigBackup(); + if (!len) { return 2; } + + FMqtt.file_type = UPL_SETTINGS; + FMqtt.file_buffer = settings_buffer; + FMqtt.file_size = len; + + // {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096} + Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"), + SettingsConfigFilename().c_str(), FMqtt.file_id, FMqtt.file_type, len); + } + else { + return 3; // Invalid file type + } + + FMqtt.file_pos = 0; + + FMqtt.md5 = MD5Builder(); + FMqtt.md5.begin(); + + char payload[50]; + snprintf_P(payload, sizeof(payload), S_JSON_COMMAND_SVALUE, XdrvMailbox.command, PSTR(D_JSON_STARTED)); + MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, payload); // Enforce stat/wemos10/FILEUPLOAD + + TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging } - MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD - ResponseClear(); + else if (0 == XdrvMailbox.payload) { + + if (UPL_SETTINGS == FMqtt.file_type) { + SettingsBufferFree(); + } + return 4; // Upload aborted + } + return 0; // No error } void CmndFileDownload(void) { /* - Download (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data - Currently supports Settings (file type 2) - Filedownload 0 - Abort current download - FileDownload 2 - Start download of settings file - FileDownload - Continue downloading data until reception of MD5 hash + Download chunks of data base64 encoded with MD5 hash + + Supported Type: + 2 - Settings + + FileDownload 0 - Abort current download + + Start a download session: + FileDownload {"Password":"","Type":2} + + Download data using base64 until reception of MD5 hash: + FileDownload */ - if (XdrvMailbox.grpflg) { return; } - - if (FMqtt.file_id && FMqtt.file_buffer) { - bool finished = false; - - if (0 == XdrvMailbox.payload) { // Abort file download - ResponseCmndChar(PSTR(D_JSON_ABORTED)); - finished = true; - } - else if (FMqtt.file_pos < FMqtt.file_size) { + if (FMqtt.file_buffer) { + if (FMqtt.file_pos < FMqtt.file_size) { uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos; - uint32_t write_bytes = (bytes_left < mqtt_file_chuck_size) ? bytes_left : mqtt_file_chuck_size; + + /* + The download chunk size is the data size before it is encoded to base64. + It is smaller than the upload chunksize as it is bound by MESSZ + The download buffer with length MESSZ (1042) contains + - Payload ({"Id":117,"Data":""}) + */ + const uint32_t mqtt_file_chunk_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2; + uint32_t chunk_size = (FMqtt.file_binary) ? 4096 : mqtt_file_chunk_size; + uint32_t write_bytes = (bytes_left < chunk_size) ? bytes_left : chunk_size; uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos; FMqtt.md5.add(buffer, write_bytes); - // {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} - Response_P(PSTR("{\"Id\":%d,\"Data\":\""), FMqtt.file_id); // FileTransferHeaderSize - char base64_data[encode_base64_length(write_bytes)]; - encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data); - ResponseAppend_P(base64_data); - ResponseAppend_P("\"}"); - FMqtt.file_pos += write_bytes; + + if (FMqtt.file_binary) { + MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, (const char*)buffer, write_bytes); + } else { + // {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} + Response_P(PSTR("{\"Id\":%d,\"Data\":\""), FMqtt.file_id); // FileTransferHeaderSize + char base64_data[encode_base64_length(write_bytes)]; + encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data); + ResponseAppend_P(base64_data); + ResponseAppend_P("\"}"); + MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); + } + ResponseClear(); + return; } else { FMqtt.md5.calculate(); // {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"} Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), FMqtt.file_id, FMqtt.md5.toString().c_str()); - finished = true; - } + MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD + ResponseCmndDone(); - if (finished) { if (UPL_SETTINGS == FMqtt.file_type) { SettingsBufferFree(); } - FMqtt.file_id = 0; + FMqtt.file_buffer = nullptr; } } - else if (XdrvMailbox.data_len) { - FMqtt.file_buffer = nullptr; - FMqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255 - if (UPL_SETTINGS == XdrvMailbox.payload) { - uint32_t len = SettingsConfigBackup(); - if (len) { - FMqtt.file_type = UPL_SETTINGS; - FMqtt.file_buffer = settings_buffer; - FMqtt.file_size = len; - - // {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096} - Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"), - SettingsConfigFilename().c_str(), FMqtt.file_id, FMqtt.file_type, len); - } - } - - if (FMqtt.file_buffer) { - FMqtt.file_pos = 0; - - FMqtt.md5 = MD5Builder(); - FMqtt.md5.begin(); - } else { - FMqtt.file_id = 0; - ResponseCmndFailed(); + if (strlen(XdrvMailbox.data) > 8) { // Workaround exception if empty JSON like {} - Needs checks + JsonParser parser((char*) XdrvMailbox.data); + JsonParserObject root = parser.getRootObject(); + if (root) { + JsonParserToken val = root[PSTR("TYPE")]; + if (val) { FMqtt.file_type = val.getUInt(); } + val = root[PSTR("BINARY")]; + if (val) { FMqtt.file_binary = val.getUInt(); } + val = root[PSTR("PASSWORD")]; + if (val) { FMqtt.file_password = val.getStr(); } } } - MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); - ResponseClear(); + MqttFileValidate(MqttFileDownloadValidate()); + + MqttFilePublish(); } #endif // USE_MQTT_FILE diff --git a/tasmota/xdrv_02_mqtt_9_impl.ino b/tasmota/xdrv_02_mqtt_9_impl.ino index beca7778d..3a7ed3d63 100644 --- a/tasmota/xdrv_02_mqtt_9_impl.ino +++ b/tasmota/xdrv_02_mqtt_9_impl.ino @@ -179,7 +179,7 @@ void MakeValidMqtt(uint32_t option, char* str) { * bool MqttIsConnected() * void MqttDisconnect() * void MqttSubscribeLib(char *topic) - * bool MqttPublishLib(const char* topic, bool retained) + * bool MqttPublishLib(const char* topic, const uint8_t* payload, unsigned int plength, bool retained) \*********************************************************************************************/ #include @@ -465,7 +465,7 @@ void MqttUnsubscribeLib(const char *topic) { MqttClient.loop(); // Solve LmacRxBlk:1 messages } -bool MqttPublishLib(const char* topic, bool retained) { +bool MqttPublishLib(const char* topic, const uint8_t* payload, unsigned int plength, bool retained) { // If Prefix1 equals Prefix2 disable next MQTT subscription to prevent loop if (!strcmp(SettingsText(SET_MQTTPREFIX1), SettingsText(SET_MQTTPREFIX2))) { char *str = strstr(topic, SettingsText(SET_MQTTPREFIX1)); @@ -475,35 +475,34 @@ bool MqttPublishLib(const char* topic, bool retained) { } } - bool result; #ifdef USE_MQTT_AZURE_IOT String sourceTopicString = urlEncodeBase64(String(topic)); String topicString = "devices/" + String(SettingsText(SET_MQTT_CLIENT)); - topicString+= "/messages/events/topic=" + sourceTopicString; + topicString += "/messages/events/topic=" + sourceTopicString; - JsonParser mqtt_message((char*) String(TasmotaGlobal.mqtt_data).c_str()); + JsonParser mqtt_message((char*) String((const char*)payload).c_str()); JsonParserObject message_object = mqtt_message.getRootObject(); - if (message_object.isValid()) { // only sending valid JSON, yet this is optional - result = MqttClient.publish(topicString.c_str(), TasmotaGlobal.mqtt_data, retained); - AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Sending '%s'"), TasmotaGlobal.mqtt_data); - } else { - AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Invalid JSON, '%s' for topic '%s', not sending to Azure IoT Hub"), TasmotaGlobal.mqtt_data, topic); - result = true; + if (!message_object.isValid()) { // only sending valid JSON, yet this is optional + AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Invalid JSON for topic '%s', not sending to Azure IoT Hub"), topic); + return true; } -#else - result = MqttClient.publish(topic, TasmotaGlobal.mqtt_data, retained); + topic = topicString.c_str(); #endif // USE_MQTT_AZURE_IOT - yield(); // #3313 - return result; -} -#ifdef DEBUG_TASMOTA_CORE -void MqttDumpData(char* topic, char* data, uint32_t data_len) { - char dump_data[data_len +1]; - memcpy(dump_data, data, sizeof(dump_data)); // Make another copy for removing optional control characters - DEBUG_CORE_LOG(PSTR(D_LOG_MQTT "Size %d, \"%s %s\""), data_len, topic, RemoveControlCharacter(dump_data)); + if (!MqttClient.beginPublish(topic, plength, retained)) { +// AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Connection lost or message too large")); + return false; + } + uint32_t written = MqttClient.write(payload, plength); + if (written != plength) { + AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Message too large")); + return false; + } + MqttClient.endPublish(); + + yield(); // #3313 + return true; } -#endif void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len) { #ifdef USE_DEBUG_DRIVER @@ -554,10 +553,6 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len char data[data_len +1]; memcpy(data, mqtt_data, sizeof(data)); -#ifdef DEBUG_TASMOTA_CORE - MqttDumpData(topic, data, data_len); // Use a function to save stack space used by dump_data -#endif - // MQTT pre-processing XdrvMailbox.index = strlen(topic); XdrvMailbox.data_len = data_len; @@ -598,21 +593,28 @@ void MqttPublishLoggingAsync(bool refresh) { strlcpy(TasmotaGlobal.mqtt_data, line, len); // No JSON and ugly!! char stopic[TOPSZ]; GetTopic_P(stopic, STAT, TasmotaGlobal.mqtt_topic, PSTR("LOGGING")); - MqttPublishLib(stopic, false); + MqttPublishLib(stopic, (const uint8_t*)TasmotaGlobal.mqtt_data, strlen(TasmotaGlobal.mqtt_data), false); } } -void MqttPublish(const char* topic, bool retained) { +void MqttPublishPayload(const char* topic, const char* payload, uint32_t binary_length, bool retained) { + // Publish payload string or binary when binary_length set with optional retained + #ifdef USE_DEBUG_DRIVER ShowFreeMem(PSTR("MqttPublish")); #endif + bool binary_data = (binary_length > 0); + if (!binary_data) { + binary_length = strlen(payload); + } + if (Settings.flag4.mqtt_no_retain) { // SetOption104 - Disable all MQTT retained messages, some brokers don't support it: AWS IoT, Losant retained = false; // Some brokers don't support retained, they will disconnect if received } String log_data; // 20210420 Moved to heap to solve tight stack resulting in exception 2 - if (Settings.flag.mqtt_enabled && MqttPublishLib(topic, retained)) { // SetOption3 - Enable MQTT + if (Settings.flag.mqtt_enabled && MqttPublishLib(topic, (const uint8_t*)payload, binary_length, retained)) { // SetOption3 - Enable MQTT log_data = F(D_LOG_MQTT); // MQT: log_data += topic; // stat/tasmota/STATUS2 } else { @@ -621,7 +623,7 @@ void MqttPublish(const char* topic, bool retained) { retained = false; // Without MQTT enabled there is no retained message } log_data += F(" = "); // = - log_data += TasmotaGlobal.mqtt_data; // {"StatusFWR":{"Version":... + log_data += (binary_data) ? HexToString((uint8_t*)payload, binary_length) : payload; if (retained) { log_data += F(" (" D_RETAINED ")"); } // (retained) AddLogData(LOG_LEVEL_INFO, log_data.c_str()); // MQT: stat/tasmota/STATUS2 = {"StatusFWR":{"Version":... @@ -630,18 +632,32 @@ void MqttPublish(const char* topic, bool retained) { } } +void MqttPublishPayload(const char* topic, const char* payload) { + // Publish payload string no retained + MqttPublishPayload(topic, payload, 0, false); +} + +void MqttPublish(const char* topic, bool retained) { + // Publish default TasmotaGlobal.mqtt_data string with optional retained + MqttPublishPayload(topic, TasmotaGlobal.mqtt_data, 0, retained); +} + void MqttPublish(const char* topic) { + // Publish default TasmotaGlobal.mqtt_data string no retained MqttPublish(topic, false); } -void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retained) { -/* prefix 0 = cmnd using subtopic - * prefix 1 = stat using subtopic - * prefix 2 = tele using subtopic - * prefix 4 = cmnd using subtopic or RESULT - * prefix 5 = stat using subtopic or RESULT - * prefix 6 = tele using subtopic or RESULT - */ +void MqttPublishPayloadPrefixTopic_P(uint32_t prefix, const char* subtopic, const char* payload, uint32_t binary_length, bool retained) { +/* + Publish //> payload string or binary when binary_length set with optional retained + + prefix 0 = cmnd using subtopic + prefix 1 = stat using subtopic + prefix 2 = tele using subtopic + prefix 4 = cmnd using subtopic or RESULT + prefix 5 = stat using subtopic or RESULT + prefix 6 = tele using subtopic or RESULT +*/ char romram[64]; snprintf_P(romram, sizeof(romram), ((prefix > 3) && !Settings.flag.mqtt_response) ? S_RSLT_RESULT : subtopic); // SetOption4 - Switch between MQTT RESULT or COMMAND UpperCase(romram, romram); @@ -649,7 +665,7 @@ void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retain prefix &= 3; char stopic[TOPSZ]; GetTopic_P(stopic, prefix, TasmotaGlobal.mqtt_topic, romram); - MqttPublish(stopic, retained); + MqttPublishPayload(stopic, payload, binary_length, retained); #if defined(USE_MQTT_AWS_IOT) || defined(USE_MQTT_AWS_IOT_LIGHT) if ((prefix > 0) && (Settings.flag4.awsiot_shadow) && (Mqtt.connected)) { // placeholder for SetOptionXX @@ -669,33 +685,53 @@ void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retain snprintf_P(romram, sizeof(romram), PSTR("$aws/things/%s/shadow/update"), topic2); // copy buffer - char *mqtt_save = (char*) malloc(strlen(TasmotaGlobal.mqtt_data)+1); - if (!mqtt_save) { return; } // abort - strcpy(mqtt_save, TasmotaGlobal.mqtt_data); - snprintf_P(TasmotaGlobal.mqtt_data, sizeof(TasmotaGlobal.mqtt_data), PSTR("{\"state\":{\"reported\":%s}}"), mqtt_save); - free(mqtt_save); + String aws_payload = F("{\"state\":{\"reported\":%s}}"); + aws_payload += payload; + + MqttClient.publish(romram, aws_payload.c_str(), false); - bool result = MqttClient.publish(romram, TasmotaGlobal.mqtt_data, false); AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Updated shadow: %s"), romram); yield(); // #3313 } #endif // USE_MQTT_AWS_IOT } +void MqttPublishPayloadPrefixTopic_P(uint32_t prefix, const char* subtopic, const char* payload, uint32_t binary_length) { + // Publish //> payload string or binary when binary_length set no retained + MqttPublishPayloadPrefixTopic_P(prefix, subtopic, payload, binary_length, false); +} + +void MqttPublishPayloadPrefixTopic_P(uint32_t prefix, const char* subtopic, const char* payload) { + // Publish //> payload string no retained + MqttPublishPayloadPrefixTopic_P(prefix, subtopic, payload, 0, false); +} + +void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retained) { + // Publish //> default TasmotaGlobal.mqtt_data string with optional retained + MqttPublishPayloadPrefixTopic_P(prefix, subtopic, TasmotaGlobal.mqtt_data, 0, retained); +} + void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic) { + // Publish //> default TasmotaGlobal.mqtt_data string no retained MqttPublishPrefixTopic_P(prefix, subtopic, false); } void MqttPublishPrefixTopicRulesProcess_P(uint32_t prefix, const char* subtopic, bool retained) { + // Publish //> default TasmotaGlobal.mqtt_data string with optional retained + // then process rules MqttPublishPrefixTopic_P(prefix, subtopic, retained); XdrvRulesProcess(0); } void MqttPublishPrefixTopicRulesProcess_P(uint32_t prefix, const char* subtopic) { + // Publish //> default TasmotaGlobal.mqtt_data string no retained + // then process rules MqttPublishPrefixTopicRulesProcess_P(prefix, subtopic, false); } void MqttPublishTeleSensor(void) { + // Publish tele//SENSOR default TasmotaGlobal.mqtt_data string with optional retained + // then process rules MqttPublishPrefixTopicRulesProcess_P(TELE, PSTR(D_RSLT_SENSOR), Settings.flag.mqtt_sensor_retain); // CMND_SENSORRETAIN } diff --git a/tasmota/xdrv_07_domoticz.ino b/tasmota/xdrv_07_domoticz.ino index b7ae440cc..473dab297 100644 --- a/tasmota/xdrv_07_domoticz.ino +++ b/tasmota/xdrv_07_domoticz.ino @@ -360,8 +360,9 @@ bool DomoticzSendKey(uint8_t key, uint8_t device, uint8_t state, uint8_t svalflg \*********************************************************************************************/ void DomoticzSendData(uint32_t sensor_idx, uint32_t idx, char *data) { + char payload[128]; if (DZ_AIRQUALITY == sensor_idx) { - Response_P(PSTR("{\"idx\":%d,\"nvalue\":%s,\"Battery\":%d,\"RSSI\":%d}"), + snprintf_P(payload, sizeof(payload), PSTR("{\"idx\":%d,\"nvalue\":%s,\"Battery\":%d,\"RSSI\":%d}"), idx, data, DomoticzBatteryQuality(), DomoticzRssiQuality()); } else { uint8_t nvalue = 0; @@ -371,19 +372,15 @@ void DomoticzSendData(uint32_t sensor_idx, uint32_t idx, char *data) { nvalue = position < 2 ? 0 : (position == 100 ? 1 : 2); } #endif // USE_SHUTTER - Response_P(DOMOTICZ_MESSAGE, // "{\"idx\":%d,\"nvalue\":%d,\"svalue\":\"%s\",\"Battery\":%d,\"RSSI\":%d}" + snprintf_P(payload, sizeof(payload), DOMOTICZ_MESSAGE, // "{\"idx\":%d,\"nvalue\":%d,\"svalue\":\"%s\",\"Battery\":%d,\"RSSI\":%d}" idx, nvalue, data, DomoticzBatteryQuality(), DomoticzRssiQuality()); } - MqttPublish(domoticz_in_topic); + MqttPublishPayload(domoticz_in_topic, payload); } void DomoticzSensor(uint8_t idx, char *data) { if (Settings.domoticz_sensor_idx[idx]) { - char dmess[128]; // {"idx":26700,"nvalue":0,"svalue":"22330.1;10234.4;22000.5;10243.4;1006;3000","Battery":100,"RSSI":10} - - memcpy(dmess, TasmotaGlobal.mqtt_data, sizeof(dmess)); DomoticzSendData(idx, Settings.domoticz_sensor_idx[idx], data); - memcpy(TasmotaGlobal.mqtt_data, dmess, sizeof(dmess)); } } diff --git a/tools/mqtt-file/download-settings.py b/tools/mqtt-file/download-settings.py index 046a35997..0f990ee5d 100644 --- a/tools/mqtt-file/download-settings.py +++ b/tools/mqtt-file/download-settings.py @@ -42,16 +42,20 @@ import json broker = "domus1" # MQTT broker ip address or name broker_port = 1883 # MQTT broker port +mypassword = "" # Tasmota MQTT password mytopic = "demo" # Tasmota MQTT topic myfiletype = 2 # Tasmota Settings file type # **** End of User Configuration Section +use_base64 = True + # Derive fulltopic from broker LWT message mypublish = "cmnd/"+mytopic+"/filedownload" mysubscribe = "stat/"+mytopic+"/FILEDOWNLOAD" # Case sensitive Ack_flag = False +Err_flag = False file_name = "" file_id = 0 @@ -62,6 +66,7 @@ file_md5 = "" # The callback for when mysubscribe message is received def on_message(client, userdata, msg): global Ack_flag + global Err_flag global Run_flag global file_name global file_id @@ -73,56 +78,76 @@ def on_message(client, userdata, msg): base64_data = "" rcv_id = 0 -# print("Received message =",str(msg.payload.decode("utf-8"))) +# try: +# print("Received message =",str(msg.payload.decode("utf-8"))) +# except: +# print("Received message = binary data") - root = json.loads(msg.payload.decode("utf-8")) - if "File" in root: file_name = root["File"] - if "Id" in root: rcv_id = root["Id"] - if "Type" in root: file_type = root["Type"] - if "Size" in root: file_size = root["Size"] - if "Data" in root: base64_data = root["Data"] - if "Md5" in root: file_md5 = root["Md5"] + try: + root = json.loads(msg.payload.decode("utf-8")) + if root: + if "FileDownload" in root: + rcv_code = root["FileDownload"] + if "Started" in rcv_code: + return + if "Error" in rcv_code: + print("Error: "+rcv_code) + Err_flag = True + return + if "Command" in root: + rcv_code = root["Command"] + if rcv_code == "Error": + print("Error: Command error") + Err_flag = True + return + if "File" in root: file_name = root["File"] + if "Id" in root: rcv_id = root["Id"] + if "Type" in root: file_type = root["Type"] + if "Size" in root: file_size = root["Size"] + if "Data" in root: base64_data = root["Data"] + if "Md5" in root: file_md5 = root["Md5"] + except: + pass if file_id == 0 and rcv_id > 0 and file_size > 0 and file_type > 0 and file_name: file_id = rcv_id fi = open(file_name,"wb") fi.close() - else: - if file_id > 0 and file_id != rcv_id: - Run_flag = False + if use_base64 and file_id > 0 and file_id != rcv_id: + Err_flag = True return - if file_md5 == "" and base64_data: - base64_decoded_data = base64_data.encode('utf-8') - chunk = base64.decodebytes(base64_decoded_data) - in_hash_md5.update(chunk) # Update hash - - fi = open(file_name,"ab") - fi.write(chunk) - fi.close() + if file_md5 == "" and file_name: + if use_base64 and base64_data: + base64_decoded_data = base64_data.encode('utf-8') + chunk = base64.decodebytes(base64_decoded_data) + in_hash_md5.update(chunk) # Update hash + fi = open(file_name,"ab") + fi.write(chunk) + fi.close() + if use_base64 == False and 0 == rcv_id: + chunk = msg.payload + in_hash_md5.update(chunk) # Update hash + fi = open(file_name,"ab") + fi.write(chunk) + fi.close() if file_md5 != "": md5_hash = in_hash_md5.hexdigest() if md5_hash != file_md5: print("Error: MD5 mismatch") - Run_flag = False + Err_flag = True Ack_flag = False def wait_for_ack(): - global Ack_flag - global Run_flag - if Run_flag == False: - print("Error: Transmission") - return True - timeout = 100 - while Ack_flag and timeout > 0: + while Ack_flag and Err_flag == False and timeout > 0: time.sleep(0.01) timeout = timeout -1 - if Ack_flag: + if 0 == timeout: print("Error: Timeout") return Ack_flag @@ -138,35 +163,32 @@ print("Downloading file from "+mytopic+" ...") in_hash_md5 = hashlib.md5() -Err_flag = False - -client.publish(mypublish, str(myfiletype)) +if use_base64: + client.publish(mypublish, "{\"Password\":\""+mypassword+"\",\"Type\":"+str(myfiletype)+"}") +else: + client.publish(mypublish, "{\"Password\":\""+mypassword+"\",\"Type\":"+str(myfiletype)+",\"Binary\":1}") Ack_flag = True Run_flag = True while Run_flag: if wait_for_ack(): # We use Ack here - Err_flag = True + client.publish(mypublish, "0") # Abort any failed download Run_flag = False else: - if file_md5 == "": + if file_md5 == "": # Request chunk client.publish(mypublish, "?") Ack_flag = True - else: Run_flag = False -if Err_flag: - client.publish(mypublish, "0") # Abort any failed download +if Err_flag == False: + file_type_name = "Data" + if file_type == 2: + file_type_name = "Settings" + print("Downloaded "+file_type_name+" saved as "+file_name) time_taken = time.time() - time_start - -file_type_name = " Data" -if file_type == 2: - file_type_name = " Settings" - -print("Downloaded"+file_type_name+" saved as "+file_name) print("Done in "+str("%.2f"%time_taken)+" seconds") client.disconnect() # Disconnect diff --git a/tools/mqtt-file/upload-ota.py b/tools/mqtt-file/upload-ota.py index 791c9b1ae..22ea721f9 100644 --- a/tools/mqtt-file/upload-ota.py +++ b/tools/mqtt-file/upload-ota.py @@ -50,19 +50,22 @@ myfiletype = 1 # Tasmota firmware file type # **** End of User Configuration Section +use_base64 = False + # Derive fulltopic from broker LWT message mypublish = "cmnd/"+mytopic+"/fileupload" mysubscribe = "stat/"+mytopic+"/FILEUPLOAD" # Case sensitive Ack_flag = False +Err_flag = False -use_base64 = False file_id = 114 # Even id between 2 and 254 file_chunk_size = 700 # Default Tasmota MQTT max message size # The callback for when mysubscribe message is received def on_message(client, userdata, msg): global Ack_flag + global Err_flag global file_chunk_size rcv_code = "" @@ -71,31 +74,35 @@ def on_message(client, userdata, msg): # print("Received message =",str(msg.payload.decode("utf-8"))) root = json.loads(msg.payload.decode("utf-8")) - if "FileUpload" in root: rcv_code = root["FileUpload"] - if "Error" in rcv_code: - print("Error: "+rcv_code) - return - - if "Command" in root: rcv_code = root["Command"] - if rcv_code == "Error": - print("Error: Command error") - return - - if "Id" in root: rcv_id = root["Id"] - if rcv_id == file_id: - if "MaxSize" in root: file_chunk_size = root["MaxSize"] + if "FileUpload" in root: + rcv_code = root["FileUpload"] + if "Started" in rcv_code: + return + if "Error" in rcv_code: + print("Error: "+rcv_code) + Err_flag = True + return + if "Command" in root: + rcv_code = root["Command"] + if rcv_code == "Error": + print("Error: Command error") + Err_flag = True + return + if "Id" in root: + rcv_id = root["Id"] + if rcv_id == file_id: + if "MaxSize" in root: file_chunk_size = root["MaxSize"] Ack_flag = False def wait_for_ack(): - global Ack_flag timeout = 100 - while Ack_flag and timeout > 0: + while Ack_flag and Err_flag == False and timeout > 0: time.sleep(0.01) timeout = timeout -1 - if Ack_flag: - print("Error: Ack timeout") + if 0 == timeout: + print("Error: Timeout") return Ack_flag @@ -120,13 +127,14 @@ out_hash_md5 = hashlib.md5() Run_flag = True while Run_flag: - if wait_for_ack(): # We use Ack here + if wait_for_ack(): # We use Ack here + client.publish(mypublish, "0") # Abort any failed upload Run_flag = False else: chunk = fo.read(file_chunk_size) if chunk: - out_hash_md5.update(chunk) # Update hash + out_hash_md5.update(chunk) # Update hash if use_base64: base64_encoded_data = base64.b64encode(chunk) base64_data = base64_encoded_data.decode('utf-8') diff --git a/tools/mqtt-file/upload-settings.py b/tools/mqtt-file/upload-settings.py index e5cb717cc..a128b8924 100644 --- a/tools/mqtt-file/upload-settings.py +++ b/tools/mqtt-file/upload-settings.py @@ -44,24 +44,27 @@ broker_port = 1883 # MQTT broker port mypassword = "" # Tasmota MQTT password mytopic = "demo" # Tasmota MQTT topic -myfile = "Config_demo_9.4.0.3.dmp" # Tasmota Settings file name +myfile = "Config_demo_9.4.0.4.dmp" # Tasmota Settings file name myfiletype = 2 # Tasmota Settings file type # **** End of User Configuration Section +use_base64 = True + # Derive fulltopic from broker LWT message mypublish = "cmnd/"+mytopic+"/fileupload" mysubscribe = "stat/"+mytopic+"/FILEUPLOAD" # Case sensitive Ack_flag = False +Err_flag = False -use_base64 = True file_id = 116 # Even id between 2 and 254 file_chunk_size = 700 # Default Tasmota MQTT max message size # The callback for when mysubscribe message is received def on_message(client, userdata, msg): global Ack_flag + global Err_flag global file_chunk_size rcv_code = "" @@ -70,31 +73,35 @@ def on_message(client, userdata, msg): # print("Received message =",str(msg.payload.decode("utf-8"))) root = json.loads(msg.payload.decode("utf-8")) - if "FileUpload" in root: rcv_code = root["FileUpload"] - if "Error" in rcv_code: - print("Error: "+rcv_code) - return - - if "Command" in root: rcv_code = root["Command"] - if rcv_code == "Error": - print("Error: Command error") - return - - if "Id" in root: rcv_id = root["Id"] - if rcv_id == file_id: - if "MaxSize" in root: file_chunk_size = root["MaxSize"] + if "FileUpload" in root: + rcv_code = root["FileUpload"] + if "Started" in rcv_code: + return + if "Error" in rcv_code: + print("Error: "+rcv_code) + Err_flag = True + return + if "Command" in root: + rcv_code = root["Command"] + if rcv_code == "Error": + print("Error: Command error") + Err_flag = True + return + if "Id" in root: + rcv_id = root["Id"] + if rcv_id == file_id: + if "MaxSize" in root: file_chunk_size = root["MaxSize"] Ack_flag = False def wait_for_ack(): - global Ack_flag timeout = 100 - while Ack_flag and timeout > 0: + while Ack_flag and Err_flag == False and timeout > 0: time.sleep(0.01) timeout = timeout -1 - if Ack_flag: - print("Error: Ack timeout") + if 0 == timeout: + print("Error: Timeout") return Ack_flag @@ -119,7 +126,8 @@ out_hash_md5 = hashlib.md5() Run_flag = True while Run_flag: - if wait_for_ack(): # We use Ack here + if wait_for_ack(): # We use Ack here + client.publish(mypublish, "0") # Abort any failed upload Run_flag = False else: