pubsub: concurrent map write panic when publishing with telemetry enabled and shared attributes
Client
PubSub
Environment
$ go version
go version go1.23.4 darwin/arm64
Code and Dependencies
package main
import (
"context"
"fmt"
"sync"
"strings"
"cloud.google.com/go/pubsub"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
func initTracing(ctx context.Context) {
r, err := resource.New(context.Background(),
resource.WithSchemaURL(semconv.SchemaURL),
resource.WithAttributes(semconv.ServiceName("test-service")),
resource.WithTelemetrySDK(),
resource.WithContainerID(),
)
if err != nil {
panic(err)
}
client := otlptracegrpc.NewClient()
exporter, err := otlptrace.New(ctx, client)
if err != nil {
panic(err)
}
provider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(r),
)
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
)
otel.SetTracerProvider(provider)
}
func main() {
ctx := context.Background()
initTracing(ctx)
client, err := pubsub.NewClientWithConfig(ctx, "test-project", &pubsub.ClientConfig{EnableOpenTelemetryTracing: true})
if err != nil {
panic(err)
}
_, err = client.CreateTopic(ctx, "test-topic")
if err != nil {
if !strings.Contains(err.Error(), "AlreadyExists") {
panic(err)
}
}
tracer := otel.GetTracerProvider().Tracer("")
topic := client.Topic("test-topic")
topic.PublishSettings = pubsub.PublishSettings{CountThreshold: 1}
attributes := map[string]string{"my-key": "my-value"}
var wg sync.WaitGroup
for num := range 100 {
num := num
wg.Add(1)
go func() {
defer wg.Done()
// attributes := map[string]string{"my-key": "my-value"}
ctx, span := tracer.Start(context.Background(), "publish")
defer span.End()
msg := []byte(fmt.Sprintf("%d", num))
topic.Publish(ctx, &pubsub.Message{
Data: msg,
Attributes: attributes,
})
fmt.Printf("Published %d\n", num)
}()
}
wg.Wait()
topic.Stop()
otel.GetTracerProvider().(*sdktrace.TracerProvider).Shutdown(ctx)
}
go.mod
module github.com/steved/pubsub-repro
go 1.23.4
require (
cloud.google.com/go/pubsub v1.45.3
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/sdk v1.33.0
)
require (
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.11.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
cloud.google.com/go/iam v1.2.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/api v0.210.0 // indirect
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/grpc v1.68.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
)
Expected behavior
The answer could be that users should not be passing in a shared map of attributes, but I expected that any modification would only occur on a pubsub-local copy of the attributes.
Actual behavior
fatal error: concurrent map iteration and map write
goroutine 181 [running]:
reflect.mapiterinit(0x1400017f638?, 0x100875220?, 0x1400017f608?)
/opt/homebrew/Cellar/go/1.23.4/libexec/src/runtime/map.go:1520 +0x1c
reflect.(*MapIter).Next(0x100f8d880?)
/opt/homebrew/Cellar/go/1.23.4/libexec/src/reflect/value.go:1984 +0x60
google.golang.org/protobuf/internal/impl.sizeMap({0x100f8d880?, 0x1400013c7b0?, 0x1400013c7b0?}, 0x140001601e0, 0x14000172090, {0xec?})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/codec_map.go:98 +0x130
google.golang.org/protobuf/internal/impl.encoderFuncsForMap.func1({0x0?}, 0x14000172090, {0xc0?})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/codec_map.go:54 +0x58
google.golang.org/protobuf/internal/impl.(*MessageInfo).sizePointerSlow(0x140003a47e8, {0x68?}, {0x28?})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/encode.go:82 +0xc8
google.golang.org/protobuf/internal/impl.(*MessageInfo).sizePointer(0x1400017f828?, {0x1008046e8?}, {0x28?})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/encode.go:59 +0x8c
google.golang.org/protobuf/internal/impl.(*MessageInfo).size(0x1400017f8c8?, {{}, {0x1010c1790?, 0x1400013c770?}, 0xc0?})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/encode.go:40 +0x64
google.golang.org/protobuf/proto.MarshalOptions.size({{}, 0x70?, 0xc7?, 0x13?}, {0x1010c1790, 0x1400013c770})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/proto/size.go:35 +0xdc
google.golang.org/protobuf/proto.MarshalOptions.Size({{}, 0xbf?, 0x27?, 0xf5?}, {0x1010aec60?, 0x1400013c770?})
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/proto/size.go:26 +0x54
google.golang.org/protobuf/proto.Size(...)
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/proto/size.go:16
cloud.google.com/go/pubsub.(*Topic).publishMessageBundle(0x14000204e00, {0x1010b8d40, 0x101621a80}, {0x140001460a8, 0x1, 0x0?})
/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/[email protected]/topic.go:1198 +0xe68
cloud.google.com/go/pubsub.(*Topic).initBundler.func1({0x100f37060?, 0x14000148348?})
/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/[email protected]/topic.go:1093 +0xfc
cloud.google.com/go/pubsub/internal/scheduler.(*PublishScheduler).Add.func1({0x100f37060, 0x14000148348})
/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/[email protected]/internal/scheduler/publish_scheduler.go:114 +0x64
google.golang.org/api/support/bundler.(*Bundler).handle(0x140006a80b0, 0x0?)
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/support/bundler/bundler.go:324 +0x4c
created by google.golang.org/api/support/bundler.(*Bundler).enqueueCurBundle in goroutine 77
/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/support/bundler/bundler.go:179 +0x12c
[snip]
Full stacktrace: https://gist.github.com/steved/55a4c8a2aa451cd2246f1b752e9e44e6
It doesn't seem to be captured all the time, but I believe it's going through: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.45.3/pubsub/trace.go#L304 https://github.com/open-telemetry/opentelemetry-go/blob/v1.33.0/propagation/trace_context.go#L64
Thanks for filing an issue with a repro. It seems like you have a workaround currently (don't use a shared map) but the better long term fix is for us to create a local copy before injecting context propagation attributes.