[SPARK-35564][SQL] Support subexpression elimination for conditionally evaluated expressions
What changes were proposed in this pull request?
I am proposing to add support for conditionally evaluated expressions during subexpression elimination. Currently, only expressions that will definitely be evaluated at least twice are candidates for subexpression elimination. This PR updates that logic so that expressions that are always evaluated at least once and conditionally evaluated at least once are also candidates for subexpression elimination. This helps optimize a common case during data normalization and cleaning, where you want to null out values that don't match a certain pattern and have something like:
transformed = F.regexp_replace(F.lower(F.trim('my_column')), <some pattern>, <some replacement>)
df.withColumn('normalized_value', F.when(F.length(transformed) > 0, transformed))
or
df.withColumn('normalized_value', F.when(transformed.rlike(<some regex>), transformed))
In these cases, transformed will be fully calculated twice whenever the condition is true. Because it might only be needed once, it is not currently a candidate for subexpression elimination. I am proposing creating a subexpression for transformed in this case.
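To make the proposed rule concrete, here is a minimal pure-Python sketch of the candidate selection. It is not the actual Catalyst code: `Expr`, `split_children`, `count_uses`, and `candidates` are hypothetical names, and the toy tree only models a two-argument `when`. It counts how often each subtree is reached on guaranteed versus conditional paths, and does not descend again into a subtree it has already seen.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Expr:
    """Toy expression node (hypothetical): an operator name plus children."""
    op: str
    children: Tuple["Expr", ...] = ()


def split_children(e):
    """Return (always_evaluated, conditionally_evaluated) children of e."""
    if e.op == "when":
        # when(cond, value): cond always runs, value only runs if cond is true.
        return e.children[:1], e.children[1:]
    return e.children, ()


def count_uses(e, always, conditional, guaranteed=True):
    """Count visits to each subtree on guaranteed vs. conditional paths."""
    seen = e in always or e in conditional
    (always if guaranteed else conditional)[e] += 1
    if seen:
        return  # children were already counted on the first visit
    sure, maybe = split_children(e)
    for child in sure:
        count_uses(child, always, conditional, guaranteed)
    for child in maybe:
        count_uses(child, always, conditional, False)


def candidates(root, include_conditional):
    """Subtrees worth extracting under the old rule or the proposed rule."""
    always, conditional = Counter(), Counter()
    count_uses(root, always, conditional)
    result = set()
    for e in always:
        if not e.children:
            continue  # leaves such as column references are skipped
        old_rule = always[e] >= 2
        new_rule = include_conditional and always[e] >= 1 and conditional[e] >= 1
        if old_rule or new_rule:
            result.add(e)
    return result


# Mirrors: F.when(F.length(transformed) > 0, transformed)
column = Expr("my_column")
transformed = Expr("regexp_replace", (Expr("lower", (Expr("trim", (column,)),)),))
condition = Expr("gt", (Expr("length", (transformed,)), Expr("0")))
root = Expr("when", (condition, transformed))

print(candidates(root, include_conditional=False))
print(candidates(root, include_conditional=True))
```

With the old rule nothing qualifies, because transformed is guaranteed to run only once. With the proposed rule transformed qualifies, since it is reached once unconditionally (inside the condition) and once conditionally (as the value).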
In practice I've seen a decrease in runtime and codegen size of 10-30% in our production pipelines that heavily make use of this type of logic.
The only potential downside is creating more subexpressions, and therefore more function calls, than necessary. This should only be an issue for certain edge cases where your conditional overwhelmingly evaluates to false. And then the only overhead is running your conditional logic potentially in a separate function rather than inlined in the codegen. I added a config to control this behavior if that is actually a real concern to anyone, but I'd be happy to just remove the config.
I also updated some of the existing logic for common expressions in coalesce and when that are actually better handled by the new logic, since you are only guaranteed to have the first value of a Coalesce evaluated, as well as the first conditional of a CaseWhen expression.
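As a rough illustration of the "only the first one is guaranteed" point, the sketch below splits the children of a Coalesce and a CaseWhen into always-evaluated and conditionally evaluated groups. The function names and the flat child layout are assumptions made for this example, not the Catalyst API.

```python
def coalesce_split(args):
    """Coalesce stops at the first non-null value, so only args[0] always runs."""
    return list(args[:1]), list(args[1:])


def case_when_split(branches, else_value=None):
    """CaseWhen always runs the first condition; everything after it is conditional.

    branches is a list of (condition, value) pairs (hypothetical layout).
    """
    flat = [expr for branch in branches for expr in branch]
    if else_value is not None:
        flat.append(else_value)
    return flat[:1], flat[1:]


print(coalesce_split(["a", "b", "c"]))
print(case_when_split([("cond1", "value1"), ("cond2", "value2")], "else_value"))
```

An expression that appears once in the always-evaluated group and again in the conditional group is exactly the kind of candidate the new logic picks up.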
Why are the changes needed?
To increase the performance of conditional expressions.
Does this PR introduce any user-facing change?
No, just performance improvements.
How was this patch tested?
New and updated UT.
OK to test
Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44671/
Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44671/
I don't think the downside is an edge case. On the contrary, I think it is a more common case than the use case proposed here.
After this, any common expression shared between a conditionally evaluated expression and a normal expression will become a subexpression. I have a concern that the generated code will be overwhelmed with such subexpressions.
At least we need a config for this and I don't think it should be enabled by default.
What exactly is the overwhelming part? I figured smaller overall code size would be beneficial.
It is not zero-cost. For example, too many subexpressions can turn a non-split case into a split case.
Could you elaborate on how that could happen? I don't know that much about the codegen process
In short, during subexpression codegen, if the total code length exceeds a threshold, we split the subexpressions into functions to avoid reaching the maximum size of a method.
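The decision described above can be sketched roughly as follows. This is a simplified pure-Python model, not the real codegen: `layout_subexpressions`, the snippet names, and the emitted strings are hypothetical, and the 1024 default is only taken from the 1kb figure mentioned elsewhere in this thread.

```python
def layout_subexpressions(snippets, threshold=1024):
    """Decide whether subexpression code stays inline or moves into functions.

    snippets maps a (hypothetical) subexpression name to its generated code.
    Returns ("inline", code lines) or ("split", function definitions + calls).
    """
    total_length = sum(len(code) for code in snippets.values())
    if total_length <= threshold:
        # Small enough: paste every snippet straight into the calling method.
        return "inline", list(snippets.values())
    # Too large: wrap each snippet in its own function and call it instead.
    functions = [f"void {name}() {{ {code} }}" for name, code in snippets.items()]
    calls = [f"{name}();" for name in snippets]
    return "split", functions + calls


small = {"subExpr_0": "int a = 1;"}
large = {"subExpr_0": "x" * 600, "subExpr_1": "y" * 600}

print(layout_subexpressions(small)[0])
print(layout_subexpressions(large)[0])
```

Note that the check is on the total length, so one extra subexpression can move every existing subexpression from inline code into a function call.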
Oh you're specifically talking about the subexpressions being split into functions versus inlined, not the general splitting the whole codegen into functions?
Test build #140145 has finished for PR 32987 at commit d4d64c7.
- This patch passes all tests.
- This patch merges cleanly.
- This patch adds no public classes.
My main assumption in creating this was that it's always faster to run an expression once in a function than twice inlined. If this creates a lot of extra subexpressions that push the code over the 1kb threshold for breaking into functions, then the alternative is that you are running a lot of duplicate inlined logic, so at the end of the day it all comes down to how often a subexpression created by this logic is only evaluated once.
The two extremes of performance impact I can think of would be:
- Worst case: Without this logic, you have subexpressions that are just small enough to remain inlined. You add one conditional that creates a new subexpression that pushes your code over the (default) 1kb limit. That conditional never evaluates to true, so your conditional subexpression is evaluated once in a function rather than inlined, and all your other subexpressions are evaluated with a function call instead of inlined as well. This is somewhat bound by the number of subexpressions that can be fit inline in the first place, plus the function calls of the one-time evaluated conditional subexpressions.
- Best case: Your existing subexpressions have already been broken out into functions before this change, or the new subexpression fits inline as well, and the conditional always evaluates to true, so you are running the conditional expression once instead of two or more times. This is essentially the existing logic where we create a subexpression for things that are always evaluated at least twice, so obviously a win here.
Realistically things are going to fall somewhere in the middle. Where the extra function calls outweigh the deduped expression execution, who knows. But the upside here is pretty large, and I would expect most Spark users would expect this to logically happen (don't run the same code twice). If we want to leave it with the setting defaulted to disabled I'm fine with that.
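To put rough numbers on "somewhere in the middle", here is a back-of-the-envelope cost model. It is a deliberate simplification with made-up units: it looks at a single shared expression, treats the function-call overhead as a fixed cost, and ignores the effect on other subexpressions that get pushed into functions. All names are hypothetical.

```python
def expected_cost_inline(expr_cost, p_true):
    """Duplicated inline code: one guaranteed run plus one run when the condition holds."""
    return expr_cost * (1 + p_true)


def expected_cost_subexpr(expr_cost, call_overhead):
    """Shared subexpression: one run, plus the call overhead if it lives in a function."""
    return expr_cost + call_overhead


def break_even_probability(expr_cost, call_overhead):
    """Condition probability above which the shared subexpression is cheaper."""
    return call_overhead / expr_cost


# Example: the expression costs 100 units, a function call costs 5 units.
print(expected_cost_inline(100, 0.5))
print(expected_cost_subexpr(100, 5))
print(break_even_probability(100, 5))
```

Under this model the subexpression wins whenever the condition is true more often than call_overhead / expr_cost, so expensive expressions (regex work, for example) break even at very low probabilities.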
Updated based on the refactor. It's still a little rough and needs some cleaning, renaming things, and updating a lot of comments, but wanted to get initial feedback
Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45267/
Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45267/
Test build #140756 has finished for PR 32987 at commit 1111b9b.
- This patch passes all tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
case class ExpressionStats(expr: Expression)(
case class RecurseChildren(
Test build #141494 has started for PR 32987 at commit d956d22.
Test build #141497 has started for PR 32987 at commit 5d6e1ad.
Test build #141498 has started for PR 32987 at commit 80245a6.
https://github.com/apache/spark/pull/33142#discussion_r661540443 in other cases it's already accepted that the performance overhead of maybe only using a subexpression once is worth the trade-off of not having to potentially evaluate it twice, so this just expands the places that could happen. Personally I don't think it needs a config defaulting to turned off, but I'm fine leaving it in if necessary. It does effectively prevent all the existing cases of creating a subexpression for an expression that might only be evaluated once, like mentioned in the comment, if the config is turned off.
Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46014/
Kubernetes integration test unable to build dist.
exiting with code: 1 URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46015/
Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46014/
Kubernetes integration test unable to build dist.
exiting with code: 1 URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47425/
Test build #142923 has finished for PR 32987 at commit 8b53f7b.
- This patch passes all tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
case class ExpressionStats(expr: Expression)(
case class RecurseChildren(
Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48228/
Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48228/
Test build #143715 has finished for PR 32987 at commit 38a411d.
- This patch passes all tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
public class NettyLogger
public final class LZ4Compressor
public final class LZ4Factory
public final class LZ4SafeDecompressor
class DatetimeNTZOps(DatetimeOps):
class IndexNameTypeHolder(object):
new_class = type("NameType", (NameTypeHolder,),
new_class = param.type if isinstance(param, np.dtype) else param
class TimestampNTZType(AtomicType, metaclass=DataTypeSingleton):
class DatetimeNTZConverter(object):
public final class TableIndex
public final class AlwaysFalse extends Filter
public final class AlwaysTrue extends Filter
public final class And extends BinaryFilter
abstract class BinaryComparison extends Filter
abstract class BinaryFilter extends Filter
public final class EqualNullSafe extends BinaryComparison
public final class EqualTo extends BinaryComparison
public abstract class Filter implements Expression, Serializable
public final class GreaterThan extends BinaryComparison
public final class GreaterThanOrEqual extends BinaryComparison
public final class In extends Filter
public final class IsNotNull extends Filter
public final class IsNull extends Filter
public final class LessThan extends BinaryComparison
public final class LessThanOrEqual extends BinaryComparison
public final class Not extends Filter
public final class Or extends BinaryFilter
public final class StringContains extends StringPredicate
public final class StringEndsWith extends StringPredicate
abstract class StringPredicate extends Filter
public final class StringStartsWith extends StringPredicate
public class ColumnarBatch implements AutoCloseable
public final class ColumnarBatchRow extends InternalRow
class IndexAlreadyExistsException(indexName: String, table: Identifier)
class NoSuchIndexException(indexName: String)
case class CastTimestampNTZToLong(child: Expression) extends TimestampToLongBase
case class Sec(child: Expression)
case class Csc(child: Expression)
case class ILike(
trait OperationHelper extends AliasHelper with PredicateHelper
case class AsOfJoin(
trait SupportsPushDownCatalystFilters
class SQLOpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
case class OptimizeSkewedJoin(
case class SkewJoinChildWrapper(plan: SparkPlan) extends LeafExecNode
case class SimpleCostEvaluator(forceOptimizeSkewedJoin: Boolean) extends CostEvaluator
case class ShowCurrentNamespaceCommand() extends LeafRunnableCommand
case class WriterBucketSpec(
case class EnsureRequirements(
case class HashedRelationBroadcastMode(
Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49293/
Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49293/
Test build #144822 has finished for PR 32987 at commit f4ed2be.
- This patch passes all tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
case class ExpressionStats(expr: Expression)(
case class RecurseChildren(