confluent-kafka-dotnet

KafkaConsumer hangs forever when retrieving messages (Question)

Open • DoganTahir opened this issue 3 years ago • 4 comments

Description

I want to pull messages from a topic using the Confluent Kafka library, but the application blocks indefinitely at the line var cr = c.Consume(cts.Token); and never reports an error.

My source code is below:

using System;
using System.Threading;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    BootstrapServers = "url:443",
    SecurityProtocol = Confluent.Kafka.SecurityProtocol.Ssl,
    SslCaLocation = "cert.crt",
    AutoOffsetReset = AutoOffsetReset.Earliest,
};

using (var c = new ConsumerBuilder<string, string>(conf).Build())
{
    c.Subscribe("Test");

    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };

    try
    {
        while (true)
        {
            try
            {
                // Blocks until a message arrives, an error is raised, or the token is cancelled.
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl+C was pressed: close the consumer so it leaves the group cleanly.
        c.Close();
    }
}

How to reproduce

Checklist

Please provide the following information:

  • [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [ ] Confluent.Kafka nuget version.
  • [ ] Apache Kafka version.
  • [ ] Client configuration.
  • [ ] Operating system.
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • [ ] Critical issue.

DoganTahir, Nov 09 '21 11:11

Same thing here. I've just moved from .NET 5 to .NET 6. Is .NET 6 officially supported? @mhowlett

<TargetFramework>net6.0</TargetFramework>
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />

NicolasREY69330, Nov 10 '21 08:11

The same thing is happening to me, with both .NET 6 and .NET 5. Execution reaches consumer.Consume(token) and then nothing: no response, no error, no timeout. Meanwhile I'm producing messages to the subscribed topic, and other consumers are consuming them. Does anyone have any ideas?

ChrisBurgdorff, Sep 07 '22 15:09
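
For anyone who wants the loop to report that it is still waiting instead of blocking silently, here is a minimal sketch that uses the Consume(TimeSpan) overload, which returns null when no message arrives within the timeout. The broker address, group id, and 5-second timeout are assumptions for illustration, not values from this thread, and this only makes the wait visible; it does not fix whatever is preventing delivery.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumption: replace with your broker address
    GroupId = "hang-diagnosis",          // assumption: hypothetical group id
    AutoOffsetReset = AutoOffsetReset.Earliest,
};

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the consumer can close cleanly
    cts.Cancel();
};

using var consumer = new ConsumerBuilder<string, string>(conf).Build();
consumer.Subscribe("Test");

while (!cts.IsCancellationRequested)
{
    try
    {
        // Returns null if nothing arrives within the timeout.
        var cr = consumer.Consume(TimeSpan.FromSeconds(5));
        if (cr == null)
        {
            Console.WriteLine("No message within 5 s; still waiting.");
            continue;
        }
        Console.WriteLine($"Consumed '{cr.Message.Value}' at {cr.TopicPartitionOffset}.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
    }
}

consumer.Close();
```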

Set the Debug config to "all" and paste the output here.

mhowlett, Sep 07 '22 16:09
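
A rough sketch of what enabling that debug output might look like, assuming a plaintext broker at a hypothetical address and a hypothetical group id. The Debug property maps to librdkafka's debug setting; the log and error handlers are optional and simply route the client's output to the console.

```csharp
using System;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumption: replace with your broker address
    GroupId = "debug-example",           // assumption: hypothetical group id
    Debug = "all",                       // enable every librdkafka debug context
};

using var consumer = new ConsumerBuilder<string, string>(conf)
    // Print each client log line, including the debug output enabled above.
    .SetLogHandler((_, log) =>
        Console.WriteLine($"{log.Level} {log.Facility}: {log.Message}"))
    // Print client-level errors, which are otherwise easy to miss.
    .SetErrorHandler((_, err) =>
        Console.WriteLine($"Error: {err.Reason} (fatal: {err.IsFatal})"))
    .Build();

consumer.Subscribe("Test");
```

The output with "all" is verbose; a narrower comma-separated list such as "consumer,cgrp,topic,fetch" is a common alternative.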

I figured it out. Can't speak for the other folks on here, but in my case the critical debug message was:

FindCoordinatorRequest failed: Broker: Coordinator not available

I needed to rerun the docker image with the following flag:

-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

Without that it seems to use the default factor of 3, which causes no problems producing to a topic but won't let you consume from it.

ChrisBurgdorff, Sep 07 '22 16:09
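
One way to check for the situation described above from the client side is to compare the number of brokers with the state of the internal __consumer_offsets topic. This is only a sketch under assumptions: the broker address is hypothetical, and it assumes the cluster lists the internal topic in its metadata. If fewer than three brokers are visible and the offsets topic is missing, the default replication factor of 3 is a plausible cause.

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

var adminConf = new AdminClientConfig
{
    BootstrapServers = "localhost:9092", // assumption: replace with your broker address
};

using var admin = new AdminClientBuilder(adminConf).Build();

// Fetch cluster-wide metadata (all brokers and topics).
var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
Console.WriteLine($"Brokers visible: {metadata.Brokers.Count}");

// The group coordinator stores committed offsets in this internal topic.
var offsetsTopic = metadata.Topics.FirstOrDefault(t => t.Topic == "__consumer_offsets");

if (offsetsTopic == null)
{
    Console.WriteLine("__consumer_offsets is not listed. With fewer brokers than the " +
                      "configured offsets topic replication factor, it cannot be created.");
}
else
{
    // Report the replica count of the first partition as a quick sanity check.
    var replicas = offsetsTopic.Partitions.FirstOrDefault()?.Replicas.Length ?? 0;
    Console.WriteLine($"__consumer_offsets: {offsetsTopic.Partitions.Count} partitions, " +
                      $"{replicas} replica(s) on the first partition.");
}
```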