Adds support in Samza Runner to run DoFn.processElement in parallel inside Samza tasks
This patch adds a thread pool to the DoFnRunner of the Samza runner so that DoFn.processElement can execute in parallel within a Samza task. This allows parallelism beyond a single task/partition. Users can specify the number of threads in the ExecutorService, or provide their own customized executor service. Under the hood, we add another layer of DoFnRunner that parallelizes the calls and passes the resulting futures down to the Samza async engine.
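The idea of submitting each processElement call to a pool and handing back futures can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this patch: `ParallelProcessSketch`, `processAsync`, and `joinAll` are hypothetical names, a plain `Function` stands in for the DoFn, and the caller of `joinAll` stands in for the Samza async engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch; these are not the actual Samza runner classes. */
public class ParallelProcessSketch {

  /** Submits each element to the pool and returns one future per element, in input order. */
  public static <I, O> List<CompletableFuture<O>> processAsync(
      List<I> elements, Function<I, O> processElement, ExecutorService executor) {
    List<CompletableFuture<O>> futures = new ArrayList<>();
    for (I element : elements) {
      // The call runs on a pool thread; the caller gets the future immediately.
      futures.add(CompletableFuture.supplyAsync(() -> processElement.apply(element), executor));
    }
    return futures;
  }

  /** Blocks until every future completes and collects the results in input order. */
  public static <O> List<O> joinAll(List<CompletableFuture<O>> futures) {
    List<O> results = new ArrayList<>();
    for (CompletableFuture<O> future : futures) {
      results.add(future.join());
    }
    return results;
  }

  public static void main(String[] args) {
    // The pool size plays the role of the user-specified thread count.
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      Function<Integer, Integer> square = x -> x * x;
      List<CompletableFuture<Integer>> futures =
          processAsync(Arrays.asList(1, 2, 3, 4), square, executor);
      System.out.println(joinAll(futures));
    } finally {
      executor.shutdown();
    }
  }
}
```

Taking the `ExecutorService` as a parameter mirrors the option of supplying a customized executor instead of a fixed-size pool.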
@mynameborat : pls take a look
Does this PR remove the limitation on job.container.thread.pool.size > 1 and max.bundle.size > existing together? If not, then the place where we set setNumThreadsForProcessElement to > 1 should throw a validation exception, right?
There are two ways to enable this feature:
- numThreadsForProcessElement: this is the user-facing SamzaPipelineOption, which typical Beam users can set.
- job.container.thread.pool.size: legacy Samza users are already familiar with this Samza config, so I use its value to set the numThreadsForProcessElement option. The config is then removed so that the Samza engine does not also run its own thread pool in the task.
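The translation from the legacy config to the pipeline option could look roughly like this. It is a sketch under assumptions: `ThreadPoolConfigSketch`, `Resolved`, and `resolve` are hypothetical names, the config is modeled as a plain `Map`, and letting the legacy value win when both are set is an assumption that the thread does not confirm.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of mapping the legacy Samza config onto the pipeline option. */
public class ThreadPoolConfigSketch {
  public static final String THREAD_POOL_SIZE = "job.container.thread.pool.size";

  /** Holds the resolved thread count and the config with the legacy key removed. */
  public static final class Resolved {
    public final int numThreadsForProcessElement;
    public final Map<String, String> config;

    Resolved(int numThreadsForProcessElement, Map<String, String> config) {
      this.numThreadsForProcessElement = numThreadsForProcessElement;
      this.config = config;
    }
  }

  /**
   * Uses the legacy config value when present (assumed precedence), otherwise the option value.
   * The legacy key is always dropped from the returned config.
   */
  public static Resolved resolve(int optionValue, Map<String, String> config) {
    Map<String, String> copy = new HashMap<>(config);
    String legacy = copy.remove(THREAD_POOL_SIZE);
    int threads = (legacy != null) ? Integer.parseInt(legacy.trim()) : optionValue;
    return new Resolved(threads, copy);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(THREAD_POOL_SIZE, "8");
    Resolved resolved = resolve(1, config);
    System.out.println(resolved.numThreadsForProcessElement);
    System.out.println(resolved.config.containsKey(THREAD_POOL_SIZE));
  }
}
```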
I still keep the validation that rejects job.container.thread.pool.size > 1 together with max.bundle.size > 1, since the logic above overrides job.container.thread.pool.size to empty. This condition can therefore occur only if a user manipulates the config after it has been generated.
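For illustration, the retained check might be sketched as below. `BundleValidationSketch` and `validate` are hypothetical names, the config is modeled as a plain `Map`, and the exception type and message are assumptions rather than what the runner actually throws.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the retained validation; not the runner's actual validator. */
public class BundleValidationSketch {
  public static final String THREAD_POOL_SIZE = "job.container.thread.pool.size";

  /** Rejects a config that still carries a thread pool size > 1 when bundling is enabled. */
  public static void validate(Map<String, String> config, long maxBundleSize) {
    String raw = config.get(THREAD_POOL_SIZE);
    // A missing or empty value means the key was cleared during config generation.
    int poolSize = (raw == null || raw.trim().isEmpty()) ? 0 : Integer.parseInt(raw.trim());
    if (poolSize > 1 && maxBundleSize > 1) {
      throw new IllegalArgumentException(
          THREAD_POOL_SIZE + " > 1 cannot be combined with max.bundle.size > 1");
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    validate(config, 10); // passes: the key was removed

    config.put(THREAD_POOL_SIZE, "4"); // simulates editing the config after generation
    try {
      validate(config, 10);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```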
Does this make sense to you? We can chat offline if needed.