datafusion
Implement fast min/max accumulator for binary / strings (now it uses the slower path)
Is your feature request related to a problem or challenge?
https://github.com/apache/arrow-datafusion/pull/6904 introduces some fancy new hashing and ways to implement aggregates
min/max for strings (StringArray / LargeStringArray, etc.) now uses the slower Accumulator implementation, which could be made much faster
Describe the solution you'd like
I would like to implement a fast GroupsAccumulator for Min/Max
Describe alternatives you've considered
Here is one potential way to implement it:
We could store the current minimum for all groups in the same `Rows` 🤔 and track an index into that `Rows` for the current minimum for each group.
This would require an extra copy of the input values, but it could probably be vectorized pretty well, as shown in the following diagram.
Sorry, what I meant was something like the following, where the accumulator only stores the current minimum values.
This approach could end up with min_storage being full of "garbage" if many batches had new minimums, but I think we could heuristically "compact" min_storage if it got too large (for example, once it held 2*num_groups rows)
                               ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
                               │           Accumulator state           │
┌─────────┐    ┌─────────┐     │  ┌─────────┐           ┌─────────┐    │
│ ┌─────┐ │    │ ┌─────┐ │     │  │ ┌─────┐ │           │ ┌─────┐ │    │
│ │  A  │ │    │ │  A  │─┼───┐ │  │ │  D  │ │     ┌─────┼─│  1  │ │    │
│ ├─────┤ │    │ ├─────┤ │   │ │  │ ├─────┤ │     │     │ ├─────┤ │    │
│ │  B  │ │    │ │  B  │ │   └─┼──┼▶│  A  │◀┼─────┘     │ │  0  │ │    │
│ ├─────┤ │    │ ├─────┤ │     │  │ └─────┘ │           │ └─────┘ │    │
│ │  A  │ │    │ │  A  │ │     │  │         │           │         │    │
│ ├─────┤ │    │ ├─────┤ │     │  │         │           │         │    │
│ │  A  │ │    │ │  A  │ │     │  │         │           │         │    │
│ ├─────┤ │    │ ├─────┤ │     │  │         │           │         │    │
│ │  C  │ │    │ │  C  │ │     │  │         │           │         │    │
│ └─────┘ │    │ └─────┘ │     │  │         │           │         │    │
└─────────┘    └─────────┘     │  └─────────┘           └─────────┘    │
  input          input         │  min_storage           min_values     │
  values         values        │     Rows                              │
  (Array)        (Rows)        └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘

step 1:        step 2: for any          step 3: min value
convert        value that is a new      (per group) is tracked
arguments to   group minimum, copy      as an index into
Row format     it to a second `Rows`    min_storage `Rows`
See https://github.com/apache/arrow-datafusion/pull/6800#issuecomment-1622290981 for more details
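The approach described above (copy each new group minimum into a second buffer, track an index per group, compact occasionally) can be sketched in a few lines of Rust. This is a hypothetical, self-contained illustration, not DataFusion code: `MinStorage`, `update_batch` and `compact` are made-up names, and plain byte slices stand in for arrow's row format (`Rows`). The compaction trigger uses the "2 * num_groups" heuristic suggested above.

```rust
/// Hypothetical sketch: every value that becomes a new per-group minimum is
/// appended to one shared byte buffer (`storage`); each group only remembers
/// the (offset, len) of its current minimum inside that buffer.
struct MinStorage {
    /// Append-only bytes of every value that was ever a group minimum
    storage: Vec<u8>,
    /// Per group: Some((offset, len)) of the current minimum, None if unset
    min_values: Vec<Option<(usize, usize)>>,
    /// Number of values appended to `storage` since the last compaction
    num_stored: usize,
}

impl MinStorage {
    fn new() -> Self {
        Self { storage: Vec::new(), min_values: Vec::new(), num_stored: 0 }
    }

    /// Current minimum for `group`, if any
    fn get(&self, group: usize) -> Option<&[u8]> {
        self.min_values[group].map(|(offset, len)| &self.storage[offset..offset + len])
    }

    /// `values[i]` belongs to group `group_indices[i]`
    fn update_batch(&mut self, values: &[&[u8]], group_indices: &[usize], total_num_groups: usize) {
        self.min_values.resize(total_num_groups, None);
        for (&value, &group) in values.iter().zip(group_indices) {
            let is_new_min = match self.get(group) {
                None => true,
                Some(current) => value < current, // lexicographic byte order
            };
            if is_new_min {
                // The previous minimum (if any) stays in `storage` as garbage
                let offset = self.storage.len();
                self.storage.extend_from_slice(value);
                self.min_values[group] = Some((offset, value.len()));
                self.num_stored += 1;
            }
        }
        // Heuristic: compact once more than 2 * num_groups values are stored
        if self.num_stored > 2 * total_num_groups {
            self.compact();
        }
    }

    /// Copy only the live minimums into a fresh buffer and fix up the offsets
    fn compact(&mut self) {
        let mut new_storage = Vec::new();
        let mut live = 0;
        for slot in self.min_values.iter_mut() {
            if let Some((offset, len)) = slot {
                let new_offset = new_storage.len();
                new_storage.extend_from_slice(&self.storage[*offset..*offset + *len]);
                *offset = new_offset;
                live += 1;
            }
        }
        self.storage = new_storage;
        self.num_stored = live;
    }
}
```

Because the check runs at the end of each batch, the stored values can temporarily exceed the threshold within a batch; compaction then copies only the live minimums.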
Additional context
No response
One observation here is that min and max on strings is not that common of an operation from what it seems -- grouping on strings is more common.
Maybe there is some binary use case where it is important (e.g. embeddings 🤔)
@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.
I don't know of anyone working on this @devanbenz -- but I also don't know of any benchmarks or actual queries that use min / max on string columns. The place it shows up is when computing statistics when writing parquet, but I think parquet is already pretty good at this (and has its own code to compute min/max)
It actually turns out that Min / Max on string/binary columns are in several ClickBench queries:
https://github.com/apache/datafusion/blob/a08f923c2acb1a46614970231d9a672c36ce3ad2/benchmarks/queries/clickbench/queries.sql#L22-L23
https://github.com/apache/datafusion/blob/a08f923c2acb1a46614970231d9a672c36ce3ad2/benchmarks/queries/clickbench/queries.sql#L29
I don't think min/max have appeared as priorities in benchmarking before because the queries in question are doing other string-heavy operations that tend to dominate. Thus the use of GroupsAccumulatorAdapter for Min/Max on strings, while bad, is overshadowed by other things
However, while working on https://github.com/apache/datafusion/pull/12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow. We can likely restore their speed to something similar to https://github.com/apache/arrow-rs/issues/6408 but I also think this is a good time to actually make Min / Max on strings faster. I will write up some ideas on how to do this
Background
(I will make a PR shortly to add this to the actual datafusion docs)
GroupsAccumulator logically does this:
┌─────┐
│  0  │───────────▶ "A"
├─────┤
│  1  │───────────▶ "Z"
└─────┘
  ...               ...
┌─────┐
│ N-2 │             "A"
├─────┤
│ N-1 │───────────▶ "Q"
└─────┘

Logical group       Current Min/Max
number              value for that
                    group

GroupsAccumulator to store N aggregate values: logically keeps a mapping from each group index to the current value
Today, String / Binary min/max values are implemented using GroupsAccumulatorAdapter, which results in the following layout:
                                        Individual String
                                        (separate allocation)

┌─────┐      ┌────────────────────────┐
│  0  │─────▶│ ScalarValue::Utf8("A") │──────▶ "A"
├─────┤      ├────────────────────────┤
│  1  │─────▶│ ScalarValue::Utf8("Z") │──────▶ "Z"
└─────┘      └────────────────────────┘
  ...                    ...                   ...
┌─────┐      ┌────────────────────────┐
│ N-2 │      │ ScalarValue::Utf8("A") │──────▶ "A"
├─────┤      ├────────────────────────┤
│ N-1 │─────▶│ ScalarValue::Utf8("Q") │──────▶ "Q"
└─────┘      └────────────────────────┘

Logical group   Current Min/Max value for that group stored
number          as a ScalarValue which points to an
                individually allocated String

How GroupsAccumulatorAdapter works today: stores each current min/max as a ScalarValue
Potential Design
One high level idea is to build a data structure that uses the same internal format (views/buffers) as StringViewArray in Arrow:
                ┌──────────────────────────────────────────────┐
                │                         Stored in Vec<u8>    │
                │  Stored in a          ┌──────────────────┐   │
                │  Vec<u128>            │ ┌──────────────┐ │   │
                │ ┌─────────────┐   ┌───┼▶│  some value  │ │   │
┌─────┐         │ │ ┌─────────┐ │   │   │ └──────────────┘ │   │
│  0  │─────────┼─┼▶│  View   │ │   │   │                  │   │
├─────┤         │ │ ├─────────┤ │   │   │                  │   │
│  1  │─────────┼─┼▶│  View   │─┼───┘   │       ...        │   │
└─────┘         │ │ └─────────┘ │       │                  │   │
  ...           │ │     ...     │       │                  │   │
┌─────┐         │ │ ┌─────────┐ │       │                  │   │
│ N-2 │         │ │ │  View   │─┼───┐   │ ┌──────────────┐ │   │
├─────┤         │ │ ├─────────┤ │   └───┼▶│ other value  │ │   │
│ N-1 │─────────┼─┼▶│  View   │ │       │ └──────────────┘ │   │
└─────┘         │ │ └─────────┘ │       └──────────────────┘   │
                │ └─────────────┘   String values are stored   │
                │                   inline or in extra byte    │
 Logical group  │                   buffer                     │
 number         └──────────────────────────────────────────────┘
                 New structure: MutableStringViewBuilder

                 Current Min/Max value for that group stored in
                 same format as StringViewArray
In this design, the current value for each group is stored in two parts (as described in the arrow docs):
- a fixed size u128 view
- a variable length part with the string data
As new batches are processed, each View is updated if necessary
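To make the two-part layout concrete, here is a hypothetical std-only Rust sketch. `make_view` follows the Arrow view layout (the first 4 bytes hold the length; values of at most 12 bytes are stored inline, longer ones store a 4-byte prefix, a buffer index and an offset). `MinViewState`, `with_value` and `update_min` are illustrative names, not the proposed `MutableStringViewBuilder` API, and the sketch assumes a single data buffer that is never garbage collected.

```rust
/// Longest value that is stored entirely inside the 16-byte view
const MAX_INLINE_LEN: usize = 12;

/// Encode `value` using the Arrow view layout (little-endian u128):
/// bytes 0..4 = length; if length <= 12, bytes 4..16 hold the data inline,
/// otherwise bytes 4..8 = prefix, 8..12 = buffer index, 12..16 = offset.
fn make_view(value: &[u8], buffer_index: u32, offset: u32) -> u128 {
    let mut bytes = [0u8; 16];
    bytes[0..4].copy_from_slice(&(value.len() as u32).to_le_bytes());
    if value.len() <= MAX_INLINE_LEN {
        bytes[4..4 + value.len()].copy_from_slice(value);
    } else {
        bytes[4..8].copy_from_slice(&value[..4]);
        bytes[8..12].copy_from_slice(&buffer_index.to_le_bytes());
        bytes[12..16].copy_from_slice(&offset.to_le_bytes());
    }
    u128::from_le_bytes(bytes)
}

/// Hypothetical per-group MIN state: one view per group plus one append-only
/// data buffer (buffer index 0). Names are illustrative only.
struct MinViewState {
    views: Vec<u128>,
    is_set: Vec<bool>,
    buffer: Vec<u8>,
}

impl MinViewState {
    fn new(num_groups: usize) -> Self {
        Self { views: vec![0; num_groups], is_set: vec![false; num_groups], buffer: Vec::new() }
    }

    /// Call `f` with the current value of `group` without allocating
    fn with_value<R>(&self, group: usize, f: impl FnOnce(&[u8]) -> R) -> R {
        let view = self.views[group];
        let len = view as u32 as usize; // low 32 bits hold the length
        let bytes = view.to_le_bytes();
        if len <= MAX_INLINE_LEN {
            f(&bytes[4..4 + len]) // inlined: no buffer access needed
        } else {
            let offset = (view >> 96) as u32 as usize;
            f(&self.buffer[offset..offset + len])
        }
    }

    /// Overwrite the group's view if `value` is a new minimum
    fn update_min(&mut self, group: usize, value: &[u8]) {
        if self.is_set[group] && self.with_value(group, |current| value >= current) {
            return;
        }
        let offset = self.buffer.len() as u32;
        if value.len() > MAX_INLINE_LEN {
            // Any previous long minimum is left behind as unreachable bytes
            self.buffer.extend_from_slice(value);
        }
        self.views[group] = make_view(value, 0, offset);
        self.is_set[group] = true;
    }
}
```

Short minimums never touch `buffer`, which is where the "quick comparison" benefit of inlined values comes from; the bytes left behind by replaced long minimums are the garbage problem discussed next.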
Benefits of design:
- Can hopefully reuse the same code as in arrow-rs
- Allows zero-copy conversion to StringView / BinaryView at output
- Uses inlined values for quick min/max comparison
I believe (though we will have to verify it) that the conversion from MutableStringViewBuilder to just StringArray (not StringViewArray) should also be better than the current GroupsAccumulatorAdapter. Both conversions need to copy the string bytes again into the packed StringArray format, but the GroupsAccumulatorAdapter also has to allocate/free owned Strings as well
Potential challenges
I think the trickiest part of this code, other than the low-level code optimizations, is that as min/max values are replaced, data in the variable length buffer will become "garbage" (not reachable), thus consuming more memory than necessary:
                       Stored in Vec<u8>
                  ┌────────────────────┐
                  │ ┌────────────────┐ │
┌─────────────┐   │ │prev max value 1│ │
│ ┌─────────┐ │   │ └────────────────┘ │
│ │  View   │─┼─┐ │        ...         │
│ └─────────┘ │ │ │ ┌────────────────┐ │
│     ...     │ │ │ │prev max value m│ │
│             │ │ │ └────────────────┘ │
│             │ │ │ ┌────────────────┐ │
│             │ └─┼▶│current value   │ │
│             │   │ └────────────────┘ │
│             │   │        ...         │
└─────────────┘   └────────────────────┘

                    Previous min/max values are not pointed to
                    anymore and need to be cleaned up
I think this means the code will need something like `GenericByteViewArray::gc`, run occasionally
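A hypothetical sketch of such a cleanup, similar in spirit to (but not the implementation of) `GenericByteViewArray::gc`: it assumes a single data buffer (buffer index 0), copies only the non-inlined values still referenced by a view into a new buffer, and rewrites each view's offset. `gc_views` is a made-up name; the view layout is the Arrow one (length in bytes 0..4, offset in bytes 12..16).

```rust
/// Hypothetical GC for view-based state with one data buffer (index 0):
/// returns a compacted buffer and rewrites the offsets stored in `views`.
fn gc_views(views: &mut [u128], buffer: &[u8]) -> Vec<u8> {
    // Only non-inlined values (len > 12) live in the data buffer
    let live_bytes: usize = views
        .iter()
        .map(|&v| v as u32 as usize)
        .filter(|&len| len > 12)
        .sum();
    let mut new_buffer = Vec::with_capacity(live_bytes);
    for view in views.iter_mut() {
        let len = *view as u32 as usize;
        if len <= 12 {
            continue; // inlined values carry their data in the view itself
        }
        let old_offset = (*view >> 96) as u32 as usize;
        let new_offset = new_buffer.len() as u32;
        new_buffer.extend_from_slice(&buffer[old_offset..old_offset + len]);
        // Keep length, prefix and buffer index (low 96 bits); replace the offset
        *view = (*view & ((1u128 << 96) - 1)) | ((new_offset as u128) << 96);
    }
    new_buffer
}
```

Running this whenever the buffer grows well beyond `live_bytes` bounds the garbage, at the cost of copying the live long values each time.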
Random Thoughts
Thoughts: maybe this structure (MutableStringViewBuilder??) could be upstreamed eventually
@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔
Great! I'll get started on this later in the week/over the weekend :) Will likely bug folks as I require help haha
take
In terms of implementation, what I suggest is:
- Do a POC implementation: wire up just enough for `StringView`, don't worry about GC, add basic unit tests
- Verify it makes the clickbench query faster
- Flesh out testing, documentation, add support for StringArray, etc.
- Merge and profit (bonus points for blogging about it)
For the POC here is the reproducer I recommend:
Step 1: Get hits_partitioned using bench.sh:
cd benchmarks
./bench.sh data clickbench_partitioned
Step 2: Prepare a script with the reproducer query (here saved as `q28.sql` in `benchmarks/data`):
set datafusion.execution.parquet.schema_force_view_types = true;
SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
Step 3: Run script (with release build of datafusion-cli):
datafusion-cli -f q28.sql
set datafusion.execution.parquet.schema_force_view_types = true; --> Elapsed 18.431 seconds.
set datafusion.execution.parquet.schema_force_view_types = false; --> Elapsed 6.427 seconds.
The goal is to get the query with `set datafusion.execution.parquet.schema_force_view_types = true;` to be as fast as (or faster than) when it is false
If you look at flamegraph-string-view.svg, you can see that most of the time is spent in the GroupsAccumulator
Seems really interesting; I am reading the related discussions.
BTW in case anyone is interested, I recorded a short video on how to make these flamegraphs: https://youtu.be/2z11xtYw_xs
I will add a link to that in the docs later
The challenge with String seems to be:
- If we simply use a Vec<String>, like the primitive accumulators do, to keep the min/max values, it is too expensive to convert them to StringArray/StringViewArray (many, many copies).
- But if we use a StringArray-like approach to keep the values, we can't update the min/max values.
- So finally we need a StringViewArray-like approach, but that still has the new challenge of GC?
> The challenge with String seems to be:
> - If we simply use a Vec<String>, like the primitive accumulators do, to keep the min/max values, it is too expensive to convert them to StringArray/StringViewArray (many, many copies).
I think the overhead is actually mostly that there is an additional (small) allocation for each String. For queries with a small number of groups (like 100), an extra 100 allocations isn't all that bad. For queries with millions of groups the overhead is substantial
> - But if we use a StringArray-like approach to keep the values, we can't update the min/max values.
I suppose we could potentially update the values as long as the new strings were shorter :thinking:
> - So finally we need a StringViewArray-like approach, but that still has the new challenge of GC?
@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:
It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explicit memory management 🤔
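As a rough illustration of the `Vec<String>` idea, per-group state can be updated directly from a batch of values and group indices, with no ScalarValue or Array::slice involved. This is a hypothetical std-only sketch: `MinStringState` and `update_batch` are illustrative names and this is not the actual GroupsAccumulator trait.

```rust
/// Hypothetical per-group MIN state for strings: one owned String per group,
/// updated directly from each batch.
struct MinStringState {
    /// Current minimum per group; `None` until the group sees a non-null value
    min_values: Vec<Option<String>>,
}

impl MinStringState {
    fn new() -> Self {
        Self { min_values: Vec::new() }
    }

    /// `values[i]` (where `None` is a SQL NULL) belongs to group `group_indices[i]`
    fn update_batch(
        &mut self,
        values: &[Option<&str>],
        group_indices: &[usize],
        total_num_groups: usize,
    ) {
        self.min_values.resize(total_num_groups, None);
        for (&value, &group) in values.iter().zip(group_indices) {
            // MIN ignores NULL inputs
            let Some(value) = value else { continue };
            let is_new_min = match &self.min_values[group] {
                Some(current) => value < current.as_str(),
                None => true,
            };
            if is_new_min {
                // Reuse the group's existing String allocation when there is one
                let slot = self.min_values[group].get_or_insert_with(String::new);
                slot.clear();
                slot.push_str(value);
            }
        }
    }
}
```

Producing output would still copy these strings into a StringArray or StringViewArray, which is the copy discussed above; what this avoids is the per-row slicing and ScalarValue round trips.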
🤔 Can we still use the StringView-like approach to store states (https://github.com/apache/datafusion/issues/6906#issuecomment-2355604402), but store each non-inlined state in its own String?
And when we output them to StringViewArray, we convert each such String into a tiny buffer.
For example, if all states are non-inlined (len > 12), the output StringViewArray may look like:
row1: view1 buffer1(with only one string)
row2: view2 buffer2(with only one string)
...
rown: viewn buffern
I am not familiar enough with StringViewArray; is it OK to do that? And will it lead to extremely bad performance?
> @Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:
> It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explicit memory management 🤔
Yes... At least it will be better than now, even if we just use Vec<String> to implement a specific GroupsAccumulator for the String type...
> I am not familiar enough with StringViewArray; is it OK to do that? And will it lead to extremely bad performance?
I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end). StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)
Ok, for StringView min/max, it seems we can just start by using Vec<u128> (views) to store the inlined states (<= 12 bytes), and Vec<String> to store the non-inlined ones.
And when converting it to StringViewArray, we just copy the Vec<String> to create the buffer (GroupsAccumulatorAdapter copies the states too).
For the short strings (<= 12 bytes), this avoids allocating a String, and for the long ones, it does the same thing as GroupsAccumulatorAdapter. It seems it can have better performance (due to the optimization for short strings)?
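A hypothetical std-only sketch of this hybrid: groups whose minimum is at most 12 bytes keep it inline in a `u128` view, longer minimums are kept as `String`s, and the long strings are copied once into a single data buffer when output is produced. `HybridMinState`, `update_min` and `into_view_parts` are illustrative names; building an actual StringViewArray from the returned views and buffer is left out.

```rust
/// Longest value that fits entirely inside a 16-byte view
const MAX_INLINE_LEN: usize = 12;

/// Encode `value` in the Arrow view layout (little-endian u128):
/// bytes 0..4 = length; for length <= 12, bytes 4..16 hold the data; otherwise
/// bytes 4..8 = prefix, 8..12 = buffer index, 12..16 = offset into that buffer.
fn make_view(value: &[u8], buffer_index: u32, offset: u32) -> u128 {
    let mut bytes = [0u8; 16];
    bytes[0..4].copy_from_slice(&(value.len() as u32).to_le_bytes());
    if value.len() <= MAX_INLINE_LEN {
        bytes[4..4 + value.len()].copy_from_slice(value);
    } else {
        bytes[4..8].copy_from_slice(&value[..4]);
        bytes[8..12].copy_from_slice(&buffer_index.to_le_bytes());
        bytes[12..16].copy_from_slice(&offset.to_le_bytes());
    }
    u128::from_le_bytes(bytes)
}

/// Hypothetical hybrid MIN state (illustrative names only)
struct HybridMinState {
    views: Vec<u128>,
    long_values: Vec<String>, // stays an empty String (no allocation) for inlined groups
    is_set: Vec<bool>,        // a real accumulator would track NULLs separately
}

impl HybridMinState {
    fn new(num_groups: usize) -> Self {
        Self {
            views: vec![0; num_groups],
            long_values: vec![String::new(); num_groups],
            is_set: vec![false; num_groups],
        }
    }

    fn update_min(&mut self, group: usize, value: &str) {
        if self.is_set[group] {
            let view = self.views[group];
            let len = view as u32 as usize;
            let inline_bytes = view.to_le_bytes();
            let current: &[u8] = if len <= MAX_INLINE_LEN {
                &inline_bytes[4..4 + len] // short value: compare straight from the view
            } else {
                self.long_values[group].as_bytes()
            };
            if value.as_bytes() >= current {
                return; // not a new minimum
            }
        }
        // The offset is a placeholder until `into_view_parts` lays out the buffer
        self.views[group] = make_view(value.as_bytes(), 0, 0);
        let long = &mut self.long_values[group];
        long.clear(); // keeps any existing allocation
        if value.len() > MAX_INLINE_LEN {
            long.push_str(value);
        }
        self.is_set[group] = true;
    }

    /// Final views plus a single data buffer, copying each long value exactly once
    fn into_view_parts(self) -> (Vec<u128>, Vec<u8>) {
        let mut views = self.views;
        let mut buffer = Vec::new();
        for (view, long) in views.iter_mut().zip(&self.long_values) {
            if (*view as u32 as usize) > MAX_INLINE_LEN {
                let offset = buffer.len() as u32;
                buffer.extend_from_slice(long.as_bytes());
                *view = make_view(long.as_bytes(), 0, offset);
            }
        }
        (views, buffer)
    }
}
```

Because each group owns at most one String, no garbage accumulates in a shared buffer, so this design sidesteps the GC question at the cost of one allocation per long minimum.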
Seems like a reasonable place to start in my opinion. If we want to get more sophisticated at a later time we can try something more exotic. I suspect there will be times when individual allocations will be faster and times when a buffer will be faster and there will be memory consumption tradeoffs as well.
TL;DR we should implement something, and as long as it is better than what is currently going on, that will be good.
> while working on https://github.com/apache/datafusion/pull/12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.
Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray
I mean, fixed for that particular clickbench query which does not leverage stats to prune parquet access.
Cool, I'm assuming this change didn't impact the GroupsAccumulatorAdapter call in any way, since I'm still seeing similar performance and a similar flamegraph with the following query:
set datafusion.execution.parquet.schema_force_view_types = true;
SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
BTW I was thinking more about this issue -- while a native Min/Max for strings / stringview will help, I have an idea that might make it simply an added bonus: pushing the cast down into the parquet reader https://github.com/apache/datafusion/issues/12509#issuecomment-2366775581
This would only work for parquet correct? It would effectively be a part of the TableScan within the logical plan?
> @Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:
> It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explicit memory management 🤔
So this would effectively be using a Vec<String> for state instead of your proposal of using something akin to StringView described here https://github.com/apache/datafusion/issues/6906#issuecomment-2355629355 @alamb?
I have had more time to take a look at this and wrap my head around how GroupsAccumulatorAdapter works. I'm seeing that the performance impact is happening here https://github.com/apache/datafusion/blob/a35d0075744a058f81bd9ebed747e2e597434019/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs#L475 in slice_and_maybe_filter. I'm under the assumption that this is happening due to the difference between the BinaryView and the Binary scalar values. Taking a look at https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/ I understand that BinaryView is effectively a non-contiguous structure, whereas Binary is contiguous, so it is easily sliced. So the idea here is to change the underlying state in which the accumulator structure receives the data, thus either making it easier to call slice or removing the call to slice entirely?
I think what @alamb means is that just simply using Vec<String> to store the states will be at least not worse than StringArray + GroupsAccumulatorAdapter, and it is easy to start from.
And after improving it to be not worse than StringArray + GroupsAccumulatorAdapter, we can continue to push forward enabling string view by default.
I guess the current goal is to remove the call to slice and get performance at least as good as StringArray + GroupsAccumulatorAdapter, as mentioned above.
> I think what @alamb means is that just simply using Vec<String> to store the states will be at least not worse than StringArray + GroupsAccumulatorAdapter, and it is easy to start from.
Yes indeed. Thank you
> I guess the current goal is to remove the call to slice and get performance at least as good as StringArray + GroupsAccumulatorAdapter, as mentioned above.
💯