spark icon indicating copy to clipboard operation
spark copied to clipboard

[SPARK-48666][SQL] Do not push down filter if it contains Unevaluable expression

Open weiatwork opened this issue 8 months ago • 0 comments

What changes were proposed in this pull request?

We should avoid pushing down Unevaluable expression as it can cause unexpected failures.

Why are the changes needed?

For example, the code snippet below (assuming there is a table t with a partition column p)

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

import pyspark.sql.functions as f

def getdata(p: str) -> str:
    return "data"

NEW_COLUMN = 'new_column'
P_COLUMN = 'p'

f_getdata = f.udf(getdata, StringType())
rows = spark.sql("select * from default.t")

table = rows.withColumn(NEW_COLUMN, f_getdata(f.col(P_COLUMN)))

df = table.alias('t1').join(table.alias('t2'), (f.col(f"t1.{NEW_COLUMN}") == f.col(f"t2.{NEW_COLUMN}")), how='inner')

df.show()

will cause an error like:

org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: getdata(input[0, string, true])#16
    at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
    at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:66)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:391)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:390)
    at org.apache.spark.sql.catalyst.expressions.PythonUDF.eval(PythonUDF.scala:71)
    at org.apache.spark.sql.catalyst.expressions.IsNotNull.eval(nullExpressions.scala:384)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1(ExternalCatalogUtils.scala:166)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1$adapted(ExternalCatalogUtils.scala:165) 

Does this PR introduce any user-facing change?

No

How was this patch tested?

This PR adds test coverage.

Was this patch authored or co-authored using generative AI tooling?

No

weiatwork avatar Jun 19 '24 22:06 weiatwork