
[SPARK-47336][SQL][CONNECT] Provide PySpark with a way to get the estimated size of a DataFrame in bytes

SemyonSinchenko opened this pull request 1 year ago • 14 comments

What changes were proposed in this pull request?

In PySpark Connect there is no access to the JVM to call queryExecution().optimizedPlan.stats, so the only way to get size-in-bytes information from the plan is to parse the output of explain with regexps. This PR fills that gap by adding a sizeInBytesApproximation method to the JVM, PySpark Classic, and PySpark Connect APIs. Under the hood it is just a call to queryExecution().optimizedPlan.stats.sizeInBytes. The JVM and PySpark Classic APIs were updated only to keep parity.

  1. Update Dataset.scala in JVM Connect by adding the new API
  2. Update Dataset.scala in JVM Classic by adding the new API
  3. Update dataframe.py in sql by adding the signature and docstring of the new API
  4. Update dataframe.py in connect by adding an implementation of the new API
  5. Update dataframe.py in classic by adding an implementation of the new API (a rough sketch follows this list)
  6. Update base.proto by adding a new message to AnalyzeRequest / AnalyzeResponse
  7. Regenerate the Python files from the proto definitions
  8. Update SparkConnectAnalyzeHandler by extending the match and adding a call to queryExecution
  9. Update SparkConnectClient by adding a new method that builds the new request
  10. Update SparkSession by adding a call to the client and parsing the response
  11. Add/update the corresponding tests
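To make item 5 concrete, here is a minimal sketch of the py4j call the Classic implementation boils down to. The free-function form and the BigInt conversion are illustrative, not the PR's actual code; per the description above, the real method is named sizeInBytesApproximation and lives on the DataFrame.

```python
from pyspark.sql import DataFrame

def size_in_bytes_approximation(df: DataFrame) -> int:
    """Upper-bound size estimate from the optimized plan (PySpark Classic only)."""
    # Equivalent to queryExecution().optimizedPlan.stats.sizeInBytes on the JVM side;
    # the _jdf py4j handle exists only in Classic, not in Spark Connect.
    jstats = df._jdf.queryExecution().optimizedPlan().stats()
    # sizeInBytes is a Scala BigInt; round-trip through its string form to get a Python int.
    return int(jstats.sizeInBytes().toString())
```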

Why are the changes needed?

To give PySpark Connect users a way to obtain the estimated DataFrame size at runtime without forcing them to parse the string output of df.explain. The other changes are needed to keep parity across Connect / Classic and PySpark / JVM Spark.
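For reference, the string-parsing workaround mentioned above could look roughly like this. It is an illustration of the approach the new API replaces, not code from the PR; the exact wording and units of the cost-mode plan text are not guaranteed to stay stable, which is precisely the problem.

```python
import io
import re
from contextlib import redirect_stdout

def fragile_size_estimate(df) -> int:
    """Scrape sizeInBytes from the cost-mode explain output (brittle, for illustration only)."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(mode="cost")  # prints "Statistics(sizeInBytes=..., ...)" for the optimized plan
    match = re.search(r"sizeInBytes=([\d.]+)\s*(B|KiB|MiB|GiB|TiB)", buf.getvalue())
    if match is None:
        raise ValueError("Could not find sizeInBytes in the plan text")
    units = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}
    return int(float(match.group(1)) * units[match.group(2)])
```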

Does this PR introduce any user-facing change?

Yes, a new API is added; it is aimed mostly at PySpark Connect users.

How was this patch tested?

Because the actual logic lives in queryExecution, I added tests only for the syntax / calls: the tests check that the size returned for a DataFrame is greater than zero.

Was this patch authored or co-authored using generative AI tooling?

No.

@grundprinzip We discussed this ticket; could you please take a look? Thanks!

SemyonSinchenko · May 03 '24 17:05

New changes:

  • fixes from comments
  • changing the type from Long to BigInteger (bytes in proto)

SemyonSinchenko · May 07 '24 18:05

@HyukjinKwon Sorry for tagging, but could you please take a look again? Thanks in advance!

SemyonSinchenko · May 20 '24 11:05

Changes from the last two commits:

  • resolve merge conflicts
  • re-generate proto files for PySpark
  • update the docstring in dataframe.py: fix a typo and extend it to describe how the method works and its corner cases

SemyonSinchenko · May 28 '24 16:05

@HyukjinKwon I'm sorry for tagging you again, but could you take a look? Thanks in advance!

SemyonSinchenko · Jun 05 '24 17:06

@HyukjinKwon @zhengruifeng Sorry for tagging, but could you take a look again? I fixed everything from the last review round. Thanks in advance!

SemyonSinchenko · Jun 26 '24 19:06

@HyukjinKwon @zhengruifeng Sorry for tagging, but could you take a look again? I fixed everything from the last review round. Thanks in advance!

SemyonSinchenko · Jul 27 '24 20:07

@SemyonSinchenko thanks for the contribution! I have two high-level questions:

  • How do you use this sizeInBytes? Is it accurate in your workload?
  • Shall we expose more stats like numRows?

cloud-fan · Jul 30 '24 00:07

@SemyonSinchenko thanks for the contribution! I have two high-level questions:

  • How do you use this sizeInBytes? Is it accurate in your workload?
  • Shall we expose more stats like numRows?

@cloud-fan Thank you for the comment!

The main usage is by library developers and developers of reusable Spark code. If one wants to use, for example, a broadcast hint inside library or reusable code without changing the thresholds globally, the only way to get an estimated size of the data is from the plan. But in Connect there is no way to call queryExecution().optimizedPlan.stats because there is no Java bridge. So, at the moment, Connect developers can get this information only by parsing the string representation of the plan with regexps. That approach is very unstable and fragile to version updates, because there are obviously zero guarantees that the string representation of the Spark plan won't change in future versions. The estimate itself is quite inaccurate because it is an upper bound, but an upper bound is exactly what is needed here: developers get a guarantee that the actual data is no larger than the value this function returns.

As an example I can point to the implementation in one of the Databricks libraries, tempo: code-example. But it is only an example: I have no relation to Databricks and I'm not a contributor to tempo. We are using similar techniques internally at the moment.

More stats can be exposed, but I don't see much use for the numRows estimate, for example. The size-in-bytes estimate is the most useful one, because it can be used to judge whether a collect is feasible, whether to add broadcast hints, etc.
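To make the library-developer use case concrete, here is a hedged sketch of how such code might consume the estimate. It assumes the sizeInBytesApproximation method proposed in this PR; the helper name and default threshold are illustrative.

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import broadcast

def maybe_broadcast_join(left: DataFrame, right: DataFrame, on,
                         threshold_bytes: int = 32 * 1024 * 1024) -> DataFrame:
    """Join, broadcasting the right side only when its estimated size is small enough."""
    # The estimate is an upper bound, so if it fits under the threshold the real data does too.
    if right.sizeInBytesApproximation() <= threshold_bytes:
        return left.join(broadcast(right), on=on)
    return left.join(right, on=on)
```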

SemyonSinchenko · Jul 30 '24 07:07

@HyukjinKwon @zhengruifeng @cloud-fan Sorry for tagging, but could you take a look again? I fixed everything from the last review round. Thanks in advance!

SemyonSinchenko · Aug 21 '24 07:08

@HyukjinKwon @zhengruifeng @cloud-fan Sorry for tagging, but could you take a look again? I fixed everything from the last review round. Thanks in advance!

SemyonSinchenko · Sep 01 '24 08:09

@HyukjinKwon @zhengruifeng @cloud-fan Sorry for tagging, but could you take a look again? I fixed everything from the last review round. Thanks in advance!

SemyonSinchenko · Oct 08 '24 09:10

@SemyonSinchenko My major concern is whether we need this API: IMHO, the estimated size is not precise enough to be added as a Dataset/DataFrame API. I view queryExecution().optimizedPlan.stats.sizeInBytes as a kind of developer API, only for advanced users and developers.

zhengruifeng · Oct 08 '24 12:10

@zhengruifeng You are absolutely right that such an API is for library developers, not end users. It is fine to call queryExecution().optimizedPlan.stats.sizeInBytes from PySpark Classic via py4j, but for Spark Connect that is impossible. Size in bytes is very important when choosing between broadcast/collect and distributed processing based on the upper bound of the size estimate; otherwise PySpark Connect library developers are forced to parse the string representation of the plan. I added this to the Classic API with the sole purpose of providing parity, and I can remove it from Classic entirely if you want.

Is there any other way to achieve this? For example, I could make the method underscore-prefixed in PySpark, or I could extend the documentation with an explicit note that this value is an upper bound, not an exact size. What do you think?
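To put the parity argument in code: today the estimate is reachable in Classic through py4j, while a Connect DataFrame has no JVM handle at all. The helper below is hypothetical and only illustrates the current gap this PR closes.

```python
def plan_size_upper_bound(df) -> int:
    """Best-effort size estimate: py4j in PySpark Classic, no stable path under Connect."""
    jdf = getattr(df, "_jdf", None)  # the py4j handle exists only in Classic
    if jdf is not None:
        return int(jdf.queryExecution().optimizedPlan().stats().sizeInBytes().toString())
    # Spark Connect: no JVM bridge, so the only option today is parsing the cost-mode
    # explain text, which is exactly the fragility the proposed API removes.
    raise NotImplementedError("no stable way to read the size estimate under Spark Connect")
```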

SemyonSinchenko · Oct 08 '24 18:10

@SemyonSinchenko ExecutionInfo was recently added to expose some query execution metrics.

Not sure whether it is the right place to add the estimated size. cc @grundprinzip

zhengruifeng · Oct 09 '24 03:10