beam icon indicating copy to clipboard operation
beam copied to clipboard

Remove expensive shuffle of read data in KafkaIO when using sdf and commit offsets

Open scwhittle opened this issue 1 year ago • 5 comments

The reshuffle adds a barrier so that the read records were committed before possibly committing offsets. However this is unnecessary as the DoFn is annotated with RequiresStableInput and the fusion barrier can be introduced with just shuffling the offsets.

In both cases it is possible for a commit offset to be committed back to kafka before the data has been entirely processed by the pipeline. However with support to drain data from the pipeline (such as in Cloud Dataflow) this allows for exactly-once semantics for KafkaIO via the committed offsets to the topics across pipeline drain and restart.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels Python tests Java tests Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

scwhittle avatar Jun 25 '24 10:06 scwhittle

@kennknowles I'd appreciate you to take a look and let me know if this makes sense with the beam model in general. I don't think that the Reshuffle adds any more guarantees about barriers for other runners than the combine itself would but could use some confirmation.

scwhittle avatar Jun 25 '24 10:06 scwhittle

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

github-actions[bot] avatar Jun 25 '24 12:06 github-actions[bot]

I think the problem is that no runner actually implements RequiresStableInput. If Dataflow implemented it, the implemention would be a shuffle.

kennknowles avatar Jun 25 '24 20:06 kennknowles

There is still a shuffle of the offsets due to the combine-per-key before the offset commit fn. If there was not that shuffle, having RequiresStableInput insert a shuffle would be ok because this shuffle would just be of the commit offsets not the read data and thus cheap.

We need to guarantee that offsets are committed to kafka only once the records have been processed by the system such that no records are lost if the pipeline is drained. For dataflow streaming, given that all side-effects/outputs of a fused stage are committed atomically, it is sufficient if the committing of the offsets is in a subsequent fused stage (in this case enforced by combining per key to get max offset) as the processing of the records themselves either completes in the fused-stage reading the records or the effects of that processing are also part of the same atomic commit.

For other runners such as Flink, is Reshuffle more powerful than any other groupby key, i.e. does it insert some sort of checkpoint barrier? If not, the existing solution of reshuffling the data+offsets doesn't seem like it guarantees any better that the commitfn runs after the record processing effects have been persisted in a checkpoint. The fused graph with the reshuffle looks something like:

Read -> Reshuffled records+ offsets + fused processing  -> fused stage with more user processing                                                                                               
                                                       \-> per key combined fused stage commiting offsets

With flink, if data is just flowing through before checkpoint barrier, there is nothing to prevent offsets from being committed to Kafka before the checkpoint passes, so it seems this reshuffle doesn't provide any further guarantees.

scwhittle avatar Jun 26 '24 08:06 scwhittle

There is still a shuffle of the offsets due to the combine-per-key before the offset commit fn. If there was not that shuffle, having RequiresStableInput insert a shuffle would be ok because this shuffle would just be of the commit offsets not the read data and thus cheap.

We need to guarantee that offsets are committed to kafka only once the records have been processed by the system such that no records are lost if the pipeline is drained. For dataflow streaming, given that all side-effects/outputs of a fused stage are committed atomically, it is sufficient if the committing of the offsets is in a subsequent fused stage (in this case enforced by combining per key to get max offset) as the processing of the records themselves either completes in the fused-stage reading the records or the effects of that processing are also part of the same atomic commit.

For other runners such as Flink, is Reshuffle more powerful than any other groupby key, i.e. does it insert some sort of checkpoint barrier? If not, the existing solution of reshuffling the data+offsets doesn't seem like it guarantees any better that the commitfn runs after the record processing effects have been persisted in a checkpoint. The fused graph with the reshuffle looks something like:

Read -> Reshuffled records+ offsets + fused processing  -> fused stage with more user processing                                                                                               
                                                       \-> per key combined fused stage commiting offsets

With flink, if data is just flowing through before checkpoint barrier, there is nothing to prevent offsets from being committed to Kafka before the checkpoint passes, so it seems this reshuffle doesn't provide any further guarantees.

Yes, for Flink this really needs to be something that happens after the checkpoint is known to be durable. To do this right, we probably could use a semantic construct for that, which would also solve it for Dataflow.

Given that everything about this implementation depends on Dataflow's idiosyncracies, I'm OK with modifying it to further depend on them but have better performance.

kennknowles avatar Jun 27 '24 21:06 kennknowles

This change causes an error when updating the Dataflow pipeline from previous version, but the update can be allowed by passing:

--transformNameMapping={"KafkaIO.Read/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey":""}"

@kennknowles Is that sufficient or should I add some option to maintain the previous expensive behavior? Are there any other concerns with this change? Thanks!

scwhittle avatar Aug 12 '24 11:08 scwhittle

This change causes an error when updating the Dataflow pipeline from previous version, but the update can be allowed by passing:

--transformNameMapping={"KafkaIO.Read/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey":""}"

@kennknowles Is that sufficient or should I add some option to maintain the previous expensive behavior? Are there any other concerns with this change? Thanks!

Thanks for raising this. I hate to bring it in, because it is a pain, but we do have a mechanism where the user can pass --updateCompatibilityVersion and for this one we really ought to use it. Example at https://github.com/apache/beam/pull/28853/files#diff-b8cf6c3051a36c566f2f28f525449f456a88b05b3b4c17c814e6a55ba2ce36e9R77

For you, the work is to keep the old code as-is in a deprecated fork of expand that is activated if the user requests it. Users who value compatibility can/should set this field to the value of the version they launched the pipeline with.

This allows us to make update-incompatible changes to the default codepath without breaking users with long-running pipeline that want to upgrade the SDK for some other reason. It is fine for users who want this new improvement to have to pass the parameter you mentioned, or even to have to drain.

kennknowles avatar Aug 12 '24 16:08 kennknowles

@kennknowles PTAL, I added the legacy support back via the option you mentioned.

scwhittle avatar Aug 19 '24 20:08 scwhittle

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java. R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Aug 20 '24 09:08 github-actions[bot]

Manually verified against dataflow service that:

  1. starting a pipeline without the change and then attempting to update with the change fails with update compatability.
  2. starting a pipeline without the change and then attempting to update with the updateCompatibilityVersion set to 2.59.1 succeeds and is compatible

scwhittle avatar Aug 29 '24 13:08 scwhittle