Improved pipeline translation in SparkStructuredStreamingRunner

Open mosche opened this issue 2 years ago • 28 comments

Improved pipeline translation in SparkStructuredStreamingRunner (closes #22445, #22382):

  • Make use of Spark Encoders to leverage structural information in translation (and potentially benefit from the Catalyst optimizer). Note that the possible benefit is limited, as every ParDo is a black box and a hard boundary for anything that could be optimized.
  • Improved translation of GroupByKey: when applicable, also group by window to scale out better and/or use Spark's native collect_list to collect the values of a group (see the sketch after this list).
  • Make use of specialised Spark Aggregators for combines (per key / globally); Sessions in particular can be improved significantly.
  • Dedicated translation for Combine.Globally to avoid an additional shuffle of data.
  • Remove an additional serialization roundtrip when reading from a Beam BoundedSource.
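
For illustration only (this is not code from the PR): a minimal plain-Spark sketch of the "group by key and window, collect values with collect_list" idea from the second bullet. The KV bean, column names, and the long-valued window stand-in are hypothetical simplifications of what the runner handles internally.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GroupByKeyAndWindowSketch {

  /** Simple bean standing in for a windowed key/value pair (illustrative only). */
  public static class KV implements Serializable {
    private String key;
    private long value;
    private long window; // stand-in for a window identifier

    public KV() {}
    public KV(String key, long value, long window) {
      this.key = key;
      this.value = value;
      this.window = window;
    }
    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public long getValue() { return value; }
    public void setValue(long value) { this.value = value; }
    public long getWindow() { return window; }
    public void setWindow(long window) { this.window = window; }
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("gbk-sketch").getOrCreate();

    Dataset<Row> windowedKVs = spark.createDataFrame(
        Arrays.asList(new KV("a", 1L, 0L), new KV("a", 2L, 0L), new KV("b", 3L, 60L)),
        KV.class);

    // Group by key *and* window so work fans out across more partitions,
    // and let Spark's native collect_list gather the values of each group.
    Dataset<Row> grouped = windowedKVs
        .groupBy(col("key"), col("window"))
        .agg(collect_list(col("value")).as("values"));

    grouped.show();
    spark.stop();
  }
}
```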

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [x] Choose reviewer(s) and mention them in a comment (R: @username).
  • [x] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [x] Update CHANGES.md with noteworthy changes.
  • [x] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch): Build python source distribution and wheels · Python tests · Java tests

See CI.md for more information about GitHub Actions CI.

mosche avatar Jul 26 '22 09:07 mosche

R: @aromanenko-dev R: @echauchot

mosche avatar Jul 26 '22 09:07 mosche

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

github-actions[bot] avatar Jul 26 '22 11:07 github-actions[bot]

ping @echauchot @aromanenko-dev 😀

mosche avatar Aug 02 '22 07:08 mosche

Run Spark ValidatesRunner

aromanenko-dev avatar Aug 10 '22 14:08 aromanenko-dev

Run Spark StructuredStreaming ValidatesRunner

aromanenko-dev avatar Aug 10 '22 14:08 aromanenko-dev

results @aromanenko-dev

mosche avatar Aug 11 '22 08:08 mosche

@echauchot Kind ping :)

mosche avatar Sep 05 '22 07:09 mosche

@echauchot @aromanenko-dev Btw, I was thinking a bit about a better name for this runner. I'd suggest renaming it to SparkSqlRunner, taking into account it's built on top of the Spark SQL module:

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

https://www.databricks.com/glossary/what-is-spark-sql

mosche avatar Sep 14 '22 14:09 mosche

@mosche I totally agree that the current name is not very practical: it's quite long and, even worse, very confusing since it contains the word Streaming but this runner doesn't support streaming mode at all (we know the reasons, but it is what it is).

So it would be better to rename it, though I'm not sure about SparkSqlRunner as a new name. IMHO, it may also be confusing and give the false expectation that it supports only Spark (or Beam?) SQL pipelines.

I'd suggest the name SparkDatasetRunner since it's based on the Spark Dataset API. This name is quite short and gives a basic idea of what to expect from this runner. The old runner could be called SparkRDDRunner, but let's keep it as it is - just SparkRunner.

On the other hand, this renaming will require many incompatible changes, starting with new package and artifact names. However, I'm pretty sure that most users that run Beam pipelines on Spark still use the old classical Spark(RDD)Runner. We can check it out on user@ and Twitter, if needed.

aromanenko-dev avatar Sep 14 '22 15:09 aromanenko-dev

I agree, that leaves room for potential new confusion. Giving this a second thought, I suppose you're right and SparkDatasetRunner is the better name with less ambiguity ... nevertheless it's a rather technical name which I'd usually rather avoid.

Regarding the rename or any other incompatible changes I'm personally fairly relaxed at this stage:

  • it's clearly marked as experimental and such changes are to be expected
  • the runner isn't optimised in any way yet, so there's little to no reason to use an experimental runner over a proven existing one (besides that, there are potential scalability issues that make me doubt a bit that it would work well on decent-sized datasets)
  • users typically don't (and probably shouldn't) interact with runner packages / classes (the metrics sink might be the only exception)
  • and last, in case it is used, changing the runner name is trivial enough... There could be a dummy runner with the old name that calls out to the new one and asks users to change their configuration ....

mosche avatar Sep 14 '22 15:09 mosche

@echauchot Will you be able to review this? Otherwise I'd suggest merging this so as not to further block follow-ups. Looking forward to your feedback.

mosche avatar Sep 15 '22 06:09 mosche

Run Java PreCommit

aromanenko-dev avatar Sep 15 '22 08:09 aromanenko-dev

Run Spark ValidatesRunner

aromanenko-dev avatar Sep 15 '22 08:09 aromanenko-dev

Run Spark StructuredStreaming ValidatesRunner

aromanenko-dev avatar Sep 15 '22 08:09 aromanenko-dev

I took a glance at this change and it LGTM. Taking into account that this PR really improves the performance of some transforms when running on Spark (according to Nexmark results), I believe we have to merge it once all tests are green.

I would like to review this before merging, but it is very long and I'm stuck on other things. I'll try my best to take a look ASAP.

echauchot avatar Sep 15 '22 08:09 echauchot

I agree, the name needs to change. I also agree with @aromanenko-dev that SparkSqlRunner is confusing, and I agree with the proposal of SparkDatasetRunner.

echauchot avatar Sep 15 '22 08:09 echauchot

@mosche reviewing ... cc: @aromanenko-dev

echauchot avatar Sep 15 '22 12:09 echauchot

@mosche: did you rebase this PR on top of the previously merged code about the Encoders? I have the impression it contains the same changes?

echauchot avatar Sep 15 '22 12:09 echauchot

There was no such PR yet @echauchot ... maybe you already had a look at that code on the branch

mosche avatar Sep 15 '22 12:09 mosche

Oh, I remember ... you mean this one https://github.com/apache/beam/pull/22157/? Yes, that's rebased ... but obviously this one here contains lots of changes to use encoders that are aware of the structure rather than just binary encoders.
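
For illustration (not code from this PR or the runner itself): a minimal plain-Spark sketch of the difference between a binary encoder and a structure-aware encoder; the Event bean is a hypothetical element type.

```java
import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class EncoderSketch {

  /** Hypothetical element type used only for this illustration. */
  public static class Event implements Serializable {
    private String user;
    private long timestamp;
    public Event() {}
    public Event(String user, long timestamp) { this.user = user; this.timestamp = timestamp; }
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("encoder-sketch").getOrCreate();

    // Binary encoder: elements are opaque blobs, Catalyst only sees a single binary column.
    Encoder<Event> binary = Encoders.kryo(Event.class);
    Dataset<Event> opaque =
        spark.createDataset(Arrays.asList(new Event("alice", 1L)), binary);
    opaque.printSchema(); // single "value: binary" column

    // Structure-aware encoder: fields become columns Catalyst can reason about.
    Encoder<Event> structural = Encoders.bean(Event.class);
    Dataset<Event> structured =
        spark.createDataset(Arrays.asList(new Event("alice", 1L)), structural);
    structured.printSchema(); // "timestamp: long" and "user: string" columns

    spark.stop();
  }
}
```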

mosche avatar Sep 15 '22 12:09 mosche

Oh, I remember ... you mean this one #22157? Yes, that's rebased ... but obviously this one here contains lots of changes to use encoders that are aware of the structure rather than just binary encoders.

Yes, I meant #22157. OK, so my intuition was incorrect: the encoder changes are not the same as in #22157.

echauchot avatar Sep 15 '22 12:09 echauchot

results @aromanenko-dev

I think you should also run the TPCDS suite on this PR (ask @aromanenko-dev), because when we compared the two Spark runners in the past we saw big differences between the Nexmark and TPCDS suites (Nexmark was slightly in favor of the Dataset runner for some queries, but TPCDS was way in favor of the RDD runner for almost all queries).

echauchot avatar Sep 16 '22 08:09 echauchot

We can run it on Jenkins against this PR, if needed.

aromanenko-dev avatar Sep 16 '22 08:09 aromanenko-dev

Run Spark StructuredStreaming ValidatesRunner

mosche avatar Sep 19 '22 09:09 mosche

Run Spark ValidatesRunner

mosche avatar Sep 19 '22 09:09 mosche

Run Spark ValidatesRunner

mosche avatar Sep 19 '22 12:09 mosche

We can run it on Jenkins against this PR, if needed.

@mosche did you manage to run TPCDS suite on this PR ?

echauchot avatar Sep 19 '22 14:09 echauchot

I see that Nexmark queries 5 and 7 have improved quite a lot. They are mainly based on combiners and windows. Nice!

echauchot avatar Sep 20 '22 14:09 echauchot

Alternatively, you could run the load tests for combiners and GBK available in sdk/testing; they are per transform.

echauchot avatar Sep 22 '22 05:09 echauchot

@echauchot As said, I don't see the value of spending more time on load tests / benchmarks at this point. Correctness is tested by the VR tests. One thing at a time.

mosche avatar Sep 22 '22 06:09 mosche