Remove expensive shuffle of read data in KafkaIO when using SDF and committing offsets
The reshuffle adds a barrier so that the read records are committed before offsets are possibly committed. However, this is unnecessary: the DoFn is annotated with RequiresStableInput, and the fusion barrier can be introduced by shuffling only the offsets.
In both cases, it is possible for an offset to be committed back to Kafka before the corresponding data has been entirely processed by the pipeline. However, with support for draining data from the pipeline (such as in Cloud Dataflow), the offsets committed to the topics allow exactly-once semantics for KafkaIO across a pipeline drain and restart.
@kennknowles I'd appreciate it if you could take a look and let me know whether this makes sense with the Beam model in general. I don't think the Reshuffle gives other runners any stronger barrier guarantees than the combine itself would, but I could use some confirmation.
I think the problem is that no runner actually implements RequiresStableInput. If Dataflow implemented it, the implementation would be a shuffle.
There is still a shuffle of the offsets due to the combine-per-key before the offset commit fn. If that shuffle did not exist, having RequiresStableInput insert one would be OK, because it would shuffle only the commit offsets, not the read data, and would thus be cheap.
We need to guarantee that offsets are committed to Kafka only once the records have been processed by the system, so that no records are lost if the pipeline is drained. For Dataflow streaming, all side effects and outputs of a fused stage are committed atomically. It is therefore sufficient for the offset commit to happen in a subsequent fused stage (here enforced by combining per key to get the max offset): the processing of the records either completes in the fused stage that reads them, or the effects of that processing are part of the same atomic commit.
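A minimal, stdlib-only Java sketch of the "combine per key to get max offset" step (not KafkaIO's actual code; the `MaxOffsetPerPartition` class and `Rec` type are hypothetical). It illustrates why the offsets-only shuffle is cheap: each record is projected to a `(partition, offset)` pair before grouping, so the payload never crosses that shuffle, and only the highest processed offset per partition reaches the commit step.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical illustration of the per-partition max-offset combine. */
public class MaxOffsetPerPartition {
  /** Hypothetical read record; the payload is dropped before grouping. */
  record Rec(int partition, long offset, String payload) {}

  /** Projects records to (partition, offset) and keeps the max offset per partition. */
  static Map<Integer, Long> maxOffsets(List<Rec> records) {
    return records.stream()
        .collect(Collectors.toMap(
            Rec::partition, // key: Kafka partition
            Rec::offset,    // value: offset of a processed record
            Math::max,      // combine: keep the larger offset
            TreeMap::new)); // sorted map for stable printing
  }

  public static void main(String[] args) {
    List<Rec> batch = List.of(
        new Rec(0, 10, "a"), new Rec(1, 7, "b"),
        new Rec(0, 12, "c"), new Rec(1, 5, "d"));
    System.out.println(maxOffsets(batch)); // {0=12, 1=7}
  }
}
```

The grouped data is one small pair per record and one value per partition after combining, independent of record size.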
For other runners such as Flink, is Reshuffle more powerful than any other GroupByKey, i.e., does it insert some sort of checkpoint barrier? If not, the existing solution of reshuffling the data and offsets doesn't seem to guarantee any better that the commit fn runs after the record-processing effects have been persisted in a checkpoint. The fused graph with the reshuffle looks something like:
Read -> Reshuffled records+ offsets + fused processing -> fused stage with more user processing
\-> per-key combined fused stage committing offsets
With Flink, if data is just flowing through before the checkpoint barrier, nothing prevents offsets from being committed to Kafka before the checkpoint passes, so this reshuffle doesn't seem to provide any further guarantees.
Yes, for Flink this really needs to be something that happens after the checkpoint is known to be durable. To do this right, we could probably use a semantic construct for that, which would also solve it for Dataflow.
Given that everything about this implementation depends on Dataflow's idiosyncrasies, I'm OK with modifying it to depend on them further in exchange for better performance.
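For runners like Flink, the requirement is that the offset commit waits until a checkpoint is known to be durable. The following is a hypothetical, stdlib-only sketch of that ordering. `DurableOffsetCommitter` and its callbacks are illustrative names loosely modeled on a checkpoint-complete notification; they are not Beam or Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical: offsets are committed only after their checkpoint is durable. */
public class DurableOffsetCommitter {
  // checkpointId -> (partition -> max offset processed before that barrier)
  private final TreeMap<Long, Map<Integer, Long>> pending = new TreeMap<>();
  private Map<Integer, Long> current = new HashMap<>();
  private final BiConsumer<Integer, Long> commitToKafka; // stand-in for the real commit

  DurableOffsetCommitter(BiConsumer<Integer, Long> commitToKafka) {
    this.commitToKafka = commitToKafka;
  }

  /** Notes that the record at this offset has been processed. */
  void onProcessed(int partition, long offset) {
    current.merge(partition, offset, Math::max);
  }

  /** A checkpoint barrier passed: freeze the offsets seen so far under its id. */
  void onCheckpointBarrier(long checkpointId) {
    pending.put(checkpointId, current);
    current = new HashMap<>();
  }

  /** The checkpoint is durable: commit its offsets and those of earlier checkpoints. */
  void onCheckpointDurable(long checkpointId) {
    Map<Integer, Long> toCommit = new TreeMap<>();
    var done = pending.headMap(checkpointId, true);
    done.values().forEach(m -> m.forEach((p, o) -> toCommit.merge(p, o, Math::max)));
    done.clear(); // the view is backed by `pending`, so this removes them there too
    toCommit.forEach(commitToKafka);
  }

  public static void main(String[] args) {
    var c = new DurableOffsetCommitter((p, o) -> System.out.println("commit p" + p + " @" + o));
    c.onProcessed(0, 41);
    c.onCheckpointBarrier(1);
    c.onProcessed(0, 42);     // after barrier 1, so it belongs to checkpoint 2
    c.onCheckpointDurable(1); // prints "commit p0 @41" only
  }
}
```

In this model, offsets seen after a barrier stay pending until a later checkpoint is reported durable, so Kafka is never told about progress that the runner has not yet persisted. Whether the committed value should be the last processed offset or the next offset to read is deliberately left out of the sketch.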
This change causes an error when updating a Dataflow pipeline launched with the previous version, but the update can be allowed by passing:
--transformNameMapping={"KafkaIO.Read/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey":""}
@kennknowles Is that sufficient or should I add some option to maintain the previous expensive behavior? Are there any other concerns with this change? Thanks!
Thanks for raising this. I hate to bring it in, because it is a pain, but we do have a mechanism where the user can pass --updateCompatibilityVersion and for this one we really ought to use it. Example at https://github.com/apache/beam/pull/28853/files#diff-b8cf6c3051a36c566f2f28f525449f456a88b05b3b4c17c814e6a55ba2ce36e9R77
For you, the work is to keep the old code as-is in a deprecated fork of expand that is activated if the user requests it. Users who value compatibility can/should set this field to the value of the version they launched the pipeline with.
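A hypothetical sketch of that gate: the legacy `expand` (with the Reshuffle) is selected only when the user passes an `updateCompatibilityVersion` older than the release that introduces the new behavior. `CompatGate` and `LEGACY_CUTOFF` are illustrative names, the cut-over value is an assumption, and the parsing assumes plain dotted numeric versions; Beam's real option handling may differ.

```java
/** Hypothetical gate between the legacy and new expand() paths. */
public class CompatGate {
  // Assumed: first release containing the offsets-only shuffle.
  static final String LEGACY_CUTOFF = "2.60.0";

  /** Compares dotted numeric versions, e.g. "2.59.1" < "2.60.0", "2.9" < "2.10". */
  static int compareVersions(String a, String b) {
    String[] x = a.split("\\.");
    String[] y = b.split("\\.");
    for (int i = 0; i < Math.max(x.length, y.length); i++) {
      int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
      int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
      if (xi != yi) return Integer.compare(xi, yi);
    }
    return 0;
  }

  /** Null means no compatibility was requested, so the new path is used. */
  static boolean useLegacyExpand(String updateCompatibilityVersion) {
    return updateCompatibilityVersion != null
        && compareVersions(updateCompatibilityVersion, LEGACY_CUTOFF) < 0;
  }

  public static void main(String[] args) {
    System.out.println(useLegacyExpand("2.59.1")); // true: keep the Reshuffle
    System.out.println(useLegacyExpand(null));     // false: offsets-only shuffle
  }
}
```

Defaulting to the new path when the option is unset matches the intent above: only users who explicitly ask for compatibility keep the expensive shuffle.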
This allows us to make update-incompatible changes to the default codepath without breaking users with long-running pipelines who want to upgrade the SDK for some other reason. It is fine for users who want this new improvement to have to pass the parameter you mentioned, or even to have to drain.
@kennknowles PTAL, I added the legacy support back via the option you mentioned.
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:
R: @Abacn for label java. R: @damondouglas for label io.
Manually verified against the Dataflow service that:
- starting a pipeline without the change and then attempting to update with the change fails the update compatibility check.
- starting a pipeline without the change and then attempting to update with the change and updateCompatibilityVersion set to 2.59.1 succeeds and is compatible.