confluent-kafka-dotnet

Proper way to reset offset for a given GroupId

Open · alexandery opened this issue 5 years ago · 9 comments

Description

I need to be able to "rewind" a consumer to the beginning of a topic and re-process existing messages. Is this properly supported?

I'm using the following code, but am not getting the expected result.

foreach (var topicPartition in Consumer.Assignment)
{
	Consumer.Assign(new TopicPartitionOffset(topicPartition, Offset.Beginning));
}

I've tried both .Seek() and .Assign() in the code above.

What is the proper way of resetting the offset on a per-group basis?

Checklist

  • [1.0.1.1] Confluent.Kafka NuGet version.

alexandery, Jun 27 '19

@mhowlett I would appreciate some insight into this.

alexandery, Jul 11 '19

If you assign or seek to a specific TopicPartitionOffset as you are doing (whether the offset is logical or absolute), the messages returned by Consume will reflect the offset you specify. Note that this does not explicitly store offsets in Kafka; to control that, use the commit- and store-related functionality.

You don't explain what "the expected result" is, but I suspect the problem is that you're not using Assign as intended. Calling Assign completely replaces the current set of partitions being read from; it does not add to it (see https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/IConsumer.cs#L170).
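A minimal sketch of what this reply describes, assuming a consumer that has already joined its group and received partitions (so `Assignment` is non-empty): seek each currently assigned partition to `Offset.Beginning`, rather than calling `Assign` once per partition. `RewindSketch` and its methods are illustrative names, not part of the client.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class RewindSketch
{
    // One TopicPartitionOffset per partition, all pointing at the logical "beginning" offset.
    public static List<TopicPartitionOffset> ToBeginning(IEnumerable<TopicPartition> partitions) =>
        partitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)).ToList();

    // Seek every partition in the current assignment. Unlike calling Assign in a loop,
    // this leaves the set of assigned partitions unchanged.
    public static void SeekAllToBeginning<TKey, TValue>(IConsumer<TKey, TValue> consumer)
    {
        foreach (var tpo in ToBeginning(consumer.Assignment))
        {
            consumer.Seek(tpo);
        }
    }
}
```

Because `Seek` only changes the consumer's in-memory position, the group's committed offsets move only once you commit after re-consuming the messages.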

mhowlett, Jul 11 '19

@mhowlett The expected result is my consumer group restarting from the beginning of the topic and reprocessing all of its messages. I want to rewind my consumer's offsets on demand (not all the time, only when something happens and I want to reprocess all messages).

So, what is the proper way to achieve that? Calling .Seek() and then .Commit() results in an exception ("Local: No offset stored"). I am currently not setting "enable.auto.offset.store", leaving it at its default.

I also understand the implications of .Assign() in terms of partitions, but without any documentation on rewinding offsets, I was experimenting to see what would work.

Do I have to manage offsets manually to be able to reset them for the partitions of a given topic?

P.S. Sorry for the delay in responding.

alexandery, Aug 06 '19

@mhowlett Matt, what is the correct way to reset the offsets for a consumer group? How can I make the consumers belonging to a consumer group start processing messages from the beginning of a topic?

alexandery, Aug 14 '19

You can use the CLI tool that comes with Kafka (kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute), but you'll need to stop your consumers first (it will often make sense to do that anyway).

If you don't want to stop your consumers, you'll need to signal them in some way that they should start reprocessing. They can then use IConsumer.QueryWatermarkOffsets to get the beginning offset for each partition, seek to that position, and commit as usual after consuming messages.
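A sketch of this second approach, assuming the consumer already has partitions assigned and that some application-specific signal (not shown) decides when to call `RewindAssigned`. `WatermarkRewind`, `EarliestOffsets` and `RewindAssigned` are illustrative names; `Assignment`, `QueryWatermarkOffsets` and `Seek` are `IConsumer` members. The watermark lookup is passed in as a delegate so the offset mapping can be checked without a broker. A likely explanation for the "Local: No offset stored" error is that `Commit()` with no arguments commits only offsets stored for consumed messages, so committing after the re-read messages have been consumed should avoid it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class WatermarkRewind
{
    // Pair each partition with its low watermark, i.e. the earliest offset the broker still retains.
    public static List<TopicPartitionOffset> EarliestOffsets(
        IEnumerable<TopicPartition> partitions,
        Func<TopicPartition, WatermarkOffsets> queryWatermarks) =>
        partitions.Select(tp => new TopicPartitionOffset(tp, queryWatermarks(tp).Low)).ToList();

    // Intended to be called from the thread running the consume loop, once a rewind is signalled.
    public static void RewindAssigned<TKey, TValue>(IConsumer<TKey, TValue> consumer, TimeSpan timeout)
    {
        var targets = EarliestOffsets(
            consumer.Assignment,
            tp => consumer.QueryWatermarkOffsets(tp, timeout));
        foreach (var tpo in targets)
        {
            consumer.Seek(tpo);
        }
        // Offsets are then committed as usual while the re-read messages are consumed
        // (auto-commit, or consumer.Commit(result) after processing each message).
    }
}
```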

mhowlett, Aug 14 '19

FWIW, this appears to work for me with version 1.3.0:

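using System.Linq;
using Confluent.Kafka;

// consumerConfig is a ConsumerConfig defined elsewhere (GroupId, BootstrapServers, ...)
var consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        // Start every newly assigned partition from the earliest retained offset.
        var offsets = partitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
        return offsets;
    })
    .Build();
// ...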
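A hedged variation on the handler above: the partitions-assigned handler runs on every assignment, including after each rebalance, so a hypothetical startup flag (here `reprocessFromStart`) can decide whether to rewind or to resume from the committed offsets. `Offset.Unset` asks the client to use the committed offset, falling back to `auto.offset.reset` when nothing has been committed. `StartPosition` is an illustrative name.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class StartPosition
{
    // Offset.Beginning rewinds the partition; Offset.Unset resumes from the committed offset.
    public static List<TopicPartitionOffset> ForPartitions(
        IEnumerable<TopicPartition> partitions, bool fromBeginning) =>
        partitions
            .Select(tp => new TopicPartitionOffset(tp, fromBeginning ? Offset.Beginning : Offset.Unset))
            .ToList();
}
```

It would be wired in as `.SetPartitionsAssignedHandler((c, partitions) => StartPosition.ForPartitions(partitions, reprocessFromStart))`. While the flag is true, every rebalance rewinds again, so in practice you would clear it once reprocessing is under way.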

russau, Jan 22 '20

Is there a .NET method corresponding to the following command?

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
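Nothing in this thread points to a single client call equivalent to `--shift-by`. A rough sketch of the same arithmetic, assuming it runs inside a consumer that is a member of the target group: read the committed offsets with `Committed`, shift them, clamp the result to the low/high watermarks (the CLI likewise moves out-of-range results to the earliest or latest offset), then commit and seek. `ShiftBySketch` is an illustrative name, and unlike `--all-topics` it only covers the partitions this consumer currently has assigned.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class ShiftBySketch
{
    // committed + shiftBy, clamped to the [low, high] watermark range.
    public static long ShiftedOffset(long committed, long shiftBy, WatermarkOffsets watermarks) =>
        Math.Clamp(committed + shiftBy, watermarks.Low.Value, watermarks.High.Value);

    // Shifts the committed offsets of the assigned partitions by shiftBy (negative = move back),
    // commits the result, and seeks so consumption continues from the new positions.
    public static void ShiftAssigned<TKey, TValue>(
        IConsumer<TKey, TValue> consumer, long shiftBy, TimeSpan timeout)
    {
        var targets = new List<TopicPartitionOffset>();
        foreach (var committed in consumer.Committed(consumer.Assignment, timeout))
        {
            if (committed.Offset == Offset.Unset)
            {
                continue; // nothing committed yet for this partition; leave it alone
            }
            var watermarks = consumer.QueryWatermarkOffsets(committed.TopicPartition, timeout);
            var shifted = ShiftedOffset(committed.Offset.Value, shiftBy, watermarks);
            targets.Add(new TopicPartitionOffset(committed.TopicPartition, shifted));
        }
        if (targets.Count == 0)
        {
            return;
        }
        consumer.Commit(targets);
        foreach (var target in targets)
        {
            consumer.Seek(target);
        }
    }
}
```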

chenjing1294, Jul 01 '22

> if you don't want to need to stop your consumers, you'll need to signal them in some way that they should start reprocessing, then you could use IConsumer.QueryWatermarkOffsets to get the beginning offset for each partition, then seek to that position, then commit as usual after consuming messages.

This appears to be the suggested code:

private void ResetOffset()
{
    // Assume partition 0, good luck finding out what your partition is otherwise!
    var topicPartition = new TopicPartition(_topic, new Partition(0));
    var watermarkOffsets = _consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(1));
    _consumer.Seek(new TopicPartitionOffset(topicPartition, watermarkOffsets.Low));
}

It works about 50% of the time; the other half it throws an "erroneous" exception.
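Two likely causes of the intermittent failure: partition 0 is not necessarily assigned to this consumer when several members share the group, and `Seek` only works for partitions the consumer is currently consuming, which is typically not yet the case right after `Subscribe`, before a `Consume` call has completed the group join. A hedged sketch that defers the rewind until the consume loop has a non-empty assignment; `RewindableLoop`, `RequestRewind` and the `handle` callback are hypothetical names, and the 5-second watermark timeout is an arbitrary choice.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public sealed class RewindableLoop<TKey, TValue>
{
    private readonly IConsumer<TKey, TValue> _consumer;
    private volatile bool _rewindRequested; // set from another thread by the (hypothetical) trigger

    public RewindableLoop(IConsumer<TKey, TValue> consumer) => _consumer = consumer;

    public bool RewindPending => _rewindRequested;

    public void RequestRewind() => _rewindRequested = true;

    public void Run(Action<ConsumeResult<TKey, TValue>> handle, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var result = _consumer.Consume(ct);
            // Rewind only after Consume has returned, so Assignment reflects the
            // partitions this member actually owns instead of a guessed partition 0.
            if (_rewindRequested && _consumer.Assignment.Count > 0)
            {
                _rewindRequested = false;
                foreach (var tp in _consumer.Assignment)
                {
                    var watermarks = _consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5));
                    _consumer.Seek(new TopicPartitionOffset(tp, watermarks.Low));
                }
                continue; // skip this message; it should be delivered again after the seek
            }
            handle(result);
        }
    }
}
```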

harvzor, Feb 02 '23

> you can use the cli tool that comes with kafka (kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute) but you'll need to stop your consumers (it will often make sense to do that).
>
> if you don't want to need to stop your consumers, you'll need to signal them in some way that they should start reprocessing, then you could use IConsumer.QueryWatermarkOffsets to get the beginning offset for each partition, then seek to that position, then commit as usual after consuming messages.

@mhowlett, sorry to drag up an old post, but are there any plans to support this CLI functionality in the .NET client?

kfrn, Jan 24 '24