Is `Consumer::next` cancel-safe (for both Single and Multi consumers)?
I have a high-throughput system that needs to process many requests in parallel; each operation can take tens of seconds in a synchronous API call.

My system is currently architected with a single Pulsar consumer connection worker and many worker tasks that fetch requests and send ack/nack requests back. We found that having hundreds of Pulsar consumers in a process, each listening to many topics, was expensive and wasteful.
```rust
loop {
    select! {
        biased;
        ack_nack_request = self.ack_nack_rx.recv() => {
            self.handle_ack_request(ack_nack_request).await;
        }
        maybe_message = self.consumer.next(), if self.current_fetch_request.is_some() => {
            self.handle_new_pulsar_message(maybe_message, false);
        }
        maybe_fetch_request = self.fetch_request_rx.recv(), if self.current_fetch_request.is_none() && !self.fetch_request_rx.is_closed() => {
            self.handle_new_message_request(maybe_fetch_request);
        }
    }
}
```
In scale testing, we have seen some strange behavior where some messages get "stuck" until the consumer pods are restarted.
I have verified that most of the API calls in my core `select!` are cancel-safe; however, futures-util's `next` is ambiguous. The best I can find online (https://rfd.shared.oxide.computer/rfd/400) suggests:
> StreamExt::next is not documented to be cancel-safe, and its cancel safety depends on how the underlying Stream behaves. However, a Stream that couldn't handle interruptions while generating its values would clearly be buggy. After all, the entire point of a Stream is to iterate over values in situations where the next item isn't immediately available.
I poked around the source code attempting to find cancel-unsafe calls, but the call chain is non-trivial to analyze.
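To make the hazard concrete, here is a std-only toy of the failure mode the RFD warns about (nothing here is pulsar-rs; `Source`, `NextFut`, and `demo` are invented for illustration): a `next`-style future that moves an item into future-local state before yielding it, so dropping the future mid-poll (which is exactly what `select!` does to losing branches) silently discards the item.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

struct Source {
    queue: VecDeque<u32>,
}

// A `next`-style future that pulls an item out of the stream's queue into
// its OWN local state on the first poll, then yields it on the second.
// This is the cancel-unsafe anti-pattern: state lives in the future, not
// the stream, so dropping the future between polls loses the item.
struct NextFut<'a> {
    source: &'a mut Source,
    in_flight: Option<u32>, // taken from the queue but not yet yielded
}

impl<'a> Future for NextFut<'a> {
    type Output = Option<u32>;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.in_flight.is_none() {
            // First poll: take the item, then pretend we still need another
            // poll (e.g. waiting on some internal round-trip).
            let item = self.source.queue.pop_front();
            self.in_flight = item;
            return Poll::Pending;
        }
        Poll::Ready(self.in_flight.take())
    }
}

// Minimal no-op waker so we can poll futures by hand, without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn demo() -> Option<u32> {
    let mut source = Source { queue: VecDeque::from([1, 2]) };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    {
        // First `next`: polled once (item 1 moves into `in_flight`), then
        // dropped before completing -- as `select!` does to a losing branch.
        let mut fut = NextFut { source: &mut source, in_flight: None };
        assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
    } // item 1 is silently lost here

    // A fresh `next` future only ever sees item 2.
    let mut fut = NextFut { source: &mut source, in_flight: None };
    let _ = Pin::new(&mut fut).poll(&mut cx);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(item) => item,
        Poll::Pending => unreachable!(),
    }
}

fn main() {
    // Prints Some(2): item 1 vanished with the cancelled future.
    println!("item seen after cancellation: {:?}", demo());
}
```

A stream whose `poll_next` keeps all in-progress state inside the stream itself (as the RFD argues a well-behaved `Stream` should) does not have this problem, since dropping the `Next` future drops only a borrow; the question is whether pulsar-rs's consumer guarantees that shape.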
Is pulsar-rs documented to be cancel-safe or cancel-unsafe?
See https://github.com/streamnative/pulsar-rs/issues/346 for more details on why I've come to this architecture, due to difficulties with the API surface.
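For completeness, if `next` turns out to be cancel-unsafe, the fallback I can see is to keep `consumer.next()` out of `select!` entirely: one task owns the consumer and is the only place `next` is ever awaited, forwarding messages to workers over a channel (in tokio, `tokio::sync::mpsc::Receiver::recv` is documented cancel-safe, so it can sit in a `select!` branch). A minimal sketch of that shape using std threads for brevity (`FakeConsumer`, `Msg` semantics, and `run_forwarder` are invented stand-ins, not pulsar-rs API):

```rust
use std::sync::mpsc;
use std::thread;

// Toy stand-in for a pulsar consumer that yields a few messages then ends.
struct FakeConsumer {
    remaining: u32,
}

impl FakeConsumer {
    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(self.remaining)
    }
}

// One dedicated task owns the consumer, so no `next` call is ever
// cancelled; messages are handed to workers over a channel whose receive
// side is safe to race in `select!`.
fn run_forwarder(mut consumer: FakeConsumer) -> mpsc::Receiver<u32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        while let Some(msg) = consumer.next() {
            if tx.send(msg).is_err() {
                break; // all workers gone; stop consuming
            }
        }
    });
    rx
}

fn main() {
    let rx = run_forwarder(FakeConsumer { remaining: 3 });
    let received: Vec<u32> = rx.iter().collect();
    println!("{:?}", received); // [2, 1, 0]
}
```

The cost is an extra hop per message, but it confines the cancel-safety question to one task that never drops an in-flight `next` future.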