pubsub: Many instances of "The StreamingPull stream closed for an expected reason and should be recreated ..."
Client
cloud.google.com/go/pubsub v1.37.0
Environment
GKE
Go Environment
$ go version
go version go1.21.8 darwin/arm64
$ go env
GO111MODULE=''
GOARCH='arm64'
GOBIN=''
GOCACHE='/Users/Harald/Library/Caches/go-build'
GOENV='/Users/Harald/Library/Application Support/go/env'
GOEXE=''
GOEXPERIMENT=''
GOFLAGS=''
GOHOSTARCH='arm64'
GOHOSTOS='darwin'
GOINSECURE=''
GOMODCACHE='/Users/Harald/go/pkg/mod'
GONOPROXY='github.com/dietdoctor/*'
GONOSUMDB='github.com/dietdoctor/*'
GOOS='darwin'
GOPATH='/Users/Harald/go'
GOPRIVATE='github.com/dietdoctor/*'
GOPROXY='https://proxy.golang.org,direct'
GOROOT='/Users/Harald/go/pkg/mod/golang.org/[email protected]'
GOSUMDB='sum.golang.org'
GOTMPDIR=''
GOTOOLCHAIN='auto'
GOTOOLDIR='/Users/Harald/go/pkg/mod/golang.org/[email protected]/pkg/tool/darwin_arm64'
GOVCS=''
GOVERSION='go1.21.8'
GCCGO='gccgo'
AR='ar'
CC='clang'
CXX='clang++'
CGO_ENABLED='1'
GOMOD='/Users/Harald/dd/hive/go.mod'
GOWORK=''
CGO_CFLAGS='-O2 -g'
CGO_CPPFLAGS=''
CGO_CXXFLAGS='-O2 -g'
CGO_FFLAGS='-O2 -g'
CGO_LDFLAGS='-O2 -g'
PKG_CONFIG='pkg-config'
GOGCCFLAGS='-fPIC -arch arm64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -ffile-prefix-map=/var/folders/j2/0w9hqz1x01g3j4_yz0w0sw780000gp/T/go-build3312079878=/tmp/go-build -gno-record-gcc-switches -fno-common'
Code
func initPubSubClient(cc *cli.Context, log logrus.FieldLogger) (*pubsub.Client, error) {
	clientOpts := googleAPIClientOptions(cc, log)
	log.Debug("Creating a new pubsub client.")
	client, err := pubsub.NewClient(context.Background(), cc.String("gcp-project"), clientOpts...)
	if err != nil {
		return nil, fmt.Errorf("failed to create a pubsub client: %v", err)
	}
	log.Debug("Pubsub client created.")
	return client, nil
}
func googleAPIClientOptions(cc *cli.Context, log logrus.FieldLogger) []option.ClientOption {
	var clientOpts []option.ClientOption
	if cc.IsSet("gcp-credentials-file") {
		log.Debugf("Using %s credentials file for Google API auth.", cc.String("gcp-credentials-file"))
		clientOpts = append(clientOpts, option.WithCredentialsFile(cc.String("gcp-credentials-file")))
	}
	return clientOpts
}
It is then used like this:
func Action(log *logrus.Logger) cli.ActionFunc {
	fn := func(cc *cli.Context) error {
		psubClient, err := initPubSubClient(cc, log)
		if err != nil {
			return err
		}
		...
		c := Ctrl{
			sub: psubClient.Subscription(name),
		}
		g, ctx := errgroup.WithContext(context.Background())
		g.Go(func() error {
			sigChan := make(chan os.Signal, 1)
			signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
			select {
			case sig := <-sigChan:
				log.Infof("Received signal, exiting: %s", sig)
				return psubClient.Close()
			case <-ctx.Done():
				log.Infof("Received context cancel signal, exiting: %s", ctx.Err())
				return psubClient.Close()
			}
		})
		g.Go(func() error {
			return c.sub.Receive(ctx, c.msgHandler)
		})
		return g.Wait()
	}
	return newAction("name", log, fn)
}
Expected behavior
No error messages.
Actual behavior
The StreamingPull stream closed for an expected reason and should be recreated, which is done automatically if using Cloud Pub/Sub client libraries. Refer to https://cloud.google.com/pubsub/docs/pull#streamingpull for more information.
Screenshots
Hi, how long have you been experiencing this issue? Does this correspond to a recent version bump of the Pub/Sub library?
Separately, I wasn't able to see where exactly Receive is called in your last code block. Could you amend the block to include that?
Hi @hongalex!
My Honeycomb data stretches back two months, and the issue has been going on at least that long. At that point we were using pubsub v1.36.2, and for about the last month we have been using pubsub v1.37.0; both versions show this issue.
I have amended my code block to include info on how Receive is called 🤗
StreamingPull streams are periodically closed every 30ish minutes, which your above graph confirms. This is intentional for the server to reassign resources properly. While this behavior isn't documented specifically, StreamingPull streams being closed with a non-ok error is normal: https://cloud.google.com/pubsub/docs/pull-troubleshooting#troubleshooting_a_streamingpull_subscription
Given that the error isn't tied to poor behavior, this is working as intended. If you have noticed your streams behaving poorly though, please let us know so we can investigate further.
Closing this issue since I think my previous comment answers this. If you're experiencing other kinds of unexpected behavior, please open another issue and I'll investigate there.
We've been experiencing this noise for several months and I finally got around to looking into it (which is how I stumbled upon this thread). We, like @HaraldNordgren, use otel & Honeycomb. I think the root of this noise goes back to a change implementing a bridge between OpenCensus and otel in the Google APIs, although I'm not sure. My hunch is that these traces suddenly started being reported because of that.
It's unfortunate that the StreamingPull interruption is always surfaced as an error and the trace is reported accordingly. This really throws a wrench in a lot of our dashboards/queries. Right now, it seems our only real options are to filter these out at some layer of our otel stack or to disable telemetry on the client in our code.
Mostly just writing this small essay as additional information for anyone else that stumbles upon this and maybe to hear if there's other options I didn't consider.
Yeah, so you're close, but it was actually added as part of https://github.com/googleapis/google-api-go-client/pull/2127, which is in a different repo. The change you linked above was for an internal trace package that Pub/Sub currently doesn't rely on. The PR I linked adds a gRPC otel stats handler to our underlying cloud.google.com/go/pubsub/apiv1 package, which cloud.google.com/go/pubsub wraps.
I agree that it is unfortunate that StreamingPull returning a non-nil error affects your dashboards.
If it helps, we are close to launching native instrumentation for Pub/Sub, tracked in #4665, which traces the message lifecycle more closely than gRPC events. Once this is supported, I imagine you could use only this instrumentation, while disabling the underlying client telemetry with:
c, err := pubsub.NewClientWithConfig(ctx, projectID,
	// This enables tracing on the high-level client.
	&pubsub.ClientConfig{EnableOpenTelemetryTracing: true},
	// This disables tracing on the lower-level gRPC client.
	option.WithTelemetryDisabled(),
)
Ah sweet, there was no chance I was ever going to hunt that one down, so glad that you had it! Makes a ton of sense.
I had already found the ability to disable, and I will keep an eye on your linked issue so we can get visibility back when it's ready!
Appreciate the response!