
Error and data race using transaction example from the library

Open bentcoder opened this issue 1 year ago • 3 comments

Description

Hi,

Using a pool of producers, as shown in this example, from time to time causes the error "kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing" and the data race shown below.

Thanks

Versions
Sarama: v1.43.2
Kafka: v3.3.2
Go: 1.22
Configuration
	cfg := sarama.NewConfig()
	cfg.ClientID = clientID
	cfg.Version = sarama.V3_3_2_0
	cfg.Net.MaxOpenRequests = 1
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Compression = sarama.CompressionZSTD
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	cfg.Producer.Transaction.ID = transactionID
	cfg.Producer.Transaction.Retry.Max = 10
	cfg.Producer.Transaction.Retry.Backoff = time.Millisecond * 100
Additional Context
==================
WARNING: DATA RACE
Write at 0x00c00010f310 by goroutine 251:
  github.com/IBM/sarama.(*transactionManager).transitionTo()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/transaction_manager.go:220 +0x330
  github.com/IBM/sarama.(*asyncProducer).maybeTransitionToErrorState()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1276 +0xec
  github.com/IBM/sarama.(*asyncProducer).returnError()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1286 +0xad
  github.com/IBM/sarama.(*asyncProducer).retryMessage()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1323 +0x109
  github.com/IBM/sarama.(*asyncProducer).retryMessages()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1332 +0x8c
  github.com/IBM/sarama.(*brokerProducer).handleError.func2()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1197 +0x8f4
  github.com/IBM/sarama.(*produceSet).eachPartition()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/produce_set.go:223 +0x89b
  github.com/IBM/sarama.(*brokerProducer).handleError()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1196 +0x3cd
  github.com/IBM/sarama.(*brokerProducer).handleResponse()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1060 +0x84
  github.com/IBM/sarama.(*brokerProducer).run()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:993 +0x1a93
  github.com/IBM/sarama.(*brokerProducer).run-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:791 +0x33

Previous read at 0x00c00010f310 by goroutine 252:
  github.com/IBM/sarama.(*transactionManager).publishTxnPartitions()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/transaction_manager.go:766 +0x40e
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:818 +0x23c
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap2()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:794 +0x33

Goroutine 251 (running) created at:
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:791 +0x4f3
  github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1343 +0x133
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:765 +0x244
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
  github.com/IBM/sarama.(*partitionProducer).updateLeader()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:756 +0xca
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:619 +0xcb
  github.com/IBM/sarama.(*partitionProducer).dispatch()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:689 +0xd08
  github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:600 +0x33

Goroutine 252 (running) created at:
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:794 +0x690
  github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1343 +0x133
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:765 +0x244
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
  github.com/IBM/sarama.(*partitionProducer).updateLeader()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:756 +0xca
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:619 +0xcb
  github.com/IBM/sarama.(*partitionProducer).dispatch()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:689 +0xd08
  github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:600 +0x33
==================

bentcoder, May 25 '24 00:05

Hm… yeah. The transitionTo() call holds the statusLock, while publishTxnPartitions() holds the partitionTxnLock.

They should be holding a common lock, or they will race. :(

puellanivis, May 25 '24 08:05

More conversation can be found here for reference.

bentcoder, May 25 '24 10:05

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot], Aug 23 '24 12:08