Vectorizing Merge Joins
Description
Merge joins are useful to join two sorted streams of data. For queries that require joining two large tables, it is a common pattern to store them as bucketed and sorted tables (sorted at write time), so that merge joins can be used to join them at read time. If the streams are already sorted on the join key, merge joins often (a) have lower memory requirements, since they do not need to materialize hash tables (the build side), and (b) are commonly faster, since memory access patterns are more predictable (they avoid hash table lookups). Merge joins are, however, harder to parallelize, so their implementation is commonly single-threaded for a given stream.
Current Velox Implementation
Velox's current merge-join algorithm works in the following way. Starting from the left side, it picks the first key and finds all of its consecutive occurrences. It then looks for matches on the right side and finds all consecutive occurrences there. Once these two ranges of matches are found, it produces their cartesian product as output, copying the corresponding left and right projections for each row into the output batch. It then moves to the next key value on the left and repeats the process. Keys without matches are ignored (or added, depending on left or right outer join semantics).
Since input streams are commonly large, only a window of batches from the left and right is kept in memory; batches are discarded once all matches for the keys they contain have been produced, and more batches are loaded when the current buffer is exhausted but further matches are still possible.
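For illustration, here is a minimal sketch of this matching loop, using plain std::vectors of integer keys in place of Velox vectors (ignoring the batch windowing described above; names are hypothetical and outer-join handling is omitted):

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Emits one (leftRow, rightRow) pair per match, i.e. the cartesian product of
// each pair of equal-key runs. The real operator copies the projected columns
// for each matched pair into the output batch instead.
std::vector<std::pair<int32_t, int32_t>> mergeJoinInner(
    const std::vector<int64_t>& left,
    const std::vector<int64_t>& right) {
  std::vector<std::pair<int32_t, int32_t>> out;
  size_t l = 0;
  size_t r = 0;
  while (l < left.size() && r < right.size()) {
    if (left[l] < right[r]) {
      ++l; // No match for this left key (ignored, or emitted for outer joins).
    } else if (left[l] > right[r]) {
      ++r;
    } else {
      // Find the runs of consecutive equal keys on both sides.
      size_t lEnd = l + 1;
      size_t rEnd = r + 1;
      while (lEnd < left.size() && left[lEnd] == left[l]) {
        ++lEnd;
      }
      while (rEnd < right.size() && right[rEnd] == right[r]) {
        ++rEnd;
      }
      // Produce the cartesian product of the two runs.
      for (size_t i = l; i < lEnd; ++i) {
        for (size_t j = r; j < rEnd; ++j) {
          out.emplace_back(static_cast<int32_t>(i), static_cast<int32_t>(j));
        }
      }
      l = lEnd;
      r = rEnd;
    }
  }
  return out;
}
```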
Opportunities
There are three main opportunities to make this process more efficient:
- The output always contains repetitions of the input values, so it can be represented more efficiently using RLEs or dictionaries.
- If the columns from either the left or right side are large - in some internal workloads we see columns larger than 1MB - copying them to the output adds significant overhead. This gets exacerbated by opportunity (1) above, as each row may need to be copied multiple times. The process can be more efficient if dictionaries can reuse the input buffers (when possible) to avoid copying the data, i.e., producing dictionaries that simply wrap around the input. This may not be achievable in some cases, as both the left and right input streams are split into multiple consecutive buffers which are not aligned on the join key. This is further discussed below.
- Enable pass-through (lazy) columns. In the current implementation, lazily loaded input columns need to be materialized during the join before they can be copied. It is not clear whether this will yield performance wins, as columns usually need to be materialized upstream in any case, but it may unlock future optimizations.
A Vectorized Merge-Join Algorithm
The proposed vectorized merge join will always produce dictionaries, so it will first allocate one buffer to store the indices for left projections and one buffer to store the indices for right projections. Once key matches are identified, instead of copying the projections of each matching row to the output, only the left and right dictionary indices are written.
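As a rough sketch of this idea (hypothetical names, plain std::vectors standing in for Velox index buffers), producing a run of matches then amounts to appending indices rather than copying payload data; the actual output columns can wrap the input columns with these indices (e.g., via BaseVector::wrapInDictionary):

```cpp
#include <cstdint>
#include <vector>

// One index buffer per side; each output row is described by an index into
// the current left batch and an index into the current right batch.
struct JoinIndexBuffers {
  std::vector<int32_t> leftIndices;
  std::vector<int32_t> rightIndices;
};

// Appends the cartesian product of a run of equal keys on the left
// ([leftBegin, leftEnd)) and its matching run on the right
// ([rightBegin, rightEnd)) as pairs of dictionary indices.
void addMatchRuns(
    JoinIndexBuffers& out,
    int32_t leftBegin,
    int32_t leftEnd,
    int32_t rightBegin,
    int32_t rightEnd) {
  for (int32_t l = leftBegin; l < leftEnd; ++l) {
    for (int32_t r = rightBegin; r < rightEnd; ++r) {
      out.leftIndices.push_back(l);
      out.rightIndices.push_back(r);
    }
  }
}
```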
Output and Buffer Alignment
Output is produced up to outputBatchSize_ rows at a time. The vectorized merge join operator will generally return dictionaries wrapped around the input (left) vectors. Since dictionaries cannot wrap around more than one vector, at times merge join may return fewer than outputBatchSize_ rows. Dictionaries for right projections are created optimistically: the operator starts by wrapping the current right vector, but if the output happens to span more than one right vector, the result is copied and flattened.
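A simplified sketch of that decision, with plain std::vectors of strings standing in for right-side payload vectors and (vector, row) pairs standing in for matched right rows (all names are hypothetical):

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Output for right-side projections: either a dictionary (index buffer plus a
// pointer to the single wrapped vector) or flattened copies when the matched
// rows span more than one right vector.
struct RightOutput {
  bool isDictionary = false;
  std::vector<int32_t> indices;                   // used when isDictionary
  const std::vector<std::string>* base = nullptr; // wrapped vector
  std::vector<std::string> flat;                  // used when flattened
};

RightOutput makeRightOutput(
    const std::vector<std::vector<std::string>>& rightVectors,
    const std::vector<std::pair<int32_t, int32_t>>& rows) { // (vector, row)
  RightOutput out;
  if (rows.empty()) {
    return out;
  }
  bool singleVector = true;
  for (const auto& vr : rows) {
    singleVector = singleVector && vr.first == rows.front().first;
  }
  if (singleVector) {
    // Optimistic case: all matches come from one right vector, so the output
    // can wrap it with a dictionary and avoid copying payload data.
    out.isDictionary = true;
    out.base = &rightVectors[rows.front().first];
    for (const auto& vr : rows) {
      out.indices.push_back(vr.second);
    }
  } else {
    // Output spans more than one right vector: copy (flatten) the values.
    for (const auto& vr : rows) {
      out.flat.push_back(rightVectors[vr.first][vr.second]);
    }
  }
  return out;
}
```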
In internal experiments, we have seen the vectorized implementation be ~10x more efficient, yielding a 4x speedup in some real-world pipelines.
Cc: @mbasmanova @Yuhta @xiaoxmeng @arhimondr @bikramSingh91 @kagamiori @majetideepak @aditi-pandit @FelixYBW
Hi @pedroerp, thanks for the proposal!
Since dictionaries cannot wrap around more than one vector, at times merge join may return fewer than outputBatchSize_ rows.
Do you have a plan to extend DictionaryVector to wrap more than one vector, so that in some cases it can avoid overly small batches? For example after filter, partition, etc.
Do you have a plan to extend DictionaryVector to wrap more than one vector, so that in some cases it can avoid overly small batches?
that's a great question. I've been debating that myself, and don't remember seeing discussion about this in the past.
@mbasmanova @oerling @Yuhta @xiaoxmeng thoughts?
I've been thinking about this for a while. If the batch size is too small, it will not benefit from the vectorization model. Some systems such as DuckDB cache the upstream operator's output if the batch size is very small; see https://github.com/duckdb/duckdb/blob/d9efdd14270245c4369096e909acecea174d86cc/src/include/duckdb/execution/physical_operator.hpp#L221.
Here is a related discussion: https://github.com/facebookincubator/velox/issues/7801
That is slightly different though; I guess there is a point in potentially making dictionaries able to wrap around more than one inner vector.
This is possible, but it will be a very big change and will need fixes to the UDFs to handle it. A more practical approach would be to flatten the output if there is no repetition in the indices, or to concatenate the dictionary values if the indices are repeating.
This is possible, but it will be a very big change and will need fixes to the UDFs to handle it.
I guess this should be mostly covered if we assume DecodedVector does the right thing. Ideally we shouldn't have too many vectorized functions relying on dictionaries directly, though I'm sure we have at least a few today. But agreed that this will be pretty involved and touch many parts of the codebase.
@pedroerp Even the interface of DecodedVector would need to change. index, indices, data, and base in DecodedVector would no longer make sense; we would need to remove them and fix all the use sites as well. Worse than that, we would end up with a slower DecodedVector that cannot handle complex types, and we might need to go back to handling encodings manually for complex types and performance-critical code.
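For concreteness, here is a rough sketch of the flatten-or-concatenate alternative suggested above, with a plain std::vector<int64_t> standing in for a dictionary's base vector (names are hypothetical):

```cpp
#include <cstdint>
#include <vector>

// A small dictionary-encoded batch: indices into a base vector of values.
struct DictBatch {
  std::vector<int32_t> indices; // one entry per row
  std::vector<int64_t> base;    // wrapped (dictionary) values
};

// (a) If indices do not repeat, dictionary encoding buys nothing: just
// materialize (flatten) the rows.
std::vector<int64_t> flatten(const DictBatch& batch) {
  std::vector<int64_t> out;
  out.reserve(batch.indices.size());
  for (auto i : batch.indices) {
    out.push_back(batch.base[i]);
  }
  return out;
}

// (b) If indices repeat, keep the dictionary form: concatenate the base
// vectors and shift the second batch's indices so the combined batch still
// wraps a single base vector.
DictBatch concatenate(const DictBatch& a, const DictBatch& b) {
  DictBatch out = a;
  const auto offset = static_cast<int32_t>(a.base.size());
  out.base.insert(out.base.end(), b.base.begin(), b.base.end());
  for (auto i : b.indices) {
    out.indices.push_back(i + offset);
  }
  return out;
}
```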
Hello,
We have developed a new merge join; however, it currently only supports single-column joins on integer/long types. We've achieved the best results with range joins (improving performance by n times compared to hash join, where n depends on the complexity of the columns on the build side). To support strings or multi-column joins, we might only need to modify the hashing approach. We hope this optimization can serve as a reference for the community. (P.S. the code is somewhat disorganized. We haven't had time to clean it up, but we have fully deployed it to replace hash join, and we have also fixed a bug in merge join and shared the fix with the community.)
Here's our main thought process:
- We replaced comparison with hashing (inspired by hash join) (RangeHashTable::initHashTable). We hash the two vector join keys directly, resulting in two buffers that record the row index, the count of consecutive hash values, and the presence of hash values.
- For merging, we consult the hash table to find the row index on the opposite side corresponding to the probe vector's row, and then wrap and output directly.
- The complexity arises because we must ensure that there is only one vector (probe) on one side so that we can use a dictionary.
Relevant code paths:
- Join code: https://github.com/proxverse/velox/blob/starry/velox/exec/MergeJoin.cpp
- Test code (which is executable and can be used for further development): https://github.com/proxverse/velox/blob/starry/velox/exec/tests/MergeJoinTest.cpp
- RangeHashTable: https://github.com/proxverse/velox/blob/starry/velox/exec/RangeHashTable.cpp
I will post the performance report when I have time. In a join of approximately 4 million vs. 2 million rows, with no additional columns and a pure inner join, the performance is hash: 40ms, SMJ: 14ms.
@hn5092 Thanks for your POC. So the main idea is that instead of comparing the vector values themselves, you compress the values into int32_ts (you call them "hashes", but they are really "unique IDs" in Velox terms, and these unique IDs must be order preserving) and compare those instead, right? Sounds like we could reuse the byte identifiers in PrefixSortEncoder for this purpose for a more generic solution.
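As a sketch of the order-preserving-encoding idea (not the actual PrefixSortEncoder API, just the principle), a fixed-size byte prefix turns most string-key comparisons into a single integer comparison, falling back to a full comparison only when the prefixes tie:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <string>

// Order-preserving 4-byte prefix: comparing the prefixes as unsigned integers
// agrees with the lexicographic order of the original strings whenever the
// prefixes differ, so the expensive string comparison is only needed on ties.
uint32_t keyPrefix(const std::string& key) {
  unsigned char bytes[4] = {0, 0, 0, 0};
  std::memcpy(bytes, key.data(), std::min<size_t>(4, key.size()));
  return (uint32_t(bytes[0]) << 24) | (uint32_t(bytes[1]) << 16) |
      (uint32_t(bytes[2]) << 8) | uint32_t(bytes[3]);
}

int compareKeys(const std::string& a, const std::string& b) {
  const uint32_t pa = keyPrefix(a);
  const uint32_t pb = keyPrefix(b);
  if (pa != pb) {
    return pa < pb ? -1 : 1; // decided by the cheap integer comparison
  }
  return a.compare(b); // fall back only when the 4-byte prefixes tie
}
```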
Hi @hn5092 thank you for starting this discussion. I'm curious about how much faster this gets if you compare to our current merge join implementation.
If I understood it correctly, then for single integer keys they should be equivalent, since comparing the integer keys themselves or the hashes should have similar performance? I suppose the point is that your approach could be better if you have long keys, though.
It would also be nice if you could submit the bug fix you mentioned to our current merge join code. I'm happy to assist with code review.
@Yuhta Sure
Correct, my core approach is to use a method similar to hash join because I observed during development that using merge join actually required 200ms, whereas hash join only needed 40ms. I think if merge join could reach the speed of hash join, that would actually be sufficient because it would save memory. So, in fact, my implementation is like doing many small hash builds to perform the probe. In the current scenario, because our join keys are all encoded and range-based, we directly used range hashing. For other types, I think we could try your method or just hash directly using a unique ID approach.
However, the most challenging part of the implementation is allowing merge join to output using a dictionary wrapper. Here, my main approach is just to ensure that the probe has only one vector. This part is actually the most complex and the most prone to bugs in the entire implementation.
@pedroerp
I've already submitted a pull request for the related bug fix to the Velox codebase. The difference between my implementation and the current one is that, similar to a hash join, I calculate the hash values first and then proceed to probe. The hashing is done in memory order and might leverage SIMD. Compared to the current method, findEndOfMatch might be somewhat more complex, as vector comparison functions are slow. Additionally, another practice of mine is to apply dictionary wrapping when outputting vectors.
@pedroerp
performance diff:
env: MacBook M1 Max
query:
select event, count(precaseid (String type)) from a join b
hash:
new mergejoin:
select event, count(encoded_case_id (int type)) from a join b
hash:
new mergejoin:
I can't find the old version of the merge join code any more... From the performance difference: the more complex the build side, the bigger the speedup of merge join over hash join.
@pedroerp Currently we are doing the comparison one row at a time with a virtual function in MergeJoin::compare. If we manage to vectorize that comparison, it could achieve similar performance to the unique ID approach.
That makes sense to me. We had planned to fix this as part of the vectorized merge join effort, but somehow in the pipelines we were testing, key comparisons did not show up in the profile. Perhaps because the rest of the pipeline was significantly more expensive.
@hn5092 @Yuhta assuming we vectorize key comparisons, is there anything else that could be made more efficient?
Additionally, another practice of mine is to apply dictionary wrapping when outputting vectors.
We should already do that in the current merge join code. Check out this function:
https://github.com/facebookincubator/velox/blob/main/velox/exec/MergeJoin.cpp#L382
@pedroerp Yes, MergeJoin itself almost never becomes a bottleneck in an E2E pipeline, nor does Presto use it heavily. This is more for benchmark races than for production workloads.
My understanding is that merge join is heavily used in Spark though. Cc: @FelixYBW is that the case?
@pedroerp Your thinking is correct; our main difference lies in the implementation. Also, I'm wondering about how to perform SIMD comparison – is there a good method for this? I haven't found one yet. I'm also looking to use SIMD comparison in another scenario. Previously, I considered an approach that involved staggered comparison, but after some experimentation, it didn't differ much from comparing buffers in a for loop at the lower level. However, it was significantly faster than using a compare function.
@hn5092 We are currently refactoring our vector comparison framework and will add a function that compares two vectors in one call. The saving from SIMD comparison (vs. a non-SIMD tight loop) does not come mainly from the comparison itself; it comes mainly from being able to compact the result into bits and thus reduce memory write traffic. As soon as we get rid of the per-row virtual call we should see a significant gain though, even with a tight loop of inlined non-SIMD functions.
CC: @kevinwilfong
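For illustration, a minimal non-SIMD version of such a two-vector comparison kernel (plain std::vectors stand in for key columns; a real implementation would likely use explicit SIMD, but even a tight, auto-vectorizable loop like this avoids the per-row virtual call and writes only one bit per row):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Compares two equally sized key columns in one call and compacts the result
// into a bitmask: bit i is set when left[i] == right[i]. Writing bits instead
// of one byte or int per row keeps memory write traffic low.
std::vector<uint64_t> compareEqualBits(
    const std::vector<int64_t>& left,
    const std::vector<int64_t>& right) {
  std::vector<uint64_t> bits((left.size() + 63) / 64, 0);
  for (size_t i = 0; i < left.size(); ++i) {
    bits[i / 64] |= uint64_t(left[i] == right[i]) << (i % 64);
  }
  return bits;
}
```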
How do we apply the results of comparing two vectors to a merge join? I understand that in the end it resembles a hash join model, except that the build side consists of only one or a few vectors rather than all the vectors.
@hn5092 I suppose we will first have to refactor the merge join code to ensure it has a tighter loop of key comparisons, then we can start using the SIMD-based key comparison method @Yuhta mentioned.
Cc: @JkSelf who has been helping with that code for a bit.