KafkaNetClient

Consumers across multi-partitions / Consumers with Broker failure

Open ghost opened this issue 9 years ago • 6 comments

We would like to use the new ManualConsumer to get the benefit of elegant failover. However, we don't want to limit each consumer to a single partition. Is there a way to let a consumer consume from every partition and still get the failover?

ghost avatar Jan 27 '16 08:01 ghost

I want to clarify that this library is an implementation of the simple Kafka consumer, not the high-level one. This means it does not handle consumer failures (via partition reshuffling, etc.).

In case of broker failure, both ManualConsumer and Consumer will recover by refreshing the metadata and routing the messages to the new leader.
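To illustrate the recovery pattern described above, here is a minimal, self-contained sketch. It is not the library's actual code: `FakeCluster`, `FailoverFetcher`, `LeaderUnavailableException`, and `FailoverDemo` are hypothetical names made up for this example, and the "cluster" is an in-memory stand-in that simply treats the first live replica as leader. The idea it shows is only this: cache the leader, and when a fetch fails because the leader is gone, refresh the metadata and retry against the new leader.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; none of these types belong to kafka-net.
public sealed class LeaderUnavailableException : Exception
{
    public LeaderUnavailableException(string broker) : base("Leader unavailable: " + broker) { }
}

public sealed class FakeCluster
{
    private readonly List<string> _replicas;
    private readonly HashSet<string> _liveBrokers;

    public FakeCluster(params string[] replicas)
    {
        _replicas = new List<string>(replicas);
        _liveBrokers = new HashSet<string>(replicas);
    }

    public void Kill(string broker) => _liveBrokers.Remove(broker);

    // Stand-in for a metadata request: the leader is the first live replica.
    public string FindLeader()
    {
        foreach (var broker in _replicas)
            if (_liveBrokers.Contains(broker)) return broker;
        throw new InvalidOperationException("No live replica");
    }

    // Stand-in for a fetch request: fails unless sent to the current live leader.
    public string Fetch(string broker, long offset)
    {
        if (!_liveBrokers.Contains(broker) || broker != FindLeader())
            throw new LeaderUnavailableException(broker);
        return "message-" + offset + "@" + broker;
    }
}

public sealed class FailoverFetcher
{
    private readonly FakeCluster _cluster;
    private string _cachedLeader;

    public int MetadataRefreshes { get; private set; }

    public FailoverFetcher(FakeCluster cluster)
    {
        _cluster = cluster;
        _cachedLeader = cluster.FindLeader();
    }

    public string Fetch(long offset, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return _cluster.Fetch(_cachedLeader, offset);
            }
            catch (LeaderUnavailableException) when (attempt < maxAttempts)
            {
                // The cached leader is stale: refresh the metadata, then retry.
                _cachedLeader = _cluster.FindLeader();
                MetadataRefreshes++;
            }
        }
    }
}

public static class FailoverDemo
{
    public static string[] Run()
    {
        var cluster = new FakeCluster("A", "B", "C");
        var fetcher = new FailoverFetcher(cluster);
        var before = fetcher.Fetch(0); // served by A, the initial leader
        cluster.Kill("A");             // simulate the leader going down
        var after = fetcher.Fetch(1);  // first attempt fails, retry is served by B
        return new[] { before, after };
    }
}
```

Calling `FailoverDemo.Run()` returns `"message-0@A"` followed by `"message-1@B"`. The retry is bounded by `maxAttempts` so that a cluster with no reachable leader surfaces an exception instead of looping forever.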

EranOfer avatar Jan 27 '16 09:01 EranOfer

I have been testing broker failures and have not seen either the ManualConsumer or the Consumer recover.

I have included some code samples and test setup below - is there something I am missing to allow these Consumers to recover when a broker goes down?

Kafka Setup

  • Number of Brokers: 3
  • Number of Partitions in Topic: 1
  • Replication factor: 3

Test Steps

  • Create Topic (Leader Node A, Replicas Node B and Node C)
  • Produce messages
  • Start Consumer
  • Kill Node A (Topic leader)
  • Consumer stops consuming, where we would like it to continue

Manual Consumer Example

using (var router = new BrokerRouter(_kafkaOptions))
{
    using (var gateway = new ProtocolGateway(router))
    {
        var consumer = new ManualConsumer(0, _topicName, gateway, _consumerName, 1048576);
        consumer.UpdateOrCreateOffset(_consumerGroupName, 0).Wait();

        var messages = consumer.FetchMessages(_numberOfKafkaEventsToConsume, 0).Result.ToList();
    }
}

Consumer Example

using (var router = new BrokerRouter(_kafkaOptions))
{
    var consumerOptions = new ConsumerOptions(_topicName, router);

    using (var consumer = new Consumer(consumerOptions))
    {
        var numberMessagesConsumed = 0;

        foreach (
            var messageContent in
                consumer.Consume(_cancelCancellationTokenSource.Token)
                    .Select(message => Encoding.Default.GetString(message.Value)))
        {
            numberMessagesConsumed++;
        }
    }
}

Any help is appreciated.

Thanks, Hannah

ghost avatar Jan 27 '16 09:01 ghost

Hi, I would be happy if you could try to reproduce your result with the following code:

        public async Task ConsumersAcrossMulti()
        {
            int id = 0;
            string topic = "TestTopicIssue-13";
            using (var router = new BrokerRouter(_options))
            {
                var producer = new Producer(router);

                // Restrict the consumer to partition 0, the only partition in the test topic.
                var consumerOptions = new ConsumerOptions(topic, router) { PartitionWhitelist = new List<int> { 0 } };

                using (var consumer = new Consumer(consumerOptions))
                {
                    var blockingEnumerableOfMessages = consumer.Consume();

                    // Seed the topic with one message so the loop below has something to read.
                    await producer.SendMessageAsync(topic, new[] { new Message(id.ToString()) }, partition: 0);

                    foreach (var message in blockingEnumerableOfMessages)
                    {
                        Console.WriteLine("Offset{0} messageId{1} PartitionId{2} ", message.Meta.Offset, message.Value.ToUtf8String(), message.Meta.PartitionId);

                        // Produce the next message, then pause so the leader can be changed mid-test.
                        await producer.SendMessageAsync(topic, new[] { new Message((++id).ToString()) }, partition: 0);
                        await Task.Delay(1000);
                    }
                }
            }
        }

I tried to reproduce your result with 2 brokers, 1 partition, and 2 replicas. In Kafka Manager, I changed the leader in the middle of the test.

log:

...
Offset128 messageId59 PartitionId0
Offset129 messageId60 PartitionId0
Offset130 messageId61 PartitionId0
Offset131 messageId62 PartitionId0
Offset132 messageId63 PartitionId0

//I changed the leader manually

10:16:00-455003 thread:[16] level:[Warn] Message:No connection . Attempting to connect...
10:16:00-456003 thread:[24] level:[Warn] Message:Connection established to:X
10:16:00-458014 thread:[8] level:[Warn] Message:No connectio. Attempting to connect...
10:16:00-460015 thread:[23] level:[Warn] Message:Connection established to:X

//Recover and continue to consume

Offset133 messageId64 PartitionId0
Offset134 messageId65 PartitionId0
Offset135 messageId66 PartitionId0
...

EranOfer avatar Jan 28 '16 11:01 EranOfer

Thanks for getting back to me.

What we are trying to achieve is consumers being able to consume messages even in the event of a broker being taken offline.

Using your code above, this still does not seem to work.

ghost avatar Jan 28 '16 12:01 ghost

Hi @Hmckee-navinet, thanks for opening this issue.

I've checked, and when using the Consumer class the issue is reproduced - consuming stops working pretty fast when I stop the Kafka broker. That's definitely a bug.

When using ManualConsumer, there's still a bug but it's slightly different. It looks like the failover is working when I shut down a broker, but it doesn't recover when the broker goes back up and becomes leader again.

My test settings - 3 brokers (S1-S3), 1 topic, 1 partition, 3 replicas:

  • Leader: S3 (3 brokers).
  • Stop the leader - everything is working.
  • Leader: S1 (2 brokers).
  • Stop the leader - everything is working.
  • Start S1 and S3.
  • Leader: S2 (3 brokers).
  • Stop the leader --> get an exception.

I will mark this issue as a bug. I'm not sure when I'll get to it, so feel free to submit a PR if this is urgent for you.

EranOfer avatar Feb 07 '16 13:02 EranOfer

Hi @Hmckee-navinet, we added a fix and would like to know if it solves your problem.

EranOfer avatar Feb 17 '16 12:02 EranOfer