kafka-go

Unable to read messages from MSK Serverless cluster

Open mtkopone opened this issue 3 years ago • 6 comments

While attempting to switch from an AWS MSK Provisioned Cluster to an MSK Serverless Cluster, writing messages to topics works, but reading from a topic fails.

Kafka Version

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

AWS is very vague about which version of Kafka runs in the Serverless setup, but the examples here use the client library intended for 2.8.1. (Consuming messages with that Java client works.)

To Reproduce

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	awsCfg "github.com/aws/aws-sdk-go-v2/config"
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		sig := <-signals
		fmt.Println("Got signal: ", sig)
		cancel()
	}()

	bootstrapServers := []string{"<kafka-bootstrap-server-url>"}
	topic := "example-topic"

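	// Load AWS settings and credentials from the default chain, and stop
	// early on failure instead of silently continuing with empty values.
	cfg, err := awsCfg.LoadDefaultConfig(ctx)
	if err != nil {
		fmt.Println("Error loading AWS config: ", err)
		return
	}
	creds, err := cfg.Credentials.Retrieve(ctx)
	if err != nil {
		fmt.Println("Error retrieving AWS credentials: ", err)
		return
	}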
	m := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: creds,
		Region:      "us-east-1",
		SignTime:    time.Now(),
		Expiry:      time.Minute * 15,
	}

	config := kafka.ReaderConfig{
		Brokers: bootstrapServers,
		GroupID: "test-consumer-group-1",
		Topic:   topic,
		// Partition: 0,
		MaxWait: 50000 * time.Millisecond,
		Dialer: &kafka.Dialer{
			Timeout:       50 * time.Second,
			DualStack:     true,
			SASLMechanism: m,
			TLS: &tls.Config{
				MinVersion: tls.VersionTLS12,
			},
		},
	}

	r := kafka.NewReader(config)
	fmt.Println("Consumer configuration: ", config)

	defer func() {
		err := r.Close()
		if err != nil {
			fmt.Println("Error closing consumer: ", err)
			return
		}
		fmt.Println("Consumer closed")
	}()

	for {
		m, err := r.ReadMessage(ctx)
		// m, err := r.FetchMessage(ctx)
		if err != nil {
			fmt.Printf("Error reading message: %+v\n", err)
			break
		}
		fmt.Printf("Received message from %s-%d [%d]: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

Expected Behavior

Messages should be received from the topic.

Observed Behavior

Running the example fails.

When providing a groupID in the ReaderConfig, the error is:

Error reading message: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details

Log output:

entering loop for consumer group, test-consumer-group-1
Failed to join group test-consumer-group-1: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
[42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details

Without a groupId, and with a partition, the error is:

Error reading message: unexpected EOF

Log output:

initializing kafka reader for partition 0 of example-topic starting at offset first offset
kafka reader failed to read lag of partition 0 of example-topic: EOF

Additional Context

The topic has been created beforehand, and messages have been written to that topic.

I have also tried most of the possible combinations of TLSConfig.MinVersion and MaxVersion available, but to no avail.

mtkopone avatar Aug 17 '22 08:08 mtkopone

Hi @mtkopone,

Thanks for the detailed report!

Unfortunately, based on https://github.com/segmentio/kafka-go/pull/948 I don't think we're compatible with 2.8.1, so that could be the problem.

You could give https://github.com/rhansen2/kafka-go/tree/consumer-group-client a try. It makes changes to the Reader, including support for more API versions, but it is a WIP.

It doesn't seem like a TLS issue but have you tried with InsecureSkipVerify in your TLS Config?

rhansen2 avatar Aug 26 '22 15:08 rhansen2

@mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not serverless, though) on version 2.8.1 with the following module versions, and it works OK. So 2.8.1 itself shouldn't be the problem.

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

The error is EOF, which likely happens when you have a network connection failure. I ran into this error while contributing to the development of the aws_msk_iam_v2 module.

My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents this here. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.

kikyomits avatar Aug 30 '22 02:08 kikyomits
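
The port check suggested above can be automated before the reader is created. The following is a minimal sketch that only relies on the two port numbers quoted in the comment (9098 and 9198); the helper name `checkIAMPort` and the hostnames are hypothetical and not part of kafka-go or any AWS SDK.

```go
package main

import (
	"fmt"
	"net"
)

// iamPorts holds the MSK IAM listener ports mentioned in this thread:
// 9098 for access from within AWS, 9198 for public access.
var iamPorts = map[string]string{
	"9098": "IAM, access from within AWS",
	"9198": "IAM, public access",
}

// checkIAMPort is a hypothetical helper. It returns an error when the
// broker address is malformed or does not use one of the IAM ports above.
func checkIAMPort(broker string) error {
	_, port, err := net.SplitHostPort(broker)
	if err != nil {
		return fmt.Errorf("invalid broker address %q: %w", broker, err)
	}
	if _, ok := iamPorts[port]; !ok {
		return fmt.Errorf("broker %q uses port %s, which is not an MSK IAM port", broker, port)
	}
	return nil
}

func main() {
	// Placeholder hostnames, not real endpoints.
	brokers := []string{"broker.example.com:9098", "broker.example.com:9092"}
	for _, b := range brokers {
		if err := checkIAMPort(b); err != nil {
			fmt.Println("check failed:", err)
			continue
		}
		fmt.Println("ok:", b)
	}
}
```

A check like this only rules out a wrong port. It says nothing about security groups, routing, or whether IAM authentication is enabled on the cluster.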

Thanks for the additional insight @kikyomits.

It also appears there's a bug in the v2 iam package integration https://github.com/segmentio/kafka-go/issues/976

The error messages appear different but could also be related.

rhansen2 avatar Sep 02 '22 17:09 rhansen2

My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents this here. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.

I double checked the port, it's correct. As for IAM, that's the only option for MSK Serverless, so it's enabled.

mtkopone avatar Sep 09 '22 07:09 mtkopone

@mtkopone interesting, can you try to connect to your MSK Serverless via CLI? The steps are well-documented in this blog. https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk

The section "Create a Apache Kafka topic from an EC2 instance" is the one I want you to try. It shows the steps to:

  1. Create an EC2 instance in the network where your application is running, which should be able to reach MSK
  2. Download the Kafka client library
  3. Download the MSK IAM JAR file
  4. Set up the client-config.properties file
  5. Run a Kafka client command (e.g. list topics) that calls the MSK API using IAM authentication.

It will help me to understand where the root cause of your issue is.

kikyomits avatar Sep 15 '22 15:09 kikyomits

@mtkopone interesting, can you try to connect to your MSK Serverless via CLI? The steps are well-documented in this blog. https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk

Yeah. The Java client library in that example works as a consumer.

Same endpoint, same config.

mtkopone avatar Sep 15 '22 16:09 mtkopone

@mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not serverless, though) on version 2.8.1 with the following module versions, and it works OK. So 2.8.1 itself shouldn't be the problem.

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

The error is EOF, which likely happens when you have a network connection failure. I ran into this error while contributing to the development of the aws_msk_iam_v2 module.

My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents this here. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.

^ I had the same issue, and the hint about a network connection failure helped me. It turns out I was missing the TLS config (even though I set InsecureSkipVerify to true). Thanks, @kikyomits!

fuatto avatar Oct 06 '22 16:10 fuatto

I upgraded to the latest versions, and the same errors still occur against MSK Serverless with the following:

github.com/segmentio/kafka-go v0.4.35
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20221014170723-bef291169c84

Also tried with InsecureSkipVerify in TLSConfig. Same problem.

On a different tack, it looks like the JavaScript kafkajs library has a very similar issue: https://github.com/tulios/kafkajs/issues/1449. An AWS engineer pinpointed where the MSK Serverless protocol differs from the one used by that library here: https://github.com/tulios/kafkajs/issues/1449#issuecomment-1273943422. Perhaps that could shed some light on this issue?

mtkopone avatar Oct 17 '22 11:10 mtkopone

@mtkopone Using debug logging, is it possible to extract the JoinGroupRequestData from the JoinGroup requests sent from this library, especially the embedded protocol metadata? Once we have that, we should check if we have the same issue described here https://github.com/tulios/kafkajs/issues/1449#issuecomment-1273943422

sankalpbhatia avatar Oct 17 '22 17:10 sankalpbhatia

It wasn't the JoinGroupRequest.

Debugging further... MSK Serverless reports that it supports MetadataRequests in API versions 0 - 11, but it looks like any MetadataRequest with an API version < 6 only gets an EOF as a response.

mtkopone avatar Oct 18 '22 13:10 mtkopone
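
The finding above comes down to version negotiation: a client should send the highest version that both sides support. Below is a minimal sketch of that selection, not kafka-go's actual implementation. The type and function names are hypothetical, and treating 6 as the broker's effective minimum is an assumption based only on the EOF behaviour reported in this thread, since the broker itself advertises v0 to v11.

```go
package main

import "fmt"

// versionRange is an inclusive range of API versions.
type versionRange struct{ Min, Max int16 }

// pickVersion returns the highest version contained in both ranges,
// or false when the ranges do not overlap.
func pickVersion(client, broker versionRange) (int16, bool) {
	lo, hi := client.Min, client.Max
	if broker.Min > lo {
		lo = broker.Min
	}
	if broker.Max < hi {
		hi = broker.Max
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	// Advertised by MSK Serverless for Metadata: v0 to v11.
	// Assumption from this thread: versions below 6 are answered with EOF,
	// so the effective range is treated as v6 to v11.
	effective := versionRange{Min: 6, Max: 11}

	// A client that only implements Metadata v1 has no usable version.
	if _, ok := pickVersion(versionRange{Min: 1, Max: 1}, effective); !ok {
		fmt.Println("v1-only client: no compatible Metadata version")
	}

	// A client that implements v1 through v6 can fall back on v6.
	if v, ok := pickVersion(versionRange{Min: 1, Max: 6}, effective); ok {
		fmt.Println("v1..v6 client: use Metadata version", v)
	}
}
```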

Is it possible to share the request and response for the ApiVersions API, and the Metadata request?

sankalpbhatia avatar Oct 21 '22 00:10 sankalpbhatia

Sure thing, here you go. Bytes are all in decimal (base 10).

ApiVersions request: (clientId = "1") [0 0 0 11 0 18 0 0 0 0 0 1 0 1 49]

ApiVersions response:

[0 0 1 90 0 0 0 1 0 0 0 0 0 56
 0 0 0 0 0 9
 0 1 0 0 0 12
 0 2 0 0 0 6 
 0 3 0 0 0 11 
 0 4 0 0 0 5 
 0 5 0 0 0 3 
 0 6 0 0 0 7 
 0 7 0 0 0 3
 0 8 0 0 0 8 
 0 9 0 0 0 7 
 0 10 0 0 0 3 0 11 0 0 0 7 0 12 0 0 0 4 0 13 0 0 0 4 0 14 0 0 0 5 0 15 0 0 0 5 0 16 0 0 0 4 0 17 0 0 0 1 0 18 0 0 0 3 0 19 0 0 0 7 
 0 20 0 0 0 6 0 21 0 0 0 2 0 22 0 0 0 4 0 23 0 0 0 4 0 24 0 0 0 3 0 25 0 0 0 3 0 26 0 0 0 3 0 27 0 0 0 1 0 28 0 0 0 3 0 29 0 0 0 2 
 0 30 0 0 0 2 0 31 0 0 0 2 0 32 0 0 0 4 0 33 0 0 0 2 0 34 0 0 0 2 0 35 0 0 0 2 0 36 0 0 0 2 0 37 0 0 0 3 0 38 0 0 0 2 0 39 0 0 0 2 
 0 40 0 0 0 2 0 41 0 0 0 2 0 42 0 0 0 2 0 43 0 0 0 2 0 44 0 0 0 1 0 45 0 0 0 0 0 46 0 0 0 0 0 47 0 0 0 0 0 48 0 0 0 1 0 49 0 0 0 1 
 0 50 0 0 0 0 0 51 0 0 0 0 0 56 0 0 0 0 0 57 0 0 0 0 0 60 0 0 0 0 0 61]

Which gets parsed to:

map[Produce:Produce[v0:v9] Fetch:Fetch[v0:v12] ListOffsets:ListOffsets[v0:v6] Metadata:Metadata[v0:v11] LeaderAndIsr:LeaderAndIsr[v0:v5] StopReplica:StopReplica[v0:v3] UpdateMetadata:UpdateMetadata[v0:v7] ControlledShutdown:ControlledShutdown[v0:v3] OffsetCommit:OffsetCommit[v0:v8] OffsetFetch:OffsetFetch[v0:v7] FindCoordinator:FindCoordinator[v0:v3] JoinGroup:JoinGroup[v0:v7] Heartbeat:Heartbeat[v0:v4] LeaveGroup:LeaveGroup[v0:v4] SyncGroup:SyncGroup[v0:v5] DescribeGroups:DescribeGroups[v0:v5] ListGroups:ListGroups[v0:v4] SaslHandshake:SaslHandshake[v0:v1] ApiVersions:ApiVersions[v0:v3] CreateTopics:CreateTopics[v0:v7] DeleteTopics:DeleteTopics[v0:v6] DeleteRecords:DeleteRecords[v0:v2] InitProducerId:InitProducerId[v0:v4] OffsetForLeaderEpoch:OffsetForLeaderEpoch[v0:v4] AddPartitionsToTxn:AddPartitionsToTxn[v0:v3] AddOffsetsToTxn:AddOffsetsToTxn[v0:v3] EndTxn:EndTxn[v0:v3] WriteTxnMarkers:WriteTxnMarkers[v0:v1] TxnOffsetCommit:TxnOffsetCommit[v0:v3] DescribeAcls:DescribeAcls[v0:v2] CreateAcls:CreateAcls[v0:v2] DeleteAcls:DeleteAcls[v0:v2] DescribeConfigs:DescribeConfigs[v0:v4] AlterConfigs:AlterConfigs[v0:v2] AlterReplicaLogDirs:AlterReplicaLogDirs[v0:v2] DescribeLogDirs:DescribeLogDirs[v0:v2] SaslAuthenticate:SaslAuthenticate[v0:v2] CreatePartitions:CreatePartitions[v0:v3] CreateDelegationToken:CreateDelegationToken[v0:v2] RenewDelegationToken:RenewDelegationToken[v0:v2] ExpireDelegationToken:ExpireDelegationToken[v0:v2] DescribeDelegationToken:DescribeDelegationToken[v0:v2] DeleteGroups:DeleteGroups[v0:v2] ElectLeaders:ElectLeaders[v0:v2] IncrementalAlfterConfigs:IncrementalAlfterConfigs[v0:v1] AlterPartitionReassignments:AlterPartitionReassignments[v0:v0] ListPartitionReassignments:ListPartitionReassignments[v0:v0] OffsetDelete:OffsetDelete[v0:v0] 48:48[v0:v1] 49:49[v0:v1] 50:50[v0:v0] 51:51[v0:v0] 56:56[v0:v0] 57:57[v0:v0] 60:60[v0:v0] 61:61[v0:v0]]
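
For readers who want to check the dump by hand, the layout is a 4-byte size, a 4-byte correlation id, a 2-byte error code, a 4-byte entry count, and then 6 bytes per entry (API key, min version, max version), all big-endian. The sketch below decodes that v0 layout. The names `apiRange` and `parseApiVersionsV0` are hypothetical, and the sample frame is a shortened version built from the first three entries of the dump above, not the full response.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// apiRange is one entry of an ApiVersions v0 response.
type apiRange struct{ Key, Min, Max int16 }

// parseApiVersionsV0 decodes a size-prefixed ApiVersions v0 response frame.
func parseApiVersionsV0(frame []byte) (errorCode int16, ranges []apiRange, err error) {
	if len(frame) < 14 {
		return 0, nil, errors.New("frame too short")
	}
	size := int(binary.BigEndian.Uint32(frame[0:4]))
	if size != len(frame)-4 {
		return 0, nil, errors.New("size prefix does not match frame length")
	}
	body := frame[4:] // body[0:4] is the correlation id, not needed here
	errorCode = int16(binary.BigEndian.Uint16(body[4:6]))
	count := int(binary.BigEndian.Uint32(body[6:10]))
	entries := body[10:]
	if len(entries)/6 < count {
		return errorCode, nil, errors.New("truncated api key array")
	}
	for i := 0; i < count; i++ {
		e := entries[i*6 : i*6+6]
		ranges = append(ranges, apiRange{
			Key: int16(binary.BigEndian.Uint16(e[0:2])),
			Min: int16(binary.BigEndian.Uint16(e[2:4])),
			Max: int16(binary.BigEndian.Uint16(e[4:6])),
		})
	}
	return errorCode, ranges, nil
}

func main() {
	// Shortened sample: correlation id 1, error code 0, three entries
	// (keys 0, 1, 2 with max versions 9, 12, 6, as in the dump above).
	frame := []byte{
		0, 0, 0, 28, 0, 0, 0, 1, 0, 0, 0, 0, 0, 3,
		0, 0, 0, 0, 0, 9,
		0, 1, 0, 0, 0, 12,
		0, 2, 0, 0, 0, 6,
	}
	code, ranges, err := parseApiVersionsV0(frame)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("error code:", code)
	for _, r := range ranges {
		fmt.Printf("api key %d: v%d to v%d\n", r.Key, r.Min, r.Max)
	}
}
```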

Metadata request: (clientId: "1", topic: "t1") [0 0 0 19 0 3 0 1 0 0 0 4 0 1 49 0 0 0 1 0 2 116 49]

As far as I understand, these look ok to me...

mtkopone avatar Oct 21 '22 11:10 mtkopone

@mtkopone Are you still seeing this issue if you use the tip of main? With https://github.com/segmentio/kafka-go/pull/947 merged I would expect the Reader and consumer group to be sending the highest supported metadata request version.

rhansen2 avatar Oct 21 '22 18:10 rhansen2

Yes. The problem remained in main.

The metadata call only used v1. The PR above makes it also support v6, which fixes at least that problem.

mtkopone avatar Oct 21 '22 19:10 mtkopone

I'm facing the same error with:

github.com/segmentio/kafka-go v0.4.35
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20221021184657-750193894a7e

ZhangDahe avatar Oct 26 '22 03:10 ZhangDahe

@ZhangDahe would you be able to test if it works using the fork of #1013 ?

mtkopone avatar Oct 26 '22 06:10 mtkopone

Your merged PR has been included in release https://github.com/segmentio/kafka-go/releases/tag/v0.4.36. Feel free to close this issue if things are working as expected in that version!

rhansen2 avatar Oct 28 '22 21:10 rhansen2

Thank you!

We are working on verifying that nothing else is broken with MSK Serverless. Looks good so far. I will update/close this issue once we have more info.

mtkopone avatar Nov 02 '22 08:11 mtkopone

Closing this with a note that v0.4.36 and v0.4.37 should be skipped as they contain severe bugs. v0.4.38 contains this fix but with the bugs reverted. Please file a new issue if you're continuing to have trouble. Thanks!

rhansen2 avatar Nov 18 '22 18:11 rhansen2