[flink-runner] Improve DataStream batch performance
Context
Flink will drop support for the DataSet API in 2.0, which should be released by EOY, so it is quite important for Beam to support the DataStream API well.
The PR
This PR improves the performance of batch jobs executed with `--useDatastreamForBatch` by porting the following performance optimizations, already present in `FlinkBatchTransformTranslators` but lacking in `FlinkStreamingTransformTranslators`:
- Limit the max size of source splits. Similar to https://github.com/apache/beam/pull/28045
- Pre-combine before the shuffle (for both reduce-by-key and GBK); a sketch of the idea follows this list.
- Disable bundling in batch mode (except for the pre-combine), and lower the default bundle size since the new behavior puts pressure on the heap.
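To make the pre-combine concrete, here is a minimal, self-contained Java sketch of combiner lifting. `CombineFnLike` and `PreCombiner` are hypothetical stand-ins for Beam's `CombineFn` and the runner's operator, used only to show the idea: fold inputs into one accumulator per key locally, so the shuffle carries accumulators instead of raw elements.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Beam's CombineFn.
interface CombineFnLike<InputT, AccumT> {
  AccumT createAccumulator();
  AccumT addInput(AccumT accumulator, InputT input);
}

// Sketch of "pre-combine before shuffle": each upstream task folds its
// elements into one accumulator per key, and only the flush() results
// are sent through the keyed exchange.
class PreCombiner<K, InputT, AccumT> {
  private final CombineFnLike<InputT, AccumT> fn;
  private final Map<K, AccumT> accumulators = new HashMap<>();

  PreCombiner(CombineFnLike<InputT, AccumT> fn) {
    this.fn = fn;
  }

  void add(K key, InputT value) {
    AccumT acc = accumulators.computeIfAbsent(key, k -> fn.createAccumulator());
    accumulators.put(key, fn.addInput(acc, value));
  }

  // Called at the end of the bounded input: one accumulator per key crosses
  // the network instead of every element, which is what makes GBK and
  // Combine.perKey cheaper in batch mode.
  Map<K, AccumT> flush() {
    return accumulators;
  }
}
```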
It also implements the following optimizations:
- Use a "lazy" split enumerator to distributes split dynamically rather than eagerly. This new enumerator greatly reduces skew as each slot is able to pull new splits to consume only when it has finished its work.
- Set the default `maxParallelism` to `parallelism`, as the total number of key groups is equal to `maxParallelism`. Again, this reduces skew (see the key-group demo after this list).
- Make `ToKeyedWorkItem` part of `DoFnOperator`, which reduces the size of the job graph and avoids unnecessary inter-task communication.
- Force a common slot-sharing group on all bounded IOs. This emulates the behavior of the DataSet API, which again improves performance, especially when data is shuffled several times while partitioning keys are unchanged (for example, if the job does `GBK -> map -> CombinePerKey`). Add a flag to control this feature (it defaults to active).
- Use a custom class for keys which guarantees a good distribution of data even when the number of keys is not >> parallelism.
- Other minor optimizations that remove repeated serde work.
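The lazy enumerator can be sketched against Flink's `SplitEnumerator` API roughly as follows (an illustration of the approach, not the PR's actual implementation):

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

// Hand a split to a subtask only when it asks for one, instead of
// partitioning all splits up front. Idle subtasks keep pulling work,
// which evens out skew across slots.
class LazySplitEnumerator<SplitT extends SourceSplit>
    implements SplitEnumerator<SplitT, Void> {

  private final SplitEnumeratorContext<SplitT> context;
  private final Queue<SplitT> pendingSplits;

  LazySplitEnumerator(SplitEnumeratorContext<SplitT> context, List<SplitT> splits) {
    this.context = context;
    this.pendingSplits = new ArrayDeque<>(splits);
  }

  @Override
  public void start() {} // nothing to do eagerly: we wait for split requests

  @Override
  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    SplitT next = pendingSplits.poll();
    if (next != null) {
      context.assignSplit(next, subtaskId); // one split at a time, on demand
    } else {
      context.signalNoMoreSplits(subtaskId); // reader can finish
    }
  }

  @Override
  public void addSplitsBack(List<SplitT> splits, int subtaskId) {
    pendingSplits.addAll(splits); // returned on failover; re-distributed lazily
  }

  @Override
  public void addReader(int subtaskId) {} // readers pull via handleSplitRequest

  @Override
  public Void snapshotState(long checkpointId) {
    return null; // batch-only sketch: no checkpoint state to keep
  }

  @Override
  public void close() throws IOException {}
}
```

To see why setting `maxParallelism` to `parallelism` reduces skew: Flink routes every key to one of `maxParallelism` key groups and then maps contiguous ranges of key groups onto subtasks. When `parallelism` does not divide `maxParallelism`, some subtasks own one more key group than others (for example, 256 key groups over 100 subtasks leaves 56 subtasks with three groups and 44 with two), while with `maxParallelism == parallelism` every subtask owns exactly one group. The demo below uses Flink's own assignment utilities:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Demo of Flink's two-step keyed routing: key -> key group -> subtask.
class KeyGroupDemo {
  static int subtaskFor(Object key, int maxParallelism, int parallelism) {
    // Murmur hash of key.hashCode(), modulo the number of key groups.
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    // Contiguous ranges of key groups are mapped onto subtasks; the ranges
    // differ in size by one when parallelism does not divide maxParallelism.
    return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
        maxParallelism, parallelism, keyGroup);
  }

  public static void main(String[] args) {
    // With maxParallelism == parallelism, each subtask owns exactly one group.
    System.out.println(subtaskFor("some-key", 100, 100));
  }
}
```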
Benchmarks
The patched version was tested against a few of Spotify's production batch workflows. All settings were left unchanged except for the following:
- passed `--useDatastreamForBatch=true`
- set `jobmanager.scheduler: default` (otherwise the DataStream API defaults to the adaptive scheduler)
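For reference, a hedged sketch of how the first setting maps to Beam's Java API (the option name follows the text above; check `FlinkPipelineOptions` for the authoritative spelling). The scheduler setting lives in the Flink cluster configuration (`flink-conf.yaml`), not in the pipeline options:

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BenchmarkOptions {
  public static FlinkPipelineOptions create() {
    // Equivalent to passing --useDatastreamForBatch=true on the command line.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=FlinkRunner", "--useDatastreamForBatch=true")
            .as(FlinkPipelineOptions.class);
    // jobmanager.scheduler: default is set on the cluster side in
    // flink-conf.yaml; there is no pipeline option for it.
    return options;
  }
}
```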
All jobs ran on Beam 2.56; execution times are h:mm:ss, and % diff is relative to the dataset run (negative is faster).

| job | # workers | dataset execution time | datastream execution time | % diff | patched datastream execution time | % diff |
|---|---|---|---|---|---|---|
| Job 1 | 350 | 2:19:00 | fails after 4h29min | - | 1:43:00 | -25.90% |
| Job 2 | 160 | 0:23:00 | 0:35:00 | +52.17% | 0:22:36 | -1.74% |
| Job 3 | 200 | 0:53:08 | 1:34:39 | +78.14% | failed | - |
| Job 4 | 160 | 2:31:20 | 4:27:00 | +76.43% | 2:19:35 | -7.76% |
| Job 5 | 1 | 0:43:00 | not tested | - | 0:38:00 | -11.63% |
| Job 6 | 300 | 3:01:00 | not tested | - | 2:55:00 | - |
Note
Job 3 fails with a stack overflow exception because of a bug in Kryo versions < 3.0. I believe this is because the job uses `taskmanager.runtime.large-record-handler: true`; it should be fixed in Flink 2.0, where Kryo is upgraded to a more recent version.
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @damccorm for label website.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
@kennknowles would you mind taking a look at this one?
Reminder, please take a look at this PR: @damccorm
Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @melap for label website.
R: @kennknowles
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment `assign set of reviewers`.
To test this thoroughly, let us add some of the postcommits by touching trigger files. In #32648 you can see how I edited the JSON files (including adding some new ones); I think these are all the Flink-specific postcommit jobs.
@kennknowles done. I also rebased on master, but some of the tests seem to be quite flaky now. There are tests failing on things I did not touch (direct runner), and the Flink tests that are failing here are not failing on my machine... Any idea how I could make them work?
I opened jto/beam#236 with some more trigger files. The "PVR" in the trigger file names stands for "Portable Validates Runner", which isn't as directly impacted. I think the non-portable ValidatesRunner tests should verify that the runner still complies with the model and passes the basic tests.
Thanks! I just merged it. Sorry for the slow response. I was on vacation :)
Hey there!
I rebased master into my branch; however, a few tests are failing:
In beam_PreCommit_Java (Run Java PreCommit)
- Tests are failing in `:runners:spark:3:test`. I did not change anything in the Spark runner, and according to the report those tests are also failing in other builds, so I guess my PR is not the cause of those failures.
- One of the tests in `:runners:flink:1.17:test` is failing, but it succeeds on my machine. It looks like something may not be deterministic. I already fixed a couple of determinism issues in tests; I'll try to fix this one too.
In beam_PostCommit_Java_ValidatesRunner_Flink (Run Flink ValidatesRunner)
- One test in `:runners:flink:1.19:validatesRunnerBatchWithDataStream` is failing. This one is a "real" failure; I'll investigate it too.
PostCommit Go VR Flink / beam_PostCommit_Go_VR_Flink (Run Go Flink ValidatesRunner)
The logs are truncated, so I don't know if there's an actual failure or what it might be...
Hey @kennknowles!
I tried really hard to make the failing test (`org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests`) work, but in the end I think it's impossible.
The behaviour of the runner is correct (it fails when it's supposed to), but Flink will either fail with the expected exception or with a generic error (`TaskNotRunningException: Task is not running, but in state FAILED`). The expected error is present in the logs, but the "final" error may or may not be the expected one; the behaviour is non-deterministic.
In the end I just added this class to the sickbay, and that seems to have fixed almost all the problems.
The Python PostCommits are failing, but the error is:

```
Execution failed for task ':sdks:python:container:py312:docker'.
> Process 'command 'docker'' finished with non-zero exit value 1
```

which I think is unrelated to these changes; I could not find anything in the logs suggesting otherwise.
Hey @kennknowles @jto does this PR have any next steps?
I agree with your change to sickbay that Lifecycle test. It sounds like it is incorrectly specified, not taking into account allowable runner variation. Sorry for the delay, which has led to conflicts - if you resolve them, I think this is good to merge.
Hey @kennknowles !
Thanks for following up. I merged the latest changes from master and the PR can now be merged.
I had to add another test to the sickbay: `org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton` makes both `validatesRunnerBatch` and `validatesRunnerBatchWithDataStream` fail with a timeout. Technically the test passes in both cases, but for some reason it takes a very long time to execute. Because it causes the same issue with and without DataStream, I don't think this PR is the cause.
Some of the Python tests are failing with the following exception:

```
ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject
```

which does not look like it is caused by this PR.
`:sdks:java:harness:test` failed after ~6h, which again looks unrelated to this PR.
It'd be ideal if fixes were available and could be rebased into my branch to get all the tests green before merging, but otherwise I think the changes in the Flink runner are in good shape.
Also, the history is a bit messy because I tested a bunch of options to get the tests to pass. Cleaning it up right now :)
Ready for merge :)
Thanks for reviewing and merging :)
@kennknowles made a PR here: https://github.com/apache/beam/pull/33838