Comet shuffle read size is larger than Spark shuffle
Describe the bug
The attached test is taken from the Spark test WriteDistributionAndOrderingSuite, specifically ordered distribution and sort with same exprs: append.
It looks like the Comet shuffle read size is reported as much larger than the Spark shuffle size, which causes AQE to create more coalesced partitions.
Steps to reproduce
package org.apache.spark.sql
import java.sql.Date
import java.util.Collections
import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.LogicalExpressions.sort
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType}
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.comet.CometConf
class CSuite extends CometTestBase {
  import testImplicits._

  // Run every test in this suite with Comet shuffle enabled.
  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    super.test(testName, testTags: _*) {
      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
        testFun
      }
    }
  }

  test("a") {
    // Register and return the in-memory test catalog.
    def catalog: InMemoryCatalog = {
      spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
      val catalog = spark.sessionState.catalogManager.catalog("testcat")
      catalog.asTableCatalog.asInstanceOf[InMemoryCatalog]
    }

    val namespace = Array("ns1")
    val ident = Identifier.of(namespace, "test_table")
    val tableNameAsString = "testcat." + ident.toString
    val emptyProps = Collections.emptyMap[String, String]
    val schema = new StructType()
      .add("id", IntegerType)
      .add("data", StringType)
      .add("day", DateType)
    val tableOrdering = Array[SortOrder](
      sort(FieldReference("data"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
    val tableDistribution = Distributions.ordered(tableOrdering)
    val writeTransform: DataFrame => DataFrame = df => df

    // The table requests an ordered distribution and a 1000-byte advisory partition size.
    catalog.createTable(
      ident = ident,
      schema = schema,
      partitions = Array.empty,
      properties = emptyProps,
      distribution = tableDistribution,
      ordering = tableOrdering,
      requiredNumPartitions = None,
      advisoryPartitionSize = Some(1000),
      distributionStrictlyRequired = true)

    val df =
      spark.sparkContext
        .parallelize(
          (1 to 10).map { i =>
            (if (i > 4) 5 else i, i.toString, Date.valueOf(s"${2020 + i}-$i-$i"))
          },
          3)
        .toDF("id", "data", "day")
    val writer = writeTransform(df).writeTo(tableNameAsString)

    // Capture the executed plan of the write command via a QueryExecutionListener.
    def execute(writeFunc: => Unit): SparkPlan = {
      var executedPlan: SparkPlan = null
      val listener = new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
          executedPlan = qe.executedPlan
        }
        override def onFailure(
            funcName: String,
            qe: QueryExecution,
            exception: Exception): Unit = {}
      }
      spark.listenerManager.register(listener)

      writeFunc
      sparkContext.listenerBus.waitUntilEmpty()

      executedPlan match {
        case w: V2TableWriteExec =>
          stripAQEPlan(w.query)
        case _ =>
          fail("expected V2TableWriteExec")
      }
    }

    def executeCommand(): SparkPlan = execute(writer.append())

    // if the partition size is configured for the table, set the SQL conf to something small
    // so that the overriding behavior is tested
    val defaultAdvisoryPartitionSize = "15"

    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> defaultAdvisoryPartitionSize,
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
      val executedPlan = executeCommand()
      val read = collect(executedPlan) { case r: AQEShuffleReadExec =>
        r
      }
      assert(read.size == 1)
      println(read.head.partitionSpecs)
      // Passes with Spark shuffle (one coalesced partition); fails with Comet shuffle.
      assert(read.head.partitionSpecs.size == 1)
    }
  }
}
Expected behavior
Comet should report shuffle sizes close enough to Spark's that the AQE read coalesces to the same partition specs.

Spark shuffle partition specs:
ArrayBuffer(CoalescedPartitionSpec(0,5,Some(394)))

Comet shuffle partition specs:
ArrayBuffer(CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)), CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445)))
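For readers unfamiliar with the printed type, these are Spark's CoalescedPartitionSpec entries; the shape below is paraphrased from the Spark source only to make the output easier to read. Each spec covers a contiguous range of reduce partitions plus the data size AQE attributed to that group:

// Paraphrased from org.apache.spark.sql.execution.ShufflePartitionSpec in Spark.
case class CoalescedPartitionSpec(
    startReducerIndex: Int,        // first reduce partition in the group (inclusive)
    endReducerIndex: Int,          // end of the group (exclusive)
    dataSize: Option[Long] = None) // shuffle bytes AQE attributed to the group

So Spark reads all five reduce partitions (0 until 5) as one coalesced partition, while Comet splits the same range into four reads.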
Additional context
The above test may require Spark 3.5+, or a backport of https://issues.apache.org/jira/browse/SPARK-42779
Currently, WriteDistributionAndOrderingSuite is disabled for Spark 3.5+ by https://github.com/apache/datafusion-comet/pull/834/
I am investigating this.
Here is some debug output from ordered distribution and sort with same exprs: append when running with Comet enabled:
The Comet columnar shuffle wrote the following shuffle bytes (using the default lz4 compression):
wrote 434 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 447 shuffle bytes
wrote 443 shuffle bytes
wrote 435 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 434 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 435 shuffle bytes
wrote 446 shuffle bytes
Comet reports these shuffle stats:
mapOutputStatistics: 890,445,490,890,490
When running with Spark's native shuffle, the stats are:
mapOutputStatistics: 152,106,106,160,97
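This size difference is what drives the extra partition specs. AQE packs contiguous reduce partitions together until the next one would push the group past the advisory size, and in this test the table-level advisoryPartitionSize of 1000 bytes overrides the SQL conf value of 15. Below is a minimal sketch of that greedy rule, only an approximation of Spark's ShufflePartitionsUtil (which also handles skew and minimum partition counts), applied to the two size arrays above:

// Simplified greedy coalescing: pack contiguous reduce partitions until adding
// the next one would exceed the target size. Approximation only; not Spark's
// actual ShufflePartitionsUtil.coalescePartitions.
def coalesce(sizes: Seq[Long], target: Long): Seq[Seq[Long]] =
  sizes.foldLeft(Vector.empty[Vector[Long]]) { (groups, size) =>
    groups.lastOption match {
      case Some(last) if last.sum + size <= target => groups.init :+ (last :+ size)
      case _ => groups :+ Vector(size)
    }
  }

val target = 1000L // the table-level advisoryPartitionSize from the repro

// Spark's reported sizes sum to 621 bytes, so everything fits in one group:
println(coalesce(Seq(152L, 106L, 106L, 160L, 97L), target))
// Vector(Vector(152, 106, 106, 160, 97))

// Comet's inflated sizes leave almost no room to pack partitions together,
// producing four groups, matching the four CoalescedPartitionSpec entries above:
println(coalesce(Seq(890L, 445L, 490L, 890L, 490L), target))
// Vector(Vector(890), Vector(445, 490), Vector(890), Vector(490))

Because each Comet-reported partition is already close to the 1000-byte target, almost nothing can be coalesced, which matches the four specs shown in the expected-behavior section.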
In Comet, we write the schema with each record batch. That likely explains the difference, especially with these tests which are writing 10 rows.
@kazuyukitanimura @comphead fyi
Slight correction. Comet writes the schema once per shuffle block, not once per batch.
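To get a feel for how large that fixed per-block overhead is for tiny inputs, here is a rough sketch that serializes a two-row batch as a plain Arrow IPC stream. This assumes a Comet shuffle block is approximately an Arrow stream (schema message plus record batches); it is not Comet's actual writer and it ignores the lz4 compression step:

import java.io.ByteArrayOutputStream
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VarCharVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowStreamWriter

object IpcOverheadSketch extends App {
  val allocator = new RootAllocator()

  // Two columns, two rows: the row data itself is only a handful of bytes.
  val id = new IntVector("id", allocator)
  val data = new VarCharVector("data", allocator)
  id.allocateNew(2)
  data.allocateNew()
  id.setSafe(0, 1)
  id.setSafe(1, 2)
  data.setSafe(0, "1".getBytes(StandardCharsets.UTF_8))
  data.setSafe(1, "2".getBytes(StandardCharsets.UTF_8))
  id.setValueCount(2)
  data.setValueCount(2)

  val root = VectorSchemaRoot.of(id, data)
  root.setRowCount(2)

  val out = new ByteArrayOutputStream()
  val writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))
  writer.start()      // writes the schema message once per stream
  writer.writeBatch() // writes the record batch (metadata + padded buffers)
  writer.end()
  writer.close()

  // Even a few bytes of real data come out as a few hundred bytes of stream,
  // dominated by schema and batch metadata.
  println(s"serialized ${out.size()} bytes for ${root.getRowCount} rows")

  root.close()
  allocator.close()
}

That puts a single tiny block roughly in the same few-hundred-byte range as the 434-447 byte blocks in the debug output above, which is why a 10-row dataset already blows past the 1000-byte advisory target.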