flink-connector-kafka
[FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader
Problem
When using the Flink Kafka connector in batch scenarios, consuming transactional messages can cause the job to hang indefinitely. The issue can be reproduced with the following steps.
- Produce transactional messages and commit them (e.g. with a producer like the sketch below).
- Configure scan.bounded.mode to latest-offset and run a consumer using the Flink Kafka connector.
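For the first step, here's a minimal sketch of a transactional producer (topic name, bootstrap address, and serializer choices are illustrative assumptions, not from the PR):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "repro-tx");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("test-topic", "key", "value"));
    // Committing writes a transaction marker (a control record) that occupies
    // an offset after the data records.
    producer.commitTransaction();
}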
Cause
The previous stopping condition in the KafkaPartitionSplitReader compared the offset of the last record with the stoppingOffset. This approach works for streaming use cases and for batch processing of non-transactional messages, but it is insufficient for transactional messages: control records, which are not visible to clients, can occupy the entire range between the last record's offset and the stoppingOffset, which leads to indefinite blocking.
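Roughly, the previous check behaved like the following sketch (simplified and illustrative, not the literal Flink code):

import org.apache.kafka.clients.consumer.ConsumerRecord;

// The split is finished only when a *visible* record reaches the stopping
// offset. If control records occupy the tail of the range, no visible record
// ever satisfies this condition and the reader blocks forever.
static boolean reachedStoppingOffset(
        ConsumerRecord<byte[], byte[]> lastRecord, long stoppingOffset) {
    // stoppingOffset is exclusive: the last record to emit is stoppingOffset - 1.
    return lastRecord.offset() >= stoppingOffset - 1;
}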
Workaround
I've modified the stopping condition to use consumer.position(tp), which effectively skips any control messages present in the current poll, pointing directly to the next record's offset.
To handle edge cases, particularly when properties.max.poll.records is set to 1, I've adjusted the fetch method to always check all assigned partitions, even if no records are returned in a poll.
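A sketch of the adjusted condition (names are illustrative):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// consumer.position(tp) returns the offset of the next record that will be
// fetched, so it already steps past control records consumed in the current
// poll.
static boolean reachedStoppingOffset(
        KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp, long stoppingOffset) {
    return consumer.position(tp) >= stoppingOffset;
}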
Edge case example
Consider partition 0, where offsets 13 and 14 are valid records and 15 is a control record. If stoppingOffset is set to 15 for partition 0 and properties.max.poll.records is configured to 1, checking only the partitions that return records would miss offset 15. By consistently checking all assigned partitions, the consumer's position jumps past the control record in the subsequent poll, allowing the reader to escape (see the sketch below).
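The adjusted fetch loop can be pictured like this (a hedged sketch; stoppingOffsetOf and finishSplit are hypothetical helpers standing in for the reader's internal state):

import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// A poll can return zero records yet still advance the consumer's position
// past control records, so every assigned partition is checked after each
// poll rather than only the partitions that produced records.
void pollAndCheckAllPartitions(KafkaConsumer<byte[], byte[]> consumer) {
    consumer.poll(Duration.ofMillis(100)); // may be empty near the stopping offset
    for (TopicPartition tp : consumer.assignment()) {
        if (consumer.position(tp) >= stoppingOffsetOf(tp)) {
            finishSplit(tp);
        }
    }
}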
Discussion
To address the metric issue in FLINK-33484, I think we need to make a wrapper class for ConsumerRecord, for example ConsumerRecordWithOffsetJump.
public class ConsumerRecordWithOffsetJump<K, V> {
    private final ConsumerRecord<K, V> record;
    private final long offsetJump;
    public ConsumerRecordWithOffsetJump(ConsumerRecord<K, V> record, long offsetJump) {
        this.record = record;
        this.offsetJump = offsetJump;
    }
}
And we may need a new KafkaPartitionSplitReader that implements SplitReader<ConsumerRecordWithOffsetJump<byte[], byte[]>, KafkaPartitionSplit>. So when a record is emitted, the current offset should be set not to record.offset() + 1 but to record.offset() + record.jumpValue. jumpValue is typically 1, except for the last record of each poll, where it is calculated as consumer.position() - lastRecord.offset().
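A sketch of that computation (method and parameter names are illustrative, not taken from the PR):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

static long jumpValueOf(
        ConsumerRecord<byte[], byte[]> record,
        ConsumerRecord<byte[], byte[]> lastRecordOfPoll,
        KafkaConsumer<byte[], byte[]> consumer,
        TopicPartition tp) {
    if (record == lastRecordOfPoll) {
        // The position already points past any trailing control records, so
        // the last record's jump covers the gap up to the next real offset.
        return consumer.position(tp) - lastRecordOfPoll.offset();
    }
    return 1L; // ordinary records advance the offset by exactly one
}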
If this sounds good to everyone, I'm happy to work on this.
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
It works, I tried it.
Hello @morazow, I've added an ITCase for this scenario. It fails on the current main branch due to a timeout and passes with the fixed code.
Thanks @dongwoo6kim,
Tests look good from my side 👍
(Recently I faced a similar issue which may be related, when running batch mode with startingOffsets set. The change should solve that issue, but we may create a separate issue for it.)
Thanks for confirming, @morazow. Please feel free to provide any additional advice before merging this fix. It would also be helpful if you could elaborate on the issue you mentioned and consider adding relevant test code for it.
Hey @dongwoo6kim, we created another issue for it. The solution seems to be similar, but let's discuss it again once this PR is merged.
Hello @morazow, I've added test code for the mentioned issue, please take a look. The test passes with this fixed code; on the latest main branch, it times out due to indefinite blocking.
Thanks @dongwoo6kim, Looks good!
@AHeise thanks for the feedback! I've addressed your comments and applied the suggested changes. When you have a moment, please take a look. Thanks
Seems like the deleted arch unit rule was still needed. What was your intent when you deleted it?
It was automatically deleted after running mvn clean verify locally. I manually rolled back the archunit changes.
@AHeise Thanks for the feedback. I've left some comments and made updates. Please have a look.
Changes all LGTM. I'm running CI and merge when it's green. Thank you very much for this much needed fix!
Awesome work, congrats on your first merged pull request!