pulsar-rs icon indicating copy to clipboard operation
pulsar-rs copied to clipboard

Streaming interface with ack

Open danielphan2003 opened this issue 9 months ago • 1 comments

Currently when we convert consumer into a stream via futures::FutureExt's .into_stream(self), we can no longer acknowledge messages as the consumer is owned by the stream and the ack apis assumes mutable ref to consumer

danielphan2003 avatar Apr 01 '25 18:04 danielphan2003

I don't know if it would work on your case but I did it like this

let consumer_stream = Box::pin(futures::stream::unfold(consumer, async |mut consumer| {
	let new_msg = consumer.try_next().await.inspect_err(|e| {
		tracing::error!("Error reading message from the consumer. {e}");
	});
	if let Ok(Some(msg)) = new_msg {
		match consumer.ack(&msg).await {
			Ok(_) => Some((msg.deserialize(), consumer)),
			Err(e) => {
				tracing::error!("Error acknowledging message. {e}");
				None
			}
		}
	} else {
		None
	}
}));

mzaniolo avatar Jul 24 '25 08:07 mzaniolo