beam icon indicating copy to clipboard operation
beam copied to clipboard

Optimize Datastream for batch

Open jto opened this issue 1 year ago • 0 comments

This PR contains several optimizations for the Datastream API when used in Batch mode. In it's current state, using Datastream for batch is much slower than Dataset, this is an attempt to get it to the same performance level.

The following optimizations are implemented:

Same optimisation as https://github.com/apache/beam/pull/28045 for Datastream.

It has the same benefits with Datastream as with Dataset (up to 20% speedup). I discovered the patch was also necessary for Datastream while migrating existing workflows from dataset to datastream by passing --useDataStreamForBatch.

Use a lazy enumerator for bounded IOs reads

The existing enumerator eagerly distributes splits to workers. When splits are not all equal in size, the distribution causes a lot of skew. The new implementation is mimicking the behaviours of Flink's StaticFileSplitEnumerator where splits are lazily distributed to workers as they are consumed which results in better load balancing.

Set the serializer on Bounded reads.

For some reason serializer was not set on Bounded reads.

TODO

Fix BQ IO issue

BQ writes do not behave the same with Datastream and garbage collection is much much slower. In dataset the IO will create 1 temp file per worker, this is not true with Datastream where it creates a lot (20x) more files.

Fix double encoding of window in GBK and CombinePerKey

Before shuffle KV are converted to KeyedWorkItem, however the actual stream type is: DataStream<WindowedValue<KeyedWorkItem<K, byte[]>>> Both KeyedWorkItem and WindowedValue serialize the window. Since the conversion happens before keyBy (shuffle), the duplication directly results in network overhead.

I tried the simplest fix: move conversion after keyBy but WindowDoFnOperator needs the stream it transforms be keyed so turning ToBinaryKeyedWorkItem -> keyBy -> transform(doFnOperator) into keyBy -> ToBinaryKeyedWorkItem -> transform(doFnOperator) is not possible.

I also tried a similar fix using reinterpretAsKeyedStream to avoid this problem. The chain becomes: ToBinaryKV -> keyBy -> ToKeyedWorkItem -> reinterpretAsKeyedStream -> transform(doFnOperator) but reinterpretAsKeyedStream breaks operator chaining between ToBinaryKeyedWorkItem and the following operator which degrades performances even more.

The best fix would be to not need KeyedWorkItem but that'd be a large change in Beam.

Missing pre-shuffle combine on redure operator

The Dataset translation will translate reduce into a partial reduce -> shuffle -> reduce. The Datastream translator is missing this optimization which make reduce operations much slower.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels Python tests Java tests Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

jto avatar Jul 23 '24 08:07 jto