w.WriteMessages: context deadline exceeded on MSK IAM

Open ayoul3 opened this issue 2 years ago • 4 comments

Describe the bug

I activated IAM authentication on a Kafka 2.7.0 cluster (AWS MSK). Reading topics worked like a charm.
However, when writing to a partition I got timeout errors during the discovery query that lists all brokers and partitions. It turns out that brokers on port 9098 (IAM authentication) do not like this empty Metadata request v8, specifically the empty Topics field, which is encoded as 0xffffffff at the network level.

I reached out to AWS MSK to look into it, but maybe there is something to be done at the client to avoid running into this issue. The regular Kafka Java client does not have this issue, so there is a chance they'll just snooze off my bug report. We'll see.

I did not find an open issue on this, so putting this one here for visibility. Curious to know if someone else could reproduce.

Kafka Version

  • What version(s) of Kafka are you testing against? 2.7.0
  • What version of kafka-go are you using? v0.4.28, v0.4.29, v0.4.31, v0.4.32

To Reproduce

  • Initialise an MSK cluster version 2.7.0 and activate IAM authentication.
  • Create a topic and define variables KAFKA_SERVERS, AWS_REGION, KAFKA_TOPIC
  • Run the following script
package main

import (
	"context"
	"crypto/tls"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
)

func main() {
	awsRegion := aws.String(os.Getenv("AWS_REGION"))
	sess, err := session.NewSession(&aws.Config{
		Region: awsRegion,
	})
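	if err != nil {
		// zerolog only emits the event (and exits, for Fatal) once Msg is called.
		log.Logger.Fatal().Err(err).Msg("failed to create AWS session")
	}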

	kafkaDialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		SASLMechanism: &aws_msk_iam.Mechanism{
			Signer: sigv4.NewSigner(sess.Config.Credentials),
			Region: *awsRegion,
		},
		TLS: &tls.Config{},
	}

	brokers := strings.Split(os.Getenv("KAFKA_SERVERS"), ",")
	topic := os.Getenv("KAFKA_TOPIC")
	batchSize := int(10e6) // note: BatchSize counts messages, not bytes (BatchBytes limits bytes)
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:   brokers,
		Topic:     topic,
		Dialer:    kafkaDialer,
		BatchSize: batchSize,
		Balancer:  &kafka.CRC32Balancer{},
	})

	err = w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
	)
	if err != nil {
		log.Logger.Err(err).Msg("error while writing batch")
	}
}

Expected Behavior

The message should be written to the topic.
At the network level, the cluster should respond with a list of brokers and partitions at the discovery level.

Observed Behavior

The client hangs on the Metadata request and finally times out:
{"level":"error","error":"context deadline exceeded","time":"2022-06-05T13:02:52+02:00","message":"error while writing batch"}

Additional Context

I wrote a nasty patch to validate the hypothesis that the null Topics field causes the issue. It might unblock people running into the same problem. Definitely a hack, though...

ayoul3, Jun 05 '22 11:06

Unfortunately, I haven't been able to reproduce this issue on my own MSK cluster yet. I started with a serverless cluster, and I'm not sure which version of Kafka it runs. I'll try again with a provisioned cluster at v2.7.0 to see if I can reproduce it.

In the interim, have you made any discoveries on your end?

dominicbarnes, Jul 08 '22 17:07

No, nothing. AWS support has not been very helpful. The patch I linked above works properly, but the issue is still there on a provisioned cluster. I hit the same issue with the franz-go Kafka library.

ayoul3, Jul 13 '22 06:07

We are facing the same issue. Debugging it points to the same place: pool.discover.

I also ran the Kafka CLI (Java) to get the metadata of a topic. It turns out that Kafka with IAM is significantly slower than unauthenticated Kafka (I'm not sure whether this is due to my setup or an AWS issue):

time bin/kafka-topics.sh --bootstrap-server xxxxx:9092 --topic xxxxx --describe
real	0m2.345s
user	0m2.788s
sys	0m0.244s
time bin/kafka-topics.sh --bootstrap-server xxxxx:9098 --topic xxxxx --describe
real	0m11.401s
user	0m4.580s
sys	0m0.274s
By default, MetadataTTL is 6s and this request takes up to 12s, so I set MetadataTTL to 2 minutes, which fixes the issue.
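
For anyone wanting to try the same workaround, this is a rough sketch of how MetadataTTL can be raised. It assumes you build the writer as a kafka.Writer with an explicit kafka.Transport, rather than through kafka.NewWriter with a Dialer as in the reproduction script. The helper name newWriter is made up for illustration, and the SASL mechanism is passed in by the caller (for MSK IAM, the aws_msk_iam.Mechanism from the original script).

```go
package main

import (
	"crypto/tls"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
)

// newWriter is a hypothetical helper that builds a writer whose transport
// caches cluster metadata for 2 minutes instead of the 6s default.
func newWriter(brokers []string, topic string, mechanism sasl.Mechanism) *kafka.Writer {
	return &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Topic:    topic,
		Balancer: &kafka.CRC32Balancer{},
		Transport: &kafka.Transport{
			TLS:         &tls.Config{},
			SASL:        mechanism,       // e.g. the aws_msk_iam.Mechanism from the script above
			MetadataTTL: 2 * time.Minute, // value taken from this comment's workaround
		},
	}
}

func main() {
	brokers := strings.Split(os.Getenv("KAFKA_SERVERS"), ",")
	topic := os.Getenv("KAFKA_TOPIC")

	// nil mechanism here only keeps the sketch short; MSK IAM needs a real one.
	w := newWriter(brokers, topic, nil)
	defer w.Close()

	fmt.Println("writer configured for topic", topic)
}
```

A longer TTL means the slow metadata request runs less often, at the cost of the client noticing partition or broker changes later.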

ducquangkstn, Jun 26 '23 02:06

Btw, IMO, the library's problem here is a lack of error wrapping, so users have to debug manually.

ducquangkstn, Jun 26 '23 02:06