sarama
Error and data race using transaction example from the library
Description
Hi,
Using a pool of producers, as shown in this example, from time to time causes the error `kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing`, along with the data race shown below.
Thanks
Versions
| Sarama | Kafka | Go |
|---|---|---|
| v1.43.2 | v3.3.2 | 1.22 |
Configuration
cfg := sarama.NewConfig()
cfg.ClientID = clientID
cfg.Version = sarama.V3_3_2_0
cfg.Net.MaxOpenRequests = 1
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.Idempotent = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
cfg.Producer.Transaction.ID = transactionID
cfg.Producer.Transaction.Retry.Max = 10
cfg.Producer.Transaction.Retry.Backoff = time.Millisecond * 100
Additional Context
==================
WARNING: DATA RACE
Write at 0x00c00010f310 by goroutine 251:
github.com/IBM/sarama.(*transactionManager).transitionTo()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/transaction_manager.go:220 +0x330
github.com/IBM/sarama.(*asyncProducer).maybeTransitionToErrorState()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1276 +0xec
github.com/IBM/sarama.(*asyncProducer).returnError()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1286 +0xad
github.com/IBM/sarama.(*asyncProducer).retryMessage()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1323 +0x109
github.com/IBM/sarama.(*asyncProducer).retryMessages()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1332 +0x8c
github.com/IBM/sarama.(*brokerProducer).handleError.func2()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1197 +0x8f4
github.com/IBM/sarama.(*produceSet).eachPartition()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/produce_set.go:223 +0x89b
github.com/IBM/sarama.(*brokerProducer).handleError()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1196 +0x3cd
github.com/IBM/sarama.(*brokerProducer).handleResponse()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1060 +0x84
github.com/IBM/sarama.(*brokerProducer).run()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:993 +0x1a93
github.com/IBM/sarama.(*brokerProducer).run-fm()
<autogenerated>:1 +0x33
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:791 +0x33
Previous read at 0x00c00010f310 by goroutine 252:
github.com/IBM/sarama.(*transactionManager).publishTxnPartitions()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/transaction_manager.go:766 +0x40e
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:818 +0x23c
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap2()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:794 +0x33
Goroutine 251 (running) created at:
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:791 +0x4f3
github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1343 +0x133
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:765 +0x244
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
github.com/IBM/sarama.(*partitionProducer).updateLeader()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:756 +0xca
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:619 +0xcb
github.com/IBM/sarama.(*partitionProducer).dispatch()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:689 +0xd08
github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
<autogenerated>:1 +0x33
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:600 +0x33
Goroutine 252 (running) created at:
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:794 +0x690
github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1343 +0x133
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:765 +0x244
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
github.com/IBM/sarama.(*partitionProducer).updateLeader()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:756 +0xca
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:619 +0xcb
github.com/IBM/sarama.(*partitionProducer).dispatch()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:689 +0xd08
github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
<autogenerated>:1 +0x33
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:600 +0x33
==================
Hm… yeah. The transitionTo() call is holding the statusLock while the publishTxnPartitions is holding the partitionTxnLock.
They should be holding a common lock, or they will race. :(
More conversation can be found here for reference.