
[SPARK-34265][PYTHON][SQL] Instrument Python UDFs using SQL metrics

Open LucaCanali opened this issue 3 years ago • 8 comments

What changes are proposed in this pull request?

This proposes adding SQLMetrics instrumentation for Python UDF execution, including Pandas UDFs and related operations such as MapInPandas and MapInArrow. The proposed metrics are:

  • data sent to Python workers
  • data returned from Python workers
  • number of output rows
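
As a rough mental model of how such metrics behave, SQLMetrics are accumulator-style counters: each task updates a local value, and the driver merges the per-task values for display in the Web UI. The following is a minimal pure-Python analogue (the `ToyMetric` class is hypothetical, not Spark code) sketching those add/merge semantics:

```python
# Hypothetical sketch, NOT Spark's implementation: illustrates the
# accumulator semantics behind SQL metrics like "data sent to Python workers".
class ToyMetric:
    def __init__(self, name):
        self.name = name
        self.value = 0

    def add(self, v):
        # Called as a task processes each batch.
        self.value += v

    def merge(self, other):
        # Driver-side aggregation of per-task values.
        self.value += other.value

sent = ToyMetric("data sent to Python workers")
for task_bytes in (1024, 2048):  # two hypothetical tasks
    t = ToyMetric(sent.name)
    t.add(task_bytes)
    sent.merge(t)
print(sent.value)  # 3072
```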

Why are the changes needed?

This aims to improve the monitoring and performance troubleshooting of Python UDFs. In particular, it is intended as an aid to answering performance-related questions such as: why is the UDF slow? How much work has been done so far?

Does this PR introduce any user-facing change?

SQL metrics are made available in the Spark Web UI.
See the following examples:

(screenshot: the new SQL metrics as shown in the Spark Web UI)

How was this patch tested?

Manually tested + a Python unit test and a Scala unit test have been added.

Example code used for testing:

from pyspark.sql.functions import pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1):
    time.sleep(0.02)  # simulate per-batch work
    return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()

This is used to test with more data pushed to the Python workers:

from pyspark.sql.functions import pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17):
    time.sleep(0.02)
    return col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()

This (from the Spark doc) has been used to test with MapInPandas, where the number of output rows is different from the number of input rows:

import pandas as pd

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]  # keep only rows with id == 1, so output rows differ from input rows

df.mapInPandas(filter_func, schema=df.schema).show()
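
As a pure-Python analogue of the filtering above (plain dicts standing in for pandas DataFrames; this `filter_func` is a hypothetical stand-in, not the Spark example itself), this sketch shows why the "number of output rows" metric can differ from the input row count:

```python
# Hypothetical sketch: a MapInPandas-style function receives batches and may
# yield fewer (or more) rows than it received, so output rows != input rows.
def filter_func(iterator):
    for batch in iterator:
        yield [row for row in batch if row["id"] == 1]

batches = [[{"id": 1, "age": 21}, {"id": 2, "age": 30}]]  # 2 input rows
out = [row for batch in filter_func(batches) for row in batch]
print(len(out))  # 1 output row from 2 input rows
```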

This is for testing BatchEvalPython and the metrics related to data transfer (bytes sent and received):

from pyspark.sql.functions import udf

@udf
def test_udf(col1, col2):
    return col1 * col1

spark.sql("select id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' col2 from range(10)").select(test_udf("id", "col2")).collect()
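
To build intuition for the data-transfer metrics, the following pure-Python sketch (using `pickle` only as a rough stand-in for Spark's actual serialization over the JVM-to-Python socket; not Spark code) shows why the wide string column in the query above inflates the bytes sent to the Python workers:

```python
# Hypothetical sketch: compare the serialized size of row batches with and
# without a long string column, mimicking the test query's wide 'col2'.
import pickle

narrow_rows = [(i,) for i in range(10)]
wide_rows = [(i, "a" * 111) for i in range(10)]  # long literal column

narrow_bytes = len(pickle.dumps(narrow_rows))
wide_bytes = len(pickle.dumps(wide_rows))
print(narrow_bytes < wide_bytes)  # the wide rows serialize to more bytes
```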

LucaCanali avatar Jul 28 '21 12:07 LucaCanali

Can one of the admins verify this patch?

AmplabJenkins avatar Jul 28 '21 13:07 AmplabJenkins

Hi @LucaCanali. Thanks for your proposal!

Could you please follow the instructions in the failed workflow run to enable GitHub actions for your fork? TIA.

zero323 avatar Jan 12 '22 21:01 zero323

cc @HyukjinKwon

zero323 avatar Jan 12 '22 21:01 zero323

Looks fine from a cursory look, but let me add some more Python and SQL people here - @cloud-fan, @maryannxue, @viirya @ueshin @BryanCutler FYI

HyukjinKwon avatar Jan 18 '22 00:01 HyukjinKwon

@HyukjinKwon thanks for the review. Indeed I agree this needs to be checked on the "SQL side" too. I have just pushed a small extension to address the case of BatchEvalPython as well; apologies for the late addition.

LucaCanali avatar Feb 03 '22 21:02 LucaCanali

BTW, @LucaCanali do you mind taking a look at the test failures at https://github.com/LucaCanali/spark/runs/5059530132?

HyukjinKwon avatar Feb 04 '22 00:02 HyukjinKwon

@HyukjinKwon I see that a particular query in SQLQueryTestSuite.udf/postgreSQL/udf-aggregates_part3.sql seems to have a problem with this PR. I am struggling to understand why. It looks to be related to the way the query is executed in the test: if I take the query that fails in the test and run it with PySpark, it all works OK. Moreover, there are dozens of queries in SQLQueryTestSuite that run OK, including udf-aggregates_part2.sql. I see that you worked on SPARK-28272, related to those tests. Would you have any further clues or ideas on how to proceed?

LucaCanali avatar Feb 07 '22 19:02 LucaCanali

I understand from @HyukjinKwon's comment on January 18 that more people with expertise in Spark's Python and SQL internals should review this. @cloud-fan, @maryannxue, @viirya, @ueshin, @BryanCutler, would you be interested?

LucaCanali avatar Mar 04 '22 09:03 LucaCanali

The issue with SQLQueryTestSuite.udf/postgreSQL/udf-aggregates_part3.sql should be fixed now. I have also extended the instrumentation to applyInPandasWithState, recently introduced in SPARK-40434.

LucaCanali avatar Sep 28 '22 17:09 LucaCanali

thanks, merging to master!

cloud-fan avatar Oct 24 '22 09:10 cloud-fan

Thank you @cloud-fan !

LucaCanali avatar Oct 24 '22 19:10 LucaCanali