[flink-runner] Improve Datastream for batch performances

Open jto opened this issue 1 year ago • 9 comments

Context

Flink will drop support for the DataSet API in 2.0, which should be released by EOY, so it is quite important for Beam to support DataStream well.

The PR

This PR improves the performance of batch jobs executed with --useDatastreamForBatch by porting the following performance optimizations, which are already present in FlinkBatchTransformTranslators but missing from FlinkStreamingTransformTranslators.

  • Limit the max size of source splits. Similar to https://github.com/apache/beam/pull/28045
  • Pre-combine before shuffle (both reduce by key and GBK)
  • Disable bundling in batch mode (except for pre-combine). Lower the default bundle size since the new behavior puts pressure on the heap.
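The effect of pre-combining before the shuffle can be illustrated with a small, runner-independent simulation. This is only a sketch: the function names and the sample data are hypothetical, it uses a plain sum as the combiner, and it does not reproduce the actual translator code in the Flink runner.

```python
from collections import defaultdict


def shuffle_without_precombine(partitions):
    """Every (key, value) record is shuffled; combining happens afterwards."""
    shuffled = [record for partition in partitions for record in partition]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


def shuffle_with_precombine(partitions):
    """Each partition combines its own records per key, then ships one partial per key."""
    shuffled = []
    for partition in partitions:
        partial = defaultdict(int)
        for key, value in partition:
            partial[key] += value
        shuffled.extend(partial.items())
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


# Hypothetical input: two upstream partitions of (key, value) records.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("b", 5), ("b", 6), ("c", 7)],
]

plain_totals, plain_records = shuffle_without_precombine(partitions)
combined_totals, combined_records = shuffle_with_precombine(partitions)

print(plain_totals == combined_totals)  # True: same result either way
print(plain_records, combined_records)  # 7 5: fewer records cross the shuffle
```

The saving grows with the number of repeated keys per partition. The sketch relies on the combiner being associative and commutative, as a sum is.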

It also implements the following optimizations:

  • Use a "lazy" split enumerator to distribute splits dynamically rather than eagerly. This new enumerator greatly reduces skew, as each slot pulls a new split to consume only when it has finished its current work.
  • Set the default maxParallelism to parallelism, as the total number of key groups is equal to maxParallelism. Again, this reduces skew.
  • Make ToKeyedWorkItem part of DoFnOperator, which reduces the size of the job graph and avoids unnecessary inter-task communication.
  • Force a common slot-sharing group on all bounded IOs. This emulates the behavior of the DataSet API, which again improves performance, especially when data is shuffled several times while the partitioning keys are unchanged (for example, if the job does GBK -> map -> CombinePerKey). Add a flag to control this feature (enabled by default).
  • Use a custom class for keys, which guarantees a good distribution of data even when the number of keys is not >> parallelism.
  • Other minor optimizations removing repeated serde work.
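Two of the skew arguments above can be checked with a small simulation. This is a sketch under stated assumptions, not the code from this PR: the split costs, slot counts, and function names are hypothetical, eager assignment is modeled as round-robin, and the key-group mapping uses the formula `key_group * parallelism // max_parallelism`, which to my understanding mirrors how Flink maps key groups to subtasks.

```python
import heapq


def eager_makespan(split_costs, slots):
    """Assign all splits round-robin up front; return when the slowest slot finishes."""
    load = [0] * slots
    for index, cost in enumerate(split_costs):
        load[index % slots] += cost
    return max(load)


def lazy_makespan(split_costs, slots):
    """Hand the next split to whichever slot becomes idle first."""
    idle_times = [0] * slots  # min-heap of the time at which each slot becomes idle
    heapq.heapify(idle_times)
    for cost in split_costs:
        idle_at = heapq.heappop(idle_times)
        heapq.heappush(idle_times, idle_at + cost)
    return max(idle_times)


def key_groups_per_subtask(parallelism, max_parallelism):
    """Count how many key groups each subtask owns."""
    counts = [0] * parallelism
    for key_group in range(max_parallelism):
        counts[key_group * parallelism // max_parallelism] += 1
    return counts


# Hypothetical workload: mostly cheap splits with two expensive ones.
split_costs = [10, 1, 1, 1, 10, 1, 1, 1]
print(eager_makespan(split_costs, 2))  # 22: both expensive splits land on one slot
print(lazy_makespan(split_costs, 2))   # 13: idle slots pick up the remaining work

# Illustrative values: 256 key groups over 100 subtasks vs. one key group each.
uneven = key_groups_per_subtask(100, 256)
even = key_groups_per_subtask(100, 100)
print(min(uneven), max(uneven))  # 2 3: some subtasks own 50% more key groups
print(min(even), max(even))      # 1 1
```

When maxParallelism is not a multiple of parallelism, some subtasks own more key groups than others, so they receive more data even if keys hash evenly into key groups.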

Benchmarks

The patched version was tested against a few of Spotify's production batch workflows. All settings were left unchanged except for the following:

  • passed --useDatastreamForBatch=true
  • set jobmanager.scheduler: default (otherwise DataStream defaults to the adaptive scheduler).
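
| job   | # workers | Beam 2.56 - dataset: execution time | Beam 2.56 - datastream: execution time | % diff | Beam 2.56 - datastream patched: execution time | % diff  |
|-------|-----------|-------------------------------------|----------------------------------------|--------|------------------------------------------------|---------|
| Job 1 | 350       | 2:19:00                             | fails after 4h29min                    | -      | 1:43:00                                        | -25.90% |
| Job 2 | 160       | 0:23:00                             | 0:35:00                                | 52.17% | 0:22:36                                        | -1.74%  |
| Job 3 | 200       | 0:53:08                             | 1:34:39                                | 78.14% | failed                                         | -       |
| Job 4 | 160       | 2:31:20                             | 4:27:00                                | 76.43% | 2:19:35                                        | -7.76%  |
| Job 5 | 1         | 0:43:00                             | not tested                             | -      | 0:38:00                                        | -11.63% |
| Job 6 | 300       | 3:01:00                             | not tested                             | -      | 2:55:00                                        |         |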
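
The % diff columns appear to be computed relative to the DataSet execution time of the same job. A minimal sketch of that arithmetic, using durations from the table (the helper names are hypothetical):

```python
def to_seconds(hms):
    """Convert an 'H:MM:SS' duration string to seconds."""
    hours, minutes, seconds = (int(part) for part in hms.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def percent_diff(baseline, candidate):
    """Relative change of candidate vs. baseline, in percent, rounded to 2 decimals."""
    base = to_seconds(baseline)
    return round((to_seconds(candidate) - base) / base * 100, 2)


job2_datastream = percent_diff("0:23:00", "0:35:00")
job2_patched = percent_diff("0:23:00", "0:22:36")
job4_patched = percent_diff("2:31:20", "2:19:35")

print(job2_datastream)  # 52.17
print(job2_patched)     # -1.74
print(job4_patched)     # -7.76
```

Negative values mean the run was faster than the DataSet baseline.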

Note

Job 3 fails with a stack overflow exception because of a bug in Kryo versions < 3.0. I believe this is because the job uses taskmanager.runtime.large-record-handler: true, and it should be fixed in Flink 2.0 since Kryo is upgraded to a more recent version there.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

jto avatar Sep 12 '24 09:09 jto

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label website.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Sep 13 '24 10:09 github-actions[bot]

@kennknowles would you mind taking a look at this one?

damccorm avatar Sep 13 '24 17:09 damccorm

Reminder, please take a look at this pr: @damccorm

github-actions[bot] avatar Sep 24 '24 12:09 github-actions[bot]

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @melap for label website.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

github-actions[bot] avatar Sep 26 '24 12:09 github-actions[bot]

R: @kennknowles

damccorm avatar Oct 03 '24 15:10 damccorm

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

github-actions[bot] avatar Oct 03 '24 15:10 github-actions[bot]

To test this thoroughly, let us add some of the postcommits by touching trigger files. In #32648 you can see how I edited the JSON files (including some new ones) and I think these are all the Flink-specific postcommit jobs.

kennknowles avatar Oct 03 '24 20:10 kennknowles

@kennknowles done. I also rebased on master, but some of the tests seem to be quite flaky now. There are tests failing on things I did not touch (direct runner), and the Flink tests that are failing here are not failing on my machine... Any idea how I could make them work?

jto avatar Oct 17 '24 15:10 jto

I opened jto/beam#236 with some more trigger files. "PVR" in the trigger file names stands for "Portable Validates Runner", which isn't as directly impacted. I think the non-portable ValidatesRunner tests should test that the runner still complies with the model and passes the basic tests.

kennknowles avatar Oct 18 '24 17:10 kennknowles

Thanks! I just merged it. Sorry for the slow response. I was on vacation :)

jto avatar Nov 05 '24 09:11 jto

Hey there!

I rebased master into my branch and a few tests are failing however:

In beam_PreCommit_Java (Run Java PreCommit)

  • tests are failing in :runners:spark:3:test. I did not change anything in the spark runner and according to the report those tests are also failing in other builds. I guess my PR is not the cause of those failures.
  • One of the tests in :runners:flink:1.17:test is failing but it succeeds on my machine. It looks like something may not be deterministic. I already fixed a couple of determinism issues in tests. I'll try to fix this one too.

In beam_PostCommit_Java_ValidatesRunner_Flink (Run Flink ValidatesRunner)

  • One test in :runners:flink:1.19:validatesRunnerBatchWithDataStream is failing. This one is a "real" failure. I'll investigate it too.

PostCommit Go VR Flink / beam_PostCommit_Go_VR_Flink (Run Go Flink ValidatesRunner)

Logs are truncated. I don't know if there's an actual failure or what it might be...

jto avatar Nov 07 '24 12:11 jto

Hey @kennknowles! I tried really hard to make the failing test (org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests) work but in the end I think it's impossible. The behaviour of the runner is correct (it fails when it's supposed to) but Flink will either fail with the expected exception or just fail with a generic error (TaskNotRunningException: Task is not running, but in state FAILED). The expected error is there in the logs but the "final" error may or may not be the expected one. The behaviour is non-deterministic. In the end I just added this class into sickbay and that seems to have fixed almost all the problems.

The Python PostCommits are failing but the error is:

2024-11-18T11:39:33.3464029Z Execution failed for task ':sdks:python:container:py312:docker'.
2024-11-18T11:39:33.3464463Z > Process 'command 'docker'' finished with non-zero exit value 1

which I think is unrelated to those changes and I could not find anything in the logs suggesting otherwise.

jto avatar Nov 18 '24 13:11 jto

Hey @kennknowles @jto does this PR have any next steps?

claudevdm avatar Jan 07 '25 14:01 claudevdm

I agree with your change to sickbay that Lifecycle test. It sounds like it is incorrectly specified, not taking into account allowable runner variation. Sorry for the delay which has led to conflicts - if you resolve them I think this is good to merge.

kennknowles avatar Jan 10 '25 15:01 kennknowles

Hey @kennknowles !

Thanks for following up. I merged the latest changes from master and the PR can now be merged.

I had to add another test to the sickbay. org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton makes both validatesRunnerBatch and validatesRunnerBatchWithDataStream fail with a timeout. Technically the test passes in both cases but for some reason it takes a very long time to execute. Because it causes the same issue with and without DataStream, I don't think this PR is the cause.

Some of the Python tests are failing with the following exception:

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

which does not appear to be caused by this PR.

:sdks:java:harness:test failed after ~6h which again looks unrelated to this PR.

It'd be ideal if fixes were available and could be rebased into my branch to get all the tests green before merging, but otherwise I think the changes in the Flink runner are in good shape.

jto avatar Jan 27 '25 09:01 jto

Also history is a bit messy because I tested a bunch of options to get the test to pass. Cleaning it up right now :)

jto avatar Jan 27 '25 09:01 jto

Ready for merge :)

jto avatar Jan 27 '25 13:01 jto

Thanks for reviewing and merging :)

jto avatar Feb 03 '25 15:02 jto

@kennknowles made a PR here: https://github.com/apache/beam/pull/33838

jto avatar Feb 03 '25 16:02 jto