[SPARK-49383][SQL][PYTHON][CONNECT] Support Transpose DataFrame API
What changes were proposed in this pull request?
This PR proposes supporting transpose as a Scala/Python DataFrame API in both Spark Connect and Classic Spark.
Please see https://docs.google.com/document/d/1QSmG81qQ-muab0UOeqgDAELqF7fJTH8GnxCJF4Ir-kA/edit for a detailed design.
Why are the changes needed?
Transposing data is a crucial operation in data analysis, enabling the transformation of rows into columns. This operation is widely used in tools like pandas and numpy, allowing for more flexible data manipulation and visualization.
While Apache Spark supports unpivot and pivot operations, it currently lacks a built-in transpose function. Implementing a transpose operation in Spark would enhance its data processing capabilities, aligning it with the functionalities available in pandas and numpy, and further empowering users in their data analysis workflows.
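Independent of Spark, the transpose semantics described here can be sketched in plain Python: the index column supplies the new column names, and every other column becomes one output row keyed by its original name. The `transpose` helper below is purely illustrative, not the actual implementation:

```python
# Illustrative sketch of the transpose semantics this PR adds to Spark
# DataFrames (not the actual implementation).
def transpose(columns, rows, index_col=None):
    """columns: list of column names; rows: list of row tuples."""
    index_col = index_col or columns[0]           # default: first column
    idx = columns.index(index_col)
    # Values of the index column become the new column names.
    new_header = ["key"] + [str(r[idx]) for r in rows]
    out_rows = []
    for j, name in enumerate(columns):
        if j == idx:
            continue                              # index column is dropped
        out_rows.append([name] + [r[j] for r in rows])
    return new_header, out_rows

# Mirrors the df.transpose() example below: one row (x, y, z) with
# columns a, b, c becomes columns [key, x] with rows [b, y] and [c, z].
header, data = transpose(["a", "b", "c"], [("x", "y", "z")])
print(header)  # ['key', 'x']
print(data)    # [['b', 'y'], ['c', 'z']]
```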
Does this PR introduce any user-facing change?
Yes. The transpose API is now supported.
Scala
scala> df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| x| y| z|
+---+---+---+
scala> df.transpose().show()
+---+---+
|key| x|
+---+---+
| b| y|
| c| z|
+---+---+
scala> df.transpose().explain(true)
== Parsed Logical Plan ==
'UnresolvedTranspose a#48: string
+- LocalRelation [a#48, b#49, c#50]
== Analyzed Logical Plan ==
key: string, x: string
Transpose [key#83, x#84], [[b,y], [c,z]], true
== Optimized Logical Plan ==
LocalRelation [key#83, x#84]
== Physical Plan ==
LocalTableScan [key#83, x#84]
scala> df.transpose($"b").show()
+---+---+
|key| y|
+---+---+
| a| x|
| c| z|
+---+---+
Python
>>> df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| x| y| z|
+---+---+---+
>>> df.transpose().show()
+---+---+
|key| x|
+---+---+
| b| y|
| c| z|
+---+---+
>>> df.transpose(df.b).show()
+---+---+
|key| y|
+---+---+
| a| x|
| c| z|
+---+---+
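For comparison, the default behavior above matches what pandas (cited in the motivation) produces with `set_index` followed by `.T` — a sketch, assuming pandas is installed:

```python
import pandas as pd

pdf = pd.DataFrame([{"a": "x", "b": "y", "c": "z"}])
# Use "a" as the index so its values become the new column names,
# then transpose; this mirrors the default df.transpose() above.
out = pdf.set_index("a").T
print(out)
```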
How was this patch tested?
Spark Connect
- Python
- doctest
- module: python.pyspark.sql.tests.connect.test_parity_dataframe
- case: test_transpose
- Proto
- suite: org.apache.spark.sql.PlanGenerationTestSuite
- cases: transpose index_column, transpose no_index_column
Spark Classic
- Python
- doctest
- module: python.pyspark.sql.tests.test_dataframe
- case: test_transpose
- Scala
- suite: org.apache.spark.sql.DataFrameTransposeSuite
- case: all
Was this patch authored or co-authored using generative AI tooling?
No.
Thank you @zhengruifeng for the attention on test cases!
- Added "transpose frame with duplicates in index column" for 1.
- Added "transpose frame with nulls in index column" for 2.
- 3 and 4 are covered by the existing tests "enforce least common type for non-index columns" and "enforce transpose max values".
By the way, I can't run test queries in my local Classic PySpark for some reason.
$ ./bin/pyspark
>>> df = spark.createDataFrame([{"a": "x", "b": "y", "c": "z"}])
>>> df.transpose().show()
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o56.transpose.
: org.apache.spark.SparkException: [INTERNAL_ERROR] Found the unresolved operator: 'UnresolvedTranspose a#0: string SQLSTATE: XX000
...
whereas Spark Connect seems fine.
$ ./bin/pyspark --remote local
>>> df = spark.createDataFrame([{"a": "x", "b": "y", "c": "z"}])
>>> df.transpose().show()
+---+---+
|key| x|
+---+---+
| b| y|
| c| z|
+---+---+
Do you see this in your local environment?
I rewrote the commit history and force-pushed to merge master; sorry for any confusion during review.
I verified that transpose works on the Spark Connect Scala Client as follows:
scala> spark.range(2)
res0: org.apache.spark.sql.Dataset[java.lang.Long] = [id: bigint]
scala> spark.range(2).show()
+---+
| id|
+---+
| 0|
| 1|
+---+
scala> spark.range(2).transpose().show()
+---+---+---+
|key| 0| 1|
+---+---+---+
+---+---+---+
scala> spark.range(2).transpose().explain(true)
== Parsed Logical Plan ==
'UnresolvedTranspose id#38: bigint
+- Range (0, 2, step=1, splits=Some(10))
== Analyzed Logical Plan ==
key: string, 0: string, 1: string
Transpose [key#40, 0#41, 1#42], false
== Optimized Logical Plan ==
LocalRelation <empty>, [key#40, 0#41, 1#42]
== Physical Plan ==
LocalTableScan <empty>, [key#40, 0#41, 1#42]
However, it failed the test suite org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite with:
[info] - transpose_no_index_colomn *** FAILED *** (6 milliseconds)
[info] org.apache.spark.SparkException: [INTERNAL_ERROR] Found the unresolved operator: 'UnresolvedTranspose id#431: bigint SQLSTATE: XX000
My guess is that the way we launch a SparkSession in the test suite doesn't exactly mimic how we launch bin/spark-shell. Can we skip the test suite in this PR and fix the test in a follow-up?
cc @HyukjinKwon for the question above.
I noticed that a new Analyzer is created in the test suite ProtoToParsedPlanTestSuite; see here.
That is why it doesn't recognize the rule added in BaseSessionStateBuilder; see here.
I don't see a simple fix at the moment. When I tried overriding extendedResolutionRules in the test suite's new Analyzer, the SparkSession was null, leading to:
[info] java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.SparkSession.sessionState()" because "this.$outer.org$apache$spark$sql$catalyst$analysis$ResolveTranspose$$sparkSession" is null
I propose skipping the test suite in this PR and will file a follow-up for the fix.
WDYT @HyukjinKwon @cloud-fan @zhengruifeng @hvanhovell ?
Yeah, I think it's fine to skip it for now.
@xinrong-meng why is this rule not added to the Analyzer by default?
@hvanhovell this rule needs to collect results and access SparkSession, which is not available in the catalyst module.
Hi @hvanhovell, thanks for the attention!
As @cloud-fan said, we need to "collect" in the Analyzer, which requires a SparkSession, and BaseSessionStateBuilder enables that. Three more existing rules share the same situation; see here.
The irrelevant tests ClientStreamingQuerySuite and FlatMapGroupsWithStateStreamingSuite failed; retriggering tests.
Rebased for irrelevant failed tests.
The irrelevant MySQLOverMariaConnectorIntegrationSuite failed; retriggering the test.
thanks, merging to master!
Thank you so much @cloud-fan for review and guidance! Thank you all!