
Support VR tests including TestStream for Spark runner in streaming mode

Open • mosche opened this issue 2 years ago • 6 comments

Run VR tests for the Spark streaming runner rather than custom tests (these tests are already run as part of the "normal" unit test run).

If forceStreaming is set to true, the TestSparkRunner replaces Read.Bounded with UnboundedReadFromBoundedSource so that tests run in streaming mode. Additionally, this PR adds support for TestStream.
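For illustration, a minimal sketch of the kind of test this enables (not code from this PR; it assumes the forceStreaming flag is exposed on the Spark runner's TestSparkPipelineOptions, and the class name and values below are arbitrary):

import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.TestSparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.joda.time.Instant;

public class SparkStreamingVrSketch {
  public static void main(String[] args) {
    TestSparkPipelineOptions options =
        PipelineOptionsFactory.as(TestSparkPipelineOptions.class);
    options.setRunner(TestSparkRunner.class);
    // Force the streaming translation path; bounded reads are wrapped as unbounded.
    options.setForceStreaming(true);

    Pipeline p = Pipeline.create(options);

    // TestStream drives elements and watermark advancement deterministically.
    TestStream<String> events =
        TestStream.create(StringUtf8Coder.of())
            .addElements("a", "b")
            .advanceWatermarkTo(new Instant(1000))
            .addElements("c")
            .advanceWatermarkToInfinity();

    PAssert.that(p.apply(events)).containsInAnyOrder("a", "b", "c");
    p.run();
  }
}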

Closes #22472


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [ ] Choose reviewer(s) and mention them in a comment (R: @username).
  • [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

See CI.md for more information about GitHub Actions CI.

mosche • Aug 08 '22 15:08

Run Spark ValidatesRunner

mosche • Aug 08 '22 15:08

Unfortunately I'm stuck with some flaky tests. It looks like watermarks are not advanced in a deterministic way. Below are some logs from org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy (edited for readability).

@aromanenko-dev @echauchot if you have some time I'd be more than grateful for a 2nd pair of 👀 .

Successful run (the watermark advanced early enough, so the timer is triggered and the only element is emitted):

14:42:52,938 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,940 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,949 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,957 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,960 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:52,961 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:42:52,962 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
14:42:53,137 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}] [inputWatermark: BOUNDEDW_MAX]
14:42:53,146 [15] DEBUG WindowTracing  - ReduceFnRunner: Received timer key:Row2; window:GlobalWindow; data:TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false} with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing  - ReduceFnRunner: Cleaning up for key:Row2; window:GlobalWindow with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease: for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,149 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease.read: clearing for key:Row2; window:GlobalWindow
14:42:53,150 [15] DEBUG WindowTracing  - describePane: ON_TIME pane (prev was null) for key:Row2; windowMaxTimestamp:GLOBALW_MAX; inputWatermark:BOUNDEDW_MAX; outputWatermark:null; isLateForOutput:false
14:42:53,152 [15] TRACE WindowTracing  - ReduceFnRunner.onTrigger: outputWindowedValue key:Row2 value:[Row1] at GLOBALW_MAX
14:42:53,152 [15] DEBUG WindowTracing  - WatermarkHold.clearHolds: For key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,153 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are TimestampedValueInGlobalWindow{value=KV{Row2, [Row1]}, timestamp=GLOBALW_MAX, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} 1

Failed run (the watermark is advanced too late, the timer doesn't trigger, and the element is lost):

14:41:51,453 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,455 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,463 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,471 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:41:51,476 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:41:51,658 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
14:41:51,662 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}

mosche • Aug 08 '22 15:08

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @lukecwik for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] • Aug 08 '22 16:08

Run Spark ValidatesRunner

mosche • Aug 09 '22 08:08

Run Spark ValidatesRunner

mosche • Aug 09 '22 08:08

Run Spark ValidatesRunner

mosche • Aug 09 '22 09:08

Run Spark ValidatesRunner

mosche • Aug 11 '22 12:08

Run Java PreCommit

mosche • Aug 11 '22 12:08

Reminder, please take a look at this PR: @lukecwik

github-actions[bot] • Aug 19 '22 12:08

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

github-actions[bot] • Aug 23 '22 12:08

Reminder, please take a look at this PR: @kennknowles

github-actions[bot] • Aug 30 '22 12:08

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @lukecwik for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

github-actions[bot] • Sep 02 '22 12:09

Reminder, please take a look at this PR: @lukecwik

github-actions[bot] • Sep 12 '22 12:09

Run Spark ValidatesRunner

kennknowles • Sep 12 '22 15:09

@kennknowles FYI, a significant number of VR tests are consistently failing here. If I run them independently they usually succeed. It looks like there's some nondeterminism around watermark propagation in the runner; see https://github.com/apache/beam/issues/23129. Would you know anyone who's familiar with that code?

mosche • Sep 12 '22 16:09

I don't know if anyone currently around would be familiar with SparkRunner watermark propagation.

kennknowles • Sep 13 '22 20:09

I think it is valuable to get these tests running and disable the failing ones. The tests and the list of disabled tests can then be a real representation of the current state. That way things that are green can stay green.

kennknowles • Sep 13 '22 20:09

run spark validatesrunner

kennknowles • Sep 13 '22 20:09

Run Spark ValidatesRunner

mosche • Sep 14 '22 12:09

Run Spark ValidatesRunner

mosche • Sep 14 '22 12:09

I took a bit of a turn here after validating my initial approach (replacing bounded sources with UnboundedReadFromBoundedSource) against the VR tests in Flink:

  • Tests that likely failed due to watermark issues in the Spark runner (#23129, see test results) ran fine on Flink, suggesting there really is a major problem (in streaming mode).

  • Nevertheless, it also showed that the approach is somewhat flawed. Some bounded test cases simply cannot be forced into a streaming execution, e.g. any GroupByKey on the GlobalWindow will fail if no trigger is set.

The initial reason for this approach was to prevent the Spark runner from failing when streaming was forced via pipeline options in VR tests for bounded test cases: Spark refuses to start if there's no streaming workload scheduled. Instead, TestSparkRunner now just detects the translation mode and acts accordingly.

Unfortunately, this hides the watermark issues uncovered above, as the VR tests then succeed.
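
For reference, detecting the translation mode essentially means checking whether any PCollection in the pipeline is unbounded; a rough sketch of such a check (the class and method names here are made up, this is not the actual TestSparkRunner code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PValue;

class StreamingModeDetector extends Pipeline.PipelineVisitor.Defaults {
  private boolean streaming = false;

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer) {
    // One unbounded PCollection is enough to require the streaming translation path.
    if (value instanceof PCollection
        && ((PCollection<?>) value).isBounded() == IsBounded.UNBOUNDED) {
      streaming = true;
    }
  }

  static boolean requiresStreaming(Pipeline pipeline) {
    StreamingModeDetector detector = new StreamingModeDetector();
    pipeline.traverseTopologically(detector);
    return detector.streaming;
  }
}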

mosche • Sep 14 '22 13:09

  • Nevertheless, it also showed that the approach is somewhat flawed. Some bounded test cases simply cannot be forced into a streaming execution, e.g. any GroupByKey on the GlobalWindow will fail if no trigger is set.

In the Beam model, this condition is that a GroupByKey of an unbounded PCollection in the global window must have a trigger (see the sketch after the summary below). But you can still have a bounded PCollection in streaming mode.

So the summary is:

  • forcing a run in streaming mode, but leaving bounded PCollections as bounded is OK
  • automatically making all PCollections unbounded is flawed (but still can be useful to find bugs sometimes)
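
To illustrate the condition above, a sketch of the unbounded case (arbitrary names and values): applying GroupByKey to an unbounded PCollection in the global window is rejected at pipeline construction time unless an explicit trigger is set.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GlobalWindowTriggerSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // GenerateSequence with a rate yields an unbounded PCollection.
    PCollection<KV<String, Long>> keyed =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
            .apply(WithKeys.of("key"));

    // Applying GroupByKey.create() directly would fail at construction time:
    // unbounded input in the GlobalWindow with the default trigger is rejected.
    // With an explicit trigger, the same GroupByKey is legal:
    keyed
        .apply(
            Window.<KV<String, Long>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        .apply(GroupByKey.create());
    // (Construction is the point here; actually running this would stream indefinitely.)
  }
}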

kennknowles • Sep 14 '22 20:09

@kennknowles fine to merge this?

mosche • Sep 19 '22 09:09

Reminder, please take a look at this PR: @lukecwik

github-actions[bot] • Sep 26 '22 12:09

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

github-actions[bot] • Sep 29 '22 12:09

@kennknowles kind ping, are you ok to merge it?

aromanenko-dev • Sep 29 '22 15:09