
When MaxPollIntervalMs is exceeded, all API calls other than .Consume make the application freeze.

3schwartz opened this issue 3 years ago · 2 comments

Description

Hi,

When MaxPollIntervalMs is exceeded, all subsequent calls to the consumer other than Consume make the application freeze.

I'm catching the message from the LogHandler - this runs on a separate thread, and all exceptions from it are silently swallowed. https://github.com/confluentinc/confluent-kafka-dotnet/blob/0af2aad7fbfdd1704292bd9ea22d020818eadd61/src/Confluent.Kafka/Consumer.cs#L156

Note that the consumer is still able to receive messages - but they can't be committed manually, and when using auto commit the background job isn't able to commit either.

Confluent.Kafka nuget version: 1.8.2
Apache Kafka version: 3.1.0
.NET version: tested with both 6 and 3.1
Operating system: Windows locally, Kafka docker container in WSL

How to reproduce

I have created a small demo program.

I'm publishing messages AFTER each consume. This way I know that the messages consumed are not served from any local cache but actually consumed from Kafka.

When I run this, I see that messages keep being consumed after MaxPollIntervalMs - but when looking in Kafka, the consumer group doesn't have any members and the lag keeps increasing.

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using UnSubscribe;

const string bootstrapServers = "localhost:9093";
const string topicName = "test-foo";
var sessionTimeoutMs = TimeSpan.FromSeconds(6);
var maxPollIntervalMs = sessionTimeoutMs.Add(TimeSpan.FromMilliseconds(1));

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
    try
    {
        await adminClient.CreateTopicsAsync(new[]
        {
            new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 5 }
        });
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
}

using var producer = new ProducerBuilder<string, string>(new ProducerConfig
{
    BootstrapServers = bootstrapServers
}).Build();

var message = new Message<string, string>
{
    Value = "Something"
};
var dr = await producer.ProduceAsync(topicName, message);
Console.WriteLine($"-- Published -- Topic: {dr.Topic}, Partition: {dr.Partition.Value}, Offset: {dr.Offset.Value}");


var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "foo",
    MaxPollIntervalMs = (int)maxPollIntervalMs.TotalMilliseconds,
    SessionTimeoutMs = (int)sessionTimeoutMs.TotalMilliseconds,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
    AutoOffsetReset = AutoOffsetReset.Earliest
};


var count = 0;

var maxPolled = 0;

var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
       .SetLogHandler((consumer1, message) =>
       {
           if (message.Facility.Equals("MAXPOLL"))
           {
               Console.WriteLine("Consumer has exceeded max poll");
               maxPolled = 1;
           }

           Console.WriteLine($"Message: {message}, from consumer: {consumer1.MemberId}");
       })
       .Build();

consumer.Subscribe(topicName);

while (true)
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(10));

    if (cr == null)
    {
        continue;
    }

    Console.WriteLine($"-- Consumed -- Topic: {cr.Topic}, Partition: {cr.Partition.Value}, Offset: {cr.Offset.Value}");

    Console.WriteLine("Begin await");
    if (count == 0)
    {
        Console.WriteLine("-- Killing consumer -- ");
        await Task.Delay(maxPollIntervalMs.Add(TimeSpan.FromSeconds(5)));
    }
    else
    {
        await Task.Delay(maxPollIntervalMs.Subtract(TimeSpan.FromSeconds(2)));
    }

    dr = await producer.ProduceAsync(topicName, message);
    Console.WriteLine($"-- Published -- Topic: {dr.Topic}, Partition: {dr.Partition.Value}, Offset: {dr.Offset.Value}");

    Console.WriteLine($"Count: {count}");
    Console.WriteLine("Finish await");
    count++;
}

To handle this and trigger a resubscription, I throw an exception when a MAXPOLL message has been received. This is done in the code snippet below.

However, when closing or disposing the consumer, the application freezes. I have debugged it down to the call into the underlying librdkafka library. An example here when calling Close: https://github.com/confluentinc/confluent-kafka-dotnet/blob/0af2aad7fbfdd1704292bd9ea22d020818eadd61/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs#L731

I have tested other APIs like consumer.Subscription or consumer.Assignment - in all cases the application freezes.

var count = 0;

while (true)
{

    var maxPolled = 0;

    var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
           .SetLogHandler((consumer1, message) =>
           {
               if (message.Facility.Equals("MAXPOLL"))
               {
                   Console.WriteLine("Consumer has exceeded max poll");
                   maxPolled = 1;
               }

               Console.WriteLine($"Message: {message}, from consumer: {consumer1.MemberId}");
           })
           .Build();

    consumer.Subscribe(topicName);

    try
    {
        while (true)
        {
            if (maxPolled == 1)
            {
                throw new Exception("Max poll has been done");
            }

            var cr = consumer.Consume(TimeSpan.FromSeconds(10));

            if (cr == null)
            {
                continue;
            }

            Console.WriteLine($"-- Consumed -- Topic: {cr.Topic}, Partition: {cr.Partition.Value}, Offset: {cr.Offset.Value}");

            if (count == 0)
            {
                Console.WriteLine("-- Killing consumer -- ");
                await Task.Delay(maxPollIntervalMs.Add(TimeSpan.FromSeconds(5)));
            }
            else
            {
                await Task.Delay(maxPollIntervalMs.Subtract(TimeSpan.FromSeconds(2)));
            }

            var dr = await producer.ProduceAsync(topicName, message);
            Console.WriteLine($"-- Published -- Topic: {dr.Topic}, Partition: {dr.Partition.Value}, Offset: {dr.Offset.Value}");

            Console.WriteLine($"Count: {count}");

            count++;
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
    finally
    {
        Console.WriteLine("Starting closing");
        consumer.Close();
        consumer.Dispose();
    }

}

I would expect that some exception is thrown, or that the consumer is able to re-subscribe when calling Consume again.

Checklist

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version.
  • [x] Apache Kafka version.
  • [x] Client configuration.
  • [x] Operating system.
  • [x] Provide broker log excerpts. - There are none
  • [x] Critical issue.

3schwartz avatar Feb 22 '22 11:02 3schwartz

I think the next step is to try to isolate the problem, i.e. do the bare minimum with the consumer and see if it still occurs. In particular: 1. don't await anything; 2. get rid of the producer code. If the issue goes away, add things back and see what makes it occur.

If the issue still presents with the minimal code, feel free to open a PR with an integration test that should pass but doesn't.

mhowlett avatar Mar 14 '22 17:03 mhowlett

Hi @mhowlett

I have now simplified it even more, and I am able to toggle the error on and off.

I have also created a PR with an integration test: https://github.com/confluentinc/confluent-kafka-dotnet/pull/1781.

If one calls anything on the consumer, like .MemberId, from within the LogHandler, the thread will freeze. The side effect is that any later poll calls on the consumer don't throw the expected ConsumeException indicating that MaxPollIntervalMs was exceeded.

Below is a minimal program to reproduce, with comments (some messages are injected into the topic before it starts).

using Confluent.Kafka;

const string bootstrapServers = "localhost:9093";
const string topicName = "test-foo";
var sessionTimeoutMs = TimeSpan.FromSeconds(6);
var maxPollIntervalMs = sessionTimeoutMs.Add(TimeSpan.FromMilliseconds(1));

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = Guid.NewGuid().ToString(),
    MaxPollIntervalMs = (int)maxPollIntervalMs.TotalMilliseconds,
    SessionTimeoutMs = (int)sessionTimeoutMs.TotalMilliseconds,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
    AutoOffsetReset = AutoOffsetReset.Earliest
};


var count = 0;
var maxPolled = 0;

var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
       .SetLogHandler((consumer1, message) =>
       {
           if (message.Facility.Equals("MAXPOLL"))
           {
               Console.WriteLine("Consumer has exceeded max poll");
               maxPolled = 1;
           }
           // Calling anything on the consumer from within here makes the thread freeze,
           // and then no exception will be thrown when the consumer polls.
           var memberId = consumer1.MemberId;
       })
       .Build();

consumer.Subscribe(topicName);

while (true)
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(10));

    if (cr == null) { continue; }

    Console.WriteLine($"-- Consumed -- Topic: {cr.Topic}, Partition: {cr.Partition.Value}, Offset: {cr.Offset.Value}");

    Console.WriteLine("Begin await");
    if (count == 0)
    {
        Console.WriteLine("-- Killing consumer -- ");
        Thread.Sleep(maxPollIntervalMs.Add(TimeSpan.FromSeconds(500)));
    }
    else
    {
        Console.WriteLine("-- Call assignment - THIS MAKES IT FREEZE");
        var assignment = consumer.Assignment;
        Console.WriteLine("-- Done call assignment");
    }

    Console.WriteLine($"Count: {count}");
    Console.WriteLine("Finish await");
    count++;
}

3schwartz avatar Mar 22 '22 20:03 3schwartz

Hi @3schwartz, I tried to dig into the issue, and it seems you are calling the MemberId operation from within the log handler, which is freezing the thread.

This is expected, because log handlers are called spontaneously from internal librdkafka threads. For more details, please check the comment here: https://github.com/confluentinc/confluent-kafka-dotnet/blob/28eb3aba89c82faab3b4ab8fb97f76300ddb45f8/src/Confluent.Kafka/ConsumerBuilder.cs#L202-L205
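A pattern that avoids the freeze (a minimal sketch, not from this thread; it assumes a broker at localhost:9092 and an existing topic "test-foo", both placeholders) is to only record a flag inside the log handler and do all consumer calls from the consume loop, which runs on the application's own thread. Interlocked is used to publish the flag safely across threads:

```csharp
using Confluent.Kafka;

const string topicName = "test-foo";

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

var maxPollExceeded = 0;

using var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetLogHandler((_, logMessage) =>
    {
        // Runs on an internal librdkafka thread: do not call back into
        // the consumer here, only record what happened.
        if (logMessage.Facility == "MAXPOLL")
        {
            Interlocked.Exchange(ref maxPollExceeded, 1);
        }
    })
    .Build();

consumer.Subscribe(topicName);

while (true)
{
    // Safe to inspect the consumer here, on the application's own thread.
    if (Interlocked.CompareExchange(ref maxPollExceeded, 0, 1) == 1)
    {
        Console.WriteLine($"Max poll exceeded; assignment has {consumer.Assignment.Count} partition(s)");
    }

    var cr = consumer.Consume(TimeSpan.FromSeconds(1));
    if (cr == null) continue;

    Console.WriteLine($"Consumed offset {cr.Offset.Value}");
}
```

This keeps the log handler non-blocking, as the ConsumerBuilder documentation linked above requires; any reaction (logging assignment, resubscribing, committing) happens on the thread that owns the consumer.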

anchitj avatar Nov 03 '22 18:11 anchitj