
AsyncProducer fails with "topic does not exist" when consecutive messages are published to different topics in the same millisecond

Open dineshudayakumar opened this issue 2 years ago • 0 comments

Versions
Sarama: 1.37.2
Kafka: 2.0.0
Go: 1.20.1
Configuration
func ProducerConfig() *sarama.Config {
	conf := sarama.NewConfig()
	conf.Producer.Compression = sarama.CompressionSnappy
	conf.Producer.RequiredAcks = sarama.WaitForAll
	conf.Producer.Return.Successes = true
	conf.Producer.Return.Errors = true
	conf.Producer.Retry.Max = 5
	conf.Producer.Retry.Backoff = 500 * time.Millisecond
	conf.Metadata.Full = false
	conf.Metadata.Retry.Max = 50
	conf.Metadata.Retry.Backoff = time.Duration(100) * time.Millisecond
	conf.ClientID = "clientID"
	conf.Version = sarama.V2_0_0_0
	return conf
}
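For reference, the producer side of the failing test looks roughly like the sketch below. This is a minimal reproduction sketch and not the actual test code: the broker address and topic names are taken from the logs further down, the payload is made up, and it assumes the ProducerConfig function above is in the same package.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Sketch only: create an AsyncProducer from the config above and publish
	// two messages to different topics back to back (same millisecond).
	producer, err := sarama.NewAsyncProducer([]string{"kafka:9092"}, ProducerConfig())
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	topics := []string{"test_rn.pos_transaction.pb", "test_rn.pos_transaction_historical.pb"}
	for _, topic := range topics {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder("payload"),
		}
	}

	// Return.Successes and Return.Errors are both enabled, so drain both channels.
	for i := 0; i < len(topics); i++ {
		select {
		case <-producer.Successes():
		case perr := <-producer.Errors():
			fmt.Println("producer error:", perr) // intermittently ErrUnknownTopicOrPartition
		}
	}
}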

Logs

When I let the logger print directly to stdout, the added few milliseconds make the issue unreproducible, so I instead forward log output to stdout through a goroutine; as a result, the log lines below may not be in exact order. I also updated the Sarama source code to print a stack trace at each place where ErrUnknownTopicOrPartition is thrown.
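For completeness, the goroutine-based logger is wired up roughly like this. It is only a sketch with my own naming (channelWriter and setupAsyncLogger are made-up helpers, not part of Sarama); the idea is that writes go to a buffered channel and a separate goroutine drains them to stdout so the producer path is not slowed down.

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

// channelWriter is a hypothetical helper: it pushes each log write onto a
// buffered channel so the producer goroutines never block on stdout.
type channelWriter struct{ ch chan string }

func (w channelWriter) Write(p []byte) (int, error) {
	select {
	case w.ch <- string(p):
	default: // drop the line rather than block if the buffer is full
	}
	return len(p), nil
}

func setupAsyncLogger() {
	ch := make(chan string, 1024)
	go func() {
		// Drain log lines on a separate goroutine.
		for line := range ch {
			os.Stdout.WriteString(line)
		}
	}()
	sarama.Logger = log.New(channelWriter{ch: ch}, "[sarama] ", log.LstdFlags)
}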

The test below uses a consumer that consumes from two topics and an AsyncProducer that publishes to two topics.


=== RUN   TestRoundTrip
Successfully initialized new client
Initializing new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Initializing new client
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Successfully initialized new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at 44fe8f289aff:29092 (registered as #0)
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
Successfully initialized new client
Initializing new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb] from broker kafka:9092
goroutine 179222 [running]:
runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x64
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x1c
github.com/Shopify/sarama.(*client).Partitions(0x0?, {0x4006b660c0, 0x25})
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:348 +0xb4
github.com/Shopify/sarama.(*topicProducer).partitionMessage.func1()
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:534 +0x98
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x40062b0678?, 0x177a4?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x0?, 0x0, 0x0?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:86 +0x30
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:55
github.com/Shopify/sarama.(*topicProducer).partitionMessage(0x4007d9fe40?, 0x4005c70b40)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:525 +0x90
github.com/Shopify/sarama.(*topicProducer).dispatch(0x4007d9fe40)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:502 +0x60
github.com/Shopify/sarama.withRecover(0x0?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/utils.go:43 +0x40
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:495 +0x1fc
err=kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker
client/metadata fetching metadata for [test_rn.pos_transaction_historical.pb] from broker kafka:9092
Flush Start
consumer/broker/0 accumulated 6 new subscriptions
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/1
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/1
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
producer/broker/0 state change to [open] on test_rn.pos_transaction.pb/2
producer/broker/0 starting up
Connected to broker at 44fe8f289aff:29092 (registered as #0)
    utility.go:61: lightspeed_fetcher_test.go:314 Got error: kafka producer errors: [kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker]
--- FAIL: TestRoundTrip (0.15s)
Problem Description

We are using the AsyncProducer to publish messages to Kafka. In one of our test cases we create the AsyncProducer and publish multiple messages, and intermittently the publish fails with "topic/partition does not exist on this broker". By looking at the Kafka request logs and the Sarama source code, I could identify that when the first two messages are published within the same millisecond, the refreshMetadata method returns nil and the partitionMessage method throws the error.

This could be due to the following lines in the tryRefreshMetadata method in client.go:

		t := atomic.LoadInt64(&client.updateMetaDataMs)
		if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
			return nil
		}
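If I read this correctly, updateMetaDataMs has only millisecond granularity, so when two dispatches race into tryRefreshMetadata at nearly the same time, the loser of the CompareAndSwap returns nil without any metadata having been fetched for its topic, and partitionMessage then fails with ErrUnknownTopicOrPartition. The standalone sketch below (my own names, not Sarama code) illustrates how such a CAS guard silently skips one of two interleaved callers:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Illustration only: a refresh guarded by a millisecond-granularity
// CompareAndSwap. A caller that loses the CAS returns nil without doing any work.
var updateMetaDataMs int64

func tryRefresh(name string) error {
	t := atomic.LoadInt64(&updateMetaDataMs)
	if !atomic.CompareAndSwapInt64(&updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
		fmt.Printf("%s: CAS lost, returning nil without refreshing\n", name)
		return nil
	}
	fmt.Printf("%s: refreshed metadata\n", name)
	return nil
}

func main() {
	// Interleave two callers the way two topicProducer dispatches could:
	// B reads the timestamp, A refreshes in between, then B's CAS fails and
	// B skips the refresh even though its topic's metadata was never fetched.
	tB := atomic.LoadInt64(&updateMetaDataMs) // caller B loads the timestamp
	_ = tryRefresh("caller A")                // caller A refreshes first
	if !atomic.CompareAndSwapInt64(&updateMetaDataMs, tB, time.Now().UnixNano()/int64(time.Millisecond)) {
		fmt.Println("caller B: CAS lost, returning nil without refreshing")
	}
}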

dineshudayakumar · Feb 16 '23 23:02