confluent-kafka-dotnet
KafkaConsumer hangs forever when retrieving messages (Question)
Description
I want to pull messages from a topic using the Confluent Kafka library, but the application blocks indefinitely on this line and does not report any error:
var cr = c.Consume(cts.Token);
My source code is below:
using System;
using System.Threading;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    BootstrapServers = "url:443",
    SecurityProtocol = Confluent.Kafka.SecurityProtocol.Ssl,
    SslCaLocation = "cert.crt",
    AutoOffsetReset = AutoOffsetReset.Earliest,
};
using (var c = new ConsumerBuilder<string, string>(conf).Build())
{
    c.Subscribe("Test");
    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };
    try
    {
        while (true)
        {
            try
            {
                // Blocks until a message arrives or the token is cancelled.
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl+C was pressed: leave the group cleanly.
        c.Close();
    }
}
How to reproduce
Checklist
Please provide the following information:
- [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [ ] Confluent.Kafka nuget version.
- [ ] Apache Kafka version.
- [ ] Client configuration.
- [ ] Operating system.
- [ ] Provide logs (with "debug" : "..." as necessary in configuration).
- [ ] Provide broker log excerpts.
- [ ] Critical issue.
Same thing here. I've just moved from .NET 5 to .NET 6. Is .NET 6 officially supported? @mhowlett
<TargetFramework>net6.0</TargetFramework>
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
The same thing is happening to me. I tried with both .NET 6 and .NET 5, with the same result: it gets to consumer.Consume(token) and then nothing. No response, no error, no timeout. Meanwhile I'm producing messages to the subscribed topic, and other consumers are consuming them. Anyone got any ideas?
Set the Debug config to "all" and paste the output here.
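For anyone unsure how to do that, here is a minimal sketch of enabling debug output on the consumer. The broker address, the group id and the class name are placeholders that are not taken from the original post, and the 10-second bounded wait is only there so the program returns instead of blocking while you read the log.

```csharp
using System;
using Confluent.Kafka;

class DebugLoggingSketch
{
    static void Main()
    {
        var conf = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: replace with your broker address
            GroupId = "debug-sketch-group",      // assumption: hypothetical group id
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Turns on every librdkafka debug context. A narrower list such as
            // "consumer,cgrp,topic,fetch" produces less noise.
            Debug = "all",
        };

        using var consumer = new ConsumerBuilder<string, string>(conf)
            // Write client log lines and client-level errors to stderr.
            .SetLogHandler((_, log) =>
                Console.Error.WriteLine($"{log.Level} {log.Facility}: {log.Message}"))
            .SetErrorHandler((_, err) =>
                Console.Error.WriteLine($"Client error: {err.Reason}"))
            .Build();

        consumer.Subscribe("Test");

        // Bounded wait: Consume(TimeSpan) returns null if nothing arrives in time.
        var cr = consumer.Consume(TimeSpan.FromSeconds(10));
        Console.WriteLine(cr == null
            ? "No message within 10 seconds; check the debug log for the reason."
            : $"Consumed '{cr.Message.Value}' at {cr.TopicPartitionOffset}.");

        consumer.Close();
    }
}
```

The log handler is optional, since the client writes its log to stderr by default. It is included here only to show where the output can be redirected.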
I figured it out. Can't speak for the other folks on here, but in my case the critical debug message was:
FindCoordinatorRequest failed: Broker: Coordinator not available
I needed to rerun the docker image with the following flag:
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
Without that, it seems to use the default replication factor of 3 for the offsets topic, which causes no problems producing to a topic but won't let you consume from it.
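A rough way to check whether a setup might be affected by this is to compare the number of brokers in the cluster with that default factor of 3. The sketch below does this through the admin client. The broker address and class name are placeholders, and the check is only a hint: it cannot see what replication factor the broker is actually configured with.

```csharp
using System;
using Confluent.Kafka;

class BrokerCountCheck
{
    static void Main()
    {
        var conf = new AdminClientConfig
        {
            BootstrapServers = "localhost:9092", // assumption: replace with your broker address
        };

        using var admin = new AdminClientBuilder(conf).Build();

        // Fetch cluster metadata; throws a KafkaException if the broker is unreachable.
        Metadata metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
        int brokerCount = metadata.Brokers.Count;

        // Default replication factor for the offsets topic, as described above.
        const int defaultOffsetsReplicationFactor = 3;

        Console.WriteLine($"Brokers in cluster: {brokerCount}");
        if (brokerCount < defaultOffsetsReplicationFactor)
        {
            Console.WriteLine(
                "Fewer brokers than the default offsets topic replication factor. " +
                "Unless the broker overrides it (for example with " +
                "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1), consumers may never find a coordinator.");
        }
    }
}
```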