confluent-kafka-dotnet
Ability to get consumer group offsets?
Hi,
Does anyone know if there are plans to add to IAdminClient the ability to get consumer group offsets for a given topic? I am assuming that interface would be the logical place for this functionality.
Preferably for the v1 release?
The Java client seems to have it available with the following call: ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
I am using the C# Confluent.Kafka assembly, version 1.0.0-beta2
Thanks, Dave
yes we plan to add this, but it won't make it into 1.0.
Excellent, thanks Matt!
By the way, thanks for the great library. It makes it very easy to connect in with C#.
+1 on this one, it would be very useful for us in migrating our microservices onto this client
any word on this feature?
currently not high on the priority list, thanks for the +1.
+1 would like this feature to integrate within our health checks.
Hi, I'm also very interested. I can try to do the work, but I need a little help on how/where to start. Could you help?
I guess this is blocked by https://github.com/edenhill/librdkafka/issues/2173
Hi @mhowlett, since there is no built-in solution I tried to add a workaround in my code. According to https://github.com/edenhill/librdkafka/issues/2838#issuecomment-617002762 it should be safe to do this without disrupting existing consumer groups:
- With the admin API I fetch the existing TopicPartition pairs using GetMetadata.
- I create a consumer instance with the groupId whose consumer lag I wish to read, configured as follows:
  - EnableAutoCommit disabled
  - AllowAutoCreateTopics disabled
  - EnableAutoOffsetStore disabled
  - AutoOffsetReset = AutoOffsetReset.Error
- I avoid calling subscribe.
- I query the committed offsets for the selected TopicPartition pairs.
- For each topic for which I find any committed offset, I call QueryWatermarkOffsets for each partition in that topic.
- Finally, my lag is watermark.High minus current.Offset.
using var adminClient = ..;

// Step 1: enumerate every topic partition known to the cluster.
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
var topicMetadata = metadata.Topics
    .SelectMany(t => t.Partitions.Select(p => new TopicPartition(t.Topic, p.PartitionId)))
    .ToArray();

// Step 2: build a consumer that carries the group id but never joins the group.
// ThrowingDeserializer is not shown here; presumably a helper that throws if a message is ever deserialized.
using var metadataConsumer = new ConsumerBuilder<byte[], byte[]>(new ConsumerConfig
    {
        EnableAutoCommit = false,
        AllowAutoCreateTopics = false,
        EnableAutoOffsetStore = false,
        AutoOffsetReset = AutoOffsetReset.Error,
        GroupId = groupId, // <-- consumer groupId of which lag we wish to check
        BootstrapServers = string.Join(",", settings.BootstrapServers)
    })
    .SetValueDeserializer(ThrowingDeserializer.Instance)
    .SetKeyDeserializer(ThrowingDeserializer.Instance)
    .SetPartitionsAssignedHandler((Func<IConsumer<byte[], byte[]>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>)((_, _) => throw new NotSupportedException("Misused.")))
    .Build();

// Step 3: deliberately no Subscribe or Assign call.

// Step 4: read the group's committed offsets for all partitions.
var committedOffsets = metadataConsumer.Committed(topicMetadata, TimeSpan.FromSeconds(10));

// Step 5: keep only topics where at least one partition has a real committed offset.
var topicsWithFoundOffsets = committedOffsets
    .GroupBy(t => t.Topic)
    .Where(t => t.Any(s => !s.Offset.IsSpecial))
    .SelectMany(t => t);

// Step 6: lag = high watermark - committed offset (evaluated lazily, on enumeration).
var lag = topicsWithFoundOffsets.Select<TopicPartitionOffset, (TopicPartition, long)>(tpo =>
{
    if (tpo.Offset.IsSpecial)
        return (tpo.TopicPartition, tpo.Offset.Value); // no commit here: pass the special value through
    var watermark = metadataConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(10));
    return (tpo.TopicPartition, watermark.High.Value - tpo.Offset.Value);
});
So: 1) Is that snippet a safe solution that will not disrupt the brokers or the consumer groups whose lag I check? 2) Any suggestions on how to improve this workaround?
Hi @mhowlett, @edenhill, could you comment on the above? Am I risking anything with this approach when it comes to the brokers or the performance / stability of the consumer groups?
@plachor That looks like a good solution.
You could even optimize it further by creating the Admin client as a dependent client on top of the Consumer, which will be cheaper and quicker.
Since you're not assigning or subscribing to any partitions, you don't need to set these:
EnableAutoCommit = false,
AllowAutoCreateTopics = false,
EnableAutoOffsetStore = false,
AutoOffsetReset = AutoOffsetReset.Error,
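For readers following along, here is a minimal sketch of the dependent-client suggestion above, with the extra config flags dropped. It assumes Confluent.Kafka's DependentAdminClientBuilder, which builds an admin client on top of an existing client handle. The names GroupLagSketch, ComputeLag and ReadLag are hypothetical, and the timeouts are arbitrary. This is an illustration under those assumptions, not code from the thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper class; names and timeouts are illustrative only.
public static class GroupLagSketch
{
    // Lag = high watermark - committed offset.
    // Returns null when the group has no commit for the partition (special offset).
    public static long? ComputeLag(Offset committed, Offset highWatermark) =>
        committed.IsSpecial ? (long?)null : highWatermark.Value - committed.Value;

    public static Dictionary<TopicPartition, long?> ReadLag(string bootstrapServers, string groupId)
    {
        var config = new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupId };

        // The consumer is never subscribed or assigned, so it does not join the group.
        using var consumer = new ConsumerBuilder<byte[], byte[]>(config).Build();

        // The admin client reuses the consumer's underlying handle instead of opening
        // its own connections. Declared second, so it is disposed before the consumer.
        using var admin = new DependentAdminClientBuilder(consumer.Handle).Build();

        var partitions = admin.GetMetadata(TimeSpan.FromSeconds(20)).Topics
            .SelectMany(t => t.Partitions.Select(p => new TopicPartition(t.Topic, p.PartitionId)))
            .ToList();

        var result = new Dictionary<TopicPartition, long?>();
        foreach (var tpo in consumer.Committed(partitions, TimeSpan.FromSeconds(10)))
        {
            if (tpo.Offset.IsSpecial)
            {
                result[tpo.TopicPartition] = null; // nothing committed for this partition
                continue;
            }
            var watermarks = consumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(10));
            result[tpo.TopicPartition] = ComputeLag(tpo.Offset, watermarks.High);
        }
        return result;
    }
}
```

A call such as GroupLagSketch.ReadLag("localhost:9092", "my-group") needs a reachable broker. Both arguments are placeholders.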
Thanks for review :)
+1 on this one, would be very usable for us to migrate our microservices onto this client
I'd love for the AdminClient to have parity with the Java library, and also to support async/await for all its operations.
There is this method in Java:
adminClient.listConsumerGroupOffsets(groupId).
Confluent-kafka-dotnet 2.0.2 has
adminClient.ListConsumerGroupOffsetsAsync(new List<ConsumerGroupTopicPartitions>() {new ConsumerGroupTopicPartitions(groupId, topicPartitions)})
Unlike the Java method, ListConsumerGroupOffsetsAsync requires topicPartitions to be passed in, so it cannot be used to discover which topics the consumer group has offsets for.
I hope a future version will provide the same method as the Java version (listConsumerGroupOffsets(String groupId)).
I also hope the ...Async methods could support CancellationTokens...
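Until then, one possible way around the topicPartitions requirement is to pass every partition from the cluster metadata and keep only those with a committed offset. The sketch below assumes the 2.x call quoted above. It also assumes each result exposes a Partitions list of TopicPartitionOffsetError, which is my understanding of that API rather than something confirmed in this thread. GroupOffsetsSketch, KeepCommitted and ListCommittedAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper class; names and the timeout are illustrative only.
public static class GroupOffsetsSketch
{
    // Drops partitions for which the group has no real committed offset.
    public static List<TopicPartitionOffset> KeepCommitted(IEnumerable<TopicPartitionOffset> offsets) =>
        offsets.Where(o => !o.Offset.IsSpecial).ToList();

    public static async Task<List<TopicPartitionOffset>> ListCommittedAsync(IAdminClient admin, string groupId)
    {
        // Candidate set: every partition of every topic in the cluster metadata.
        var allPartitions = admin.GetMetadata(TimeSpan.FromSeconds(20)).Topics
            .SelectMany(t => t.Partitions.Select(p => new TopicPartition(t.Topic, p.PartitionId)))
            .ToList();

        // One group per request, in the same shape as the call quoted above.
        var results = await admin.ListConsumerGroupOffsetsAsync(
            new List<ConsumerGroupTopicPartitions>
            {
                new ConsumerGroupTopicPartitions(groupId, allPartitions)
            });

        // Assumption: each result carries per-partition offsets in Partitions.
        return KeepCommitted(results
            .SelectMany(r => r.Partitions)
            .Select(p => p.TopicPartitionOffset));
    }
}
```

The distinct topics in the returned list are the topics the group has committed to, which may differ from its current subscription.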