datafusion-comet icon indicating copy to clipboard operation
datafusion-comet copied to clipboard

Improve performance of TPC-DS q72

Open andygrove opened this issue 1 year ago • 4 comments

What is the problem the feature request solves?

I ran our benchmark derived from TPC-DS @ sf=100 locally and saw that q72 shows the largest regression (measured in seconds rather than percentage) and was 754 seconds (12.5 minutes) slower with Comet enabled. Spark took 1.1 hours, and Comet took 1.3 hours.

This was based on a single run of all 99 queries in Spark and then again with Comet enabled.

Comet does not currently support the many sort-merge joins in the query, so Comet is only performing the initial file scans, filters, and exchanges (and sometimes sorts) before transitioning back to Spark for the joins.

This issue is for discussing possible solutions to avoid this regression.

Describe the potential solution

No response

Additional context

No response

andygrove avatar Jul 03 '24 00:07 andygrove

Here are some notes comparing the catalog_sales scan+filter+exchange+sort

Spark

  • Parquet scan = 1.4 s
  • C2R with 35k input batches + Filter (WholeStageCodegen) duration = 39.9 seconds
  • Exchange shuffle write time total = 10.1s
  • Sort (WholeStageCodegen) = 1.3 s, total time 52.1 minutes

Comet

  • Comet Parquet scan = 1.2 s
  • CometFilter 915 ms
  • CometExchange shuffle write time total = not specified, but there is a "shuffle read elapsed compute at native" time of 20.4 s
  • C2R with 17k input batches (WholeStageCodegen) duration 1 hour
  • CometSort 15.1 s

andygrove avatar Jul 03 '24 02:07 andygrove

Spark produces the worst possible query plan for q72 which amplifies the difference in performance. The C2R overhead for comet is amplified because the conversion happens on a dataset that is larger than the source data. To get a reasonable query plan for q72 we need to have spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled set to true. This also requires stats. Also perhaps we can try with a larger broadcast threshold (spark.sql.autoBroadcastJoinThreshold)? Irrespective of the plan though, given the same number of input rows are the Comet operators also slower than the corresponding Spark operators?

parthchandra avatar Jul 03 '24 16:07 parthchandra

Spark produces the worst possible query plan for q72

Yes, it does. I am comparing like-for-like plans between Spark and Comet without any join reordering enabled.

Irrespective of the plan though, given the same number of input rows are the Comet operators also slower than the corresponding Spark operators?

In both cases, Spark is executing the SortMergeJoin and the join takes longer when the inputs are from CometScan/CometFilter/CometExchange than if they are from the Spark equivalents (with same number of rows in both cases).

Things I have learned since filing this issue:

  • The time reported for the WholestageCodegen C2R is misleading. It is the duration of the operator, not the time spent in the operator. The reason for this taking so long is not necessarily the C2R conversion itself but the elapsed time when retrieving data from child operators (such as the AQEShuffleRead)
  • With Comet enabled, AQEShuffleRead is coalescing partitions down to a smaller number of partitions than Spark because Comet produces smaller partitions, thanks to columnar compression presumably

andygrove avatar Jul 03 '24 20:07 andygrove

More learnings:

  • As soon as we encounter one SortMergeJoin with a join condition we fall back to Spark, and all subsequent SortMergeJoins also fall back regardless of whether they have join conditions or not. This seems like something we could fix.
  • If I manually change the join order to be more optimal then we run many of these joins natively and get slightly better performance than Spark

andygrove avatar Jul 03 '24 21:07 andygrove