Improved pipeline translation in SparkStructuredStreamingRunner
(closes #22445, #22382):
- Make use of Spark `Encoder`s to leverage structural information in translation (and potentially benefit from the Catalyst optimizer). Note, though, that the possible benefit is limited, as every `ParDo` is a black box and a hard boundary for anything that could be optimized.
- Improved translation of `GroupByKey`. When applicable, group also by window to scale out better and/or use Spark's native `collect_list` to collect the values of a group.
- Make use of specialised Spark `Aggregator`s for combines (per key / globally); `Sessions` in particular can be improved significantly.
- Dedicated translation for `Combine.Globally` to avoid an additional shuffle of the data.
- Remove an additional serialization roundtrip when reading from a Beam `BoundedSource`.
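The "group also by window" idea can be illustrated with a plain-Python sketch (no Spark involved; `WINDOW_SIZE` and both helper names are made up for illustration). Keying by the composite (key, window) pair splits a hot key's values into several smaller groups that can be processed in parallel, instead of funnelling everything for that key into a single group:

```python
from collections import defaultdict

WINDOW_SIZE = 10  # fixed-window width, arbitrary for this sketch

def group_by_key(elements):
    """Group (key, timestamp, value) records by key only."""
    groups = defaultdict(list)
    for key, ts, value in elements:
        groups[key].append(value)
    return dict(groups)

def group_by_key_and_window(elements):
    """Group by the composite (key, window) pair instead."""
    groups = defaultdict(list)
    for key, ts, value in elements:
        window = ts // WINDOW_SIZE  # assign the record to a fixed window
        groups[(key, window)].append(value)
    return dict(groups)

elements = [("a", 1, 1), ("a", 5, 2), ("a", 12, 3), ("b", 3, 4)]
# The hot key "a" becomes two smaller groups once the window is part of the key.
```

This is only the partitioning intuition; in the runner itself the grouping happens via Spark's Dataset API rather than Python dictionaries.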
R: @aromanenko-dev R: @echauchot
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
ping @echauchot @aromanenko-dev 😀
Run Spark ValidatesRunner
Run Spark StructuredStreaming ValidatesRunner
@aromanenko-dev
@echauchot Kind ping :)
@echauchot @aromanenko-dev Btw, I was thinking a bit about a better name for this runner. I'd suggest renaming it to `SparkSqlRunner`, taking into account that it's built on top of the Spark SQL module:
> Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

(https://www.databricks.com/glossary/what-is-spark-sql)
@mosche I totally agree that the current name is not very practical: it's quite long and, even worse, very confusing, since it contains the word "Streaming" but this runner doesn't support streaming mode at all (we know the reasons, but it is what it is).
So it would be better to rename it, though I'm not sure about `SparkSqlRunner` as a new name. IMHO, it may also be confusing and give the false expectation that it supports only Spark (or Beam?) SQL pipelines.
I'd suggest the name `SparkDatasetRunner`, since it's based on the Spark Dataset API. This name is quite short and gives the basic idea of what to expect from this runner. The old runner could be called `SparkRDDRunner`, but let's keep it as it is - just `SparkRunner`.
On the other hand, this renaming will require many incompatible changes, starting with new package and artifact names. However, I'm pretty sure that most users who run Beam pipelines on Spark still use the old classical Spark (RDD) runner. We can check on user@ and Twitter, if needed.
I agree, that leaves room for potential new confusion. Giving this a second thought, I suppose you're right and `SparkDatasetRunner` is the better name, with less ambiguity ... nevertheless, it's a rather technical name which I'd usually rather avoid.
Regarding the rename or any other incompatible changes I'm personally fairly relaxed at this stage:
- it's clearly marked as experimental and such changes are to be expected
- the runner isn't optimised in any way yet, so there's little to no reason to use an experimental runner over a proven existing one (besides that, there are potential scalability issues that make me doubt a bit that it would work well on decent-sized datasets)
- users typically don't (and probably shouldn't) interact with runner packages / classes (the metrics sink might be the only exception)
- and last, in case it is used, changing the runner name is trivial enough... There could be a dummy runner with the old name that calls out to the new one and asks users to change their configuration ....
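That last idea - a dummy runner kept under the old name that delegates to the new one - could look roughly like this (a hypothetical Python sketch; both class names just mirror the naming discussion above, none of this is actual Beam code):

```python
import warnings

class SparkDatasetRunner:
    """Stand-in for the runner under its hypothetical new name."""

    def run(self, pipeline):
        # Real pipeline translation and execution would happen here.
        return f"executed {pipeline}"

class SparkStructuredStreamingRunner(SparkDatasetRunner):
    """Dummy runner kept under the old name for backwards compatibility.

    It delegates to the new runner and asks users to update their configuration.
    """

    def run(self, pipeline):
        warnings.warn(
            "SparkStructuredStreamingRunner was renamed to SparkDatasetRunner; "
            "please update your runner configuration.",
            DeprecationWarning,
            stacklevel=2,
        )
        return super().run(pipeline)
```

Existing configurations keep working for a release or two while the warning nudges users toward the new name.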
@echauchot Will you be able to review this? Otherwise I'd suggest merging this so as not to further block follow-ups. Looking forward to your feedback.
Run Java PreCommit
Run Spark ValidatesRunner
Run Spark StructuredStreaming ValidatesRunner
I took a glance at this change and it LGTM. Taking into account that this PR really improves the performance of some transforms when running on Spark (according to Nexmark results), I believe we should merge it once all tests are green.
I would like to review this before merging, but it is very long and I'm stuck on other things. I'll try my best to take a look ASAP.
I agree, the name needs to change. I also agree with @aromanenko-dev that `SparkSqlRunner` is confusing. I agree on the proposal of `SparkDatasetRunner`.
@mosche reviewing ... cc: @aromanenko-dev
@mosche: did you rebase this PR on top of the previously merged code about the Encoders? I have the impression it contains the same changes?
There was no such PR yet @echauchot ... maybe you already had a look at that code on the branch
oh, I remember ... you mean this one #22157? Yes, that's rebased ... but obviously this one here contains lots of changes to encoders to use encoders that are aware of the structure rather than just using binary encoders.
Yes I meant #22157. Ok so my intuition was incorrect: the encoders changes are not the same as in #22157.
@aromanenko-dev
I think you should also run the TPC-DS suite on this PR (ask @aromanenko-dev), because when we compared the two Spark runners in the past we've seen big differences between the Nexmark and TPC-DS suites (Nexmark was slightly in favor of the Dataset runner for some queries, but TPC-DS was way in favor of the RDD runner for almost all queries).
We can run it on Jenkins against this PR, if needed.
Run Spark StructuredStreaming ValidatesRunner
Run Spark ValidatesRunner
Run Spark ValidatesRunner
@mosche did you manage to run the TPC-DS suite on this PR?
I see that Nexmark queries 5 and 7 have improved quite a lot. They are mainly based on combiners and windows. Nice!
Alternatively, you could run the load tests for combiners and GBK available in sdk/testing; they are per-transform.
@echauchot As said, I don't see the value of spending more time on load tests / benchmarks at this point. Correctness is tested by the VR tests. One thing at a time.