google-cloud-rust

Pubsub: losing connections when using streamingpull

Open brunopereira27 opened this issue 5 months ago • 5 comments

Hi all,

We are experiencing issues with GCP Pub/Sub. I would like to hear your opinion on this so that I can contribute a fix if needed.

Problem Description

After migrating from Pub/Sub push to pull using streaming pull connections, we're experiencing a critical issue where our application becomes completely unresponsive:

  1. Initial state: 8 streaming pull workers configured and running correctly
  2. Degradation: Connections drop one by one over time due to gRPC Cancelled responses
  3. Final state: When all connections are lost, .receive() never terminates, leaving the process stuck indefinitely with:
    • No errors logged
    • No automatic recovery
    • Silent failure requiring manual intervention
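
To illustrate the expected behavior in the final state, here is a minimal sketch using only the Rust standard library. It is not the library's implementation: the function name `receive_until_all_workers_stop` and the thread-plus-channel model are assumptions made for illustration. Each worker stands in for a streaming pull connection that delivers a few messages and is then dropped. The receiving loop returns once every worker has exited, because the supervisor drops its own sender.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical model: spawns `workers` threads that each deliver
/// `per_worker` messages and then stop, standing in for a dropped
/// streaming pull connection. Returns the number of messages received
/// once every worker has exited.
pub fn receive_until_all_workers_stop(workers: usize, per_worker: usize) -> usize {
    let (tx, rx) = mpsc::channel::<usize>();
    let mut handles = Vec::with_capacity(workers);

    for id in 0..workers {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for _ in 0..per_worker {
                // Stop early if the receiver has gone away.
                if tx.send(id).is_err() {
                    break;
                }
            }
            // The worker's sender is dropped here, when the closure returns.
        }));
    }

    // Drop the supervisor's own sender. If it were kept alive, the channel
    // would never close and the loop below would block forever after the
    // last worker exits.
    drop(tx);

    let mut received = 0;
    // Iteration ends when all senders are dropped and the queue is drained.
    for _msg in rx {
        received += 1;
    }

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    received
}
```

In this model, a receiver that keeps running after the last worker stops would correspond to the silent hang described above, whereas returning (or surfacing an error) lets the caller log the condition and recover.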

Root Cause Analysis

I've traced the issue to how Cancelled status codes are handled in the streaming subscriber:

let stream = match response {
    Ok(r) => r.into_inner(),
    Err(e) => {
        if e.code() == Code::Cancelled {
            tracing::trace!("stop subscriber : {}", subscription);
            break;  // <-- Subscriber stops permanently on Cancelled
        } else if retryable_codes.contains(&e.code()) {
            tracing::warn!("failed to start streaming: will reconnect {:?} : {}", e, subscription);
            continue;
        } else {
            tracing::error!("failed to start streaming: will stop {:?} : {}", e, subscription);
            break;
        }
    }
};

The Cancelled status is treated as a terminal condition rather than a retryable error.
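
The branch order matters here. As a minimal, self-contained sketch (not the library's code), the functions below model the decision with a stand-in `Code` enum. `Action`, `classify_current`, `classify_proposed`, and the `shutdown_requested` flag are hypothetical names introduced for illustration. Because the snippet above checks Cancelled before consulting `retryable_codes`, adding Cancelled to that list alone would not change the outcome in this model.

```rust
/// Stand-in for the gRPC status codes referenced in the snippet above.
/// This enum is an assumption for illustration, not the real `Code` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Code {
    Cancelled,
    Unavailable,
    DeadlineExceeded,
    PermissionDenied,
}

/// What the subscriber loop does next (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Reconnect,
    Stop,
}

/// Mirrors the branch order of the quoted snippet: Cancelled is checked
/// first, so it stops the subscriber even if it is listed as retryable.
pub fn classify_current(code: Code, retryable: &[Code]) -> Action {
    if code == Code::Cancelled {
        Action::Stop
    } else if retryable.contains(&code) {
        Action::Reconnect
    } else {
        Action::Stop
    }
}

/// Hypothetical variant: stop only when the caller asked for shutdown,
/// otherwise treat Cancelled like any other code in the retryable list.
pub fn classify_proposed(code: Code, retryable: &[Code], shutdown_requested: bool) -> Action {
    if shutdown_requested {
        return Action::Stop;
    }
    if retryable.contains(&code) {
        Action::Reconnect
    } else {
        Action::Stop
    }
}
```

The `shutdown_requested` flag is one assumed way to keep a deliberate client-side cancellation terminal while letting a server-side Cancelled trigger a reconnect.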

Proposed Solution

I've tested making Cancelled a retryable status code in a fork. Results after several hours:

✅ All connections remain stable
✅ No more silent failures
✅ Automatic recovery from Cancelled responses

Questions for Maintainers

  1. Is there a specific reason Cancelled is treated as non-retryable? (I noticed the Golang implementation also treats it as non-retryable)
  2. Could there be side effects from making Cancelled retryable that I haven't considered?
  3. If making Cancelled retryable isn't appropriate, what alternative approach would you recommend for handling these connection drops?

I'm happy to submit a PR with either my current fix or an alternative implementation based on your guidance.

brunopereira27, Jul 21 '25 05:07

Thank you for reporting.

> Is there a specific reason Cancelled is treated as non-retryable?

No. It is the same as Go.

> Could there be side effects from making Cancelled retryable that I haven't considered?

I have not been able to reproduce the conditions that cause Cancelled, but I don't expect any side effects from adding Cancelled to the retryable codes. If the retry succeeds, it is reasonable to retry on it.

Therefore, could you please create a PR that includes Cancelled in the default retry policy for pull?

yoshidan, Jul 22 '25 13:07

> When all connections are lost, .receive() never terminates, leaving the process stuck indefinitely with

This needs to be corrected separately.

yoshidan, Jul 22 '25 13:07

Indeed, I can't find a way to reproduce the Cancelled state, as it seems to be caused purely by server-side conditions that I am not aware of. However, after one week of testing in production, I can confirm that some connections are cancelled and that, after a few reconnection retries, they reconnect successfully, with no side effects that I can observe so far.

I will open a PR for this before the end of the week. Regarding the 2nd fix needed, with .receive() that never quits gracefully, I haven't implemented anything yet, but I can give it a try.

brunopereira27, Jul 24 '25 03:07

Thank you very much.

> Regarding the 2nd fix needed, with .receive() that never quits gracefully

I will consider implementing this myself, so you don't need to worry about it for now.

yoshidan, Jul 26 '25 13:07

We have added reconnection on Code::Cancelled and resolved the issue where subscribers did not terminate. However, the .receive() method is no longer available in versions 1.5.0 and later. Please use the .subscribe() method instead.

yoshidan, Sep 13 '25 03:09