
Comet shuffle read size is larger than Spark shuffle

Open • kazuyukitanimura opened this issue 11 months ago • 5 comments

Describe the bug

The attached test is taken from the Spark test "ordered distribution and sort with same exprs: append" in WriteDistributionAndOrderingSuite.

It looks like Comet's shuffle read size is reported as much larger than Spark's, which causes AQE to produce more coalesced partitions than expected.

Steps to reproduce

package org.apache.spark.sql

import java.sql.Date
import java.util.Collections

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.LogicalExpressions.sort
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType}
import org.apache.spark.sql.util.QueryExecutionListener

import org.apache.comet.CometConf

class CSuite extends CometTestBase {
  import testImplicits._

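  // Run every test in this suite with Comet shuffle enabled.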
  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    super.test(testName, testTags: _*) {
      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
        testFun
      }
    }
  }

  test("a") {

    def catalog: InMemoryCatalog = {
      spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
      val catalog = spark.sessionState.catalogManager.catalog("testcat")
      catalog.asTableCatalog.asInstanceOf[InMemoryCatalog]
    }
    val namespace = Array("ns1")
    val ident = Identifier.of(namespace, "test_table")
    val tableNameAsString = "testcat." + ident.toString
    val emptyProps = Collections.emptyMap[String, String]
    val schema = new StructType()
      .add("id", IntegerType)
      .add("data", StringType)
      .add("day", DateType)
    val tableOrdering = Array[SortOrder](
      sort(FieldReference("data"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
    val tableDistribution = Distributions.ordered(tableOrdering)
    val writeTransform: DataFrame => DataFrame = df => df

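    // Create a table that strictly requires an ordered distribution on `data` and sets a
    // table-level advisory partition size (1000 bytes), which overrides the SQL conf below.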
    catalog.createTable(
      ident = ident,
      schema = schema,
      partitions = Array.empty,
      properties = emptyProps,
      distribution = tableDistribution,
      ordering = tableOrdering,
      requiredNumPartitions = None,
      advisoryPartitionSize = Some(1000),
      distributionStrictlyRequired = true)

    val df =
      spark.sparkContext
        .parallelize(
          (1 to 10).map { i =>
            (if (i > 4) 5 else i, i.toString, Date.valueOf(s"${2020 + i}-$i-$i"))
          },
          3)
        .toDF("id", "data", "day")
    val writer = writeTransform(df).writeTo(tableNameAsString)

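    // Capture the executed plan of the v2 write via a QueryExecutionListener and
    // return its query sub-plan with any AQE wrapper stripped.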
    def execute(writeFunc: => Unit): SparkPlan = {
      var executedPlan: SparkPlan = null

      val listener = new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
          executedPlan = qe.executedPlan
        }
        override def onFailure(
            funcName: String,
            qe: QueryExecution,
            exception: Exception): Unit = {}
      }
      spark.listenerManager.register(listener)

      writeFunc

      sparkContext.listenerBus.waitUntilEmpty()

      executedPlan match {
        case w: V2TableWriteExec =>
          stripAQEPlan(w.query)
        case _ =>
          fail("expected V2TableWriteExec")
      }
    }

    def executeCommand(): SparkPlan = execute(writer.append())

    // if the partition size is configured for the table, set the SQL conf to something small
    // so that the overriding behavior is tested
    val defaultAdvisoryPartitionSize = "15"
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> defaultAdvisoryPartitionSize,
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {

      val executedPlan = executeCommand()
      val read = collect(executedPlan) { case r: AQEShuffleReadExec =>
        r
      }
      assert(read.size == 1)
      println(read.head.partitionSpecs)
      assert(read.head.partitionSpecs.size == 1)
    }
  }
}

Expected behavior

Spark shuffle partition specs: ArrayBuffer(CoalescedPartitionSpec(0,5,Some(394)))

Comet shuffle partition specs: ArrayBuffer(CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)), CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445)))
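
To illustrate why larger reported sizes translate into more partition specs, here is a minimal sketch of AQE-style coalescing. It is not Spark's actual implementation; the 1000-byte target mirrors the table's advisoryPartitionSize in the repro, and the per-partition sizes are the mapOutputStatistics reported further down in this thread, so the printed specs only approximate the ones above.

object CoalesceSketch {
  // Returns (startReducerIndex, endReducerIndex, dataSize), analogous to CoalescedPartitionSpec.
  def coalesce(sizes: Seq[Long], target: Long): Seq[(Int, Int, Long)] = {
    val specs = scala.collection.mutable.ArrayBuffer.empty[(Int, Int, Long)]
    var start = 0
    var acc = 0L
    sizes.zipWithIndex.foreach { case (size, i) =>
      // Close the current group once adding the next partition would exceed the target.
      if (i > start && acc + size > target) {
        specs += ((start, i, acc))
        start = i
        acc = 0L
      }
      acc += size
    }
    specs += ((start, sizes.length, acc))
    specs.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(coalesce(Seq(152L, 106L, 106L, 160L, 97L), target = 1000L)) // Spark-reported sizes: one spec
    println(coalesce(Seq(890L, 445L, 490L, 890L, 490L), target = 1000L)) // Comet-reported sizes: four specs
  }
}

With the sizes Comet reports, only one or two partitions fit under the same target, so the read is split into several specs instead of a single coalesced partition.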

Additional context

The above test may require Spark 3.5+, or a backport of https://issues.apache.org/jira/browse/SPARK-42779.

Currently WriteDistributionAndOrderingSuite is disabled in Spark 3.5+ by https://github.com/apache/datafusion-comet/pull/834/

kazuyukitanimura, Jan 11 '25

I am investigating this

andygrove, Jul 09 '25

Here is some debug output from "ordered distribution and sort with same exprs: append" when running with Comet enabled:

Columnar shuffle wrote the following shuffle bytes (using the default lz4 compression).

wrote 434 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 447 shuffle bytes
wrote 443 shuffle bytes
wrote 435 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 434 shuffle bytes
wrote 0 shuffle bytes
wrote 0 shuffle bytes
wrote 435 shuffle bytes
wrote 446 shuffle bytes

Comet output these shuffle stats:

mapOutputStatistics: 890,445,490,890,490

When running in Spark, the stats are:

mapOutputStatistics: 152,106,106,160,97
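
For reference, those per-partition sizes total 890 + 445 + 490 + 890 + 490 = 3205 bytes with Comet versus 152 + 106 + 106 + 160 + 97 = 621 bytes with Spark, roughly a 5x difference for the same 10 rows.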

andygrove, Nov 19 '25

In Comet, we write the schema with each record batch. That likely explains the difference, especially with these tests which are writing 10 rows.

andygrove, Nov 19 '25

> In Comet, we write the schema with each record batch. That likely explains the difference, especially with these tests which are writing 10 rows.

@kazuyukitanimura @comphead fyi

andygrove, Nov 19 '25

> In Comet, we write the schema with each record batch. That likely explains the difference, especially with these tests which are writing 10 rows.
>
> @kazuyukitanimura @comphead fyi

Slight correction. Comet writes the schema once per shuffle block, not once per batch.

andygrove, Nov 19 '25
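
For a rough sense of how much that fixed per-block overhead matters at this scale, here is a minimal, self-contained sketch (not Comet's actual shuffle write path) using the Arrow Java IPC APIs, assuming arrow-vector plus a memory implementation such as arrow-memory-netty are on the classpath. It writes a single small batch as its own stream, the way each tiny shuffle block would carry its own schema, and compares the stream size with the raw value bytes; the result is roughly the scale of the 434-447 bytes per non-empty write in the debug output above (those are also lz4-compressed, so not directly comparable).

import java.io.ByteArrayOutputStream

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowStreamWriter

object SchemaOverheadSketch {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator()
    val vector = new IntVector("id", allocator)
    vector.allocateNew(10)
    (0 until 10).foreach(i => vector.set(i, i))
    vector.setValueCount(10)
    val root = VectorSchemaRoot.of(vector)

    // Every new IPC stream starts with a schema message before any record batches,
    // so each small block pays the schema cost again.
    val out = new ByteArrayOutputStream()
    val writer = new ArrowStreamWriter(root, null, out)
    writer.start()
    writer.writeBatch()
    writer.close() // also writes the end-of-stream marker

    // 10 ints are only 40 bytes of values, but the stream is typically a few hundred
    // bytes, so for tiny partitions the schema/metadata dominates the reported size.
    println(s"stream bytes = ${out.size()}, raw value bytes = ${10 * 4}")

    root.close()
    allocator.close()
  }
}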