confluent-kafka-dotnet

Ability to get consumer group offsets?

Open Dave-French opened this issue 6 years ago • 13 comments

Hi,

Does anyone know if there are plans to add to IAdminClient the ability to get consumer group offsets for a given topic? I assume that interface would be the logical place for this functionality.

Preferably for the v1 release?

The java client seems to have it available with the following call: ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

I am using the C# Confluent.Kafka assembly, version 1.0.0-beta2

Thanks, Dave

Dave-French avatar Jan 23 '19 22:01 Dave-French

yes we plan to add this, but it won't make it into 1.0.

mhowlett avatar Jan 23 '19 23:01 mhowlett

Excellent, thanks Matt!

By the way, thanks for the great library. It makes it very easy to connect in with C#.

Dave-French avatar Jan 24 '19 14:01 Dave-French

+1 on this one, it would be very useful for us in migrating our microservices onto this client

henrik-mvno avatar Mar 30 '19 22:03 henrik-mvno

any word on this feature?

tonykimani avatar Apr 28 '20 02:04 tonykimani

currently not high on the priority list, thanks for the +1.

mhowlett avatar Apr 28 '20 15:04 mhowlett

+1 would like this feature to integrate within our health checks.

brechtvhb avatar Sep 28 '20 07:09 brechtvhb

Hi, I'm also very interested. I can try to do the work, but I need a little help on how/where to start. Could you help?

pitming avatar Oct 15 '20 14:10 pitming

I guess this is blocked by https://github.com/edenhill/librdkafka/issues/2173

PSanetra avatar Sep 15 '21 10:09 PSanetra

Hi @mhowlett, since there is no built-in solution I tried to add a workaround in my code. According to https://github.com/edenhill/librdkafka/issues/2838#issuecomment-617002762 it should be safe to do this without disrupting existing consumer groups:

  1. With the admin API I fetch the existing TopicPartition pairs using GetMetadata
  2. I create a consumer instance with the groupId whose consumer lag I wish to read:
  • disabled EnableAutoCommit
  • disabled AllowAutoCreateTopics
  • disabled EnableAutoOffsetStore
  • AutoOffsetReset = AutoOffsetReset.Error
  3. I avoid calling Subscribe
  4. I query the committed offsets for the selected TopicPartition pairs
  5. For each topic with at least one committed offset, I call QueryWatermarkOffsets for each partition in that topic
  6. Finally, my lag is watermark.High minus the committed Offset

using var adminClient = ..;

// step 1.
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
var topicMetadata = metadata.Topics.SelectMany(t => t.Partitions.Select(p => new TopicPartition(t.Topic, p.PartitionId))).ToArray();

// step 2.
using var metadataConsumer = new ConsumerBuilder<byte[], byte[]>(new ConsumerConfig
    {
        EnableAutoCommit = false,
        AllowAutoCreateTopics = false,
        EnableAutoOffsetStore = false,
        AutoOffsetReset = AutoOffsetReset.Error,
        GroupId = groupId,  // <-- consumer groupId whose lag we wish to check
        BootstrapServers = string.Join(",", settings.BootstrapServers)
    })
    .SetValueDeserializer(ThrowingDeserializer.Instance)
    .SetKeyDeserializer(ThrowingDeserializer.Instance)
    .SetPartitionsAssignedHandler((Func<IConsumer<byte[], byte[]>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>)((_, _) => throw new NotSupportedException("Misused.")))
    .Build();

// step 4.
var committedOffsets = metadataConsumer.Committed(topicMetadata, TimeSpan.FromSeconds(10));

// step 5.
var topicsWithFoundOffsets = committedOffsets.GroupBy(t => t.Topic).Where(t => t.Any(s => !s.Offset.IsSpecial)).SelectMany(t => t);

// step 6.
var lag = topicsWithFoundOffsets.Select<TopicPartitionOffset, (TopicPartition, long)>(tpo => {
    // Partitions without a committed offset carry a special (negative) offset; pass it through unchanged.
    if (tpo.Offset.IsSpecial)
        return (tpo.TopicPartition, tpo.Offset);

    var watermark = metadataConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(10));
    return (tpo.TopicPartition, watermark.High - tpo.Offset);
});

So: 1) Is this snippet a safe solution that will not disrupt the brokers or the consumer groups whose lag I check? 2) Any suggestions on how to improve this workaround?

plachor avatar Mar 18 '22 11:03 plachor

Hi @mhowlett, @edenhill, could you comment on the above? Am I risking anything with this approach when it comes to the brokers and the performance / stability of consumer groups?

plachor avatar Mar 24 '22 06:03 plachor

@plachor That looks like a good solution.

You could even optimize it further by creating the Admin client as a dependent client on top of the Consumer, which will be cheaper and quicker.
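A minimal sketch of that suggestion, assuming the DependentAdminClientBuilder class from Confluent.Kafka, which builds an admin client on top of an existing client handle. The broker address and group id are placeholder values, not taken from this thread.

```csharp
using System;
using Confluent.Kafka;

// Placeholder values (assumptions): replace with your own broker list and group id.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "group-to-inspect"
};

// The consumer is never subscribed or assigned; it is only used for
// Committed() and QueryWatermarkOffsets() calls.
using var consumer = new ConsumerBuilder<byte[], byte[]>(config).Build();

// The dependent admin client reuses the consumer's underlying handle
// instead of creating a second client instance with its own connections.
using var adminClient = new DependentAdminClientBuilder(consumer.Handle).Build();

var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
Console.WriteLine($"Topics visible to the client: {metadata.Topics.Count}");
```

Because using declarations dispose in reverse order, the dependent admin client is released before the consumer that owns the handle.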

Since you're not assigning or subscribing to any partitions you don't need to set these:

        EnableAutoCommit = false,
        AllowAutoCreateTopics = false,
        EnableAutoOffsetStore = false,
        AutoOffsetReset = AutoOffsetReset.Error,

edenhill avatar Mar 24 '22 09:03 edenhill

Thanks for the review :)

plachor avatar Mar 24 '22 10:03 plachor

+1 on this one, it would be very useful for us in migrating our microservices onto this client

chenjing1294 avatar Apr 03 '22 01:04 chenjing1294

I'd love for the AdminClient to have parity with the Java library, and also to support async/await for all its operations.

taspeotis avatar Nov 14 '22 10:11 taspeotis

There is this method in Java:

adminClient.listConsumerGroupOffsets(groupId).

Confluent-kafka-dotnet 2.0.2 has

adminClient.ListConsumerGroupOffsetsAsync(new List<ConsumerGroupTopicPartitions>() {new ConsumerGroupTopicPartitions(groupId, topicPartitions)})

Compared with the Java method, ListConsumerGroupOffsetsAsync requires topicPartitions to be passed in, which prevents us from discovering which topics the consumer group subscribes to.

Hopefully a future version will provide the same method as the Java version (listConsumerGroupOffsets(String groupId)).
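Until then, one possible workaround is to combine this call with the GetMetadata step from the earlier snippet: pass every known partition and keep only those that come back with a committed offset. This is only a sketch, not a confirmed recommendation from the maintainers. The broker address and group id are placeholders, and it assumes that partitions without a commit are returned with a special (unset) offset rather than an error.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Placeholder values (assumptions): replace with your own broker list and group id.
var groupId = "group-to-inspect";
using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

// Enumerate every topic partition the cluster reports.
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
var allPartitions = metadata.Topics
    .SelectMany(t => t.Partitions.Select(p => new TopicPartition(t.Topic, p.PartitionId)))
    .ToList();

// Ask for the group's committed offsets on all of those partitions.
var results = await adminClient.ListConsumerGroupOffsetsAsync(
    new List<ConsumerGroupTopicPartitions>
    {
        new ConsumerGroupTopicPartitions(groupId, allPartitions)
    });

// Keep only partitions where the group has actually committed an offset.
foreach (var tpo in results.Single().Partitions.Where(p => !p.Offset.IsSpecial))
{
    Console.WriteLine($"{tpo.Topic} [{tpo.Partition.Value}]: committed offset {tpo.Offset.Value}");
}
```

On clusters with many topics this sends a large partition list in one request, so it may be worth narrowing the metadata to the topics of interest first.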

chenjing1294 avatar Feb 02 '23 03:02 chenjing1294

It is also hoped that the ...Async methods could support CancellationTokens...

taspeotis avatar Apr 03 '23 07:04 taspeotis