spark-rapids
[FEA] fix issues so we can remove hasNans config
Is your feature request related to a problem? Please describe. Most users do not have NaNs in their data, but a lot of users have floating point data and end up with operations not being put on the GPU because we cannot make this work properly.
There are several operators that have problems with NaNs and are turned off unless the user opts in by asserting that the data has no NaNs.
- [ ] #6164
- [x] #5989
- [ ] #6317
- [x] #6315
- [x] Pivot https://github.com/NVIDIA/spark-rapids/issues/5322
- [x] #5446
Turns out that lists::contains is doing the right thing. Thanks @mythrocks for seeing that. Now all we have to do is remove the check from ArrayContains and add in more tests.
After talking to @jrhemstad I think we have a workaround where we can do Min/Max aggregations without any changes to CUDF. I need to put together a prototype to verify this.
The idea is to use cudf::is_nan and any or all aggregations to be able to fix up the result afterwards.
For Max we would translate the query into essentially if(any(is_nan(input)), NaN, max(input))
For Min it gets to be more complicated because NULL > NaN so we would translate the query into if(all(is_nan(input) or is_null(input)), if (any(is_null(input)), null, NaN), min(input))
This is just off the top of my head so I need to check this. We also need to benchmark any solution so that we can see what impact this has on min/max. It might be best to do a reduction on the input data first and see if there are any NaNs in the column at all. If not, then we do it the current way, and only if there are would we do the more complicated processing.
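As a quick sanity check of the Max rewrite above, here is a minimal Python model of the semantics (this is an illustration, not plugin code; `None` stands in for SQL NULL, which aggregates skip):

```python
import math

def rewritten_max(values):
    # Models if(any(is_nan(input)), NaN, max(input)): aggregates skip NULLs,
    # and any NaN forces the result to NaN because NaN is the largest
    # double value under Spark's ordering.
    non_null = [v for v in values if v is not None]
    if not non_null:
        return None  # an all-NULL input yields NULL, as in Spark
    if any(math.isnan(v) for v in non_null):
        return math.nan
    return max(non_null)
```

For example, `rewritten_max([1.0, None, math.nan])` gives NaN while `rewritten_max([1.0, 2.0])` gives 2.0, matching what Spark's max would return on the CPU.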
One of the issues with doing this for min/max is that the any/all aggregations are only for reduction and segmented_reduce_aggregation.
https://github.com/rapidsai/cudf/blob/9ac24773d186c22ffbacbe31d92dad60ed2cdb5f/cpp/src/aggregation/aggregation.cpp#L463-L483
For all we can use a min aggregation on the boolean column instead, and for any we could use a max aggregation. But this is going to get to be really confusing and it might be best to just make a CudfAny and CudfAll aggregations to help hide this.
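The boolean trick is easy to convince yourself of: with False < True, a max over a boolean column behaves exactly like any, and a min behaves like all. A tiny illustration (the helper names here are made up for the example):

```python
def emulated_any(bools):
    # max over booleans is True as soon as any element is True (False < True)
    return max(bools) if bools else False

def emulated_all(bools):
    # min over booleans is False as soon as any element is False
    return min(bools) if bools else True

# the emulated versions agree with Python's built-in any/all
for bs in ([True, False], [False, False], [True, True]):
    assert emulated_any(bs) == any(bs)
    assert emulated_all(bs) == all(bs)
```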
> One of the issues with doing this for min/max is that the any/all aggregations are only for reduction and segmented_reduce_aggregation.
That looks like an oversight to me. Pretty sure groupby already supports these aggregations, it just needs the derived type to be added. Even if groupby doesn't support any/all, we should definitely add support.
> For Max we would translate the query into essentially if(any(is_nan(input)), NaN, max(input)) For Min it gets to be more complicated because NULL > NaN so we would translate the query into if(all(is_nan(input) or is_null(input)), if (any(is_null(input)), null, NaN), min(input))

> That looks like an oversight to me. Pretty sure groupby already supports these aggregations, it just needs the derived type to be added. Even if groupby doesn't support any/all, we should definitely add support.
I got stuck when trying to apply this logic in CudfMax.groupByAggregate:
```scala
class CudfMax(override val dataType: DataType) extends CudfAggregate {
  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
    (col: cudf.ColumnVector) => col.getType() match {
      case DType.FLOAT32 if (col.isNan().any().getBoolean()) =>
        cudf.Scalar.fromFloat(Float.NaN)
      case DType.FLOAT64 if (col.isNan().any().getBoolean()) =>
        cudf.Scalar.fromDouble(Double.NaN)
      case _ => col.max()
    }
  override lazy val groupByAggregate: GroupByAggregation =
    GroupByAggregation.max()
  override val name: String = "CudfMax"
}
```
It seems like there is no way to apply isNan, any, and all in the GroupByAggregation.
Is there any other function that could be used, or should we modify cudf?
@HaoYang670
The way we do aggregates is complicated and is different for each type of aggregations being done.
For a reduction you would put all of the code into the reductionAggregate value, like you have done.
For a group by aggregation there are multiple steps that you can override depending on what you want to do. This tries to explain the steps.
For example, for max, if the data is floating point I would have the input projection pull out the column we are going to do a max on and also calculate a new isNaN column from the input column. Then I would have the updateAggregates do the max on the original column and also the any on the isNaN column. Then in postUpdate it can do the GpuIf(isNaNColumn, GpuLit(NaN), maxColumn).
You would need to replicate this for preMerge, mergeAggregates, and postMerge.
For windowing it is even more complicated, because instead of putting all of the stages into a single class, we thought it was cleaner to decompose the problem. So we have a GpuReplaceWindowFunction trait that will let you replace a window aggregation with a new expression that is equivalent. GpuAverage does this, so you could use it as an example. You will need to write a GpuAny and a GpuAll window operation to make this work, but they should be fairly simple and you can probably reuse GpuMax and GpuMin on the boolean columns for these. Possibly just rename them so the code is simpler to read.
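The staged flow described above can be modeled outside the plugin. This Python sketch mirrors the steps (the function names `input_projection`, `update_aggregates`, and `post_update` follow the description, not the actual plugin API):

```python
import math

def input_projection(col):
    # step 1: project the value column plus a computed is_nan column
    return col, [v is not None and math.isnan(v) for v in col]

def update_aggregates(values, is_nan):
    # step 2: a basic max over the non-NaN values, plus an "any" over the
    # is_nan column (NaNs are masked here so the basic max stays well defined)
    clean = [v for v in values if v is not None and not math.isnan(v)]
    return (max(clean) if clean else None), any(is_nan)

def post_update(basic_max, has_nan):
    # step 3: GpuIf(isNaNColumn, GpuLit(NaN), maxColumn)
    return math.nan if has_nan else basic_max

values, is_nan = input_projection([1.0, math.nan, 3.0])
result = post_update(*update_aggregates(values, is_nan))  # NaN
```

The preMerge/mergeAggregates/postMerge side would repeat the same pattern on the partial results.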
I hope this helps.
Thank you for your detailed explanation @revans2. I will spend some days reading and learning these carefully.
Hi @revans2
When I tried to override the GpuMax, I found we need to create a CudfAny, just as you said:
```scala
case class GpuMax {
  ...
  override lazy val inputProjection: Seq[Expression] = child.dataType match {
    case FloatType | DoubleType => Seq(child, GpuIsNan(child))
    case _ => Seq(child)
  }
  override lazy val updateAggregates: Seq[CudfAggregate] = child.dataType match {
    case FloatType | DoubleType => Seq(new CudfMax(dataType), new CudfAny(???))
    case _ => Seq(new CudfMax(dataType))
  }
  ...
}
```
My questions are:
- Should `CudfAny` implement `CudfAggregate`?
- If it does, what should be the value of `groupByAggregate`? `GroupByAggregation.any()`?
- If it is, should we add `GroupByAggregation.any()` support in cudf?
Look forward to your reply.
@HaoYang670
> Should CudfAny implement CudfAggregate?

Yes

> If it does, what should be the value of groupByAggregate? GroupByAggregation.any()?

GroupByAggregation.any() is not currently in cudf or the JNI that wraps it. As @jrhemstad said, it appears to be an oversight and could be added in. But in the short term you can implement CudfAny using GroupByAggregation.max(). It will work, it just will not be as clean.

> If it is, should we add GroupByAggregation.any() support in cudf?

Ideally yes, but you can do that later if you want instead.
Hi @revans2
Do we need to update evaluateExpression and aggBufferAttributes of GpuMax, as we use GpuIf to check the NaN values?
Here is my draft change:
https://github.com/NVIDIA/spark-rapids/pull/5989/files#diff-7f3472a1577f44fa821a6156d356a32dcb0bdd4edd0873570dba59c31601528cR569-R576
But I got
```
Couldn't find max#90 in [if#89]
```
when running
```scala
val df = Seq((1.0, 1), (2.0, 2), (Double.NaN, 2)).toDF("a", "b")
df.agg(max("a")).collect
```
Need your help.
Hi @revans2! I met some problems when updating the windowing function. Need your help.
What I did was to override the windowReplacement, just as you suggested.
However, I had to use self-recursion to achieve the logic:
```scala
override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = child.dataType match {
  case DoubleType => {
    lazy val isNan = GpuWindowExpression(GpuIsNan(child), spec)
    val max = GpuWindowExpression(GpuMax(child), spec)
    GpuIf(GpuMax(isNan), GpuLiteral(Double.NaN), max)
  }
  case _ => GpuWindowExpression(GpuMax(child), spec)
}
```
which will cause a stack overflow error.
Should we update the windowAggregation instead?
@HaoYang670
There are a few ways to avoid this. You can do what GpuAverage does and split the recursion.
```scala
override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = child.dataType match {
  case DoubleType => {
    lazy val isNan = GpuWindowExpression(GpuIsNan(child), spec)
    val max = GpuWindowExpression(GpuMax(child), spec)
    GpuIf(GpuBasicMax(isNan), GpuLiteral(Double.NaN), max)
  }
  case _ => GpuWindowExpression(GpuBasicMax(child), spec)
}
```
This makes it a little complicated, but it should work.
Alternatively you could do what GpuDecimal128Sum does and implement shouldReplaceWindow.
```scala
override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean = child.dataType match {
  case DoubleType | FloatType => true
  case _ => false
}
```
I think the second one with plenty of comments would be good. Then in windowReplacement we throw an exception if it is called on anything except what we expect/want to be replaced.
Hi @revans2! Thank you for your reply. Unfortunately, the self-recursion still exists because we still have the expression
```scala
val max = GpuWindowExpression(GpuMax(child), spec)
```
where child.dataType is Double, so it will replace the windowing function again and again, I guess.
Also GpuBasicMax is a subclass of GpuMax, which will lead to mutual recursion, I guess. (I have tried to use GpuSum(isNan) instead, but it did not work.)
So then split it or have another argument to GpuMax that tells it to not replace itself.
For GpuBasicMax there are also several options. We could have an abstract base class that both GpuMax and GpuBasicMax inherit from. Or we could create a GpuWindowMax instead of GpuBasicMax and move all of the window functionality into it. Then have GpuMax always replace itself with at least one GpuWindowMax.
I believe we should include https://github.com/NVIDIA/spark-rapids/issues/6095 in this overall ticket as a blocker.
And I see that all the PRs in the issue description checklist were merged. So this should be closed soon after 6095, right?
I am unclear on whether https://github.com/NVIDIA/spark-rapids/pull/5989 is a requirement for this to be closed also.
The PR links in the checklist are the cuDF dependencies. I guess originally we wanted cuDF to solve the NaN problem. But now we've decided to solve it ourselves. https://github.com/rapidsai/cudf/issues/10740#issuecomment-1144216655
> I am unclear on whether #5989 is a requirement for this to be closed also.
Yes it is. Updated the checklist.
So I see now that we have a WIP PR for supporting GpuMax. How about GpuMin?
> So I see now that we have a WIP PR for supporting GpuMax. How about GpuMin?
I will do this after updating the GpuMax.
> For Min it gets to be more complicated because NULL > NaN so we would translate the query into if(all(is_nan(input) or is_null(input)), if (any(is_null(input)), null, NaN), min(input))
Hi @revans2 @ttnghia, is it expected that null should be returned if all the values are null?
And is it a fact that in Spark NaN > Inf while in cuDF NaN < Inf?
For example, in Spark we would expect to get:
```
[Some(NaN); None; Some(Infinity)].min() = Infinity
[None; None].min() = null
```
If these hold, would the expression be
```
If (
  All(IsNull(input)),
  Null,
  input.filter(fun n => n.notNan).min_or_default(NaN),
)
```
Why do we have to filter all NaNs here? Because a NaN will override Infinity in the CudfMin. If there are only NaNs and Infinities in the input, we cannot get the expected min value Infinity unless we filter out all the NaNs.
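Assuming nulls are skipped by the aggregate and NaN sorts above +Infinity in Spark, the filter-then-min expression proposed above behaves like this Python model (an illustration of the semantics, not plugin code):

```python
import math

def filtered_min(values):
    # If(All(IsNull(input)), Null,
    #    input.filter(n => !isNan(n)).min_or_default(NaN))
    if all(v is None for v in values):
        return None  # all-null input yields null
    non_nan = [v for v in values if v is not None and not math.isnan(v)]
    # NaN is the default when every non-null value was NaN
    return min(non_nan) if non_nan else math.nan
```

This reproduces both examples from the comment: `[NaN, None, Infinity]` gives Infinity and `[None, None]` gives null.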
@HaoYang670
> Is it expected that null should be returned if all the values are null?
For Spark yes
```scala
scala> val allNull = Seq[java.lang.Double](null, null).toDF
allNull: org.apache.spark.sql.DataFrame = [value: double]

scala> allNull.selectExpr("min(value)", "max(value)").show()
+----------+----------+
|min(value)|max(value)|
+----------+----------+
|      null|      null|
+----------+----------+
```
> And is it the fact that in Spark NaN > Inf ... ?
For Spark yes NaN > Inf, but -Inf is the lowest value.
```scala
scala> val df = Seq[java.lang.Double](Double.NaN, null, Double.PositiveInfinity, Double.NegativeInfinity, 100.0).toDF
df: org.apache.spark.sql.DataFrame = [value: double]

scala> df.selectExpr("min(value)", "max(value)").show()
+----------+----------+
|min(value)|max(value)|
+----------+----------+
| -Infinity|       NaN|
+----------+----------+
```
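The same ordering can be reproduced with an explicit sort key: treat NaN as greater than every double, including +Infinity, while -Infinity stays the smallest value (a small Python check of the semantics shown above):

```python
import math

def spark_double_key(v):
    # Spark's total order for non-null doubles: NaN sorts above everything,
    # including +Infinity; -Infinity is the smallest value.
    return (1, 0.0) if math.isnan(v) else (0, v)

vals = [math.nan, math.inf, -math.inf, 100.0]  # NULLs are skipped by min/max
assert min(vals, key=spark_double_key) == -math.inf   # -Infinity
assert math.isnan(max(vals, key=spark_double_key))    # NaN
```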
For CUDF it looks like NaN < Inf from my experimentation (running the same queries above, but on the GPU instead of the CPU and saying that there are no NaNs).
So the situation where we would return a NaN for min is if all of the values were NaNs or Nulls, but there is at least one NaN.
Something like...
```
GpuIf(GpuAll(isNaN(input)), GpuLit(NaN), GpuBasicMin(input))
```
I think.
Hi @revans2. Thank you for your reply. Here is a case, input = [Infinity, NaN], where your expression does not simplify to the expected answer. Let's have a try:
```
GpuIf(GpuAll(isNaN(input)), GpuLit(NaN), GpuBasicMin(input))
-> substitute input = [Infinity, NaN]
GpuIf(GpuAll(isNaN([Infinity, NaN])), GpuLit(NaN), GpuBasicMin([Infinity, NaN]))
-> simplify GpuAll(isNaN([Infinity, NaN]))
GpuIf(false, GpuLit(NaN), GpuBasicMin([Infinity, NaN]))
-> simplify GpuIf
GpuBasicMin([Infinity, NaN])
-> simplify
if (Infinity < NaN) Infinity else NaN
-> in cuDF Infinity > NaN
NaN
```
However, in Spark the expected answer is Infinity.
In cudf, the result of min and max involving NaN is undefined.
You should mask out NaN before calling GpuBasicMin.
Maybe something like this:
```
min_val = GpuBasicMin(replace NaN by null for input)  <-- null will be excluded when computing min
```
If the input is all nulls, the result will be null. If the input has just some or no nulls, the result will be non-null (since null is excluded from computing min).
> In cudf, the result of min and max involving NaN is undefined. You should mask out NaN before calling GpuBasicMin.
Hmm🤔, if it is undefined, then maybe we should do something like this for GpuMin:
```
GpuIf(
  GpuAll(isNan(input) or isNull(input)),
  GpuIf(GpuAny(isNan(input)), NaN, null),
  GpuBasicMin(input.map(fun n => if n.isNan {null} else {n}))
)
```
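A Python model of this three-branch expression (mask NaNs to null for the basic min, then patch up the all-NaN-or-null case) reproduces the Spark answers discussed above; this is a sketch of the semantics, not the plugin implementation:

```python
import math

def gpu_min_model(values):
    is_nan = [v is not None and math.isnan(v) for v in values]
    # GpuAll(isNan(input) or isNull(input))
    if all(v is None or n for v, n in zip(values, is_nan)):
        # GpuIf(GpuAny(isNan(input)), NaN, null)
        return math.nan if any(is_nan) else None
    # GpuBasicMin(input.map(n => if n.isNan then null else n)):
    # masked NaNs become nulls, which the basic min excludes
    return min(v for v in values if v is not None and not math.isnan(v))
```

In particular, `gpu_min_model([Infinity, NaN])` now gives Infinity, fixing the counterexample above, while `[NaN, None]` gives NaN and `[None, None]` gives null.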
@HaoYang670 Yes for both GpuMin and GpuMax we are going to have to remove the NaNs before calling Min/Max on them.
@ttnghia is there some way we can make it defined? Preferably in a way that we can get the answers that we need/want. We are working around NaNs right now for Min and Max for basic float/double values. But as soon as we start to get into min and max on structs and lists that require lexicographical comparison we run into problems where we cannot play the same games that we are playing here. I realize that it will probably end up being a sorted aggregate in that case. But if feels really odd to me that cudf min/max would have undefined behavior for basic float/double nans, but defined behavior if they are under a list or a struct. It feels a little bit like we moved kind of fast here, and might need to take a step back and think through how CUDF should behave in these cases. Especially for consistency with the binop comparison operators. @rwlee you recently added in support for various binops for structs and lists. How do they handle nans?
> @ttnghia is there some way we can make it defined?

I can say that it may never be....
The output of min/max is undefined because comparing with NaN (<, >) always returns false. Let's say we have a column having 1.0 and NaN. These values will be atomically updated into an output variable.
- If the thread processing 1.0 updates max first, output will store 1.0. Then the thread processing NaN tries to update max in output: it needs to check if NaN > output, which is false due to comparing with NaN. The final result will be 1.0.
- If the thread processing NaN updates max first, output will store NaN. The thread processing 1.0 will then check if 1.0 > output, which is false due to comparing with NaN. This time, the final result will be NaN.

Since the order of running threads is undefined, the result is also undefined. We may get 1.0 or NaN inconsistently.
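The order dependence is easy to reproduce on the CPU: a pairwise min built from `<` keeps whichever operand already sits in the accumulator whenever NaN is involved, because both NaN comparisons are false (a Python illustration of the same effect, not cudf code):

```python
import math
from functools import reduce

def min2(acc, x):
    # a comparison-based pairwise min: NaN < x and x < NaN are both
    # False, so NaN never displaces acc and is never displaced itself
    return x if x < acc else acc

inf, nan = math.inf, math.nan
assert reduce(min2, [inf, nan]) == inf       # NaN arrives second: never wins
assert math.isnan(reduce(min2, [nan, inf]))  # NaN arrives first: never loses
```

The same fold over the same values gives two different answers depending only on encounter order, which is exactly what happens with racing GPU threads.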
The workaround in the plugin with NaN in nested types should be very similar. Instead of checking if each row is NaN, you will check if each row contains NaN at any nesting level. There may be more steps needed, but it should not be very complicated.
BTW, I'm not sure if the bin op comparison for nested types can produce consistent results, @rwlee?
I realize that it currently is undefined because of how the comparison is defined. Talking to others on the cudf team, they were sure that the lexicographic compare code was per-type replaceable, so we could do a check that wasn't just a > b, but could also include things like is_nan(a) || is_nan(b).