Nack messages when MessageStream is dropped
Currently, the prefetched (buffered) messages in the MessageStream queue are silently dropped when the MessageStream is dropped. As a result, PubSub redelivers them only after stream_ack_deadline_seconds has elapsed.
This doesn't work well in low-latency scenarios where a process needs to be restarted, e.g. when deploying a new app version.
Ideally, we would have an option to:
- stop the stream (e.g. stop receiving new messages from PubSub streaming pull) and
- nack (or process) all the remaining messages from the queue
which would support cases where graceful application shutdown is important.
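The requested behavior can be modeled roughly as follows. This is only a minimal sketch using the standard library, not the client's actual implementation: `MockMessage`, `MockStream`, `stop`, the `nacked` log, and `shutdown_demo` are all hypothetical stand-ins, and the "nack" is just a record in a vector rather than a call to PubSub.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a received Pub/Sub message.
#[derive(Debug, Clone, PartialEq)]
pub struct MockMessage {
    pub ack_id: u64,
}

/// Hypothetical stand-in for MessageStream: a queue of prefetched messages
/// plus a flag recording whether the stream has been stopped.
pub struct MockStream {
    buffer: VecDeque<MockMessage>,
    stopped: bool,
    /// Ack IDs that were nacked; stands in for the real nack request.
    pub nacked: Vec<u64>,
}

impl MockStream {
    pub fn new(prefetched: impl IntoIterator<Item = u64>) -> Self {
        Self {
            buffer: prefetched
                .into_iter()
                .map(|ack_id| MockMessage { ack_id })
                .collect(),
            stopped: false,
            nacked: Vec::new(),
        }
    }

    /// Returns the next buffered message, or None once the stream
    /// has been stopped or the buffer is empty.
    pub fn next(&mut self) -> Option<MockMessage> {
        if self.stopped {
            None
        } else {
            self.buffer.pop_front()
        }
    }

    /// Step 1: stop handing out messages.
    pub fn stop(&mut self) {
        self.stopped = true;
    }

    /// Step 2: nack everything still in the queue instead of dropping it
    /// silently. Returns the number of messages that were nacked.
    pub fn dispose(&mut self) -> usize {
        let remaining: Vec<u64> = self.buffer.drain(..).map(|m| m.ack_id).collect();
        let count = remaining.len();
        self.nacked.extend(remaining);
        count
    }
}

/// Processes two of five prefetched messages, then shuts down.
/// Returns (processed ack IDs, nacked ack IDs).
pub fn shutdown_demo() -> (Vec<u64>, Vec<u64>) {
    let mut stream = MockStream::new(1..=5);
    let mut processed = Vec::new();
    while let Some(msg) = stream.next() {
        processed.push(msg.ack_id);
        if processed.len() == 2 {
            stream.stop(); // e.g. a shutdown signal arrived
        }
    }
    stream.dispose();
    (processed, stream.nacked.clone())
}
```

In this model, calling `shutdown_demo()` processes the first two messages and nacks the three that were still buffered, so they would become eligible for immediate redelivery rather than waiting for the ack deadline.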
As you say, when a message remains in the queue at the time of MessageStream drop, it is not nacked and waits for redelivery.
Therefore, we are currently considering the following interface:
- Addition of MessageStream#dispose. This will nack all messages remaining in the queue when the stream is disposed.
let token = tokio_util::sync::CancellationToken::new();
let child_token = token.child_token();
let task = tokio::spawn(async move {
    let config = SubscribeConfig::default().with_cancellable_by(child_token);
    let mut stream = subscription.subscribe(Some(config)).await.unwrap();
    // None when the token is cancelled
    while let Some(message) = stream.next().await {
        ...
    }
    // Nack everything still buffered in the stream
    stream.dispose().await
});
token.cancel();
task.await.unwrap();
or
let token = tokio_util::sync::CancellationToken::new();
let child_token = token.child_token();
let task = tokio::spawn(async move {
    let mut stream = subscription.subscribe(None).await.unwrap();
    // Stop pulling from the stream as soon as the token is cancelled
    while let Some(message) = tokio::select! {
        msg = stream.next() => msg,
        _ = child_token.cancelled() => None
    } {
        ...
    }
    // Nack everything still buffered in the stream
    stream.dispose().await
});
token.cancel();
task.await.unwrap();
Both examples seem more or less the same, so I'm not sure what the difference is supposed to be. In any case, this looks like exactly what I'm looking for, so :+1: from me.
Thank you, @yoshidan.