
Push Dynamic Join Predicates into Scan ("Sideways Information Passing", etc)

Open alamb opened this issue 2 years ago • 16 comments

Is your feature request related to a problem or challenge?

If we want to make DataFusion the engine of choice for fast OLAP processing, eventually we will need to make joins faster. In addition to making sure the join order is not disastrous (e.g. https://github.com/apache/arrow-datafusion/issues/7949) we can consider other advanced OLAP techniques to improve joins (especially in queries with multiple joins)

Describe the solution you'd like

I would like to propose we look into pushing "join predicate" into scans (which I know of as "sideways information passing")

As an example, consider the joins from TPCH Q17

select
  sum(l_extendedprice) / 7.0 as avg_yearly
from
  part, lineitem
where
  p_partkey = l_partkey
  and p_brand = 'Brand#23'
  and p_container = 'MED BOX'
  and l_quantity < (
    select 0.2 * avg(l_quantity)
    from lineitem
    where l_partkey = p_partkey
  );

The first join should look like this. Note that there are no predicates on the lineitem table (the big one), so all the filtering happens in the join. That is bad because the scan can't do optimizations like "late materialization" and instead must decode all 60M values of the selected columns, even though very few (2044!) are actually used

                          │                                                         
                          │                                                         
           2044 Rows      │                                                         
                          │                                                         
                          ▼                                                         
                 ┌────────────────┐                                                 
                 │    HashJoin    │                                                 
                 │   p_partkey =  │                                                 
                 │   l_partkey    │                                                 
                 └──┬─────────┬───┘                     This scan decodes 60M values
   2M Rows          │         │             60M Rows         of l_quantity and      
           ┌────────┘         └─────────┐               l_extendedprice, even though
           │                            │               all but 2044 are filtered by
           ▼                            ▼                         the join          
 ┌──────────────────┐        ┌─────────────────────┐                                
 │Scan: part        │        │Scan: lineitem       │                  │             
 │projection:       │        │projection:          │                                
 │  p_partkey       │        │  l_quantity,        │                  │             
 │filters:          │        │  l_extendedprice,   │◀─ ─ ─ ─ ─ ─ ─ ─ ─              
 │  p_brand = ..    │        │  l_partkey          │                                
 │  p_container = ..│        │filters:             │                                
 │                  │        │  NONE               │                                
 └──────────────────┘        └─────────────────────┘                                

The idea is to push the join predicate into the scan, by making something that acts like l_partkey IN (...) that can be applied during the scan


                               1. The HashJoin completely reads the build                        
                               side before starting the probe side.                              
                                                                                                 
                               Thus, all 2M known matching values of                             
                         │     l_partkey are in a hash table prior to                            
                         │     scanning lineitem                                                 
          2044 Rows      │                                                                       
                         │                           │                                           
                         ▼                                                                       
                ┌────────────────┐                   │                                           
                │    HashJoin    │                                                               
                │   p_partkey =  │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                           
                │   l_partkey    │                                                               
                └──┬─────────┬───┘                                                               
                   │         │             60M Rows                                              
          ┌────────┘         └────────────┐                  The idea is to introduce a filter   
          │                               │                  that is effectively "l_partkey IN   
          ▼                               ▼                  (HASH TABLE)" or something similar  
┌──────────────────┐        ┌──────────────────────────┐     that is applied during the scan     
│Scan: part        │        │Scan: lineitem            │┌ ─ ─                                    
│projection:       │        │projection:               │     If the scan can avoid decoding      
│  p_partkey       │        │  l_quantity,             ││    l_quantity and l_extended that do   
│filters:          │        │  l_extendedprice,        │     not match, there is significant     
│  p_brand = ..    │        │  l_partkey               ││    savings                             
│  p_container = ..│        │filters:                  │                                         
│                  │        │  l_partkey IN (....)   ◀─│┘                                        
└──────────────────┘        └──────────────────────────┘                                         
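
The diagram above can be sketched in a few lines of Rust. This is only an illustration of the idea, not DataFusion code: the column slices stand in for a columnar file, and `LineItem`, `build_key_set` and `scan_with_key_filter` are hypothetical names. The scan checks the join key first and only touches the other two columns for rows whose key is in the build-side set.

```rust
use std::collections::HashSet;

/// One probe-side row of `lineitem`, reduced to the columns in the example.
#[derive(Debug, Clone, PartialEq)]
pub struct LineItem {
    pub l_partkey: i64,
    pub l_quantity: f64,
    pub l_extendedprice: f64,
}

/// Collect the join keys that survived the build-side filters
/// (the `p_partkey` values of the filtered `part` rows).
pub fn build_key_set(build_keys: &[i64]) -> HashSet<i64> {
    build_keys.iter().copied().collect()
}

/// Hypothetical scan: evaluate `l_partkey IN (keys)` on the key column first,
/// and read the other columns only for rows that pass.
pub fn scan_with_key_filter(
    partkeys: &[i64],
    quantities: &[f64],
    prices: &[f64],
    keys: &HashSet<i64>,
) -> Vec<LineItem> {
    let mut out = Vec::new();
    for (i, key) in partkeys.iter().enumerate() {
        if keys.contains(key) {
            // Only matching rows pay the cost of "decoding" the wide columns.
            out.push(LineItem {
                l_partkey: *key,
                l_quantity: quantities[i],
                l_extendedprice: prices[i],
            });
        }
    }
    out
}
```

With build keys `[2, 5]` and probe keys `[1, 2, 3, 5]`, only the rows at positions 1 and 3 are materialized.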

In a query with a single selective join (one that filters out many values) the savings are likely minimal, as they depend on how much work can be saved in materialization (decoding). The only scan that does late materialization in DataFusion at the time of writing is the ParquetExec

However, in a query with multiple selective joins the savings become much more pronounced, because we can save the effort of creating intermediate join outputs which are filtered out by joins later in the plan

For example:

    Pass down in multiple joins                                                                 
                                                                                                
 While this doesn't happen in TPCH                                                              
Q17 (the subquery has no predicates)                                                            
 the SIPS approach can be even more                                                             
 effective with multiple selective                                                              
               joins                  │                                                         
                                      │                                                         
                                      │             Filters on both join keys can be applied    
                                      │             at this level, which can be even more       
                                      ▼             effective as it avoids the work to create   
                             ┌────────────────┐     the intermediate output of HashJoin(2)   ─ ┐
                             │  HashJoin (1)  │     which is then filtered by HashJoin(1)       
                             │     d1.key =   │                                                │
                             │    f.d1_key    │                                                 
                             └──┬─────────┬───┘                                                │
                                │         │                                                     
                     ┌──────────┘         └────────────┐                                       │
                     │                                 │                                        
                     ▼                                 ▼                                       │
           ┌──────────────────┐               ┌────────────────┐                                
           │Scan: D1          │               │  HashJoin (2)  │                               │
           │filters:          │               │     d2.key =   │                                
           │  ...             │               │    f.d2_key    │                               │
           └──────────────────┘               └───┬─────────┬──┘                                
                                                  │         │                                  │
                                      ┌───────────┘         └─────────────┐                     
                                      │                                   │                    │
                                      ▼                                   ▼                     
                             ┌────────────────┐                ┌─────────────────────┐         │
                             │Scan: D2        │                │Scan: F              │          
                             │filters:        │                │filters:             │         │
                             │  ...           │                │  f.d1_key IN (...)  │◀ ─ ─ ─ ─ 
                             └────────────────┘                │  f.d2_key IN (...)  │          
                                                               │                     │          
                                                               └─────────────────────┘          
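
As a rough sketch of the multi-join case (hypothetical types, not DataFusion code), the scan of F can apply the key sets of both dimension tables at once, so rows that would only be rejected later by HashJoin (1) never reach HashJoin (2):

```rust
use std::collections::HashSet;

/// A fact-table row reduced to its two join keys.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FactRow {
    pub d1_key: i64,
    pub d2_key: i64,
}

/// Hypothetical scan of F with both pushed-down filters:
/// `f.d1_key IN (d1_keys) AND f.d2_key IN (d2_keys)`.
pub fn scan_fact(
    rows: &[FactRow],
    d1_keys: &HashSet<i64>,
    d2_keys: &HashSet<i64>,
) -> Vec<FactRow> {
    rows.iter()
        .copied()
        .filter(|r| d1_keys.contains(&r.d1_key) && d2_keys.contains(&r.d2_key))
        .collect()
}
```

For example, with fact rows `(1,1), (1,9), (9,1), (2,2)` and both key sets equal to `{1, 2}`, the scan emits only `(1,1)` and `(2,2)`. With the d2 filter alone, `(9,1)` would also flow through HashJoin (2) before being dropped by HashJoin (1).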

Describe alternatives you've considered

Some version of this technique is described in "Bloom Filter Joins" in Spark: https://issues.apache.org/jira/browse/SPARK-32268

Building a separate Bloom Filter has the nice property that you can distribute it in a networked cluster; however, the overhead of creating the bloom filter would likely be non-trivial
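
For readers unfamiliar with the data structure, here is a minimal Bloom filter sketch using only the Rust standard library. It is not what Spark or DataFusion implement; the sizing parameters and the double-hashing scheme are assumptions chosen for brevity. The property that matters for joins is that it never reports a false negative, so a probe row rejected by the filter can safely be skipped.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A fixed-size Bloom filter backed by a bit vector.
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: u64,
    num_hashes: u32,
}

impl BloomFilter {
    /// `num_bits` and `num_hashes` are tuning knobs; at least 64 bits are used.
    pub fn new(num_bits: u64, num_hashes: u32) -> Self {
        let num_bits = num_bits.max(64);
        let words = ((num_bits + 63) / 64) as usize;
        BloomFilter { bits: vec![0; words], num_bits, num_hashes }
    }

    /// Derive two base hashes; bit i is then `a + i * b` (double hashing).
    fn hash_pair<T: Hash>(value: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        value.hash(&mut h1);
        let a = h1.finish();
        let mut h2 = DefaultHasher::new();
        (a, 0x9E37_79B9_7F4A_7C15u64).hash(&mut h2);
        // Force the step to be odd so it is never zero.
        let b = h2.finish() | 1;
        (a, b)
    }

    /// Set the bits for one build-side key.
    pub fn insert<T: Hash>(&mut self, value: &T) {
        let (a, b) = Self::hash_pair(value);
        for i in 0..self.num_hashes as u64 {
            let bit = a.wrapping_add(i.wrapping_mul(b)) % self.num_bits;
            self.bits[(bit / 64) as usize] |= 1u64 << (bit % 64);
        }
    }

    /// Returns false only if the value was definitely never inserted.
    pub fn might_contain<T: Hash>(&self, value: &T) -> bool {
        let (a, b) = Self::hash_pair(value);
        (0..self.num_hashes as u64).all(|i| {
            let bit = a.wrapping_add(i.wrapping_mul(b)) % self.num_bits;
            (self.bits[(bit / 64) as usize] & (1u64 << (bit % 64))) != 0
        })
    }
}
```

The bit vector is what would be shipped across the network in a distributed setting; the cost mentioned above is the extra pass of hashing every build-side key into it.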

Additional context

See a description of how DataFusion HashJoins work here: https://github.com/apache/arrow-datafusion/pull/7953

Here is a paper that describes industrial experience with using SIPS techniques: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf

alamb avatar Oct 27 '23 16:10 alamb

cc @sunchao @viirya and @kazuyukitanimura to whom I mentioned this technique the other day

alamb avatar Oct 27 '23 16:10 alamb

I'm very curious about how this kind of graph is drawn😯

acking-you avatar Oct 29 '23 05:10 acking-you

I'm very curious about how this kind of graph is drawn😯

@ACking-you I draw it by hand using https://monodraw.helftone.com/ (and it unfortunately takes me a long time)

@devinjdangelo has suggested https://asciiflow.com/#/ works for making something quick in the browser

@westonpace pointed out that if you're writing on Github then you can use mermaid syntax in comments / issues / prs and it will render automatically: https://github.blog/2022-02-14-include-diagrams-markdown-files-mermaid/

alamb avatar Oct 29 '23 11:10 alamb

From: https://github.com/apache/datafusion/discussions/9963

TakeExec (index lookup) -- really like an indexed scan somehow

I wonder if TakeExec or something quite similar could also be used for dynamic join predicates?

ahirner avatar Jul 12 '24 20:07 ahirner

I wonder if TakeExec or something quite similar could also be used for dynamic join predicates?

If there is a secondary index on l_partkey then I think a TakeExec could be useful. Otherwise there is no way to know the row offsets and a filtered scan is probably the best you can do.

westonpace avatar Jul 12 '24 21:07 westonpace

Interested in this one!

Lordworms avatar Aug 09 '24 01:08 Lordworms

Thanks @Lordworms -- I am hoping someone else can step up and help you with this. I just don't have time to help with a project to improve join performance at this time.

alamb avatar Aug 14 '24 19:08 alamb

I believe DuckDB just announced support for this feature in 1.1: https://duckdb.org/2024/09/09/announcing-duckdb-110.html#dynamic-filter-pushdown-from-joins

alamb avatar Sep 09 '24 17:09 alamb

I have a PR up for doing something similar for TopK sorts (ORDER BY col LIMIT 10) in https://github.com/apache/datafusion/pull/15301. I think we should be able to re-use that work for this change, at which point it would just be a question of implementing a DynamicFilterSource for the join ExecutionPlans. Does that sound right to folks here? I want to make sure we don't land something that is almost what is needed here but not quite.

adriangb avatar Mar 20 '25 17:03 adriangb

Thank you! @adriangb Basically, they're similar. A join runtime filter can come in different kinds, such as min-max, inlist, or bloom filter; we can build them from the build side of the hash join.

Supporting the min-max runtime filter should be a good start.
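
A min-max runtime filter is simple enough to sketch directly. The code below is an illustration with hypothetical names (`Bounds`, `ChunkStats`), not the DataFusion implementation: the bounds are computed once from the build-side keys, then used both to skip whole chunks of the probe side via their statistics and to reject individual rows.

```rust
/// Inclusive range of the join keys seen on the build side.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Bounds {
    pub min: i64,
    pub max: i64,
}

/// Compute the bounds once the build side is complete.
/// Returns `None` for an empty build side (an inner join then matches nothing).
pub fn build_side_bounds(keys: &[i64]) -> Option<Bounds> {
    let min = *keys.iter().min()?;
    let max = *keys.iter().max()?;
    Some(Bounds { min, max })
}

/// Min/max statistics for a chunk of the probe side,
/// for example a Parquet row group (assumption for this sketch).
pub struct ChunkStats {
    pub min: i64,
    pub max: i64,
}

/// A chunk can be skipped entirely when its key range
/// does not overlap the build-side range.
pub fn chunk_may_match(stats: &ChunkStats, bounds: &Bounds) -> bool {
    stats.max >= bounds.min && stats.min <= bounds.max
}

/// Row-level version of the same check: `key >= min AND key <= max`.
pub fn row_matches(key: i64, bounds: &Bounds) -> bool {
    key >= bounds.min && key <= bounds.max
}
```

The filter is cheap to build and to evaluate, but it only helps when the build-side keys cover a narrow part of the probe-side key range.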

xudong963 avatar Mar 24 '25 14:03 xudong963

I think that since #15301 pushes an arbitrary Arc<dyn PhysicalExpression> down min/max, inlist, etc. should all be doable 😄

adriangb avatar Mar 24 '25 14:03 adriangb

I think that since #15301 pushes an arbitrary Arc<dyn PhysicalExpression> down min/max, inlist, etc. should all be doable 😄

Yes, I noticed the DynamicFilterSource trait.

xudong963 avatar Mar 24 '25 14:03 xudong963

So now that #15568 is in, what is a reasonable approach to do SIP with bloom filters?

  • Modify HashJoinExec to build a bloom filter on the build side, and when complete call DynamicFilterPhysicalExpr::update
  • How do we represent the bloom filter test as an expression? Is it a new PhysicalExpr, or do we define a ScalarUDF for bloom_filter_contains(bloom_filter, input) and use a ScalarFunctionExpr? I'm not convinced of the ScalarUDF approach for two reasons: 1) taking the bloom filter bytes as an arg would require construction/validation of a bloom filter from those bytes at every invocation, right? 2) I'm not sure we want to expose this as a SQL function.

I'm interested to start tackling this in pieces next week, but I also suspect others have thoughts (maybe even progress) in this area already.

mbutrovich avatar Apr 25 '25 15:04 mbutrovich

Modify HashJoinExec to build a bloom filter on the build side, and when complete call DynamicFilterPhysicalExpr::update

Pretty much: once HashJoinExec has completed the build side it builds a bloom filter and wraps it up in a PhysicalExpr. I don't think you need to use a DynamicFilterPhysicalExpr since once you build the hash table / bloom filter it's never updated. The reason to have DynamicFilterPhysicalExpr is e.g. in a TopK operator, where with every RecordBatch you process you gain new information to do more selective pruning (new heap values). So I'd think you could just pass the hardcoded PhysicalExpr directly?
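
To make the TopK contrast concrete, here is a small sketch of a filter that tightens as the operator sees more data. `SharedFilter` and `TopK` are hypothetical stand-ins written for this example, not the DataFusion types; the example assumes `ORDER BY col ASC LIMIT k` over `i64` values.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, RwLock};

/// Hypothetical filter shared between an operator and a scan.
/// It holds the current upper bound, or `None` while nothing can be pruned.
#[derive(Clone)]
pub struct SharedFilter {
    bound: Arc<RwLock<Option<i64>>>,
}

impl SharedFilter {
    pub fn new() -> Self {
        SharedFilter { bound: Arc::new(RwLock::new(None)) }
    }

    /// Called by the operator whenever it learns a tighter bound.
    pub fn update(&self, new_bound: i64) {
        *self.bound.write().unwrap() = Some(new_bound);
    }

    /// Called by the scan: can this value still make it into the result?
    pub fn matches(&self, value: i64) -> bool {
        match *self.bound.read().unwrap() {
            Some(bound) => value <= bound,
            None => true,
        }
    }
}

/// Keeps the k smallest values and publishes the current worst one.
pub struct TopK {
    k: usize,
    heap: BinaryHeap<i64>, // max-heap: peek() is the largest kept value
    filter: SharedFilter,
}

impl TopK {
    pub fn new(k: usize, filter: SharedFilter) -> Self {
        TopK { k, heap: BinaryHeap::new(), filter }
    }

    pub fn push(&mut self, value: i64) {
        if self.k == 0 {
            return;
        }
        if self.heap.len() < self.k {
            self.heap.push(value);
        } else if let Some(&worst) = self.heap.peek() {
            if value >= worst {
                return; // cannot improve the result, bound unchanged
            }
            self.heap.pop();
            self.heap.push(value);
        }
        // Once the heap is full, every change can tighten the filter.
        if self.heap.len() == self.k {
            if let Some(&worst) = self.heap.peek() {
                self.filter.update(worst);
            }
        }
    }
}
```

In the hash join case the equivalent of `update` would run exactly once, after the build side is complete, which is why a fixed expression may be enough there.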

To your second point: one thing to consider is if you want this to be compatible with any sort of distributed query execution. If you do then one of two things would need to happen:

  1. Teach the serialization how to serialize the bloom filter PhysicalExpr -> in practice update the protobuf serialization to match against it.
  2. Implement PhysicalExpr::snapshot() to convert it into an InList or something.

Given all of this my recommendation would be to build a dedicated BloomFilter PhysicalExpr and teach DataFusion how to serialize it to ProtoBuf. Then don't use DynamicFilterPhysicalExpr or PhysicalExpr::snapshot().

The nice thing about this is that the new bloom filter expr could be re-used in other places, e.g. InList could build a bloom filter instead of a HashSet for pre-filtering (I think that's what it currently does).

adriangb avatar Apr 25 '25 16:04 adriangb

I opened #16435

adriangb avatar Jun 18 '25 01:06 adriangb

Took an initial stab at this in https://github.com/apache/datafusion/pull/16445

adriangb avatar Jun 18 '25 15:06 adriangb

Took an initial stab at this in https://github.com/apache/datafusion/pull/16445

👋 @adriangb I just read your PR, this is a really nice optimization ty 🙇‍♀

If I understand correctly, the builder side is currently always set to the left. Do you think it would make sense to make this configurable in the future, with the left side as the default? I think it could be potentially valuable for users that query custom datasources and have statistics on their side.

Also, separately, I was thinking about whether we could support dynamic filters for WHERE IN (...) clauses as well. All of these are definitely follow-ups, and I’d be happy to take a stab at it if no one else is interested :)

LiaCastaneda avatar Jun 27 '25 16:06 LiaCastaneda

Also, separately, I was thinking about whether we could support dynamic filters for WHERE IN (...) clauses as well. All of these are definitely follow-ups, and I’d be happy to take a stab at it if no one else is interested :)

I am not sure what you mean by WHERE IN(..) clauses -- specifically I am not sure what is dynamic -- for example for WHERE x IN (1,2,3) the IN list is constant.

alamb avatar Jun 27 '25 16:06 alamb

If I understand correctly, the builder side is currently always set to the left. Do you think it would make sense to make this configurable in the future, with the left side as the default?

Yes totally, but that's a larger problem of join optimization / planning that I don't think this PR interacts with.

adriangb avatar Jun 27 '25 16:06 adriangb

I am not sure what you mean by WHERE IN(..) clauses -- specifically I am not sure what is dynamic -- for example for WHERE x IN (1,2,3) the IN list is constant.

Turn the build-side values into a list, then construct an IN list dynamic predicate for the hash join

xudong963 avatar Jun 27 '25 16:06 xudong963

I am not sure what you mean by WHERE IN(..) clauses -- specifically I am not sure what is dynamic -- for example for WHERE x IN (1,2,3) the IN list is constant.

Turn the build-side values into a list, then construct an IN list dynamic predicate for the hash join

I think the way to do that is to share an Arc'ed ref to the hash table directly as @Dandandan is suggesting. We'll have to decide the behavior for PhysicalExpr::snapshot, I guess if the hash table is small enough we can build an IN LIST expression but for bigger hash tables we might not be able to serialize them across the wire or use them in stats/predicate pruning and will have to rely on just the bounds.

adriangb avatar Jun 27 '25 16:06 adriangb

I am not sure what you mean by WHERE IN(..) clauses -- specifically I am not sure what is dynamic -- for example for WHERE x IN (1,2,3) the IN list is constant.

Turn the build-side values into a list, then construct an IN list dynamic predicate for the hash join

I think the way to do that is to share an Arc'ed ref to the hash table directly as @Dandandan is suggesting. We'll have to decide the behavior for PhysicalExpr::snapshot, I guess if the hash table is small enough we can build an IN LIST expression but for bigger hash tables we might not be able to serialize them across the wire or use them in stats/predicate pruning and will have to rely on just the bounds.

Yeah, that makes sense. What we did before for inlist was to add a check, e.g. the inlist predicate is generated only when the build side size is below a threshold such as 1024
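
The size check described here could look roughly like the following sketch. `ProbeFilter` and `choose_filter` are hypothetical names, the threshold is a parameter (1024 is just the figure mentioned above), and the fallback to bounds follows the suggestion earlier in the thread; none of this is taken from the actual PR.

```rust
use std::collections::BTreeSet;

/// The predicate that gets pushed to the probe side.
#[derive(Debug, PartialEq)]
pub enum ProbeFilter {
    /// Exact: `key IN (values)`, with sorted distinct values.
    InList(Vec<i64>),
    /// Approximate: `key >= min AND key <= max`.
    Bounds { min: i64, max: i64 },
}

/// Pick an IN list for small build sides and fall back to min/max bounds
/// otherwise. Returns `None` when the build side is empty.
pub fn choose_filter(build_keys: &[i64], max_in_list_len: usize) -> Option<ProbeFilter> {
    let distinct: BTreeSet<i64> = build_keys.iter().copied().collect();
    let min = *distinct.iter().next()?;
    let max = *distinct.iter().next_back()?;
    if distinct.len() <= max_in_list_len {
        Some(ProbeFilter::InList(distinct.into_iter().collect()))
    } else {
        Some(ProbeFilter::Bounds { min, max })
    }
}

impl ProbeFilter {
    /// Evaluate the filter for one probe-side key.
    pub fn matches(&self, key: i64) -> bool {
        match self {
            ProbeFilter::InList(values) => values.binary_search(&key).is_ok(),
            ProbeFilter::Bounds { min, max } => key >= *min && key <= *max,
        }
    }
}
```

The IN list is exact and easy to serialize or use for statistics pruning, while the bounds admit false positives (any key inside the range passes) but stay constant in size.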

xudong963 avatar Jun 27 '25 17:06 xudong963

Took an initial stab at this in #16445

👋 @adriangb I just read your PR, this is a really nice optimization ty 🙇‍♀

If I understand correctly, the builder side is currently always set to the left. Do you think it would make sense to make this configurable in the future, with the left side as the default? I think it could be potentially valuable for users that query custom datasources and have statistics on their side.

Also, separately, I was thinking about whether we could support dynamic filters for WHERE IN (...) clauses as well. All of these are definitely follow-ups, and I’d be happy to take a stab at it if no one else is interested :)

Some where x in (...) clauses (correlated subqueries) are converted to a join, those will benefit from the optimization already automatically!

Those that don't AFAIK can't run on DataFusion as of now as there is no support for executing subqueries. If DataFusion would support that, it would be interesting to look at cases when dynamically pushing down the subquery results into a dynamic filter would be worth it (probably also checking for the results to be smaller than n rows).

Dandandan avatar Jun 27 '25 17:06 Dandandan

Thanks! Just to clarify - I didn’t mean user-facing WHERE IN (SELECT ...) clauses. I meant building an IN LIST expression filter dynamically at runtime (essentially what @xudong963 said, building a list from the values of the build side) and pushing it down to the probe side.

LiaCastaneda avatar Jul 01 '25 10:07 LiaCastaneda

@adriangb says https://github.com/apache/datafusion/pull/16445#issuecomment-3026127559:

Btw here's an article that explains how DuckDB does join filter pushdown. It sounds like they only push down min/max filters: https://duckdb.org/2024/09/09/announcing-duckdb-110.html#dynamic-filter-pushdown-from-joins

alamb avatar Jul 02 '25 19:07 alamb

I think this is mostly done!

There's a couple followups but the main goal is achieved:

  • https://github.com/apache/datafusion/issues/17171
  • https://github.com/apache/datafusion/issues/16973
  • https://github.com/apache/datafusion/pull/17090

adriangb avatar Aug 13 '25 11:08 adriangb

@alamb shall we close this issue and continue in the followups?

adriangb avatar Aug 13 '25 11:08 adriangb

@alamb shall we close this issue and continue in the followups?

Sounds good to me -- what PR had the main change? was it

  • https://github.com/apache/datafusion/pull/16445 ?

alamb avatar Aug 13 '25 20:08 alamb

@alamb shall we close this issue and continue in the followups?

Sounds good to me -- what PR had the main change? was it

Yep that's the one!

adriangb avatar Aug 13 '25 20:08 adriangb

I also added this to the list of things to document on DataFusion 50.0.0: https://github.com/apache/datafusion/issues/16799

alamb avatar Aug 13 '25 20:08 alamb