Push Dynamic Join Predicates into Scan ("Sideways Information Passing", etc)
Is your feature request related to a problem or challenge?
If we want to make DataFusion the engine of choice for fast OLAP processing, eventually we will need to make joins faster. In addition to making sure the join order is not disastrous (e.g. https://github.com/apache/arrow-datafusion/issues/7949) we can consider other advanced OLAP techniques to improve joins (especially queries with multiple joins)
Describe the solution you'd like
I would like to propose we look into pushing "join predicates" into scans (which I know of as "sideways information passing")
As an example, consider the joins from TPCH Q17
select
    sum(l_extendedprice) / 7.0 as avg_yearly
from
    part, lineitem
where
    p_partkey = l_partkey
    and p_brand = 'Brand#23'
    and p_container = 'MED BOX'
    and l_quantity < ( select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey );
The first join (should) look like this. The observation is that there are no predicates on the lineitem table (the big one), which means all the filtering happens in the join. This is bad because the scan can't do optimizations like "late materialization" and instead must decode all 60M values of the selected columns, even though very few (2044!) are actually used
│
│
2044 Rows │
│
▼
┌────────────────┐
│ HashJoin │
│ p_partkey = │
│ l_partkey │
└──┬─────────┬───┘ This scan decodes 60M values
2M Rows │ │ 60M Rows of l_quantity and
┌────────┘ └─────────┐ l_extendedprice, even though
│ │ all but 2044 are filtered by
▼ ▼ the join
┌──────────────────┐ ┌─────────────────────┐
│Scan: part │ │Scan: lineitem │ │
│projection: │ │projection: │
│ p_partkey │ │ l_quantity, │ │
│filters: │ │ l_extendedprice, │◀─ ─ ─ ─ ─ ─ ─ ─ ─
│ p_brand = .. │ │ l_partkey │
│ p_container = ..│ │filters: │
│ │ │ NONE │
└──────────────────┘ └─────────────────────┘
The idea is to push the join predicate down into the scan, by making something that acts like l_partkey IN (...) that can be applied while scanning
1. The HashJoin completely reads the build
side before starting the probe side.
Thus, all 2M known matching values of
│ l_partkey are in a hash table prior to
│ scanning lineitem
2044 Rows │
│ │
▼
┌────────────────┐ │
│ HashJoin │
│ p_partkey = │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
│ l_partkey │
└──┬─────────┬───┘
│ │ 60M Rows
┌────────┘ └────────────┐ The idea is to introduce a filter
│ │ that is effectively "l_partkey IN
▼ ▼ (HASH TABLE)" or something similar
┌──────────────────┐ ┌──────────────────────────┐ that is applied during the scan
│Scan: part │ │Scan: lineitem │┌ ─ ─
│projection: │ │projection: │ If the scan can avoid decoding
│ p_partkey │ │ l_quantity, ││ l_quantity and l_extended that do
│filters: │ │ l_extendedprice, │ not match, there is significant
│ p_brand = .. │ │ l_partkey ││ savings
│ p_container = ..│ │filters: │
│ │ │ l_partkey IN (....) ◀─│┘
└──────────────────┘ └──────────────────────────┘
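The mechanism above can be sketched in plain Rust outside of DataFusion: the build side produces a set of join keys, and the probe-side scan consults that set to skip rows before materializing the expensive columns. All names here are illustrative, not DataFusion APIs; a real scan with late materialization would avoid decoding the skipped values entirely.

```rust
use std::collections::HashSet;

/// Hypothetical probe-side "scan" that applies a dynamic join predicate
/// (effectively `l_partkey IN (build side keys)`) before materializing
/// the expensive columns. In a real engine with late materialization the
/// filtered-out rows would never be decoded at all.
fn scan_with_sip(
    l_partkey: &[i64],
    l_extendedprice: &[f64],
    build_side_keys: &HashSet<i64>,
) -> Vec<(i64, f64)> {
    l_partkey
        .iter()
        .zip(l_extendedprice)
        // The dynamic predicate: only rows whose key appears on the
        // build side can possibly survive the join.
        .filter(|(k, _)| build_side_keys.contains(*k))
        .map(|(k, p)| (*k, *p))
        .collect()
}

fn main() {
    // Build side: the 2M matching p_partkey values (tiny here).
    let build_side_keys: HashSet<i64> = [10, 42].into_iter().collect();
    // Probe side: the 60M-row lineitem table (tiny here).
    let l_partkey = vec![1, 10, 42, 7, 10];
    let l_extendedprice = vec![5.0, 6.0, 7.0, 8.0, 9.0];

    let survivors = scan_with_sip(&l_partkey, &l_extendedprice, &build_side_keys);
    // Only rows whose key appears on the build side reach the join.
    assert_eq!(survivors, vec![(10, 6.0), (42, 7.0), (10, 9.0)]);
    println!("{survivors:?}");
}
```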
In a query with a single selective join (that filters many values) the savings is likely minimal as it depends on how much work can be saved in materialization (decoding). The only scan that does late materialization in DataFusion at the time of writing is the ParquetExec
However, in a query with multiple selective joins the savings becomes much more pronounced, because we can save the effort of creating intermediate join outputs which are filtered out by joins later in the plan
For example:
Pass down in multiple joins
While this doesn't happen in TPCH
Q17 (the subquery has no predicates)
the SIPS approach can be even more
effective with multiple selective
joins │
│
│ Filters on both join keys can be applied
│ at this level, which can be even more
▼ effective as it avoids the work to create
┌────────────────┐ the intermediate output of HashJoin(2) ─ ┐
│ HashJoin (1) │ which is then filtered by HashJoin(1)
│ d1.key = │ │
│ f.d1_key │
└──┬─────────┬───┘ │
│ │
┌──────────┘ └────────────┐ │
│ │
▼ ▼ │
┌──────────────────┐ ┌────────────────┐
│Scan: D1 │ │ HashJoin (2) │ │
│filters: │ │ d2.key = │
│ ... │ │ f.d2_key │ │
└──────────────────┘ └───┬─────────┬──┘
│ │ │
┌───────────┘ └─────────────┐
│ │ │
▼ ▼
┌────────────────┐ ┌─────────────────────┐ │
│Scan: D2 │ │Scan: F │
│filters: │ │filters: │ │
│ ... │ │ f.d1_key IN (...) │◀ ─ ─ ─ ─
└────────────────┘ │ f.d2_key IN (...) │
│ │
└─────────────────────┘
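The multi-join case above amounts to intersecting two dynamic predicates at the fact-table scan. A minimal sketch (illustrative names, not DataFusion APIs):

```rust
use std::collections::HashSet;

/// Hypothetical scan of fact table F with dynamic filters from both
/// joins pushed down: a row survives only if its d1_key matched at
/// HashJoin(1) AND its d2_key matched at HashJoin(2). Applying both at
/// the scan avoids building the intermediate output of HashJoin(2)
/// that HashJoin(1) would then throw away.
fn scan_f_with_both_filters(
    rows: &[(i64, i64)], // (f.d1_key, f.d2_key)
    d1_keys: &HashSet<i64>,
    d2_keys: &HashSet<i64>,
) -> Vec<(i64, i64)> {
    rows.iter()
        .copied()
        .filter(|(k1, k2)| d1_keys.contains(k1) && d2_keys.contains(k2))
        .collect()
}

fn main() {
    let d1: HashSet<i64> = [1, 2].into_iter().collect();
    let d2: HashSet<i64> = [20].into_iter().collect();
    let f = [(1, 20), (1, 30), (3, 20)];
    // Only the row matching on both join keys survives the scan.
    assert_eq!(scan_f_with_both_filters(&f, &d1, &d2), vec![(1, 20)]);
}
```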
Describe alternatives you've considered
Some version of this technique is described in "Bloom Filter Joins" in Spark: https://issues.apache.org/jira/browse/SPARK-32268
Building a separate Bloom filter has the nice property that you can distribute them in a networked cluster; however, the overhead of creating the bloom filter would likely be non-trivial
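For illustration, a minimal (and deliberately naive) bloom filter can be sketched in a few lines of Rust; real implementations, like the one referenced in SPARK-32268, tune the bit count and number of hash probes to a target false-positive rate:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A toy bloom filter: `k` hash probes into a fixed bit vector.
/// Membership tests can return false positives but never false
/// negatives, which is exactly what a join pre-filter needs.
struct BloomFilter {
    bits: Vec<bool>,
    k: u64,
}

impl BloomFilter {
    fn new(nbits: usize, k: u64) -> Self {
        Self { bits: vec![false; nbits], k }
    }

    /// Derive the bit position for one (item, seed) probe.
    fn index<T: Hash>(&self, item: &T, seed: u64) -> usize {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h);
        item.hash(&mut h);
        (h.finish() as usize) % self.bits.len()
    }

    fn insert<T: Hash>(&mut self, item: &T) {
        for seed in 0..self.k {
            let i = self.index(item, seed);
            self.bits[i] = true;
        }
    }

    /// `false` means "definitely not on the build side"; `true` means "maybe".
    fn maybe_contains<T: Hash>(&self, item: &T) -> bool {
        (0..self.k).all(|seed| self.bits[self.index(item, seed)])
    }
}

fn main() {
    let mut bf = BloomFilter::new(1024, 3);
    for key in [10_i64, 42] {
        bf.insert(&key);
    }
    // Inserted keys are always reported as present.
    assert!(bf.maybe_contains(&10_i64));
    assert!(bf.maybe_contains(&42_i64));
    // A miss is likely (but not guaranteed) to be rejected.
    println!("1 maybe present: {}", bf.maybe_contains(&1_i64));
}
```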
Additional context
See a description of how DataFusion HashJoins work here: https://github.com/apache/arrow-datafusion/pull/7953
Here is a paper that describes industrial experience with SIPS techniques: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf
cc @sunchao @viirya and @kazuyukitanimura to whom I mentioned this technique the other day
I'm very curious about how this kind of graph is drawn😯
@ACking-you I draw it by hand using https://monodraw.helftone.com/ (and it unfortunately takes me a long time)
@devinjdangelo has suggested https://asciiflow.com/#/ works for making something quick in the browser
@westonpace pointed out that if you're writing on Github then you can use mermaid syntax in comments / issues / prs and it will render automatically: https://github.blog/2022-02-14-include-diagrams-markdown-files-mermaid/
From: https://github.com/apache/datafusion/discussions/9963
TakeExec (index lookup) -- really like an indexed scan somehow
I wonder if TakeExec or something quite similar could also be used for dynamic join predicates?
If there is a secondary index on l_partkey then I think a TakeExec could be useful. Otherwise there is no way to know the row offsets and a filtered scan is probably the best you can do.
Interested in this one!
Thanks @Lordworms -- I am hoping someone else can step up and help you with this. I just don't have time to help with a project to improve join performance at this time.
I believe DuckDB just announced support for this feature in 1.1: https://duckdb.org/2024/09/09/announcing-duckdb-110.html#dynamic-filter-pushdown-from-joins
I have a PR up for doing something similar for TopK sorts (ORDER BY col LIMIT 10) in https://github.com/apache/datafusion/pull/15301. I think we should be able to re-use that work for this change, at which point it would just be a question of implementing a DynamicFilterSource for the join ExecutionPlans. Does that sound right to folks here? I want to make sure we don't land something that is almost what is needed here but not quite.
Thank you! @adriangb Basically, they're similar. Join runtime filters can take different forms, such as min-max, IN list, and bloom filter; we can build them from the build side of the hash join.
Supporting the min-max runtime filter should be a good start.
I think that since #15301 pushes an arbitrary Arc<dyn PhysicalExpr> down, min/max, inlist, etc. should all be doable 😄
Yes, I noticed the DynamicFilterSource trait.
So now that #15568 is in, what is a reasonable approach to do SIP with bloom filters?
- Modify HashJoinExec to build a bloom filter on the build side, and when complete call DynamicFilterPhysicalExpr::update
- How do we represent the bloom filter test as an expression? Is it a new PhysicalExpr, or do we define a ScalarUDF for bloom_filter_contains(bloom_filter, input) and use a ScalarFunctionExpr? I'm not convinced of the ScalarUDF approach for two reasons: 1) taking the bloom filter bytes as an arg would require construction/validation of a bloom filter from those bytes at every invocation, right? 2) I'm not sure we want to expose this as a SQL function.
I'm interested to start tackling this in pieces next week, but I also suspect others have thoughts (maybe even progress) in this area already.
Modify HashJoinExec to build a bloom filter on the build side, and when complete call DynamicFilterPhysicalExpr::update
Pretty much: once HashJoinExec has completed the build side it builds a bloom filter and wraps it up in a PhysicalExpr.
I don't think you need to use a DynamicFilterPhysicalExpr since once you build the hash table / bloom filter it's never updated. The reason to have DynamicFilterPhysicalExpr is e.g. in a TopK operator where every RecordBatch you process you gain new information to do more selective pruning (new heap values). So I'd think you could just pass the hardcoded PhysicalExpr directly?
To your second point: one thing to consider is if you want this to be compatible with any sort of distributed query execution. If you do then one of two things would need to happen:
- Teach the serialization how to serialize the bloom filter PhysicalExpr -> in practice update the protobuf serialization to match against it.
- Implement PhysicalExpr::snapshot() to convert it into an InList or something.
Given all of this my recommendation would be to build a dedicated BloomFilter PhysicalExpr and teach DataFusion how to serialize it to protobuf. Then don't use DynamicFilterPhysicalExpr or PhysicalExpr::snapshot().
The nice thing about this is that the new bloom filter expr could be re-used in other places, e.g. InList could build a bloom filter instead of a HashSet for pre-filtering (I think that's what it currently does).
I opened #16435
Took an initial stab at this in https://github.com/apache/datafusion/pull/16445
👋 @adriangb I just read your PR, this is a really nice optimization ty 🙇♀
If I understand correctly, the build side is currently always set to the left. Do you think it would make sense to make this configurable in the future, with the left side as the default? I think it could be potentially valuable for users that query custom datasources and have statistics on their side.
Also, separately, I was thinking if we could support for dynamic filters for WHERE IN (...) clauses as well. Definitely all of this are follow-ups, and I’d be happy to take a stab at it if no one else is interested :)
I am not sure what you mean by WHERE IN(..) clauses -- specifically I am not sure what is dynamic -- for example for WHERE x IN (1,2,3) the IN list is constant.
If I understand correctly, the builder side is currently always set to the left. Do you think it would make sense to make this configurable in the future, with the left side as the default?
Yes totally, but that's a larger problem of join optimization / planning that I don't think this PR interacts with.
I am not sure what you mean by WHERE IN(..) clauses -- specifically I am not sure what is dynamic -- for example for WHERE x IN (1,2,3) the IN list is constant.
Make the build side values into a list, then construct an IN list dynamic predicate for the hash join
I think the way to do that is to share an Arc'ed ref to the hash table directly as @Dandandan is suggesting. We'll have to decide the behavior for PhysicalExpr::snapshot, I guess if the hash table is small enough we can build an IN LIST expression but for bigger hash tables we might not be able to serialize them across the wire or use them in stats/predicate pruning and will have to rely on just the bounds.
Yeah, that makes sense. Before we generate the IN list we could add a check, such as only generating the IN list predicate when the build side has 1024 rows or fewer
Some WHERE x IN (...) clauses (correlated subqueries) are converted to a join; those will already benefit from the optimization automatically!
Those that don't AFAIK can't run on DataFusion as of now as there is no support for executing subqueries. If DataFusion would support that, it would be interesting to look at cases when dynamically pushing down the subquery results into a dynamic filter would be worth it (probably also checking for the results to be smaller than n rows).
Thanks! Just to clarify - I didn’t mean user-facing WHERE IN (SELECT ...) clauses. I meant building IN LIST expression filter dynamically at runtime (or essentially what @xudong963 said, building a list from the values of build side) and pushing them down to the probe side.
@adriangb says https://github.com/apache/datafusion/pull/16445#issuecomment-3026127559:
Btw here's an article that explains how DuckDB does join filter pushdown. It sounds like they only push down min/max filters: https://duckdb.org/2024/09/09/announcing-duckdb-110.html#dynamic-filter-pushdown-from-joins
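A min/max filter like the one DuckDB describes can be sketched as: once the build side is complete, record the smallest and largest join key and push the equivalent of `key BETWEEN min AND max` into the probe-side scan. The Rust below is illustrative, not DataFusion's or DuckDB's implementation:

```rust
/// Hypothetical min/max join filter: derived from the build-side keys,
/// applied as a range predicate on the probe side. Cheap to build and
/// serialize, but only selective when the build keys are clustered.
fn min_max_filter(build_keys: &[i64]) -> Option<(i64, i64)> {
    let min = *build_keys.iter().min()?;
    let max = *build_keys.iter().max()?;
    Some((min, max))
}

/// A probe row can be skipped when its key falls outside [min, max].
fn probe_survives(key: i64, filter: Option<(i64, i64)>) -> bool {
    match filter {
        // Empty build side: no probe row can possibly match.
        None => false,
        Some((min, max)) => min <= key && key <= max,
    }
}

fn main() {
    let filter = min_max_filter(&[100, 42, 77]);
    assert_eq!(filter, Some((42, 100)));
    assert!(probe_survives(50, filter));  // inside the range: maybe a match
    assert!(!probe_survives(7, filter));  // outside: definitely no match
    assert!(!probe_survives(7, min_max_filter(&[])));
}
```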
I think this is mostly done!
There's a couple followups but the main goal is achieved:
- https://github.com/apache/datafusion/issues/17171
- https://github.com/apache/datafusion/issues/16973
- https://github.com/apache/datafusion/pull/17090
@alamb shall we close this issue and continue in the followups?
Sounds good to me -- what PR had the main change? was it
- https://github.com/apache/datafusion/pull/16445 ?
Yep that's the one!
I also added this to the list of things to document on DataFusion 50.0.0: https://github.com/apache/datafusion/issues/16799