
count distinct on NaN produces incorrect results

Open andygrove opened this issue 11 months ago • 7 comments

Describe the bug

In https://github.com/apache/datafusion-comet/pull/1209 we now fall back to columnar shuffle in some cases where native shuffle is not supported, rather than falling back to Spark entirely. This caused failures in some Spark SQL tests, such as the following test, which now produces incorrect results. This could potentially be a bug in columnar shuffle that we have not seen before.

This is from SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate.

[Screenshot: failing test output, 2025-01-08]

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

andygrove avatar Jan 08 '25 22:01 andygrove

I can reproduce the issue in main, so I am confused about how this test is currently passing when we run the Spark SQL tests. Any idea @kazuyukitanimura or @parthchandra?

Here is my repro:

  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
    withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
      val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
      val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)

      val df = Seq(
        ("mithunr", Float.NaN),
        ("mithunr", nan1),
        ("mithunr", nan2),
        ("abellina", 1.0f),
        ("abellina", 2.0f)).toDF("uid", "score")

      df.write.mode(SaveMode.Overwrite).parquet("test.parquet")
    }

    withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "auto") {
      spark.read.parquet("test.parquet").createOrReplaceTempView("view")
      val df =
        spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc")
      checkSparkAnswer /*AndOperator*/ (df)
    }
  }

Produces:

== Results ==
!== Correct Answer - 2 ==                          == Spark Answer - 2 ==
 struct<uid:string,count(DISTINCT score):bigint>   struct<uid:string,count(DISTINCT score):bigint>
 [abellina,2]                                      [abellina,2]
![mithunr,1]                                       [mithunr,3]

The issue is presumably related to NaN normalization and to differences between Rust and the JVM when comparing NaN with other NaN values.
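
For reference, here is a minimal JVM-only sketch (independent of Comet and Spark; the object name is mine) showing the two comparison semantics at play: under IEEE 754 equality every NaN is unequal to every other NaN, while the JVM's total ordering and bit canonicalization collapse all NaN bit patterns into a single value.

// Standalone sketch (not Comet code): how the repro's NaN values compare
// under IEEE 754 equality vs the JVM's total ordering / canonicalization.
object NaNComparisonDemo {
  def main(args: Array[String]): Unit = {
    val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
    val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)

    // IEEE 754 equality: NaN is never equal to NaN, so a grouping based on
    // this comparison sees Float.NaN, nan1 and nan2 as three distinct keys.
    println(nan1 == nan2)                               // false
    println(Float.NaN == Float.NaN)                     // false

    // JVM total ordering: all NaN bit patterns compare as equal.
    println(java.lang.Float.compare(nan1, nan2) == 0)   // true

    // floatToIntBits canonicalizes every NaN to 0x7fc00000, which is why
    // normalizing before hashing/grouping collapses them into one key.
    println(java.lang.Float.floatToIntBits(nan1) ==
      java.lang.Float.floatToIntBits(nan2))             // true
    println(java.lang.Float.floatToRawIntBits(nan1) ==
      java.lang.Float.floatToRawIntBits(nan2))          // false
  }
}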

andygrove avatar Jan 08 '25 23:01 andygrove

Hmm, could this be an aggregation bug?

kazuyukitanimura avatar Jan 09 '25 01:01 kazuyukitanimura

Here is a better repro:

  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
    val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
    val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)

    withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "auto") {
      Seq(
        ("mithunr", Float.NaN),
        ("mithunr", nan1),
        ("mithunr", nan2),
        ("abellina", 1.0f),
        ("abellina", 2.0f)).toDF("uid", "score").createOrReplaceTempView("view")
      val df =
        spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc")
      checkSparkAnswer /*AndOperator*/ (df)
    }
  }

The test passes for shuffle mode auto or native and fails for jvm.

andygrove avatar Jan 09 '25 12:01 andygrove

The test passes in auto mode because we fall back to Spark for most of the query. Query plan in auto mode:

AdaptiveSparkPlan isFinalPlan=false
+- CometSort [uid#7, count(DISTINCT score)#12L], [uid#7 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(uid#7 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=528]
      +- HashAggregate(keys=[uid#7], functions=[count(distinct score#56)], output=[uid#7, count(DISTINCT score)#12L])
         +- Exchange hashpartitioning(uid#7, 10), ENSURE_REQUIREMENTS, [plan_id=523]
            +- HashAggregate(keys=[uid#7], functions=[partial_count(distinct score#56)], output=[uid#7, count#59L])
               +- HashAggregate(keys=[uid#7, score#56], functions=[], output=[uid#7, score#56])
                  +- Exchange hashpartitioning(uid#7, score#56, 10), ENSURE_REQUIREMENTS, [plan_id=519]
                     +- HashAggregate(keys=[uid#7, knownfloatingpointnormalized(normalizenanandzero(score#8)) AS score#56], functions=[], output=[uid#7, score#56])
                        +- LocalTableScan [uid#7, score#8]

Query plan in jvm mode:

   CometSort [uid#7, count(DISTINCT score)#12L], [uid#7 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(uid#7 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=190]
      +- !CometHashAggregate [uid#7, count#35L], Final, [uid#7], [count(distinct score#32)]
         +- CometColumnarExchange hashpartitioning(uid#7, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=188]
            +- !CometHashAggregate [uid#7, score#32], Partial, [uid#7], [partial_count(distinct score#32)]
               +- CometHashAggregate [uid#7, score#32], [uid#7, score#32]
                  +- CometColumnarExchange hashpartitioning(uid#7, score#32, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=163]
                     +- HashAggregate(keys=[uid#7, knownfloatingpointnormalized(normalizenanandzero(score#8)) AS score#32], functions=[], output=[uid#7, score#32])
                        +- LocalTableScan [uid#7, score#8]

andygrove avatar Jan 09 '25 12:01 andygrove

Perhaps we can fall back to Spark for count distinct on floating-point columns unless some config is set. We can explain the NaN issue in the compatibility guide.
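
As a rough illustration only (not actual Comet code; the object, method, and flag names below are hypothetical), such a gate could look something like this, refusing count(distinct) over float/double columns unless an opt-in flag is set:

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
import org.apache.spark.sql.types.{DoubleType, FloatType}

object DistinctCountFallback {
  // Hypothetical check: accept count(distinct) only when no child expression is
  // a floating-point column, unless the (hypothetical) opt-in flag is enabled.
  def isSupportedDistinctCount(
      agg: AggregateExpression,
      allowIncompatible: Boolean): Boolean = {
    agg.aggregateFunction match {
      case Count(children) if agg.isDistinct =>
        val hasFloatingPoint =
          children.exists(c => c.dataType == FloatType || c.dataType == DoubleType)
        !hasFloatingPoint || allowIncompatible
      case _ => true
    }
  }
}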

andygrove avatar Jan 09 '25 13:01 andygrove

I think we will have to explicitly check whether both sides of a floating-point comparison are NaN in order to match Spark behavior. By definition NaN is not equal to NaN, so technically Comet is correct. Falling back to Spark for floating point could be forced via a config value, but I don't think we should do that by default.
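
For reference, a sketch (not Comet code) of what Spark-compatible float equality looks like when any two NaNs are treated as equal, in contrast to raw IEEE 754 equality:

// Sketch only: equality for Float keys where any two NaNs are considered equal.
// Raw IEEE 754 `==` returns false for NaN == NaN, which is what makes the
// distinct count diverge. (-0.0f == 0.0f is already true under `==`, so only
// NaN needs special handling here.)
def sparkCompatibleFloatEquals(a: Float, b: Float): Boolean =
  (a.isNaN && b.isNaN) || a == b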

parthchandra avatar Jan 11 '25 05:01 parthchandra

Since filing this issue we discovered that Comet did not really support distinct aggregates (https://github.com/apache/datafusion-comet/issues/1260), so that may at least have been a contributing factor in this test failing.

andygrove avatar Jan 13 '25 15:01 andygrove

This was fixed by upgrading to DataFusion 48, which fixed a bug with counting distinct NaNs in a grouped aggregate query.

andygrove avatar Jun 10 '25 17:06 andygrove