Add BloomFilter PhysicalExpr
Is your feature request related to a problem or challenge?
Related to #15512
I think this is a first step towards HashJoinExec pushdown, which we could model as `col >= hash_table_min AND col <= hash_table_max AND hash_table_bloom(col)`. The simple bounds should help with stats pruning, etc.
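A rough sketch of how that three-part predicate could be evaluated, using only the standard library. `ToyBloom` and `JoinKeyFilter` are hypothetical names for illustration and are not DataFusion types; a real `PhysicalExpr` would operate on Arrow arrays rather than slices, and the sizing (10 bits per key, 7 hashes) is an arbitrary assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy Bloom filter over i64 keys. Hypothetical, not a DataFusion type.
pub struct ToyBloom {
    words: Vec<u64>,
    num_bits: u64,
    num_hashes: u64,
}

impl ToyBloom {
    pub fn new(num_bits: u64, num_hashes: u64) -> Self {
        let num_bits = num_bits.max(64);
        let num_words = ((num_bits + 63) / 64) as usize;
        Self { words: vec![0; num_words], num_bits, num_hashes: num_hashes.max(1) }
    }

    /// Double hashing: bit_i = (a + i * b) mod num_bits.
    fn bit(&self, key: i64, i: u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let a = hasher.finish();
        let b = a.rotate_left(32) | 1; // odd stride derived from the same hash
        a.wrapping_add(i.wrapping_mul(b)) % self.num_bits
    }

    pub fn insert(&mut self, key: i64) {
        for i in 0..self.num_hashes {
            let bit = self.bit(key, i);
            self.words[(bit / 64) as usize] |= 1u64 << (bit % 64);
        }
    }

    /// False means "definitely absent"; true means "possibly present".
    pub fn might_contain(&self, key: i64) -> bool {
        (0..self.num_hashes).all(|i| {
            let bit = self.bit(key, i);
            (self.words[(bit / 64) as usize] & (1u64 << (bit % 64))) != 0
        })
    }
}

/// col >= min AND col <= max AND bloom(col), built from the build-side keys.
pub struct JoinKeyFilter {
    min: i64,
    max: i64,
    bloom: ToyBloom,
}

impl JoinKeyFilter {
    /// Returns None for an empty build side (no bounds to derive).
    pub fn from_build_keys(keys: &[i64]) -> Option<Self> {
        let min = *keys.iter().min()?;
        let max = *keys.iter().max()?;
        let mut bloom = ToyBloom::new(keys.len() as u64 * 10, 7);
        for &k in keys {
            bloom.insert(k);
        }
        Some(Self { min, max, bloom })
    }

    /// The cheap bounds checks run first and short-circuit the bloom probe.
    pub fn evaluate(&self, col: &[i64]) -> Vec<bool> {
        col.iter()
            .map(|&v| v >= self.min && v <= self.max && self.bloom.might_contain(v))
            .collect()
    }
}
```

Values outside `[min, max]` are always rejected and build-side keys are always accepted; values inside the range that were never inserted may still pass as false positives.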
I believe we could also use this to accelerate IN <LIST> and re-use the code for bloom filter pruning in parquet scans (just for code reuse)
@adriangb I would like to work on this
FYI @mbutrovich -- I believe you were working on something like this related to Comet -- maybe it is worth a look / review here to make sure the design works with comet too if possible
So the high level for Spark is that there’s a BloomFilterAgg aggregate function that returns a byte sequence representing the bloom filter. The BloomFilterMightContain scalar function takes the byte sequence and an input to test and returns a Boolean.
I’m not sure whether we’d aim for Spark compatibility: we’d have to use the same hash function and seed, and they do some weirdness with endianness in the bloom filter’s byte sequence. I can point to the relevant code for interest, but we may want a different solution for core DF.
https://github.com/apache/datafusion-comet/blob/2c604f9ac68f9723f1ae9107ac2864ac1041f6e9/native/core/src/execution/util/spark_bloom_filter.rs#L31
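To make the two-function shape described above concrete, here is a minimal sketch of an "aggregate" that returns an opaque byte sequence and a scalar test that consumes it. The function names, the byte layout, and the hashing scheme are all assumptions for illustration; this is not Spark's format and not the Comet implementation linked above.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Number of bit positions set per value (arbitrary choice for this sketch).
const NUM_HASHES: u64 = 5;

/// Bit positions for `value` via double hashing.
/// Note: DefaultHasher's algorithm is not guaranteed stable across Rust
/// releases, so a filter sent over the wire would need a pinned hash
/// function and seed on both sides.
fn bit_positions(value: i64, num_bits: u64) -> impl Iterator<Item = u64> {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    let a = hasher.finish();
    let b = a.rotate_left(32) | 1;
    (0..NUM_HASHES).map(move |i| a.wrapping_add(i.wrapping_mul(b)) % num_bits)
}

/// "Aggregate" side: fold all values into a byte sequence of `num_bytes`.
pub fn bloom_filter_agg(values: &[i64], num_bytes: usize) -> Vec<u8> {
    let mut bytes = vec![0u8; num_bytes.max(1)];
    let num_bits = bytes.len() as u64 * 8;
    for &v in values {
        for bit in bit_positions(v, num_bits) {
            bytes[(bit / 8) as usize] |= 1u8 << (bit % 8);
        }
    }
    bytes
}

/// "Scalar" side: test one value against the byte sequence.
pub fn bloom_filter_might_contain(bytes: &[u8], value: i64) -> bool {
    if bytes.is_empty() {
        return false; // an empty sequence holds no keys
    }
    let num_bits = bytes.len() as u64 * 8;
    bit_positions(value, num_bits)
        .all(|bit| (bytes[(bit / 8) as usize] & (1u8 << (bit % 8))) != 0)
}
```

Because the filter is just bytes, it can cross process boundaries, but both sides must agree on the hash function, seed, and bit ordering, which is the compatibility concern raised above.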
I had also played with building one with the fastbloom crate in the hash join operator, but lacked the ability to push it anywhere useful in the plan, which we now have.
#13147
> I can point to the relevant code for interest, but we may want a different solution for core DF.

Maybe this code would at least make it easy for us to have a performance bake-off of something like fastbloom vs. the Spark-compatible bloom filter.
I believe it should also be possible to share the Arc<JoinLeftData> within the created PhysicalExpr.
This avoids building a bloom filter. We already have the full table in memory, so we cannot really save anything by compressing it into a bloom filter.
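A minimal sketch of that alternative: the expression holds a shared, read-only pointer to the build side and probes it directly, so the result is exact and nothing extra is built. `BuildSide` is a hypothetical stand-in for `JoinLeftData` (whose real layout is not shown here), and `HashTableLookup` is an invented name.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical stand-in for the join's build-side data.
pub struct BuildSide {
    pub keys: HashSet<i64>,
}

/// Expression that shares the build side instead of copying or compressing it.
pub struct HashTableLookup {
    build: Arc<BuildSide>,
}

impl HashTableLookup {
    pub fn new(build: Arc<BuildSide>) -> Self {
        Self { build }
    }

    /// Exact membership test per row: no false positives, unlike a bloom filter.
    pub fn evaluate(&self, column: &[i64]) -> Vec<bool> {
        column.iter().map(|v| self.build.keys.contains(v)).collect()
    }
}
```

Cloning the `Arc` only bumps a reference count, so the join operator and the pushed-down expression see the same table.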
> We already have the full table in memory, so we cannot really save anything by compressing it into a bloom filter.

Agreed: if we're not concerned with larger-than-memory hash joins, or having to serialize the data structure to other compute nodes, then a Bloom filter might not be necessary yet.
@Dandandan the two ways I thought a bloom filter would be advantageous:
- More performant if applied to each row than the full hash table, although I admit I haven't poked around in HashJoinExec. Even if not faster it could be a useful tool to have around to e.g. implement larger than memory joins where you only hit disk if the bloom filter passes.
- Easier to serialize across the wire (e.g. for remote scans like Liquidcache).
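The "only hit disk if the bloom filter passes" idea can be sketched as a gate in front of an expensive lookup. Everything here is hypothetical: `probe_with_prefilter` is an invented helper, and the two closures stand in for an in-memory approximate filter and a lookup against spilled data.

```rust
/// Probe `keys`, calling the expensive `spilled_lookup` only for keys that
/// the cheap `might_contain` pre-filter lets through.
/// Returns the per-key match result and how many expensive lookups ran.
pub fn probe_with_prefilter<F, S>(
    keys: &[i64],
    might_contain: F,
    mut spilled_lookup: S,
) -> (Vec<bool>, usize)
where
    F: Fn(i64) -> bool,    // approximate: may return false positives
    S: FnMut(i64) -> bool, // exact but slow (e.g. reads spilled data)
{
    let mut slow_calls = 0;
    let matches: Vec<bool> = keys
        .iter()
        .map(|&k| {
            if !might_contain(k) {
                return false; // definitely absent, skip the slow path
            }
            slow_calls += 1;
            spilled_lookup(k)
        })
        .collect();
    (matches, slow_calls)
}
```

The result stays correct as long as the pre-filter never returns false for a key that is actually present; false positives only cost extra slow lookups.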
Either way I think we can decouple the two things: there seems to be some interest in adding a bloom filter expression, that can be developed in parallel with the hash join pushdown work and then we can just bench what is more performant.
> - Easier to serialize across the wire

This is actually something I've started looking at in the last day and got stuck pretty quickly trying to serialize the HashBrown HashTable, but that's a bit of a tangent.
> - Easier to serialize across the wire

Yeah, that part is of course true (especially for larger tables, which you probably want to avoid sending over the network).
As for the first point (more performant if applied to each row): I think the cost of constructing the bloom filter and the (potentially) slower hash function will probably outweigh any speedup from the bloom filter itself (which might be faster because it uses less memory, so it's a bit more cache efficient).
In my past experience, a bloom filter mostly has a negative impact, and in most cases min-max works fine.
> We already have the full table in memory, so we cannot really save anything by compressing it into a bloom filter.
>
> Agreed: if we're not concerned with larger-than-memory hash joins, or having to serialize the data structure to other compute nodes, then a Bloom filter might not be necessary yet.

FWIW this is the approach I have seen in the past (share a pointer to the (read only) hash table)