google-cloud-rust icon indicating copy to clipboard operation
google-cloud-rust copied to clipboard

Nack messages when MessageStream is dropped

Open damjankuznar opened this issue 1 year ago • 2 comments

Currently the "prefetched/buffered" messages that are in the MessageStream queue are silently dropped when MessageStream is dropped which causes the messages to be retried only after stream_ack_deadline_seconds passes and message is retried by PubSub.

This doesn't work well in low latency scenarios when a process needs to be restarted, e.g. deploying a new app version.

Ideally, we would have an option to:

  • stop the stream (e.g. stop receiving new messages from PubSub streaming pull) and
  • nack (or process) all the remaining messages from the queue

which would support cases where an app graceful shutdown is important.

damjankuznar avatar May 13 '24 10:05 damjankuznar

As you say, when a message remains in the queue at the time of MessageStream drop, it is not nacked and waits for redelivery.

Therefore, we are currently considering the following interfaces

  • Addition of MessageStream#dispose This will perform a nack on all messages present in the queue when disposing.
let token = tokio_util::sync::CancellationToke::new();
let child_token = token.child_token();

let task = tokio::spawn(async move {
   let config = SubscribeConfig::default().with_cancellable_by(child_token);
   let mut stream = subscription.subscribe(Some(config)).await.unwrap();
            
    // None when the token is cancelled
    while let Some(message) = stream.next().await {
         ...
    }
    stream.dispose().await
});

token.cancel();
task.await();

or

let token = tokio_util::sync::CancellationToke::new();
let child_token = token.child_token();

let task = tokio::spawn(async move {
   let mut stream = subscription.subscribe(None).await.unwrap();
            
    while let Some(message) = tokio::select! {
        msg = stream.next() => msg,
        _ = child_token.cancelled() => None
    } {
         ...
    }
    stream.dispose().await
});

token.cancel();
task.await();

yoshidan avatar May 14 '24 09:05 yoshidan

Both examples seem more or less the same, so not sure what's the difference supposed to be. In any case, this looks exactly what I'm looking for, so :+1: from me.

damjankuznar avatar May 14 '24 15:05 damjankuznar

thank you @yoshidan

damjankuznar avatar May 17 '24 05:05 damjankuznar