go-cloud
pubsub: Ensure batcher flushes on shutdown, even if min batch size isn't met
This PR ensures that the batcher flushes on shutdown, even if the number of pending items is less than the specified min batch size. Sending events is preferable to dropping them, even if that means the batch size limits are not obeyed.
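For illustration, here is a minimal sketch of the intended behavior. It is not the `gocloud.dev/pubsub/batcher` code: `sketchBatcher`, `newSketchBatcher`, and `flushLocked` are hypothetical names, and the handler is called synchronously under the lock to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchBatcher is a hypothetical, simplified batcher used only to
// illustrate flush-on-shutdown. It is not the real Batcher type.
type sketchBatcher struct {
	mu           sync.Mutex
	minBatchSize int
	pending      []string
	shutdown     bool
	handler      func([]string)
}

func newSketchBatcher(minBatchSize int, handler func([]string)) *sketchBatcher {
	return &sketchBatcher{minBatchSize: minBatchSize, handler: handler}
}

// Add queues an item and flushes once minBatchSize items are pending.
// It reports false if the batcher has already been shut down.
func (b *sketchBatcher) Add(item string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.shutdown {
		return false
	}
	b.pending = append(b.pending, item)
	if len(b.pending) >= b.minBatchSize {
		b.flushLocked()
	}
	return true
}

// Shutdown flushes whatever is pending, even if fewer than minBatchSize
// items are queued, so that nothing is dropped.
func (b *sketchBatcher) Shutdown() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.shutdown = true
	if len(b.pending) > 0 {
		b.flushLocked()
	}
}

// flushLocked hands the pending items to the handler. The caller must hold b.mu.
func (b *sketchBatcher) flushLocked() {
	batch := b.pending
	b.pending = nil
	b.handler(batch)
}

func main() {
	var batches [][]string
	b := newSketchBatcher(3, func(items []string) {
		batches = append(batches, items)
	})
	b.Add("a")
	b.Add("b") // still below the min batch size of 3, so nothing is sent yet
	b.Shutdown()
	fmt.Println(batches) // prints [[a b]]
}
```

In this sketch, the two pending items are delivered as an undersized batch at shutdown instead of being discarded.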
Related to #3383, but doesn't necessarily replace it.
The tests are failing with a data race:
WARNING: DATA RACE
Write at 0x00c00044e3f0 by goroutine 978:
  gocloud.dev/pubsub/batcher.(*Batcher).callHandler()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:287 +0x1ca
  gocloud.dev/pubsub/batcher.(*Batcher).handleBatch.func1()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:217 +0x54

Previous read at 0x00c00044e3f0 by goroutine 979:
  gocloud.dev/pubsub/batcher.(*Batcher).Shutdown()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:303 +0x75
  gocloud.dev/pubsub.(*Subscription).Shutdown.func2()
      /home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:697 +0xb2

Goroutine 978 (running) created at:
  gocloud.dev/pubsub/batcher.(*Batcher).handleBatch()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:216 +0x11d
  gocloud.dev/pubsub/batcher.(*Batcher).AddNoWait()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:203 +0x3b1
  gocloud.dev/pubsub.(*Subscription).Receive.func3()
      /home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:608 +0xcc
  gocloud.dev/pubsub.(*Message).Ack()
      /home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:161 +0xe1
  gocloud.dev/pubsub/natspubsub.TestInteropWithDirectNATS.deferwrap4()
      /home/runner/work/go-cloud/go-cloud/pubsub/natspubsub/nats_test.go:259 +0x33
  runtime.deferreturn()
      /opt/hostedtoolcache/go/1.22.0/x64/src/runtime/panic.go:602 +0x5d
  testing.tRunner()
      /opt/hostedtoolcache/go/1.22.0/x64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /opt/hostedtoolcache/go/1.22.0/x64/src/testing/testing.go:1742 +0x44
Ah, thanks. I ran without -race 🤦. Fixing!
@vangent fixed, thanks for the notification.