MQTT publish and yield in one thread, which result in subscribe message blocked by publishing, how to improve efficiency? (CA-185)

Open zhouyu2021 opened this issue 3 years ago • 18 comments

In the shadow APIs, the MQTT publish timeout is set to 20 seconds by default, and messages on subscribed topics are not handled during this period. I reduced the timeout to 1 second, which seems to work when the network is poor, but I wonder whether this will cause any other problems.
I think it would be better to move the publish and yield operations into different threads, but that fails with the error code "MQTT_CLIENT_NOT_IDLE_ERROR", which means I can't call publish and yield at the same time. What can I do to improve communication efficiency?

zhouyu2021 avatar Jan 28 '22 06:01 zhouyu2021

When you write the word "yield", I suspect you are using an older library?

I use MQTT_ProcessLoop(pMqttContext, timeoutMS) for my message handling and it works very well. When you are performing a Subscription, Unsubscription, or a Publish you can't "yield" at exactly the same time inside the same thread. You perform your action then wait in the MQTT_ProcessLoop for a response to occur. When you're not actively doing something, you wait in the MQTT_ProcessLoop for something to occur (because the MQTT_PACKET_TYPE_PINGRESP messages can't arrive unless you are in that ProcessLoop). So, you will move from action to ProcessLoop to catch all your direct responses AND to catch all the ping responses that the system is generating automatically for you. When you're not actively doing something, you just stay in the ProcessLoop in your one thread.

I somewhat suspect that you may have more difficulty using 2 threads and correctly synchronizing message handling between them.

I have successfully merged all the test demos into one, and it has a single MQTT_ProcessLoop at the core of the idle routine, just waiting for things to happen.

I run my software from all the way around the world from the region that I'm registered in, and my results are pretty solid.

I hope this helps you in making progress.

SolidStateLEDLighting avatar Jan 30 '22 18:01 SolidStateLEDLighting

Thanks for your reply. I tried the new library with the MQTT_ProcessLoop function as you suggested, and tested the SDK with the following code (using the API from shadow_demo_helpers.c): [image: https://user-images.githubusercontent.com/78245567/153322842-a54e37a6-f35e-453f-8a3e-03ce54897722.png] It keeps waiting for messages to arrive, and I publish new messages from the AWS console to check the receive function. The result is unstable: 1. connecting to the broker takes many retries; 2. messages arrive with random delays, sometimes half a minute or more. This may be related to the network environment, but I tried the same SDK on a Windows simulator and it is much more stable on the same network, so I think it may have something to do with the hardware configuration (my platform is the ESP32-C3). Our application needs rapid responses between the app and the device, so we cannot tolerate such delays. Do you have any suggestions?

zhouyu2021 avatar Feb 10 '22 02:02 zhouyu2021

My suggestion???

You haven't given much effort in looking at the examples because your MQTT_ProcessLoop isn't written to include any of the error handling that is shown in the demo samples. And you have no way to keep returning to MQTT_ProcessLoop.

You'll need to implement a master run loop that accommodates a state transition architecture where the task enters and exits the MQTT_ProcessLoop area at will when IOT needs to do other things. You're very far from the solution.

The idea is like this:

  1. Send a message
  2. Enter MQTT_ProcessLoop
  3. Handle the response
  4. Send a message
  5. MQTT_ProcessLoop
  6. Handle the response
  7. Nothing to do
  8. MQTT_ProcessLoop
  9. Handle an unexpected incoming message
  10. MQTT_ProcessLoop (most of the time, because it accepts the keep-alive ping responses, which are handled in your callback)

If you don't stay in the MQTT_ProcessLoop, the callback will never get called.

You only temporarily leave the MQTT_ProcessLoop when you need to invoke another activity or handle an incoming message.

If you are not periodically, continuously calling MQTT_ProcessLoop -- then you can not be receiving anything.
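A minimal sketch of such a run loop, assuming a single task that owns the MQTT context. `send_stub` and `process_loop_stub` are hypothetical stand-ins for the real publish/subscribe calls and for `MQTT_ProcessLoop`, and the state names are invented for illustration, so the control flow can be followed without the SDK:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states for a single task that owns the MQTT context. */
typedef enum { STATE_IDLE, STATE_SEND, STATE_WAIT_RESPONSE } RunState;

typedef struct {
    RunState state;
    int pendingSends;      /* messages the application still wants to send */
    int sent;              /* messages handed to the stubbed send call */
    int processLoopCalls;  /* passes through the stubbed process loop */
} RunLoop;

/* Stand-in for a publish/subscribe call; always succeeds in this sketch. */
static bool send_stub(RunLoop *rl)
{
    rl->sent++;
    return true;
}

/* Stand-in for MQTT_ProcessLoop; the real call receives packets and
 * invokes the event callback. */
static void process_loop_stub(RunLoop *rl)
{
    rl->processLoopCalls++;
}

/* One iteration of the master loop. Every path ends in the process loop. */
void run_loop_step(RunLoop *rl)
{
    assert(rl != NULL);

    if (rl->state == STATE_IDLE && rl->pendingSends > 0) {
        rl->state = STATE_SEND;
    }

    if (rl->state == STATE_SEND) {
        /* Leave the process loop only long enough to start the action. */
        if (send_stub(rl)) {
            rl->pendingSends--;
        }
        rl->state = STATE_WAIT_RESPONSE;
    }

    process_loop_stub(rl);

    if (rl->state == STATE_WAIT_RESPONSE) {
        /* Simplification: the response is assumed to arrive in one pass. */
        rl->state = STATE_IDLE;
    }
}

/* Runs a fixed number of iterations and returns the final counters. */
RunLoop run_loop_demo(int pendingSends, int iterations)
{
    RunLoop rl = { STATE_IDLE, pendingSends, 0, 0 };
    for (int i = 0; i < iterations; i++) {
        run_loop_step(&rl);
    }
    return rl;
}
```

With two pending messages and five iterations, the stubbed process loop runs five times and both messages are sent; the idle iterations still service the connection.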


SolidStateLEDLighting avatar Feb 10 '22 04:02 SolidStateLEDLighting

Thank you for your reply. I have actually read your demo samples carefully. For now I am only trying to test the receive function, so the code in the picture I sent does not include any sending procedure. My question is this: even when only receiving messages (without doing anything else), I still cannot receive messages reliably on the ESP32-C3 (almost 90% of the messages I send from the AWS console are lost). But it seems to work on the Windows simulator, where all the messages are received.

zhouyu2021 avatar Feb 10 '22 06:02 zhouyu2021

Are you working on this on the original Linux platform?

I'm doing everything in Windows 10. It seems like they finally got all the Windows tools working, and the environment has all that I need.

This wasn't the case as recently as about 6 months ago. I was using PIO on top of Windows for the better part of a year, but when all the IDF tools finally came online, they fought PIO for control of the environment. I abandoned PIO and installed just the Espressif tools for Windows, and I couldn't be happier.

On a global basis, it is likely that there are more Windows users now that this product has become mainstream. The story goes that they needed the Linux people at the start (or the tools were Linux based), but now I suspect that Windows tools will be the more popular build platform going forward.

K.


SolidStateLEDLighting avatar Feb 10 '22 09:02 SolidStateLEDLighting

Thank you for your reply, but I do not get your point. Let me clarify my question: the message-receiving code works well in my simulator (running on Windows) but does not work on my ESP32-C3 chip (almost 90% of the messages I send from the AWS console are lost). I now suspect that the new SDK may not support the ESP32-C3 chip.

For your information, I am building this on the Linux platform.

zhouyu2021 avatar Feb 10 '22 10:02 zhouyu2021

I thought you were talking about a Windows simulator possibly running on another OS.

What simulator are you talking about? A hardware simulator for the ESP32 running on a Windows machine???

I'm working with the ESP32-WROOM-32D. I have not seen the C or S products yet. You may be right.

This sample code you posted is very short of the full design needed because you must return and spend time in the MQTT_ProcessLoop waiting for each and every received message. I use only one MQTT_ProcessLoop in my entire app -- but you could call this in each area where you expect a message (as I believe is shown in the demos).

K.


SolidStateLEDLighting avatar Feb 10 '22 10:02 SolidStateLEDLighting

How to manage the delay of MQTT_ProcessLoop() function?

Branch: master commit id: [544e0d4]

In my implementation I call MQTT_ProcessLoop() continuously in a task to handle all incoming publish messages on the subscribed topics. The issue I am facing is that when I have to publish a message from the ESP32 I call MQTT_Publish(), but the message is only actually published when MQTT_ProcessLoop() returns after some delay. I want to reduce the delay of MQTT_ProcessLoop() so that the message is published as soon as I call MQTT_Publish().

devUsama avatar Apr 13 '22 08:04 devUsama

The process loop looks like this: mqttStatus = MQTT_ProcessLoop(pMqttContext, 100);

In my experiments, I have never been able to shorten the process time below a minimum wait period. You can lengthen it, but you cannot shorten it below the time it takes to process. I have 100 ms here, but I believe it takes longer than this during an idle period.

The only thing that I do is just not enter the ProcessLoop more than once a second unless the system immediately has something to do. If you launch a command, then call ProcessLoop immediately. You may want to manually lengthen the ProcessLoop wait time on those command calls.

In this way, you will be more likely to avoid unnecessary wait time.

My approach is to actively call ProcessLoop after a command -- and then call it periodically while idle.

If you happen to launch a command while not inside ProcessLoop, you'll get your desirable minimum delay. If you happen to try to launch a command while that thread is held up (checking for an incoming message) -- then you will get the unfavorable delay. This is my opinion. If you hear of anything better -- please let us know.
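The pacing rule described above (enter the process loop right after a command, otherwise at most once a second while idle) can be written as a small pure function. This is only a sketch: `should_enter_process_loop` and `IDLE_POLL_INTERVAL_MS` are hypothetical names, and the timestamps are assumed to come from any monotonic millisecond counter. Note that with this pacing, incoming messages are only noticed when the loop next runs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* "Not more than once a second" while idle; value chosen for illustration. */
#define IDLE_POLL_INTERVAL_MS 1000u

/* Returns true when the task should call the process loop now.
 * nowMs and lastCallMs are readings of the same millisecond counter. */
bool should_enter_process_loop(uint32_t nowMs, uint32_t lastCallMs,
                               bool commandJustSent)
{
    /* After a command, go straight to the process loop for the response. */
    if (commandJustSent) {
        return true;
    }

    /* Unsigned subtraction stays correct when the counter wraps around. */
    return (uint32_t)(nowMs - lastCallMs) >= IDLE_POLL_INTERVAL_MS;
}
```

The caller would record the current time as `lastCallMs` each time it actually enters the process loop.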

SolidStateLEDLighting avatar Apr 13 '22 08:04 SolidStateLEDLighting

Yeah, I am doing something along those lines: calling MQTT_ProcessLoop(pMqttContext, 100); after a command/message has to be published. But the point is that the message is only actually published when MQTT_ProcessLoop returns after its processing time (which is around 1.5 seconds despite passing a shorter timeout value). So it slows down the process, and the message is received on the other end only after that delay.

devUsama avatar Apr 14 '22 07:04 devUsama

Ok, then I think you're doing the best that you can. One fundamental premise of IoT is that a device may be disconnected or poorly connected, so I don't think MQTT is very keen on fast communication (my personal guess).

You may be able to reduce a bit of time if you connect with a less reliable quality of service. More reliable service probably takes more time to complete. This is just a guess.

K.


SolidStateLEDLighting avatar Apr 14 '22 07:04 SolidStateLEDLighting

Actually, MQTT_ProcessLoop(pMqttContext, 100); always takes the same time, whether we call it after publishing a message with QoS1 or call it anywhere else in a task, so I don't think QoS will affect this delay. In the previous master branch commit, which was on release/v3.1x, aws_iot_shadow_yield(&mqttClient, 200); did the job nicely: I tested it with a 200 ms delay and it worked perfectly.

devUsama avatar Apr 14 '22 07:04 devUsama

I am having problems receiving messages published from the AWS console. In my ESP32 code I have subscribed to /update/delta, but while the application is running it does not receive anything from /update/delta when I change the desired state from the AWS Console.

So this is what I am doing:

  1. Subscribe to /update/delta:
returnStatus = SubscribeToTopic( SHADOW_TOPIC_STR_UPDATE_DELTA( THING_NAME, SHADOW_NAME ),
                                                 SHADOW_TOPIC_LEN_UPDATE_DELTA( THING_NAME_LENGTH, SHADOW_NAME_LENGTH ) );
  2. Start thread process_messages:
void process_messages()
{
    MQTTStatus_t status;
    uint32_t timeoutMs = 100;
    // This context is assumed to be initialized and connected.
    MQTTContext_t * pMQTTContext = &mqttContext;
    while( true )
    {
        status = MQTT_ProcessLoop( pMQTTContext, timeoutMs );

        if( status != MQTTSuccess )
        {
            // Determine the error. It's possible we might need to disconnect
            // the underlying transport connection.
            LogError(("MQTT Process Loop Error"));
        }
        else
        {
            // Other application functions.
            LogInfo(("MQTT Process Loop Info"));
        }
        vTaskDelay(10 / portTICK_RATE_MS);

    }
}
  3. Publish the desired state message to the /update topic with this content:

{"state":{"desired":{"powerOn":0}}}

  4. Since the reported state is powerOn = 1, AWS IoT publishes a message to /update/delta:

{
  "version": 5006,
  "timestamp": 1661310883,
  "state": {
    "powerOn": 0
  },
  "metadata": {
    "powerOn": {
      "timestamp": 1661310883
    }
  }
}
  5. At this point I would expect my process_messages thread to receive the message published on /update/delta, but it does not. In the eventCallBack I have defined the following action for when the message type is ShadowMessageTypeUpdateDelta:
            if( messageType == ShadowMessageTypeUpdateDelta )
            {
                /* Handler function to process payload. */
                updateDeltaHandler( pDeserializedInfo->pPublishInfo );
            }
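For illustration, the dispatch step in the callback can be sketched without the SDK. The demo relies on the Shadow library to classify the incoming topic; the hypothetical `classify_shadow_topic` below does a plain suffix comparison instead, which is a simplification and not the library's matching logic. The thing name `myThing` used in the usage note is also made up. The topic length is passed explicitly on the assumption that the topic in an incoming publish is not NUL-terminated.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical classification of an incoming shadow topic. */
typedef enum {
    TOPIC_OTHER,
    TOPIC_UPDATE_DELTA,
    TOPIC_UPDATE_ACCEPTED,
    TOPIC_UPDATE_REJECTED
} TopicKind;

/* Returns nonzero when the first topicLen bytes of topic end with suffix. */
static int ends_with(const char *topic, size_t topicLen, const char *suffix)
{
    size_t suffixLen = strlen(suffix);

    return topicLen >= suffixLen &&
           memcmp(topic + topicLen - suffixLen, suffix, suffixLen) == 0;
}

/* Classifies a topic by its suffix only; the prefix is not validated. */
TopicKind classify_shadow_topic(const char *topic, size_t topicLen)
{
    assert(topic != NULL);

    if (ends_with(topic, topicLen, "/update/delta")) {
        return TOPIC_UPDATE_DELTA;
    }
    if (ends_with(topic, topicLen, "/update/accepted")) {
        return TOPIC_UPDATE_ACCEPTED;
    }
    if (ends_with(topic, topicLen, "/update/rejected")) {
        return TOPIC_UPDATE_REJECTED;
    }
    return TOPIC_OTHER;
}
```

Logging the result of a check like this for every incoming publish, for example on `$aws/things/myThing/shadow/update/delta`, is one way to tell whether the delta message never arrives or arrives and is then dropped by the callback.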

All of these functions were taken from the demo; I am just adding MQTT_ProcessLoop in a thread to listen for incoming messages published by the AWS console or by an application that changes the state of the shadow document.

The demo works well: it reads the message from /update/delta. But the logic of the demo is sequential, showing step by step what should normally happen, and it does not implement a real use case.

If you know how to solve this, please let me know. Thanks!

borch84 avatar Aug 24 '22 03:08 borch84

4 things come to mind....

  1. Make sure your receive buffer is large enough to handle the message.
  2. Look at CloudWatch to make sure the system is "publishing out" a message to you.
  3. You'll need to be careful when using the console testing area.... the subscription/publishing there should be to the update topic and the delta should be created by system automatically for you. Make sure you can publish to the update topic and see if the system is generating the delta message for you. You can subscribe to the delta topic right here and see the message being generated I believe.
  4. Make sure your ESP32 keeps passing through MQTT_ProcessLoop() while you are waiting for a delta message to show up.
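As a rough aid for the first point, the on-the-wire size of an incoming PUBLISH can be estimated from the MQTT 3.1.1 packet layout and compared with the receive buffer. This is a sketch with hypothetical function names; it assumes the library needs the whole incoming packet to fit in the buffer, and it ignores packets beyond the protocol's maximum remaining length.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Bytes used by the variable-length "remaining length" field
 * (MQTT 3.1.1, section 2.2.3). */
static size_t remaining_length_bytes(size_t remainingLength)
{
    if (remainingLength < 128u) {
        return 1u;
    }
    if (remainingLength < 16384u) {
        return 2u;
    }
    if (remainingLength < 2097152u) {
        return 3u;
    }
    return 4u;
}

/* Total size of a PUBLISH packet for the given topic and payload lengths. */
size_t publish_packet_size(size_t topicLen, size_t payloadLen, int qos)
{
    /* 2-byte topic length prefix + topic + payload. */
    size_t remaining = 2u + topicLen + payloadLen;

    assert(qos >= 0 && qos <= 2);
    if (qos > 0) {
        remaining += 2u; /* packet identifier is present for QoS 1 and 2 */
    }

    /* 1 byte of fixed header flags + the remaining length field itself. */
    return 1u + remaining_length_bytes(remaining) + remaining;
}

/* True when such a PUBLISH fits into a buffer of bufferSize bytes. */
bool publish_fits(size_t bufferSize, size_t topicLen, size_t payloadLen, int qos)
{
    return publish_packet_size(topicLen, payloadLen, qos) <= bufferSize;
}
```

For example, a 39-byte topic with a 100-byte payload at QoS 0 needs 144 bytes, so it would not fit in a 128-byte buffer.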

I suspect a lot of people are stumbling around with AWS because the library examples are all torn apart into single examples. In my mind AWS IOT is a holistic operation where you have one grand process loop and are just handling pub/sub messages based on device state. That approach has been successful for me.

You must start with a simple client log-in first -- and then build on that. ---> Client --> Provisioning to Thing --> Shadow --> Jobs --> OTA. Add one function at a time based on pub/sub of messages.

Would you be interested in hiring me to pull this all together for you? Are you working on a commercial product that would warrant the expense?

Keith


SolidStateLEDLighting avatar Aug 24 '22 03:08 SolidStateLEDLighting

Hi @borch84, I will try to replicate the issue that you are describing. Meanwhile, we offer a production-ready starting point that you can refer to, here. It uses this esp-aws-iot port of the LTS libraries but offers a single, extensible example that combines various functionalities. Since it uses coreMQTT-Agent, the MQTT_ProcessLoop issue that you described is solved by moving MQTTAgent_CommandLoop to its own task.
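The idea behind that design can be sketched in plain C: other tasks never touch the MQTT context directly, they only enqueue commands, and one task drains the queue and then services the connection. Everything below (`Agent`, `agent_enqueue`, `agent_step`, `agent_demo`) is hypothetical and single-threaded for clarity. It is not the coreMQTT-Agent API, and a real implementation would need a thread-safe queue, for instance a FreeRTOS queue.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAPACITY 4u

/* A queued publish request; the strings are borrowed, not copied. */
typedef struct {
    const char *topic;
    const char *payload;
} PublishCommand;

typedef struct {
    PublishCommand items[QUEUE_CAPACITY];
    size_t head;              /* index of the oldest queued command */
    size_t count;             /* number of queued commands */
    size_t published;         /* commands handed to the stubbed publish */
    size_t processLoopPasses; /* passes through the stubbed process loop */
} Agent;

/* Called by any producer. Returns false when the queue is full. */
bool agent_enqueue(Agent *agent, const char *topic, const char *payload)
{
    size_t tail;

    assert(agent != NULL);
    if (agent->count == QUEUE_CAPACITY) {
        return false; /* caller may retry later */
    }

    tail = (agent->head + agent->count) % QUEUE_CAPACITY;
    agent->items[tail].topic = topic;
    agent->items[tail].payload = payload;
    agent->count++;
    return true;
}

/* One pass of the owning task: drain commands, then service the connection. */
void agent_step(Agent *agent)
{
    assert(agent != NULL);

    while (agent->count > 0u) {
        /* A real agent would call the MQTT publish API here. */
        agent->head = (agent->head + 1u) % QUEUE_CAPACITY;
        agent->count--;
        agent->published++;
    }

    /* A real agent would call the process loop here. */
    agent->processLoopPasses++;
}

/* Enqueues the given number of commands, runs one pass, and returns how
 * many were published. Commands beyond the queue capacity are rejected. */
size_t agent_demo(int commands)
{
    Agent agent = { 0 };

    for (int i = 0; i < commands; i++) {
        (void)agent_enqueue(&agent, "demo/topic", "hello");
    }
    agent_step(&agent);
    return agent.published;
}
```

Because only one task ever calls into the MQTT library, publishing and receiving cannot collide, which is the situation the original question ran into when calling publish and yield from two threads.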

Hope this helps, Dhaval

dhavalgujar avatar Aug 24 '22 09:08 dhavalgujar

Thanks for the info, @dhavalgujar. Do you think the ESP32 production example works with the DevKitC? I am not using an ESP32-C3, and my board has only 4MB of flash. So instead of using MQTT_ProcessLoop, the reference example uses MQTTAgent_CommandLoop to listen for incoming messages; maybe I can try to implement MQTTAgent_CommandLoop in my thread instead.

Another question: does the reference example use the thing shadow? I saw that it just subscribes to a topic, but not to the shadow topics used by the shadow document update process.

borch84 avatar Aug 24 '22 13:08 borch84

@borch84, The reference example does not have anything specific to the ESP32-C3 (apart from the temperature sensor reading API, which can be easily changed), so it should work for your ESP32 DevKitC with 4MB of flash with little modification.

The reference example does not include a demo of Device Shadow, but you can add a task that uses the coreMQTT-Agent for Device Shadow, in a similar manner to the PubSub and OTA tasks. Please refer to the shadow_device_task.c file here for an example of how this can be accomplished.

dhavalgujar avatar Aug 24 '22 13:08 dhavalgujar

Thanks for the information, @dhavalgujar. I will try to implement the reference example and incorporate the device shadow task. Best regards

borch84 avatar Aug 26 '22 15:08 borch84