[SPARK-52450][CONNECT] Improve performance of schema deepcopy
What changes were proposed in this pull request?
In Spark Connect, DataFrame.schema returns a deep copy of the schema so that user modifications to the returned object cannot cause unexpected behavior. However, when a user accesses df.schema repeatedly on a DataFrame with a complex schema, the repeated copying leads to noticeable performance degradation.
The issue can be reproduced with the code snippet below. Since copy.deepcopy is known to be slow on complex objects, this PR replaces it with pickle-based serialization/deserialization to speed up df.schema access. Because pickle has its own limitations, the implementation falls back to copy.deepcopy when pickling fails.
from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    # Build a StructType nested max_level deep, with fields_per_level fields per level.
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True)
             for i in range(fields_per_level)])
    else:
        return StructType(
            [StructField(f"s{level}_{i}",
                         make_nested_struct(level + 1, max_level, fields_per_level), True)
             for i in range(fields_per_level)])

# Create a 4-level nested schema with 10,000 leaf fields in total
schema = make_nested_struct(0, 4, 10)
The existing approach needs 21.9s to copy the schema 100 times:

import copy
import timeit

timeit.timeit(lambda: copy.deepcopy(schema), number=100)
# 21.9
The updated approach needs only 2.0s for the same 100 copies:
from pyspark.serializers import CPickleSerializer
cached_schema_serialized = CPickleSerializer().dumps(schema)
timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)
# 2.0
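For illustration, the fallback behavior described above could look roughly like the sketch below. This is only a sketch, not the actual patch: the helper name _copy_schema is hypothetical, and it simply prefers a pickle round trip and falls back to copy.deepcopy when pickling fails.

import copy
from pyspark.serializers import CPickleSerializer

def _copy_schema(schema):
    # Hypothetical helper (name and placement are assumptions, not the real code):
    # a pickle round trip is much faster than copy.deepcopy for large schemas,
    # but not every object is picklable, so fall back to deepcopy on failure.
    try:
        serializer = CPickleSerializer()
        return serializer.loads(serializer.dumps(schema))
    except Exception:
        return copy.deepcopy(schema)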
Why are the changes needed?
It improves the performance when calling df.schema many times.
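To make the benefit concrete, a usage pattern that hits this path repeatedly might look like the sketch below (illustrative only; it assumes an active Spark Connect session bound to spark and the make_nested_struct helper from the snippet above).

# Repeated schema access on a DataFrame with a complex schema.
df = spark.createDataFrame([], schema=make_nested_struct(0, 4, 10))
for _ in range(100):
    _ = df.schema  # previously, every access paid the full copy.deepcopy cost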
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests and new tests.
Was this patch authored or co-authored using generative AI tooling?
No.
Hi @hvanhovell @vicennial, could you take a look at this PR? Thanks.
Hi @zhengruifeng, could you help with the CI failures from pyspark-pandas-connect-part1? This PR contains no Scala changes, yet sql/hive, connector/kafka, and connect/server fail to compile due to an sbt OutOfMemoryError. Do you have any idea what is going on? I've retriggered the CI, but it still fails. Thanks.
[error] ## Exception when compiling 156 sources to /__w/spark/spark/sql/hive/target/scala-2.13/test-classes
[error] java.lang.OutOfMemoryError: Java heap space
[error]
[error]
[error] ## Exception when compiling 21 sources to /__w/spark/spark/connector/kafka-0-10-sql/target/scala-2.13/test-classes
[error] java.lang.OutOfMemoryError: Java heap space
[error]
[error]
[error] ## Exception when compiling 41 sources to /__w/spark/spark/sql/connect/server/target/scala-2.13/test-classes
[error] java.lang.OutOfMemoryError: Java heap space
[error]
[error]
[warn] javac exited with exit code -1
[info] Compilation has been cancelled
[info] Compilation has been cancelled
[warn] In the last 10 seconds, 5.032 (50.6%) were spent in GC. [Heap: 2.45GB free of 4.00GB, max 4.00GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. `-XX:+UseG1GC`, for better performance.
java.lang.OutOfMemoryError: Java heap space
Error: [launcher] error during sbt launcher: java.lang.OutOfMemoryError: Java heap space
Update: Never mind, it works now. Thanks anyway.
Merging to master/4.0. Thanks!