beam
Reapply "Add Redistribute transform to Java SDK"
This rolls forward the Java SDK reference implementation of Redistribute. This does not include runner translations.
- In the prior attempt at this change, many runners failed the Redistribute tests until a custom translation was inserted. I addressed that by adding custom translations that roughly matched Reshuffle.
- In this attempt, I will focus first on getting all runners to pass the tests with Redistribute as a composite, or sickbaying the tests if a runner lacks core capabilities they require. For example, it looks like some (possibly many) runners may lose windowing metadata. I will add custom runner translations in later iterations.
This reverts commit f498cdfc2a84a49f0d5812e7dd0e55e08b214e4a.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [x] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- [x] Update CHANGES.md with noteworthy changes.
- [x] If this contribution is large, please file an Apache Individual Contributor License Agreement.
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.
Notes on failures:
- ValidatesRunner Dataflow: failures are all side input tests, left over from the master breakage; we already know this was green the first time around.
- ValidatesRunner Dataflow JavaVersions: same.
- ValidatesRunner Dataflow V2: the failure is quota; we already know this was green the first time around.
- ValidatesRunner Flink Java11: the failure is an unrelated flake in testAfterProcessingTimeContinuationTriggerUsingState.
- ValidatesRunner Spark: the failure is testRedistributePreservesMetadata, which suggests Spark's GBK has issues.
- ULR: the failure is testRedistributePreservesMetadata, which suggests the GBK has issues or makes wrong assumptions (Redistribute is just a composite transform that should require no special treatment).
- PVR Flink Streaming: testRedistributePreservesMetadata fails, same as ULR, so I presume the error is in the SDK harness or the composite transform.
- PVR Flink Batch: testRedistributePreservesMetadata, same error.
- PVR Samza: same.
- PVR Spark 3: all the Redistribute tests fail in :runners:spark:3:job-server:validatesPortableRunnerStreaming with:
  Error received from SDK harness for instruction 3: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException: org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to java.lang.String
I think we can safely drop the Java11 and JavaVersions variations.
So in conclusion, my analysis is this:
Non-portable:
- DirectRunner works
- Dataflow works
- Samza works
- Flink works
- Spark StructuredStreaming works
- Spark batch has an issue where its shuffle drops panes; this is out of scope.
Portable:
- Dataflow works
- ULR, Flink, Spark, and Samza all share the same issue, which is likely a bug in the portable translation (the greedy fuser?)
Based on this analysis I believe the prior approach of sickbaying the failing tests is valid. There does not seem to be a bug in the Redistribute transform. All runners should correctly run it as a pure composite, and adding translations to the runners for efficiency is a bonus. I will follow up with rolling forward those changes too.
R: @Abacn
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
For the reviewer, summary so far:
- Roll forward: I am rolling forward just the composite transform and tests, which are fairly trivial, essentially the same as Reshuffle, and already work correctly on enough runners that I believe they are correct.
- Since the composite should work, we don't need the runner implementations to move forward. In fact, I think the runner translations were masking bugs. I will separately roll forward the runner specializations.
- Filtered tests: last time I missed some tests that were already broken; do you think I missed any this time?
- When I roll forward translations one runner at a time, I can re-enable anything fixed by them.
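To make the "pure composite" argument concrete, here is a plain-Java model (not the Beam API; the `Element` class and the `redistribute` and `sameMultiset` methods are hypothetical names for illustration) of a Reshuffle-style composite: assign each element a random key, group by key as the shuffle, then ungroup. The property it checks, that every element comes out with the same timestamp, window, and pane it went in with, is roughly what a test named testRedistributePreservesMetadata would expect a runner to uphold; the exact assertions of that test are an assumption here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.TreeMap;

/**
 * Plain-Java model (NOT the Beam API) of a Reshuffle-style composite:
 * random key -> group by key -> ungroup, with each element's metadata
 * carried through unchanged.
 */
public class RedistributeSketch {

  /** Hypothetical element: a value plus the metadata a runner must preserve. */
  static final class Element {
    final String value;
    final long timestampMillis;
    final String window;
    final String pane;

    Element(String value, long timestampMillis, String window, String pane) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.window = window;
      this.pane = pane;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Element)) {
        return false;
      }
      Element e = (Element) o;
      return value.equals(e.value)
          && timestampMillis == e.timestampMillis
          && window.equals(e.window)
          && pane.equals(e.pane);
    }

    @Override
    public int hashCode() {
      return Objects.hash(value, timestampMillis, window, pane);
    }
  }

  /** Assigns random shard keys, "shuffles" by grouping on them, then ungroups. */
  static List<Element> redistribute(List<Element> input, int numShards, long seed) {
    Random random = new Random(seed);
    // TreeMap stands in for the shuffle; iteration order is by shard key.
    Map<Integer, List<Element>> groups = new TreeMap<>();
    for (Element e : input) {
      groups.computeIfAbsent(random.nextInt(numShards), k -> new ArrayList<>()).add(e);
    }
    List<Element> output = new ArrayList<>();
    for (List<Element> group : groups.values()) {
      output.addAll(group); // the whole element, metadata included, is re-emitted
    }
    return output;
  }

  /** True if both lists contain the same elements with the same multiplicities. */
  static boolean sameMultiset(List<Element> a, List<Element> b) {
    Map<Element, Integer> counts = new HashMap<>();
    for (Element e : a) {
      counts.merge(e, 1, Integer::sum);
    }
    for (Element e : b) {
      counts.merge(e, -1, Integer::sum);
    }
    return counts.values().stream().allMatch(c -> c == 0);
  }

  public static void main(String[] args) {
    List<Element> input = List.of(
        new Element("a", 100L, "GlobalWindow", "ON_TIME"),
        new Element("b", 200L, "[0, 60000)", "EARLY"),
        new Element("b", 200L, "[0, 60000)", "EARLY"));
    List<Element> output = redistribute(input, 4, 42L);
    System.out.println("metadata preserved: " + sameMultiset(input, output));
  }
}
```

Running `main` prints `metadata preserved: true`: order may change, but nothing about any element does. A runner that re-emitted only the value and reset, say, the window or pane to a default would make `sameMultiset` return false, which is the kind of metadata loss suspected above for some runners.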
beam_PostCommit_Java_PVR_Spark_Batch needs to exclude the added test: https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Spark_Batch.yml?query=event%3Aschedule
Gya I missed one ;_;
Yeah, because this job is named "beam_PostCommit_Java_PVR_Spark_Batch", not "beam_PostCommit_Java_PVR_Spark3_Batch", so it wasn't run on this PR.