[SPARK-40311][SQL][PYTHON] Add withColumnsRenamed to scala and pyspark API
What changes were proposed in this pull request?
This change adds the ability to rename multiple columns in a single call. Scala:
withColumnsRenamed(colsMap: Map[String, String]): DataFrame
Java:
withColumnsRenamed(colsMap: java.util.Map[String, String]): DataFrame
Python:
withColumnsRenamed(self, colsMap: Dict[str, str]) -> "DataFrame"
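A minimal usage sketch of the Python API. The column names below are made up for illustration; the rename map is a plain dict of old name to new name, and the actual DataFrame call is commented out because it needs a live SparkSession:

```python
# Build the old-name -> new-name map for withColumnsRenamed.
# The mapping logic is plain Python; only the commented-out line
# requires Spark.
columns = ["id", "name", "score"]  # hypothetical column names
rename_map = {c: f"prefix_{c}" for c in columns}
print(rename_map)
# df = df.withColumnsRenamed(rename_map)  # one call instead of a loop
```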
Why are the changes needed?
We have seen that the Catalyst optimizer struggles with larger plans. In our setup, the largest contribution to these plans comes from withColumnRenamed, drop, and withColumn being called in a for loop by unsuspecting users. The master branch of Spark already has multi-column versions of withColumn (withColumns) and drop. The missing piece of the puzzle is withColumnRenamed.
With a large number of columns, either the JVM gets killed or a StackOverflowError occurs. I am skipping those cases in the following benchmark and focusing on a column count that works in both the old and the new implementation. The following example shows the performance impact with 100 columns. Old approach with 100 columns:
import datetime
import numpy as np
import pandas as pd
num_rows = 2
num_columns = 100
data = np.zeros((num_rows, num_columns))
columns = map(str, range(num_columns))
raw = spark.createDataFrame(pd.DataFrame(data, columns=columns))
a = datetime.datetime.now()
for col in raw.columns:
    raw = raw.withColumnRenamed(col, f"prefix_{col}")
b = datetime.datetime.now()
for col in raw.columns:
    raw = raw.withColumnRenamed(col, f"prefix_{col}")
c = datetime.datetime.now()
for col in raw.columns:
    raw = raw.withColumnRenamed(col, f"prefix_{col}")
d = datetime.datetime.now()
for col in raw.columns:
    raw = raw.withColumnRenamed(col, f"prefix_{col}")
e = datetime.datetime.now()
for col in raw.columns:
    raw = raw.withColumnRenamed(col, f"prefix_{col}")
f = datetime.datetime.now()
for col in raw.columns:
    raw = raw.withColumnRenamed(col, f"prefix_{col}")
g = datetime.datetime.now()
g-a
datetime.timedelta(seconds=12, microseconds=480021)
New implementation with 100 columns:
import datetime
import numpy as np
import pandas as pd
num_rows = 2
num_columns = 100
data = np.zeros((num_rows, num_columns))
columns = map(str, range(num_columns))
raw = spark.createDataFrame(pd.DataFrame(data, columns=columns))
a = datetime.datetime.now()
raw = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
b = datetime.datetime.now()
raw = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
c = datetime.datetime.now()
raw = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
d = datetime.datetime.now()
raw = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
e = datetime.datetime.now()
raw = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
f = datetime.datetime.now()
raw = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
g = datetime.datetime.now()
g-a
datetime.timedelta(microseconds=210400)
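The speedup comes from plan size: each withColumnRenamed call wraps the logical plan in another projection, so the loop grows the plan by one node per column, while the batched call adds a single node. A deliberately simplified toy model of the node counts (not Spark internals):

```python
# Toy model of logical-plan growth (NOT Spark internals): assume each
# per-column rename stacks one more projection node on the plan.
def looped_rename_nodes(num_columns: int) -> int:
    nodes = 0
    for _ in range(num_columns):
        nodes += 1  # one withColumnRenamed call -> one extra Project node
    return nodes

def batched_rename_nodes(num_columns: int) -> int:
    return 1  # one withColumnsRenamed call -> a single Project node

print(looped_rename_nodes(100), batched_rename_nodes(100))  # 100 vs 1
```

Deeper plans mean more work for the analyzer and optimizer on every pass, which is why the looped version slows down (and eventually overflows the stack) as column counts grow.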
Does this PR introduce any user-facing change?
Yes, it adds a method to efficiently rename multiple columns in a single batch.
How was this patch tested?
Added unit tests
Can one of the admins verify this patch?
@zhengruifeng would you like to have a look at this PR?
It's a multi-column version of withColumnRenamed
Since it's a new API in the Scala DataFrame, let me ping @HyukjinKwon @cloud-fan
looks fine, cc @viirya @dongjoon-hyun
@viirya @itholic @cloud-fan gentle reminder: are there any remaining remarks, or anything this PR still needs to address?
Thanks for your input @dongjoon-hyun, I have added the column duplication handling and test cases for the simple and indirect cases. PTAL!
Merged to master. Thank you again, @santosh-d3vpl3x , @zhengruifeng, @cloud-fan , @viirya , @itholic, @HyukjinKwon , @srowen .