
[SPARK-49383][SQL][PYTHON][CONNECT] Support Transpose DataFrame API

Opened by xinrong-meng

What changes were proposed in this pull request?

This PR proposes to support transpose as a Scala/Python DataFrame API in both Spark Connect and Classic Spark.

Please see https://docs.google.com/document/d/1QSmG81qQ-muab0UOeqgDAELqF7fJTH8GnxCJF4Ir-kA/edit for a detailed design.

Why are the changes needed?

Transposing data is a crucial operation in data analysis, turning rows into columns and columns into rows. This operation is widely used in tools like pandas and numpy, allowing for more flexible data manipulation and visualization.

While Apache Spark supports unpivot and pivot operations, it currently lacks a built-in transpose function. Implementing a transpose operation in Spark would enhance its data processing capabilities, aligning it with the functionalities available in pandas and numpy, and further empowering users in their data analysis workflows.
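
For reference, a single-row transpose can already be approximated with the existing unpivot API; the built-in transpose() generalizes this to multiple rows and an explicit index column. A minimal sketch, assuming a spark-shell session (so toDF and friends are in scope):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val df = Seq(("x", "y", "z")).toDF("a", "b", "c")

// Melt the non-index columns into (key, value) rows; for a one-row frame
// this is already the transposed shape. The value column is named "x" by
// hand here, whereas transpose() derives it from the data in column a.
df.unpivot(
  Array.empty[Column],          // ids: no identifier columns kept
  Array(col("b"), col("c")),    // values to melt
  "key",                        // variable column name
  "x"                           // value column name
).show()
// +---+---+
// |key|  x|
// +---+---+
// |  b|  y|
// |  c|  z|
// +---+---+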

Does this PR introduce any user-facing change?

Yes, transpose is supported as a new DataFrame API. By default the first column is used as the index column and its values become the new column names; an index column can also be specified explicitly, as shown below.

Scala

scala> df.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  x|  y|  z|
+---+---+---+

scala> df.transpose().show()
+---+---+
|key|  x|
+---+---+
|  b|  y|
|  c|  z|
+---+---+

scala> df.transpose().explain(true)
== Parsed Logical Plan ==
'UnresolvedTranspose a#48: string
+- LocalRelation [a#48, b#49, c#50]

== Analyzed Logical Plan ==
key: string, x: string
Transpose [key#83, x#84], [[b,y], [c,z]], true

== Optimized Logical Plan ==
LocalRelation [key#83, x#84]

== Physical Plan ==
LocalTableScan [key#83, x#84]
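
Note the plan shapes: the output column names of a transpose come from the values of the index column, so the operator is resolved eagerly during analysis (the analyzed plan above already embeds the transposed rows [[b,y], [c,z]]), and the optimizer folds it into a LocalRelation. A small illustration, with the expected result shown as a comment:

df.transpose().columns
// Array(key, x): "x" is a value taken from column a, not a static name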

scala> df.transpose($"b").show()
+---+---+
|key|  y|
+---+---+
|  a|  x|
|  c|  z|
+---+---+

Python

>>> df.show()
+---+---+---+                                                                   
|  a|  b|  c|
+---+---+---+
|  x|  y|  z|
+---+---+---+

>>> df.transpose().show()
+---+---+
|key|  x|
+---+---+
|  b|  y|
|  c|  z|
+---+---+
>>> df.transpose(df.b).show()
+---+---+
|key|  y|
+---+---+
|  a|  x|
|  c|  z|
+---+---+

How was this patch tested?

Spark Connect

  • Python
    • doctest
    • module: python.pyspark.sql.tests.connect.test_parity_dataframe
    • case: test_transpose
  • Proto
    • suite: org.apache.spark.sql.PlanGenerationTestSuite
    • cases: transpose index_column, transpose no_index_column

Spark Classic

  • Python
    • doctest
    • module: python.pyspark.sql.tests.test_dataframe
    • case: test_transpose
  • Scala
    • suite: org.apache.spark.sql.DataFrameTransposeSuite
    • case: all

Was this patch authored or co-authored using generative AI tooling?

No.

xinrong-meng · Aug 27 '24 06:08

Thank you @zhengruifeng for your attention to the test cases!

  • Added "transpose frame with duplicates in index column" for 1.
  • Added "transpose frame with nulls in index column" for 2.
  • 3 and 4 are covered by the existing tests "enforce least common type for non-index columns" and "enforce transpose max values".

xinrong-meng · Aug 28 '24 06:08

Btw, I can't run test queries in my local classic PySpark for some reason.

$ ./bin/pyspark
>>> df = spark.createDataFrame([{"a": "x", "b": "y", "c": "z"}])
>>> df.transpose().show()
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o56.transpose.
: org.apache.spark.SparkException: [INTERNAL_ERROR] Found the unresolved operator: 'UnresolvedTranspose a#0: string SQLSTATE: XX000
...

whereas Spark Connect seems fine.

$ ./bin/pyspark --remote local
>>> df = spark.createDataFrame([{"a": "x", "b": "y", "c": "z"}])
>>> df.transpose().show()
+---+---+
|key|  x|
+---+---+
|  b|  y|
|  c|  z|
+---+---+

Do you see this in your local environment?

ueshin · Aug 28 '24 19:08

I rewrote the commit history and force-pushed to merge master; sorry for any confusion during review.

xinrong-meng · Aug 29 '24 01:08

I verified that transpose works on the Spark Connect Scala client as follows:

scala> spark.range(2) 
res0: org.apache.spark.sql.Dataset[java.lang.Long] = [id: bigint]

scala> spark.range(2).show() 
+---+
| id|
+---+
|  0|
|  1|
+---+


scala> spark.range(2).transpose().show() 
+---+---+---+
|key|  0|  1|
+---+---+---+
+---+---+---+

scala> spark.range(2).transpose().explain(true) 
== Parsed Logical Plan ==
'UnresolvedTranspose id#38: bigint
+- Range (0, 2, step=1, splits=Some(10))

== Analyzed Logical Plan ==
key: string, 0: string, 1: string
Transpose [key#40, 0#41, 1#42], false

== Optimized Logical Plan ==
LocalRelation <empty>, [key#40, 0#41, 1#42]

== Physical Plan ==
LocalTableScan <empty>, [key#40, 0#41, 1#42]

However, it failed the test suite org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite with:

[info] - transpose_no_index_colomn *** FAILED *** (6 milliseconds)
[info]  org.apache.spark.SparkException: [INTERNAL_ERROR] Found the unresolved operator: 'UnresolvedTranspose id#431: bigint SQLSTATE: XX000

My guess is that the way we launch a SparkSession in the test suite doesn't exactly mimic how bin/spark-shell launches one. Can we skip the test suite in this PR and fix the test in a follow-up?

xinrong-meng · Sep 02 '24 02:09

cc @HyukjinKwon for the question above.

cloud-fan · Sep 02 '24 13:09

I noticed that a new Analyzer is created in the test suite ProtoToParsedPlanTestSuite; see here.

This is why it didn't recognize the rule added in BaseSessionStateBuilder; see here.

I don’t see a simple fix at the moment. When I tried overriding extendedResolutionRules in the new Analyzer of the test suite, the SparkSession was null, leading to:

[info]   java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.SparkSession.sessionState()" because "this.$outer.org$apache$spark$sql$catalyst$analysis$ResolveTranspose$$sparkSession" is null

I propose skipping the test suite in this PR and will file a follow-up for the fix.

WDYT @HyukjinKwon @cloud-fan @zhengruifeng @hvanhovell ?

xinrong-meng · Sep 04 '24 06:09

Yeah, I think it's fine to skip it for now.

HyukjinKwon · Sep 04 '24 07:09

@xinrong-meng why is this rule not added to the Analyzer by default?

hvanhovell · Sep 04 '24 13:09

@hvanhovell this rule needs to collect query results and access the SparkSession, which is not available in the catalyst module.

cloud-fan · Sep 04 '24 17:09

Hi @hvanhovell thanks for attention!

As @cloud-fan said, we need to "collect" in the Analyzer, which requires a SparkSession, and BaseSessionStateBuilder enables that. There are three more existing rules in the same situation; see here. A minimal sketch of the pattern follows.
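
For illustration, a self-contained sketch of that pattern (the rule name and body are hypothetical, not the PR's code): a rule that needs a live SparkSession is constructed with one and registered where a session exists, e.g. via the public SparkSessionExtensions hook, rather than living in the session-agnostic catalyst Analyzer.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: holds a SparkSession so that, like ResolveTranspose,
// it could run queries eagerly during analysis (e.g. collect the
// index-column values that become output column names).
case class MySessionRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // A real rule would match an unresolved operator here and use
    // `session` to materialize whatever the resolved plan needs.
    plan
  }
}

object InjectExample {
  def main(args: Array[String]): Unit = {
    // Registered where a session exists: BaseSessionStateBuilder in the PR,
    // or this public hook for user-defined rules.
    val spark = SparkSession.builder()
      .master("local[1]")
      .withExtensions(_.injectResolutionRule(MySessionRule))
      .getOrCreate()
    spark.range(1).show()   // the rule now runs as part of analysis
    spark.stop()
  }
}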

xinrong-meng · Sep 04 '24 23:09

Unrelated tests ClientStreamingQuerySuite and FlatMapGroupsWithStateStreamingSuite failed; retriggering tests.

xinrong-meng · Sep 05 '24 04:09

Rebased due to unrelated test failures.

xinrong-meng · Sep 05 '24 05:09

Unrelated MySQLOverMariaConnectorIntegrationSuite failed; retriggering tests.

xinrong-meng · Sep 06 '24 02:09

thanks, merging to master!

cloud-fan · Sep 06 '24 04:09

Thank you so much @cloud-fan for the review and guidance! Thank you all!

xinrong-meng · Sep 06 '24 06:09