
[FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader

Open dongwoo6kim opened this issue 9 months ago • 8 comments

Problem

When using the Flink Kafka connector in batch scenarios, consuming transactional messages can cause indefinite hanging. The issue can be easily reproduced with the following steps.

  1. Produce transactional messages and commit them.
  2. Configure scan.bounded.mode to latest-offset and run a consumer using the Flink Kafka connector.
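For reference, step 2 corresponds to a bounded Kafka table declared roughly like this in Flink SQL (the table name, topic, bootstrap servers, and format are placeholders; scan.bounded.mode = latest-offset is the option that triggers the bounded read):

```sql
-- Minimal sketch of a bounded Kafka source; names are illustrative.
CREATE TABLE tx_messages (
    msg STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tx-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- read only committed transactional messages
    'properties.isolation.level' = 'read_committed',
    -- stop at the latest offset observed at job start
    'scan.bounded.mode' = 'latest-offset',
    'format' = 'raw'
);
```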

Cause

The previous stopping condition in the KafkaPartitionSplitReader compared the offset of the last record with the stoppingOffset. This approach works for streaming use cases and for batch processing of non-transactional messages, but it is insufficient for transactional messages: control records, which are never visible to clients, can occupy the entire range between the last record's offset and the stoppingOffset, so the condition never triggers and the reader blocks indefinitely.
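The two conditions can be contrasted with a simplified sketch (hypothetical method names; this is a toy model, not the actual connector code). Suppose offsets 13 and 14 are data records, offset 15 is a transaction commit marker, and the stopping offset is the log-end offset 16:

```java
// Toy model of the stopping conditions, not the actual connector code.
class StoppingConditionSketch {

    // Pre-fix (roughly): the split finishes only when a *returned record*
    // reaches the stopping offset. Control records are never returned to
    // the client, so with a trailing commit marker this never fires.
    static boolean shouldStopOld(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset;
    }

    // Post-fix: compare the consumer's position, which advances past
    // control records, against the stopping offset.
    static boolean shouldStopFixed(long consumerPosition, long stoppingOffset) {
        return consumerPosition >= stoppingOffset;
    }
}
```

With the numbers above, the last visible record has offset 14, so the old check never fires (14 < 16), while the consumer position after the poll is 16 and the fixed check stops the split.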

Workaround

I've modified the stopping condition to use consumer.position(tp), which points directly at the next record's offset and thus skips any control messages in the current poll. To handle edge cases, particularly when properties.max.poll.records is set to 1, I've also adjusted the fetch method to always check all assigned partitions, even if a poll returns no records.

Edge case example

Consider partition 0, where offsets 13 and 14 are valid records and 15 is a control record. If stoppingOffset is set to 15 for partition 0 and properties.max.poll.records is configured to 1, checking only partitions that return records would miss offset 15. By consistently checking all assigned partitions, the consumer's position jumps over the control record in a subsequent poll, allowing the reader to escape.
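The edge case can be simulated with a toy poll loop (my own model under assumed semantics, not connector code). Here offsets 13 and 14 are data, 15 is a commit marker, the stopping offset is the log-end offset 16, and each poll returns at most one data record, mimicking max.poll.records = 1:

```java
import java.util.Set;

// Toy single-partition model of the fetch loop; all names are illustrative.
class BoundedReadSimulation {

    // Returns the poll count at which the split finishes, or -1 if it
    // never finishes within maxPolls (modeling indefinite blocking).
    static int pollsUntilFinished(boolean checkAllAssignedPartitions, int maxPolls) {
        final long stoppingOffset = 16;            // log-end offset
        final Set<Long> controlOffsets = Set.of(15L); // commit marker
        long position = 13;                        // next offset to fetch

        for (int poll = 1; poll <= maxPolls; poll++) {
            boolean partitionReturnedRecord = false;
            while (position < 16) {
                boolean control = controlOffsets.contains(position);
                position++; // the position always moves forward
                if (!control) {
                    partitionReturnedRecord = true; // one data record per poll
                    break;
                }
            }
            // Buggy variant: run the stop check only for partitions that
            // returned records. Fixed variant: check every assigned partition.
            if ((checkAllAssignedPartitions || partitionReturnedRecord)
                    && position >= stoppingOffset) {
                return poll;
            }
        }
        return -1;
    }
}
```

In this model the third poll returns no records (it only consumes the marker), so the record-only variant never runs the stop check again and loops forever, while checking all assigned partitions finishes on that poll.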

Discussion

To address the metric issue in FLINK-33484, I think we need a wrapper class around ConsumerRecord, for example ConsumerRecordWithOffsetJump.

public class ConsumerRecordWithOffsetJump<K, V> {
    final ConsumerRecord<K, V> record;
    final long offsetJump;

    public ConsumerRecordWithOffsetJump(ConsumerRecord<K, V> record, long offsetJump) {
        this.record = record;
        this.offsetJump = offsetJump;
    }
}

We may also need a new KafkaPartitionSplitReader that implements
SplitReader<ConsumerRecordWithOffsetJump<byte[], byte[]>, KafkaPartitionSplit>.
When a record is emitted, the current offset should be set not to record.offset() + 1 but to
record.offset() + record.jumpValue.
jumpValue is typically 1, except for the last record of each poll, where it is calculated as
consumer.position() - lastRecord.offset().
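The proposed jumpValue accounting could be sketched as follows (my own illustration of the rule above; the method and parameter names are hypothetical). Every record gets a jump of 1 except the last record of the poll, whose jump bridges any trailing control markers up to the consumer's actual position:

```java
// Illustrative sketch of the proposed jumpValue rule; names are hypothetical.
class OffsetJumpSketch {

    // For one poll of a single partition: jump is 1 for all records except
    // the last, whose jump is positionAfterPoll - lastRecordOffset, so that
    // offset + jump always lands on the consumer's next fetch position.
    static long[] jumpValues(long[] recordOffsets, long positionAfterPoll) {
        long[] jumps = new long[recordOffsets.length];
        for (int i = 0; i < recordOffsets.length; i++) {
            jumps[i] = 1L;
        }
        if (recordOffsets.length > 0) {
            int last = recordOffsets.length - 1;
            jumps[last] = positionAfterPoll - recordOffsets[last];
        }
        return jumps;
    }
}
```

For example, a poll returning records at offsets 13 and 14 with a trailing commit marker at 15 leaves the consumer position at 16, giving jumps of 1 and 2.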
If this sounds good to everyone, I'm happy to work on this.

dongwoo6kim avatar Apr 29 '24 03:04 dongwoo6kim

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar Apr 29 '24 03:04 boring-cyborg[bot]

It works, I tried it.

LinMingQiang avatar May 15 '24 02:05 LinMingQiang

Hello @morazow, I've added an ITCase for this scenario. It fails on the current main branch due to a timeout and passes with the fixed code.

dongwoo6kim avatar Jun 01 '24 16:06 dongwoo6kim

Thanks @dongwoo6kim ,

Tests look good from my side 👍

(Recently I faced a similar, possibly related issue when running batch mode with startingOffsets set. The change should solve that issue too, but we may create a separate issue for it.)

morazow avatar Jun 10 '24 21:06 morazow

Thanks for confirming @morazow. Please feel free to provide any additional advice before merging this fix. It would also be helpful if you could elaborate on the issue you mentioned and consider adding relevant test code for it.

dongwoo6kim avatar Jun 13 '24 01:06 dongwoo6kim

Hey @dongwoo6kim, we created another issue for it, the solution seems to be similar but let's discuss it again once this PR is merged.

morazow avatar Jun 26 '24 11:06 morazow

Hello @morazow, I've added test code for the issue you mentioned; please take a look. The test passes with the fixed code and times out on the latest main branch due to indefinite blocking.

dongwoo6kim avatar Jun 30 '24 13:06 dongwoo6kim

Thanks @dongwoo6kim, Looks good!

morazow avatar Jul 08 '24 11:07 morazow

@AHeise thanks for the feedback! I've addressed your comments and applied the suggested changes. When you have a moment, please take a look. Thanks

dongwoo6kim avatar Sep 13 '24 16:09 dongwoo6kim

Seems like the deleted arch unit rule was still needed. What was your intent when you deleted it?

AHeise avatar Sep 16 '24 07:09 AHeise

Seems like the deleted arch unit rule was still needed. What was your intent when you deleted it?

It was deleted automatically after running mvn clean verify locally. I've manually rolled back the archunit changes.

dongwoo6kim avatar Sep 17 '24 04:09 dongwoo6kim

@AHeise Thanks for the feedback. I've left some comments and made updates. Please have a look.

dongwoo6kim avatar Sep 17 '24 04:09 dongwoo6kim

Changes all LGTM. I'm running CI and merge when it's green. Thank you very much for this much needed fix!

AHeise avatar Sep 17 '24 09:09 AHeise

Awesome work, congrats on your first merged pull request!

boring-cyborg[bot] avatar Sep 17 '24 13:09 boring-cyborg[bot]