[EPIC] (Even More) Grouping / Group By / Aggregation Performance
Is your feature request related to a problem or challenge?
Aggregation is a key operation of analytic engines. DataFusion has made great progress recently (e.g. https://github.com/apache/arrow-datafusion/issues/4973 and https://github.com/apache/arrow-datafusion/issues/6889).
This epic gathers other potential ways we can improve the performance of aggregation.
Core Hash Grouping Algorithm:
- [x] https://github.com/apache/arrow-datafusion/issues/6969
- [x] https://github.com/apache/arrow-datafusion/issues/7064
- [ ] https://github.com/apache/arrow-datafusion/issues/9403
- [ ] https://github.com/apache/arrow-datafusion/issues/7023
- [ ] https://github.com/apache/arrow-datafusion/issues/7095
Specialized Aggregators:
- [ ] https://github.com/apache/arrow-datafusion/issues/6906
- [x] https://github.com/apache/arrow-datafusion/issues/5547
- [x] https://github.com/apache/arrow-datafusion/issues/5472
- [ ] https://github.com/apache/arrow-datafusion/issues/7065
- [ ] https://github.com/apache/arrow-datafusion/issues/7066
New features:
- [ ] https://github.com/apache/arrow-datafusion/issues/6937
- [ ] https://github.com/apache/arrow-datafusion/issues/9562
- [ ] https://github.com/apache/arrow-datafusion/issues/8699
- [x] https://github.com/apache/arrow-datafusion/issues/8934
Improved partitioning:
- [ ] https://github.com/apache/arrow-datafusion/issues/6928
- [ ] https://github.com/apache/arrow-datafusion/issues/7001
- [ ] https://github.com/apache/arrow-datafusion/issues/6822
Hi! Great work is being done here!
I ran into an issue with CoalesceBatches: it seems that there is a performance killer somewhere in CoalesceBatchesStream. It spends too much time in arrow_select::concat::concat (especially in arrow_data::transform::variable_size::build_extend::_{{closure}}). I think the cause is repeatedly expanding MutableArrayData (or non-vectorized copying).
In this query:
select count(*)
from SomeTable as a
join AnotherTable as b
on a.c1 = b.c1
and a.c2 = b.c2
and a.c3 = b.c3
and a.c4 = b.c4
SomeTable consists of 200M rows; AnotherTable has 15M rows.
17% of all execution time is CoalesceBatchesStream:
coalesce_batches::CoalesceBatchesStream::poll_next (55,950 samples, 17.03%)
|--arrow_select::concat::concat_batches (36,618 samples, 11.14%)
|--arrow_select::concat::concat (30,947 samples, 9.42%)
Another related issue is the performance of the RowConverter used for grouping.
More than 75% of GroupedHashAggregateStream's work is converting the composite aggregation key to rows; approximately 50% of its work is encoding (variable::encode):
physical_plan::aggregates::row_hash::GroupedHashAggregateStream::poll_next (17,163 samples, **80.95%**)
|-- arrow_row::RowConverter::convert_columns (10,684 samples, **50.39%**)
|--arrow_row::row_lengths (2,080 samples, 9.81%)
|--arrow_row::variable::encode (7,267 samples, 34.28%)
The query is:
SELECT count(*), col1, col2, col3, col4
FROM SomeTable
GROUP BY col1, col2, col3, col4
ORDER BY col1, col2, col3, col4
The length of SomeTable is ~150m rows
I haven't figured out the root of the problem yet.
Thanks for these profiles @karlovnv
I ran into an issue with CoalesceBatches: it seems that there is a performance killer somewhere in CoalesceBatchesStream.
Looking at the trace in https://github.com/apache/datafusion/issues/7000#issuecomment-2094202184, while I agree that CoalesceBatchesStream is spending the time, I believe it is effectively part of the HashJoin.
Specifically, the HashJoin is basically acting like a filter on the probe-side input (and thus may emit many small batches where most rows were filtered out). The CoalesceBatchesStream then copies these small batches back together into bigger batches.
Thus, in my opinion, the way to speed up this query is not to look at CoalesceBatchesStream itself but instead at the HashJoin -- specifically, maybe we could improve the code (especially after @korowa's work to encapsulate the output generation) so that the join itself handles creating the large output batches.
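For illustration, here is a minimal sketch (not DataFusion's actual implementation) of what the coalescing step effectively does, using arrow's concat_batches: every row the join emits is copied a second time into a larger batch, which is the cost visible under arrow_select::concat::concat above.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));

    // Simulate many small batches, like a selective hash join probe produces.
    let small_batches: Vec<RecordBatch> = (0..100)
        .map(|i| {
            RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(vec![i; 8]))], // 8 rows each
            )
        })
        .collect::<Result<_, _>>()?;

    // Every value is copied again here; with variable-width columns this
    // is where MutableArrayData growth shows up in the profile.
    let coalesced = concat_batches(&schema, &small_batches)?;
    assert_eq!(coalesced.num_rows(), 800);
    Ok(())
}
```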
Another related issue is the performance of the RowConverter used for grouping.
Similarly, while most of the time is being spent in RowConverter, I think it would be hard to improve the performance of the RowConverter itself, as it is already quite optimized.
Instead, I think the key to improving a query such as the one in https://github.com/apache/datafusion/issues/7000#issuecomment-2094206289 is to stop using the RowConverter (at least as much). Here is one idea for doing so: https://github.com/apache/datafusion/issues/9403 (I still harbor dreams of working on this sometime)
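For context, here is roughly the arrow_row usage pattern behind those numbers: the multi-column group key is encoded into an opaque, variable-length byte row so the hash table can hash and compare keys as plain byte strings. This is a minimal standalone sketch, not the DataFusion code itself.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), arrow::error::ArrowError> {
    // One SortField per GROUP BY column (a Utf8 and an Int64 column here).
    let converter = RowConverter::new(vec![
        SortField::new(DataType::Utf8),
        SortField::new(DataType::Int64),
    ])?;

    let cols: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b", "a"])),
        Arc::new(Int64Array::from(vec![1, 2, 1])),
    ];

    // This is the `convert_columns` call from the profile; for Utf8 columns
    // it spends its time in arrow_row::variable::encode.
    let rows = converter.convert_columns(&cols)?;

    // Equal group keys encode to identical byte rows.
    assert!(rows.row(0) == rows.row(2));
    Ok(())
}
```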
@alamb I'd like to mention that extending the mutable batch spends a lot of time (MutableArrayData::extend, utils::extend_offsets), plus the related allocator work.
I suppose it would be much better to preallocate a bigger arrow buffer instead of extending it in small portions, and I believe that would have a real effect.
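To make the preallocation idea concrete, here is a sketch using arrow's MutableArrayData::with_capacities; the 1,000-row / 16 KiB capacities are arbitrary placeholders, not tuned values.

```rust
use arrow::array::{Array, ArrayData, Capacities, MutableArrayData, StringArray};

fn main() {
    let src: ArrayData = StringArray::from(vec!["some", "utf8", "values"]).into_data();

    // Reserve row capacity and value-buffer capacity up front;
    // Capacities::Binary covers variable-width types like Utf8.
    let mut mutable = MutableArrayData::with_capacities(
        vec![&src],
        false,
        Capacities::Binary(1_000, Some(16 * 1024)),
    );

    // Appending many small slices no longer has to regrow the buffers
    // on every extend call.
    for _ in 0..300 {
        mutable.extend(0, 0, 3); // copy rows 0..3 of source array 0
    }

    let out = StringArray::from(mutable.freeze());
    assert_eq!(out.len(), 900);
}
```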
I also noticed that ~18% of the time was spent in asm_exc_page_fault, which is probably due to transparent huge pages being enabled (which is bad for database workloads). I will investigate that further and post some conclusions later.
@alamb I'd like to mention that extending the mutable batch spends a lot of time (MutableArrayData::extend, utils::extend_offsets), plus the related allocator work.
I think those particular functions are the ones that actually copy data, so I am not sure how much more they can be optimized.
I suppose it would be much better to preallocate a bigger arrow buffer instead of extending it in small portions, and I believe that would have a real effect.
I agree it may well.
I also noticed that ~18% of the time was spent in asm_exc_page_fault, which is probably due to transparent huge pages being enabled (which is bad for database workloads). I will investigate that further and post some conclusions later.
👍
Here is one idea for doing so: #9403
I have thought over the join issue for the case when the left table may not be columnar.
For instance, consider Events and Users tables. Events is a columnar table consisting of 10^9 rows; the Users table has only 10^6 rows.
In that case the Users table may be treated as a row-based table with a persistent (or in-memory only) hash (or B*-tree) index.
We can achieve a performance boost using several different approaches:
- Introduce a Dictionary feature: treat the Users table as a dictionary (as in ClickHouse). From the ClickHouse docs:
ClickHouse supports special functions for working with dictionaries that can be used in queries. It is easier and more efficient to use dictionaries with functions than a JOIN with reference tables.
Right now we are experimenting with UDFs like:
select timestamp,
e.user_id,
get_dict_utf8("Users", "Id", "Name", e.user_id) as user_name
from events e
But this is not a general solution, which leads us to the next approach.
- Introduce a row-based table provider with its own special type of LookupRecordBatchStream.
The main idea is to add the ability to provide data to HashJoinStream on request (a rough trait sketch follows this list):
get_items_from_table_by_ids(join_on: RecordBatch) → Result<SendableRecordBatchStream>
This approach may also be useful in the future for joining columnar data with another relational source such as Postgres (by loading portions of the joined table's data on demand by a list of ids).
- Cache indices that have been built during JOIN execution, or use an external user-provided index.
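To make the second approach concrete, here is a rough sketch of what such a lookup provider could look like. The LookupProvider trait is invented for illustration; only RecordBatch, SendableRecordBatchStream, and the datafusion Result alias are real Arrow/DataFusion types.

```rust
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Hypothetical row-based lookup provider: instead of scanning the whole
/// table for the build side, the join asks for just the rows whose join
/// keys appear in the current probe batch.
#[async_trait]
pub trait LookupProvider: Send + Sync {
    /// `join_on` carries the join-key columns from the probe side; the
    /// returned stream yields the matching rows (e.g. served from a
    /// persistent hash or B*-tree index).
    async fn get_items_from_table_by_ids(
        &self,
        join_on: RecordBatch,
    ) -> Result<SendableRecordBatchStream>;
}
```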
Introduce a Dictionary feature: treat the Users table as a dictionary (as in ClickHouse)
This is an excellent idea -- Arrow has something equivalent, DictionaryArray, which could help minimize the copying required.
Something else that might help is to use StringViewArray when coalescing, which would avoid a copy. This might be quite effective if the next operation is GroupByHash or sort, where the data would be copied again 🤔 I think the next release of arrow-rs might have sufficient functionality to try it.
Introduce a row-based table provider with its own special type of LookupRecordBatchStream
I wonder if we could combine this with something like https://github.com/apache/datafusion/issues/7955 🤔
I wonder if we could combine this with something like #7955 🤔
It's quite a good idea!
But I think it's tricky to push the ON condition down. The main reason is this: we know the list of ids (from the perspective of a column index) only at the JOIN stage, not while filtering and fetching data from the source.
So the second approach (introduce a row-based table provider) is about adding the ability to get data directly from the hash stream by a list of ids, as in the get_items_from_table_by_ids sketch above.
Or, even better, to get only offsets by ids (an arrow take index for the take_record_batch() kernel). This idea is very similar to indices in DuckDB.
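Here is a sketch of that offsets idea. The HashMap stands in for the user-provided persistent index mentioned above (join key -> row offset); take_record_batch then materializes only the matching rows.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray, UInt32Array};
use arrow::compute::take_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let users = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![10, 20, 30])),
            Arc::new(StringArray::from(vec!["ann", "bob", "eve"])),
        ],
    )?;

    // Stand-in for the external index: join key -> row offset.
    let index: HashMap<i64, u32> = HashMap::from([(10, 0), (20, 1), (30, 2)]);

    // Resolve the probe-side keys to offsets, then take just those rows.
    let probe_keys = [30_i64, 10, 30];
    let offsets = UInt32Array::from_iter_values(probe_keys.iter().map(|k| index[k]));
    let looked_up = take_record_batch(&users, &offsets)?;
    assert_eq!(looked_up.num_rows(), 3);
    Ok(())
}
```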
DictionaryArray
DictionaryArray is something different. It is the best choice for low-cardinality columns (a way to efficiently encode data within a single column to save space and speed up filtering); ClickHouse offers a special DDL option for this, LowCardinality. UPD: https://clickhouse.com/docs/en/sql-reference/data-types/lowcardinality
Changes the internal representation of other data types to be dictionary-encoded
But it would be great if we supported the arrow Dictionary-encoded type! We could also use a shared dictionary buffer for all the batches.
Since DictionaryArray has no index based on value, we cannot use it for fast O(1) data retrieval, as the example below shows.
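A small illustration of that last point: the dictionary maps keys to values, so looking up rows by value requires either a scan or a side index built over the values.

```rust
use std::collections::HashMap;

use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // keys: [0, 1, 0, 2], values: ["ann", "bob", "eve"]
    let dict: DictionaryArray<Int32Type> =
        vec!["ann", "bob", "ann", "eve"].into_iter().collect();

    // A DictionaryArray answers "row -> value" cheaply, but not
    // "value -> rows"; for that we must build a side index by scanning
    // the (small) values array once.
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let value_to_key: HashMap<&str, i32> = values
        .iter()
        .enumerate()
        .map(|(key, v)| (v.unwrap(), key as i32))
        .collect();

    // O(1): value -> dictionary key; the matching rows are those whose
    // key equals it.
    let eve_key = value_to_key["eve"];
    let rows: Vec<usize> = dict
        .keys()
        .iter()
        .enumerate()
        .filter(|(_, k)| *k == Some(eve_key))
        .map(|(row, _)| row)
        .collect();
    assert_eq!(rows, vec![3]);
}
```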