confluent-kafka-dotnet
Kafka brokers unavailable is not a fatal error, even when trying to consume
Description
With no brokers running, a consumer that tries to consume receives errors (broker down) through the registered error handler, but these are not flagged as fatal, and you end up blocked in Consume indefinitely without a retry ever happening.
How to reproduce
Don't start Kafka. Create a consumer (we're using `IConsumer<byte[], byte[]>`) and wire up an error handler: `this.ConsumerBuilder.SetErrorHandler(this.OnKafkaConsumerError);`
Call Consume: `var consumeResult = this.Consumer.Consume(token);`
You'll get broker down errors like the following:
2019-06-14 10:31:14,126 |7| (Tws.Client.Event.Channels.Clients.Kafka.KafkaProvider) [ERROR] - Kafka has registered a non-fatal error and will try to recover. Error: 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: No connection could be made because the target machine actively refused it... (after 3001ms in state CONNECT)
2019-06-14 10:31:14,129 |7| (Tws.Client.Event.Channels.Clients.Kafka.KafkaProvider) [ERROR] - Kafka has registered a non-fatal error and will try to recover. Error: 1/1 brokers are down
But they're not fatal and no retry ever happens. You end up blocked at Consume, even if you start the broker.
Checklist
Please provide the following information:
- [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.

    using System;
    using System.Collections.Generic;

    namespace KafkaDemo
    {
        using Confluent.Kafka;

        public class TestClass
        {
            public void Run()
            {
                var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(new Dictionary<string, string>()
                {
                    { "bootstrap.servers", "192.168.1.99:9092" },
                    { "group.id", "group1" }
                });
                consumerBuilder.SetErrorHandler(this.ErrorHandler);
                var consumer = consumerBuilder.Build();
                while (true)
                {
                    consumer.Consume();
                }
            }

            private void ErrorHandler(IConsumer<byte[], byte[]> consumer, Error error)
            {
                Console.WriteLine($"Error Received. Fatal: {error.IsFatal}, Error: {error}");
            }
        }
    }

- [x] Confluent.Kafka nuget version. 1.0.1.1
- [x] Apache Kafka version. 2.12
- [x] Client configuration.
- [x] Operating system. Windows 10 connecting to Ubuntu 18.04 for Kafka
- [x] Provide logs (with "debug" : "..." as necessary in configuration). See above
- [x] Provide broker log excerpts. See above
- [ ] Critical issue.
Oh--note that the retry never happens. Forgot to mention that.
> You end up blocked at Consume, even if you start the broker.
Everything you describe is intended behavior, except for this. When I get a moment, I'll run a quick test to try and reproduce.
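
For anyone who wants a loop that does not sit inside a single blocking call while this is investigated, here is a minimal sketch, assuming the Confluent.Kafka 1.x API. It polls with `Consume(TimeSpan)`, which returns `null` when the timeout expires, and uses the error handler to note when all brokers are reported down. The class name `PollingConsumer`, the helper `IsAllBrokersDown`, the topic name `my-topic`, and the one-second timeout are illustrative assumptions and not part of the original report.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

public class PollingConsumer
{
    // Set by the error handler, read by the consume loop.
    private volatile bool brokersDown;

    // Hypothetical helper: true for the local "all brokers down" error.
    public static bool IsAllBrokersDown(Error error) =>
        error.Code == ErrorCode.Local_AllBrokersDown;

    public void Run(CancellationToken token)
    {
        var config = new Dictionary<string, string>
        {
            { "bootstrap.servers", "192.168.1.99:9092" },
            { "group.id", "group1" }
        };

        using (var consumer = new ConsumerBuilder<byte[], byte[]>(config)
            .SetErrorHandler((_, error) =>
            {
                if (IsAllBrokersDown(error)) { brokersDown = true; }
                Console.WriteLine($"Error received. Fatal: {error.IsFatal}, Error: {error}");
            })
            .Build())
        {
            consumer.Subscribe("my-topic"); // assumed topic name

            while (!token.IsCancellationRequested)
            {
                // Returns null if no message arrives within the timeout,
                // so control comes back to this loop once per second.
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result == null)
                {
                    if (brokersDown) { Console.WriteLine("Still waiting for a broker..."); }
                    continue;
                }

                brokersDown = false;
                Console.WriteLine($"Received message at {result.TopicPartitionOffset}");
            }

            consumer.Close();
        }
    }
}
```

This does not change how the client reconnects; it only gives the application a regular point at which to log, check the cancellation token, or decide to give up.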
The blocked Consume issue looks identical to the one I posted in edenhill/librdkafka#2363. We get this when network connections to brokers are unavailable or interrupted. I also posted a detailed log extract using the debug contexts 'consumer,cgrp,topic,fetch,broker'.
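
The contexts listed above are values for the `debug` configuration property that the checklist refers to. A minimal sketch of turning them on, assuming the Confluent.Kafka 1.x builder API; the class name `DebugConsumerFactory` and the log handler's output format are illustrative assumptions:

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class DebugConsumerFactory
{
    // Same settings as the repro program, plus the debug contexts.
    public static Dictionary<string, string> BuildConfig() =>
        new Dictionary<string, string>
        {
            { "bootstrap.servers", "192.168.1.99:9092" },
            { "group.id", "group1" },
            { "debug", "consumer,cgrp,topic,fetch,broker" }
        };

    public static IConsumer<byte[], byte[]> Create() =>
        new ConsumerBuilder<byte[], byte[]>(BuildConfig())
            // Route client log lines to the console.
            .SetLogHandler((_, log) =>
                Console.WriteLine($"{log.Level} {log.Facility}: {log.Message}"))
            .Build();
}
```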
Is this still an issue?