kafka-go
Unable to read messages from MSK Serverless cluster
While switching from an AWS MSK Provisioned cluster to an MSK Serverless cluster, I found that writing messages to topics works, but reading from a topic fails.
Kafka Version
github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a
AWS is vague about which Kafka version MSK Serverless runs, but the AWS examples use the client library intended for 2.8.1. (Consuming messages with that Java client works.)
To Reproduce
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	awsCfg "github.com/aws/aws-sdk-go-v2/config"
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
	// Cancel the context on Ctrl-C so ReadMessage returns and the reader gets closed.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT)
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		sig := <-signals
		fmt.Println("Got signal: ", sig)
		cancel()
	}()

	bootstrapServers := []string{"<kafka-bootstrap-server-url>"}
	topic := "example-topic"

	// Load AWS credentials from the default credential chain.
	cfg, err := awsCfg.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("loading AWS config: %v", err)
	}
	creds, err := cfg.Credentials.Retrieve(ctx)
	if err != nil {
		log.Fatalf("retrieving AWS credentials: %v", err)
	}

	// SASL mechanism that signs the MSK IAM authentication payload with SigV4.
	mechanism := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: creds,
		Region:      "us-east-1",
		SignTime:    time.Now(),
		Expiry:      time.Minute * 15,
	}

	config := kafka.ReaderConfig{
		Brokers: bootstrapServers,
		GroupID: "test-consumer-group-1",
		Topic:   topic,
		// Partition: 0, // set this instead of GroupID; the two are mutually exclusive
		MaxWait: 50000 * time.Millisecond,
		Dialer: &kafka.Dialer{
			Timeout:       50 * time.Second,
			DualStack:     true,
			SASLMechanism: mechanism,
			// MSK IAM authentication requires TLS.
			TLS: &tls.Config{
				MinVersion: tls.VersionTLS12,
			},
		},
	}

	r := kafka.NewReader(config)
	fmt.Println("Consumer configuration: ", config)
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Println("Error closing consumer: ", err)
			return
		}
		fmt.Println("Consumer closed")
	}()

	for {
		// With a GroupID, ReadMessage commits offsets automatically;
		// FetchMessage would require an explicit CommitMessages call.
		m, err := r.ReadMessage(ctx)
		// m, err := r.FetchMessage(ctx)
		if err != nil {
			fmt.Printf("Error reading message: %+v\n", err)
			break
		}
		fmt.Printf("Received message from %s-%d [%d]: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}
Expected Behavior
Messages should be received from the topic.
Observed Behavior
Running the example fails.
When providing a GroupID in the ReaderConfig, the error is:
Error reading message: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
Log output:
entering loop for consumer group, test-consumer-group-1
Failed to join group test-consumer-group-1: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
[42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
Without a GroupID, and with a Partition set instead, the error is:
Error reading message: unexpected EOF
Log output:
initializing kafka reader for partition 0 of example-topic starting at offset first offset
kafka reader failed to read lag of partition 0 of example-topic: EOF
Additional Context
The topic has been created beforehand, and messages have been written to that topic.
I have also tried most of the available combinations of MinVersion and MaxVersion in the TLS config, to no avail.
Hi @mtkopone,
Thanks for the detailed report!
Unfortunately, based on https://github.com/segmentio/kafka-go/pull/948, I don't think we're compatible with 2.8.1, so that could be the problem.
You could give https://github.com/rhansen2/kafka-go/tree/consumer-group-client a try. It makes changes to the Reader, including support for more API versions, but it is a WIP.
It doesn't seem like a TLS issue, but have you tried setting InsecureSkipVerify in your TLS config?
@mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not serverless, though) running 2.8.1 with the following versions, and it works OK. So 2.8.1 itself shouldn't be the problem.
github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a
The error is EOF, which typically happens when there is a network connection failure. I hit this error while contributing to the aws_msk_iam_v2 module.
My gut feeling is that we should double-check the MSK port you are connecting to. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access, as documented by AWS. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.
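A cheap way to rule out the port is to validate the bootstrap addresses before building the Dialer. This is a minimal stdlib-only sketch; `checkIAMBootstrap` is a hypothetical helper (not part of kafka-go), it only inspects the address strings, and the hostname in `main` is a placeholder.

```go
package main

import (
	"fmt"
	"net"
)

// iamPorts holds the MSK broker ports used for IAM authentication:
// 9098 for access from within AWS and 9198 for public access.
var iamPorts = map[string]bool{"9098": true, "9198": true}

// checkIAMBootstrap (hypothetical helper) returns an error for the first
// bootstrap address that has no port or whose port is not an IAM port.
// It does not open any network connection.
func checkIAMBootstrap(addrs []string) error {
	for _, addr := range addrs {
		_, port, err := net.SplitHostPort(addr)
		if err != nil {
			return fmt.Errorf("bootstrap address %q: %w", addr, err)
		}
		if !iamPorts[port] {
			return fmt.Errorf("bootstrap address %q uses port %s; IAM auth expects 9098 (within AWS) or 9198 (public)", addr, port)
		}
	}
	return nil
}

func main() {
	// Placeholder hostname, for illustration only.
	addrs := []string{"boot-example.kafka-serverless.us-east-1.amazonaws.com:9098"}
	if err := checkIAMBootstrap(addrs); err != nil {
		fmt.Println("config problem:", err)
		return
	}
	fmt.Println("bootstrap ports look right for IAM auth")
}
```

A correct port does not prove reachability, so an EOF can still come from security groups or a missing TLS config.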
Thanks for the additional insight @kikyomits.
It also appears there's a bug in the v2 iam package integration https://github.com/segmentio/kafka-go/issues/976
The error messages differ, but the issues could be related.
I double-checked the port; it's correct. As for IAM, it's the only option for MSK Serverless, so it's enabled.
@mtkopone interesting, can you try to connect to your MSK Serverless via CLI? The steps are well-documented in this blog. https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk
The section on creating an Apache Kafka topic from an EC2 instance is the one I'd like you to try. It shows the steps to:
- Create an EC2 instance in the network where your application runs, so that it can reach MSK
- Download the Kafka client library
- Download the MSK IAM JAR file
- Set up the client-config.properties file
- Run a Kafka client command (e.g. list topics) that calls MSK using IAM authentication
That will help me understand the root cause of your issue.
Yeah. The Java client library from that example works as a consumer.
Same endpoint, same config.
@kikyomits's hint that EOF usually indicates a network connection failure helped me with the same issue: it turned out I was missing the TLS config (even though I had set InsecureSkipVerify to true). Thanks, @kikyomits!
I upgraded to the latest versions, and the same errors still occur against MSK Serverless with the following:
github.com/segmentio/kafka-go v0.4.35
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20221014170723-bef291169c84
I also tried setting InsecureSkipVerify in the TLS config; same problem.
On a different tack, it looks like the JavaScript kafkajs library has a very similar issue: https://github.com/tulios/kafkajs/issues/1449. An AWS engineer pinpointed where the MSK Serverless protocol differs from the one used by that library here: https://github.com/tulios/kafkajs/issues/1449#issuecomment-1273943422. Perhaps that could shed some light on this issue?
@mtkopone Using debug logging, is it possible to extract the JoinGroupRequestData from the JoinGroup requests sent from this library, especially the embedded protocol metadata? Once we have that, we should check if we have the same issue described here https://github.com/tulios/kafkajs/issues/1449#issuecomment-1273943422
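For reference, with the standard consumer protocol the "embedded protocol metadata" in a JoinGroup request is a small binary subscription blob. The sketch below encodes a version-0 subscription (version, topic list, user data) so it can be compared byte-for-byte with a debug dump; `encodeSubscription` is a hypothetical helper, not kafka-go's encoder, and it writes empty user data as a zero-length byte array.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendInt16 and appendInt32 write big-endian integers, as the Kafka
// protocol requires.
func appendInt16(b []byte, v int16) []byte {
	var tmp [2]byte
	binary.BigEndian.PutUint16(tmp[:], uint16(v))
	return append(b, tmp[:]...)
}

func appendInt32(b []byte, v int32) []byte {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], uint32(v))
	return append(b, tmp[:]...)
}

// encodeSubscription (hypothetical helper) encodes a version-0 consumer
// protocol subscription:
//
//	Version  int16
//	Topics   int32 count, then each topic as int16 length + bytes
//	UserData int32 length + bytes
func encodeSubscription(topics []string, userData []byte) []byte {
	b := appendInt16(nil, 0) // subscription version 0
	b = appendInt32(b, int32(len(topics)))
	for _, t := range topics {
		b = appendInt16(b, int16(len(t)))
		b = append(b, t...)
	}
	b = appendInt32(b, int32(len(userData)))
	return append(b, userData...)
}

func main() {
	// Printed as decimal bytes, like the dumps in this thread.
	fmt.Println(encodeSubscription([]string{"t1"}, nil))
	// [0 0 0 0 0 1 0 2 116 49 0 0 0 0]
}
```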
It wasn't the JoinGroupRequest.
Debugging further: MSK Serverless reports that it supports Metadata requests in API versions 0 to 11.
However, any Metadata request with an API version below 6 gets only an EOF in response.
Is it possible to share the request and response for the ApiVersions API, and the Metadata request?
Sure thing, here you go. All bytes are in decimal (base 10):
ApiVersions request: (clientId = "1")
[0 0 0 11 0 18 0 0 0 0 0 1 0 1 49]
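Read field by field, the ApiVersions request above is a 4-byte size (11), API key 18 (ApiVersions), version 0, correlation ID 1, and the client ID "1" as an int16-length-prefixed string. A minimal stdlib sketch of that decoding; `decodeRequestHeader` and `requestHeader` are hypothetical names for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// requestHeader holds the size prefix plus the request header v1 fields.
type requestHeader struct {
	Size          int32
	APIKey        int16
	APIVersion    int16
	CorrelationID int32
	ClientID      string
}

// decodeRequestHeader (hypothetical helper) reads the fixed fields and the
// int16-length-prefixed client ID from the start of a raw request.
func decodeRequestHeader(b []byte) (requestHeader, error) {
	var h requestHeader
	if len(b) < 14 {
		return h, errors.New("buffer too short for a request header")
	}
	h.Size = int32(binary.BigEndian.Uint32(b[0:4]))
	h.APIKey = int16(binary.BigEndian.Uint16(b[4:6]))
	h.APIVersion = int16(binary.BigEndian.Uint16(b[6:8]))
	h.CorrelationID = int32(binary.BigEndian.Uint32(b[8:12]))
	n := int(int16(binary.BigEndian.Uint16(b[12:14])))
	if n < 0 {
		return h, nil // a length of -1 encodes a null client ID
	}
	if len(b) < 14+n {
		return h, errors.New("client ID is truncated")
	}
	h.ClientID = string(b[14 : 14+n])
	return h, nil
}

func main() {
	apiVersionsReq := []byte{0, 0, 0, 11, 0, 18, 0, 0, 0, 0, 0, 1, 0, 1, 49}
	h, err := decodeRequestHeader(apiVersionsReq)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", h)
	// {Size:11 APIKey:18 APIVersion:0 CorrelationID:1 ClientID:1}
}
```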
ApiVersions response:
[0 0 1 90 0 0 0 1 0 0 0 0 0 56
0 0 0 0 0 9
0 1 0 0 0 12
0 2 0 0 0 6
0 3 0 0 0 11
0 4 0 0 0 5
0 5 0 0 0 3
0 6 0 0 0 7
0 7 0 0 0 3
0 8 0 0 0 8
0 9 0 0 0 7
0 10 0 0 0 3 0 11 0 0 0 7 0 12 0 0 0 4 0 13 0 0 0 4 0 14 0 0 0 5 0 15 0 0 0 5 0 16 0 0 0 4 0 17 0 0 0 1 0 18 0 0 0 3 0 19 0 0 0 7
0 20 0 0 0 6 0 21 0 0 0 2 0 22 0 0 0 4 0 23 0 0 0 4 0 24 0 0 0 3 0 25 0 0 0 3 0 26 0 0 0 3 0 27 0 0 0 1 0 28 0 0 0 3 0 29 0 0 0 2
0 30 0 0 0 2 0 31 0 0 0 2 0 32 0 0 0 4 0 33 0 0 0 2 0 34 0 0 0 2 0 35 0 0 0 2 0 36 0 0 0 2 0 37 0 0 0 3 0 38 0 0 0 2 0 39 0 0 0 2
0 40 0 0 0 2 0 41 0 0 0 2 0 42 0 0 0 2 0 43 0 0 0 2 0 44 0 0 0 1 0 45 0 0 0 0 0 46 0 0 0 0 0 47 0 0 0 0 0 48 0 0 0 1 0 49 0 0 0 1
0 50 0 0 0 0 0 51 0 0 0 0 0 56 0 0 0 0 0 57 0 0 0 0 0 60 0 0 0 0 0 61]
Which gets parsed to:
map[Produce:Produce[v0:v9] Fetch:Fetch[v0:v12] ListOffsets:ListOffsets[v0:v6] Metadata:Metadata[v0:v11] LeaderAndIsr:LeaderAndIsr[v0:v5] StopReplica:StopReplica[v0:v3] UpdateMetadata:UpdateMetadata[v0:v7] ControlledShutdown:ControlledShutdown[v0:v3] OffsetCommit:OffsetCommit[v0:v8] OffsetFetch:OffsetFetch[v0:v7] FindCoordinator:FindCoordinator[v0:v3] JoinGroup:JoinGroup[v0:v7] Heartbeat:Heartbeat[v0:v4] LeaveGroup:LeaveGroup[v0:v4] SyncGroup:SyncGroup[v0:v5] DescribeGroups:DescribeGroups[v0:v5] ListGroups:ListGroups[v0:v4] SaslHandshake:SaslHandshake[v0:v1] ApiVersions:ApiVersions[v0:v3] CreateTopics:CreateTopics[v0:v7] DeleteTopics:DeleteTopics[v0:v6] DeleteRecords:DeleteRecords[v0:v2] InitProducerId:InitProducerId[v0:v4] OffsetForLeaderEpoch:OffsetForLeaderEpoch[v0:v4] AddPartitionsToTxn:AddPartitionsToTxn[v0:v3] AddOffsetsToTxn:AddOffsetsToTxn[v0:v3] EndTxn:EndTxn[v0:v3] WriteTxnMarkers:WriteTxnMarkers[v0:v1] TxnOffsetCommit:TxnOffsetCommit[v0:v3] DescribeAcls:DescribeAcls[v0:v2] CreateAcls:CreateAcls[v0:v2] DeleteAcls:DeleteAcls[v0:v2] DescribeConfigs:DescribeConfigs[v0:v4] AlterConfigs:AlterConfigs[v0:v2] AlterReplicaLogDirs:AlterReplicaLogDirs[v0:v2] DescribeLogDirs:DescribeLogDirs[v0:v2] SaslAuthenticate:SaslAuthenticate[v0:v2] CreatePartitions:CreatePartitions[v0:v3] CreateDelegationToken:CreateDelegationToken[v0:v2] RenewDelegationToken:RenewDelegationToken[v0:v2] ExpireDelegationToken:ExpireDelegationToken[v0:v2] DescribeDelegationToken:DescribeDelegationToken[v0:v2] DeleteGroups:DeleteGroups[v0:v2] ElectLeaders:ElectLeaders[v0:v2] IncrementalAlfterConfigs:IncrementalAlfterConfigs[v0:v1] AlterPartitionReassignments:AlterPartitionReassignments[v0:v0] ListPartitionReassignments:ListPartitionReassignments[v0:v0] OffsetDelete:OffsetDelete[v0:v0] 48:48[v0:v1] 49:49[v0:v1] 50:50[v0:v0] 51:51[v0:v0] 56:56[v0:v0] 57:57[v0:v0] 60:60[v0:v0] 61:61[v0:v0]]
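The response parses as a 4-byte size, correlation ID 1, error code 0, and a count of 56 entries, each of which is an API key followed by its minimum and maximum version (for example `0 3 0 0 0 11` is Metadata v0 to v11). A minimal stdlib sketch of an ApiVersions v0 response parser; `parseAPIVersionsV0` and `apiVersionRange` are hypothetical names, and the sample in `main` is the header plus the first four entries of the dump, with the size field recomputed for the shortened buffer.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// apiVersionRange is one ApiVersions entry: an API key and the minimum and
// maximum versions the broker advertises for it.
type apiVersionRange struct {
	Key, Min, Max int16
}

// parseAPIVersionsV0 (hypothetical helper) parses a size-prefixed
// ApiVersions v0 response:
//
//	Size int32 | CorrelationID int32 | ErrorCode int16 |
//	count int32 | count x (ApiKey int16, MinVersion int16, MaxVersion int16)
func parseAPIVersionsV0(b []byte) (int32, int16, []apiVersionRange, error) {
	if len(b) < 14 {
		return 0, 0, nil, fmt.Errorf("response too short: %d bytes", len(b))
	}
	correlationID := int32(binary.BigEndian.Uint32(b[4:8]))
	errorCode := int16(binary.BigEndian.Uint16(b[8:10]))
	count := int(int32(binary.BigEndian.Uint32(b[10:14])))
	if count < 0 {
		return correlationID, errorCode, nil, fmt.Errorf("negative entry count %d", count)
	}
	var ranges []apiVersionRange
	for i, off := 0, 14; i < count; i, off = i+1, off+6 {
		if off+6 > len(b) {
			return correlationID, errorCode, ranges, fmt.Errorf("truncated at entry %d of %d", i, count)
		}
		ranges = append(ranges, apiVersionRange{
			Key: int16(binary.BigEndian.Uint16(b[off:])),
			Min: int16(binary.BigEndian.Uint16(b[off+2:])),
			Max: int16(binary.BigEndian.Uint16(b[off+4:])),
		})
	}
	return correlationID, errorCode, ranges, nil
}

func main() {
	resp := []byte{0, 0, 0, 34, 0, 0, 0, 1, 0, 0, 0, 0, 0, 4,
		0, 0, 0, 0, 0, 9, // Produce v0-v9
		0, 1, 0, 0, 0, 12, // Fetch v0-v12
		0, 2, 0, 0, 0, 6, // ListOffsets v0-v6
		0, 3, 0, 0, 0, 11, // Metadata v0-v11
	}
	_, _, ranges, err := parseAPIVersionsV0(resp)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range ranges {
		fmt.Printf("key %d: v%d-v%d\n", r.Key, r.Min, r.Max)
	}
}
```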
Metadata request: (clientId: "1", topic: "t1")
[0 0 0 19 0 3 0 1 0 0 0 4 0 1 49 0 0 0 1 0 2 116 49]
As far as I understand, these look ok to me...
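The Metadata dump decodes as a v1 request: API key 3, version 1, correlation ID 4, client ID "1", and a one-element topic array containing "t1". That version is below 6, matching the requests that got EOF. The sketch below rebuilds those exact bytes with the standard library; `encodeMetadataV1` is a hypothetical helper, not kafka-go's encoder, and it always sends a non-null topic array.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func appendInt16(b []byte, v int16) []byte {
	var tmp [2]byte
	binary.BigEndian.PutUint16(tmp[:], uint16(v))
	return append(b, tmp[:]...)
}

func appendInt32(b []byte, v int32) []byte {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], uint32(v))
	return append(b, tmp[:]...)
}

// appendString writes a Kafka STRING: int16 length followed by the bytes.
func appendString(b []byte, s string) []byte {
	return append(appendInt16(b, int16(len(s))), s...)
}

// encodeMetadataV1 (hypothetical helper) builds a size-prefixed Metadata v1
// request with request header v1 and an explicit topic list. In v1 and later
// a null array (-1) would mean "all topics"; this sketch never sends one.
func encodeMetadataV1(correlationID int32, clientID string, topics []string) []byte {
	body := appendInt16(nil, 3) // api_key: Metadata
	body = appendInt16(body, 1) // api_version: 1
	body = appendInt32(body, correlationID)
	body = appendString(body, clientID)
	body = appendInt32(body, int32(len(topics)))
	for _, t := range topics {
		body = appendString(body, t)
	}
	// The size prefix counts every byte that follows it.
	return append(appendInt32(nil, int32(len(body))), body...)
}

func main() {
	fmt.Println(encodeMetadataV1(4, "1", []string{"t1"}))
	// [0 0 0 19 0 3 0 1 0 0 0 4 0 1 49 0 0 0 1 0 2 116 49]
}
```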
@mtkopone Are you still seeing this issue if you use the tip of main? With https://github.com/segmentio/kafka-go/pull/947 merged I would expect the Reader and consumer group to be sending the highest supported metadata request version.
Yes. The problem remained in main.
The metadata call only used v1. The PR above makes it also support v6, which fixes at least that problem.
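The fix amounts to version negotiation: send the highest Metadata version that the client implements and the broker advertises in its ApiVersions response. A minimal sketch of that selection rule, assuming the client's implemented versions are known as a list; `pickVersion` is hypothetical and not kafka-go's actual code. With MSK Serverless advertising v0 to v11, a client implementing only v1 sends v1, while one that also implements v6 sends v6.

```go
package main

import "fmt"

// pickVersion (hypothetical helper) returns the highest request version the
// client implements that also falls inside the broker's advertised
// [brokerMin, brokerMax] range, or false if there is no overlap.
func pickVersion(clientVersions []int16, brokerMin, brokerMax int16) (int16, bool) {
	best, found := int16(0), false
	for _, v := range clientVersions {
		if v >= brokerMin && v <= brokerMax && (!found || v > best) {
			best, found = v, true
		}
	}
	return best, found
}

func main() {
	// Metadata range advertised by MSK Serverless in this thread: v0-v11.
	before, _ := pickVersion([]int16{1}, 0, 11)   // client implementing only v1
	after, _ := pickVersion([]int16{1, 6}, 0, 11) // client also implementing v6
	fmt.Println(before, after)                    // 1 6
}
```

Note that this rule alone would not help if a broker advertised a version it cannot actually serve; here it works because the highest mutually supported version (v6) is one MSK Serverless answers.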
I'm seeing the same error with:
github.com/segmentio/kafka-go v0.4.35
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20221021184657-750193894a7e
@ZhangDahe would you be able to test if it works using the fork of #1013 ?
Your merged PR has been included in release https://github.com/segmentio/kafka-go/releases/tag/v0.4.36 Feel free to close this issue if things are working as expected in that version!
Thank you!
We are working on verifying that nothing else is broken with MSK Serverless. Looks good so far. I will update/close this issue once we have more info.
Closing this with a note that v0.4.36 and v0.4.37 should be skipped as they contain severe bugs. v0.4.38 contains this fix but with the bugs reverted. Please file a new issue if you're continuing to have trouble. Thanks!