[SPARK-34265][PYTHON][SQL] Instrument Python UDFs using SQL metrics
What changes were proposed in this pull request?
This PR proposes to add SQLMetrics instrumentation for Python UDF execution, including Pandas UDFs and related operations such as MapInPandas and MapInArrow. The proposed metrics are:
- data sent to Python workers
- data returned from Python workers
- number of output rows
Why are the changes needed?
This aims at improving monitoring and performance troubleshooting of Python UDFs. In particular, it is intended to help answer performance-related questions such as: why is the UDF slow? How much work has been done so far?
Does this PR introduce any user-facing change?
Yes: the new SQL metrics are made available in the Web UI (SQL tab). See the screenshots attached to this PR for examples.
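Besides the Web UI, SQL metrics can also be read programmatically through Spark's monitoring REST API. The sketch below is my own illustration: the metric names mirror the list above, but the exact JSON shape of the /api/v1/applications/<app-id>/sql response and the node names are assumptions, so treat this as a starting point rather than a guaranteed contract.

```python
import json

def python_udf_metrics(execution):
    """Collect metrics from plan nodes that evaluate Python UDFs
    (e.g. ArrowEvalPython, BatchEvalPython) in one SQL execution."""
    out = {}
    for node in execution.get("nodes", []):
        if "Python" in node.get("nodeName", ""):
            out[node["nodeName"]] = {m["name"]: m["value"]
                                     for m in node.get("metrics", [])}
    return out

# Hypothetical payload, shaped like one entry returned by
# GET http://<driver>:4040/api/v1/applications/<app-id>/sql
sample = json.loads("""
{"id": 1, "nodes": [
  {"nodeId": 3, "nodeName": "ArrowEvalPython",
   "metrics": [{"name": "data sent to Python workers", "value": "80.0 MiB"},
               {"name": "number of output rows", "value": "10000000"}]},
  {"nodeId": 4, "nodeName": "Scan"}]}
""")

print(python_udf_metrics(sample))
```

In a real session, replace the hypothetical `sample` with the JSON fetched from the driver's REST endpoint for the application in question.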
How was this patch tested?
Manually tested; a Python unit test and a Scala unit test have also been added.
Example code used for testing:
from pyspark.sql.functions import pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1):
    time.sleep(0.02)  # simulate per-batch work
    return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()
This is used to test with more data pushed to the Python workers:
from pyspark.sql.functions import pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1, col2, col3, col4, col5, col6, col7, col8, col9,
                col10, col11, col12, col13, col14, col15, col16, col17):
    time.sleep(0.02)  # simulate per-batch work
    return col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()
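As a rough back-of-the-envelope check (my own estimate, not a figure from the PR), the "data sent to Python workers" metric should scale with rows × columns × 8 bytes for double values, ignoring Arrow framing and batching overhead:

```python
# Rough estimate of the Arrow payload sent to the Python workers.
# Assumption: 8 bytes per double value, no serialization overhead counted.
ROWS = 10_000_000
BYTES_PER_DOUBLE = 8

one_col = ROWS * 1 * BYTES_PER_DOUBLE        # first example: 1 input column
seventeen_cols = ROWS * 17 * BYTES_PER_DOUBLE  # second example: 17 input columns

print(f"1 column  : {one_col / 2**20:.0f} MiB")
print(f"17 columns: {seventeen_cols / 2**20:.0f} MiB")
```

So the 17-column variant should report roughly 17x the bytes sent of the single-column variant, which is what makes it useful for exercising the data-transfer metrics.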
This (from the Spark doc) has been used to test with MapInPandas, where the number of output rows is different from the number of input rows:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
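Since mapInPandas hands the function an iterator of plain pandas DataFrames, filter_func can also be exercised locally without Spark. This small sketch (assuming pandas is installed) shows why the number of output rows differs from the number of input rows:

```python
import pandas as pd

def filter_func(iterator):
    # Same function as above: keep only the rows with id == 1.
    for pdf in iterator:
        yield pdf[pdf.id == 1]

batch = pd.DataFrame({"id": [1, 2], "age": [21, 30]})  # one 2-row input batch
out = pd.concat(filter_func(iter([batch])))

print(len(batch), "input rows ->", len(out), "output row")
```

Two rows go in and one comes out, so the "number of output rows" metric on the MapInPandas node is expected to differ from the input row count.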
This is for testing BatchEvalPython and the metrics related to data transfer (bytes sent and received):

from pyspark.sql.functions import udf

@udf
def test_udf(col1, col2):
    return col1 * col1  # col2 is passed but unused; it inflates the data sent to the workers

spark.sql("select id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' col2 from range(10)").select(test_udf("id", "col2")).collect()
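On the BatchEvalPython path, rows are sent to the workers using Python pickle serialization rather than Arrow, so the long string literal in col2 dominates the bytes-sent metric even though the UDF never uses it. A rough local illustration of the per-row payload size (my own sketch, not the PR's exact wire format):

```python
import pickle

# One (id, col2) input row, with col2 roughly as long as the literal
# in the query above (length is approximate, for illustration only).
long_str = "a" * 110
row = (0, long_str)

payload = pickle.dumps(row)
print(len(payload), "bytes per pickled row")  # the string dominates the size
```

This makes the example above a convenient way to drive the bytes-sent metric well above the bytes-received metric, since only a small long value comes back per row.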
Can one of the admins verify this patch?
Hi @LucaCanali. Thanks for your proposal!
Could you please follow the instructions in the failed workflow run to enable GitHub actions for your fork? TIA.
cc @HyukjinKwon
Looks fine from a cursory look .. but let me add some more Python and SQL people here - @cloud-fan, @maryannxue, @viirya @ueshin @BryanCutler FYI
@HyukjinKwon thanks for the review. Indeed, I agree this needs to be checked on the "SQL side" as well. I have just pushed a small extension to also cover BatchEvalPython; apologies for the late addition.
BTW, @LucaCanali do you mind taking a look at the test failures at https://github.com/LucaCanali/spark/runs/5059530132?
@HyukjinKwon I see that a particular query in SQLQueryTestSuite.udf/postgreSQL/udf-aggregates_part3.sql seems to have a problem with this PR, and I am struggling to understand why. It looks to be related to the way the query is executed in the test: if I take the failing query and run it with pyspark, it works OK. Moreover, there are dozens of queries in SQLQueryTestSuite that run OK, including udf-aggregates_part2.sql. I see that you worked on SPARK-28272, which is related to those tests. Would you have any further clues/ideas on how to proceed?
I understand from @HyukjinKwon's comment on January 18 that more people with expertise in Spark's Python and SQL internals should review this. @cloud-fan, @maryannxue, @viirya, @ueshin, @BryanCutler, would you be interested?
The issue with SQLQueryTestSuite.udf/postgreSQL/udf-aggregates_part3.sql should be fixed now. I have also extended the instrumentation to applyInPandasWithState, recently introduced in SPARK-40434.
thanks, merging to master!
Thank you @cloud-fan !