confluent-kafka-dotnet

Kafka brokers unavailable is not a fatal error, even when trying to consume

Open rakker91 opened this issue 6 years ago • 4 comments

Description

Without any brokers running, when you try to consume, you receive errors (broker down) through the registered error handler, but these are not marked as fatal, and you end up blocked in Consume indefinitely without any retry ever happening.
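
(For reference, a minimal sketch of the kind of error handler involved here, assuming a consumer built with ConsumerBuilder; the broker-down errors arrive with IsFatal == false, so the handler is the only place they surface:)

// Sketch of an error handler registered via ConsumerBuilder.SetErrorHandler.
// Broker-down errors such as "Connect ... failed" and "1/1 brokers are down"
// are reported here as non-fatal, while Consume() keeps blocking.
private void OnKafkaConsumerError(IConsumer<byte[], byte[]> consumer, Error error)
{
    if (error.IsFatal)
    {
        // A fatal error indicates the client instance is no longer usable.
        Console.WriteLine($"FATAL: {error.Reason}");
    }
    else
    {
        // Connectivity errors are non-fatal; librdkafka retries them internally.
        Console.WriteLine($"Non-fatal: {error.Code} - {error.Reason}");
    }
}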

How to reproduce

Don't start Kafka. Create a consumer (we're using IConsumer<byte[], byte[]>) and wire up an error handler: this.ConsumerBuilder.SetErrorHandler(this.OnKafkaConsumerError);

Call Consume: var consumeResult = this.Consumer.Consume(token);

You'll get broker down errors like the following:

2019-06-14 10:31:14,126 |7| (Tws.Client.Event.Channels.Clients.Kafka.KafkaProvider) [ERROR] - Kafka has registered a non-fatal error and will try to recover. Error: 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: No connection could be made because the target machine actively refused it... (after 3001ms in state CONNECT)
2019-06-14 10:31:14,129 |7| (Tws.Client.Event.Channels.Clients.Kafka.KafkaProvider) [ERROR] - Kafka has registered a non-fatal error and will try to recover. Error: 1/1 brokers are down

But they're not fatal and no retry ever happens. You end up blocked at Consume, even if you start the broker.
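
(One way to avoid blocking indefinitely, sketched here as an assumption rather than a confirmed fix, is to use the Consume overload that takes a timeout so the loop regains control periodically; `token` and the timeout value are illustrative:)

// Sketch of a consume loop using the timeout overload instead of the blocking
// Consume(CancellationToken), so the application can decide when to give up
// waiting for the broker.
while (!token.IsCancellationRequested)
{
    var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));
    if (consumeResult == null)
    {
        // No message within the timeout (e.g. broker still unreachable); loop and retry.
        continue;
    }
    // Process consumeResult.Message here.
}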

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.

using System;
using System.Collections.Generic;
using Confluent.Kafka;

namespace KafkaDemo
{
    public class TestClass
    {
        public void Run()
        {
            var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(new Dictionary<string, string>()
            {
                { "bootstrap.servers", "192.168.1.99:9092" },
                { "group.id", "group1" }
            });
            consumerBuilder.SetErrorHandler(this.ErrorHandler);
            var consumer = consumerBuilder.Build();

            while (true)
            {
                consumer.Consume();
            }
        }

        private void ErrorHandler(IConsumer<byte[], byte[]> consumer, Error error)
        {
            Console.WriteLine($"Error Received.  Fatal: {error.IsFatal}, Error: {error}");
        }
    }
}

  • [x] Confluent.Kafka nuget version. 1.0.1.1
  • [x] Apache Kafka version. 2.12
  • [x] Client configuration.
  • [x] Operating system. windows 10 connecting to ubuntu 18.04 for kafka
  • [x] Provide logs (with "debug" : "..." as necessary in configuration). See above
  • [x] Provide broker log excerpts. See above
  • [ ] Critical issue.

rakker91 avatar Jun 14 '19 17:06 rakker91

Oh, note that the retry never happens. Forgot to mention that.

rakker91 avatar Jun 14 '19 17:06 rakker91

You end up blocked at Consume, even if you start the broker.

Everything you describe is intended behavior, except for this. When I get a moment, I'll run a quick test to try and reproduce.

mhowlett avatar Jun 17 '19 23:06 mhowlett
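
(For context, a minimal sketch of how the intended split between fatal and non-fatal errors is typically observed in the calling code; this is an assumption based on the general Confluent.Kafka error model, not a statement about this specific bug:)

// Non-fatal errors (like broker-down) only reach the error handler, while a
// ConsumeException thrown from Consume() can be inspected for IsFatal to decide
// whether the consumer instance should be torn down and rebuilt.
try
{
    var result = consumer.Consume(token);
    // ... handle result ...
}
catch (ConsumeException ex) when (ex.Error.IsFatal)
{
    // A fatal error means the client can no longer be used; close and recreate it.
    consumer.Close();
    throw;
}
catch (ConsumeException ex)
{
    // Non-fatal consume errors can be logged and the loop continued.
    Console.WriteLine($"Consume error: {ex.Error.Reason}");
}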

The blocked Consume issue looks identical to the one I posted in edenhill/librdkafka#2363! We hit this due to unavailable/interrupted network connections to brokers. I also posted a detailed log extract there using the 'consumer,cgrp,topic,fetch,broker' debug contexts.

erik-neumann avatar Jun 27 '19 08:06 erik-neumann
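
(For anyone reproducing this, a sketch of how the debug contexts mentioned above can be enabled in the consumer configuration; the log handler wiring is illustrative:)

// The librdkafka "debug" property takes a comma-separated list of contexts;
// the ones used in the log extract above are consumer, cgrp, topic, fetch and broker.
var config = new Dictionary<string, string>
{
    { "bootstrap.servers", "192.168.1.99:9092" },
    { "group.id", "group1" },
    { "debug", "consumer,cgrp,topic,fetch,broker" }
};

var consumer = new ConsumerBuilder<byte[], byte[]>(config)
    // Route the verbose librdkafka debug output somewhere visible.
    .SetLogHandler((c, logMessage) => Console.WriteLine($"{logMessage.Level} {logMessage.Facility}: {logMessage.Message}"))
    .Build();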

Is this still an issue?

anchitj avatar Jun 18 '24 12:06 anchitj