Google Pub/Sub Transport fails with error: "The service was unable to fulfill your request. Please try again. [code=8a75]"
Describe the bug
Pub/Sub transport eventually fails with the error:
The service was unable to fulfill your request. Please try again. [code=8a75]
To Reproduce
Steps to reproduce the behavior:
Setup Pub/Sub:
builder.UseWolverine(options =>
{
    options
        .UsePubsub("...")
        .AutoProvision()
        .EnableDeadLettering()
        .EnableSystemEndpoints()
        .PrefixIdentifiersWithMachineName()
        .UseConventionalRouting();
});
Start up the application and observe that the Pub/Sub listener works for a period of time.
After some time, the following errors will occur:
[20:18:51.455 ERR] pubsub://gcp-motion-sbx/Charless-MacBook-Pro.Motion.Apps.Features.Examples.Echo: Error while trying to retrieve messages from Google Cloud Platform Pub/Sub, attempting to restart listener (5/5)... ()
Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="The service was unable to fulfill your request. Please try again. [code=8a75]")
at Google.Api.Gax.Grpc.AsyncResponseStream`1.MoveNextAsync(CancellationToken cancellationToken)
at Wolverine.Pubsub.Internal.BatchedPubsubListener.<>c__DisplayClass1_0.<<StartAsync>b__1>d.MoveNext() in /home/runner/work/wolverine/wolverine/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs:line 50
--- End of stack trace from previous location ---
at Wolverine.Pubsub.Internal.PubsubListener.listenForMessagesAsync(Func`1 listenAsync) in /home/runner/work/wolverine/wolverine/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs:line 177
For some window of time, it is still possible to send + receive messages.
However, after some additional time, the listening/subscribe side fails as well.
Here is a minimum reproduction with a .NET Web API:
using Wolverine;
using Wolverine.Pubsub;

var builder = WebApplication.CreateBuilder(args);

// Place your credentials file at the root
Environment.SetEnvironmentVariable(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "application_default_credentials.json"
);

builder.UseWolverine(options =>
{
    options
        .UsePubsub("gcp-motion-sbx")
        .AutoProvision()
        .EnableDeadLettering()
        .EnableSystemEndpoints()
        .PrefixIdentifiersWithMachineName()
        .UseConventionalRouting();
});

// A timer here so we can print out how long we waited.
Task.Run(async () =>
{
    int iterations = 0;
    while (true)
    {
        await Task.Delay(1000 * 60);
        Console.WriteLine($"Elapsed time: {++iterations} minutes");
    }
});

builder.Services.AddControllers();

var app = builder.Build();
app.MapControllers();
app.Run();
The error will occur around the 1 minute mark.
Expected behavior
Pub/Sub listener is kept alive/recovers from transient errors.
I think there is an issue with the connection management either originating from the Pub/Sub side or the implementation details.
See the following relevant issues previously reported:
- https://github.com/googleapis/google-cloud-dotnet/issues/12964
- https://github.com/googleapis/google-cloud-dotnet/issues/12671
Desktop:
- OS: macOS 15.6
- Browser: N/A
- Version: [email protected], [email protected]
This looks like an issue with the underlying library API calls being used by the Pub/Sub adapter.
Here's a minimum repro: https://github.com/CharlieDigital/dn-pubsub-error-repro
- Place a file application_default_credentials_sandbox.json at the root of the project (or update the launchSettings.json) with your credentials file location
- Modify the file PubSubService.cs with a project ID and queue name
Tried on two machines and confirmed that it fails on both with the exact same error. In this failure mode, no messages are sent nor received; the pull is simply started.
Details:
- The queue is auto-provisioned via Wolverine
- The default options can be seen here: https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub/PubsubOptions.cs#L30-L46
public class CreateSubscriptionOptions
{
    public int AckDeadlineSeconds = 10;
    public DeadLetterPolicy? DeadLetterPolicy = null;
    public bool EnableExactlyOnceDelivery = false;
    public bool EnableMessageOrdering = false;
    public ExpirationPolicy? ExpirationPolicy = null;
    public string? Filter = null;
    public Duration MessageRetentionDuration = Duration.FromTimeSpan(TimeSpan.FromDays(7));
    public bool RetainAckedMessages = false;
    public RetryPolicy RetryPolicy = new()
    {
        MinimumBackoff = Duration.FromTimeSpan(TimeSpan.FromSeconds(10)),
        MaximumBackoff = Duration.FromTimeSpan(TimeSpan.FromSeconds(600))
    };
}
https://github.com/user-attachments/assets/1b7bb43b-21bd-4104-b2d0-dc6ef3b33201
This is more or less the same loop of code here: https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs#L50
Per the Pub/Sub SDK team, the recommendation is to use the SubscriberClient instead of the SubscriberServiceApiClient.
@jay-zahiri @jeremydmiller , I'm open to taking on the work of trying to swap this out, but I'm curious whether there was an underlying reason to use the lower-level SubscriberServiceApiClient over the SubscriberClient.
Hi @CharlieDigital I'd go ahead and give it a shot. I honestly don't remember the exact reason right off the bat, but I'm pretty sure it was to have more control over the workflow to make it work with Wolverine, I'm thinking acking and so on.
Try swapping it out, maybe you'll have the same realization as me then you can refresh my memory xD
The issues you linked seem to have been solved in the latest version of the Pub/Sub SDK (at least version 3.18.0, the version Wolverine uses)? Or at least the behavior is expected?
Right - those all seem expected. (I might update the logging to include the client index in the "Retrying with no backoff" part at some point, but that's slightly tricky.) Sounds like all is okay :) (I'm hoping at some point that pull streams will last longer, but that's a long term issue with various moving parts, and is more of a nice-to-have than a real problem.)
Warnings in GRPC ClientSubscriber Pubsub
Maybe handle that error explicitly?
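For illustration only, handling that error explicitly could look roughly like the sketch below: treat the failure as transient and restart the pull with capped exponential backoff. `TransientRetry` and its parameters are hypothetical names, not part of Wolverine or the Pub/Sub SDK, and the helper deliberately takes a predicate so it stays free of any gRPC dependency.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Wolverine or the Pub/Sub SDK).
public static class TransientRetry
{
    // Runs `operation` until it completes without throwing. When it throws an
    // exception that `isTransient` accepts, waits and tries again, doubling the
    // wait each time up to `maxDelay`. Any other exception propagates.
    public static async Task RunAsync(
        Func<CancellationToken, Task> operation,
        Func<Exception, bool> isTransient,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken token)
    {
        var delay = initialDelay;
        while (true)
        {
            token.ThrowIfCancellationRequested();
            try
            {
                await operation(token);
                return;
            }
            catch (Exception ex) when (isTransient(ex) && !token.IsCancellationRequested)
            {
                await Task.Delay(delay, token);
                // Double the delay for the next attempt, capped at maxDelay.
                delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
            }
        }
    }
}
```

With Grpc.Core referenced, the predicate for the error in this issue might be `ex => ex is RpcException { StatusCode: StatusCode.Unavailable }`. Note that this sketch retries without an attempt limit and never resets the delay, so a real listener would probably want both a cap on attempts and a reset after a healthy period.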
We should also update the SDK maybe, it seems like it's a bit behind: Pub/Sub SDK - Version history. Based on the commits I can see they've done a bunch of fixes and improvements on both SubscriberClient and SubscriberServiceApiClient. They've also made SubscriberClient even more performant. So if you can manage to swap it, we might gain some performance.
Do you have a link to SDK team's recommendation? All I can find is this Pub/Sub SDK - Getting started
Might be useful to look at how they implemented SubscriberClient too.
@jay-zahiri , suggestion comes from Jon Skeet here: https://github.com/googleapis/google-cloud-dotnet/issues/15057#issuecomment-3226885267
I saw that Wolverine is currently referencing 3.24 (not the latest, but not 3.18): https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub/Wolverine.Pubsub.csproj#L13
Are you using this in production anywhere? Wondering how you worked around the fairly consistent connection failures on the stream.
Ah, I see. I thought it was something in the docs. Thank you!
You're right, I was looking at my local copy.
No, I've only used it with the emulator so far (building a SaaS) so this is a great catch!
Swapping it seems to be the best option but like I said that might not be possible for wiring it up with Wolverine.
What do you see as the challenge here? (Just so I know your thought process here if I try to swap this)
It seems it should be possible to use the SubscriberServiceApiClient for other low-level operations and use the SubscriberClient for just the pull, by swapping out the code here: https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs#L28-L55 for something more like this from the docs:
protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
    await _subscriberClient.StartAsync((msg, token) =>
    {
        _logger.LogInformation($"Received message {msg.MessageId}: {msg.Data.ToStringUtf8()}");
        // Handle the message.
        return Task.FromResult(SubscriberClient.Reply.Ack);
    });
Maybe in wiring it up with Wolverine and its processes.
We could use PublisherServiceApiClient and SubscriberServiceApiClient to create the resources and the regular clients for the rest of the workflow. Maybe dump the messages as they come in into Wolverine's persistence for the local queue.
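A rough sketch of that split, for discussion only: the low-level client provisions the subscription, and the high-level SubscriberClient does the pulling. It assumes the Google.Cloud.PubSub.V1 package, an existing topic, and default credentials. `SplitClientSketch` and its parameters are hypothetical names, and none of this is Wolverine's actual implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Google.Cloud.PubSub.V1;
using Grpc.Core;

// Hypothetical illustration; not Wolverine code.
public static class SplitClientSketch
{
    public static async Task RunAsync(
        string projectId, string topicId, string subscriptionId, CancellationToken stoppingToken)
    {
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        // Low-level client: used only to provision the subscription.
        var admin = await SubscriberServiceApiClient.CreateAsync();
        try
        {
            await admin.CreateSubscriptionAsync(
                subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 10);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists)
        {
            // The subscription was provisioned earlier; nothing to do.
        }

        // High-level client: owns the streaming pull and its reconnects.
        var subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        Task pullTask = subscriber.StartAsync((msg, token) =>
        {
            Console.WriteLine($"Received message {msg.MessageId}: {msg.Data.ToStringUtf8()}");
            // A real listener would hand the message to Wolverine here.
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });

        // Keep pulling until the host asks us to stop.
        try { await Task.Delay(Timeout.Infinite, stoppingToken); }
        catch (OperationCanceledException) { }

        // Assumption: StopAsync(TimeSpan) is available in the SDK version in use;
        // newer versions may prefer a different shutdown call.
        await subscriber.StopAsync(TimeSpan.FromSeconds(15));
        await pullTask;
    }
}
```

The open question from the thread still applies: returning `Reply.Ack` directly from the callback is the simple case, and mapping Wolverine's own ack/nack lifecycle onto that return value is where the wiring would need thought.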
I can take a look and come up with an initial draft this week, and we can take a look at it.
Happy to loop in; we'd like to use GCP Pub/Sub in prod, and if you're planning on moving your use case off of the emulator, the client implementation will definitely need to be swapped.
Hi folks 👋 Just wanted to check in to see if there have been any updates or thoughts since the last discussion. Really appreciate all the work you’re doing here! Our team is actively using Pub/Sub in our applications and would love to know if there’s been any progress on this.
Hey @jay-zahiri, is there anything new with this? Do I need to get involved?
And just to be annoying, do remember folks that support plans are available from https://jasperfx.net, and that does mean that your issues get prioritized and addressed quickly.
This is still sitting on the backburner (OK, more like on a trivet at the moment 🤣; we are now looking at making the leap to Kafka instead).
Question: are there docs for the testing?
- Does it require manual spin-up of the emulator?
- Can I add it as a Test container?
- What command are you using for the tests?
I am running the emulator with the following command based on the config in TestingExtensions.cs:
gcloud beta emulators pubsub start --project=wolverine
However, test cases are throwing exceptions and are unable to connect. It's a mix of failures between the database and connecting to Pub/Sub.
I've put up a PR here: https://github.com/JasperFx/wolverine/pull/1700
Will do more testing tomorrow.
Hey all, I'm going to tackle this improvement later. Not sure if it will hit 5.0 yet.
I'm taking this on as part of V5 because it's going to lead to breaking changes.
Notes
- Create separate Endpoint types for PubsubTopic and PubsubSubscription. Hang subscriptions off of PubsubTopic similar to how Rabbit MQ internals or ASB internals work
- Collapse PubsubServerOptions, PubsubTopicOptions, CreateTopicOptions, PubsubSubscriptionOptions, CreateSubscriptionOptions down to the endpoint classes
- Combine PubsubClientOptions, PubsubRetryPolicy, PubsubDeadLetterOptions mostly down to the Transport and a dedicated DeadLetterQueue topic
- Encapsulate the usage of PublisherServiceApiClient and SubscriberServiceApiClient into the transport type
- Rewrite InlinePubsubListener to use SubscriptionClient. Pass in a GcpEnvelope that records the Ack value
- Replace BatchedPubsubListener with just a new StreamingPubsubListener. Doesn't really have to batch. But ack after dropping it on the receiver. The Wolverine IReceiver is dealing w/ the lifecycle after that anyway
- Rewrite PubsubTopic.SendMessageAsync() to use PublisherClient
- Use PublisherClient for PubsubSenderProtocol
Punchlist
- [x] Bring back system control endpoints
- [ ] Bring back tests for finding a topic by Uri
- [ ] Bring back tests for finding a subscription by Uri
- [ ] All compliance tests pass
- [ ] Run a test that tries to send messages for a long time
- [ ] Create the GCP account and try to run it for realsies?
- [ ] Add a test doing leadership w/ GCP
- [x] Put diagnostic columns back in
We're just going w/ @danielwinkler 's changes. I do think the GCP transport could be more idiomatic, but let's get something working first and not mess w/ stylistic changes