
Sarama >= 1.41.0 with Kafka 2.11-0.10.2.2: sasl_scram_client can't connect; older Kafka versions hit an incompatibility. Hope this can be fixed in a new version!

Open · cheneylew opened this issue 2 months ago • 6 comments

Description
Versions
Sarama      Kafka            Go
>= 1.41.0   2.11-0.10.2.2    1.24.4

https://archive.apache.org/dist/kafka/0.10.2.2/kafka_2.11-0.10.2.2.tgz

Configuration
sasl_scram_client:
-brokers=10.0.70.128:9090 -username=test -passwd=test-1234 -tls-skip-verify=true -algorithm=sha256 -mode=produce -tls=true -topic=test-topic -ca=./certificates/ca-cert-0.10.2.2.pem
Logs

[Sarama] 2025/10/24 16:19:22 Initializing new client
[Sarama] 2025/10/24 16:19:22 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:22 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:22 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:22 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 16:19:22 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 16:19:22 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 16:19:23 client/metadata retrying after 250ms... (2 attempts remaining)
[Sarama] 2025/10/24 16:19:23 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 16:19:23 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 16:19:23 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 16:19:23 client/metadata retrying after 250ms... (1 attempts remaining)
[Sarama] 2025/10/24 16:19:23 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 16:19:23 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 16:19:23 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 16:19:23 client/metadata retrying after 250ms... (0 attempts remaining)
[Sarama] 2025/10/24 16:19:23 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 16:19:23 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 16:19:23 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 16:19:23 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 16:19:23 Closing Client
[Producer] 2025/10/24 16:19:23 failed to create producer: kafka: client has run out of available brokers to talk to

cheneylew avatar Oct 24 '25 08:10 cheneylew

[Sarama] 2025/10/24 16:19:22 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported

Have you set Version in sarama.Config correctly to match 0.10.2.2? Modern Sarama versions assume a default of the Kafka 2.1 protocol unless a version is specified, and will also use the ApiVersions responses.

Aside: is there a specific reason your backend cluster is still running 0.10.2.2 from 2018? I’d strongly recommend an upgrade there to at least 2.4 or newer, for numerous improvements and bugfixes
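
For reference, a minimal sketch of the pin being suggested here, using the broker address from the report; SASL/TLS setup and error handling are elided:

package main

import "github.com/IBM/sarama"

func main() {
	conf := sarama.NewConfig()
	// Pin the wire protocol to the broker's release rather than the
	// newer default assumed by Sarama >= 1.41.0.
	conf.Version = sarama.V0_10_2_2

	client, err := sarama.NewClient([]string{"10.0.70.128:9090"}, conf)
	if err != nil {
		panic(err) // real code should handle this properly
	}
	defer client.Close()
}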

dnwe avatar Oct 24 '25 08:10 dnwe

We are based in China, and our client is a bank. Their system is relatively outdated, and upgrading it is not feasible for us due to operational constraints. As such, we need to ensure compatibility with their Kafka version, 2.11-0.10.2.2. Following your official example, we have configured conf.Version = sarama.V0_10_2_2. However, we've noticed that Sarama versions ≤ 1.40.1 used to work properly, so we hope compatibility with these older brokers can be maintained.

-brokers=10.0.70.128:9090 -username=test -passwd=test-1234 -tls-skip-verify=true -algorithm=sha256 -mode=produce -tls=true -topic=test-topic -ca=./certificates/ca-cert-0.10.2.2.pem
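
// Note: this appears to be the sasl_scram_client example from the Sarama
// repository; XDGSCRAMClient, SHA256 and SHA512 below are defined in the
// example's accompanying scram_client.go file.
package main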

import (
	"crypto/tls"
	"crypto/x509"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"

	"github.com/IBM/sarama"
)

func init() {
	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

var (
	brokers       = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
	userName      = flag.String("username", "", "The SASL username")
	passwd        = flag.String("passwd", "", "The SASL password")
	algorithm     = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
	topic         = flag.String("topic", "default_topic", "The Kafka topic to use")
	certFile      = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile       = flag.String("key", "", "The optional key file for client authentication")
	caFile        = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether to skip TLS server cert verification")
	useTLS        = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
	mode          = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
	logMsg        = flag.Bool("logmsg", false, "True to log consumed messages to console")

	logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
)

func createTLSConfiguration() (t *tls.Config) {
	t = &tls.Config{
		InsecureSkipVerify: *tlsSkipVerify,
	}
	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := os.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		t = &tls.Config{
			Certificates:       []tls.Certificate{cert},
			RootCAs:            caCertPool,
			InsecureSkipVerify: *tlsSkipVerify,
		}
	}
	return t
}

func main() {
	flag.Parse()

	if *brokers == "" {
		log.Fatalln("at least one broker is required")
	}
	splitBrokers := strings.Split(*brokers, ",")

	if *userName == "" {
		log.Fatalln("SASL username is required")
	}

	if *passwd == "" {
		log.Fatalln("SASL password is required")
	}

	conf := sarama.NewConfig()
	conf.Producer.Retry.Max = 1
	conf.Producer.RequiredAcks = sarama.WaitForAll
	conf.Producer.Return.Successes = true
	conf.Metadata.Full = true
	conf.Version = sarama.V0_10_2_2
	conf.ClientID = "sasl_scram_client"
	conf.Metadata.Full = true
	conf.Net.SASL.Enable = true
	conf.Net.SASL.User = *userName
	conf.Net.SASL.Password = *passwd
	conf.Net.SASL.Handshake = true
	if *algorithm == "sha512" {
		conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
		conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
	} else if *algorithm == "sha256" {
		conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
		conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256

	} else {
		log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
	}

	if *useTLS {
		conf.Net.TLS.Enable = true
		conf.Net.TLS.Config = createTLSConfiguration()
	}

	if *mode == "consume" {
		consumer, err := sarama.NewConsumer(splitBrokers, conf)
		if err != nil {
			panic(err)
		}
		log.Println("consumer created")
		defer func() {
			if err := consumer.Close(); err != nil {
				log.Fatalln(err)
			}
		}()
		log.Println("commence consuming")
		partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
		if err != nil {
			panic(err)
		}

		defer func() {
			if err := partitionConsumer.Close(); err != nil {
				log.Fatalln(err)
			}
		}()

		// Trap SIGINT to trigger a shutdown.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt)

		consumed := 0
	ConsumerLoop:
		for {
			log.Println("in the for")
			select {
			case msg := <-partitionConsumer.Messages():
				log.Printf("Consumed message offset %d\n", msg.Offset)
				if *logMsg {
					log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
				}
				consumed++
			case <-signals:
				break ConsumerLoop
			}
		}

		log.Printf("Consumed: %d\n", consumed)

	} else {
		syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
		if err != nil {
			logger.Fatalln("failed to create producer: ", err)
		}
		partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
			Topic: *topic,
			Value: sarama.StringEncoder("test_message"),
		})
		if err != nil {
			logger.Fatalln("failed to send message to ", *topic, err)
		}
		logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
		_ = syncProducer.Close()
	}
	logger.Println("Bye now !")
}
Got a connection, launched process /Users/apple/Library/Caches/JetBrains/GoLand2023.2/tmp/GoLand/___2go_build_github_com_IBM_sarama_examples_sasl_scram_client (pid = 67378).
[Sarama] 2025/10/24 17:14:37 Initializing new client
[Sarama] 2025/10/24 17:14:37 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 17:14:37 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 17:14:37 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 17:14:37 client/metadata retrying after 250ms... (2 attempts remaining)
[Sarama] 2025/10/24 17:14:37 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 17:14:37 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 17:14:37 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 17:14:37 client/metadata retrying after 250ms... (1 attempts remaining)
[Sarama] 2025/10/24 17:14:37 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:37 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 17:14:37 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 17:14:37 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 17:14:38 client/metadata retrying after 250ms... (0 attempts remaining)
[Sarama] 2025/10/24 17:14:38 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:38 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:38 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 17:14:38 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported
[Sarama] 2025/10/24 17:14:38 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 17:14:38 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 17:14:38 Closing Client
[Producer] 2025/10/24 17:14:38 failed to create producer:  kafka: client has run out of available brokers to talk to
Exiting.

[Sarama] 2025/10/24 16:19:22 client/metadata got error from broker -1 while fetching metadata: kafka server: The version of API is not supported

Have you set Version in sarama.Config correctly to match 0.10.2.2? Modern Sarama versions assume a default of the Kafka 2.1 protocol unless a version is specified, and will also use the ApiVersions responses.

Aside: is there a specific reason your backend cluster is still running 0.10.2.2 from 2018? I’d strongly recommend an upgrade there to at least 2.4 or newer, for numerous improvements and bugfixes

cheneylew avatar Oct 24 '25 09:10 cheneylew

Sure, but as announced in the 1.41.0 release notes under “breaking changes”, we purposefully bumped the default version to give a better out-of-the-box experience for most Sarama users.

However, Kafka 0.10.x.x did support the ApiVersions flow for selecting protocol versions, so on the very latest version of Sarama you shouldn’t need to pin the protocol version at all; it should be negotiated automatically (assuming you haven’t turned ApiVersionsRequest off in the config).
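
As an illustration of the two options being described, a minimal sketch, assuming a Sarama release new enough to expose the ApiVersionsRequest field on sarama.Config (where it defaults to true):

package main

import "github.com/IBM/sarama"

// newConfig is a hypothetical helper showing the two approaches: either pin
// Version to the broker's release, or leave it at the default and rely on
// ApiVersions negotiation.
func newConfig(pinVersion bool) *sarama.Config {
	conf := sarama.NewConfig()
	if pinVersion {
		// Option 1: pin the protocol to the 0.10.2.2 broker.
		conf.Version = sarama.V0_10_2_2
	} else {
		// Option 2: keep the default Version and let the client negotiate
		// per-API versions at connection time via ApiVersionsRequest.
		conf.ApiVersionsRequest = true // the default in recent releases
	}
	return conf
}

func main() {
	_ = newConfig(true)
}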

dnwe avatar Oct 24 '25 09:10 dnwe

Sure, but as announced in the 1.41.0 release notes under “breaking changes”, we purposefully bumped the default version to give a better out-of-the-box experience for most Sarama users.

However, Kafka 0.10.x.x did support the ApiVersions flow for selecting protocol versions, so on the very latest version of Sarama you shouldn’t need to pin the protocol version at all; it should be negotiated automatically (assuming you haven’t turned ApiVersionsRequest off in the config).

OK, but the fact is that setting conf.Version = sarama.V0_10_2_2 for Kafka 0.10.x.x isn't working properly. Is there any way to fix this? Or should I abandon it? 😂

	conf.Producer.Return.Successes = true
	conf.Metadata.Full = true
	// conf.Version = sarama.V0_10_2_2
	conf.ClientID = "sasl_scram_client"

I commented out the version setting, but it still doesn't seem to work.

Got a connection, launched process /Users/apple/Library/Caches/JetBrains/GoLand2023.2/tmp/GoLand/___2go_build_github_com_IBM_sarama_examples_sasl_scram_client (pid = 72628).
[Sarama] 2025/10/24 18:04:28 Initializing new client
[Sarama] 2025/10/24 18:04:28 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 client/metadata got error from broker -1 while fetching metadata: EOF
[Sarama] 2025/10/24 18:04:28 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 18:04:28 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 18:04:28 client/metadata retrying after 250ms... (2 attempts remaining)
[Sarama] 2025/10/24 18:04:28 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 client/metadata got error from broker -1 while fetching metadata: EOF
[Sarama] 2025/10/24 18:04:28 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 18:04:28 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 18:04:28 client/metadata retrying after 250ms... (1 attempts remaining)
[Sarama] 2025/10/24 18:04:28 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:28 client/metadata got error from broker -1 while fetching metadata: EOF
[Sarama] 2025/10/24 18:04:28 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 18:04:28 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 18:04:29 client/metadata retrying after 250ms... (0 attempts remaining)
[Sarama] 2025/10/24 18:04:29 client/metadata fetching metadata for all topics from broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:29 Error while performing SASL handshake 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:29 Closed connection to broker 10.0.70.128:9090
[Sarama] 2025/10/24 18:04:29 client/metadata got error from broker -1 while fetching metadata: EOF
[Sarama] 2025/10/24 18:04:29 client/metadata no available broker to send metadata request to
[Sarama] 2025/10/24 18:04:29 client/brokers resurrecting 1 dead seed brokers
[Sarama] 2025/10/24 18:04:29 Closing Client
[Producer] 2025/10/24 18:04:29 failed to create producer:  kafka: client has run out of available brokers to talk to: EOF
Exiting.

cheneylew avatar Oct 24 '25 10:10 cheneylew

OK, but the fact is that setting conf.Version = sarama.V0_10_2_2 for Kafka 0.10.x.x isn't working properly. Is there any way to fix this? Or should I abandon it? 😂

Ah sorry I didn’t understand this point in your reply, I hadn’t realised you’d tried that 0.10.2.2 pin and it still wasn’t working, that’s curious – I’ll take a look

dnwe avatar Oct 24 '25 14:10 dnwe

OK, but the fact is that setting conf.Version = sarama.V0_10_2_2 for Kafka 0.10.x.x isn't working properly. Is there any way to fix this? Or should I abandon it? 😂

Ah sorry I didn’t understand this point in your reply, I hadn’t realised you’d tried that 0.10.2.2 pin and it still wasn’t working, that’s curious – I’ll take a look

Okay, thank you. Please let me know if there’s any progress or if the issue has been fixed. The weekend is over; I’m back.

cheneylew avatar Oct 27 '25 01:10 cheneylew