Should we support other scan types than SeqScan for runtime filter?

Open my-ship-it opened this issue 5 months ago • 6 comments

Currently, runtime filter only supports SeqScan and Dynamic SeqScan. It gives a positive performance gain for HashJoin, because the bloom filter drops non-matching rows at the scan, which reduces the cost of probing the hash table during the join.

For example,

postgres=# EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
                                        QUERY PLAN
-------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
   ->  Hash Join (actual rows=0 loops=1)
         Hash Cond: (t1.c2 = t2.c2)
         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
         ->  Seq Scan on t1 (actual rows=1 loops=1)
               Rows Removed by Pushdown Runtime Filter: 127
         ->  Hash (actual rows=32 loops=1)
               Buckets: 524288  Batches: 1  Memory Usage: 4098kB
               ->  Seq Scan on t2 (actual rows=32 loops=1)
 Optimizer: Postgres query optimizer
(10 rows)
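
To make the mechanism in the plan above concrete, here is a minimal Python sketch of a hash join whose outer scan is pre-filtered by a bloom filter built from the inner side. It is only an illustration, not Cloudberry's implementation: the `BloomFilter` class, the `hash_join_with_runtime_filter` function, and the sizes (8192 bits, 4 hashes) are all assumptions made up for this example.

```python
import hashlib


class BloomFilter:
    """A small Bloom filter: no false negatives, some false positives."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions from one digest (double hashing).
        digest = hashlib.sha256(repr(key).encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key) -> bool:
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key)
        )


def hash_join_with_runtime_filter(outer_rows, inner_rows, key):
    """Join two lists of dicts on `key`, filtering the outer scan first."""
    hash_table = {}
    bloom = BloomFilter(num_bits=8192, num_hashes=4)  # hypothetical sizes
    for row in inner_rows:  # build phase (inner side)
        hash_table.setdefault(row[key], []).append(row)
        bloom.add(row[key])

    removed = 0
    joined = []
    for row in outer_rows:  # outer scan
        if not bloom.might_contain(row[key]):  # pushed-down runtime filter
            removed += 1  # counted like "Rows Removed by Pushdown Runtime Filter"
            continue
        for match in hash_table.get(row[key], []):  # probe phase
            joined.append((row, match))
    return joined, removed
```

Rows rejected by `might_contain` never reach the probe loop, which is where the saving comes from. A false positive only costs one wasted probe, so the join result stays correct.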

For HashJoin, the scan could also be FunctionScan, SubqueryScan, TidScan, etc. Should we support these scan types, and would we get a performance gain?

my-ship-it avatar Jul 16 '25 06:07 my-ship-it

Yes!

I want to believe that one day we could reduce the number of rows in SubqueryScan, and use forward and backward filter passes :)

This is what inspires me: Debunking the Myth of Join Ordering: Toward Robust SQL Analytics

Here is the DuckDB implementation: https://github.com/duckdb/duckdb/pull/17326

What I cannot understand right now is how to use a bloom filter in an MPP environment. Is it enough to create local bloom filters?

leborchuk avatar Jul 16 '25 17:07 leborchuk

@leborchuk

"How to use bloom filter in MPP environment?" As you can see from the plan in the current issue, the Hash Join operator executes on every segment. Each segment builds its bloom filter from the inner table (in this plan, the BF is built from t2).

"Is it enough to create local bloom filters?" The size of the bloom filter is calculated from workmem. This works reasonably well, but if the inner table is too large, the runtime filter will not be very effective.
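
To show why a fixed-size filter stops helping as the inner table grows, here is a small Python sketch using the standard approximation for the bloom filter false-positive rate, p = (1 - exp(-k*n/m))^k, with m bits, n distinct keys and k hash functions. The 4 MiB budget and k = 2 are assumed values for illustration only, not the numbers Cloudberry derives from workmem.

```python
import math


def bloom_false_positive_rate(num_bits: int, num_keys: int, num_hashes: int) -> float:
    """Standard approximation: p = (1 - exp(-k * n / m)) ** k."""
    if num_keys == 0:
        return 0.0
    return (1.0 - math.exp(-num_hashes * num_keys / num_bits)) ** num_hashes


# Hypothetical budget: 4 MiB of filter bits and 2 hash functions.
NUM_BITS = 4 * 1024 * 1024 * 8
NUM_HASHES = 2

for n in (10**5, 10**6, 10**7, 10**8):
    p = bloom_false_positive_rate(NUM_BITS, n, NUM_HASHES)
    print(f"{n:>11,d} distinct inner keys -> false-positive rate ~ {p:.4f}")
```

With the size fixed, the rate rises toward 1 as the number of inner keys grows, so almost every outer row passes the filter and the extra check becomes pure overhead.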

jiaqizho avatar Jul 17 '25 01:07 jiaqizho

Hi, regarding how to use a bloom filter in an MPP environment: do you mean how to use it when there is a Motion node under the Join?

avamingli avatar Jul 17 '25 02:07 avamingli

Hi @leborchuk, thanks for the suggestions and for pointing to other reference implementations in the industry. They are helpful.

In Cloudberry, for queries with Motion, a bloom filter can be applied after Motion (on the receiving side) to reduce the amount of data entering the Hash Join and improve performance, for example:

-----------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..1582.50 rows=4203 width=8)
   ->  Hash Join  (cost=0.00..1582.37 rows=1401 width=8)
         Hash Cond: ((r.v + 1) = l.v)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..1148.63 rows=14933 width=4)
               Hash Key: (r.v + 1)
                 Rows Removed by Pushdown Runtime Filter: 127
               ->  Seq Scan on tbl2 r  (cost=0.00..1148.44 rows=14933 width=4)
         ->  Hash  (cost=431.01..431.01 rows=334 width=4)
               ->  Seq Scan on tbl1 l  (cost=0.00..431.01 rows=334 width=4)
 Optimizer: GPORCA

However, the volume of data sent through Motion remains relatively large, because rows are filtered only after they arrive. A better approach is to filter before the data is sent, like:

-----------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..1582.50 rows=4203 width=8)
   ->  Hash Join  (cost=0.00..1582.37 rows=1401 width=8)
         Hash Cond: ((r.v + 1) = l.v)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..1148.63 rows=14933 width=4)
               Hash Key: (r.v + 1)
               ->  Seq Scan on tbl2 r  (cost=0.00..1148.44 rows=14933 width=4)
                         Rows Removed by Pushdown Runtime Filter: 127
         ->  Hash  (cost=431.01..431.01 rows=334 width=4)
               ->  Seq Scan on tbl1 l  (cost=0.00..431.01 rows=334 width=4)
 Optimizer: GPORCA

The problem is that Motion in Cloudberry only supports unidirectional transmission: data flows from the sender to the receiver, and the receiver cannot send anything back. Could we modify Motion so that the receiver can send the bloom filter to the sender?
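
As a rough illustration of what sender-side filtering would save, here is a Python sketch that counts the rows crossing a redistribute step with and without receiver-built filters. Everything in it is hypothetical: Cloudberry has no such receiver-to-sender channel today, `segment_for` is a made-up routing rule, and plain Python sets stand in for bloom filters (a real bloom filter would also let some false positives through).

```python
NUM_SEGMENTS = 3  # matches "segments: 3" in the plans, otherwise arbitrary


def segment_for(key: int) -> int:
    """Hypothetical redistribution rule: route a key to a segment."""
    return key % NUM_SEGMENTS


def build_receiver_filters(inner_keys):
    """Each receiving segment summarizes the inner keys it holds locally."""
    filters = [set() for _ in range(NUM_SEGMENTS)]
    for key in inner_keys:
        filters[segment_for(key)].add(key)
    return filters


def rows_sent(outer_keys, receiver_filters=None):
    """Count rows crossing the redistribute step, per destination segment."""
    sent = [0] * NUM_SEGMENTS
    for key in outer_keys:
        dest = segment_for(key)
        if receiver_filters is not None and key not in receiver_filters[dest]:
            continue  # dropped at the sender, before Motion
        sent[dest] += 1
    return sent


# Made-up data: a small inner side and a larger outer side.
inner_keys = range(100)
outer_keys = range(1000)

filters = build_receiver_filters(inner_keys)
print("rows sent without sender-side filter:", sum(rows_sent(outer_keys)))
print("rows sent with sender-side filter:   ", sum(rows_sent(outer_keys, filters)))
```

Note that in this sketch a sender must check the filter of the destination segment for each row, so every sender would need the filters of all receivers (or their union) before it starts sending.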

my-ship-it avatar Jul 17 '25 03:07 my-ship-it

For this issue, I believe we have a more elegant and broadly applicable solution, one that is suitable not only for sharing bloom filter information across nodes, but also for globally sharing all runtime state information.

There is a paper we can learn from: Anser: Adaptive Information Sharing Framework of AnalyticDB.

jianlirong avatar Jul 17 '25 09:07 jianlirong

Thanks, Lirong, for the valuable insights. The paper is worth further study on our side; it provides inspiration on how to share information in distributed systems.

my-ship-it avatar Jul 25 '25 03:07 my-ship-it