[FEA] Support for approx_count_distinct
Is your feature request related to a problem? Please describe.
I would like to be able to implement a GPU version of Spark's `approx_count_distinct` function, which uses the HyperLogLog++ cardinality estimation algorithm.
cuDF does not appear to provide any features today that would allow me to do this.
Describe the solution you'd like
I would like cuDF to implement this capability and expose an API that is likely similar to `approx_percentile`, in that there would be methods both for computing and merging the underlying data structure, whether that is based on HyperLogLog++ or some other algorithm.
Describe alternatives you've considered: None
Additional context: None
I found this article to be an excellent description of the algorithm: https://towardsdatascience.com/hyperloglog-a-simple-but-powerful-algorithm-for-data-scientists-aed50fe47869
Seems fairly straightforward to implement in parallel too. Compute the hash value for each row, then compute a histogram using the upper `p` bits of the hash to select one of `m` buckets (`m == 2^p`), and use `__clzll` to get the count of consecutive leading zeros.
Then a merge step computes the average of the values in the histogram, taking the discarding of outliers into account. ~The histogram is small enough it probably makes sense to just copy it to the CPU and do this step serially.~ I changed my mind, I think it probably makes more sense to just do the merging on a single block.
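To make the per-row step concrete, here is a minimal CUDA sketch of the idea, assuming a 64-bit hash per row (placeholder names and types, not cuDF code):

```cpp
#include <cstdint>

// Minimal sketch of the per-row HLL update described above (not cuDF code).
// `hash` is a 64-bit hash of the row, `p` is the precision (m == 2^p buckets),
// `registers` is the histogram of size m.
__device__ void hll_update(std::uint64_t hash, int p, int* registers)
{
  // Upper p bits select the bucket.
  auto const bucket = static_cast<int>(hash >> (64 - p));
  // Rank = number of leading zeros in the remaining (64 - p) bits, plus one.
  auto const remaining = hash << p;
  int const rank =
    (remaining == 0) ? (64 - p + 1) : (__clzll(static_cast<long long>(remaining)) + 1);
  // Each bucket keeps the maximum rank observed; atomicMax handles concurrent rows.
  atomicMax(registers + bucket, rank);
}
```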
> The histogram is small enough it probably makes sense to just copy it to the CPU and do this step serially.
That is true if we are doing a reduction. Spark supports this for groupby aggregations and technically for window operations too, but I don't think we need it there just yet. Although Spark does not support distinct window functions, so it might be used just because it is the only option. For a groupby we could be doing a lot of smaller distinct counts, so having the ability to do that on the GPU would be good.
@andygrove could you please fill in more details about what we are going to need here?
Is this just for a reduction or is it for a group by aggregation? Do we need to include windowing support? Because we do distributed group by and reductions, with two phases to build the aggregation, what operations/aggregations do we need? What are the inputs and the outputs to each aggregation/operation? It looks like Spark supports setting a scalar relative standard deviation parameter that directly impacts `p`. It also looks like Spark expects to shuffle the intermediate results (a histogram) as a struct of longs, and the number of entries in that struct is `p^2/6` or something odd like that.
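As a reference point for the relative standard deviation parameter mentioned above, the textbook HyperLogLog relationship is rsd ≈ 1.04 / sqrt(m) with m = 2^p, so a requested rsd can be mapped to a precision roughly like this (a sketch of the standard formula, not necessarily exactly how Spark computes it):

```cpp
#include <cmath>

// Sketch: derive the HLL precision `p` from a requested relative standard
// deviation, using the standard rsd ~= 1.04 / sqrt(2^p) relationship.
int precision_from_rsd(double relative_sd)
{
  // 2^p >= (1.04 / rsd)^2  =>  p >= 2 * log2(1.04 / rsd)
  return static_cast<int>(std::ceil(2.0 * std::log2(1.04 / relative_sd)));
}
// e.g. relative_sd = 0.05 gives p = 9, i.e. m = 512 buckets.
```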
We need a lot more details so we can work with CUDF to hammer out exactly what this is going to look like.
One note about the implementation. From what I've read, most implementations use some small set of bits per histogram bucket (like 5 bits to store the per-bucket count). The only way I'd be comfortable with returning something like that to a user is in an opaque type. Otherwise it should just be a standard integer type per bucket (I'm guessing using a 4B int per bucket will be better for performance since this is going to require atomics to merge histograms).
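As a concrete illustration of why a plain int per bucket is convenient: merging two HLL histograms is just an element-wise maximum over the buckets, which maps directly onto `atomicMax` when each bucket is an `int32` (a sketch, not a proposed cuDF kernel; bit-packed 5- or 6-bit buckets would need a CAS loop instead):

```cpp
// Sketch: merging two HLL histograms is an element-wise max over the buckets.
__global__ void merge_histograms(int const* src, int* dst, int num_buckets)
{
  auto const i = static_cast<int>(blockIdx.x * blockDim.x + threadIdx.x);
  if (i < num_buckets) { atomicMax(dst + i, src[i]); }
}
```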
> Because we do distributed group by and reductions, with two phases to build the aggregation, what operations/aggregations do we need?
I spent a little time thinking about this, but indeed we'd need more concrete requirements to say definitively what it should look like.
For the "reduction"/column level API I envision a two phase API where the first returns an opaque-ish object holding the histogram and the second takes n
histograms and merges them to return the approximate cardinality. Spitballing...
```cpp
namespace cudf::hll {  // namespace for hyperloglog functionality?

// Does this need to work on tables? or just columns?
// This could return an opaque `hyperloglog_histogram` object, or it could just return a regular `column`?
compute_histogram(table_view or column_view?, size_type num_buckets, cudf::hash_function hash);

// What `compute_histogram` returns influences what the input to this function would be, but essentially it is just
// a list of histograms of the same size. Should probably be some strongly typed thing that enforces requirements.
size_t approx_distinct_count(table_view histograms, float outlier_threshold = 0.7, float bias_correction = 0.77351);

}  // namespace cudf::hll
```
For the groupby case we can do something like with tdigest, where there's a `hll_histogram` aggregation and a `merge_hll_histogram` aggregation.
> Spark expects to shuffle the intermediate results (a histogram) as a struct of longs
I definitely don't think we should do it this way. Storing the histogram in a literal `struct<int, int, int, ..., int>` column, with one child column per element in the histogram, would be a bad idea. For the single column case I imagine the histogram could just be a regular `int32` column. For the groupby case, I'd think a `LIST<int32>` column would make the most sense.
From what I saw in the code Spark is using a long as the word size, but appears to have 6 bits per histogram bucket.
https://github.com/apache/spark/blob/4835946de2ef71b176da5106e9b6c2706e182722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala#L271
https://github.com/apache/spark/blob/4835946de2ef71b176da5106e9b6c2706e182722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala#L78-L85
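For anyone unfamiliar with that layout, the packing amounts to something like the following sketch of 6-bit registers stored in 64-bit words (10 registers per word); this is a paraphrase of what the linked code appears to do, not Spark's actual code:

```cpp
#include <cstdint>

// Sketch of 6-bit HLL registers packed into 64-bit words, 10 registers per word.
constexpr int REGISTER_BITS           = 6;
constexpr int REGISTERS_PER_WORD      = 64 / REGISTER_BITS;  // 10
constexpr std::uint64_t REGISTER_MASK = (1ull << REGISTER_BITS) - 1;

std::uint64_t get_register(std::uint64_t const* words, int idx)
{
  auto const shift = (idx % REGISTERS_PER_WORD) * REGISTER_BITS;
  return (words[idx / REGISTERS_PER_WORD] >> shift) & REGISTER_MASK;
}

void set_register(std::uint64_t* words, int idx, std::uint64_t value)
{
  auto const word  = idx / REGISTERS_PER_WORD;
  auto const shift = (idx % REGISTERS_PER_WORD) * REGISTER_BITS;
  words[word] = (words[word] & ~(REGISTER_MASK << shift)) | ((value & REGISTER_MASK) << shift);
}
```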
I think what we want is a `compute_hyperloglogplusplus_histogram` aggregation and a `merge_hyperloglogplusplus_histogram` aggregation. I am not tied to the names, and if merging histograms is generic enough we can drop the hyperloglog... part. We would want this because it would tie into all of the existing code that we already have for doing these aggregations. If it is different then we will have to create an entirely new special-cased code path. For reductions this is not a big deal, but for group by aggregations it would mean we would have to do a join afterwards to line up the different groups with each other again, and I really don't want to do that.
So that means the histogram would be computed on a single input column, because that is how reductions and group by aggregations tend to work. The output would need to be a `cudf::column` for a group by or a `cudf::scalar` for a reduction, just like the existing APIs. I would want the type returned to be the same for both the reduction and the group by because it makes life simpler to have common code to handle moving it around/etc. A `LIST<INT32>` would be fine. I would also be fine with a `LIST<INT8>`, because Spark appears to use 6 bits per bucket.
But I am no expert on this. Ideally we would like to be able to match the format that Spark is using as the result between the `compute_histogram` aggregate and the `merge_histogram` aggregate, because it lets us not worry about making sure that the first aggregation and the merge aggregation are both on the GPU. But looking at how complex this is, I have my doubts we should even try to do that. And we have code to try to make sure that they both are on the GPU or none of them are, but it is a bit brittle.
@revans2 I don't think anything you said is controversial or incompatible with what I was thinking. The only piece I don't like is returning the histogram in an `int64` where it's really 6 bits per histogram bucket. That would mean the result isn't actually an `INT64` column, but some special column with special meaning that isn't reflected anywhere in its type or metadata. If it's going to be a non-opaque object, it has to be at least an `INT8` per bucket.
To summarize:

- New aggregations
  - `hll::compute_histogram`
    - `cudf::reduce`
      - Input: `column_view` of any(?) type
      - Output: `cudf::scalar` with `LIST<INT8>` (i.e., a single list of 8 bit integers that is the histogram)
    - `cudf::groupby`
      - Input: `column_view` of any(?) type
      - Output: `column` with `LIST<INT8>`, one histogram/list per group
  - `hll::merge_histogram`
    - `cudf::reduce`
      - Input: `column` with `LIST<INT8>`, i.e., all the histograms to merge
        - Each histogram must be the same size
      - Output: `scalar` with `LIST<INT8>`
    - `cudf::groupby`
      - Input: `column` with `LIST<INT8>`
        - Each histogram must be the same size
      - Output: `column` with `LIST<INT8>`
- New `hll::histogram_column_view`
  - Similar to `tdigest_column_view`, provides a facade over a `LIST<INT8>` column
  - Each histogram must be the same size
    - This should really be a fixed-size list column, but we don't have that
    - Open question if this ctor should enforce each list being the same size (requires a kernel)
- New function
  - `hll::approx_distinct_count`
    - Input: `hll::histogram_column_view`
    - Output: `column` with `INT64` of the approximate cardinality per histogram in the input column
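To illustrate how these would slot into the existing aggregation machinery, here is a rough usage sketch for the groupby case; the `make_hll_histogram_aggregation` factory is a hypothetical name for the proposal above, not an existing cuDF API:

```cpp
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <vector>

// Hypothetical usage of the proposed aggregation with the existing groupby API.
std::unique_ptr<cudf::column> hll_histogram_per_group(cudf::table_view const& keys,
                                                      cudf::column_view const& values)
{
  cudf::groupby::groupby gb(keys);

  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = values;
  // Hypothetical factory; would produce one LIST<INT8> histogram per group.
  requests[0].aggregations.push_back(
    cudf::make_hll_histogram_aggregation<cudf::groupby_aggregation>());

  auto [group_keys, results] = gb.aggregate(requests);
  return std::move(results[0].results[0]);
}
```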
That sounds great.
> Open question if this ctor should enforce each list being the same size (requires a kernel)
I thought in general you wouldn't do the check if it made the code slower, and that it would be a separate method to do the check. I am fine with the separate method, but honestly it is not hard to do that check with existing code.
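For example, something along these lines with existing APIs (a sketch using `cudf::lists::count_elements` plus min/max reductions, ignoring nulls for brevity):

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/lists/count_elements.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/reduction.hpp>
#include <cudf/scalar/scalar.hpp>

// Sketch: verify every histogram (list row) has the same length using existing APIs.
bool all_histograms_same_size(cudf::lists_column_view const& histograms)
{
  auto const sizes = cudf::lists::count_elements(histograms);
  auto const dtype = cudf::data_type{cudf::type_id::INT32};
  auto const min   = cudf::reduce(
    sizes->view(), *cudf::make_min_aggregation<cudf::reduce_aggregation>(), dtype);
  auto const max   = cudf::reduce(
    sizes->view(), *cudf::make_max_aggregation<cudf::reduce_aggregation>(), dtype);
  return static_cast<cudf::numeric_scalar<int32_t>&>(*min).value() ==
         static_cast<cudf::numeric_scalar<int32_t>&>(*max).value();
}
```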
Also tagging @vinaydes, since we too have been looking into using HLL for implementing a fast FP-growth-like algorithm for rule mining.
This issue has been labeled `inactive-30d` due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled `inactive-90d` if there is no activity in the next 60 days.
This is still relevant
I'm going to start working on it next week, targeting the 22.10 release.
This issue has been labeled `inactive-30d` due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled `inactive-90d` if there is no activity in the next 60 days.
This is still relevant
Has there been any progress on this issue? I am planning to implement an HLL-backed version of `cuco::util::approx_distinct_count`, which can potentially be used in cudf.
Hi! We deprioritized this for something else, so it was paused for a while. Having `cuco::util::approx_distinct_count` would be a big foundation for it. Thanks in advance!
Yes, having an implementation in cuco would be fantastic. @sleeepyjack if you want to discuss design / implementation, I would be more than happy to offer input from our past conversations about this topic and/or PR review.
@bdice Good idea. Let me read through the relevant papers first. I guess the crucial part I have to wrap my head around is how to represent the counters and merge them in parallel. I'll ping you on Slack once I'm prepared so we can schedule a 1:1.
FYI `cuco::distinct_count_estimator` (NVIDIA/cuCollections#429) has been merged, so we can start implementing and exposing this feature in cudf.