watermill-nats

jetstream: fix test_race failures

Open • AlexCuse opened this issue 9 months ago • 1 comment

We are seeing test_race failures intermittently with the jetstream package (more frequently in slower environments).

Example race detector output:

==================
WARNING: DATA RACE
Write at 0x00c0005805b0 by goroutine 316:
  runtime.closechan()
      /snap/go/current/src/runtime/chan.go:357 +0x0
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.monitor()
      /home/alex/git/watermill-nats/pkg/jetstream/consumer.go:158 +0x104
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.consume.func3()
      /home/alex/git/watermill-nats/pkg/jetstream/consumer.go:138 +0x6b

Previous read at 0x00c0005805b0 by goroutine 313:
  runtime.chansend()
      /snap/go/current/src/runtime/chan.go:160 +0x0
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.(*Subscriber).handleMsg()
      /home/alex/git/watermill-nats/pkg/jetstream/message_handler.go:52 +0x595
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.(*Subscriber).handleMsg-fm()
      <autogenerated>:1 +0x69
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.consume.func1()
      /home/alex/git/watermill-nats/pkg/jetstream/consumer.go:132 +0x6c
  github.com/nats-io/nats.go/jetstream.(*pullConsumer).Consume.func1()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/jetstream/pull.go:228 +0x408
  github.com/nats-io/nats%2ego.(*Conn).waitForMsgs()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3045 +0x77e
  github.com/nats-io/nats%2ego.(*Conn).subscribeLocked.func1()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:4256 +0x44

Goroutine 316 (running) created at:
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.consume()
      /home/alex/git/watermill-nats/pkg/jetstream/consumer.go:138 +0x430
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.(*Subscriber).Subscribe()
      /home/alex/git/watermill-nats/pkg/jetstream/subscriber.go:107 +0x3e4
  github.com/ThreeDotsLabs/watermill/pubsub/tests.TestContinueAfterErrors()
      /home/alex/go/pkg/mod/github.com/!three!dots!labs/[email protected]/pubsub/tests/test_pubsub.go:764 +0x35b
  github.com/ThreeDotsLabs/watermill/pubsub/tests.TestPubSub.func1()
      /home/alex/go/pkg/mod/github.com/!three!dots!labs/[email protected]/pubsub/tests/test_pubsub.go:76 +0x72
  github.com/ThreeDotsLabs/watermill/pubsub/tests.TestPubSub.runTest.func3.1()
      /home/alex/go/pkg/mod/github.com/!three!dots!labs/[email protected]/pubsub/tests/test_pubsub.go:190 +0xe1
  testing.tRunner()
      /snap/go/current/src/testing/testing.go:1595 +0x238
  testing.(*T).Run.func1()
      /snap/go/current/src/testing/testing.go:1648 +0x44

Goroutine 313 (running) created at:
  github.com/nats-io/nats%2ego.(*Conn).subscribeLocked()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:4256 +0x7cf
  github.com/nats-io/nats%2ego.(*Conn).subscribe()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:4192 +0x137
  github.com/nats-io/nats%2ego.(*Conn).Subscribe()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:4108 +0xace
  github.com/nats-io/nats.go/jetstream.(*pullConsumer).Consume()
      /home/alex/go/pkg/mod/github.com/nats-io/[email protected]/jetstream/pull.go:239 +0xa56
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.consume()
      /home/alex/git/watermill-nats/pkg/jetstream/consumer.go:131 +0x1ac
  github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream.(*Subscriber).Subscribe()
      /home/alex/git/watermill-nats/pkg/jetstream/subscriber.go:107 +0x3e4
  github.com/ThreeDotsLabs/watermill/pubsub/tests.TestContinueAfterErrors()
      /home/alex/go/pkg/mod/github.com/!three!dots!labs/[email protected]/pubsub/tests/test_pubsub.go:764 +0x35b
  github.com/ThreeDotsLabs/watermill/pubsub/tests.TestPubSub.func1()
      /home/alex/go/pkg/mod/github.com/!three!dots!labs/[email protected]/pubsub/tests/test_pubsub.go:76 +0x72
  github.com/ThreeDotsLabs/watermill/pubsub/tests.TestPubSub.runTest.func3.1()
      /home/alex/go/pkg/mod/github.com/!three!dots!labs/[email protected]/pubsub/tests/test_pubsub.go:190 +0xe1
  testing.tRunner()
      /snap/go/current/src/testing/testing.go:1595 +0x238
  testing.(*T).Run.func1()
      /snap/go/current/src/testing/testing.go:1648 +0x44

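It looks like the nats.go client is delivering a callback too late, in a way that the existing synchronization code in the message handler doesn't catch: per the trace, monitor() closes a channel (consumer.go:158) that handleMsg() sends on (message_handler.go:52), with nothing ordering the two operations. We should retest with an updated nats.go client once a fix for https://github.com/nats-io/nats.go/issues/1470 is available, in case that helps.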
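For illustration only, here is a minimal sketch of one common way to order a channel close against concurrent sends. It is not the watermill-nats code: guardedOutput, newGuardedOutput, send and close are hypothetical names, and the string payload stands in for a real message type. Senders hold a read lock while sending, and the closer takes the write lock before closing, so a late callback is dropped instead of racing with the close.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedOutput is a hypothetical wrapper around an output channel.
// It is an assumption made for this sketch, not a type from the library.
type guardedOutput struct {
	mu     sync.RWMutex
	once   sync.Once
	closed bool
	ch     chan string
	done   chan struct{}
}

func newGuardedOutput(buffer int) *guardedOutput {
	return &guardedOutput{
		ch:   make(chan string, buffer),
		done: make(chan struct{}),
	}
}

// send delivers msg unless the output is closed or closing.
// It reports whether the message was delivered.
func (o *guardedOutput) send(msg string) bool {
	o.mu.RLock()
	defer o.mu.RUnlock()
	if o.closed {
		return false // late callback: drop instead of sending on a closed channel
	}
	select {
	case o.ch <- msg:
		return true
	case <-o.done:
		return false // shutdown started while we were waiting for buffer space
	}
}

// close is safe to call more than once and concurrently with send.
func (o *guardedOutput) close() {
	o.once.Do(func() {
		close(o.done) // wake senders blocked on a full channel
		o.mu.Lock()   // wait for in-flight sends to release their read locks
		defer o.mu.Unlock()
		o.closed = true
		close(o.ch)
	})
}

func main() {
	out := newGuardedOutput(1)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			out.send(fmt.Sprintf("msg-%d", i)) // may be delivered or dropped
		}(i)
	}

	out.close() // races with the senders above, but is ordered by the lock
	wg.Wait()

	fmt.Println("late send delivered:", out.send("late")) // prints "late send delivered: false"
}
```

Running this with go run -race should report no data race, since every send either completes before the write lock is taken or sees closed set to true. Whether a guard like this belongs in the subscriber or is made unnecessary by the nats.go fix is an open question.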

AlexCuse, Nov 15 '23 20:11