feat: support inner iejoin
Which issue does this PR close?
ref #8393
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Basically ready for review; more sqllogictests will be added later.
comparison
data: iedataset.sql
CREATE
OR REPLACE EXTERNAL TABLE employees(
id INT,
salary INT,
tax INT)
STORED AS PARQUET
LOCATION 'employees.parquet';
INSERT INTO employees (id, salary, tax)
SELECT
facts.id AS id,
salary,
(salary / 10 + CASE WHEN random() <= 0.005 THEN round(random() * 100000)::INTEGER ELSE 0 END)::INTEGER AS tax
FROM (
SELECT
id,
100 * id AS salary
FROM (SELECT UNNEST(range(1, 500000))) tbl(id)
) facts;
COPY employees TO 'employees.parquet' STORED AS PARQUET;
SELECT COUNT(*) FROM 'employees.parquet' r;
Create the dataset by running:
datafusion-cli -f iedataset.sql
execute: iejoin.sql
SELECT COUNT(*) FROM (
SELECT r.id, s.id
FROM 'employees.parquet' r, 'employees.parquet' s
WHERE r.salary < s.salary AND r.tax > s.tax
) q1;
It seems the main cost is sorting.
ptal @xudong963
Really impressive work!
- I suggest opening a separate PR for the benchmarks only; it can be merged easily and will also help attract more attention.
- I have a question (I have only skimmed the DuckDB blog post and haven't fully understood the algorithm, so please correct me if I'm wrong):
My understanding of how IEJoin works, for the query in the benchmark example above:
SELECT r.id, s.id
FROM 'employees.parquet' r, 'employees.parquet' s
WHERE r.salary < s.salary AND r.tax > s.tax
Conceptually, it executes in three steps:
- Sort (r union s) on salary and add a salary_rank column.
- Sort (r union s) again on tax; the salary_rank values from step 1 are permuted into the permutation array.
- Construct a bit array (same length as (r union s)) and run a nested loop over it, using the permutation array, to find matches.
The complexity is still input_size^2; however, the N^2 step (step 3) only loops over a bit array, which is far more efficient than evaluating the join condition N^2 times, as a nested loop join (NLJ) would.
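To make the three steps concrete, here is a minimal Python sketch of my understanding. It is not the PR's implementation: it is simplified to a self-join of one in-memory table, the function name `iejoin_self` is made up for illustration, and rows with equal tax are probed as a group before being marked so that both inequalities stay strict.

```python
from bisect import bisect_left

def iejoin_self(rows):
    """rows: list of (id, salary, tax).
    Returns (r_id, s_id) pairs with r.salary < s.salary AND r.tax > s.tax."""
    n = len(rows)
    # Step 1: sort by salary; the position in this order is the salary rank.
    by_salary = sorted(range(n), key=lambda i: rows[i][1])
    salaries = [rows[i][1] for i in by_salary]
    salary_rank = [0] * n
    for pos, i in enumerate(by_salary):
        salary_rank[i] = pos
    # Step 2: sort by tax (descending); reading salary_rank in this order
    # yields the permutation array.
    by_tax_desc = sorted(range(n), key=lambda i: -rows[i][2])
    permutation = [salary_rank[i] for i in by_tax_desc]
    # Step 3: bit array indexed by salary rank; a set bit means the row at
    # that rank has a strictly greater tax than the row being probed.
    visited = [False] * n
    out = []
    start = 0
    while start < n:
        tax = rows[by_tax_desc[start]][2]
        end = start
        while end < n and rows[by_tax_desc[end]][2] == tax:
            end += 1
        # Probe every row of this tax group before marking any of them.
        for k in range(start, end):
            s = by_tax_desc[k]
            # Ranks below `limit` hold strictly smaller salaries.
            limit = bisect_left(salaries, rows[s][1])
            for pos in range(limit):
                if visited[pos]:
                    out.append((rows[by_salary[pos]][0], rows[s][0]))
        for k in range(start, end):
            visited[permutation[k]] = True
        start = end
    return out
```

The inner `for pos in range(limit)` loop is the N^2 part: it only tests bits, and the join predicate itself is never re-evaluated.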
This PR only uses DataFusion's existing sort executor for step 1, which can be parallelized across multiple partitions. Everything else is left inside the new IEJoin executor, including the union of the two input tables, the step 2 sort, and the loop over the bit vector.
Is it possible to move the second sort out of the IEJoin executor and let the existing sort executor handle it? That step could then be parallelized more easily. Or does the IEJoin executor have a better way to parallelize its work?
> The complexity is still input_size^2; however, the N^2 step (step 3) only loops over a bit array, which is far more efficient than evaluating the join condition N^2 times, as a nested loop join (NLJ) would.
If we use a BTreeMap to maintain it, the complexity becomes $N \log N + \text{OutputSize}$. The range bitmap[i..n] may also be sparse in many cases, so pairing the bitmap with a bloom-filter-style summary (bloomfilter[i] = 0 means bitmap[k*i..k*(i+1)] are all 0) lets us skip many useless values, and the bitmap is very cache-friendly.
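A rough Python sketch of the bitmap-plus-summary idea, assuming a fixed chunk size k. The class name `SummarizedBitmap` and its methods are hypothetical, and the summary here is an exact per-chunk flag rather than a real bloom filter:

```python
class SummarizedBitmap:
    """Bitmap with one summary flag per chunk of `chunk` bits.
    summary[c] == False guarantees bits[c*chunk .. (c+1)*chunk) are all 0."""

    def __init__(self, size, chunk=64):
        self.size = size
        self.chunk = chunk
        self.bits = [False] * size
        self.summary = [False] * ((size + chunk - 1) // chunk)

    def set(self, i):
        self.bits[i] = True
        self.summary[i // self.chunk] = True

    def iter_set_from(self, start):
        """Yield every set position >= start, skipping empty chunks."""
        pos = start
        while pos < self.size:
            c = pos // self.chunk
            chunk_end = min((c + 1) * self.chunk, self.size)
            if self.summary[c]:
                # Only chunks that contain at least one set bit are scanned.
                for p in range(pos, chunk_end):
                    if self.bits[p]:
                        yield p
            pos = chunk_end
```

With a sparse bitmap, most chunks are rejected by a single summary check instead of `chunk` individual bit tests.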
> Is it possible to move the second sort out of the IEJoin executor and let the existing sort executor handle it? That step could then be parallelized more easily. Or does the IEJoin executor have a better way to parallelize its work?
I don't know how to move the second sort out of the IEJoin executor, though. The current parallelization mechanism splits the left table into n blocks and the right table into m blocks (both tables are already sorted by condition 1). That gives n*m block pairs to compute, and we can check in O(1) whether blocks l[i] and r[j] can produce any result pair satisfying condition 1; if no result pair can be produced, we simply skip the block pair l[i] and r[j]. If you have a better idea, I'd love to hear about it.
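As an illustration of the block-pair check, here is a small Python sketch, assuming condition 1 is `l.key < r.key` and both inputs are already sorted ascending on that key. The helper names `split_sorted` and `candidate_block_pairs` are invented for this example and do not come from the PR:

```python
def split_sorted(values, block_size):
    """Cut an already sorted list into consecutive blocks."""
    return [values[i:i + block_size] for i in range(0, len(values), block_size)]

def candidate_block_pairs(left_sorted, right_sorted, block_size):
    """Return (i, j) for block pairs that may satisfy l.key < r.key."""
    left_blocks = split_sorted(left_sorted, block_size)
    right_blocks = split_sorted(right_sorted, block_size)
    pairs = []
    for i, lb in enumerate(left_blocks):
        for j, rb in enumerate(right_blocks):
            # O(1) check: blocks are sorted, so lb[0] is the block minimum and
            # rb[-1] the block maximum. If min(left) >= max(right), no pair in
            # these two blocks can satisfy l.key < r.key.
            if lb[0] < rb[-1]:
                pairs.append((i, j))
    return pairs
```

Each surviving (i, j) pair is an independent unit of work, so the pairs can be handed to separate tasks.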
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
I'm planning on taking a look at this over the next week or so, but it will take a little time for me to get up to speed on the details of what you're trying to do here. Can you add any descriptive text to the PR so I don't need to wade through the entire conversation on the issue?
> I'm planning on taking a look at this over the next week or so, but it will take a little time for me to get up to speed on the details of what you're trying to do here. Can you add any descriptive text to the PR so I don't need to wade through the entire conversation on the issue?
I'm really glad that you're willing to review this PR @timsaucer. This PR might appear to be quite lengthy. Additionally, I've been quite busy recently preparing for my graduation thesis, so my responses might not be timely.
This PR adds support for inner IEJoin, which optimizes joins that have no equality conditions but two or more inequality conditions, and improves the performance of such queries.
The main idea of the IEJoin algorithm is to convert a join with inequality conditions into the problem of finding the ordered pairs (or inversion pairs) of a permutation.
For example,
SELECT t1.t_id, t2.t_id
FROM west t1, west t2
WHERE t1.time < t2.time AND t1.cost < t2.cost
Conceptually, it executes in three steps:
1. Sort (t1 union t2) on time in ascending order (call the result l1) and add a time_rank column (1..n).
2. Sort (t1 union t2, with time_rank) again on cost in ascending order (call the result l2); the time_rank values from step 1 are permuted into the permutation array p.
3. Compute the ordered pairs of the permutation array p. For a pair (i, j) in l2, if i < j then e_i.cost < e_j.cost, because l2 is sorted by cost in ascending order. And if p[i] < p[j], then e_i.time < e_j.time, because l1 is sorted by time in ascending order.
If we use a BTreeMap to maintain all the p[i] where i < j, we can find all pairs in $N \log N + \text{OutputSize}$ time. You can find more detailed examples in the comments.
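As a small illustration of step 3, the following Python sketch enumerates the ordered pairs of a permutation. It assumes p contains distinct values (no ties), and it uses a sorted list with `bisect` as a stand-in for the BTreeMap: insertion into a Python list is O(N), so only a balanced tree gives the stated bound. The function name `ordered_pairs` is made up for this example.

```python
from bisect import bisect_left, insort

def ordered_pairs(p):
    """Return all (i, j) with i < j and p[i] < p[j]."""
    index_of = {value: i for i, value in enumerate(p)}  # p has distinct values
    seen = []   # sorted values p[i] for all i < j
    pairs = []
    for j, value in enumerate(p):
        # Everything before position k in `seen` is smaller than p[j].
        k = bisect_left(seen, value)
        for smaller in seen[:k]:
            pairs.append((index_of[smaller], j))
        insort(seen, value)
    return pairs
```

For each j the lookup costs O(log N), and the work spent emitting matches is proportional to the number of output pairs.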
To parallelize the algorithm above, we first sort t1 and t2 by time (condition 1) and repartition the data into N partitions, then join each t1[i] with each t2[j]. If the minimum time of t1[i] is greater than the maximum time of t2[j], we can skip joining t1[i] and t2[j], because condition 1 rules out any join result between them. So I added an optimizer that ensures the input data is already sorted by condition 1.
> It seems the main cost is sorting.
By the way, this perf result shows that the sort in the permutation computation is currently the main cost.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.