kafka-go icon indicating copy to clipboard operation
kafka-go copied to clipboard

After upgrade to v0.4.10, writer.WriteMessages returning "context deadline exceeded" error

Open aramcodz opened this issue 4 years ago • 0 comments

Describe the bug Upgrading kafka-go client with no client code changes from v0.3.5 to v0.4.10, when the first (kafka-go) writer.WriteMessages() call is made, the client returns error message: "context deadline exceeded".

Kafka Version Kafka Version: IBM Event Streams (Kafka 2.6) Kafka-go version: v0.4.10

To Reproduce Upgrade to v0.4.10.

Please note: neither our Kafka version nor our (rather-well wrapped/encapsulated) kafka-go client code has changed when we started seeing this issue. The upgrade of the Kafka-go client version alone caused the behavior change.

Code Example: //Creating and Calling the OUR custom wrapper Code

kafkaWriter, err := NewWriterFromConfig(h.config)
err = kafkaWriter.Write(notificationTopic, someId, someInfo) 

Creating the New Dialer to be used:

type KafkaConnect struct {
	Brokers []string
	Dialer  *kg.Dialer
}

//Wrapper code that Creates OUR (Wrapped struct) writer instance
//"Writer" here is our own custom type
type Writer interface {
	Write(topic string, key string, val map[string]interface{}) error
}

func NewWriterFromConfig(config config.Config) (Writer, error) {
	brokers := config.KafkaBrokers

	user := config.KafkaUsername
	password := config.KafkaPassword
	dialer := &kafka.Dialer{
		SASLMechanism: plain.Mechanism{user, password},
		TLS: &tls.Config{
			MaxVersion: defaultTLSVersion,
		},
	}

	return KafkaConnect{
		Brokers: brokers,
		Dialer:  dialer,
	}, nil
}

//The code that Creates and sets up the kafka-go Writer instance AND writes the messages

func (kc KafkaConnect) Write(topic string, key string, val map[string]interface{}) error {
	jsonVal, err := json.Marshal(val)
	if err != nil {
		return err
	}

	//TODO: deprecated
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:   kc.Brokers,
		Topic:     topic,
		Balancer:  kafka.Murmur2Balancer{},
		BatchSize: 1,
		Dialer:    kc.Dialer,
	})

	defer writer.Close()

	return writer.WriteMessages(
		context.Background(),
		kg.Message{Key: []byte(key), Value: []byte(jsonVal)},
	)
}

Expected behavior Post-client upgrade with NO client-code changes, no expected change in Kafka Writer/Producer operations.

Additional context Upgrade from kafka-go client v0.3.5 to v0.4.10

aramcodz avatar Apr 13 '21 16:04 aramcodz