pulsar-rs
pulsar-rs copied to clipboard
Streaming interface with ack
Currently when we convert consumer into a stream via futures::FutureExt's .into_stream(self), we can no longer acknowledge messages as the consumer is owned by the stream and the ack apis assumes mutable ref to consumer
I don't know if it would work on your case but I did it like this
let consumer_stream = Box::pin(futures::stream::unfold(consumer, async |mut consumer| {
let new_msg = consumer.try_next().await.inspect_err(|e| {
tracing::error!("Error reading message from the consumer. {e}");
});
if let Ok(Some(msg)) = new_msg {
match consumer.ack(&msg).await {
Ok(_) => Some((msg.deserialize(), consumer)),
Err(e) => {
tracing::error!("Error acknowledging message. {e}");
None
}
}
} else {
None
}
}));