confluent-kafka-dotnet
Consumer subscribing with regex not detecting new topics matching regex even with metadata.max.age.ms set
Description
Our system requires heavy use of regular expression subscriptions. We noticed that our consumers weren't recognizing topics that matched the regex subscriptions when those topics were created after the consumer started up. We found that setting metadata.max.age.ms fixes this issue for the Java consumer, but for the C# consumer it only seems to fix it intermittently. I also tried the topic.metadata.refresh.interval.ms setting, but that didn't seem to fix the problem either.
Are there any other configuration items that we should be considering?
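For reference, a minimal sketch of how those two settings map onto the .NET client's strongly typed config (the broker address and group id here are placeholders, not our real values):

using Confluent.Kafka;

ConsumerConfig config = new ConsumerConfig
{
    BootstrapServers = "kafka:9092",          // placeholder broker address
    GroupId = "regex-test-consumer",          // placeholder group id
    MetadataMaxAgeMs = 5000,                  // metadata.max.age.ms
    TopicMetadataRefreshIntervalMs = 5000     // topic.metadata.refresh.interval.ms
};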
How to reproduce
Confluent.Kafka version: 1.4.3. Start up the test consumers, then start the test producer. The Java consumer receives messages on the new topics within the time you would expect; the C# consumer does not. This has been tested on Linux (Ubuntu 18.04.4 LTS) and on Windows 10. For some reason the Java version isn't receiving messages on Windows, but the C# version is, provided the topics exist before the consumer starts. On Windows we are using kafka_2.12-2.5.0 from the quickstart page with the default configuration.
I've played with the consume interval and metadata.max.age.ms values and haven't been able to get it to work consistently.
C# Consumer
using Confluent.Kafka;
using System;

namespace TestConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            ConsumerConfig config = new ConsumerConfig
            {
                BootstrapServers = "kafka:9092",
                GroupId = "csharp-test-consumer",
                MetadataMaxAgeMs = 5000,
                AutoCommitIntervalMs = 1000,
                EnableAutoCommit = true,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (IConsumer<string, string> consumer = new ConsumerBuilder<string, string>(config).Build())
            {
                string topic = "^test_[0-9]*";
                consumer.Subscribe(topic);
                while (true)
                {
                    Console.WriteLine("Consuming...");
                    var result = consumer.Consume(TimeSpan.FromMilliseconds(1000));
                    if (result?.Message.Value != null)
                    {
                        Console.WriteLine(result.Topic);
                    }
                }
            }
        }
    }
}
Producer
using Confluent.Kafka;
using System;

namespace TestProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            ProducerConfig config = new ProducerConfig
            {
                BootstrapServers = "kafka:9092",
                // TopicMetadataRefreshIntervalMs = 5000
            };

            using (IProducer<string, string> producer = new ProducerBuilder<string, string>(config)
                .SetKeySerializer(Serializers.Utf8)
                .SetValueSerializer(Serializers.Utf8)
                .Build())
            {
                int count = 0;
                while (true)
                {
                    if (count < 200)
                        count++;
                    string topic = $"test_{count}";
                    Console.WriteLine($"Topic: {topic}");
                    producer.Produce(topic, new Message<string, string> { Value = "A test Message", Key = "test_key" });
                    System.Threading.Thread.Sleep(1000);
                }
            }
        }
    }
}
Java Consumer
package clients;

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumer {
    private static final int DURATION_TO_POLL = 1000;

    public static void main(String[] args) {
        System.out.println("*** Starting Basic Consumer ***");
        Properties settings = new Properties();
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "java-test-consumer");
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            System.out.println("Subscribing....");
            Pattern pattern = Pattern.compile("^test_[0-9]*");
            consumer.subscribe(pattern);
            while (true) {
                System.out.println("Consuming...");
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(DURATION_TO_POLL));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic());
                }
            }
        }
    }
}
We are using the latest Confluent ZooKeeper and Kafka Docker images (5.5.0). We also tried 5.3.0.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - confluent
  broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://kafka:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      confluent:
        ipv4_address: 172.20.0.3
networks:
  confluent:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: "172.20.0.0/16"
          gateway: "172.20.0.1"
Log files:
linux_java_and_csharp_running.txt - for this one I just ran the Java and C# consumers.
csharp_consumer_only_with_restart.txt - for this one I ran only the C# consumer and restarted it so it would consume the messages on each of the generated topics.
Checklist
Please provide the following information:
- [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [x] Confluent.Kafka nuget version.
- [x] Apache Kafka version.
- [x] Client configuration.
- [x] Operating system.
- [ ] Provide logs (with "debug" : "..." as necessary in configuration).
- [x] Provide broker log excerpts.
- [x] Critical issue.
metadata.max.age.ms should work - the logic to rejoin the group when required is all there (though there may be a bug in it).
If you do a full metadata request using an AdminClient constructed from the consumer handle, this should also trigger a group rebalance if one is required. If you set up a background thread with a loop doing that, it would be interesting to know whether it resolves the issue (that would narrow down where the problem might be by about half).
I tried that out on Windows and it seems to have fixed the issue. On Ubuntu I'm getting an exception from client.GetMetadata with the message "Local: Broker transport failure." I'm looking into that to see if I can resolve it, and I'll post an update if I get it fixed.
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

namespace TestConsumer
{
    class Program
    {
        private static int METADATA_REFRESH_INTERVAL_MS = 5000;

        static void Main(string[] args)
        {
            ConsumerConfig config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "csharp-test-consumer",
                MetadataMaxAgeMs = 5000,
                AutoCommitIntervalMs = 1000,
                EnableAutoCommit = true,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            ConsumerBuilder<string, string> builder = new ConsumerBuilder<string, string>(config);
            using (IConsumer<string, string> consumer = builder.Build())
            {
                Task.Run(() => MetadataRefresh(consumer.Handle));
                string topic = "^test_[0-9]*";
                consumer.Subscribe(topic);
                while (true)
                {
                    Console.WriteLine("Consuming...");
                    var result = consumer.Consume(TimeSpan.FromMilliseconds(1000));
                    if (result?.Message.Value != null)
                    {
                        Console.WriteLine(result.Topic);
                    }
                }
            }
        }

        private static void MetadataRefresh(Handle handle)
        {
            using (IAdminClient client = new DependentAdminClientBuilder(handle).Build())
            {
                while (true)
                {
                    System.Threading.Thread.Sleep(METADATA_REFRESH_INTERVAL_MS);
                    Console.WriteLine("Refreshing Metadata...");
                    client.GetMetadata(TimeSpan.FromMilliseconds(5000));
                }
            }
        }
    }
}
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
Consuming...
test_1
Consuming...
test_2
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
Consuming...
test_3
Consuming...
test_4
Consuming...
test_5
Consuming...
test_6
Consuming...
test_7
Consuming...
Consuming...
Consuming...
Consuming...
Consuming...
Refreshing Metadata...
test_8
Consuming...
test_11
Consuming...
test_10
Consuming...
test_9
Consuming...
test_12
Consuming...
Consuming...
Consuming...
Consuming...
Thanks for the quick reply by the way!
I think the Ubuntu issue is just a networking problem and not a client issue. I can't ping my Kafka broker right now. Once that's fixed I'll test it again.
EDIT: It was a networking issue on my end. Once I fixed it, the workaround worked in Ubuntu as well.
@mhowlett, any updates on this? We may be experiencing some performance issues with the workaround.
We may have found the cause of the performance issue. We were injecting a long-lived reference to the IAdminClient, and that seemed to drive up CPU usage. Injecting the IConsumer instance into the component doing the refreshing and keeping the IAdminClient in a using statement (i.e. closer to the example above) seems to have fixed that.
Another note: even with the CPU usage issue fixed, we are still seeing latency issues with this workaround.
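For illustration, a minimal sketch of the arrangement described above, assuming the consumer is injected and the dependent admin client is scoped to the refresh loop (the class and method names are hypothetical):

using Confluent.Kafka;
using System;
using System.Threading;

// Hypothetical refresher: it holds the consumer, not the admin client.
class MetadataRefresher
{
    private readonly IConsumer<string, string> _consumer;

    public MetadataRefresher(IConsumer<string, string> consumer)
    {
        _consumer = consumer;
    }

    public void Run(int intervalMs, CancellationToken ct)
    {
        // The dependent admin client shares the consumer's handle and lives
        // only for the duration of this method, mirroring the example above.
        using (IAdminClient client = new DependentAdminClientBuilder(_consumer.Handle).Build())
        {
            while (!ct.IsCancellationRequested)
            {
                Thread.Sleep(intervalMs);
                client.GetMetadata(TimeSpan.FromSeconds(5)); // full metadata request
            }
        }
    }
}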
We would also appreciate the subscribed consumer automatically detecting new topics. We don't use the AdminClient workaround; we simply re-subscribe on receiving new-topic notifications via a side channel. Still, it would be great to simply use Subscribe with a wildcard, even when running on Windows.
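For context, a minimal sketch of that re-subscribe approach, assuming some application-specific side channel invokes the handler when a new matching topic appears (the handler and its wiring are hypothetical):

using Confluent.Kafka;

static class SideChannelHandlers
{
    // Hypothetical callback wired to the side channel. Re-issuing Subscribe
    // with the same regex prompts the client to re-evaluate the match set
    // and trigger a rebalance if it has changed.
    public static void OnNewTopicDetected(IConsumer<string, string> consumer)
    {
        consumer.Subscribe("^test_[0-9]*");
    }
}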
Can confirm: the AdminClient workaround seems to work on Windows. I'll probably check on Kubuntu tomorrow.
@quixoticaxis - I've just opened a PR that includes an integration test that demonstrates newly created topics being detected with a regex subscription: https://github.com/confluentinc/confluent-kafka-dotnet/pull/1881
If you are able to provide a variation on this that should pass but fails, I would love to investigate.
@mhowlett I've changed your tests a little to run on Windows (namely, removed topic deletion), and they pass. It seems our issue with local (Windows) runs lies elsewhere. Thank you.
Closing, as the issue is fixed.