
[SPARK-48012][SQL] SPJ: Support Transform Expressions for One Side Shuffle

Open szehon-ho opened this issue 9 months ago • 2 comments

Why are the changes needed?

Support SPJ one-side shuffle when the other side's partitioning uses transform expressions

How was this patch tested?

New unit test in KeyGroupedPartitioningSuite

Was this patch authored or co-authored using generative AI tooling?

No.

szehon-ho avatar Apr 27 '24 06:04 szehon-ho

Some implementation notes. SPARK-41471 works by giving the ShuffleExchangeExec side of the join a KeyGroupedPartitioning, created from the other side's KeyGroupedShuffleSpec as a clone of it (with that side's partition expressions and values). That way both sides of the join have KeyGroupedPartitioning and SPJ can work. But previously only AttributeReferences were supported for the other side's partition expressions.
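The cloning idea can be sketched as follows. This is a Python sketch of the concept, not Spark's Scala API; the class and function names here are illustrative only.

```python
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class KeyGroupedPartitioning:
    expressions: List[str]       # partition expressions for this side
    partition_values: List[Tuple]  # one entry per grouped partition

def clone_for_shuffle_side(scan_side: KeyGroupedPartitioning,
                           shuffle_exprs: List[str]) -> KeyGroupedPartitioning:
    # Reuse the scan side's partition values so the two sides line up
    # partition-for-partition; only the expressions are swapped for the
    # shuffle side's own columns.
    return KeyGroupedPartitioning(shuffle_exprs, scan_side.partition_values)

scan = KeyGroupedPartitioning(["years(ts)"], [(2020,), (2021,)])
shuffle = clone_for_shuffle_side(scan, ["years(order_ts)"])
```

Because both partitionings share identical partition values, the planner can match them partition-for-partition and skip the shuffle on the scan side.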

Code changes:

  • Remove the check in KeyGroupedShuffleSpec::canCreatePartitioning that allows only AttributeReference, and add support for TransformExpression
  • Implement TransformExpression.eval() by reusing the code from V2ExpressionUtils. This allows the ShuffleExchangeExec to evaluate the partition key with transform expressions for each row.
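Conceptually, per-row evaluation of a transform on the shuffle side looks like the following. This is a hedged Python sketch of the idea, not Spark's implementation; `bucket`, `eval_partition_key`, and the lambda wiring are all illustrative stand-ins.

```python
def bucket(num_buckets: int, value) -> int:
    # Simplified stand-in for a bucket partition transform:
    # hash the column value into one of num_buckets buckets.
    return hash(value) % num_buckets

def eval_partition_key(row: dict, transforms) -> tuple:
    # Apply each transform to its input column, analogous to what
    # TransformExpression.eval enables ShuffleExchangeExec to do per row.
    return tuple(fn(row[col]) for col, fn in transforms)

row = {"id": 42, "name": "a"}
key = eval_partition_key(row, [("id", lambda v: bucket(8, v))])
```

The resulting key is then looked up against the scan side's partition values to pick the target partition.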

Some fixes:

  • Normalize the valueMap key type in KeyGroupedPartitioner to a specific Seq implementation class. Previously the partitioner's map was initialized with keys as Vector but then probed with keys as ArraySeq; these seem to have different hashcodes, so lookups always missed and created new entries with new partition ids.
  • Change the test YearsTransform to have the same logic as InMemoryBaseTable. This was pointed out in the SPARK-41471 PR.
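The valueMap fix boils down to canonicalizing key types before map lookups. Below is a Python analogy (the Scala issue involves Vector vs. ArraySeq hashcodes; here the mismatch is modeled with list vs. tuple keys, and the class is an illustrative sketch, not Spark's KeyGroupedPartitioner):

```python
class KeyGroupedPartitionerSketch:
    def __init__(self, partition_values):
        # Canonicalize keys once, at construction (here: to tuple).
        self.value_map = {tuple(v): i for i, v in enumerate(partition_values)}
        self.next_id = len(self.value_map)

    def partition_id(self, key):
        # Canonicalize again on lookup so key types always match;
        # without this, every lookup misses and mints a fresh id.
        key = tuple(key)
        if key not in self.value_map:
            self.value_map[key] = self.next_id
            self.next_id += 1
        return self.value_map[key]

p = KeyGroupedPartitionerSketch([[2020], [2021]])
```

With normalization, a key supplied as a list and the same key supplied as a tuple resolve to the same partition id instead of allocating new ones.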

Limitations:

  • This feature is disabled if partiallyClustered is enabled. Partially clustered mode implies the partitioned side of the join has multiple partitions with the same value and does not group them; it is not yet clear how the KeyGroupedPartitioner on the shuffle side could handle that.
  • This feature is disabled if allowJoinKeysLessThanPartitionKeys is enabled and the partitions are transform expressions. The allowJoinKeysLessThanPartitionKeys feature works by 'grouping' the BatchScanExec's partitions again by join keys. If it is enabled along with this feature, a failure occurs when checking that both sides of the join (the ShuffleExchangeExec and the partitioned BatchScanExec side) have outputPartitioning with the same numPartitions. This actually works in the first optimizer pass, as the ShuffleExchangeExec's KeyGroupedPartitioning is created as a clone of the other side (including partition values). But after that there is a 'grouping' phase triggered here:
        // Now we need to push-down the common partition information to the scan in each child
        newLeft = populateCommonPartitionInfo(left, mergedPartValues, leftSpec.joinKeyPositions,
          leftReducers, applyPartialClustering, replicateLeftSide)
        newRight = populateCommonPartitionInfo(right, mergedPartValues, rightSpec.joinKeyPositions,
          rightReducers, applyPartialClustering, replicateRightSide)

This updates the BatchScanExec's outputPartitioning with a new numPartitions after the re-grouping by join key, but it does not update the ShuffleExchangeExec's outputPartitioning's numPartitions. Hence the error in a subsequent optimizer pass:

requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:550)
	at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
	at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:39)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:66)
	at scala.collection.immutable.Vector1.map(Vector.scala:2140)
	at scala.collection.immutable.Vector1.map(Vector.scala:385)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:65)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:657)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:632)

This can be reproduced by removing this check and running the relevant unit test added in this PR. It needs more investigation to be enabled in a follow-up PR.
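The invariant that fails can be illustrated with a small Python sketch. All names here are illustrative, not Spark's; the point is only that re-grouping the scan side by join keys shrinks its partition count while the shuffle side's cloned count stays fixed.

```python
def group_by_join_keys(partition_values, join_key_positions):
    # Collapse partitions that share the same values at the join-key
    # positions, as the allowJoinKeysLessThanPartitionKeys re-grouping does.
    seen = []
    for v in partition_values:
        k = tuple(v[i] for i in join_key_positions)
        if k not in seen:
            seen.append(k)
    return seen

# Scan side partitioned on (year, bucket); the join only uses year.
scan_partitions = [(2020, 1), (2020, 2), (2021, 1)]
grouped = group_by_join_keys(scan_partitions, [0])

# The shuffle side cloned the original partition values earlier,
# so it still reports the pre-grouping count.
shuffle_num_partitions = len(scan_partitions)
```

After grouping, the scan side has fewer partitions than the shuffle side reports, which trips the "all partitionings have the same numPartitions" requirement in PartitioningCollection.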

szehon-ho avatar Apr 27 '24 06:04 szehon-ho

@sunchao , @Hisoka-X can you guys take a look?

szehon-ho avatar Apr 29 '24 21:04 szehon-ho

Thanks! Merged to master/4.0

sunchao avatar Jun 09 '24 14:06 sunchao