
Random errors of "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker" probably related to MSK IAM auth

Open ekeric13 opened this issue 2 years ago • 9 comments

Describe the bug

When I use the kafka-go writer, I randomly get the error "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker".

I reproduced this in a sandbox app that simply writes the current date to a topic every 5 seconds. I only get the error with SASL auth using the MSK IAM mechanism. I have a parallel kafka-go client with TLS auth and have yet to reproduce the error there. I also wrote a client with franz-go, using TLS and SASL authentication with the MSK IAM mechanism, and have yet to reproduce the error there either. So it really does seem specific to kafka-go SASL MSK_IAM auth with the writer.

I think there might be a bug with the msk IAM auth package? Or maybe I need to give it different permissions?

In my sandbox app I give the AWS credentials very thorough permissions:

  statement {
    effect = "Allow"
    actions = [
      "kafka-cluster:Connect",
      "kafka-cluster:Describe*",
      "kafka-cluster:Read*",
      "kafka-cluster:Write*",
      "kafka-cluster:Alter*",
      "kafka-cluster:Create*",
    ]
    resources = [
      "*"
    ]
  }

Kafka Version

  • What version(s) of Kafka are you testing against?

Using Kafka 2.6.2 via MSK.

  • What version of kafka-go are you using?

github.com/segmentio/kafka-go v0.4.34 github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220902170624-ba6f4426b511

To Reproduce

I cannot reproduce this immediately. As mentioned above, to reproduce it I created a sandbox app that authenticates with a working AWS account. The error doesn't happen right away; it takes some time to appear, usually about 4 days. It usually occurs in batches: it will occur for 15 seconds and then just stop.

Resources to reproduce the behavior:

setup:

kafkaAddr := kafka.TCP(strings.Split(brokersEndpoint, ",")...)
dialerTimeout := 10 * time.Second
dialer := &net.Dialer{
	Timeout: dialerTimeout,
}
transport := &kafka.Transport{
	DialTimeout: dialerTimeout,
	Dial:        dialer.DialContext,
}

if useAuth {
	awsCreds := credentials.NewEnvCredentials()

	transport.TLS = &tls.Config{}
	transport.SASL = &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(awsCreds),
		Region: awsRegion,
	}
}
kWriter := &kafka.Writer{
	Addr:  kafkaAddr,
	Topic: kafkaTopic,
	// default hash algo used by kafka
	Balancer:     kafka.Murmur2Balancer{},
	RequiredAcks: kafka.RequireAll,
	ReadTimeout:  15 * time.Second,
	WriteTimeout: 15 * time.Second,
	MaxAttempts:  5,
	Transport:    transport,
}

usage:

currentTime := time.Now()
headers := []kafka.Header{
	{
		Key:   "UnixTime",
		Value: []byte(strconv.FormatInt(currentTime.Unix(), 10)),
	},
}
value := currentTime.Format("2006-01-02 15:04:05")

log.Debug().Str("Data", value).Msg("Data added")
err := w.service.WriteMessages(ctx,
	kafka.Message{
		Value:   []byte(value),
		Headers: headers,
	},
)
if err != nil {
	log.Error().Err(err).Msg("Could not publish event to kafka topic")
	return err
}

I have copied a couple different setups: https://github.com/segmentio/kafka-go/issues/898#issuecomment-1234974514

but so far I always get the "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker" error eventually.

Expected Behavior

No errors when writing to a topic using a sasl authenticated connection via msk_iam.

Observed Behavior

Random failures of "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker" when writing to a topic using a sasl authenticated connection via msk_iam.

ekeric13 avatar Sep 19 '22 17:09 ekeric13

I think this might be happening on SASL re-authentication. Is there a way to know not to write to a topic while the client is re-authenticating?

ekeric13 avatar Sep 28 '22 21:09 ekeric13

As a quick note I get the same error if my kafka user is not authorised for a given topic.

jdupl123 avatar Oct 12 '22 01:10 jdupl123

@jdupl123 how are you authorizing your user for a given topic? As explained above, this is my setup, which I assumed works for all topics:

  statement {
    effect = "Allow"
    actions = [
      "kafka-cluster:Connect",
      "kafka-cluster:Describe*",
      "kafka-cluster:Read*",
      "kafka-cluster:Write*",
      "kafka-cluster:Alter*",
      "kafka-cluster:Create*",
    ]
    resources = [
      "*"
    ]
  }

ekeric13 avatar Oct 14 '22 18:10 ekeric13

I have also encountered this problem, but I did not open a new issue for it. When I used this library in May this year, I did not observe this error during normal production and consumption. Later the project's architecture changed and left Kafka idle, with no messages being produced or consumed, and now these errors appear: [29] Topic Authorization Failed: the client is not authorized to access the requested topic and [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker

[29] Topic Authorization Failed: the client is not authorized to access the requested topic. This error is reported very frequently, once every 12 hours.

[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker. This error is much rarer; it occurs about once every few weeks.

I also contacted aws tech support, which did not give me any help and suggested I ask around in the community.

I now plan to rewrite the authentication process with AWS SDK v2; hopefully that will help. (By the way, English is not my native language, so I may not have expressed everything accurately.)

kscooo avatar Oct 19 '22 13:10 kscooo

Upgrading to AWS SDK v2 did not help.

kscooo avatar Nov 01 '22 02:11 kscooo

I also noticed the same errors with AWS MSK using SASL authentication.

"Is there a way to know not to write to a topic while the client is re-authenticating?"

@ekeric13 Did you find a way to solve this?

zhongchen avatar Nov 08 '22 01:11 zhongchen

@zhongchen I just used this library instead: https://github.com/twmb/franz-go

It had a different SASL authentication issue, but the maintainer is very good and fixed it.

ekeric13 avatar Jan 06 '23 23:01 ekeric13

Hello @ekeric13, I'm glad you got it sorted out using franz-go!

Would you be able to share a link to the discussion you had with the franz-go maintainer(s) that helped address your issue? This would be very valuable for us to improve kafka-go as well.

achille-roussel avatar Jan 27 '23 17:01 achille-roussel

@achille-roussel I managed to solve the issue by adding external retry logic around the writer's WriteMessages call.

I looked at the code and found that the delay between internal retries has an upper bound of 1s. As a result, even though the writer retries, the write still fails if the temporary issue lasts a little longer than that.

zhongchen avatar Jan 27 '23 18:01 zhongchen