[FLINK-28853][connector-base] Add FLIP-217 support for watermark alignment of source splits
What is the purpose of the change
- The change implements FLIP-217 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits) to support watermark alignment of source splits.
Brief change log
- Revises the SplitFetcher threading model and adds support to pause/resume SplitFetcher
- Adds support for watermark alignment of individual source splits controlled by SourceOperator
- Adds support for split watermark alignment for Kafka and Pulsar sources
- Adds configuration parameter to allow unaligned splits as migration plan to support legacy sources that lack support for split alignment
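The per-split alignment described in the change log can be illustrated with a minimal, standalone sketch. It is not the code from this PR: the class SplitAlignmentSketch, the Decision type, and the decide method are hypothetical names, and the rule "pause a split whose watermark is ahead of the maximum allowed watermark, resume it once it is no longer ahead" is an assumption about how such a decision could be made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of per-split watermark alignment; not Flink's actual code. */
final class SplitAlignmentSketch {

    /** Outcome of one alignment check: split IDs to pause and split IDs to resume. */
    static final class Decision {
        final List<String> splitsToPause = new ArrayList<>();
        final List<String> splitsToResume = new ArrayList<>();
    }

    /**
     * Compares each split's latest watermark with the maximum allowed watermark.
     *
     * @param splitWatermarks latest watermark per split ID
     * @param pausedSplits IDs of splits that are currently paused
     * @param maxAllowedWatermark watermark beyond which a split counts as "too far ahead"
     */
    static Decision decide(
            Map<String, Long> splitWatermarks,
            Set<String> pausedSplits,
            long maxAllowedWatermark) {
        Decision decision = new Decision();
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            boolean ahead = entry.getValue() > maxAllowedWatermark;
            boolean paused = pausedSplits.contains(entry.getKey());
            if (ahead && !paused) {
                // Split ran ahead of the allowed watermark: stop reading from it.
                decision.splitsToPause.add(entry.getKey());
            } else if (!ahead && paused) {
                // Split is within bounds again: continue reading from it.
                decision.splitsToResume.add(entry.getKey());
            }
        }
        return decision;
    }
}
```

In this sketch the two resulting lists correspond to the two arguments of a pauseOrResumeSplits(splitsToPause, splitsToResume) style call on the reader.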
Verifying this change
This change added and extended tests and can be verified as follows:
- SplitFetcherTest extended to test pause/resume of SplitFetcher
- SourceOperatorSplitWatermarkAlignmentTest added to test split alignment based on split reader watermarks
- SplitFetcherPauseResumeSplitReaderITCase added to test pause/resume of individual split readers
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): yes (according to FLIP-217)
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs (configuration parameter) and JavaDocs (changes of public APIs)
CI report:
- 991230aa2cc6e8a105305d6ff901c649cd8f72d9 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
Any thoughts on how and when to proceed with this PR? We'd like to help to bring this over the finish line.
I wonder about the way the SourceOperator calls through the stack via pauseOrResumeSplits while also being called back with watermarks. My intuition would have been to do the pausing directly at the splits based on the current max watermark lag, e.g. by putting splits into a suspended mode where no more output is accepted from a split. I guess this could be tricky because the implementation of the source doesn't necessarily know it's paused and could time out or otherwise misbehave. So it looks like the current implementation is simpler.
Currently, splits are just POJOs, data carriers without much logic. If I understand you correctly, what you are suggesting would require us to push some availability logic down to the splits, and it would still require handling of this availability in the SourceReader, which is basically the same logic that we have in the current proposal. The only difference would be that the SourceReader would be informed about the (un)availability change of a split by the split itself, instead of by the pauseOrResume call.
Given that most users are expected to just use SourceReaderBase, the current proposal gives them watermark alignment for free, whereas adding availability logic to the splits would increase the exposure of this feature in the public API without much benefit.
Indeed, there is an issue now: I think @smattheis doesn't work on Flink anymore, so he cannot finish this PR. I will go through the PR myself and do a quick pass and respond to the inline questions from you, @mxm and @mas-chen, but it would be great if someone could take over applying the actual changes after the review. Would you be up for that, @mxm?
The next question would be how to squash the commits. If you want to preserve authorship of your changes, @mxm, without overwriting the authors of the base commits, we would need either a single fixup commit from you or one fixup from you per original commit.
I'd prefer this to be a single squashed commit with the use of Co-Authored-By. IMHO there is no benefit in merging the individual commits because they are very much tied to each other. This keeps the history clean while retaining authorship. What do you think?
I wouldn't squash all commits into one. The original commits authored by Sebastian, Dawid and Arvid I think are nicely separated. But sure, squashing your fixups to those commits and co-authoring them is also perfectly fine.
If the commits are nicely separated, why were they not handled in separate PRs? Anyhow, I'm ok with just putting my changes in a single commit on top of those.
@flinkbot run azure
@mxm Thanks a lot also from my side for your contributions. However, @pnowojski this PR was not orphaned! (I wrote to you that I'm on vacation.)
Thank you @pnowojski for your thoughtful comments and guidance. And of course thanks to the original authors @AHeise @dawidwys @smattheis.
Whoops @smattheis, I had assumed you wouldn't continue on this PR, but I think no harm has been done. Let me know if anything is open on your side. I still need to revise the code with respect to a test failure.
Yes, sorry for the mental shortcut. There was a delay in communication, first because of my vacation and then because of yours. Around the time @mxm was asking how to continue with this PR, you, @smattheis, were not responding, and I got your message that you could work on it only after @mxm had already volunteered to take it over. After that I didn't want to create more confusion. But sorry for my inaccurate statement that it was "orphaned" :)
@mxm @pnowojski All good. Thanks for driving it.
(Rebased)
@flinkbot run azure
There are still some related test failures:
Documented option pipeline.watermark-alignment.allow-unaligned-source-splits does not exist.
and
Sep 09 15:10:03 [ERROR] Errors:
Sep 09 15:10:03 [ERROR] PulsarUnorderedSourceReaderTest.supportsPausingOrResumingSplits(PulsarSourceReaderBase, Boundedness, String)[1] » Timeout
Sep 09 15:10:03 [ERROR] PulsarUnorderedSourceReaderTest.supportsPausingOrResumingSplits(PulsarSourceReaderBase, Boundedness, String)[2] » Timeout
Thanks @pnowojski. Will fix those.
On another note, do you think it makes sense to add a configuration option to disable split alignment entirely? I'm a bit worried this feature might cause issues with the already integrated Kafka/Pulsar sources that we are not foreseeing right now. The added configuration option only allows disabling split alignment for incompatible connectors. We could add another option to disable it entirely to retain the old behavior (disabled by default).
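To make the scope of the existing option concrete, here is a minimal standalone sketch of how a flag like pipeline.watermark-alignment.allow-unaligned-source-splits could act as a fallback for readers that cannot pause splits. The types PausableReader and UnalignedSplitsFallbackSketch are hypothetical, and the assumption that an unsupported reader signals this with an UnsupportedOperationException is for illustration only; the thread does not spell out the mechanism.

```java
import java.util.Collection;

/** Hypothetical stand-in for a source reader; only the method relevant here is modeled. */
interface PausableReader {
    /** Default models a legacy source without support for pausing individual splits. */
    default void pauseOrResumeSplits(
            Collection<String> splitsToPause, Collection<String> splitsToResume) {
        throw new UnsupportedOperationException("This reader cannot pause or resume splits.");
    }
}

/** Hypothetical caller that applies the "allow unaligned source splits" flag. */
final class UnalignedSplitsFallbackSketch {
    private final PausableReader reader;
    private final boolean allowUnalignedSourceSplits;

    UnalignedSplitsFallbackSketch(PausableReader reader, boolean allowUnalignedSourceSplits) {
        this.reader = reader;
        this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
    }

    /** Returns true if the reader applied the request, false if it was skipped. */
    boolean pauseOrResume(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        try {
            reader.pauseOrResumeSplits(splitsToPause, splitsToResume);
            return true;
        } catch (UnsupportedOperationException e) {
            if (!allowUnalignedSourceSplits) {
                // Strict mode: a source without split alignment support is an error.
                throw e;
            }
            // Lenient mode: keep running with unaligned splits.
            return false;
        }
    }
}
```

Note that in this sketch the flag only matters for readers that lack support; a reader that does implement pausing is always aligned, which is the gap the comment above points out.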
I had to remove the @Deprecated annotation from the configuration option. Otherwise, it would be undocumented.
@flinkbot run azure
Does it make sense to enable and use watermark alignment via the DataStream API, only to disable it via a config value? Keep in mind that the user has to enable it manually via a WatermarkStrategy#withWatermarkAlignment call [1].
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-beta
Fair point, users may modify the job to disable alignment. It doesn't make sense to override the watermark alignment settings from the config. But the same argument applies to the other configuration option to disable split alignment for not-ready sources. I think it would make sense to provide a transitory option to disable split alignment altogether until this feature has proven to be stable.
@flinkbot run azure
Tests are passing now.
Thank you everyone!
Thank you for helping to get it over the line, @mxm!
Exciting to see this cross the finish line! This was a long-awaited feature (we implemented alignment for the Kinesis source ~4 years ago). Let's see how it holds up in production use cases!
Thanks @mxm @pnowojski @smattheis @dawidwys, @AHeise for your contributions!
Yes, it took longer than it should have. Good that it's finally been merged 🎉