rumqtt icon indicating copy to clipboard operation
rumqtt copied to clipboard

RFC(rumqttc): `publish` / `subscribe` / `unsubscribe` methods return a promise that resolves into pkid when packet is handled by `Eventloop`

Open de-sh opened this issue 1 year ago • 29 comments

This is the RFC for API changes in publish / subscribe / unsubscribe in rumqttc to resolve #349. Feel free to comment and provide feedback, thanks!

Example in the case of Publish

// async
let promise = async_client.publish(..).await?;
let pkid = promise.await;

// blocking
let promise = client.publish(..)?;
let pkid = promise.blocking_recv();

Within rumqttc this will be implemented similar to how oneshot channels work, i.e. here we will respond onto the oneshot channel with the pkid value. Also we need not go so far when we know that the publish is on QoS 0 or when pkid is non-zero by default maybe?

Please note that this is not a breaking change as users can just ignore/drop the promise token.

de-sh avatar Feb 22 '24 12:02 de-sh

so something like, publish will return Pkid where:

struct Pkid {
  id_rx: Receiver<u32>,
}

impl Future for Pkid {
  type Output = u32;
  fn poll(..) -> .. {
    // if we recv something on -d_rx, return id
    // we will send on that channel for state.
  }
}

right?

so I am curious how are we going to send the Sender to state so we can use it for sending the pkid?

when pkid is non-zero by default maybe?

when is it non-zero by default?

as users can just ignore/drop the promise token.

that would mean the receiver is dropped as well. this would cause the Sender to fail!

swanandx avatar Feb 22 '24 13:02 swanandx

impl Future for Pkid {
 type Output = u32;
 fn poll(..) -> .. {
   // if we recv something on -d_rx, return id
   // we will send on that channel for state.
 }
}

We can just use oneshot as is, no need of reimplementing

so I am curious how are we going to send the Sender to state so we can use it for sending the pkid?

Include it in the request?

when is it non-zero by default?

When the publish is edited before being sent, or in the case of uplink when it is read back from file, user can choose to do that, Eventloop will skip the pkid set stage in outgoing_*

that would mean the receiver is dropped as well. this would cause the Sender to fail!

Not in the case of oneshot, it might error out, but we need not care about this either ways from within Eventloop

de-sh avatar Feb 22 '24 13:02 de-sh

Include it in the request?

gotcha. so unlike https://github.com/bytebeamio/rumqtt/compare/main...pkid-poc , by using the channels we are avoiding direct use of Arc<Mutex<_>> !

Not in the case of oneshot, it might error out,

that is what i meant by fail :sweat_smile:

but we need not care about this either ways from within Eventloop

okie.

swanandx avatar Feb 22 '24 13:02 swanandx

Hi, I am one of the possible users of this functionality from #349 :) Thanks for bringing this up again

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario?

For a single message, it makes sense to wait for the return of the pkid and then continue. In case that I have a high channel capacity and an limited inflight queue, then the task using the client would block/wait at high load peaks, where the event loop would need some time to process the message. Sure, I could spawn a task that listens to the one-shot channel and after receiving the pkid it passes it on to an mpsc that gathers all the pkids.

Also the processing of the promise is a bit time critical because if I do it to late, the one-shot channel would still have a pkid value, whose PubAck was already processed by the eventloop.

Sorry that I don't have any constructive feedback, just dropping some thoughts from the user perspective

dlips avatar Feb 22 '24 14:02 dlips

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario?

  1. We could do a lot of things here when considering async code, but not much when it comes to sync code.
  2. One could use the joinset.join_next() to stream pkids

de-sh avatar Feb 22 '24 14:02 de-sh

joinset.join_next() is exactly what I was looking for. Missed it when looking through the tokio docs. That would solve my problem. Thank you :)

dlips avatar Feb 22 '24 14:02 dlips

Should this feature be feature gated to ensure we don't inadvertently make life worse for other users? I am not entirely sure how memory/cpu usage would increase, maybe a good idea to figure out with some profiling @swanandx

de-sh avatar Feb 22 '24 16:02 de-sh

@de-sh Do we have a timeline for this inclusion? I need to be able to get pkid in order to wait for puback, in order to use this library.

cartertinney avatar Mar 27 '24 16:03 cartertinney

@de-sh Do we have a timeline for this inclusion? I need to be able to get pkid in order to wait for puback, in order to use this library.

We need a code review on the PR mentioned above to continue with this

de-sh avatar Mar 28 '24 08:03 de-sh

Adding more thoughts on this:

Maybe it is a good idea to have this same mechanism return the error responses relating to the publish packet so that the client side can keep track of the errors as well?

