google-cloud-go icon indicating copy to clipboard operation
google-cloud-go copied to clipboard

pubsub: concurrent map write panic when publishing with telemetry enabled and shared attributes

Open steved opened this issue 1 year ago • 1 comments

Client

PubSub

Environment

$ go version
go version go1.23.4 darwin/arm64

Code and Dependencies

package main

import (
	"context"
	"fmt"
	"sync"

	"strings"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

func initTracing(ctx context.Context) {
	r, err := resource.New(context.Background(),
		resource.WithSchemaURL(semconv.SchemaURL),
		resource.WithAttributes(semconv.ServiceName("test-service")),
		resource.WithTelemetrySDK(),
		resource.WithContainerID(),
	)

	if err != nil {
		panic(err)
	}

	client := otlptracegrpc.NewClient()
	exporter, err := otlptrace.New(ctx, client)
	if err != nil {
		panic(err)
	}

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(r),
	)

	otel.SetTextMapPropagator(
		propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{},
			propagation.Baggage{},
		),
	)

	otel.SetTracerProvider(provider)
}

func main() {
	ctx := context.Background()

	initTracing(ctx)

	client, err := pubsub.NewClientWithConfig(ctx, "test-project", &pubsub.ClientConfig{EnableOpenTelemetryTracing: true})
	if err != nil {
		panic(err)
	}

	_, err = client.CreateTopic(ctx, "test-topic")
	if err != nil {
		if !strings.Contains(err.Error(), "AlreadyExists") {
			panic(err)
		}
	}

	tracer := otel.GetTracerProvider().Tracer("")
	topic := client.Topic("test-topic")
	topic.PublishSettings = pubsub.PublishSettings{CountThreshold: 1}

	attributes := map[string]string{"my-key": "my-value"}

	var wg sync.WaitGroup

	for num := range 100 {
		num := num
		wg.Add(1)

		go func() {
			defer wg.Done()

			// attributes := map[string]string{"my-key": "my-value"}
			ctx, span := tracer.Start(context.Background(), "publish")
			defer span.End()

			msg := []byte(fmt.Sprintf("%d", num))
			topic.Publish(ctx, &pubsub.Message{
				Data:       msg,
				Attributes: attributes,
			})
			fmt.Printf("Published %d\n", num)
		}()
	}

	wg.Wait()
	topic.Stop()

	otel.GetTracerProvider().(*sdktrace.TracerProvider).Shutdown(ctx)
}
go.mod
module github.com/steved/pubsub-repro

go 1.23.4

require (
	cloud.google.com/go/pubsub v1.45.3
	go.opentelemetry.io/otel v1.33.0
	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
	go.opentelemetry.io/otel/sdk v1.33.0
)

require (
	cloud.google.com/go v0.116.0 // indirect
	cloud.google.com/go/auth v0.11.0 // indirect
	cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
	cloud.google.com/go/compute/metadata v0.5.2 // indirect
	cloud.google.com/go/iam v1.2.2 // indirect
	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
	github.com/felixge/httpsnoop v1.0.4 // indirect
	github.com/go-logr/logr v1.4.2 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/google/s2a-go v0.1.8 // indirect
	github.com/google/uuid v1.6.0 // indirect
	github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
	github.com/googleapis/gax-go/v2 v2.14.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
	go.opencensus.io v0.24.0 // indirect
	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
	go.opentelemetry.io/otel/metric v1.33.0 // indirect
	go.opentelemetry.io/otel/trace v1.33.0 // indirect
	go.opentelemetry.io/proto/otlp v1.4.0 // indirect
	golang.org/x/crypto v0.30.0 // indirect
	golang.org/x/net v0.32.0 // indirect
	golang.org/x/oauth2 v0.24.0 // indirect
	golang.org/x/sync v0.10.0 // indirect
	golang.org/x/sys v0.28.0 // indirect
	golang.org/x/text v0.21.0 // indirect
	golang.org/x/time v0.8.0 // indirect
	google.golang.org/api v0.210.0 // indirect
	google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/grpc v1.68.1 // indirect
	google.golang.org/protobuf v1.35.2 // indirect
)

