Is Consumer::next cancel safe (for both Single and Multi consumers)?

Open: chamons opened this issue 2 weeks ago • 1 comment

I have a high-throughput system that needs to process many requests in parallel. Each operation can spend tens of seconds in a synchronous API call.

My system is currently architected with a single pulsar consumer connection worker and many worker tasks, which fetch requests and send ack/nack requests back. We found that having hundreds of pulsar consumers in one process, each listening to many topics, was expensive and wasteful.

    loop {
        select! {
            biased;

            ack_nack_request = self.ack_nack_rx.recv() => {
                self.handle_ack_request(ack_nack_request).await;
            }

            maybe_message = self.consumer.next(), if self.current_fetch_request.is_some() => {
                self.handle_new_pulsar_message(maybe_message, false);
            }

            maybe_fetch_request = self.fetch_request_rx.recv(), if self.current_fetch_request.is_none() && !self.fetch_request_rx.is_closed() => {
                self.handle_new_message_request(maybe_fetch_request);
            }
        }
    }

In scale testing, we have seen some strange behavior where some messages get "stuck" until the consumer pods are restarted.

I have verified that most of the API calls in my core select are cancel safe. However, the cancel safety of futures-util's StreamExt::next is ambiguous. The best I can find online (https://rfd.shared.oxide.computer/rfd/400) suggests:

StreamExt::next is not documented to be cancel-safe, and its cancel safety depends on how the underlying Stream behaves. However, a Stream that couldn't handle interruptions while generating its values would clearly be buggy. After all, the entire point of a Stream is to iterate over values in situations where the next item isn't immediately available
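
To make that distinction concrete, here is a minimal std-only sketch of what "depends on how the underlying Stream behaves" can mean. Everything in it is hypothetical and for illustration only: `Source`, `YieldOnce`, `next_unsafe`, `next_safe`, and `poll_once_then_drop` are made-up names, not pulsar-rs or futures-util APIs, and the sketch says nothing about how pulsar-rs is actually implemented. It only shows that a "next" future which removes an item before an await point loses that item when dropped, while one that removes the item after its last await point does not.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a consumer with buffered messages.
struct Source {
    queue: VecDeque<u32>,
}

/// Future that returns Pending on the first poll and Ready on the second.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Cancel-unsafe: the item leaves the queue before the await point,
/// so it lives only inside the future and dies with it.
async fn next_unsafe(src: &mut Source) -> Option<u32> {
    let item = src.queue.pop_front();
    YieldOnce(false).await;
    item
}

/// Cancel-safe: the item is removed only after the last await point,
/// so dropping the future while it is pending leaves the queue intact.
async fn next_safe(src: &mut Source) -> Option<u32> {
    YieldOnce(false).await;
    src.queue.pop_front()
}

/// Polls a future once and drops it, which is what happens to a
/// select! branch that loses to another branch.
fn poll_once_then_drop<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

/// Returns what is left in the queue after one cancelled call.
fn remaining_after_cancel(use_safe: bool) -> Vec<u32> {
    let mut src = Source { queue: VecDeque::from(vec![1, 2, 3]) };
    if use_safe {
        let _ = poll_once_then_drop(next_safe(&mut src));
    } else {
        let _ = poll_once_then_drop(next_unsafe(&mut src));
    }
    src.queue.into_iter().collect()
}

fn main() {
    println!("unsafe: {:?}", remaining_after_cancel(false)); // [2, 3]
    println!("safe:   {:?}", remaining_after_cancel(true)); // [1, 2, 3]
}
```

In the cancel-unsafe variant, message 1 is gone after a single cancelled call, which is the kind of loss that could look like a "stuck" message from the outside. Whether anything comparable happens inside Consumer::next is exactly the open question.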

I poked around the source code looking for cancel-unsafe calls, but the call chain is non-trivial to analyze.

Is pulsar-rs documented to be cancel safe/unsafe?

chamons • Dec 05 '25 20:12

See https://github.com/streamnative/pulsar-rs/issues/346 for more details on why I've come to this architecture, due to difficulties with the API surface.

chamons • Dec 05 '25 21:12