[SPARK-47336][SQL][CONNECT] Provide PySpark with a way to get the estimated size of a DataFrame in bytes
What changes were proposed in this pull request?
In PySpark Connect there is no access to the JVM to call queryExecution().optimizedPlan.stats, so the only way to get the size-in-bytes information from the plan is to parse the output of explain with regexps. This PR fills that gap by adding a sizeInBytesApproximation method to the JVM, PySpark Classic, and PySpark Connect APIs. Under the hood it is just a call to queryExecution().optimizedPlan.stats.sizeInBytes. The JVM and PySpark Classic APIs were updated only for parity.
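For context, the sketch below shows the py4j call chain that PySpark Classic can already reach today and that Spark Connect cannot, since there is no JVM bridge. It is an illustration under the assumption of a JVM-backed SparkSession, not code from this PR.

```python
# Rough sketch: reaching the optimized-plan statistics from PySpark Classic via py4j.
# This is exactly the path that is unavailable in Spark Connect.
# `spark` is assumed to be an existing JVM-backed SparkSession.
df = spark.range(1_000).selectExpr("id", "id * 2 AS doubled")

# queryExecution / optimizedPlan / stats are JVM objects reached through the
# internal _jdf handle; sizeInBytes is a Scala BigInt, converted via its string form.
jvm_stats = df._jdf.queryExecution().optimizedPlan().stats()
size_upper_bound = int(str(jvm_stats.sizeInBytes()))
print(size_upper_bound)  # upper-bound estimate of the DataFrame size in bytes
```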
- Update of Dataset.scala in JVM connect by adding a new API
- Update of Dataset.scala in JVM classic by adding a new API
- Update dataframe.py in sql by adding the signature and doc of the new API
- Update dataframe.py in connect by adding an implementation of the new API
- Update dataframe.py in classic by adding an implementation of the new API
- Update base.proto in the AnalyzeRequest/AnalyzeResponse part by adding a new message
- Generate new py-files from proto
- Update SparkConnectAnalyzeHandler by extending the match and adding a call to queryExecution
- Update SparkConnectClient by adding a new method that builds the new request
- Update SparkSession by adding a call to the client and parsing of the response
- Add/update corresponding tests
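Taken together, the user-facing result would look roughly like the following sketch. The method name sizeInBytesApproximation comes from this PR; the input path and the 8 MiB threshold are illustrative assumptions.

```python
# Hypothetical usage sketch of the proposed API; the same call is meant to work
# in JVM Spark, PySpark Classic, and PySpark Connect.
df = spark.read.parquet("/some/table")  # illustrative input

approx_size = df.sizeInBytesApproximation()  # upper-bound estimate, in bytes

# The 8 MiB threshold is arbitrary and only illustrates the kind of decision
# library code can now make without parsing df.explain() output.
if approx_size < 8 * 1024 * 1024:
    rows = df.collect()
```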
Why are the changes needed?
To give PySpark Connect users a way to get the DataFrame size estimation at runtime without forcing them to parse the string output of df.explain. The other changes are needed for parity across Connect / Classic and PySpark / JVM Spark.
Does this PR introduce any user-facing change?
Yes, but only a new API, aimed mostly at PySpark Connect users.
How was this patch tested?
Because the actual logic lives in queryExecution, I added tests only for the syntax / call path: the tests check that the size returned for a DataFrame is greater than zero.
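The checks are of roughly this shape (illustrative sketch, not the actual test code added by this PR):

```python
# Illustrative sketch of the kind of test described above; the real tests live in
# the PySpark / Connect test suites and may look different.
def test_size_in_bytes_approximation(spark):
    df = spark.range(100).selectExpr("id", "CAST(id AS STRING) AS s")
    # The logic itself lives in queryExecution; here we only check that the call
    # goes through and the upper-bound estimate is positive.
    assert df.sizeInBytesApproximation() > 0
```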
Was this patch authored or co-authored using generative AI tooling?
No.
@grundprinzip We discussed this ticket earlier; could you please take a look? Thanks!
New changes:
- fixes from comments
- changing the type from Long to BigInteger (bytes in proto)
@HyukjinKwon sorry for tagging, but could you please take a look again? Thanks in advance!
Changes from the last two commits (actual changes marked in bold):
- resolve merge conflicts
- re-generate proto files for PySpark
- update the docstring in dataframe.py: fix a typo and extend it with a description of how it works and the corner cases
@HyukjinKwon I'm sorry for tagging you again, but maybe you can take a look? Thanks in advance!
@HyukjinKwon @zhengruifeng Sorry for tagging, but maybe you can take a look again? I fixed everything from the last review round. Thanks in advance!
@SemyonSinchenko thanks for the contribution! I have two high-level questions:
- How do you use this sizeInBytes? Is it accurate in your workload?
- Shall we expose more stats like numRows?
@cloud-fan Thank you for the comment!
The main usage is for library developers / authors of reusable Spark code. If one wants to apply, for example, a broadcast hint inside library code without changing the thresholds globally, the only way to get an estimated size of the data is from the plan. But in Connect there is no way to call queryExecution().optimizedPlan.stats because there is no Java bridge, so at the moment Connect developers can get this information only by parsing the string representation of the plan with regexps. That approach is unstable and fragile across version updates, because there are no guarantees that the string representation of the Spark plan won't change in future versions. The estimation is quite inaccurate because it is an upper bound, but an upper bound is perfect for this purpose: developers get a guarantee that the size of the data is not more than the output of that function (see the sketch after this comment).
As an example I can point to the implementation in one of the Databricks libraries, tempo: code-example. It is only an example: I have no relation to Databricks and I'm not a contributor to tempo. We use similar techniques internally at the moment.
More stats can be exposed, but I do not see a use case for the numRows estimation, for example. The size-in-bytes estimation is the most useful one because it can be used to judge whether a collect is feasible, whether to add broadcast hints, etc.
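To make the library-developer use case above concrete, here is an illustrative sketch; the 10 MiB threshold and the helper name are assumptions of this sketch, not code from tempo or from this PR.

```python
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql.functions import broadcast

# Arbitrary threshold chosen for illustration only.
BROADCAST_UPPER_BOUND = 10 * 1024 * 1024  # 10 MiB

def join_with_optional_broadcast(left: DataFrame, right: DataFrame,
                                 on: List[str]) -> DataFrame:
    # Because the estimate is an upper bound, broadcasting is safe whenever the
    # estimate fits under the threshold; no parsing of df.explain() is needed.
    if right.sizeInBytesApproximation() <= BROADCAST_UPPER_BOUND:
        right = broadcast(right)
    return left.join(right, on=on)
```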
@HyukjinKwon @zhengruifeng @cloud-fan Sorry for tagging, but maybe you can take a look again? I fixed everything from the last review round. Thanks in advance!
@SemyonSinchenko My major concern is whether we need this API:
IMHO, the estimated size is not precise enough to be added as a Dataset/DataFrame API; I view queryExecution().optimizedPlan.stats.sizeInBytes as a kind of developer API only for advanced users and developers.
@zhengruifeng You are absolutely right that such an API is for library developers, not end users. It is fine to call queryExecution().optimizedPlan.stats.sizeInBytes from PySpark Classic via py4j, but for Spark Connect that is impossible. Size in bytes is very important if you want to choose between broadcast/collect and distributed processing depending on the upper bound of the size estimation. Otherwise, PySpark Connect library developers are forced to parse the string representation of the plan. I added this to the Classic API with the sole purpose of providing parity, and I can remove it from Classic entirely if you want.
Is there any other way to achieve this? For example, I can make the method underscored in PySpark, or I can extend the documentation and explicitly note that this value is an upper bound, not the true size. What do you think?
@SemyonSinchenko ExecutionInfo was recently added to expose some query execution metrics.
Not sure whether it is the right place to add the estimated size. cc @grundprinzip