Expected behavior

The answer could be that users should not be passing in a shared map of attributes, but I expected that any modification would only occur on a pubsub-local copy of the attributes.

Actual behavior

fatal error: concurrent map iteration and map write

goroutine 181 [running]:
reflect.mapiterinit(0x1400017f638?, 0x100875220?, 0x1400017f608?)
	/opt/homebrew/Cellar/go/1.23.4/libexec/src/runtime/map.go:1520 +0x1c
reflect.(*MapIter).Next(0x100f8d880?)
	/opt/homebrew/Cellar/go/1.23.4/libexec/src/reflect/value.go:1984 +0x60
google.golang.org/protobuf/internal/impl.sizeMap({0x100f8d880?, 0x1400013c7b0?, 0x1400013c7b0?}, 0x140001601e0, 0x14000172090, {0xec?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/codec_map.go:98 +0x130
google.golang.org/protobuf/internal/impl.encoderFuncsForMap.func1({0x0?}, 0x14000172090, {0xc0?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/codec_map.go:54 +0x58
google.golang.org/protobuf/internal/impl.(*MessageInfo).sizePointerSlow(0x140003a47e8, {0x68?}, {0x28?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/encode.go:82 +0xc8
google.golang.org/protobuf/internal/impl.(*MessageInfo).sizePointer(0x1400017f828?, {0x1008046e8?}, {0x28?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/encode.go:59 +0x8c
google.golang.org/protobuf/internal/impl.(*MessageInfo).size(0x1400017f8c8?, {{}, {0x1010c1790?, 0x1400013c770?}, 0xc0?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/internal/impl/encode.go:40 +0x64
google.golang.org/protobuf/proto.MarshalOptions.size({{}, 0x70?, 0xc7?, 0x13?}, {0x1010c1790, 0x1400013c770})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/proto/size.go:35 +0xdc
google.golang.org/protobuf/proto.MarshalOptions.Size({{}, 0xbf?, 0x27?, 0xf5?}, {0x1010aec60?, 0x1400013c770?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/proto/size.go:26 +0x54
google.golang.org/protobuf/proto.Size(...)
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/proto/size.go:16
cloud.google.com/go/pubsub.(*Topic).publishMessageBundle(0x14000204e00, {0x1010b8d40, 0x101621a80}, {0x140001460a8, 0x1, 0x0?})
	/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/[email protected]/topic.go:1198 +0xe68
cloud.google.com/go/pubsub.(*Topic).initBundler.func1({0x100f37060?, 0x14000148348?})
	/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/[email protected]/topic.go:1093 +0xfc
cloud.google.com/go/pubsub/internal/scheduler.(*PublishScheduler).Add.func1({0x100f37060, 0x14000148348})
	/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/[email protected]/internal/scheduler/publish_scheduler.go:114 +0x64
google.golang.org/api/support/bundler.(*Bundler).handle(0x140006a80b0, 0x0?)
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/support/bundler/bundler.go:324 +0x4c
created by google.golang.org/api/support/bundler.(*Bundler).enqueueCurBundle in goroutine 77
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/[email protected]/support/bundler/bundler.go:179 +0x12c

[snip]

Full stacktrace: https://gist.github.com/steved/55a4c8a2aa451cd2246f1b752e9e44e6

It doesn't seem to be captured all the time, but I believe it's going through: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.45.3/pubsub/trace.go#L304 https://github.com/open-telemetry/opentelemetry-go/blob/v1.33.0/propagation/trace_context.go#L64

steved avatar Dec 18 '24 18:12 steved

Thanks for filing an issue with a repro. It seems like you have a workaround currently (don't use a shared map) but the better long term fix is for us to create a local copy before injecting context propagation attributes.

hongalex avatar Dec 18 '24 19:12 hongalex