Adds support in Samza Runner to run DoFn.processElement in parallel inside Samza tasks

Open xinyuiscool opened this issue 2 years ago • 1 comments

This patch adds a thread pool to the DoFnRunner of the Samza runner so that DoFn.processElement can execute in parallel within a Samza task. This allows parallelism beyond a single task/partition. Users can specify the number of threads in the ExecutorService, or provide a custom executor service. Under the hood, we add another layer of DoFnRunner that parallelizes the calls and passes the futures down to the Samza async engine.
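
The idea can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in this patch: `ParallelProcessSketch` and `processAll` are hypothetical names, a plain `Function` stands in for `DoFn.processElement`, and returning a list of `CompletableFuture`s stands in for handing futures to the Samza async engine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch; not the Samza runner's actual DoFnRunner wrapper. */
final class ParallelProcessSketch {

  /**
   * Submits the processing of each element to the executor and returns one
   * future per element, in input order. The caller (here standing in for the
   * async engine) decides when to wait on the futures.
   */
  static <InT, OutT> List<CompletableFuture<OutT>> processAll(
      List<InT> elements,
      Function<InT, OutT> processElement, // stand-in for DoFn.processElement
      ExecutorService executor) {
    List<CompletableFuture<OutT>> futures = new ArrayList<>();
    for (InT element : elements) {
      futures.add(
          CompletableFuture.supplyAsync(() -> processElement.apply(element), executor));
    }
    return futures;
  }

  public static void main(String[] args) {
    int numThreads = 4; // assumed value; the PR exposes this as a user option
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
      List<CompletableFuture<Integer>> futures =
          processAll(List.of("a", "bb", "ccc"), String::length, executor);
      // Elements may be processed concurrently, but results are read in input order.
      for (CompletableFuture<Integer> future : futures) {
        System.out.println(future.join());
      }
    } finally {
      executor.shutdown();
    }
  }
}
```

Passing the `ExecutorService` in as a parameter mirrors the option described above of letting users supply their own executor instead of a fixed-size pool.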

xinyuiscool, Sep 20 '22 23:09

@mynameborat : pls take a look

xinyuiscool, Sep 20 '22 23:09

Does this PR remove the limitation that job.container.thread.pool.size > 1 and max.bundle.size > 1 cannot be set together?

If not, then the place where we call setNumThreadsForProcessElement with a value > 1 should throw a validation exception, right?

This feature can be enabled in two ways:

  • numThreadsForProcessElement: this is the user-facing SamzaPipelineOption, which typical Beam users can set.
  • job.container.thread.pool.size: legacy Samza users are already familiar with this Samza config, so I use its value to set the numThreadsForProcessElement option. Afterwards the config is removed so that the Samza engine does not also run its own thread pool in the task.

I still keep the validation that rejects job.container.thread.pool.size > 1 together with max.bundle.size > 1, since the logic above clears job.container.thread.pool.size. The condition can therefore only be hit if a user manipulates the config after it is generated.
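
A rough sketch of that flow, for illustration only: `ThreadConfigSketch`, `resolveNumThreads`, and `validate` are hypothetical names, a plain `Map` stands in for the generated Samza config, and `max.bundle.size` is used as a key exactly as written in this thread. It also assumes the legacy config wins when both settings are present, which the discussion does not spell out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the config handling described above; not the patch's code. */
final class ThreadConfigSketch {
  static final String THREAD_POOL_SIZE = "job.container.thread.pool.size";
  // Key as written in this discussion; treated here as a plain map entry.
  static final String MAX_BUNDLE_SIZE = "max.bundle.size";

  /**
   * Returns the number of threads to use for processElement. If the legacy
   * Samza config is present, its value is used (assumption: it takes
   * precedence) and the key is removed so the engine does not see it.
   */
  static int resolveNumThreads(int numThreadsForProcessElement, Map<String, String> config) {
    String legacy = config.remove(THREAD_POOL_SIZE);
    if (legacy != null) {
      return Integer.parseInt(legacy);
    }
    return numThreadsForProcessElement;
  }

  /** Rejects a config where both the thread pool size and the bundle size exceed 1. */
  static void validate(Map<String, String> config) {
    int poolSize = Integer.parseInt(config.getOrDefault(THREAD_POOL_SIZE, "1"));
    int bundleSize = Integer.parseInt(config.getOrDefault(MAX_BUNDLE_SIZE, "1"));
    if (poolSize > 1 && bundleSize > 1) {
      throw new IllegalArgumentException(
          THREAD_POOL_SIZE + " > 1 cannot be combined with " + MAX_BUNDLE_SIZE + " > 1");
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(THREAD_POOL_SIZE, "4");
    config.put(MAX_BUNDLE_SIZE, "10");

    int numThreads = resolveNumThreads(1, config);
    validate(config); // passes: the legacy key was removed during resolution
    System.out.println("numThreads = " + numThreads);
  }
}
```

In this sketch the validation only fails if the thread pool key is put back into the config after resolution, which matches the case of a user editing the generated config by hand.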

Does this make sense to you? We can chat offline if needed.

xinyuiscool, Sep 26 '22 23:09