enum Error {
  Mqtt(mqttbytes::Error),
  PacketIdZero,
  EmptySubscription,
  OutgoingPacketTooLarge { .. },
}

type RequestResult = Result<Pkid, Error>;

de-sh avatar Mar 28 '24 08:03 de-sh

@de-sh

Another question regarding the proposal - is there a guarantee that the pkid will be returned to the user before the associated PubAck can arrive? I've used other MQTT libraries before where by the time the id is returned, the puback may have already arrived, which makes matching the ids difficult, since you have to handle the case where you may receive a puback for an id that you are not yet tracking in your application.

Although, admittedly, even if we can guarantee that the pkid will be returned prior to the actual publish packet being sent, there is still a race condition for the asynchronous application (can the application track the id returned by the PkidPromise before the puback gets received)

One solution I've seen before that sidesteps this is to have publish return a future for the eventual ack, but that probably clashes with the polling receive pattern on the event loop in rumqttc. The intended design appears to be that since the user is controlling the event loop, they're responsible for distributing information received on it, which does makes sense, but unfortunately does leave puback matching to be a rather unpleasant experience.

cartertinney avatar Mar 28 '24 16:03 cartertinney

(can the application track the id returned by the PkidPromise before the puback gets received)

While there might be a theoretical possibility of the puback being received faster than the pkid being resolved on the client side, it is practically impossible(afaik). Thus it should be fine to consider this circumstance as an extreme case?

I need to be able to get pkid in order to wait for puback, in order to use this library.

Maybe what we should do is setup a future that resolves on EventLoop receiving the associated puback(qos1) and pubcomp(qos2) when a publish is sent? can we define the usecase with more detail?

de-sh avatar Mar 31 '24 08:03 de-sh

Maybe what we should do is setup a future that resolves on EventLoop receiving the associated puback(qos1) and pubcomp(qos2) when a publish is sent? can we define the usecase with more detail?

The issue I'm trying to solve here is an application with distributed functionality - the main event loop and connection is handled/managed in a different place than the actual MQTT operations take place. For a simplified, but illustrative example, consider that publish logic is happening in a different thread/task. The publishing thread/task naturally wants to wait upon a PUBACK, but the PUBACK is actually received by the event loop in the main thread, thus there needs to be some kind concurrency solution to communicate this information.

This is further confounded if there are multiple publishers in multiple threads. The central event loop managing logic has no idea which publisher to deliver PUBACK notifications to, because the PKID of any given outgoing publish is only known to the publishing threads - the event loop can see it happen with an outgoing event, but it doesn't know which copy of the Client the publish came from. Thus, when the PUBACK comes in with a matching PKID, the main thread does not know which copy of the Client to send the PUBACK notification to. The only way around this that I've been able to think of, is that the publishing thread/task needs to communicate to the central thread which PKID it sent, so the central thread can send back the corresponding PUBACK notification accordingly. My concern is that this process may allow in some cases a PUBACK to be able to arrive before all the relevant information to handle it has been communicated and tracked.

In other libraries, I've seen this issue resolved with a publish returning a Future that can be awaited upon the eventual PUBACK, but I can appreciate that that might be somewhat against your design goals here.

cartertinney avatar Apr 01 '24 15:04 cartertinney

Maybe what we should do is setup a future that resolves on EventLoop receiving the associated puback(qos1) and pubcomp(qos2) when a publish is sent? can we define the usecase with more detail?

My motivation to get the PkIds was also to track which messages I sent got a PubAck from the MQTT Broker. So having the Future resolve after the PubAck was received instead of returning the PkId would simplify my use case because I do not have to manually write the code that correlates the returned PkIds with the PubAck events from the EventLoop.

dlips avatar Apr 02 '24 11:04 dlips

For what it's worth, the current implementation on the pkid branch seems to stop the event loop reporting PubAck right now. I don't receive PubAck at all on this branch, but I do receive it when using the main release. So yeah, I can get the pkid now, but I still can't match it with a PubAck.

After experimenting with the pkid branch in general, honestly, I'm starting to feel that .publish() really does need to return a PubAck future - trying to build the mechanism to get and deliver it on top just doesn't scale well to a complex application.

cartertinney avatar Apr 02 '24 16:04 cartertinney

Made some changes to the design, but basically the same concept, please review the POC presented in the following commit(with example), comments on this are welcome!

https://github.com/bytebeamio/rumqtt/commit/b6a45b7713eebf8207abbff5a7588317fd27f7ab

de-sh avatar Apr 09 '24 08:04 de-sh

Made some changes to the design, but basically the same concept, please review the POC presented in the following commit(with example), comments on this are welcome!

