[SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion
What changes were proposed in this pull request?
This proposes to add support for ArrayType of nested StructType to arrow-based conversion. This allows pandas UDFs, mapInArrow UDFs, and toPandas to operate on columns of type Array of Struct via arrow serialization. It appears that pyarrow 2.0.0 can serialize arrays of structs, while pyarrow 1.0.0 throws an error for this type of data.
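For illustration, a minimal sketch of the kind of usage this enables (the DataFrame, column name, and UDF below are hypothetical, not taken from the patch):

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# A column of type array<struct<a:bigint,b:string>>.
df = spark.createDataFrame(
    [([(1, "x"), (2, "y")],)],
    "array_struct_col array<struct<a:long,b:string>>",
)

# With this change, the array-of-struct column can round-trip through
# arrow serialization into a scalar pandas UDF.
@pandas_udf("array<struct<a:long,b:string>>")
def keep_as_is(s: pd.Series) -> pd.Series:
    return s

df.select(keep_as_is("array_struct_col")).show(truncate=False)
```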
Why are the changes needed?
This extends the usability of pandas_udf.
Does this PR introduce any user-facing change?
pandas_udf and mapInArrow will be able to operate on data of type Array of Struct. toPandas will be able to use arrow serialization for Array of Struct (when spark.sql.execution.arrow.pyspark.enabled=true).
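Sketches of the other two paths, under the same assumptions (df as above; passthrough is a hypothetical function; mapInArrow assumes a Spark version where DataFrame.mapInArrow is available):

```python
# toPandas via arrow: with the flag enabled, array-of-struct columns no
# longer fall back to the slower non-arrow conversion path.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = df.toPandas()

# mapInArrow: the function receives an iterator of pyarrow.RecordBatch
# and yields RecordBatches with the declared output schema.
def passthrough(batches):
    for batch in batches:
        yield batch

df.mapInArrow(passthrough, df.schema).show(truncate=False)
```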
How was this patch tested?
A test has been added.
Can one of the admins verify this patch?
cc @HyukjinKwon
cc @BryanCutler @ueshin FYI
@LucaCanali thanks for addressing my comment. The test seems to be failing:
======================================================================
ERROR [0.182s]: test_pandas_array_struct (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/sql/tests/test_pandas_udf_scalar.py", line 152, in test_pandas_array_struct
    result = df.select(return_cols("array_struct_col"))
  File "/__w/spark/spark/python/pyspark/sql/udf.py", line 276, in wrapper
    return self(*args)
  File "/__w/spark/spark/python/pyspark/sql/udf.py", line 249, in __call__
    judf = self._judf
  File "/__w/spark/spark/python/pyspark/sql/udf.py", line 215, in _judf
    self._judf_placeholder = self._create_judf(self.func)
  File "/__w/spark/spark/python/pyspark/sql/udf.py", line 224, in _create_judf
    wrapped_func = _wrap_function(sc, func, self.returnType)
  File "/__w/spark/spark/python/pyspark/sql/udf.py", line 50, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/__w/spark/spark/python/pyspark/rdd.py", line 3350, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/__w/spark/spark/python/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/__w/spark/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/__w/spark/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
  File "/__w/spark/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 316, in _file_reduce
    raise pickle.PicklingError(
_pickle.PicklingError: Cannot pickle files that are not opened for reading: w
Could we fix this one?
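For context, cloudpickle raises this error when the function being serialized closes over a file handle opened for writing. A minimal standalone reproduction sketch (hypothetical code, using the standalone cloudpickle package rather than the copy bundled with PySpark):

```python
import cloudpickle

log = open("/tmp/debug.log", "w")  # handle opened for writing ("w")

def udf_like(x):
    log.write(str(x))  # the closure captures the writable file handle
    return x

# Raises pickle.PicklingError:
#   Cannot pickle files that are not opened for reading: w
cloudpickle.dumps(udf_like)
```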
I must say I am a bit puzzled by the error found in test_pandas_array_struct, as I cannot reproduce it on my test system.
When I run python/run-tests --modules pyspark-sql --testnames pyspark.sql.tests.test_pandas_udf_scalar
locally, the test passes.
This should be good to go now, @HyukjinKwon?
Are there any further comments on this PR?
Alright, let's merge this in once the tests pass. @LucaCanali mind fixing the lint failure?
starting mypy annotations test...
annotations failed mypy checks:
python/pyspark/sql/tests/test_dataframe.py:1113: error: Redundant cast to "str" [redundant-cast]
Found 1 error in 1 file (checked 341 source files)
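For reference, [redundant-cast] fires when typing.cast is applied to an expression that already has the target type; a minimal hypothetical example and fix (not the actual code in test_dataframe.py):

```python
from typing import cast

value: str = "hello"
s = cast(str, value)  # mypy: Redundant cast to "str" [redundant-cast]
s = value             # fix: drop the cast, since value is already a str
```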
Thank you @HyukjinKwon @ueshin and @BryanCutler
Tests passing now, merged to master. Thanks all!