kop
kop copied to clipboard
[BUG] Some messages could be ignored even if auto.reset.offset is earliest
Describe the bug #530 reported the bug that the offset is still available after a topic is deleted. #531 tried to fix the bug by checking whether the current offset is greater than the LEO and reset it to 0 instead of the fake current offset. However, it also introduced a bug when the current offset is less than LEO.
To Reproduce
- Run a KoP standalone.
- Produce 5 messages to a topic
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
>1
>2
>3
>4
>5
>
- Consume all messages
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group --from-beginning
1
2
3
4
5
- Delete the topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic --delete
- Produce 10 messages to the topic, which will recreate the topic automatically.
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
>a
>b
>c
>d
>e
>f
>g
>h
>i
>j
>
- Describe the group
my-group, you can see the lag is 5
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe
Consumer group 'my-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group persistent://public/default/my-topic 0 5 10 5 - - -
- Consume the topic
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group --from-beginning
f
g
h
i
j
Step 6 shows the current offset is inaccurate because it's the LEO of the previous deleted topic. #531 only assumes the LEO of previous deleted topic is greater than the LEO of the current topic with the same name.
Step 7 verifies the point that consumer starts consuming from message f (the 6th message) not message a (the 1st message).
The current offset is fake, comparing it with LEO is not enough. Even if log start offset can be retrieved (see #580, not done yet), the log start offset is 0 then we have:
log start offset (0) < current offset (5) < LEO (10)
Then consumer will still fetch messages from offset=5.
Expected behavior The current offset should be validated.
I've done the above steps with Kafka, the result of step 6 is:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe
Error: Consumer group 'my-group' does not exist.