DescribeCluster() returns random broker ID as the controller ID under KRaft mode

Open panyuenlau opened this issue 1 year ago • 8 comments

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama    Kafka    Go
1.40.0    3.4.1    1.19

Main logic:

func (r *Reconciler) determineControllerId(log logr.Logger) (int32, error) {
	kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
	if err != nil {
		return -1, errors.WrapIf(err, "could not create Kafka client, thus could not determine controller")
	}
	defer close()

	brokers, controllerID, err := kClient.DescribeCluster()
	if err != nil {
		return -1, errors.WrapIf(err, "could not find controller broker")
	}

	log.Info("TESTING", "brokers", brokers, "controllerID", controllerID)

	return controllerID, nil
}

How kafka admin client is set up:

type Provider interface {
	NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
}

func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	return NewFromCluster(client, cluster)
}

// NewFromCluster is a convenience wrapper around New() and ClusterConfig()
func NewFromCluster(k8sclient client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	var client KafkaClient
	var err error
	opts, err := ClusterConfig(k8sclient, cluster)
	if err != nil {
		return nil, nil, err
	}
	client = New(opts)
	err = client.Open()
	close := func() {
		if err := client.Close(); err != nil {
			log.Error(err, "Error closing Kafka client")
		} else {
			log.Info("Kafka client closed cleanly")
		}
	}
	return client, close, err
}

There is no error while setting up the admin client, but kClient.DescribeCluster() returns what looks like a random broker ID as the controller ID. My Kafka cluster has 3 quorum voters and 3 regular brokers. The returned brokers list is also just a list of empty structs. Example logs from the program above:

{"level":"info","ts":"2023-07-25T02:38:42.144Z","msg":"DEBUGGING","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"05fba694-8699-4483-ac81-eb6b27b8dc76","component":"kafka","clusterName":"kafka","clusterNamespace":"default","brokers":[{},{},{},{}],"controllerID":2}
{"level":"info","ts":"2023-07-25T02:38:11.556Z","msg":"DEBUGGING","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"6d89d18c-f341-444b-a062-9210dee5888d","component":"kafka","clusterName":"kafka","clusterNamespace":"default","brokers":[{},{},{},{}],"controllerID":0}

Actual state of my Kafka cluster:

./opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server kafka-headless:29092 describe --status
ClusterId:              x_NfbXESRDqP6kP4cwv-xQ
LeaderId:               5
LeaderEpoch:            20
HighWatermark:          88266
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   164
CurrentVoters:          [3,4,5]
CurrentObservers:       [0,1,2]
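
As a stopgap while the client library does not expose quorum information, the status output above could be parsed directly. The following is a minimal Go sketch under stated assumptions: it assumes the plain "Key: value" layout printed by kafka-metadata-quorum.sh in Kafka 3.4.1 as shown above (other versions may print a different layout), and the names QuorumStatus, ParseQuorumStatus, and IsVoter are hypothetical helpers, not part of sarama or Kafka.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// QuorumStatus holds the fields of interest from `describe --status`.
// Hypothetical type for illustration only.
type QuorumStatus struct {
	LeaderID  int32
	Voters    []int32
	Observers []int32
}

// IsVoter reports whether id is one of the quorum voters (controllers).
func (s QuorumStatus) IsVoter(id int32) bool {
	for _, v := range s.Voters {
		if v == id {
			return true
		}
	}
	return false
}

// parseIDList turns a string such as "[3,4,5]" into []int32{3, 4, 5}.
func parseIDList(s string) ([]int32, error) {
	s = strings.Trim(strings.TrimSpace(s), "[]")
	if s == "" {
		return nil, nil
	}
	var ids []int32
	for _, part := range strings.Split(s, ",") {
		n, err := strconv.ParseInt(strings.TrimSpace(part), 10, 32)
		if err != nil {
			return nil, fmt.Errorf("parse id %q: %w", part, err)
		}
		ids = append(ids, int32(n))
	}
	return ids, nil
}

// ParseQuorumStatus extracts LeaderId, CurrentVoters, and CurrentObservers
// from the CLI output. Lines with other keys are ignored.
func ParseQuorumStatus(out string) (QuorumStatus, error) {
	st := QuorumStatus{LeaderID: -1}
	sc := bufio.NewScanner(strings.NewReader(out))
	for sc.Scan() {
		key, val, ok := strings.Cut(sc.Text(), ":")
		if !ok {
			continue
		}
		key, val = strings.TrimSpace(key), strings.TrimSpace(val)
		var err error
		switch key {
		case "LeaderId":
			var n int64
			n, err = strconv.ParseInt(val, 10, 32)
			st.LeaderID = int32(n)
		case "CurrentVoters":
			st.Voters, err = parseIDList(val)
		case "CurrentObservers":
			st.Observers, err = parseIDList(val)
		}
		if err != nil {
			return QuorumStatus{}, fmt.Errorf("parse %s: %w", key, err)
		}
	}
	if err := sc.Err(); err != nil {
		return QuorumStatus{}, err
	}
	if st.LeaderID < 0 {
		return QuorumStatus{}, errors.New("no LeaderId line found")
	}
	return st, nil
}

func main() {
	// Sample copied from the status output in this issue.
	sample := "ClusterId:              x_NfbXESRDqP6kP4cwv-xQ\n" +
		"LeaderId:               5\n" +
		"CurrentVoters:          [3,4,5]\n" +
		"CurrentObservers:       [0,1,2]\n"
	st, err := ParseQuorumStatus(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("leader=%d voters=%v observers=%v\n", st.LeaderID, st.Voters, st.Observers)
	// The controller IDs logged above (2 and 0) are observers, not voters.
	fmt.Println(st.IsVoter(2), st.IsVoter(0), st.IsVoter(5))
}
```

Checking the IDs from the logs (2 and 0) against the parsed voters makes the mismatch explicit: both are regular brokers, while the actual quorum leader is 5.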

panyuenlau avatar Jul 25 '23 02:07 panyuenlau

Note that everything is working fine in ZooKeeper mode, and it looks like sarama is just not compatible with KRaft yet.

It'd be appreciated if the community could provide some insights.

panyuenlau avatar Jul 25 '23 02:07 panyuenlau

@panyuenlau this is actually intentional behaviour in the server-side KRaft code. See https://github.com/apache/kafka/blob/b2dea17041157ceee741041d23783ff993b88ef1/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L290-L305

When using KRaft, a cluster no longer has a single controller. Instead, all nodes in the cluster that are running with the "controller" role take part in the controller metadata quorum.

Kafka just expects you to send your requests to any node in the cluster, and they will be load balanced accordingly.

dnwe avatar Jul 25 '23 05:07 dnwe

Hey @dnwe - thanks for the info. I understand that Kafka itself doesn't want clients to have direct access to the quorum controllers. However, shouldn't an admin have the right to know about the quorum controllers? For example, could sarama leverage this: https://github.com/apache/kafka/blob/3.4.1/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L1482-L1509?

panyuenlau avatar Jul 25 '23 12:07 panyuenlau

@panyuenlau sure, in the future we will of course add the DescribeQuorumRequest and Response protocol and we can expose this for the metadata topic in the admin.go code in the same way as it is for Java, but afaik this is just informative rather than actually being used by the client to make any choices

dnwe avatar Jul 25 '23 14:07 dnwe

Alrighty, thank you for the response! @dnwe

Shall I keep the issue open? If not, please feel free to close it

panyuenlau avatar Jul 25 '23 14:07 panyuenlau

Actually - I just noticed that there is a KIP currently under discussion that ties into what we are discussing here: KIP-919

panyuenlau avatar Jul 25 '23 15:07 panyuenlau

Good find. Yes, let's keep this issue open for now. For one, I'd like to add an FVT matrix that uses KRaft mode anyway, and two, we should at least document in the admin client that controllerID is random in KRaft mode.

Adding the metadata quorum protocol will happen over time as part of our general bring-up of new protocol support, which is work in progress in the background.

dnwe avatar Jul 26 '23 17:07 dnwe

I'm using sarama v1.42.2 and ran into the same confusion. I'll keep following this issue.

cuiwin avatar Feb 22 '24 02:02 cuiwin