count distinct on NaN produces incorrect results
Describe the bug
In https://github.com/apache/datafusion-comet/pull/1209 we now fall back to columnar shuffle in some cases where native shuffle is not supported, rather than falling back to Spark. This caused failures in some Spark SQL tests, such as the following test, which now produces incorrect results. This could potentially be a bug in columnar shuffle that we have not seen before.
This is from SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate.
Steps to reproduce
No response
Expected behavior
No response
Additional context
No response
I can reproduce the issue in main, so I am confused about how this is currently passing when we run the Spark SQL tests. Any ideas @kazuyukitanimura or @parthchandra?
Here is my repro:
test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
val df = Seq(
("mithunr", Float.NaN),
("mithunr", nan1),
("mithunr", nan2),
("abellina", 1.0f),
("abellina", 2.0f)).toDF("uid", "score")
df.write.mode(SaveMode.Overwrite).parquet("test.parquet")
}
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "auto") {
spark.read.parquet("test.parquet").createOrReplaceTempView("view")
val df =
spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc")
checkSparkAnswer /*AndOperator*/ (df)
}
}
Produces:
== Results ==
!== Correct Answer - 2 == == Spark Answer - 2 ==
struct<uid:string,count(DISTINCT score):bigint> struct<uid:string,count(DISTINCT score):bigint>
[abellina,2] [abellina,2]
![mithunr,1] [mithunr,3]
The issue is presumably related to normalizing NaN and to differences between Rust and the JVM when comparing NaN to other NaN values.
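For context, here is a small JVM-only snippet (independent of Spark and Comet) illustrating why these values behave differently depending on how they are compared; the normalizenanandzero step in the plan is what collapses all NaN bit patterns into a single value before the distinct count:

val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)

// IEEE 754 equality: NaN is never equal to NaN, even for the same bit pattern.
println(Float.NaN == nan1)                                   // false
// Raw bits are preserved, so the three NaNs really are distinct bit patterns.
println(java.lang.Float.floatToRawIntBits(nan1).toHexString) // 7f800001
// floatToIntBits canonicalizes every NaN to the same bit pattern.
println(java.lang.Float.floatToIntBits(nan1).toHexString)    // 7fc00000
// Float.compare treats any two NaNs as equal, unlike `==`.
println(java.lang.Float.compare(Float.NaN, nan2))            // 0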
Hmm, could this be an aggregation bug?
Here is a better repro:
test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "auto") {
Seq(
("mithunr", Float.NaN),
("mithunr", nan1),
("mithunr", nan2),
("abellina", 1.0f),
("abellina", 2.0f)).toDF("uid", "score").createOrReplaceTempView("view")
val df =
spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc")
checkSparkAnswer /*AndOperator*/ (df)
}
}
The test passes for shuffle mode auto or native and fails for jvm.
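For reference, a small (untested) variation of the repro above loops over the three shuffle modes so each mode can be checked in one run; it assumes the same test harness (withSQLConf, checkSparkAnswer) and that the "view" temp view is already registered as in the repro:

// Sketch: run the same query under each shuffle mode to compare behavior.
Seq("auto", "native", "jvm").foreach { mode =>
  withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> mode) {
    val df =
      spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc")
    checkSparkAnswer(df)
  }
}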
In auto mode the query passes because we fall back to Spark for most of the plan:
AdaptiveSparkPlan isFinalPlan=false
+- CometSort [uid#7, count(DISTINCT score)#12L], [uid#7 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(uid#7 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=528]
      +- HashAggregate(keys=[uid#7], functions=[count(distinct score#56)], output=[uid#7, count(DISTINCT score)#12L])
         +- Exchange hashpartitioning(uid#7, 10), ENSURE_REQUIREMENTS, [plan_id=523]
            +- HashAggregate(keys=[uid#7], functions=[partial_count(distinct score#56)], output=[uid#7, count#59L])
               +- HashAggregate(keys=[uid#7, score#56], functions=[], output=[uid#7, score#56])
                  +- Exchange hashpartitioning(uid#7, score#56, 10), ENSURE_REQUIREMENTS, [plan_id=519]
                     +- HashAggregate(keys=[uid#7, knownfloatingpointnormalized(normalizenanandzero(score#8)) AS score#56], functions=[], output=[uid#7, score#56])
                        +- LocalTableScan [uid#7, score#8]
Query plan in jvm mode:
CometSort [uid#7, count(DISTINCT score)#12L], [uid#7 ASC NULLS FIRST]
+- CometColumnarExchange rangepartitioning(uid#7 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=190]
   +- !CometHashAggregate [uid#7, count#35L], Final, [uid#7], [count(distinct score#32)]
      +- CometColumnarExchange hashpartitioning(uid#7, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=188]
         +- !CometHashAggregate [uid#7, score#32], Partial, [uid#7], [partial_count(distinct score#32)]
            +- CometHashAggregate [uid#7, score#32], [uid#7, score#32]
               +- CometColumnarExchange hashpartitioning(uid#7, score#32, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=163]
                  +- HashAggregate(keys=[uid#7, knownfloatingpointnormalized(normalizenanandzero(score#8)) AS score#32], functions=[], output=[uid#7, score#32])
                     +- LocalTableScan [uid#7, score#8]
Perhaps we can fall back to Spark for count distinct on floating-point columns unless some config is set. We can explain the NaN issue in the compatibility guide.
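As a rough illustration (not Comet's actual fallback logic), a check along these lines could detect a DISTINCT count over a floating-point column in a Spark plan; the decision to fall back, and any config gating it, would be separate and is only assumed here:

import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.types.{DoubleType, FloatType}

// Hypothetical guard: true if the aggregate contains count(distinct ...) over a
// float/double column, in which case we could fall back to Spark unless an
// (assumed) opt-in config allows the columnar/native path.
def hasDistinctCountOnFloatingPoint(agg: HashAggregateExec): Boolean =
  agg.aggregateExpressions.exists { aggExpr =>
    aggExpr.isDistinct && (aggExpr.aggregateFunction match {
      case c: Count =>
        c.children.exists(e => e.dataType == FloatType || e.dataType == DoubleType)
      case _ => false
    })
  }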
I think that we will have to explicitly check whether both sides of a floating-point comparison are NaN to match Spark's behavior. By definition, NaN is not equal to NaN, so technically Comet is correct. Falling back to Spark for floating-point columns could be forced by a config value, but I don't think we should do that by default.
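To illustrate the semantics involved (a minimal sketch, not Comet or DataFusion code): Spark's grouping/distinct semantics treat all NaNs as equal to each other, whereas plain IEEE 754 equality does not, so an explicit NaN check is needed to match:

// Equality Spark expects for grouping/distinct on floats: all NaNs compare equal.
def sparkGroupingEquals(a: Float, b: Float): Boolean =
  (a.isNaN && b.isNaN) || a == b

val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
println(nan1 == Float.NaN)                    // false under IEEE 754 `==`
println(sparkGroupingEquals(nan1, Float.NaN)) // true, matching Spark's distinct semantics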
Since filing this issue, we discovered that Comet did not really support distinct aggregates (https://github.com/apache/datafusion-comet/issues/1260), so that may have been a contributing factor in this test failing.
This was fixed by upgrading to DataFusion 48, which fixed a bug with counting distinct NaNs in a grouped aggregate query.