KafkaNetClient
Consumers across multi-partitions / Consumers with Broker failure
We wish to use the new ManualConsumer to get the benefit of elegant failover. However, we don't want to limit each consumer to a single partition. Is there a way to allow a consumer to consume from every partition and still get the failover?
I want to clarify that this library is an implementation of the simple Kafka consumer and not the high-level one - this means it does not handle consumer failures (via partition reshuffling, etc.).
In case of broker failure, both ManualConsumer and Consumer will recover by refreshing the metadata and routing requests to the new leader.
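For context, the recovery described here can be pictured as a retry loop around the fetch: when a request to a dead leader fails, refresh the topic metadata so the router discovers the new leader, then retry. This is only an illustrative sketch, not the library's actual internals; the `RefreshTopicMetadata` call and the retry policy are assumptions.

```csharp
using System;
using System.Threading.Tasks;
using KafkaNet;

static class FailoverSketch
{
    // Illustrative only: wrap any fetch delegate in a metadata-refreshing
    // retry loop. Names and retry policy are assumptions, not the
    // library's real implementation.
    public static async Task<T> FetchWithFailover<T>(
        BrokerRouter router, string topic, Func<Task<T>> fetch, int maxRetries = 3)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await fetch();
            }
            catch (Exception)
            {
                if (attempt >= maxRetries) throw;

                // A fetch against a dead leader fails; re-query the cluster
                // metadata so the router learns the new leader, then retry.
                router.RefreshTopicMetadata(topic);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }
}
```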
I have been testing broker failures and have not seen either the ManualConsumer or the Consumer recover.
I have included some code samples and test setup below - is there something I am missing to allow these Consumers to recover when a broker goes down?
Kafka Setup
Number of Brokers: 3
Number of Partitions in Topic: 1
Replication Factor: 3
Test Steps
1. Create topic (leader Node A, replicas Node B and Node C)
2. Produce messages
3. Start consumer
4. Kill Node A (topic leader)
5. Consumer stops consuming, where we would like it to continue
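For reference, a setup like the one above can be reproduced with the standard Kafka CLI tools of that era. The paths, the ZooKeeper address, and the topic name here are assumptions, not taken from the original report:

```shell
# Create the topic with 1 partition replicated across the 3 brokers.
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --topic TestTopic --partitions 1 --replication-factor 3

# Check which broker currently leads partition 0.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TestTopic

# While the consumer is running, stop the leader broker's process
# (substitute the PID of the broker reported as leader).
kill <leader-broker-pid>
```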
Manual Consumer Example
using (var router = new BrokerRouter(_kafkaOptions))
{
    using (var gateway = new ProtocolGateway(router))
    {
        var consumer = new ManualConsumer(0, _topicName, gateway, _consumerName, 1048576);
        consumer.UpdateOrCreateOffset(_consumerGroupName, 0).Wait();
        var messages = consumer.FetchMessages(_numberOfKafkaEventsToConsume, 0).Result.ToList();
    }
}
Consumer Example
using (var router = new BrokerRouter(_kafkaOptions))
{
    var consumerOptions = new ConsumerOptions(_topicName, router);
    using (var consumer = new Consumer(consumerOptions))
    {
        var numberMessagesConsumed = 0;
        foreach (var messageContent in consumer
            .Consume(_cancelCancellationTokenSource.Token)
            .Select(message => Encoding.Default.GetString(message.Value)))
        {
            numberMessagesConsumed++;
        }
    }
}
Any help is appreciated, Thanks, Hannah
Hi, I would be happy if you could try to reproduce your result with the following code:
public async Task ConsumersAcrossMulti()
{
    int id = 0;
    string topic = "TestTopicIssue-13";
    using (var router = new BrokerRouter(_options))
    {
        var producer = new Producer(router);
        var consumerOptions = new ConsumerOptions(topic, router) { PartitionWhitelist = new List<int> { 0 } };
        using (var consumer = new Consumer(consumerOptions))
        {
            var blockingEnumerableOfMessages = consumer.Consume();
            await producer.SendMessageAsync(topic, new[] { new Message(id.ToString()) }, partition: 0);
            foreach (var message in blockingEnumerableOfMessages)
            {
                Console.WriteLine("Offset{0} messageId{1} PartitionId{2}",
                    message.Meta.Offset, message.Value.ToUtf8String(), message.Meta.PartitionId);
                await producer.SendMessageAsync(topic, new[] { new Message((++id).ToString()) }, partition: 0);
                await Task.Delay(1000);
            }
        }
    }
}
I tried to reproduce your result with 2 brokers, 1 partition, and 2 replicas. In Kafka Manager I changed the leader in the middle of the test.
log:
...
Offset128 messageId59 PartitionId0
Offset129 messageId60 PartitionId0
Offset130 messageId61 PartitionId0
Offset131 messageId62 PartitionId0
Offset132 messageId63 PartitionId0
//I changed the leader manually
10:16:00-455003 thread:[16] level:[Warn] Message:No connection. Attempting to connect...
10:16:00-456003 thread:[24] level:[Warn] Message:Connection established to:X
10:16:00-458014 thread:[8] level:[Warn] Message:No connection. Attempting to connect...
10:16:00-460015 thread:[23] level:[Warn] Message:Connection established to:X
//Recover and continue to consume
Offset133 messageId64 PartitionId0 Offset134 messageId65 PartitionId0 Offset135 messageId66 PartitionId0 ... ...
Thanks for getting back to me.
What we are trying to achieve is consumers being able to consume messages even in the event of a broker being taken offline.
Using your code above, this still does not seem to work.
Hi @Hmckee-navinet , Thanks for opening this issue.
I've checked, and when using the Consumer class the issue reproduces - consuming stops working pretty quickly when I stop the Kafka broker. That's definitely a bug.
When using ManualConsumer, there's still a bug, but it's slightly different: failover works when I shut down a broker, but the consumer doesn't recover when that broker comes back up and becomes leader again.
My test settings - 3 brokers (S1-S3), 1 topic, 1 partition, 3 replicas:
- Leader: S3 (3 brokers).
- Stop the leader - everything is working.
- Leader: S1 (2 brokers).
- Stop the leader - everything is working.
- Start S1 and S3.
- Leader: S2 (3 brokers).
- Stop the leader --> get an exception.
I will mark this issue as a bug. I'm not sure when I'll get to it, so feel free to submit a PR if this is urgent for you.
Hi @Hmckee-navinet , We've added a fix and would like to know if it solves your problem.