[BUG] Casting FLOAT64 to DECIMAL(12,7) produces different rows from Apache Spark CPU
Update:
This bug was originally filed with the title:
[BUG] test_window_aggs_for_rows fails with rounding errors with DATAGEN_SEED=1698940723
It has since been established that the problem does not lie in window functions or aggregations. The problem is in casting float64 to decimal, which produces rounding errors.
Repro:
Seq(3527.61953125).toDF("d").repartition(1).selectExpr("CAST(d AS DECIMAL(12,7)) as_decimal").show
This produces different results on CPU and GPU. On CPU:
+------------+
| as_decimal|
+------------+
|3527.6195313|
+------------+
On GPU:
+------------+
| as_decimal|
+------------+
|3527.6195312|
+------------+
The old description continues below:
test_window_aggs_for_rows fails with DATAGEN_SEED=1698940723.
Repro:
SPARK_RAPIDS_TEST_DATAGEN_SEED=1698940723 ./run_pyspark_from_build.sh -k test_window_aggs_for_row\ and\ RepeatSeq\ and\ Integer\ and\ Decimal
[gw4] [ 96%] FAILED ../../src/main/python/window_function_test.py::test_window_aggs_for_rows[[('a', RepeatSeq(Long)), ('b', Integer), ('c', Decimal(8,3))]-1000][DATAGEN_SEED=1698940723, IGNORE_ORDER({'local': True})]
[gw4] [ 96%] FAILED ../../src/main/python/window_function_test.py::test_window_aggs_for_rows[[('a', RepeatSeq(Long)), ('b', Integer), ('c', Decimal(8,3))]-1g][DATAGEN_SEED=1698940723, IGNORE_ORDER({'local': True})]
[2023-11-02T17:07:56.063Z] Row(sum_c_asc=Decimal('-231179.895'), max_c_desc=Decimal('-48178.972'), min_c_asc=Decimal('-99999.999'), count_1=103, count_c=97, avg_c=Decimal('-9080.9564433'), rank_val=57, dense_rank_val=57, percent_rank_val=0.5490196078431373, row_num=57)
[2023-11-02T17:07:56.064Z] Row(sum_c_asc=Decimal('-227954.458'), max_c_desc=Decimal('89948.943'), min_c_asc=Decimal('-95651.987'), count_1=103, count_c=98, avg_c=Decimal('-598.2695306'), rank_val=82, dense_rank_val=82, percent_rank_val=0.7941176470588235, row_num=82)
[2023-11-02T17:07:56.064Z] Row(sum_c_asc=Decimal('-223873.440'), max_c_desc=Decimal('48971.237'), min_c_asc=Decimal('-86568.757'), count_1=102, count_c=94, avg_c=Decimal('9590.8501170'), rank_val=88, dense_rank_val=88, percent_rank_val=0.8613861386138614, row_num=88)
[2023-11-02T17:07:56.064Z] -Row(sum_c_asc=Decimal('-220416.559'), max_c_desc=Decimal('34383.780'), min_c_asc=Decimal('-67119.262'), count_1=102, count_c=96, avg_c=Decimal('3527.6195313'), rank_val=84, dense_rank_val=84, percent_rank_val=0.8217821782178217, row_num=84)
[2023-11-02T17:07:56.064Z] +Row(sum_c_asc=Decimal('-220416.559'), max_c_desc=Decimal('34383.780'), min_c_asc=Decimal('-67119.262'), count_1=102, count_c=96, avg_c=Decimal('3527.6195312'), rank_val=84, dense_rank_val=84, percent_rank_val=0.8217821782178217, row_num=84)
[2023-11-02T17:07:56.064Z] Row(sum_c_asc=Decimal('-220028.169'), max_c_desc=Decimal('2270.621'), min_c_asc=Decimal('-91351.118'), count_1=102, count_c=97, avg_c=Decimal('-4903.2057010'), rank_val=47, dense_rank_val=47, percent_rank_val=0.45544554455445546, row_num=47)
[2023-11-02T17:07:56.064Z] Row(sum_c_asc=Decimal('-219560.674'), max_c_desc=Decimal('-32132.633'), min_c_asc=Decimal('-95941.789'), count_1=102, count_c=95, avg_c=Decimal('-1184.0629789'), rank_val=4, dense_rank_val=4, percent_rank_val=0.0297029702970297, row_num=4)
[2023-11-02T17:07:56.064Z] Row(sum_c_asc=Decimal('-219399.848'), max_c_desc=Decimal('43804.921'), min_c_
Attached herewith is a zipped Parquet file with 102 rows in a single Decimal(8,3) column.
Taking the window functions out of the equation, one sees that running AVG() produces slightly different results on Apache Spark and the plugin:
// On Spark.
scala> spark.read.parquet("/tmp/decimals_avg.parquet").select( expr("avg(c)") ).show
+------------+
| avg(c)|
+------------+
|3527.6195313|
+------------+
// On the plugin:
+------------+
| avg(c)|
+------------+
|3527.6195312|
+------------+
The behaviour seems to be consistent on Spark ~3.4.2~ 3.4.1 and Spark 3.2.3. The plugin result is off by 0.0000001 (one unit in the last digit of the scale-7 result).
I have filed https://github.com/rapidsai/cudf/issues/14507 to track the CUDF side of this.
I was able to repro this on CUDF by writing the input as DECIMAL(12,7) to Parquet, and then running the MEAN aggregation on it. The bug I filed has the repro details.
A couple of other findings. I tried querying SUM, COUNT, AVG, etc. as follows:
select sum(c), count(c), sum(c)/count(c), avg(c), cast(avg(c) as DECIMAL(12,8)) , cast(sum(c)/count(c) as decimal(12,7)) from foobar
On CPU, those results tally up:
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|sum(c) |count(c)|(sum(c) / count(c)) |avg(c) |CAST(avg(c) AS DECIMAL(12,8))|CAST((sum(c) / count(c)) AS DECIMAL(12,7))|
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|338651.475|96 |3527.61953125000000000000000|3527.6195313|3527.61953130 |3527.6195313 |
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
Here's what one finds on GPU:
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|sum(c) |count(c)|(sum(c) / count(c)) |avg(c) |CAST(avg(c) AS DECIMAL(12,8))|CAST((sum(c) / count(c)) AS DECIMAL(12,7))|
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|338651.475|96 |3527.61953125000000000000000|3527.6195312|3527.61953120 |3527.6195313 |
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
There were some red herrings in investigating this bug.
First off, I have closed the CUDF bug (rapidsai/cudf#14507) I raised for this. CUDF is not at fault; it consistently truncates additional decimal digits.
It looked like this might have to do with GpuDecimalAverage and GpuDecimalDivide, but those operators are not involved. Consider this query:
SELECT avg(c) FROM foobar; -- c is a DECIMAL(8,3).
The execution plan indicates GpuDecimalAverage isn't invoked at all. The operation is done completely in integer/double, and the result is cast to DECIMAL(12,7):
== Optimized Logical Plan ==
Aggregate [cast((avg(UnscaledValue(c#9)) / 1000.0) as decimal(12,7)) AS avg(c)#888]
+- Relation [c#9] parquet
== Physical Plan ==
GpuColumnarToRow false
+- GpuHashAggregate(keys=[], functions=[avg(UnscaledValue(c#9), DoubleType)], output=[avg(c)#888])
+- GpuShuffleCoalesce 1073741824
+- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [plan_id=1819]
+- GpuHashAggregate(keys=[], functions=[partial_avg(UnscaledValue(c#9), DoubleType)], output=[sum#892, count#893L])
+- GpuFileGpuScan parquet [c#9] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/decimals_avg.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:decimal(8,3)>
The avg() is computed on the unscaled decimal representations, and the result is divided by 1000.0, then cast to DECIMAL(12,7).
The avg() and the divide produce the same results on CPU and GPU, so this comes down to a casting problem.
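To make that concrete, here is the rewritten computation replayed by hand with the values from the tables above (a plain Scala sketch, not plugin code; the variable names are mine):
val unscaledSum = 338651475L                      // sum(c) = 338651.475 at scale 3, unscaled
val rowCount    = 96L                             // count(c)
val avgUnscaled = unscaledSum.toDouble / rowCount // 3527619.53125, exact in a double
val rescaled    = avgUnscaled / 1000.0            // prints as 3527.61953125
// The final step is CAST(rescaled AS DECIMAL(12,7)); CPU and GPU agree up to this
// point and only diverge in that last double-to-decimal cast.
println((avgUnscaled, rescaled))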
Here's the simplest repro for the problem:
Seq(3527.61953125).toDF("d").repartition(1).selectExpr("d", "CAST(d AS DECIMAL(12,7)) as_decimal_12_7").show
On CPU:
+-------------+---------------+
| d|as_decimal_12_7|
+-------------+---------------+
|3527.61953125| 3527.6195313|
+-------------+---------------+
On GPU:
+-----------+---------------+
| d|as_decimal_12_7|
+-----------+---------------+
|3527.619531| 3527.6195312|
+-----------+---------------+
(Ignore how d is printed as 3527.619531 on GPU. I think that's just a string/display formatting issue. The correct/complete value is written if serialized to file.)
All mention of window functions, aggregation, GpuDecimalAverage and everything else is a distraction.
This is a performance optimization in Spark that is only supposed to happen when the value would not be impacted by potential floating point issues.
https://github.com/apache/spark/blob/9bb358b51e30b5041c0cd20e27cf995aca5ed4c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2110-L2142
So the rewrite only happens if the precision is less than 15. Fifteen decimal digits require just under 50 bits to store, and a double has 52 bits in the significand, so the result should be correct without any possibility of error.
https://en.wikipedia.org/wiki/Double-precision_floating-point_format
So if we are getting the wrong answer back, then the problem is somewhere in the computation that the average was replaced with.
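A quick, standalone sanity check of that bit-count argument (plain Scala, nothing Spark- or plugin-specific):
// 15 decimal digits need just under 50 bits, comfortably inside a double's
// 52 explicit significand bits, so integers of that size are exact.
println(math.log(1e15) / math.log(2))   // ~49.83
println((1e15 - 1).toLong)              // 999999999999999 survives the round trip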
I am remembering more now. Converting a double to a decimal value has problems, because Spark does it by going from a double to a string to a decimal. This is inherent in how Scala does it in its BigDecimal class, and it is even a bit of magic with an implicit method that just makes it happen behind the scenes. But going from a Double to a String, we cannot match what Java does. It is not standard, which is why we have spark.rapids.sql.castStringToFloat.enabled, which is off by default. Now I really want to understand what would happen if this optimization were disabled, and what the result really would be. Is Spark right and we are wrong, or is Spark wrong and we are right?
So Java does odd things, compared to the rest of the world, when converting floating point values to and from strings. It tries to compensate for the fact that some decimal values cannot be represented exactly as floating point values.
https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#toString-double-
https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#valueOf-java.lang.String-
They are self-consistent, but not standard. The number we are trying to convert is one of those that cannot be represented exactly as a double.
https://binaryconvert.com/result_double.html?decimal=051053050055046054049057053051049050053
So technically the Spark performance optimization is wrong in the general case, but I think the way Java/Scala convert doubles to Strings, and in turn to decimal values, "fixes" it.
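To see both conversion paths side by side for this exact value, here is a minimal, standalone illustration using plain JDK calls (not Spark or plugin code; the only assumption is that the CPU path effectively goes through Double.toString as described above):
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

val d = 3527.61953125

// Path 1: start from the exact binary value of the double.
val exact = new JBigDecimal(d)
println(exact.toPlainString)                          // 3527.6195312499999543...
println(exact.setScale(7, RoundingMode.HALF_UP))      // 3527.6195312

// Path 2: go through Double.toString first, which is effectively what the
// Scala BigDecimal conversion (and therefore the CPU cast) ends up doing.
println(java.lang.Double.toString(d))                 // "3527.61953125"
val viaString = new JBigDecimal(java.lang.Double.toString(d))
println(viaString.setScale(7, RoundingMode.HALF_UP))  // 3527.6195313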
So there are two options that we have to fix the problem ourselves. We either undo the optimization and just do the average on Decimal values, or we find a way to replicate what Java is doing. Neither of these is simple.
In the case of a window operation it is not that hard to undo the optimization, because it is self-contained in a single exec: we can do the pattern matching and see the UnscaledValue(e) being manipulated. But for a hash aggregation or a reduction it gets much harder, especially if the optimization later went through other transformations related to distinct etc., where it could get really hard to detect and undo. We might be able to just find the final divide by a constant followed by a cast to a Decimal and try to rewrite that part, since we get the rest of it right. That might be the simplest way to make this work.
Matching the Java code is really difficult because it is GPL-licensed, so we cannot copy it, or even read it and try to apply what it does.
I think if we can detect the case of `cast(Long / Double scalar that is 10^x as Decimal(SOMETHING, x + 4))` and replace it with `CAST(LONG as Decimal(19,2)) / DecimalScalar(10^x)`, and then have the divide output the desired precision and scale like we do for other decimal average conversions, then we are good.
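To make the detection idea a bit more concrete, here is a purely hypothetical, self-contained sketch; the case classes below are simplified stand-ins invented for illustration, not real Catalyst or plugin APIs:
sealed trait Expr
case class UnscaledAvg(column: String)                       extends Expr // avg(UnscaledValue(c))
case class DivideByDouble(child: Expr, divisor: Double)      extends Expr
case class CastToDecimal(child: Expr, prec: Int, scale: Int) extends Expr

// True for 1.0, 10.0, 100.0, ... (the rescaling constants the optimizer emits).
def isPowerOfTen(d: Double): Boolean =
  Iterator.iterate(1.0)(_ * 10).takeWhile(_ <= d).contains(d)

// Detect CAST(avg(UnscaledValue(c)) / 10^x AS DECIMAL(p, s)), the shape that
// would need to be rewritten to divide in decimal space instead of double space.
def isOptimizedDecimalAverage(e: Expr): Boolean = e match {
  case CastToDecimal(DivideByDouble(UnscaledAvg(_), d), _, _) => isPowerOfTen(d)
  case _ => false
}

// The shape produced for avg(c) where c is DECIMAL(8,3):
println(isOptimizedDecimalAverage(
  CastToDecimal(DivideByDouble(UnscaledAvg("c"), 1000.0), 12, 7)))   // true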
But before we do any of that, I want to be sure we know what the original input long was before the divide happened, and what the double was that we are dividing by. I am assuming that it was (352761953125 / 10^8).
Not exactly. The result of the average (of the unscaled decimals) was 3527619.53125, a double. That was then divided by 1000.0 to rescale it.
I can confirm here that GpuCast::castFloatsToDecimal() seems to be the one producing the differing output:
// Approach to minimize difference between CPUCast and GPUCast:
// step 1. cast input to FLOAT64 (if necessary)
// step 2. cast FLOAT64 to container DECIMAL (who keeps one more digit for rounding)
// step 3. perform HALF_UP rounding on container DECIMAL
val checkedInput = withResource(input.castTo(DType.FLOAT64)) { double =>
  val roundedDouble = double.round(dt.scale, cudf.RoundMode.HALF_UP)
  withResource(roundedDouble) { rounded =>
    // ...
  }
}
The second step (i.e. after ensuring the input is FLOAT64) is to round(RoundMode.HALF_UP). This causes the following change in the input row:
3527.61953125 -> 3527.6195312
The (final) CPU output for this row is 3527.6195313.
I've relinquished ownership on this bug. I'm not actively working on this one.
Since we have an almost-matching float-to-string kernel in JNI, does that mean we can also almost match float-to-decimal easily by following Spark's float => string => decimal approach?
@thirtiseven yes, that is one possibility, but again it is only an almost-match. How close is close enough is up to management to decide.