librdkafka

commitAsync race condition

Open · pantoss-torstone opened this issue 1 year ago · 5 comments

Description

We have found that an offset was committed before we had called commit on the consumer group. We have automatic offset storing enabled (enable.auto.offset.store=true) and auto commit disabled (enable.auto.commit=false). We are using version 2.3.0 of the library, and we use commitAsync.

I believe the problem is this: because we use the auto offset store option, we call commitAsync without a partition offset list. The library does not take a copy of the stored offsets at that point, so by the time the asynchronous task actually executes it is already too late: we have called consume again and the stored offsets have been updated, even though we have not yet processed the newly consumed message. This is a problem because I don't expect a message's offset to be committed before commitAsync has been called after that message was processed. In the event of a failure, we end up losing messages.

I will try to update this issue with a way to reproduce it. That may be quite difficult, however, because it probably requires that the other broker threads not run before consume is called and completes.

pantoss-torstone · Dec 08 '23 17:12

I have not been able to reproduce what I observed, so I think I may have misunderstood how commitAsync works.

From what I've seen, if you do not provide a partition offset list to commitAsync, it enqueues an operation that commits the offsets later. Is this correct? What I don't understand is that it doesn't seem to take a copy of the automatically stored offsets at that point in time. What stops that asynchronous commit from picking up the offset of a message that was consumed after commitAsync was called?

pantoss-torstone · Dec 10 '23 23:12

I've finally managed to reproduce it, albeit in a horrible way. I added some hacky tracing to the library to make sure I understood what was happening where, and my checks caught it. I've attached the test program (which is a horrible mess because it was partly trial and error).

Steps I took to reproduce: I loaded a large number of messages onto a single-partition test topic. I was using Kafka 3.6-IV2, but the original issue was found on a Kafka 2.7 MSK cluster.

Here is the output of the test program:

0 0
0 0
0 0
Committing thread: 0x7ff84ade0b40 
Committing rd_kafka_cgrp_offsets_commit thread: 0x70000728d000 
Store offset -1001 thread: 0x70000728d000 
Store offset 3 thread: 0x7ff84ade0b40 
Store offset 4 thread: 0x7ff84ade0b40 
Store offset 5 thread: 0x7ff84ade0b40 
2 0
3 0
4 0
Committing thread: 0x7ff84ade0b40 
Committing rd_kafka_cgrp_offsets_commit thread: 0x70000728d000 
Store offset 6 thread: 0x7ff84ade0b40 
Store offset 7 thread: 0x7ff84ade0b40 
Store offset 8 thread: 0x7ff84ade0b40 
5 6
6 6
7 6
5 6
6 6
Assertion failed: (message1->err() == RdKafka::ERR_NO_ERROR && message1->offset() > commitOffset || commitOffset == 0), function consume, file ConsumerAsyncBugTest.cpp, line 120.

In each number pair, the left number is the message offset and the right one is the committed offset. It shouldn't be possible for the committed offset to be greater than or equal to the number on the left, because commitAsync had not yet been called for these offsets. The lines mentioning threads are the trace I added to the library. I'll attach a patch for this too.

Personally I think this is quite a severe bug.

CommitAsyncBug.zip

I don't see why the assignment's stored offsets are not captured in the commit call before the operation is enqueued. Can anyone enlighten me?

pantoss-torstone · Dec 11 '23 12:12

There is a small mistake in the code I gave, but the problem is still apparent. Because the committed offset is the offset of the last consumed message plus 1, I should subtract 1 from it.

pantoss-torstone · Dec 11 '23 15:12

I've created a PR to fix this in case it is helpful (https://github.com/confluentinc/librdkafka/pull/4553). However, I cannot think of a way to write a test for it.

pantoss-torstone · Dec 12 '23 10:12

Your expectation is correct: it should commit the offsets that are stored when the commit is called, not the ones stored when the operation is executed. This needs a fix. Thanks for your PR; we'll review it for the fix.

emasab · Mar 05 '24 13:03