
[FEA] fix issues so we can remove hasNans config

Open revans2 opened this issue 3 years ago • 36 comments

Is your feature request related to a problem? Please describe. Most users do not have NaNs in their data, but a lot of users have floating point and end up with things not being put on the GPU because we cannot make this work properly.

There are several operators that have problems with NaNs and are turned off unless the user opts into having no NaNs.

  • [ ] #6164
  • [x] #5989
  • [ ] #6317
  • [x] #6315
  • [x] Pivot https://github.com/NVIDIA/spark-rapids/issues/5322
  • [x] #5446

revans2 avatar Apr 26 '22 19:04 revans2

Turns out that lists::contains is doing the right thing. Thanks @mythrocks for seeing that. Now all we have to do is remove the check from ArrayContains and add in more tests.

revans2 avatar May 05 '22 21:05 revans2

After talking to @jrhemstad I think we have a workaround where we can do Min/Max aggregations without any changes to CUDF. I need to put together a prototype to verify this.

The idea is to use cudf::is_nan and any or all aggregations to be able to fix up the result afterwards.

For Max we would translate the query into essentially if(any(is_nan(input)), NaN, max(input))

For Min it gets to be more complicated because NULL > NaN so we would translate the query into if(all(is_nan(input) or is_null(input)), if (any(is_null(input)), null, NaN), min(input))

This is just off the top of my head so I need to check this. We also need to benchmark any solution so that we can see what impact this has to min/max. It might be best to do a reduction on the input data first and see if there are any NaNs in the column at all. If not, then we do it the current way, and only if there are would we do the more complicated processing.
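As a rough illustration of the Max translation above, here is a minimal sketch in plain Scala rather than cudf. The column is modeled as a `Seq[Option[Double]]` with `None` standing for null; the names `NanAwareMax`, `plainMax`, and `nanAwareMax` are hypothetical and exist only for this example.

```scala
object NanAwareMax {
  // Stand-in for a max that is only ever given non-NaN, non-null values.
  // Returns None for an empty input, mirroring a null result.
  def plainMax(values: Seq[Double]): Option[Double] =
    values.reduceOption((a, b) => if (b > a) b else a)

  // if(any(is_nan(input)), NaN, max(input)), with nulls skipped.
  def nanAwareMax(input: Seq[Option[Double]]): Option[Double] = {
    val nonNull = input.flatten
    if (nonNull.exists(_.isNaN)) Some(Double.NaN) // any NaN wins
    else plainMax(nonNull)                        // otherwise the ordinary max
  }
}
```

The NaN check runs first, so the ordinary max never has to compare against a NaN.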

revans2 avatar May 09 '22 22:05 revans2

One of the issues with doing this for min/max is that the any/all aggregations are only for reduction and segmented_reduce_aggregation.

https://github.com/rapidsai/cudf/blob/9ac24773d186c22ffbacbe31d92dad60ed2cdb5f/cpp/src/aggregation/aggregation.cpp#L463-L483

For all we can use a min aggregation on the boolean column instead, and for any we could use a max aggregation. But this is going to get to be really confusing and it might be best to just make a CudfAny and CudfAll aggregations to help hide this.
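The min/max trick on a boolean column can be seen in a small plain-Scala sketch. It relies only on `false` ordering before `true`; the object and method names are hypothetical, and this is not the cudf API.

```scala
object BoolAggregates {
  // With false < true, the max of a boolean column is true
  // exactly when at least one value is true ("any").
  def anyViaMax(flags: Seq[Boolean]): Boolean = flags.max

  // Likewise, the min is true only when every value is true ("all").
  def allViaMin(flags: Seq[Boolean]): Boolean = flags.min
}
```

Both helpers throw on an empty sequence, so a real aggregation would still need to decide what an empty group returns.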

revans2 avatar May 11 '22 14:05 revans2

One of the issues with doing this for min/max is that the any/all aggregations are only for reduction and segmented_reduce_aggregation.

That looks like an oversight to me. Pretty sure groupby already supports these aggregations, it just needs the derived type to be added. Even if groupby doesn't support any/all, we should definitely add support.

jrhemstad avatar May 11 '22 14:05 jrhemstad

For Max we would translate the query into essentially if(any(is_nan(input)), NaN, max(input)) For Min it gets to be more complicated because NULL > NaN so we would translate the query into if(all(is_nan(input) or is_null(input)), if (any(is_null(input)), null, NaN), min(input))

That looks like an oversight to me. Pretty sure groupby already supports these aggregations, it just needs the derived type to be added. Even if groupby doesn't support any/all, we should definitely add support.

I got stuck when trying to apply this logic in CudfMax.groupByAggregate:

class CudfMax(override val dataType: DataType) extends CudfAggregate {
  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
    (col: cudf.ColumnVector) => col.getType() match {
      case DType.FLOAT32 if (col.isNan().any().getBoolean()) => 
        cudf.Scalar.fromFloat(Float.NaN)
      case DType.FLOAT64 if (col.isNan().any().getBoolean()) => 
        cudf.Scalar.fromDouble(Double.NaN)
      case _ => col.max()
    }
  override lazy val groupByAggregate: GroupByAggregation =
    GroupByAggregation.max()
  override val name: String = "CudfMax"
}

It seems there is no way to apply isnan, any, and all in a GroupByAggregation. Is there another function that could be used, or should we modify cudf?

HaoYang670 avatar Jun 30 '22 09:06 HaoYang670

@HaoYang670

The way we do aggregates is complicated and is different for each type of aggregations being done.

For a reduction you would put all of the code into the reductionAggregate value, like you have done.

For a group by aggregation there are multiple steps that you can override depending on what you want to do. This tries to explain the steps.

For example for max if the data is floating point I would have the input projection pull out the column we are going to do a max on and also calculate a new isNaN column on the input column. Then I would have the updateAggregates do the max on the original column and also the any on the isNan column. Then in postUpdate it can do the GpuIf(isNaNColumn, GpuLit(NaN), maxColumn)

You would need to replicate this for preMerge, mergeAggregates, and postMerge.
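The update / merge / final-projection flow described above can be modeled in plain Scala, as a sketch only. `Buffer`, `update`, `merge`, and `finish` are hypothetical names, not the plugin's classes. The sketch also drops NaNs before taking the max, which is an assumption made here so that the plain max never sees a NaN.

```scala
object StagedMax {
  // Hypothetical aggregation buffer: max of the non-NaN values plus an "any NaN" flag.
  final case class Buffer(max: Option[Double], sawNaN: Boolean)

  private def maxOf(values: Seq[Double]): Option[Double] =
    values.reduceOption((x, y) => if (y > x) y else x)

  // Input projection + update: compute the isNaN flag and the max for one batch.
  def update(batch: Seq[Option[Double]]): Buffer = {
    val nonNull = batch.flatten
    Buffer(maxOf(nonNull.filterNot(_.isNaN)), nonNull.exists(_.isNaN))
  }

  // Merge: max of the partial maxes, and "any" (logical or) of the flags.
  def merge(a: Buffer, b: Buffer): Buffer =
    Buffer(maxOf(a.max.toSeq ++ b.max.toSeq), a.sawNaN || b.sawNaN)

  // Final projection: the equivalent of if(isNaNColumn, NaN, maxColumn).
  def finish(buf: Buffer): Option[Double] =
    if (buf.sawNaN) Some(Double.NaN) else buf.max
}
```

For example, merging the buffers for the batches `[1.0, NaN]` and `[3.0, null]` and then calling `finish` gives NaN, while the same two batches without the NaN give 3.0.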

For windowing it is even more complicated, because instead of putting all of the stages into a single class, we thought it was cleaner to decompose the problem. So we have a GpuReplaceWindowFunction trait, that will let you switch window aggregation with a new expression that is equivalent. GpuAverage does this so you could use it as an example. You will need to write a GpuAny and a GpuAll window operation to make this work, but they should be fairly simple and you can probably reuse GpuMax and GpuMin on the boolean columns for these. Possibly just rename them so the code is simpler to read.

I hope this helps.

revans2 avatar Jun 30 '22 13:06 revans2

Thank you for your detailed explanation @revans2. I will spend a few days reading and learning these carefully.

HaoYang670 avatar Jul 01 '22 08:07 HaoYang670

Hi @revans2. When I tried to override GpuMax, I found that we need to create a CudfAny, just as you said:

case class GpuMax {
...
  override lazy val inputProjection: Seq[Expression] = child.dataType match {
    case FloatType | DoubleType => Seq(child, GpuIsNan(child))
    case _ => Seq(child)
  }
  override lazy val updateAggregates: Seq[CudfAggregate] = child.dataType match {
    case FloatType | DoubleType => Seq(new CudfMax(dataType), new CudfAny(???))
    case _ => Seq(new CudfMax(dataType))
  }
...
}

My questions are:

  1. Should CudfAny implement CudfAggregate?
  2. If it does, what should be the value of groupByAggregate? GroupByAggregation.any()?
  3. If it is, should we add GroupByAggregation.any() support in cudf?

Look forward to your reply.

HaoYang670 avatar Jul 07 '22 08:07 HaoYang670

@HaoYang670

Should CudfAny implement CudfAggregate?

Yes

If it does, what should be the value of groupByAggregate? GroupByAggregation.any()?

GroupByAggregation.any() is not currently in cudf or the JNI that wraps it. As @jrhemstad said, it appears to be an oversight and could be added. But in the short term you can implement CudfAny using GroupByAggregation.max(). It will work; it just will not be as clean.

If it is, should we add GroupByAggregation.any() support in cudf?

Ideally yes, but you can do that later if you want instead.

revans2 avatar Jul 07 '22 15:07 revans2

Hi @revans2 Do we need to update evaluateExpression and aggBufferAttributes of GpuMax, as we use GpuIf to check the NaN values? Here is my draft change: https://github.com/NVIDIA/spark-rapids/pull/5989/files#diff-7f3472a1577f44fa821a6156d356a32dcb0bdd4edd0873570dba59c31601528cR569-R576

But I got

Couldn't find max#90 in [if#89]

when running

val df = Seq((1.0, 1), (2.0, 2), (Double.NaN, 2)).toDF("a", "b")
df.agg(max("a")).collect

Need your help.

HaoYang670 avatar Jul 12 '22 09:07 HaoYang670

Hi @revans2! I ran into a problem when updating the windowing function and need your help. I overrode windowReplacement, just as you suggested. However, I had to use self-recursion to achieve the logic:

  override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = child.dataType match {
    case DoubleType => 
      {
        lazy val isNan = GpuWindowExpression(GpuIsNan(child), spec)
        val max = GpuWindowExpression(GpuMax(child), spec)
        GpuIf(GpuMax(isNan), GpuLiteral(Double.NaN), max)
      }
    case _ => GpuWindowExpression(GpuMax(child), spec)
  }

This causes a stack overflow error.

Should we update the windowAggregation instead?

HaoYang670 avatar Jul 18 '22 03:07 HaoYang670

@HaoYang670

There are a few ways to avoid this. You can do what GpuAverage does and split the recursion.

  override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = child.dataType match {
    case DoubleType => 
      {
        lazy val isNan = GpuWindowExpression(GpuIsNan(child), spec)
        val max = GpuWindowExpression(GpuMax(child), spec)
        GpuIf(GpuBasicMax(isNan), GpuLiteral(Double.NaN), max)
      }
    case _ => GpuWindowExpression(GpuBasicMax(child), spec)
  }

This makes it a little complicated, but it should work.

Alternatively you could do what GpuDecimal128Sum does and implement shouldReplaceWindow.

override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean =  child.dataType match {
  case DoubleType | FloatType => true
  case _ => false
}

I think the second one with plenty of comments would be good. Then in windowReplacement we throw an exception if it is called on anything except what we expect/want to be replaced.

revans2 avatar Jul 18 '22 14:07 revans2

Hi @revans2 ! Thank you for your reply. Unfortunately, the self-recursion still exists because we still have the expression

val max = GpuWindowExpression(GpuMax(child), spec)

where child.dataType is Double, so it will replace the windowing function again and again, I guess. Also, GpuBasicMax is a subclass of GpuMax, which will lead to mutual recursion, I guess. (I tried to use GpuSum(isNan) instead, but it did not work.)

HaoYang670 avatar Jul 19 '22 02:07 HaoYang670

So then split it or have another argument to GpuMax that tells it to not replace itself.

For GpuBasicMax there are also several options. We could have an abstract base class that both GpuMax and GpuBasicMax inherit from. Or we could create a GpuWindowMax instead of GpuBasicMax and move all of the window functionality into it. Then have GpuMax always replace itself with at least one GpuWindowMax.
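A toy model of the second option, in plain Scala, shows why a separate window-only class ends the recursion. None of these types are the plugin's real classes: `Max` plays the role of GpuMax, `WindowMax` the hypothetical GpuWindowMax, and `replace` the window replacement pass.

```scala
object WindowReplacementSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class IsNan(child: Expr) extends Expr
  case object NanLiteral extends Expr
  final case class If(pred: Expr, onTrue: Expr, onFalse: Expr) extends Expr
  // User-facing max: always asks to be replaced in a window.
  final case class Max(child: Expr) extends Expr
  // Window-only max: never replaced, so rewriting stops here.
  final case class WindowMax(child: Expr) extends Expr

  // Rewrite Max into WindowMax terms; the output contains no Max,
  // so applying the rewrite again changes nothing.
  def replace(e: Expr): Expr = e match {
    case Max(c)      => If(WindowMax(IsNan(c)), NanLiteral, WindowMax(c))
    case If(p, t, f) => If(replace(p), replace(t), replace(f))
    case other       => other
  }
}
```

Because the replacement produces only `WindowMax` nodes, a second pass is a no-op, unlike the earlier version where the replacement contained the very expression being replaced.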

revans2 avatar Jul 19 '22 15:07 revans2

I believe we should include https://github.com/NVIDIA/spark-rapids/issues/6095 in this overall ticket as a blocker.

abellina avatar Jul 26 '22 16:07 abellina

And I see that all the PRs in the issue description checklist were merged. So this should be closed soon after 6095, right?

ttnghia avatar Jul 27 '22 16:07 ttnghia

I am unclear on whether https://github.com/NVIDIA/spark-rapids/pull/5989 is a requirement for this to be closed also.

abellina avatar Jul 27 '22 17:07 abellina

The PR links in the checklist are the cuDF dependencies. I guess originally we wanted cuDF to solve the NaN problem, but now we've decided to solve it ourselves. https://github.com/rapidsai/cudf/issues/10740#issuecomment-1144216655

HaoYang670 avatar Jul 27 '22 22:07 HaoYang670

I am unclear on whether #5989 is a requirement for this to be closed also.

Yes it is. Updated the checklist.

HaoYang670 avatar Jul 27 '22 22:07 HaoYang670

So I see now that we have a WIP PR for supporting GpuMax. How about GpuMin?

ttnghia avatar Jul 28 '22 04:07 ttnghia

So I see now that we have a WIP PR for supporting GpuMax. How about GpuMin?

I will do this after updating the GpuMax

HaoYang670 avatar Jul 28 '22 06:07 HaoYang670

For Min it gets to be more complicated because NULL > NaN so we would translate the query into if(all(is_nan(input) or is_null(input)), if (any(is_null(input)), null, NaN), min(input))

Hi @revans2 @ttnghia, is it expected that null should be returned if all the values are nulls? And is it the fact that in Spark Nan > Inf and in cuDF Nan < Inf? For example, in Spark we would expect to get:

[Some(Nan); None; Some(Infinity)].min() = Infinity
[None; None].min() = null

If both are true, would the expression be

If (
    All(IsNull(input)),
    Null,
    input.filter(fun n => n.notNan).min_or_default(Nan),
)

Why do we have to filter out all NaNs here? Because a NaN will override Infinity in CudfMin. If the input contains only NaNs and Infinities, we cannot get the expected min value, Infinity, unless we filter out all NaNs.

HaoYang670 avatar Aug 08 '22 05:08 HaoYang670

@HaoYang670

Is it expected that null should be returned if all the values are nulls ?

For Spark yes

scala> val allNull = Seq[java.lang.Double](null, null).toDF
allNull: org.apache.spark.sql.DataFrame = [value: double]

scala> allNull.selectExpr("min(value)", "max(value)").show()
+----------+----------+
|min(value)|max(value)|
+----------+----------+
|      null|      null|
+----------+----------+

And is it the fact that in Spark Nan > Inf ... ?

For Spark yes NaN > Inf, but -Inf is the lowest value.

scala> val df = Seq[java.lang.Double](Double.NaN, null, Double.PositiveInfinity, Double.NegativeInfinity, 100.0).toDF
df: org.apache.spark.sql.DataFrame = [value: double]

scala> df.selectExpr("min(value)", "max(value)").show()
+----------+----------+
|min(value)|max(value)|
+----------+----------+
| -Infinity|       NaN|
+----------+----------+

For CUDF it looks like NaN < Inf from my experimentation (running the same queries above, but on the GPU instead of the CPU and saying that there are no NaNs).

So the situation where we would return a NaN for min is if all of the values were NaNs or Nulls, but there is at least one NaN.

Something like...

GpuIf(GpuAll(isNaN(input)), GpuLit(nan), GpuBasicMin(input))

I think.

revans2 avatar Aug 08 '22 14:08 revans2

Hi @revans2. Thank you for your reply. Here is a case, input = [Infinity, Nan], for which your expression does not simplify to the expected answer. Let's try it:

GpuIf(GpuAll(isNaN(input)), GpuLit(nan), GpuBasicMin(input))

-> exists input = [Infinity, Nan]

GpuIf(GpuAll(isNaN([Infinity, Nan])), GpuLit(nan), GpuBasicMin([Infinity, Nan]))

-> simplify GpuAll(isNaN([Infinity, Nan]))

GpuIf(false, GpuLit(nan), GpuBasicMin([Infinity, Nan]))

-> simplify GpuIf

GpuBasicMin([Infinity, Nan])

-> simplify

if (Infinity < Nan) Infinity else Nan

-> In cuDF Infinity > Nan

Nan

However, In Spark the expected answer is Infinity.

HaoYang670 avatar Aug 08 '22 22:08 HaoYang670

In cudf, the result of min and max involving NaN is undefined. You should mask out NaN before calling GpuBasicMin.

Maybe something like this:

  • min_val = GpuBasicMin(replace NaN by null for input) <--- null will be excluded when computing min

If the input is all nulls, the result will be null. If the input has only some nulls or none, the result will be non-null (since nulls are excluded when computing min).

ttnghia avatar Aug 08 '22 22:08 ttnghia

In cudf, the result of min and max involving NaN is undefined. You should mask out NaN before calling GpuBasicMin.

Hmm🤔, if it is undefined, then maybe we should do something like this for GpuMin:

GpuIf(
    GpuAll( isNan( input ) or isNull(input)),
    GpuIf ( GpuAny(isNan(input)), Nan, null),
    GpuBasicMin ( input.map( fun n => if n.isNan {null} else {n}))
)
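The expression above can be checked against the Spark results shown earlier with a plain-Scala sketch. The column is modeled as `Seq[Option[Double]]` with `None` for null; `NanAwareMin` and `nanAwareMin` are hypothetical names, and this is not plugin code.

```scala
object NanAwareMin {
  def nanAwareMin(input: Seq[Option[Double]]): Option[Double] = {
    val nonNull = input.flatten
    // NaN replaced by null, so the plain min never compares against a NaN.
    val clean = nonNull.filterNot(_.isNaN)
    if (clean.nonEmpty) Some(clean.reduce((a, b) => if (b < a) b else a))
    else if (nonNull.nonEmpty) Some(Double.NaN) // only NaNs (and maybe nulls) remain
    else None                                   // all nulls, or an empty input
  }
}
```

With this shape, `[Infinity, NaN]` yields Infinity, `[NaN, null]` yields NaN, and `[null, null]` yields null, which matches the Spark behavior reported in this thread.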

HaoYang670 avatar Aug 08 '22 22:08 HaoYang670

@HaoYang670 Yes for both GpuMin and GpuMax we are going to have to remove the NaNs before calling Min/Max on them.

@ttnghia is there some way we can make it defined? Preferably in a way that we can get the answers that we need/want. We are working around NaNs right now for Min and Max for basic float/double values. But as soon as we start to get into min and max on structs and lists that require lexicographical comparison we run into problems where we cannot play the same games that we are playing here. I realize that it will probably end up being a sorted aggregate in that case. But it feels really odd to me that cudf min/max would have undefined behavior for basic float/double nans, but defined behavior if they are under a list or a struct. It feels a little bit like we moved kind of fast here, and might need to take a step back and think through how CUDF should behave in these cases. Especially for consistency with the binop comparison operators. @rwlee you recently added in support for various binops for structs and lists. How do they handle nans?

revans2 avatar Aug 09 '22 14:08 revans2

@ttnghia is there some way we can make it defined?

I can say that it may be never....

The output of min/max is undefined because comparing with NaN (<, >) always returns false. Let's say we have a column having 1.0 and NaN. These values will be atomically updated to an output variable.

  • If the thread processing 1.0 updates max first, output will store 1.0. Then the thread processing NaN tries to update max to output: it needs to check if NaN > output, which is false due to comparing with NaN. The final result will be 1.0.
  • If the thread processing NaN updates max first, output will store NaN. The threads processing 1.0 will then check if 1.0 > output, which is false because output is NaN. This time, the final result will be NaN.

Since the order of running threads is undefined, the result is also undefined. The output may be 1.0 or NaN, inconsistently.
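The order dependence is easy to reproduce sequentially. The sketch below is plain Scala, not cudf: `naiveMax` is a hypothetical stand-in for the "update output if greater" step, and the order of the sequence plays the role of thread scheduling.

```scala
object NanOrderDependence {
  // Keep the current output unless the new value compares greater.
  // Any comparison involving NaN is false, so a NaN never replaces
  // the output and is never replaced once it is the output.
  def naiveMax(values: Seq[Double]): Double =
    values.reduce((output, v) => if (v > output) v else output)
}
```

`naiveMax(Seq(1.0, Double.NaN))` returns 1.0, while `naiveMax(Seq(Double.NaN, 1.0))` returns NaN, so the same data gives two different answers.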

The workaround in the plugin for NaN in nested types should be very similar. Instead of checking if each row is NaN, you will check if each row contains NaN at any nested level. There may be more steps needed, but it should not be very complicated.

ttnghia avatar Aug 09 '22 16:08 ttnghia

BTW, I'm not sure if the bin op comparison for nested types can produce consistent results, @rwlee?

ttnghia avatar Aug 09 '22 16:08 ttnghia

I realize that it currently is undefined because of how the comparison is defined. Talking to others on the cudf team they were sure that the lexicographical compare code was per type replaceable so we could do a check that wasn't just a > b, but could also include things like is_nan(a) || is_nan(b).
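To make the idea of a NaN-aware comparison concrete, here is a plain-Scala sketch of a comparator that goes beyond `a > b`. It assumes the ordering Spark showed earlier in this thread (NaN greater than every other value, including Infinity) and additionally assumes NaN compares equal to NaN. The names are hypothetical, and this is not the cudf comparator.

```scala
object NanAwareCompare {
  // Total order: NaN == NaN, and NaN is greater than everything else.
  def compare(a: Double, b: Double): Int =
    if (a.isNaN && b.isNaN) 0
    else if (a.isNaN) 1
    else if (b.isNaN) -1
    else if (a < b) -1
    else if (a > b) 1
    else 0

  def max(values: Seq[Double]): Double =
    values.reduce((x, y) => if (compare(y, x) > 0) y else x)

  def min(values: Seq[Double]): Double =
    values.reduce((x, y) => if (compare(y, x) < 0) y else x)
}
```

With a comparator like this, the result no longer depends on the order in which values are visited: the max of 1.0 and NaN is NaN either way, and the min of Infinity and NaN is Infinity either way.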

revans2 avatar Aug 09 '22 16:08 revans2