pulsar-rs

Timeouts/disconnects while awaiting the future returned by send: safe to retry? Is this a bug, since they are handled elsewhere?

Open chamons opened this issue 3 months ago • 5 comments

We have some pulsar delivery code that roughly looks like this:

        let send_future = match producer
            .send_non_blocking(
                topic,
                PulsarPayload {
                    payload,
                    deliver_at_time,
                },
            )
            .await
        {
            Ok(send_future) => send_future,
            Err(e) => {
                let _ = producer.close_producer(topic).await;
                return Err(PulsarDeliveryError::SendError(e));
            }
        };

        match send_future.await {
            Ok(_) => Ok(()),
            Err(e) => {
                let _ = producer.close_producer(topic).await;
                Err(PulsarDeliveryError::AwaitConfirmationError(e))
            }
        }

Under significant load, we receive a small % of errors in that second case (the one that returns AwaitConfirmationError) with either Disconnect or timeout.

Looking into the source code, I see in src/producer.rs that send_inner loops and retries if you get either one of those cases on the initial send.

However, inside the await:

                    let fut = async move {
                        let res = fut.await;
                        res.map_err(|e| {
                            error!("wait send receipt got error: {:?}", e);
                            Error::Producer(ProducerError::Connection(e))
                        })
                    };

there is no retry, which means these errors can bubble up to the caller.

Two questions:

  1. Is this a bug? Shouldn't pulsar-rs handle this for us?
  2. Is it safe for us to retry?

chamons avatar Sep 19 '25 15:09 chamons

@BewareMyPower - You looked into another of my issues, maybe you know?

chamons avatar Sep 19 '25 15:09 chamons

https://github.com/streamnative/pulsar-rs/blob/master/src/connection.rs#L731-L750 is the code I believe we're hitting

chamons avatar Sep 19 '25 15:09 chamons

If you're hitting the error in the code block you've shared, it means the send receipt was not received within the operation_timeout, which is the timeout for any RPC (30 seconds by default). The Rust client handles this timeout a bit differently from other clients, which have a separate send timeout config, e.g. https://github.com/apache/pulsar/blob/0c6ba1cf72fb12b9a97e3f8438d5c34a5918fb69/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java#L164

It's safe to retry, but it would be better to just configure a big operation timeout.

In other clients, the send timeout config is mainly used to discard old messages, e.g. when there are 1000 pending messages and the broker cannot handle them in time. Messages that fail due to the timeout are discarded with a timeout error, so new messages are not blocked behind the queued old ones. The application can choose to record somewhere which messages were discarded due to the timeout error. Resending them could lead to out-of-order messages.
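To make the send timeout behaviour described above concrete, here is a minimal sketch of a pending queue that expires old messages. It is an illustration only: `PendingQueue`, `Pending`, and the millisecond timestamps are hypothetical names and are not taken from pulsar-rs or any other Pulsar client.

```rust
use std::collections::VecDeque;

/// A message that is still waiting for a broker receipt.
/// (Hypothetical type for illustration, not a pulsar-rs type.)
#[derive(Debug, Clone, PartialEq)]
pub struct Pending {
    pub sequence_id: u64,
    pub enqueued_at_ms: u64,
}

/// FIFO queue of pending messages with a per-message send timeout.
pub struct PendingQueue {
    send_timeout_ms: u64,
    queue: VecDeque<Pending>,
}

impl PendingQueue {
    pub fn new(send_timeout_ms: u64) -> Self {
        Self { send_timeout_ms, queue: VecDeque::new() }
    }

    /// Enqueue a message at time `now_ms`.
    pub fn push(&mut self, sequence_id: u64, now_ms: u64) {
        self.queue.push_back(Pending { sequence_id, enqueued_at_ms: now_ms });
    }

    /// Remove and return every message at the front of the queue that has
    /// waited at least `send_timeout_ms`. The caller would fail these with a
    /// timeout error, so newer messages are not stuck behind them.
    pub fn expire(&mut self, now_ms: u64) -> Vec<Pending> {
        let timeout = self.send_timeout_ms;
        let mut expired = Vec::new();
        while self
            .queue
            .front()
            .map_or(false, |p| now_ms.saturating_sub(p.enqueued_at_ms) >= timeout)
        {
            if let Some(p) = self.queue.pop_front() {
                expired.push(p);
            }
        }
        expired
    }

    /// Number of messages still waiting for a receipt.
    pub fn len(&self) -> usize {
        self.queue.len()
    }
}
```

With a 1000 ms timeout, a message enqueued at t=0 is expired by `expire(1200)`, while one enqueued at t=600 stays queued. An application that wants to resend the expired messages has to accept that they may arrive after newer ones.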

The Rust client does not implement the send timeout correctly. It just uses the operation timeout to fail a Send RPC if no response arrives in time. Retrying on this error might lead to duplicated messages: when the SendFuture fails with this error, the message might have already been sent, and the timeout error might be due to a bug on the client side or the broker side.
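As a rough sketch of what an application-level retry could look like, the helper below retries only on ambiguous errors and reports whether a duplicate is possible. Everything here is an assumption for illustration: `SendError`, `Delivered`, and `send_with_retry` are hypothetical and do not mirror the pulsar-rs error types, and the closure stands in for "send and await the receipt".

```rust
/// Hypothetical error classification, not the pulsar-rs `Error` type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SendError {
    Timeout,
    Disconnected,
    Rejected,
}

impl SendError {
    /// Timeout and disconnect are ambiguous: the broker may or may not
    /// have persisted the message before the error was observed.
    pub fn is_ambiguous(self) -> bool {
        matches!(self, SendError::Timeout | SendError::Disconnected)
    }
}

/// Result of a successful (possibly retried) send.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Delivered {
    pub attempts: u32,
    /// True if an earlier attempt failed ambiguously, so the broker
    /// may hold more than one copy of the message.
    pub possibly_duplicated: bool,
}

/// Call `send` until it succeeds, fails with a non-ambiguous error, or
/// `max_attempts` is reached. At least one attempt is always made.
pub fn send_with_retry<F>(max_attempts: u32, mut send: F) -> Result<Delivered, SendError>
where
    F: FnMut() -> Result<(), SendError>,
{
    let mut attempt = 1;
    loop {
        match send() {
            Ok(()) => {
                return Ok(Delivered {
                    attempts: attempt,
                    // Every earlier failure was ambiguous, otherwise we
                    // would have returned the error already.
                    possibly_duplicated: attempt > 1,
                });
            }
            Err(e) if e.is_ambiguous() && attempt < max_attempts => attempt += 1,
            Err(e) => return Err(e),
        }
    }
}
```

The `possibly_duplicated` flag makes the trade-off explicit: a retry after a timeout or disconnect can succeed, but the caller cannot tell from the client side alone whether the first attempt also reached the broker.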

I'm wondering if the operation timeout is the default value. 30 seconds should be long enough, i.e. if a message was not sent successfully in 30 seconds, there must be something wrong somewhere.

BewareMyPower avatar Sep 20 '25 11:09 BewareMyPower

I'm a bit confused on the safety of retrying. I first read

It's safe to retry, but it would be better to just configure a big operation timeout.

then later:

The Rust client does not implement the send timeout correctly. It just uses the operation timeout to fail a Send RPC if no response arrives in time. Retrying on this error might lead to duplicated messages: when the SendFuture fails with this error, the message might have already been sent, and the timeout error might be due to a bug on the client side or the broker side.

I've had retries enabled for the last few days in our test group and I haven't seen any duplicates yet, but it is something I'm concerned about.

chamons avatar Sep 25 '25 15:09 chamons

We might not mean the same thing by "safe". When I said "safe", I meant that it doesn't break anything, but in your context it seems to mean "exactly-once" delivery.

If your retry didn't produce duplicated messages, it might be that the RPC was somehow lost or not processed by the broker. But this does not violate the "at-least-once" semantics, because the client SDK only makes its best effort to deliver messages within the given timeout. If it still does not succeed, the caller is notified with the error.
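If duplicates from retries are a concern under at-least-once delivery, one common application-level mitigation is to make the consumer idempotent. The sketch below assumes the producer attaches its own unique key to every payload; `Deduplicator` and the key format are hypothetical and not part of pulsar-rs, and the in-memory set grows without bound, so a real system would need expiry or persistent storage.

```rust
use std::collections::HashSet;

/// Tracks application-assigned message keys that were already processed.
/// (Hypothetical helper for illustration, not a pulsar-rs type.)
#[derive(Default)]
pub struct Deduplicator {
    seen: HashSet<String>,
}

impl Deduplicator {
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns true the first time `key` is seen and false for repeats,
    /// so the caller can skip messages it has already handled.
    pub fn first_delivery(&mut self, key: &str) -> bool {
        self.seen.insert(key.to_owned())
    }
}
```

A consumer loop would call `first_delivery` with the key carried in each message and process the message only when it returns true, while still acknowledging the duplicate.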

That's why I recommended configuring a larger timeout if you want a definite outcome (succeed or fail), since a timeout error usually means uncertainty in network programming.

BewareMyPower avatar Sep 26 '25 02:09 BewareMyPower