confluent-kafka-dotnet

Consumer subscribing with regex not detecting new topics matching regex even with metadata.max.age.ms set

chamo92 opened this issue 5 years ago • 9 comments

Description

Our system requires heavy use of regular expression subscriptions. We noticed that our consumers weren't recognizing topics that matched our regex subscriptions when those topics were created after the consumer started up. We found that setting metadata.max.age.ms fixes this issue for the Java consumer, but for the C# consumer it only seems to fix it intermittently. I also tried the topic.metadata.refresh.interval.ms setting, but that didn't seem to fix the problem either.

Are there any other configuration items that we should be considering?
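
For reference, here is how the two settings map onto the strongly typed config (a minimal sketch using the values we experimented with, not the full repro below):

using Confluent.Kafka;

class ConfigSketch
{
  static ConsumerConfig Build() => new ConsumerConfig
  {
    BootstrapServers = "kafka:9092",
    GroupId = "csharp-test-consumer",
    // Maps to metadata.max.age.ms: maximum age of cached metadata before a
    // full refresh is forced.
    MetadataMaxAgeMs = 5000,
    // Maps to topic.metadata.refresh.interval.ms: interval at which topic
    // and broker metadata is refreshed.
    TopicMetadataRefreshIntervalMs = 5000
  };
}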

How to reproduce

Confluent.Kafka version: 1.4.3

Start up the test consumers and then start up the test producer. The Java consumer receives the messages on the new topics in the time you would expect it to; the C# consumer does not. This has been tested on Linux (Ubuntu 18.04.4 LTS) and on Windows 10. For some reason the Java version isn't receiving messages on Windows, but the C# version is, provided the topics exist before the consumer starts. On Windows, we are using kafka_2.12-2.5.0 from the quickstart page with the default configuration.

I've played with the consume interval and metadata.max.age.ms values and haven't been able to get it to work consistently.

C# Consumer

using Confluent.Kafka;
using System;

namespace TestConsumer
{
  class Program
  {
    static void Main(string[] args)
    {
      ConsumerConfig config = new ConsumerConfig
      {
        BootstrapServers = "kafka:9092",
        GroupId = "csharp-test-consumer",
        MetadataMaxAgeMs = 5000, // metadata.max.age.ms: cap cached metadata age at 5 seconds
        AutoCommitIntervalMs = 1000,
        EnableAutoCommit = true,
        AutoOffsetReset = AutoOffsetReset.Earliest
      };

      using (IConsumer<string, string> consumer = new ConsumerBuilder<string, string>(config).Build())
      {
        string topic = "^test_[0-9]*";
        consumer.Subscribe(topic);
        while (true)
        {
          Console.WriteLine("Consuming...");
          var result = consumer.Consume(TimeSpan.FromMilliseconds(1000));
          if (result?.Message.Value != null)
          {
            Console.WriteLine(result.Topic);
          }
        }
      }
    }
  }
}

Producer

using Confluent.Kafka;
using System;

namespace TestProducer
{
  class Program
  {
    static void Main(string[] args)
    {
      ProducerConfig config = new ProducerConfig
      {
        BootstrapServers = "kafka:9092",
        // TopicMetadataRefreshIntervalMs = 5000
      };

      using (IProducer<string, string> producer = new ProducerBuilder<string, string>(config)
        .SetKeySerializer(Serializers.Utf8)
        .SetValueSerializer(Serializers.Utf8)
        .Build())
      {
        int count = 0;
        while (true)
        {
          // Produce to a new topic each second, capping the topic number at 200.
          if (count < 200)
            count++;
          string topic = $"test_{count}";
          Console.WriteLine($"Topic: {topic}");
          producer.Produce(topic, new Message<string, string> { Value = "A test Message", Key = "test_key" });
          System.Threading.Thread.Sleep(1000);
        }
      }
    }
  }
}

Java Consumer

package clients;

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumer {

    private static final int DURATION_TO_POLL = 1000;

    public static void main(String[] args) {
        System.out.println("*** Starting Basic Consumer ***");
        Properties settings = new Properties();
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "java-test-consumer");
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            System.out.println("Subscribing....");
            Pattern pattern = Pattern.compile("^test_[0-9]*");
            consumer.subscribe(pattern);

            while (true) {
                System.out.println("Consuming...");
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(DURATION_TO_POLL));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic());
                }
            }
        }
    }
}

We are using the latest Confluent ZooKeeper and Kafka Docker images (5.5.0). We also tried 5.3.0.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - confluent

  broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://kafka:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      confluent:
        ipv4_address: 172.20.0.3

networks:
  confluent:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: "172.20.0.0/16"
          gateway: "172.20.0.1"

Log files:

  • linux_java_and_csharp_running.txt - For this one I just ran the Java and C# consumers.
  • csharp_consumer_only_with_restart.txt - For this one I ran just the C# consumer and restarted it so it would consume the messages on each of the generated topics.

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version.
  • [x] Apache Kafka version.
  • [x] Client configuration.
  • [x] Operating system.
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [x] Provide broker log excerpts.
  • [x] Critical issue.

chamo92 avatar Jun 25 '20 23:06 chamo92

metadata.max.age.ms should work - the logic is all there to do the join group if required (there may be a bug in it).

If you do a full metadata request using an AdminClient constructed from the consumer handle, this should also trigger a group rebalance if one is required. If you set up a background thread with a loop doing that, it would be interesting to know if it resolved the issue (this narrows where the problem might be by about half).

mhowlett avatar Jun 26 '20 01:06 mhowlett

I tried that out on Windows and it seems to have fixed the issue. On Ubuntu I'm getting an exception from client.GetMetadata with the message "Local: Broker transport failure." I'm looking into that to see whether I can resolve it, and I'll post an update if I do.

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

namespace TestConsumer
{
  class Program
  {

    private static int METADATA_REFRESH_INTERVAL_MS = 5000;
    static void Main(string[] args)
    {
      ConsumerConfig config = new ConsumerConfig
      {
        BootstrapServers = "localhost:9092",
        GroupId = "csharp-test-consumer",
        MetadataMaxAgeMs = 5000,
        AutoCommitIntervalMs = 1000,
        EnableAutoCommit = true,
        AutoOffsetReset = AutoOffsetReset.Earliest
      };

      ConsumerBuilder<string, string> builder = new ConsumerBuilder<string, string>(config);
      using (IConsumer<string, string> consumer = builder.Build())
      {
        Task.Run(() => MetadataRefresh(consumer.Handle));
        string topic = "^test_[0-9]*";
        consumer.Subscribe(topic);
        while (true)
        {
          Console.WriteLine("Consuming...");
          var result = consumer.Consume(TimeSpan.FromMilliseconds(1000));
          if (result?.Message.Value != null)
          {
            Console.WriteLine(result.Topic);
          }
        }
      }
    }

    private static void MetadataRefresh(Handle handle)
    {
      using (IAdminClient client = new DependentAdminClientBuilder(handle).Build())
      {
        while (true)
        {
          System.Threading.Thread.Sleep(METADATA_REFRESH_INTERVAL_MS);
          Console.WriteLine("Refreshing Metadata...");
          // Full metadata request on the consumer's shared handle; per the
          // suggestion above, this should trigger a rebalance if one is required.
          client.GetMetadata(TimeSpan.FromMilliseconds(5000));
        }
      }
    }
  }
}
Output:

Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
test_1
Consuming...
test_2
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
test_3
Consuming...
test_4
Consuming...
test_5
Consuming...
test_6
Consuming...
test_7
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
test_8
Consuming...
test_11
Consuming...
test_10
Consuming...
test_9
Consuming...
test_12
Consuming...
Consuming...
Consuming...
Consuming...

Thanks for the quick reply by the way!

chamo92 avatar Jun 26 '20 18:06 chamo92

I think the Ubuntu issue is just a networking problem, not a client issue; I can't ping my Kafka broker right now. Once that's fixed I'll test again.

EDIT: It was a networking issue on my end. Once I fixed it, the AdminClient workaround worked on Ubuntu as well.

chamo92 avatar Jun 26 '20 18:06 chamo92

@mhowlett, any updates on this? We may be experiencing some performance issues with the workaround.

chamo92 avatar Sep 15 '20 21:09 chamo92

We may have found the cause of the performance issue. We were injecting a reference to the IAdminClient, and that seemed to drive up CPU usage. Injecting the IConsumer instance into the component doing the refreshing, and keeping the IAdminClient in a using statement, seems to have fixed it; i.e., closer to the code above.
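
A minimal sketch of that arrangement (the MetadataRefresher class name and Run signature are hypothetical; the Confluent.Kafka calls are the same ones from the snippet above):

using Confluent.Kafka;
using System;
using System.Threading;

class MetadataRefresher
{
  private readonly IConsumer<string, string> _consumer;

  public MetadataRefresher(IConsumer<string, string> consumer)
  {
    // Inject the consumer itself rather than a long-lived IAdminClient.
    _consumer = consumer;
  }

  public void Run(int intervalMs, CancellationToken token)
  {
    // The dependent admin client shares the consumer's handle and is kept
    // inside a using statement, scoped to the refresh loop.
    using (IAdminClient admin = new DependentAdminClientBuilder(_consumer.Handle).Build())
    {
      while (!token.IsCancellationRequested)
      {
        Thread.Sleep(intervalMs);
        admin.GetMetadata(TimeSpan.FromSeconds(5));
      }
    }
  }
}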

chamo92 avatar Sep 15 '20 22:09 chamo92

Another note: even with the CPU usage issue fixed, we are still seeing latency issues with this workaround.

chamo92 avatar Oct 19 '20 17:10 chamo92

We would also appreciate the subscribed consumer automatically detecting new topics. We don't use the AdminClient workaround; we simply re-subscribe on receiving notice of new topics via a side channel (see the sketch below), but it would be great to simply use Subscribe with a wildcard, even when running on Windows. I can confirm the AdminClient workaround seems to work on Windows; I'll probably check on Kubuntu tomorrow.
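
For context, a minimal sketch of that side-channel approach (the ResubscribingConsumer and NotifyNewTopic names are hypothetical; the side channel is whatever mechanism reports topic creation):

using Confluent.Kafka;
using System;

class ResubscribingConsumer
{
  private const string Pattern = "^test_[0-9]*";
  private readonly IConsumer<string, string> _consumer;

  public ResubscribingConsumer(IConsumer<string, string> consumer)
  {
    _consumer = consumer;
    _consumer.Subscribe(Pattern);
  }

  // Called by the side channel when it reports a newly created topic;
  // re-issuing the wildcard subscription picks the new topic up.
  public void NotifyNewTopic(string topic)
  {
    Console.WriteLine($"New topic reported: {topic}; re-subscribing.");
    _consumer.Subscribe(Pattern);
  }
}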

quixoticaxis avatar Aug 25 '22 21:08 quixoticaxis

@quixoticaxis - I've just opened a PR that includes an integration test that demonstrates newly created topics being detected with a regex subscription: https://github.com/confluentinc/confluent-kafka-dotnet/pull/1881

If you are able to provide a variation on this that should pass but fails, I would love to investigate.

mhowlett avatar Aug 26 '22 00:08 mhowlett

@mhowlett I've changed your tests a little to run on Windows (namely, I removed topic deletion), and they pass. It seems our issue with local (Windows) runs lies elsewhere. Thank you.

quixoticaxis avatar Aug 26 '22 10:08 quixoticaxis

Closing, as the issue is fixed.

anchitj avatar Jun 04 '24 12:06 anchitj