Should we support scan types other than SeqScan for the runtime filter?
Currently, the runtime filter only supports SeqScan and Dynamic SeqScan. It gives a positive performance gain for HashJoin because the bloom filter reduces the cost of probing the hash table during the join.
For example:
postgres=# EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
                                         QUERY PLAN
-------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
   ->  Hash Join (actual rows=0 loops=1)
         Hash Cond: (t1.c2 = t2.c2)
         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
         ->  Seq Scan on t1 (actual rows=1 loops=1)
               Rows Removed by Pushdown Runtime Filter: 127
         ->  Hash (actual rows=32 loops=1)
               Buckets: 524288  Batches: 1  Memory Usage: 4098kB
               ->  Seq Scan on t2 (actual rows=32 loops=1)
 Optimizer: Postgres query optimizer
(10 rows)
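To illustrate what the "Rows Removed by Pushdown Runtime Filter: 127" line above means, here is a minimal, self-contained sketch of the mechanism. All names (bloom_t, bloom_insert, bloom_maybe_contains) are hypothetical and are not the actual Cloudberry API: the build side of the Hash Join inserts its join keys into a bloom filter while it builds the hash table, and the probe-side Seq Scan tests each tuple against the filter, dropping rows that cannot possibly match before they reach the join.

/*
 * Minimal sketch of a pushdown runtime filter (illustrative names only,
 * not Cloudberry's actual code).
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct bloom_t
{
    uint64_t   *bits;       /* bit array */
    uint64_t    nbits;      /* number of bits in the array */
    int         nhashes;    /* number of hash probes per key */
} bloom_t;

/* splitmix64-style finalizer; the seed derives independent hash probes */
static uint64_t
hash64(uint64_t x, uint64_t seed)
{
    x += seed + 0x9e3779b97f4a7c15ULL;
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
    x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
    return x ^ (x >> 31);
}

static bloom_t *
bloom_create(uint64_t nbits, int nhashes)
{
    bloom_t *bf = malloc(sizeof(bloom_t));
    bf->nbits = nbits;
    bf->nhashes = nhashes;
    bf->bits = calloc(nbits / 64 + 1, sizeof(uint64_t));
    return bf;
}

static void
bloom_insert(bloom_t *bf, uint64_t key)
{
    for (int i = 0; i < bf->nhashes; i++)
    {
        uint64_t h = hash64(key, (uint64_t) i) % bf->nbits;
        bf->bits[h / 64] |= UINT64_C(1) << (h % 64);
    }
}

static bool
bloom_maybe_contains(const bloom_t *bf, uint64_t key)
{
    for (int i = 0; i < bf->nhashes; i++)
    {
        uint64_t h = hash64(key, (uint64_t) i) % bf->nbits;
        if (!(bf->bits[h / 64] & (UINT64_C(1) << (h % 64))))
            return false;   /* definitely not on the build side */
    }
    return true;            /* possibly on the build side */
}

int
main(void)
{
    bloom_t *bf = bloom_create(1 << 16, 3);

    /* build side: t2.c2 join keys go into the filter during hash build */
    for (uint64_t c2 = 0; c2 < 32; c2++)
        bloom_insert(bf, c2);

    /* probe side: only rows that pass the filter reach the Hash Join */
    int removed = 0, passed = 0;
    for (uint64_t c2 = 0; c2 < 128; c2++)
    {
        if (bloom_maybe_contains(bf, c2))
            passed++;       /* emitted to the join */
        else
            removed++;      /* counted as "Rows Removed by Pushdown Runtime Filter" */
    }
    return (removed + passed == 128) ? 0 : 1;
}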
For HashJoin, the scan below the join could also be a FunctionScan, SubqueryScan, TidScan, and so on. Should we support these scan types as well, and would that give a performance gain?
Yes!
I would like to believe that one day we could also reduce the number of rows in a SubqueryScan, and use both forward and backward filter passes :)
That's what inspires me: Debunking the Myth of Join Ordering: Toward Robust SQL Analytics.
Here is the DuckDB implementation: https://github.com/duckdb/duckdb/pull/17326
What I cannot understand right now is how to use a bloom filter in an MPP environment. Is it enough to create local bloom filters?
@leborchuk
"how to use bloom filter in MPP environment?" As you can see from the plan in this issue, the Hash Join operator executes on every segment, and the bloom filter is built from the inner table (in this plan, it is built from t2).
"Is it enough to create local bloom filters?" The size of the bloom filter is calculated from work_mem. This is usually good enough, but if the inner table is very large, the runtime filter will not be very effective.
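A rough sketch of that sizing logic, with hypothetical names rather than the actual Cloudberry implementation: pick the filter size from the estimated number of inner rows, but clamp it to the work_mem budget. With roughly 10 bits per element the false-positive rate is around 1%; when the inner table is too large for the budget, the filter becomes denser and less selective, which is the caveat above.

#include <stdint.h>

#define BITS_PER_ELEMENT 10              /* ~1% false-positive rate */

/* Hypothetical sizing helper: desired bits, clamped to work_mem (in kB). */
static uint64_t
bloom_size_bits(uint64_t inner_rows, uint64_t work_mem_kb)
{
    uint64_t wanted = inner_rows * BITS_PER_ELEMENT;
    uint64_t budget = work_mem_kb * 1024ULL * 8ULL;   /* work_mem expressed in bits */

    return wanted < budget ? wanted : budget;
}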
What I cannot understand right now - how to use bloom filter in MPP environment? Is it enough to create local bloom filters?
Hi, do you mean how to use it when there is a Motion node under the Join?
Hi @leborchuk, thanks for the suggestions and for providing other reference implementations in the industry, which is helpful.
In Cloudberry, for queries with Motion, a bloom filter can be enabled after the Motion to reduce the amount of data fed into the Hash Join and improve performance, for example:
-----------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..1582.50 rows=4203 width=8)
   ->  Hash Join  (cost=0.00..1582.37 rows=1401 width=8)
         Hash Cond: ((r.v + 1) = l.v)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..1148.63 rows=14933 width=4)
               Hash Key: (r.v + 1)
               Rows Removed by Pushdown Runtime Filter: 127
               ->  Seq Scan on tbl2 r  (cost=0.00..1148.44 rows=14933 width=4)
         ->  Hash  (cost=431.01..431.01 rows=334 width=4)
               ->  Seq Scan on tbl1 l  (cost=0.00..431.01 rows=334 width=4)
 Optimizer: GPORCA
However, the motion data volume remains relatively large. A better approach would be to filter before the data is sent, for example:
-----------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..1582.50 rows=4203 width=8)
   ->  Hash Join  (cost=0.00..1582.37 rows=1401 width=8)
         Hash Cond: ((r.v + 1) = l.v)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..1148.63 rows=14933 width=4)
               Hash Key: (r.v + 1)
               ->  Seq Scan on tbl2 r  (cost=0.00..1148.44 rows=14933 width=4)
                     Rows Removed by Pushdown Runtime Filter: 127
         ->  Hash  (cost=431.01..431.01 rows=334 width=4)
               ->  Seq Scan on tbl1 l  (cost=0.00..431.01 rows=334 width=4)
 Optimizer: GPORCA
Motion in Cloudberry only supports unidirectional transmission, from the sender to the receiver; it cannot send data from the receiver back to the sender. Unless we modify Motion to support sending the bloom filter from the receiver back to the sender?
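If such a reverse channel existed, the useful property is that bloom filters built with the same size and hash functions can be merged with a plain bitwise OR. A purely hypothetical sketch (this is not existing Cloudberry code) of what shipping a filter from the Motion receivers back to the senders could rely on: each receiving segment builds a local filter from its share of the inner side, the filters are OR-merged, and the result is serialized and broadcast to the sending slice, which then applies it below the Redistribute Motion.

#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct
{
    size_t    nwords;     /* length of the bit array in 64-bit words */
    uint64_t  words[];    /* bit array; all segments must agree on size and hash functions */
} bloom_filter;

/* Merge a filter received from another segment into the local one (bitwise OR). */
void
bloom_merge(bloom_filter *dst, const bloom_filter *src)
{
    /* only meaningful if both sides agreed on size and hash functions */
    if (dst->nwords != src->nwords)
        return;
    for (size_t i = 0; i < dst->nwords; i++)
        dst->words[i] |= src->words[i];
}

/* Serialize into a flat buffer suitable for an interconnect message; returns bytes written. */
size_t
bloom_serialize(const bloom_filter *bf, char *buf, size_t buflen)
{
    uint64_t n = (uint64_t) bf->nwords;
    size_t   need = sizeof(n) + bf->nwords * sizeof(uint64_t);

    if (buflen < need)
        return 0;
    memcpy(buf, &n, sizeof(n));
    memcpy(buf + sizeof(n), bf->words, bf->nwords * sizeof(uint64_t));
    return need;
}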
For this issue, I believe there is a more elegant and broadly applicable solution: one that is suitable not only for sharing bloom filter information across nodes, but also for globally sharing all runtime state information.
There is a paper we can learn from: Anser: Adaptive Information Sharing Framework of AnalyticDB.
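A very rough sketch of the "globally shared runtime state" idea, inspired by that paper but purely hypothetical (no such API exists in Cloudberry today): producers publish runtime artifacts such as serialized bloom filters under a (query id, state id) key, and consumers on any segment look them up with a non-blocking call, falling back to no filtering when nothing has been published yet, so the mechanism stays best-effort.

#include <stddef.h>
#include <stdint.h>

typedef struct runtime_state_key
{
    uint64_t query_id;     /* which query the state belongs to */
    uint32_t state_id;     /* which operator/filter produced it */
} runtime_state_key;

/* Producer side: publish an opaque blob (e.g. a serialized bloom filter). */
typedef int (*publish_fn) (runtime_state_key key, const void *data, size_t len);

/* Consumer side: non-blocking lookup; returns NULL if nothing published yet. */
typedef const void *(*lookup_fn) (runtime_state_key key, size_t *len);

/* The sharing framework would provide one registry per query. */
typedef struct runtime_state_registry
{
    publish_fn publish;
    lookup_fn  lookup;
} runtime_state_registry;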
Thanks, Lirong, for the valuable insights. The paper is worth further study on our side; it provides inspiration on how to share information in distributed systems.