sarama
DescribeCluster() returns random broker ID as the controller ID under KRaft mode
Versions
Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama | Kafka | Go |
---|---|---|
1.40.0 | 3.4.1 | 1.19 |
Main logic:
```go
func (r *Reconciler) determineControllerId(log logr.Logger) (int32, error) {
	kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
	if err != nil {
		return -1, errors.WrapIf(err, "could not create Kafka client, thus could not determine controller")
	}
	defer close()

	// DescribeCluster returns the broker list and the controller ID from the metadata response.
	brokers, controllerID, err := kClient.DescribeCluster()
	if err != nil {
		return -1, errors.WrapIf(err, "could not find controller broker")
	}
	log.Info("TESTING", "brokers", brokers, "controllerID", controllerID)
	return controllerID, nil
}
```
How the Kafka admin client is set up:
```go
type Provider interface {
	NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
}

func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	return NewFromCluster(client, cluster)
}

// NewFromCluster is a convenience wrapper around New() and ClusterConfig()
func NewFromCluster(k8sclient client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	var client KafkaClient
	var err error
	opts, err := ClusterConfig(k8sclient, cluster)
	if err != nil {
		return nil, nil, err
	}
	client = New(opts)
	err = client.Open()
	// close releases the client; callers defer it once err is nil.
	close := func() {
		if err := client.Close(); err != nil {
			log.Error(err, "Error closing Kafka client")
		} else {
			log.Info("Kafka client closed cleanly")
		}
	}
	return client, close, err
}
```
No error occurs while setting up the admin client, but kClient.DescribeCluster() returns what looks like a random broker ID as the controller ID for my Kafka cluster (3 quorum voters and 3 regular brokers), and the returned brokers list is just a list of empty structs. Example logs from the program above:
```json
{"level":"info","ts":"2023-07-25T02:38:42.144Z","msg":"DEBUGGING","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"05fba694-8699-4483-ac81-eb6b27b8dc76","component":"kafka","clusterName":"kafka","clusterNamespace":"default","brokers":[{},{},{},{}],"controllerID":2}
{"level":"info","ts":"2023-07-25T02:38:11.556Z","msg":"DEBUGGING","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"6d89d18c-f341-444b-a062-9210dee5888d","component":"kafka","clusterName":"kafka","clusterNamespace":"default","brokers":[{},{},{},{}],"controllerID":0}
```
Actual state of my Kafka cluster:
```
./opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server kafka-headless:29092 describe --status
ClusterId: x_NfbXESRDqP6kP4cwv-xQ
LeaderId: 5
LeaderEpoch: 20
HighWatermark: 88266
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 164
CurrentVoters: [3,4,5]
CurrentObservers: [0,1,2]
```
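This tool output is where the active controller (`LeaderId: 5`) actually shows up. Until sarama exposes the metadata quorum, one hedged stopgap is to parse this output. The sketch below assumes the `Key: value` line format shown above stays stable across Kafka versions, and it leaves out how the script is invoked. `QuorumStatus`, `ParseQuorumStatus` and `parseIDList` are hypothetical names.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// QuorumStatus holds the fields of interest from
// `kafka-metadata-quorum.sh ... describe --status` (hypothetical type).
type QuorumStatus struct {
	LeaderID  int32   // active controller (quorum leader)
	Voters    []int32 // controller quorum members
	Observers []int32 // regular brokers
}

// parseIDList parses a bracketed list such as "[3,4,5]".
func parseIDList(s string) ([]int32, error) {
	s = strings.Trim(strings.TrimSpace(s), "[]")
	if s == "" {
		return nil, nil
	}
	var ids []int32
	for _, part := range strings.Split(s, ",") {
		n, err := strconv.ParseInt(strings.TrimSpace(part), 10, 32)
		if err != nil {
			return nil, fmt.Errorf("bad id %q: %w", part, err)
		}
		ids = append(ids, int32(n))
	}
	return ids, nil
}

// ParseQuorumStatus extracts leader, voters and observers from the tool's
// "Key: value" lines; all other keys are ignored.
func ParseQuorumStatus(out string) (QuorumStatus, error) {
	var qs QuorumStatus
	sawLeader := false
	sc := bufio.NewScanner(strings.NewReader(out))
	for sc.Scan() {
		key, val, ok := strings.Cut(sc.Text(), ":")
		if !ok {
			continue // not a "Key: value" line
		}
		val = strings.TrimSpace(val)
		var err error
		switch strings.TrimSpace(key) {
		case "LeaderId":
			var n int64
			if n, err = strconv.ParseInt(val, 10, 32); err == nil {
				qs.LeaderID, sawLeader = int32(n), true
			}
		case "CurrentVoters":
			qs.Voters, err = parseIDList(val)
		case "CurrentObservers":
			qs.Observers, err = parseIDList(val)
		}
		if err != nil {
			return qs, fmt.Errorf("parsing %q: %w", sc.Text(), err)
		}
	}
	if err := sc.Err(); err != nil {
		return qs, err
	}
	if !sawLeader {
		return qs, fmt.Errorf("no LeaderId line in output")
	}
	return qs, nil
}

func main() {
	// Output copied from the issue above.
	out := `ClusterId: x_NfbXESRDqP6kP4cwv-xQ
LeaderId: 5
LeaderEpoch: 20
HighWatermark: 88266
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 164
CurrentVoters: [3,4,5]
CurrentObservers: [0,1,2]`
	qs, err := ParseQuorumStatus(out)
	if err != nil {
		panic(err)
	}
	fmt.Println(qs.LeaderID, qs.Voters, qs.Observers) // 5 [3 4 5] [0 1 2]
}
```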
Note that everything works fine in ZooKeeper mode; it looks like sarama is just not compatible with KRaft yet.
Any insights from the community would be appreciated.
@panyuenlau this is actually intentional behaviour in the server-side KRaft code. See https://github.com/apache/kafka/blob/b2dea17041157ceee741041d23783ff993b88ef1/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L290-L305
When using KRaft, a cluster no longer has a single controller. Instead, all nodes running with the "controller" role take part in the controller metadata quorum.
Kafka just expects you to send your requests to any node in the cluster, and they will be load balanced accordingly.
Hey @dnwe - thanks for the info. I understand that Kafka itself doesn't want clients to talk directly to the quorum controllers; however, shouldn't an admin be able to find out which nodes they are? For example, sarama could leverage the equivalent of this Java Admin API: https://github.com/apache/kafka/blob/3.4.1/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L1482-L1509
@panyuenlau sure. In the future we will of course add the DescribeQuorumRequest and DescribeQuorumResponse protocol, and we can expose this for the metadata topic in the admin.go code in the same way as the Java client does. But AFAIK this is purely informative; the client doesn't actually use it to make any decisions.
Alrighty, thank you for the response, @dnwe!
Shall I keep the issue open? If not, please feel free to close it.
Actually, I just noticed a KIP currently under discussion that ties into what we are discussing here: KIP-919.
Good find. Yes, let’s keep this issue open for now. First, I’d like to add an FVT matrix that uses KRaft mode anyway, and second, we should at least document in the admin client that controllerID is random in KRaft mode.
Adding the metadata quorum protocol will happen over time as part of our general bring-up of new protocols, which is work in progress in the background.
Using sarama v1.42.2, I ran into the same confusion. Following this issue.