b6a45b7

I put some comments in the POC, please check. I like the implement of get notified on acknowledgements instead of doing matching additional pkids.

Should you need any assistance, don't hesitate to reach out. I'm eagerly anticipating the integration of this feature into the main branch.

xiaocq2001 avatar Apr 17 '24 03:04 xiaocq2001

I did some changes based on your POC and did some validation locally, I share the changes in PR here: https://github.com/bytebeamio/rumqtt/pull/851, feel free to use them to in improvement feature or discuss.

xiaocq2001 avatar Apr 26 '24 06:04 xiaocq2001

Given the successful POC and code reviews, I believe the ACK await feature is great. What are the next steps to ensure a smooth integration?

xiaocq2001 avatar May 07 '24 01:05 xiaocq2001

We are working on making time for the review(running low on bandwidth), post that we should be good to go. Would have been wonderful if there were more inputs along those lines from the community!

de-sh avatar May 08 '24 14:05 de-sh

According to the recent changes in https://github.com/bytebeamio/rumqtt/pull/869, the notice struct needs update.

FixedBitSet helps to reduce memory usage comparing to vec, but is not suitable for the ack waiting, since ack notification needs packet linked internal struct instead of single bit. For ack waiting, the current implement of HashMap seems to be the better fit (to keep order of packets, LinkedHashMap).

Is there any suggestion on balance the needs?

xiaocq2001 avatar May 23 '24 04:05 xiaocq2001

Another thing, while waiting ACKs, it's possible the connection is reconnected with broker CONNACK session present=0, in such case, the pending waitings should be terminated with some error (state error that session closed by broker).

xiaocq2001 avatar May 23 '24 06:05 xiaocq2001

What do you think about making the NoticeTx public? This will make possible the construction of the NoticeFuture for test and mocking via mockall. This includes also the success and error methods.

joshuachp avatar Jun 12 '24 09:06 joshuachp

I've rebased the acked branch onto main in #883.

Currently this conflicts with the FixedBitset changes as mentioned by @xiaocq2001 which means I had to revert some of it.

My idea on how to fix this such that only people who use this feature have to pay a performance penalty is like this:

Instead of always returning a NoticeFuture, instead allow anyone to create a NoticeTx and NoticeFuture pair and change the API so that instead of returning NoticeFuture they take a NoticeTx as parameter. Then also in the Mqtt state, keed the FixedBitSet plus additionaly a HashMap<u16, NoticeTx> but only populate that second one if a notification for the given message was actually requested (by passing a NoticeTx). This means that you don't have to pay any penalty when not using that feature (except for the occasional lookup in an empty HashMap, which should be fast).

FSMaxB avatar Jun 20 '24 16:06 FSMaxB

I think another good addition would be a non consuming try_wait(&mut self) method on the NotifceFuture. This will make it possible to not block while trying to wait for the acknowledgement of multiple packets.

For example, you are storing the NoticeFutures in a Vec, and you want to find the first acknowledge packet.

This would be similar to the try_recv(&mut) of the inner channel.

One implementation could be:

diff --git a/rumqttc/src/notice.rs b/rumqttc/src/notice.rs
index 4818335..2142e81 100644
--- a/rumqttc/src/notice.rs
+++ b/rumqttc/src/notice.rs
@@ -53,6 +53,21 @@ pub fn wait(self) -> NoticeResult {
     pub async fn wait_async(self) -> NoticeResult {
         self.0.await?
     }
+
+    /// Attempts to check if the broker acknowledged the packet, without blocking the current thread
+    /// or consuming the notice.
+    ///
+    /// It will return [`None`] if the packet wasn't acknowledged.
+    ///
+    /// Multiple calls to this functions can fail with [`NoticeError::Recv`] if the notice was
+    /// already waited and the packet was already acknowledged and [`Some`] value was returned.
+    pub fn try_wait(&mut self) -> Option<NoticeResult> {
+        match self.0.try_recv() {
+            Ok(res) => Some(res),
+            Err(oneshot::error::TryRecvError::Closed) => Some(Err(NoticeError::Recv)),
+            Err(oneshot::error::TryRecvError::Empty) => None,
+        }
+    }
 }
 
 #[derive(Debug)]

Here, it returns an Option instead of an error for the oneshot::error::TryRecvError::Empty, just to not add another variant to the error.

joshuachp avatar Jul 10 '24 13:07 joshuachp

I think another good addition would be a non consuming try_wait(&mut self) method on the NotifceFuture. This will make it possible to not block while trying to wait for the acknowledgement of multiple packets.

That's a great suggestion, thanks for the input!

de-sh avatar Jul 10 '24 17:07 de-sh