feat: implement GroupsAccumulator for `count(DISTINCT)` aggr
Signed-off-by: Ruihang Xia [email protected]
Which issue does this PR close?
Related to #5472
Rationale for this change
Implement a group accumulator for the distinct count aggregate function. On the hits.parquet dataset from ClickBench, it gives a ~5x speedup for a query like select "RegionID", COUNT("UserID"), COUNT(DISTINCT "UserID") as u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;:
After:
> select "RegionID", COUNT("UserID"), COUNT(DISTINCT "UserID") as u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
+----------+--------------------+---------+
| RegionID | count(hits.UserID) | u |
+----------+--------------------+---------+
| 229 | 18295832 | 2845673 |
| 2 | 6687587 | 1081016 |
| 208 | 4261812 | 831676 |
| 169 | 3320229 | 604583 |
| 184 | 1755192 | 322661 |
| 158 | 1318059 | 307152 |
| 34 | 1792369 | 299479 |
| 55 | 1426901 | 286525 |
| 107 | 1516690 | 272448 |
| 42 | 1542717 | 243181 |
+----------+--------------------+---------+
10 row(s) fetched.
Elapsed 1.941 seconds.
Before:
Elapsed 11.828 seconds.
For queries with only one distinct count (like q5 from ClickBench), the optimizer rule single_distinct_to_groupby rewrites the distinct column into a group-by column, which avoids the need for this group accumulator. For queries that rule does not cover, this group accumulator can improve performance substantially.
What changes are included in this PR?
Implement GroupsAccumulator for distinct count.
Are these changes tested?
yes
Are there any user-facing changes?
no
Thank you @waynexia, I'm planning to check it out by tomorrow at the latest.
I have a question in advance before reviewing: have you considered implementing a groups accumulator for the specialized cases of DistinctCountAccumulator (primitive/native types and bytes)?
I'm asking because it looks a bit odd to me that switching from native Rust types to ScalarValue still gives 5x faster execution (though I haven't rechecked the performance results, and perhaps GroupsAccumulatorAdapter introduces some insane overhead). If I'm not mistaken, the groups accumulator in this case does basically the same thing as GroupsAccumulatorAdapter: storing separate states (hash sets) in a vector is already implemented in the adapter.
Here are my thoughts on why there is a 5x difference even though the implementations are very similar:
The GroupsAccumulator for median() in https://github.com/apache/datafusion/pull/13681 also achieved a 5x speedup. We found that 2x was because the old Accumulator implementation had an inefficient Vec<ScalarValue> <---> ListArray conversion when state() and merge_batch() are called, and I believe the remaining 2x difference comes from inefficiency in the adapter, or from extra memory allocation for the many outer state structs. Perhaps the Accumulator for DistinctCount also does not implement this list conversion efficiently.
It would be worthwhile to run the clickbench_extended benchmarks as well (./bench.sh run clickbench_extended)
I am running the benchmarks now and will report back.
BTW, if there is no existing coverage, perhaps we can add the query from this description to clickbench_extended.
My measurements show Q3 getting quite a bit slower. I will rerun to confirm.
Comparing main_base and count-distinct-group
--------------------
Benchmark clickbench_extended.json
--------------------
| Query | main_base | count-distinct-group | Change |
|---|---|---|---|
| QQuery 0 | 1978.13ms | 1940.74ms | no change |
| QQuery 1 | 736.56ms | 753.84ms | no change |
| QQuery 2 | 1432.89ms | 4533.83ms | 3.16x slower |
| QQuery 3 | 706.18ms | 716.03ms | no change |
| QQuery 4 | 1492.33ms | 1498.56ms | no change |
| QQuery 5 | 17251.84ms | 17220.01ms | no change |
Ahh, I reproduced the same result. And I also observed a regression on q0:
| Query | Before (ms) | After (ms) |
|---|---|---|
| Q0 | 1407.04 | 2013.42 |
| Q2 | 681.78 | 1742.08 |
(BTW, how do you get the comparison output?)
I'll look into the regression this weekend. I suspect the reason is that the gain from grouping is much smaller than the gain from the specialized non-group accumulators.
Quick update: I've made some progress, but still need a few days to refine it. I've implemented a primitive aggregator and it does work.
I (am very excited!) just realized we may have overcomplicated things: we specialize on array types to compute hashes and store the values, but we need neither a dedicated hash function (wrapped as an xxx set in the previous implementation) nor the original values. A count(distinct) accumulator only needs to do two things: compute hashes and maintain a hash set of them.
So I tried rewriting this aggregator another way, using one uniform accumulator for all types. It does one dispatch per update to select the actual hash implementation (this could be eliminated by extracting a type parameter for the accumulator), discards the original values, and stores only the hashes in the state. This not only saves memory but also performs well:
| Query | Before (ms) | After (ms) | Notes |
|---|---|---|---|
| Q0 | 1046.3 | 430.4 | no group-by |
| Q1 | 243.7 | 200.5 | no group-by |
| Q2 | 441.8 | 327.9 | 110 groups |
| Count + Count Distinct | 11828 | 515 | 9000 groups |
p.s. I switched to a different machine to run these.
p.p.s. I didn't use bench.sh compare because, judging from its help text, it doesn't seem to support selecting a test case.
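To make the hash-only idea above concrete, here is a minimal sketch using only the Rust standard library. It is not the code in this PR and does not use the arrow or DataFusion APIs: the type `HashOnlyDistinctCount`, its method names, and the use of `DefaultHasher` are assumptions chosen for illustration. It keeps one set of 64-bit hashes per group and never stores the original values.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical grouped COUNT(DISTINCT) state: one set of 64-bit hashes per group.
/// Original values are never stored, so a hash collision between two different
/// values in the same group would undercount that group by one.
#[derive(Default)]
struct HashOnlyDistinctCount {
    groups: Vec<HashSet<u64>>,
}

impl HashOnlyDistinctCount {
    /// Feed one batch: `values[i]` belongs to group `group_indices[i]`.
    /// `None` models SQL NULL, which COUNT(DISTINCT) ignores.
    fn update_batch<T: Hash>(
        &mut self,
        values: &[Option<T>],
        group_indices: &[usize],
        total_num_groups: usize,
    ) {
        assert_eq!(values.len(), group_indices.len());
        // Grow the per-group state if new groups appeared in this batch.
        if self.groups.len() < total_num_groups {
            self.groups.resize_with(total_num_groups, HashSet::new);
        }
        for (value, &group) in values.iter().zip(group_indices) {
            if let Some(v) = value {
                // Hash the value and keep only the hash.
                let mut hasher = DefaultHasher::new();
                v.hash(&mut hasher);
                self.groups[group].insert(hasher.finish());
            }
        }
    }

    /// Distinct count per group, in group-index order.
    fn evaluate(&self) -> Vec<usize> {
        self.groups.iter().map(HashSet::len).collect()
    }
}
```

The trade-off in this sketch is that the result is exact only if no two distinct values in a group hash to the same 64-bit value; whether that is acceptable is a design decision for the real accumulator.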
Some follow-up things:
- Make a type parameter for the new general accumulator's implementation, if needed (consider our compile time is quite slow... one dispatch per array seems acceptable)
- Use RawTable to further optimize the states, and avoid hashing the u64 hash values a second time
- Maybe remove PrimitiveDistinctCountAccumulator and similar implementations? They are not used by us after this patch, but they are part of our public API
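The second follow-up item is about not hashing values that are already hashes. The sketch below shows the general idea with a swapped-in technique: instead of hashbrown's RawTable, it uses a standard-library `HashSet` with a pass-through hasher. `PassThroughHasher`, `PrehashedSet`, and `count_distinct_prehashed` are hypothetical names, and the sketch assumes the incoming `u64` values are already well-distributed hashes.

```rust
use std::collections::HashSet;
use std::hash::{BuildHasherDefault, Hasher};

/// Hypothetical pass-through hasher: the keys are already 64-bit hashes,
/// so the set reuses them instead of hashing them again.
#[derive(Default)]
struct PassThroughHasher(u64);

impl Hasher for PassThroughHasher {
    fn finish(&self) -> u64 {
        self.0
    }

    /// Fallback for non-u64 keys; not expected to be called for `u64` keys.
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 = self.0.rotate_left(8) ^ u64::from(b);
        }
    }

    /// `u64`'s `Hash` impl calls this, so the key becomes its own hash.
    fn write_u64(&mut self, value: u64) {
        self.0 = value;
    }
}

/// A set of pre-computed hashes that skips the second hashing step.
type PrehashedSet = HashSet<u64, BuildHasherDefault<PassThroughHasher>>;

/// Count distinct pre-computed hashes.
fn count_distinct_prehashed(hashes: &[u64]) -> usize {
    let mut set = PrehashedSet::default();
    set.extend(hashes.iter().copied());
    set.len()
}
```

If the stored values were not well-distributed hashes (for example, small sequential integers), a pass-through hasher could cluster entries badly, so this only fits the case where the state holds hash outputs.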
I think this is still a work in progress, so I'm marking it as a draft to clean up the review queue.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.