AsyncProducer fails with "topic does not exist" when consecutive messages are published to different topics in the same millisecond
Versions
| Sarama | Kafka | Go     |
|--------|-------|--------|
| 1.37.2 | 2.0.0 | 1.20.1 |
Configuration
```go
func ProducerConfig() *sarama.Config {
	conf := sarama.NewConfig()
	conf.Producer.Compression = sarama.CompressionSnappy
	conf.Producer.RequiredAcks = sarama.WaitForAll
	conf.Producer.Return.Successes = true
	conf.Producer.Return.Errors = true
	conf.Producer.Retry.Max = 5
	conf.Producer.Retry.Backoff = 500 * time.Millisecond
	conf.Metadata.Full = false
	conf.Metadata.Retry.Max = 50
	conf.Metadata.Retry.Backoff = 100 * time.Millisecond
	conf.ClientID = "clientID"
	conf.Version = sarama.V2_0_0_0
	return conf
}
```
Logs
When I enable logging directly to stdout, the extra few milliseconds of latency make the issue unreproducible, so I route the logger through a goroutine that prints to stdout; as a result, the log lines below may not be in exact order. I also updated the sarama source code to print a stack trace at every place where ErrUnknownTopicOrPartition is thrown.
The test below uses a consumer to consume from 2 topics and an async producer to publish to 2 topics.
=== RUN TestRoundTrip
Successfully initialized new client
Initializing new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Initializing new client
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Successfully initialized new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at 44fe8f289aff:29092 (registered as #0)
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
Successfully initialized new client
Initializing new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb] from broker kafka:9092
goroutine 179222 [running]:
runtime/debug.Stack()
/usr/local/go/src/runtime/debug/stack.go:24 +0x64
runtime/debug.PrintStack()
/usr/local/go/src/runtime/debug/stack.go:16 +0x1c
github.com/Shopify/sarama.(*client).Partitions(0x0?, {0x4006b660c0, 0x25})
/Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/[email protected]/client.go:348 +0xb4
github.com/Shopify/sarama.(*topicProducer).partitionMessage.func1()
/Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:534 +0x98
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x40062b0678?, 0x177a4?)
/Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x0?, 0x0, 0x0?)
/Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:86 +0x30
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
/Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:55
github.com/Shopify/sarama.(*topicProducer).partitionMessage(0x4007d9fe40?, 0x4005c70b40)
/Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:525 +0x90
github.com/Shopify/sarama.(*topicProducer).dispatch(0x4007d9fe40)
/Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:502 +0x60
github.com/Shopify/sarama.withRecover(0x0?)
/Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43 +0x40
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
/Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:495 +0x1fc
err=kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker
client/metadata fetching metadata for [test_rn.pos_transaction_historical.pb] from broker kafka:9092
Flush Start
consumer/broker/0 accumulated 6 new subscriptions
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/1
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/1
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
producer/broker/0 state change to [open] on test_rn.pos_transaction.pb/2
producer/broker/0 starting up
Connected to broker at 44fe8f289aff:29092 (registered as #0)
utility.go:61: lightspeed_fetcher_test.go:314 Got error: kafka producer errors: [kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker]
--- FAIL: TestRoundTrip (0.15s)
Problem Description
We use the AsyncProducer to publish messages to Kafka. In one of our test cases we create an AsyncProducer and publish multiple messages, and intermittently the publish fails with "topic/partition does not exist on this broker". By looking at the Kafka request logs and the sarama source code, I could identify that when the first 2 messages are published within the same millisecond, the refreshMetadata method returns nil and partitionMessage throws the error.
This appears to be caused by the following lines in the tryRefreshMetadata method in client.go:
```go
t := atomic.LoadInt64(&client.updateMetaDataMs)
if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
	return nil
}
```
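To illustrate the suspected failure mode, here is a minimal, self-contained sketch (not sarama code; the variable and function names mirror the snippet above but are hypothetical). It replays the interleaving deterministically: two callers load `updateMetaDataMs` before either has stored the new timestamp, so the second caller's CAS fails and it returns early without refreshing, even though its caller treats that as a successful refresh.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// raceOnce replays the interleaving deterministically and reports whether
// each of two concurrent callers believed it performed a metadata refresh.
func raceOnce() (callerA, callerB bool) {
	var updateMetaDataMs int64         // 0 = metadata never refreshed
	const nowMs = int64(1676000000000) // both callers observe the same millisecond

	// Both callers load the timestamp before either has stored the new one.
	tA := atomic.LoadInt64(&updateMetaDataMs)
	tB := atomic.LoadInt64(&updateMetaDataMs)

	// Caller A wins the CAS and goes on to actually refresh metadata.
	callerA = atomic.CompareAndSwapInt64(&updateMetaDataMs, tA, nowMs)

	// Caller B's CAS fails because the value changed underneath it, so the
	// equivalent of tryRefreshMetadata returns nil *without* refreshing,
	// while B's caller interprets nil as "metadata is now up to date".
	callerB = atomic.CompareAndSwapInt64(&updateMetaDataMs, tB, nowMs)
	return
}

func main() {
	a, b := raceOnce()
	fmt.Printf("caller A refreshed: %v, caller B refreshed: %v\n", a, b)
}
```

With this interleaving, only caller A performs a refresh; caller B silently skips it, which would leave B's topic metadata stale and produce the ErrUnknownTopicOrPartition seen in the logs.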