
[SPARK-35564][SQL] Support subexpression elimination for conditionally evaluated expressions

Open Kimahriman opened this pull request 4 years ago • 44 comments

What changes were proposed in this pull request?

I am proposing to add support for conditionally evaluated expressions during subexpression elimination. Currently, only expressions that will definitely be evaluated at least twice are candidates for subexpression elimination. This PR updates that logic so that expressions that are guaranteed to be evaluated at least once and conditionally evaluated at least one more time are also candidates for subexpression elimination. This helps optimize a common case in data normalization and cleaning, where you want to null out values that don't match a certain pattern and have something like:

from pyspark.sql import functions as F

transformed = F.regexp_replace(F.lower(F.trim('my_column')), '<some pattern>', '<replacement>')
df.withColumn('normalized_value', F.when(F.length(transformed) > 0, transformed))

or

df.withColumn('normalized_value', F.when(transformed.rlike(<some regex>), transformed))

In these cases, transformed is fully calculated twice whenever the condition holds, because it is currently not extracted as a common subexpression, since it might only be needed once. I am proposing creating a subexpression for transformed in this case.
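For illustration, this is roughly what the first example looks like once transformed is substituted in. It is a minimal sketch that assumes the same df and 'my_column' from above and uses hypothetical placeholder pattern/replacement arguments:

from pyspark.sql import functions as F

# The same trim/lower/regexp_replace chain appears in both the condition and
# the value of the when(); today it is not extracted as a common subexpression
# because the value branch is only conditionally evaluated, so codegen runs the
# chain twice for every row where the condition holds.
df.withColumn(
    'normalized_value',
    F.when(
        F.length(F.regexp_replace(F.lower(F.trim('my_column')), '<some pattern>', '<replacement>')) > 0,
        F.regexp_replace(F.lower(F.trim('my_column')), '<some pattern>', '<replacement>')))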

In practice I've seen a decrease in runtime and codegen size of 10-30% in our production pipelines that heavily make use of this type of logic.

The only potential downside is creating more subexpressions, and therefore more function calls, than necessary. This should only be an issue for certain edge cases where your conditional overwhelmingly evaluates to false. And then the only overhead is running your conditional logic potentially in a separate function rather than inlined in the codegen. I added a config to control this behavior if that is actually a real concern to anyone, but I'd be happy to just remove the config.

I also updated some of the existing logic for common expressions in Coalesce and CaseWhen that is actually better handled by the new logic, since only the first value of a Coalesce and the first condition of a CaseWhen expression are guaranteed to be evaluated.
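To make that evaluation guarantee concrete, here is a minimal sketch; the columns a, b, and cond are hypothetical:

from pyspark.sql import functions as F

# Only the first argument of coalesce is guaranteed to be evaluated for every
# row; the second argument only runs for rows where 'a' is null, so anything it
# shares with other expressions is conditionally evaluated, not always evaluated.
df.withColumn('c', F.coalesce(F.col('a'), F.lower(F.trim('b'))))

# Likewise only the first condition of a CASE WHEN chain is guaranteed to run;
# every later condition and every branch value is conditional.
df.withColumn('w', F.when(F.col('cond'), F.lower(F.trim('b'))).otherwise(F.upper(F.col('b'))))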

Why are the changes needed?

To increase the performance of conditional expressions.

Does this PR introduce any user-facing change?

No, just performance improvements.

How was this patch tested?

New and updated UT.

Kimahriman avatar Jun 20 '21 13:06 Kimahriman

OK to test

cloud-fan avatar Jun 22 '21 13:06 cloud-fan

Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44671/

SparkQA avatar Jun 22 '21 15:06 SparkQA

Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44671/

SparkQA avatar Jun 22 '21 15:06 SparkQA

The only potential downside is creating more subexpressions, and therefore more function calls, than necessary. This should only be an issue for certain edge cases where your conditional overwhelmingly evaluates to false. And then the only overhead is running your conditional logic potentially in a separate function rather than inlined in the codegen. I added a config to control this behavior if that is actually a real concern to anyone, but I'd be happy to just remove the config.

I don't think the downside is an edge case. On the contrary, I think it is a more common case than the use case proposed here.

After this, any common expression shared between a conditionally evaluated expression and a normal expression will become a subexpression. I am concerned that the generated code will be overwhelmed with such subexpressions.

At least we need a config for this, and I don't think it should be enabled by default.

viirya avatar Jun 22 '21 16:06 viirya

After this, any common expression shared between a conditionally evaluated expression and a normal expression will become a subexpression. I am concerned that the generated code will be overwhelmed with such subexpressions.

What exactly is the overwhelming part? I figured smaller overall code size would be beneficial.

Kimahriman avatar Jun 22 '21 16:06 Kimahriman

What exactly is the overwhelming part? I figured smaller overall code size would be beneficial.

It is not zero-cost. For example, too many subexpressions could turn a non-split case into a split case.

viirya avatar Jun 22 '21 17:06 viirya

Could you elaborate on how that could happen? I don't know that much about the codegen process

Kimahriman avatar Jun 22 '21 17:06 Kimahriman

Could you elaborate on how that could happen? I don't know that much about the codegen process

In short, during subexpression codegen, if the total code length exceeds a threshold, we choose to split it into functions to avoid reaching the maximum size of a method.
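As a rough illustration of that splitting behavior, here is a hedged sketch in Python; it is not Spark's actual Scala codegen, and the threshold value is only illustrative:

SPLIT_THRESHOLD = 1024  # illustrative; the thread later mentions a ~1kb default

def emit_subexpr_code(subexpr_snippets):
    # If the combined generated code for the common subexpressions gets too
    # large, each one is wrapped in its own method and invoked with a call
    # (the "split" case); otherwise the snippets stay inlined at the use site.
    if sum(len(s) for s in subexpr_snippets) > SPLIT_THRESHOLD:
        return [f"void subExpr_{i}() {{ {s} }}" for i, s in enumerate(subexpr_snippets)]
    return list(subexpr_snippets)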

viirya avatar Jun 22 '21 17:06 viirya

Oh, you're specifically talking about the subexpressions being split into functions versus inlined, not the general splitting of the whole codegen into functions?

Kimahriman avatar Jun 22 '21 17:06 Kimahriman

Test build #140145 has finished for PR 32987 at commit d4d64c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA avatar Jun 22 '21 18:06 SparkQA

My main assumption in creating this was that it's always faster to run an expression once in a function than twice inlined. If this creates a lot of extra subexpressions that push the code over the 1kb threshold for breaking into functions, the alternative is running a lot of duplicate inlined logic, so at the end of the day it all comes down to how often a subexpression created by this logic is only evaluated once.

The two extremes of performance impact I can think of would be:

  • Worst case (sketched below): Without this logic, you have subexpressions that are just small enough to remain inlined. You add one conditional that creates a new subexpression and pushes your code over the (default) 1kb limit. That conditional never evaluates to true, so your conditional subexpression is evaluated once via a function call rather than inlined, and all your other subexpressions are now evaluated with function calls instead of being inlined as well. This is somewhat bounded by the number of subexpressions that could fit inline in the first place, plus the function calls for the one-time-evaluated conditional subexpressions.
  • Best case: Your existing subexpressions have already been broken out into functions before this change, or the new subexpression fits inline as well, and the conditional always evaluates to true, so you are running the conditional expression once instead of two or more times. This is essentially the existing logic where we create a subexpression for things that are always evaluated at least twice, so obviously a win here.

Realistically things will fall somewhere in the middle, and exactly where the extra function calls outweigh the deduplicated expression execution is hard to say. But the upside here is pretty large, and I would expect most Spark users to assume this already happens (don't run the same code twice). If we want to leave the setting defaulted to disabled, I'm fine with that.
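As a concrete shape of that worst case, here is a hypothetical sketch reusing the transformed expression from the PR description, with placeholder pattern and regex arguments:

from pyspark.sql import functions as F

# The rlike condition below rarely matches, so the subexpression for
# `transformed` is usually computed exactly once, now via a function call
# instead of inlined, and its extra generated code can be what pushes the
# surrounding subexpressions past the ~1kb inline threshold.
transformed = F.regexp_replace(F.lower(F.trim('my_column')), '<some pattern>', '<replacement>')
df.withColumn('normalized_value', F.when(transformed.rlike('<regex that rarely matches>'), transformed))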

Kimahriman avatar Jun 24 '21 15:06 Kimahriman

Updated based on the refactor. It's still a little rough and needs some cleanup, renaming, and updated comments, but I wanted to get initial feedback.

Kimahriman avatar Jul 07 '21 11:07 Kimahriman

Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45267/

SparkQA avatar Jul 07 '21 12:07 SparkQA

Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45267/

SparkQA avatar Jul 07 '21 13:07 SparkQA

Test build #140756 has finished for PR 32987 at commit 1111b9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ExpressionStats(expr: Expression)(
  • case class RecurseChildren(

SparkQA avatar Jul 07 '21 16:07 SparkQA

Test build #141494 has started for PR 32987 at commit d956d22.

SparkQA avatar Jul 22 '21 13:07 SparkQA

Test build #141497 has started for PR 32987 at commit 5d6e1ad.

SparkQA avatar Jul 22 '21 13:07 SparkQA

Test build #141498 has started for PR 32987 at commit 80245a6.

SparkQA avatar Jul 22 '21 13:07 SparkQA

As discussed in https://github.com/apache/spark/pull/33142#discussion_r661540443, in other cases it's already accepted that the performance overhead of maybe only using a subexpression once is worth the trade-off of not having to potentially evaluate it twice, so this just expands the places where that can happen. Personally I don't think it needs a config defaulting to off, but I'm fine leaving it in if necessary. Note that if the config is turned off, it effectively disables all the existing cases of creating a subexpression for an expression that might only be evaluated once, as mentioned in the linked comment.

Kimahriman avatar Jul 22 '21 13:07 Kimahriman

Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46014/

SparkQA avatar Jul 22 '21 14:07 SparkQA

Kubernetes integration test unable to build dist.

exiting with code: 1 URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46015/

SparkQA avatar Jul 22 '21 14:07 SparkQA

Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46014/

SparkQA avatar Jul 22 '21 15:07 SparkQA

Kubernetes integration test unable to build dist.

exiting with code: 1 URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47425/

SparkQA avatar Sep 01 '21 14:09 SparkQA

Test build #142923 has finished for PR 32987 at commit 8b53f7b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ExpressionStats(expr: Expression)(
  • case class RecurseChildren(

SparkQA avatar Sep 01 '21 19:09 SparkQA

Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48228/

SparkQA avatar Sep 29 '21 13:09 SparkQA

Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48228/

SparkQA avatar Sep 29 '21 14:09 SparkQA

Test build #143715 has finished for PR 32987 at commit 38a411d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class NettyLogger
  • public final class LZ4Compressor
  • public final class LZ4Factory
  • public final class LZ4SafeDecompressor
  • class DatetimeNTZOps(DatetimeOps):
  • class IndexNameTypeHolder(object):
  • new_class = type(\"NameType\", (NameTypeHolder,),
  • new_class = param.type if isinstance(param, np.dtype) else param
  • class TimestampNTZType(AtomicType, metaclass=DataTypeSingleton):
  • class DatetimeNTZConverter(object):
  • public final class TableIndex
  • public final class AlwaysFalse extends Filter
  • public final class AlwaysTrue extends Filter
  • public final class And extends BinaryFilter
  • abstract class BinaryComparison extends Filter
  • abstract class BinaryFilter extends Filter
  • public final class EqualNullSafe extends BinaryComparison
  • public final class EqualTo extends BinaryComparison
  • public abstract class Filter implements Expression, Serializable
  • public final class GreaterThan extends BinaryComparison
  • public final class GreaterThanOrEqual extends BinaryComparison
  • public final class In extends Filter
  • public final class IsNotNull extends Filter
  • public final class IsNull extends Filter
  • public final class LessThan extends BinaryComparison
  • public final class LessThanOrEqual extends BinaryComparison
  • public final class Not extends Filter
  • public final class Or extends BinaryFilter
  • public final class StringContains extends StringPredicate
  • public final class StringEndsWith extends StringPredicate
  • abstract class StringPredicate extends Filter
  • public final class StringStartsWith extends StringPredicate
  • public class ColumnarBatch implements AutoCloseable
  • public final class ColumnarBatchRow extends InternalRow
  • class IndexAlreadyExistsException(indexName: String, table: Identifier)
  • class NoSuchIndexException(indexName: String)
  • case class CastTimestampNTZToLong(child: Expression) extends TimestampToLongBase
  • case class Sec(child: Expression)
  • case class Csc(child: Expression)
  • case class ILike(
  • trait OperationHelper extends AliasHelper with PredicateHelper
  • case class AsOfJoin(
  • trait SupportsPushDownCatalystFilters
  • class SQLOpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
  • case class OptimizeSkewedJoin(
  • case class SkewJoinChildWrapper(plan: SparkPlan) extends LeafExecNode
  • case class SimpleCostEvaluator(forceOptimizeSkewedJoin: Boolean) extends CostEvaluator
  • case class ShowCurrentNamespaceCommand() extends LeafRunnableCommand
  • case class WriterBucketSpec(
  • case class EnsureRequirements(
  • case class HashedRelationBroadcastMode(

SparkQA avatar Sep 29 '21 17:09 SparkQA

Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49293/

SparkQA avatar Nov 02 '21 00:11 SparkQA

Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49293/

SparkQA avatar Nov 02 '21 01:11 SparkQA

Test build #144822 has finished for PR 32987 at commit f4ed2be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ExpressionStats(expr: Expression)(
  • case class RecurseChildren(

SparkQA avatar Nov 02 '21 04:11 SparkQA