
[FEA] Support for approx_count_distinct

Open andygrove opened this issue 2 years ago • 29 comments

Is your feature request related to a problem? Please describe.
I would like to be able to implement a GPU version of Spark's approx_count_distinct function, which uses the HyperLogLog++ cardinality estimation algorithm.

cuDF does not appear to provide any features today that would allow me to do this.

Describe the solution you'd like
I would like cuDF to implement this capability and expose an API that is likely similar to approx_percentile, in that there would be methods both for computing and for merging the underlying data structure, whether that is based on HyperLogLog++ or some other algorithm.

Describe alternatives you've considered
None

Additional context
None

andygrove avatar Apr 13 '22 18:04 andygrove

I found this article to be an excellent description of the algorithm: https://towardsdatascience.com/hyperloglog-a-simple-but-powerful-algorithm-for-data-scientists-aed50fe47869

Seems fairly straightforward to implement in parallel too. Compute the hash value for each row, then compute a histogram using the upper p bits to select one of m buckets (m == 2^p), using __clzll to get the count of leading zeros in the remaining bits.

Then a merge step computes the average of the values in the histogram, taking care to discard outliers. ~The histogram is small enough it probably makes sense to just copy it to the CPU and do this step serially.~ I changed my mind, I think it probably makes more sense to just do the merging on a single block.
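
Roughly, the per-row step could look like the following. This is only a sketch with made-up names (cuDF would use its own hashing utilities, and the groupby case needs one register array per group); each bucket keeps the maximum "leading-zero count + 1" seen for any row hashing into it.

#include <cstdint>

constexpr int P           = 9;       // precision: the upper P bits pick the bucket
constexpr int NUM_BUCKETS = 1 << P;  // m == 2^P

__global__ void update_registers(uint64_t const* hashes,  // precomputed 64-bit row hashes
                                 int num_rows,
                                 int* registers)           // NUM_BUCKETS int32 registers, zero-initialized
{
  int const i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i >= num_rows) { return; }
  uint64_t const h    = hashes[i];
  int const bucket    = static_cast<int>(h >> (64 - P));            // upper P bits select the bucket
  uint64_t const rest = (h << P) | (uint64_t{1} << (P - 1));        // guard bit bounds the zero run
  int const rank      = __clzll(static_cast<long long>(rest)) + 1;  // leading zeros of the remaining bits, plus one
  atomicMax(&registers[bucket], rank);                              // register = max rank observed for this bucket
}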

jrhemstad avatar Apr 13 '22 19:04 jrhemstad

The histogram is small enough it probably makes sense to just copy it to the CPU and do this step serially.

That is true if we are doing a reduction. Spark supports this for groupby aggregations, and technically for window operations too, but I don't think we need windowing just yet. Although, since Spark does not support distinct window functions, approx_count_distinct might get used in windows simply because it is the only option. For a groupby we could be doing a lot of smaller distinct counts, so having the ability to do that on the GPU would be good.

@andygrove could you please fill in more detail on what we are going to need here?

  • Is this just for a reduction, or is it for a group by aggregation?
  • Do we need to include windowing support?
  • Because we do distributed group by and reductions, with two phases to build the aggregation, what operations/aggregations do we need?
  • What are the inputs and the outputs of each aggregation/operation?

It looks like Spark supports setting a scalar relative standard deviation parameter that directly impacts p. It also looks like Spark expects to shuffle the intermediate results (a histogram) as a struct of longs, and the number of entries in that struct is p^2/6 or something odd like that.
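
For reference, the classic HLL error bound is roughly 1.04 / sqrt(m), so deriving p from a requested relative standard deviation looks something like the sketch below (general relationship only; the exact constant Spark uses may differ):

#include <cmath>

// Sketch: derive the precision p (bucket count m = 2^p) from a requested relative
// standard deviation using the classic HLL error bound rsd ~= 1.04 / sqrt(m).
// Spark derives p from its relativeSD parameter in a similar way, though its
// constant may not be exactly 1.04.
int precision_for_rsd(double relative_sd)
{
  double const m = std::pow(1.04 / relative_sd, 2.0);  // required number of buckets
  return static_cast<int>(std::ceil(std::log2(m)));    // round the bucket count up to a power of two
}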

We need a lot more details so we can work with CUDF to hammer out exactly what this is going to look like.

revans2 avatar Apr 14 '22 14:04 revans2

One note about the implementation. From what I've read, most implementations use some small set of bits per histogram bucket (like 5 bits to store the per-bucket count). The only way I'd be comfortable with returning something like that to a user is in an opaque type. Otherwise it should just be a standard integer type per bucket (I'm guessing using a 4B int per bucket will be better for performance since this is going to require atomics to merge histograms).
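
Merging two register arrays is just an element-wise maximum, which is part of why a plain int per bucket keeps the merge simple. A minimal sketch (names made up, pointers assumed to be device memory):

#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/transform.h>

// Sketch: merging two HLL register arrays of equal length is an element-wise max.
// With one int32 per bucket this is a single transform; merging many partial
// histograms concurrently could instead use atomicMax on the destination registers.
void merge_registers(int const* a, int const* b, int* out, int num_buckets)
{
  thrust::transform(thrust::device, a, a + num_buckets, b, out, thrust::maximum<int>{});
}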

Because we do distributed group by and reductions, with two phases to build the aggregation, what operations/aggregations do we need?

I spent a little time thinking about this, but indeed we'd need more concrete requirements to say definitively what it should look like.

For the "reduction"/column level API I envision a two phase API where the first returns an opaque-ish object holding the histogram and the second takes n histograms and merges them to return the approximate cardinality. Spitballing...

namespace cudf::hll { // namespace for hyperloglog functionality?

// Does this need to work on tables? or just columns?
// This could return an opaque `hyperloglog_histogram` object, or it could just return a regular `column`?
compute_histogram(table_view or column_view?, size_type num_buckets, cudf::hash_function hash)

// Whatever `compute_histogram` returns influences what the input to this function would be, but essentially it is just
// a list of histograms of the same size. Should probably be some strongly typed thing that enforces requirements.
size_t approx_distinct_count(table_view histograms, float outlier_threshold = 0.7, float bias_correction = 0.77351)

} // namespace cudf::hll

For the groupby case we can do something like what we do with tdigest, where there's a hll_histogram aggregation and a merge_hll_histogram aggregation.

Spark expects to shuffle the intermediate results (a histogram) as a struct of longs

I definitely don't think we should do it this way. Storing the histogram in a literal struct<int, int, int, ...., int> column, with one child per histogram element, would be a bad idea. For the single column case I imagine the histogram could just be a regular int32 column. For the groupby case, I'd think a LIST<int32> column would make the most sense.
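
For context, a LIST<int32> column of fixed-size histograms is just an offsets child that steps by the bucket count plus a flat int32 child. A conceptual sketch only (the real column would be built with cuDF factories on the device, not std::vector):

#include <cstdint>
#include <vector>

// Conceptual shape of a LIST<INT32> column holding one m-register histogram per group:
// an offsets child stepping by m, plus a flat child with all registers back to back.
void list_layout_example(int num_groups, int m)
{
  std::vector<int32_t> offsets(num_groups + 1);
  for (int g = 0; g <= num_groups; ++g) { offsets[g] = g * m; }             // [0, m, 2m, ...]
  std::vector<int32_t> registers(static_cast<size_t>(num_groups) * m, 0);   // all histograms, flattened
}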

jrhemstad avatar Apr 14 '22 15:04 jrhemstad

From what I saw in the code, Spark is using a long as the word size, but appears to use 6 bits per histogram bucket.

https://github.com/apache/spark/blob/4835946de2ef71b176da5106e9b6c2706e182722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala#L271

https://github.com/apache/spark/blob/4835946de2ef71b176da5106e9b6c2706e182722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala#L78-L85
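
For reference, packing 6-bit registers into 64-bit words (10 registers per word, with 4 bits left unused) looks roughly like the sketch below; the constants mirror what the linked helper appears to do, but the names are illustrative, not Spark's:

#include <cstdint>

constexpr int REGISTER_BITS      = 6;
constexpr int REGISTERS_PER_WORD = 64 / REGISTER_BITS;  // 10 registers per 64-bit word

// Read the 6-bit register at index `idx` out of the packed word array.
inline uint32_t get_register(uint64_t const* words, int idx)
{
  int const word  = idx / REGISTERS_PER_WORD;
  int const shift = (idx % REGISTERS_PER_WORD) * REGISTER_BITS;
  return static_cast<uint32_t>((words[word] >> shift) & 0x3Fu);
}

// Overwrite the 6-bit register at index `idx` with `value` (only the low 6 bits are kept).
inline void set_register(uint64_t* words, int idx, uint32_t value)
{
  int const word  = idx / REGISTERS_PER_WORD;
  int const shift = (idx % REGISTERS_PER_WORD) * REGISTER_BITS;
  words[word] = (words[word] & ~(uint64_t{0x3F} << shift)) | (uint64_t{value & 0x3Fu} << shift);
}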

I think what we want is a compute_hyperloglogplusplus_histogram aggregation and a merge_hyperloglogplusplus_histogram aggregation. I am not tied to the names and if merging histograms is generic enough we can drop the hyperloglog... part. We would want this because it would tie into all of the existing code that we already have for doing these aggregations. If it is different then we will have to create an entirely new special cased code path. For reductions this is not a big deal, but for group by aggregations it would mean we would have to do a join afterwards to line up the different groups with each other again, and I really don't want to do that.

So that means the histogram would be computed on a single input column, because that is how reductions and group by aggregations tend to work. The output would need to be a cudf::column for a group by or a cudf::scalar for a reduction, just like the existing APIs. I would want the type returned to be the same for both the reduction and the group by, because it makes life simpler to have common code to handle moving it around/etc. A LIST<INT32> would be fine. I would also be fine with a LIST<INT8>, because I think Spark uses a 6-bit value for each bucket.

But I am no expert on this. Ideally we would like to match the format that Spark uses for the result between the compute_histogram aggregate and the merge_histogram aggregate, because then we would not have to worry about making sure that the first aggregation and the merge aggregation are both on the GPU. But looking at how complex this is, I have my doubts we should even try to do that. We do have code that tries to make sure they are either both on the GPU or neither is, but it is a bit brittle.

revans2 avatar Apr 15 '22 13:04 revans2

@revans2 I don't think anything you said is controversial or incompatible with what I was thinking. The only piece I don't like is returning the histogram in an int64 where it's really 6 bits per histogram bucket. That would mean the result isn't actually an INT64 column, but some special column with special meaning that isn't reflected anywhere in its type or metadata. If it's going to be a non-opaque object, it has to at least be an INT8 per bucket.

To summarize:

  • New aggregations
    • hll::compute_histogram
      • cudf::reduce
        • Input: column_view of any(?) type
        • Output: cudf::scalar with LIST<int8> (i.e., a single list of 8 bit integers that is the histogram)
      • cudf::groupby
        • Input: column_view of any(?) type
        • Output: column with LIST<int8>, histogram/list per group
    • hll::merge_histogram
      • cudf::reduce
        • Input: column with LIST<INT8>, i.e., all the histograms to merge
          • Each histogram must be the same size
        • Output: scalar with LIST<INT8>
      • cudf::groupby
        • Input: column with LIST<INT8>
          • Each histogram must be the same size
        • Output: column with LIST<INT8>
  • New hll::histogram_column_view
    • Similar to tdigest_column_view, this provides a facade over a LIST<INT8> column
    • Each histogram must be the same size
    • This should really be a fixed-size list column, but we don't have that
    • Open question if this ctor should enforce each list being the same size (requires a kernel)
  • New function
    • hll::approx_distinct_count
      • Input: hll::histogram_column_view
      • Output: column with INT64 of the approximate cardinality per histogram in the input column (a sketch of this estimation step follows the list)
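
To make the last step concrete, the raw HyperLogLog estimate that something like hll::approx_distinct_count would compute from a single m-register histogram looks roughly like the host-side sketch below (classic formula only; HLL++ additionally applies bias correction and a linear-counting fallback for small cardinalities, which are omitted here):

#include <cmath>
#include <cstdint>
#include <vector>

// Sketch of the raw HLL estimate for one histogram of INT8 registers:
// E = alpha_m * m^2 / sum(2^-register), with the standard alpha_m for m >= 128.
double raw_hll_estimate(std::vector<int8_t> const& registers)
{
  double const m     = static_cast<double>(registers.size());
  double const alpha = 0.7213 / (1.0 + 1.079 / m);
  double sum         = 0.0;
  for (int8_t const r : registers) { sum += std::pow(2.0, -static_cast<double>(r)); }
  return alpha * m * m / sum;
}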

jrhemstad avatar Apr 15 '22 15:04 jrhemstad

That sounds great.

Open question if this ctor should enforce each list being the same size (requires a kernel)

I thought that, in general, if a check made the code slower you wouldn't do it in the constructor, and that it would instead be a separate method. I am fine with the separate method, but honestly it is not hard to do that check with existing code.

revans2 avatar Apr 15 '22 17:04 revans2

Also tagging @vinaydes, since we too have been looking into using hll for implementing a fast FP-growth like algo for rule mining.

teju85 avatar May 04 '22 16:05 teju85

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

github-actions[bot] avatar Jun 30 '22 21:06 github-actions[bot]

This is still relevant

revans2 avatar Jul 01 '22 20:07 revans2

I'm going to start working on it next week, targeting the 22.10 release.

ttnghia avatar Jul 01 '22 20:07 ttnghia

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

github-actions[bot] avatar Jul 31 '22 22:07 github-actions[bot]

This is still relevant

revans2 avatar Aug 03 '22 14:08 revans2

Has there been any progress on this issue? I am planning to implement an HLL-backed version of cuco::util::approx_distinct_count, which can potentially be used in cudf.

sleeepyjack avatar Jul 09 '23 02:07 sleeepyjack

Hi! We deprioritized this in favor of something else, so it was paused for a while. Having cuco::util::approx_distinct_count would be a great foundation for it. Thanks in advance!

ttnghia avatar Jul 10 '23 04:07 ttnghia

Yes, having an implementation in cuco would be fantastic. @sleeepyjack if you want to discuss design / implementation, I would be more than happy to offer input from our past conversations about this topic and/or PR review.

bdice avatar Jul 10 '23 12:07 bdice

@bdice Good idea. Let me read through the relevant papers first. I guess the crucial part I have to wrap my head around is how to represent the counters and merge them in parallel. I'll ping you on Slack once I'm prepared so we can schedule a 1:1.

sleeepyjack avatar Jul 12 '23 01:07 sleeepyjack

FYI cuco::distinct_count_estimator (NVIDIA/cuCollections#429) has been merged so we can start implementing and exposing this feature in cudf.
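
For anyone picking this up, usage would presumably look something like the rough sketch below, based on the host-bulk example in that PR; the exact class, header, and member names are assumptions and should be checked against the actual cuco headers:

#include <cuco/distinct_count_estimator.cuh>

#include <thrust/device_vector.h>
#include <thrust/sequence.h>

#include <cstddef>

// Rough sketch only: names and signatures are assumed from the linked cuco PR.
int main()
{
  thrust::device_vector<int> items(1'000'000);
  thrust::sequence(items.begin(), items.end());      // example input: 0, 1, 2, ...

  cuco::distinct_count_estimator<int> estimator{};   // default sketch size
  estimator.add(items.begin(), items.end());         // build the HLL sketch on the GPU
  std::size_t const approx = estimator.estimate();   // approximate number of distinct items

  return approx > 0 ? 0 : 1;
}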

sleeepyjack avatar Apr 04 '24 01:04 sleeepyjack