confluent-kafka-dotnet
confluent-kafka-dotnet copied to clipboard
Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration
Description
OK, I'm new to C# and .net, but have a working implementation with a java client on my machine and am trying to help our .net developers connect to the kafka service.
Kafka is running inside a kubernetes cluster on a private network (for example 10.1.2.3:9092). When I connect from my workstation with a java client, I only have to specify the bootstrap server and everything else works without any issue. This means that the network configuration is fine and there are no firewall issues blocking the connection.
However I try to configure the client, either plaintext, ssl or sasl I get either the following error, or an issue with ssl handshake. In java it's not necessary to configure anything related to ssl or sasl.
%7|1703086748.514|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test-consumer-group": updating member id "(not-set)" -> ""
%7|1703086748.518|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0x2504)
%7|1703086748.518|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group" changed state init -> query-coord (join-state init)
%7|1703086748.518|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group": no broker available for coordinator query: intervaled in state query-coord
%7|1703086748.527|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group" received op SUBSCRIBE in state query-coord (join-state init)
%7|1703086748.527|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group": subscribe to new subscription of 1 topics (join-state init)
%7|1703086748.527|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group": no broker available for coordinator query: intervaled in state query-coord
%6|1703086748.529|FAIL|rdkafka#consumer-1| [thrd:sasl_plaintext://10.1.2.3:9092/bootstrap]: sasl_plaintext://10.1.2.3:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
%3|1703086748.529|ERROR|rdkafka#consumer-1| [thrd:sasl_plaintext://10.1.2.3:9092/bootstrap]: 1/1 brokers are down
%3|1703086748.530|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: sasl_plaintext://10.1.2.3:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
%6|1703086748.737|FAIL|rdkafka#consumer-1| [thrd:sasl_plaintext://10.1.2.3:9092/bootstrap]: sasl_plaintext://10.1.2.3:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
How to reproduce
The following is the most simple implementation in .net
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Connector_Simplified
{
internal class Worker : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Yield();
ConsumerConfig config = new ConsumerConfig
{
BootstrapServers = "10.1.2.3:9094",
GroupId = "test-consumer-group",
ApiVersionRequest = true,
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol= SecurityProtocol.SaslPlaintext,
Debug = "consumer, cgrp, topic, fetch",
SaslMechanism=Confluent.Kafka.SaslMechanism.Gssapi,
// SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.Https,
// SecurityProtocol = SecurityProtocol.SaslSsl,
SslCaLocation = "probe"
};
IConsumer<Ignore, string> consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("test-topic");
while (!stoppingToken.IsCancellationRequested)
{
ConsumeResult<Ignore, string> message = consumer.Consume(stoppingToken);
Console.WriteLine($"Received message at {message.TopicPartitionOffset}: {message.Message.Value}");
}
}
}
}
I have played with every possible combination of PlainText, Sasl and SSL that I can think of, nothing works.
Checklist
Please provide the following information:
- Confluent.Kafka nuget version: 2.3.0
- Apache Kafka version 3.3.1
- Operating system - client is running on windows workstation, kafka is running inside a kubernetes cluster deployed using the bitnami helm template.
- Broker log excerpts - the broker does not produce any logs when the .net consumer attempts to connect, but does provide logs when the java consumer connects.
This means configuration issue. Are you using the correct SaslMechanism and SecurityProtocol which the broker supports?
I'm fairly sure I am, I think I've tried everything just in case. We're now looking to reconfigure the broker to see if we can set up something that mutually works, but it needs to be done in conjunction with other systems that are currently working.