ESP8266MQTTMesh

onMqttMessage() not handling fragmented packets, which can break OTA

Open c- opened this issue 6 years ago • 2 comments

This is basically the cause of #51, and I added a similar comment there, but I figured I should break it out as a separate issue.

The onMqttMessage callback doesn't handle the index/total buffering when MQTT messages get fragmented as described in https://github.com/marvinroger/async-mqtt-client/issues/98

MQTT fragmentation depends entirely on the MSS of the TCP/IP library, which varies with the lwIP variant selected. Since onMqttMessage() doesn't recognize fragmentation, a packet larger than the MSS gets treated as multiple distinct messages.
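To make the failure mode concrete, here is a minimal Python model of it (not library code). It assumes, as a simplification, that each callback invocation delivers at most one MSS worth of payload and it ignores TCP and MQTT header overhead. `deliver` and `naive_handler` are hypothetical names used only for this illustration.

```python
def deliver(payload: bytes, mss: int):
    """Yield (chunk, index, total) the way a fragmented message reaches the callback.

    Simplifying assumption: every fragment carries at most `mss` payload bytes.
    """
    total = len(payload)
    for index in range(0, total, mss):
        yield payload[index:index + mss], index, total


def naive_handler(messages: list, chunk: bytes, index: int, total: int) -> None:
    """Model of the unpatched callback: ignores index/total, so each call looks like a full message."""
    messages.append(chunk)


# 1024 bytes: the size of a 768-byte OTA chunk once send_ota.py base64-encodes it
payload = bytes(1024)

for mss in (1460, 536):
    seen = []
    for chunk, index, total in deliver(payload, mss):
        naive_handler(seen, chunk, index, total)
    print(f"mss={mss}: handler saw {len(seen)} message(s) of sizes {[len(m) for m in seen]}")
```

With the larger MSS the handler sees one complete message. With the smaller one it sees two partial payloads, neither of which matches the expected checksum.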

If you use the v2 Higher Bandwidth lwIP variant (MSS 1460), it works fine: OTA packets never get fragmented and everything is happy.

If you use the v2 Lower Memory lwIP variant, the TCP/IP buffers are so small (MSS 536) that the OTA chunks fragment, checksum errors get thrown, and hilarity ensues.

You can make onMqttMessage() handle fragmentation quite easily:

diff --git a/src/ESP8266MQTTMesh.cpp b/src/ESP8266MQTTMesh.cpp
index be659f4..a12ece8 100644
--- a/src/ESP8266MQTTMesh.cpp
+++ b/src/ESP8266MQTTMesh.cpp
@@ -1008,11 +1008,20 @@ void ESP8266MQTTMesh::onMqttUnsubscribe(uint16_t packetId) {
 }
 
 void ESP8266MQTTMesh::onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
-  memcpy(inbuffer[0], payload, len);
-  inbuffer[0][len]= 0;
-  dbgPrintln(EMMDBG_MQTT_EXTRA, "Message arrived [" + String(topic) + "] '" + String(inbuffer[0]) + "'");
-  broadcast_message(topic, inbuffer[0]);
-  parse_message(topic, inbuffer[0]);
+  if( total >= MQTT_MAX_PACKET_SIZE ) {
+    dbgPrintln(EMMDBG_WIFI, "Ignoring oversize " + String(total) + " byte packet");
+	 return;
+  }
+  memcpy(&inbuffer[0][index], payload, len);
+  inbuffer[0][index+len]= 0;
+  if( index+len == total ) {
+	  dbgPrintln(EMMDBG_MQTT_EXTRA, "Message arrived [" + String(topic) + "] '" + String(inbuffer[0]) + "'");
+	  broadcast_message(topic, inbuffer[0]);
+	  parse_message(topic, inbuffer[0]);
+  } else {
+	  dbgPrintln(EMMDBG_MQTT_EXTRA, "Partial message [" + String(index+len)
+	  + " of " + String(total) + "]");
+  }
 }
 
 void ESP8266MQTTMesh::onMqttPublish(uint16_t packetId) {

I tested it, and it works okay. However, I don't know whether it will work in a true mesh situation, because broadcast_message() then needs to get these "oversize" messages out, and I haven't the foggiest what broadcast_message() does with oversize MQTT packets. I'm not really set up to test that part.
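The buffering logic of the patch can be modelled outside the firmware. The following is a Python sketch of the same idea, not the library's code: `Reassembler` and `on_fragment` are hypothetical names, and the value 1152 for `MQTT_MAX_PACKET_SIZE` is an assumption (the header only requires it to be at least 1152).

```python
MQTT_MAX_PACKET_SIZE = 1152  # assumed value; the header only requires >= 1152


class Reassembler:
    """Model of the patched onMqttMessage(): buffer fragments until the message is complete."""

    def __init__(self, max_size: int = MQTT_MAX_PACKET_SIZE):
        self.max_size = max_size
        self.buffer = bytearray(max_size)

    def on_fragment(self, chunk: bytes, index: int, total: int):
        """Return the full message once the last fragment has arrived, else None."""
        if total >= self.max_size:
            return None  # oversize message: ignored, as in the patch
        # Counterpart of memcpy(&inbuffer[0][index], payload, len)
        self.buffer[index:index + len(chunk)] = chunk
        if index + len(chunk) == total:
            return bytes(self.buffer[:total])
        return None  # partial message, keep waiting


r = Reassembler()
message = bytes(range(256)) * 4  # 1024 bytes of recognisable data
results = [
    r.on_fragment(message[i:i + 536], i, len(message))
    for i in range(0, len(message), 536)
]
print("first fragment completes message:", results[0] is not None)
print("last fragment returns original:", results[-1] == message)
```

Like the patch, this model keeps a single buffer, so it assumes the fragments of one message arrive in order and are not interleaved with fragments of another message.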

A quick and dirty fix is to send smaller OTA chunks. This works with either lwIP variant, at the expense of three times as many packets.

diff --git a/utils/send_ota.py b/utils/send_ota.py
index 7d67011..4ff4923 100755
--- a/utils/send_ota.py
+++ b/utils/send_ota.py
@@ -90,9 +90,9 @@ def send_firmware(client, data, nodes):
     print("Updating firmware on the following nodes:\n\t{}".format("\n\t".join(nodes)))
     pos = 0
     while len(data):
-        d = data[0:768]
+        d = data[0:256]
         b64d = base64.b64encode(d)
-        data = data[768:]
+        data = data[256:]
         topic = "{}{}".format(send_topic, str(pos))
         client.publish(topic, b64d)
         expected_md5 = hashlib.md5(d).hexdigest().encode('utf-8')
@@ -120,7 +120,7 @@ def send_firmware(client, data, nodes):
                 print("\t {} (expected: {})".format(md5, expected_md5))
                 return
         pos += len(d)
-        if pos % (768 * 13) == 0:
+        if pos % (256 * 13) == 0:
             print("Transmitted %d bytes" % (pos))
     print("Completed send")
     client.publish("{}check".format(send_topic), "")
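The chunk sizes matter because send_ota.py base64-encodes each chunk before publishing it, which inflates the payload by a third. The short check below compares the encoded sizes against the two MSS values mentioned above. It is a rough sketch: `b64_len` is a hypothetical helper, and the comparison ignores the MQTT header and topic, which add further bytes to every packet.

```python
import base64

MSS_LOWER_MEMORY = 536       # "v2 Lower Memory" lwIP variant
MSS_HIGHER_BANDWIDTH = 1460  # "v2 Higher Bandwidth" lwIP variant


def b64_len(raw_len: int) -> int:
    """Length in bytes of the base64 encoding of `raw_len` raw bytes."""
    return len(base64.b64encode(bytes(raw_len)))


for chunk_len in (768, 256):
    encoded = b64_len(chunk_len)
    print(
        f"{chunk_len} raw bytes -> {encoded} base64 bytes; "
        f"fits in MSS {MSS_LOWER_MEMORY}: {encoded <= MSS_LOWER_MEMORY}, "
        f"fits in MSS {MSS_HIGHER_BANDWIDTH}: {encoded <= MSS_HIGHER_BANDWIDTH}"
    )
```

A 768-byte chunk becomes 1024 bytes, which exceeds an MSS of 536, while a 256-byte chunk becomes 344 bytes and leaves room for the packet overhead.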

Another approach is to just "document it" with a compile-time check:

diff --git a/src/ESP8266MQTTMesh.h b/src/ESP8266MQTTMesh.h
index 58fb6bc..2e7ab3d 100644
--- a/src/ESP8266MQTTMesh.h
+++ b/src/ESP8266MQTTMesh.h
@@ -10,6 +10,10 @@
         #error "Must define MQTT_MAX_PACKET_SIZE >= 1152"
     #endif
     #define HAS_OTA 1
+
+        #if TCP_MSS < MQTT_MAX_PACKET_SIZE
+               #error "TCP MSS must be >= MQTT_MAX_PACKET_SIZE. Use the Higher Bandwidth lwIP"
+        #endif
 #else
     #define HAS_OTA 0
 #endif

I don't have a strong preference one way or the other; now that I understand it, I can get OTA working.

c-, Jun 13 '18 00:06

It works for me, thanks friend, you are the best!!!!

Transmitted 359424 bytes
Transmitted 369408 bytes
Completed send
Checksum verified. Flashing and rebooting now...

bfruiz04, May 18 '19 18:05

Just a little tip: I have an implementation of send_ota with a parameter called "packageLength" that specifies how large the OTA chunks should be (it defaults to 768). If you lower this value, the update takes a bit longer, but it also works with all configurations of this library. In the next few days I'm also planning to update this project on my fork quite a bit (already done) and write documentation for it. My fork of the project is: ESP8266MQTTMesh
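For illustration only, a configurable chunk length of this kind could look like the sketch below. It is not taken from the fork: `ota_chunks` and `package_length` are hypothetical names, and the chunk layout (offset, base64 payload, MD5 of the raw bytes) simply mirrors what send_ota.py does in the diff above.

```python
import base64
import hashlib


def ota_chunks(data: bytes, package_length: int = 768):
    """Yield (offset, base64 payload, md5 hex digest) for each chunk of `data`."""
    if package_length <= 0:
        raise ValueError("package_length must be positive")
    for pos in range(0, len(data), package_length):
        d = data[pos:pos + package_length]
        yield pos, base64.b64encode(d), hashlib.md5(d).hexdigest()


firmware = bytes(2000)  # stand-in for a real firmware image
for package_length in (768, 256):
    count = sum(1 for _ in ota_chunks(firmware, package_length))
    print(f"package_length={package_length}: {count} chunks")
```

Lowering the chunk length increases the number of publishes in proportion, which is why the update takes longer.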

simone1999, Jul 25 '19 13:07