Support VR test including TestStream for Spark runner in streaming mode
Run VR tests for the Spark streaming runner rather than custom tests (the custom tests are already run as part of the "normal" unit test run).
If forceStreaming is set to true, the TestSparkRunner will replace Read.Bounded with UnboundedReadFromBoundedSource so that tests are run in streaming mode.
Additionally, this PR adds support for TestStream.
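For readers unfamiliar with the substitution, here is a toy model of what forceStreaming does to a pipeline. It is not the TestSparkRunner implementation: the class ForceStreamingSketch and its representation of transforms as plain strings are hypothetical, and the real runner rewrites actual Beam PTransforms rather than names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: transforms are plain strings, not real Beam PTransforms.
class ForceStreamingSketch {

  // When forceStreaming is set, every bounded read is swapped for its unbounded
  // wrapper so the pipeline is translated in streaming mode; otherwise the
  // pipeline is returned unchanged and runs in batch mode.
  static List<String> rewrite(List<String> transforms, boolean forceStreaming) {
    if (!forceStreaming) {
      return transforms;
    }
    List<String> rewritten = new ArrayList<>();
    for (String transform : transforms) {
      rewritten.add(
          transform.equals("Read.Bounded") ? "UnboundedReadFromBoundedSource" : transform);
    }
    return rewritten;
  }

  public static void main(String[] args) {
    List<String> pipeline = List.of("Read.Bounded", "ParDo", "GroupByKey");
    // prints [UnboundedReadFromBoundedSource, ParDo, GroupByKey]
    System.out.println(rewrite(pipeline, true));
  }
}
```

Only the source changes; downstream transforms such as GroupByKey are left as they are, which is why their windowing and triggering suddenly matter once the input becomes unbounded.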
Closes #22472
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] Choose reviewer(s) and mention them in a comment (R: @username).
- [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- [ ] Update CHANGES.md with noteworthy changes.
- [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
See CI.md for more information about GitHub Actions CI.
Run Spark ValidatesRunner
Unfortunately, I'm stuck with some flaky tests. It looks like watermarks are not advanced in a deterministic way.
Below are some logs of org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy (edited for readability).
@aromanenko-dev @echauchot if you have some time I'd be more than grateful for a 2nd pair of 👀 .
Successful run (the watermark is advanced early enough that the timer fires and the only element is emitted):
14:42:52,938 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,940 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,949 [3] TRACE WindowTracing - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,957 [3] TRACE WindowTracing - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,960 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:52,961 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:42:52,962 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements are 0
14:42:53,137 [spark-listener-group-appStatus] INFO GlobalWatermarkHolder - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}] [inputWatermark: BOUNDEDW_MAX]
14:42:53,146 [15] DEBUG WindowTracing - ReduceFnRunner: Received timer key:Row2; window:GlobalWindow; data:TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false} with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing - ReduceFnRunner: Cleaning up for key:Row2; window:GlobalWindow with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing - WatermarkHold.extractAndRelease: for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,149 [15] DEBUG WindowTracing - WatermarkHold.extractAndRelease.read: clearing for key:Row2; window:GlobalWindow
14:42:53,150 [15] DEBUG WindowTracing - describePane: ON_TIME pane (prev was null) for key:Row2; windowMaxTimestamp:GLOBALW_MAX; inputWatermark:BOUNDEDW_MAX; outputWatermark:null; isLateForOutput:false
14:42:53,152 [15] TRACE WindowTracing - ReduceFnRunner.onTrigger: outputWindowedValue key:Row2 value:[Row1] at GLOBALW_MAX
14:42:53,152 [15] DEBUG WindowTracing - WatermarkHold.clearHolds: For key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,153 [15] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements are TimestampedValueInGlobalWindow{value=KV{Row2, [Row1]}, timestamp=GLOBALW_MAX, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} 1
Failed run (the watermark is advanced too late, so the timer never fires and the element is lost):
14:41:51,453 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,455 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,463 [3] TRACE WindowTracing - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,471 [3] TRACE WindowTracing - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:41:51,476 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements are 0
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:41:51,658 [15] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements are 0
14:41:51,662 [spark-listener-group-appStatus] INFO GlobalWatermarkHolder - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
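The difference between the two runs comes down to ordering: in the successful run the new watermark block is published (14:42:53,137) before the next GroupByKey evaluation reads it, while in the failed run it is published (14:41:51,662) only after the last evaluation. The sketch below is a simplified model of that race, not the SparkGroupAlsoByWindowViaWindowSet code; the class WatermarkRaceSketch, its Event type, and the numeric stand-ins for BOUNDEDW_MIN, GLOBALW_MAX and BOUNDEDW_MAX are hypothetical, chosen only to keep the ordering seen in the logs.

```java
import java.util.List;

// Simplified model of the two logged runs; not the SparkGroupAlsoByWindowViaWindowSet code.
class WatermarkRaceSketch {
  // Hypothetical numeric stand-ins that only preserve the ordering
  // BOUNDEDW_MIN < GLOBALW_MAX < BOUNDEDW_MAX visible in the logs.
  static final long BOUNDEDW_MIN = 0;
  static final long GLOBALW_MAX = 90;
  static final long BOUNDEDW_MAX = 100;

  // Either a new watermark published by the driver, or one evaluation
  // of the GroupByKey state in a micro-batch.
  static final class Event {
    final boolean isWatermarkUpdate;
    final long watermark;

    private Event(boolean isWatermarkUpdate, long watermark) {
      this.isWatermarkUpdate = isWatermarkUpdate;
      this.watermark = watermark;
    }

    static Event watermark(long value) {
      return new Event(true, value);
    }

    static Event evaluate() {
      return new Event(false, 0);
    }
  }

  // Returns how many elements the GroupByKey emits: 1 if the end-of-window
  // timer becomes eligible during some evaluation, 0 otherwise.
  static int emittedElements(List<Event> events, long timerTimestamp) {
    long inputWatermark = BOUNDEDW_MIN;
    for (Event event : events) {
      if (event.isWatermarkUpdate) {
        inputWatermark = event.watermark;
      } else if (timerTimestamp < inputWatermark) {
        // The watermark has passed the timer, so it fires and the pane is emitted.
        return 1;
      }
    }
    // No evaluation ran after the watermark advanced: the element is lost.
    return 0;
  }

  public static void main(String[] args) {
    List<Event> successful =
        List.of(Event.evaluate(), Event.watermark(BOUNDEDW_MAX), Event.evaluate());
    List<Event> failed =
        List.of(Event.evaluate(), Event.evaluate(), Event.watermark(BOUNDEDW_MAX));
    System.out.println("successful run emits " + emittedElements(successful, GLOBALW_MAX)); // 1
    System.out.println("failed run emits " + emittedElements(failed, GLOBALW_MAX)); // 0
  }
}
```

In this model the outcome depends only on whether at least one evaluation happens after the final watermark is published, which matches the nondeterminism described above.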
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:
R: @lukecwik for label java.
Available commands:
- stop reviewer notifications - opt out of the automated review tooling
- remind me after tests pass - tag the comment author after tests pass
- waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
Run Spark ValidatesRunner
Run Spark ValidatesRunner
Run Spark ValidatesRunner
Run Spark ValidatesRunner
Run Java PreCommit
Reminder, please take a look at this pr: @lukecwik
Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:
R: @kennknowles for label java.
Reminder, please take a look at this pr: @kennknowles
Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:
R: @lukecwik for label java.
Reminder, please take a look at this pr: @lukecwik
Run Spark ValidatesRunner
@kennknowles FYI, a significant number of VR tests are consistently failing here, although they usually succeed when I run them individually. It looks like there's some nondeterminism around watermark propagation in the runner, see https://github.com/apache/beam/issues/23129. Do you know anyone who's familiar with that code?
I don't know if anyone currently around would be familiar with SparkRunner watermark propagation.
I think it is valuable to get these tests running and disable the ones that fail. The test suite and its list of disabled tests can then be a real representation of the current state. That way, things that are green can stay green.
run spark validatesrunner
Run Spark ValidatesRunner
Run Spark ValidatesRunner
I took a bit of a turn here after validating my initial approach (replacing bounded sources with UnboundedReadFromBoundedSource) with VR tests in Flink:
- Tests that failed, likely due to watermark issues with the Spark runner (#23129, see test results), ran fine with Flink, suggesting there really is a major problem in streaming mode.
- Nevertheless, it also showed that the approach is somewhat flawed. Some bounded test cases simply cannot be forced into a streaming execution; e.g. any GroupByKey in the GlobalWindow will fail if no trigger is set.
The initial reason for this approach was to prevent the Spark runner from failing when streaming was forced via pipeline options in VR tests for bounded test cases: Spark refuses to start if there's no streaming workload scheduled.
Instead, TestSparkRunner now just detects the translation mode and acts accordingly.
Unfortunately, this hides the watermark issues uncovered above, as the VR tests now succeed.
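A rough sketch of what "detecting the translation mode" means, under the assumption that the mode is derived solely from whether any PCollection in the pipeline is unbounded. This is a toy model, not the TestSparkRunner code; TranslationModeSketch, Boundedness and Mode are hypothetical names.

```java
import java.util.List;

// Toy model of translation-mode detection; not the actual TestSparkRunner code.
class TranslationModeSketch {
  enum Boundedness { BOUNDED, UNBOUNDED }

  enum Mode { BATCH, STREAMING }

  // Assumption: the mode depends only on the pipeline's PCollections.
  // A purely bounded pipeline runs in batch mode, because a streaming
  // context with no streaming workload would refuse to start.
  static Mode detect(List<Boundedness> pcollections) {
    return pcollections.contains(Boundedness.UNBOUNDED) ? Mode.STREAMING : Mode.BATCH;
  }

  public static void main(String[] args) {
    System.out.println(detect(List.of(Boundedness.BOUNDED, Boundedness.BOUNDED))); // BATCH
    System.out.println(detect(List.of(Boundedness.BOUNDED, Boundedness.UNBOUNDED))); // STREAMING
  }
}
```

The trade-off noted above follows directly: bounded VR tests are no longer pushed through the streaming path, so they pass, but they also no longer exercise the streaming watermark handling where the failures showed up.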
- Nevertheless, it also showed that the approach is somewhat flawed. Some bounded test cases simply cannot be forced into a streaming execution; e.g. any GroupByKey in the GlobalWindow will fail if no trigger is set.
In the Beam model, the actual condition is that a GroupByKey of an unbounded PCollection in the global window must have a trigger. You can still have a bounded PCollection in streaming mode.
So the summary is:
- forcing a run in streaming mode, but leaving bounded PCollections as bounded is OK
- automatically making all PCollections unbounded is flawed (but can still sometimes be useful for finding bugs)
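The rule from the summary can be restated as a single predicate. This is only a sketch that encodes the condition described above; GroupByKeyRuleSketch and isAllowed are hypothetical names, not Beam's own validation code.

```java
// Restates the rule from the comment above; not Beam's own validation code.
class GroupByKeyRuleSketch {

  // A GroupByKey is rejected only when its input is unbounded, in the global
  // window, and has no trigger. Bounded input is fine even in streaming mode.
  static boolean isAllowed(boolean unbounded, boolean globalWindow, boolean hasTrigger) {
    return !(unbounded && globalWindow && !hasTrigger);
  }

  public static void main(String[] args) {
    // Bounded PCollection in streaming mode: allowed.
    System.out.println(isAllowed(false, true, false)); // true
    // Bounded read forced to unbounded, no trigger: rejected.
    System.out.println(isAllowed(true, true, false)); // false
    // Unbounded input with a trigger: allowed.
    System.out.println(isAllowed(true, true, true)); // true
  }
}
```

This is why forcing streaming while leaving bounded PCollections bounded is fine, whereas making every PCollection unbounded breaks any test that groups in the global window without a trigger.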
@kennknowles fine to merge this?
Reminder, please take a look at this pr: @lukecwik
Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:
R: @kennknowles for label java.
@kennknowles kind ping, are you ok to merge it?