feat: Implement bloom_filter_agg

Open mbutrovich opened this issue 1 year ago • 7 comments

Which issue does this PR close?

Closes #846.

Rationale for this change

What changes are included in this PR?

  • Native implementation (bloom_filter_agg.rs) that uses DataFusion's Accumulator trait. We do not have a GroupsAccumulator implementation and leave it as a possible future optimization.
  • Serde logic (planner.rs, QueryPlanSerde.scala)
  • Serialization and merging logic for underlying data structures (spark_bloom_filter.rs, spark_bit_array.rs)
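
As a rough illustration of the serialization and merging logic listed above, here is a minimal, self-contained sketch of a bit-array-backed Bloom filter. It is not the code from this PR: the names `BitArray`, `BloomFilter`, `put_long`, `might_contain_long`, and the `mix` hash are hypothetical, the hash is a generic 64-bit mixer rather than the one Spark uses, and the byte layout is an assumption (big-endian 64-bit words only, no header).

```rust
/// Fixed-size bit array backed by 64-bit words (hypothetical sketch).
#[derive(Clone, Debug, PartialEq)]
pub struct BitArray {
    words: Vec<u64>,
}

impl BitArray {
    pub fn new(num_bits: usize) -> Self {
        // Round up to a whole number of 64-bit words, with at least one word.
        let num_words = ((num_bits + 63) / 64).max(1);
        Self { words: vec![0; num_words] }
    }

    pub fn bit_size(&self) -> usize {
        self.words.len() * 64
    }

    pub fn set(&mut self, index: usize) {
        self.words[index / 64] |= 1u64 << (index % 64);
    }

    pub fn get(&self, index: usize) -> bool {
        self.words[index / 64] & (1u64 << (index % 64)) != 0
    }

    /// Merging two partial states is a word-wise bitwise OR.
    pub fn merge(&mut self, other: &BitArray) -> Result<(), String> {
        if self.words.len() != other.words.len() {
            return Err("bit arrays must have the same size".to_string());
        }
        for (a, b) in self.words.iter_mut().zip(&other.words) {
            *a |= *b;
        }
        Ok(())
    }

    /// Assumed layout for this sketch: big-endian words, no header.
    pub fn to_bytes(&self) -> Vec<u8> {
        self.words.iter().flat_map(|w| w.to_be_bytes()).collect()
    }

    pub fn from_bytes(bytes: &[u8]) -> Self {
        let words = bytes
            .chunks_exact(8)
            .map(|chunk| {
                let mut buf = [0u8; 8];
                buf.copy_from_slice(chunk);
                u64::from_be_bytes(buf)
            })
            .collect();
        Self { words }
    }
}

/// Bloom filter over i64 values (hypothetical sketch).
pub struct BloomFilter {
    bits: BitArray,
    num_hashes: u32,
}

impl BloomFilter {
    pub fn new(num_bits: usize, num_hashes: u32) -> Self {
        Self { bits: BitArray::new(num_bits), num_hashes }
    }

    // Generic 64-bit mixer; NOT the hash function Spark uses.
    fn mix(value: i64, seed: u64) -> u64 {
        let mut z = (value as u64).wrapping_add(seed.wrapping_mul(0x9E37_79B9_7F4A_7C15));
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    // Derive `num_hashes` bit positions from two base hashes.
    fn positions(&self, value: i64) -> Vec<usize> {
        let h1 = Self::mix(value, 1);
        let h2 = Self::mix(value, 2);
        let size = self.bits.bit_size() as u64;
        (1..=u64::from(self.num_hashes))
            .map(|i| (h1.wrapping_add(i.wrapping_mul(h2)) % size) as usize)
            .collect()
    }

    pub fn put_long(&mut self, value: i64) {
        for pos in self.positions(value) {
            self.bits.set(pos);
        }
    }

    pub fn might_contain_long(&self, value: i64) -> bool {
        self.positions(value).into_iter().all(|pos| self.bits.get(pos))
    }

    /// Merge another filter built with the same parameters.
    pub fn merge(&mut self, other: &BloomFilter) -> Result<(), String> {
        if self.num_hashes != other.num_hashes {
            return Err("filters must use the same number of hash functions".to_string());
        }
        self.bits.merge(&other.bits)
    }
}
```

In a partial/final aggregation split, each partial aggregate would build its own filter with `put_long`, and the final aggregate would combine them with `merge`. Because merging is a bitwise OR, a value inserted into any partial filter remains present in the merged result.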

How are these changes tested?

  • New test in CometExecSuite
  • Spark tests in CI exercise this aggregation
  • Scala benchmark to compare against Spark code path
  • Native benchmark for partial and final aggregation modes
  • Native tests for new bit array logic (spark_bit_array.rs)

mbutrovich avatar Sep 30 '24 19:09 mbutrovich

Codecov Report

Attention: Patch coverage is 76.47059% with 4 lines in your changes missing coverage. Please review.

Project coverage is 34.41%. Comparing base (c3023c5) to head (bf22902). Report is 19 commits behind head on main.

Files with missing lines                                 Patch %   Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala    76.47%    2 Missing and 2 partials :warning:
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #987      +/-   ##
============================================
+ Coverage     34.03%   34.41%   +0.38%     
- Complexity      875      889      +14     
============================================
  Files           112      112              
  Lines         43289    43428     +139     
  Branches       9572     9627      +55     
============================================
+ Hits          14734    14947     +213     
+ Misses        25521    25437      -84     
- Partials       3034     3044      +10     

codecov-commenter avatar Oct 02 '24 01:10 codecov-commenter

Results from the benchmark I just added:

BloomFilterAggregate Exec (cardinality 100):       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         117            136          18         89.4          11.2       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  117            134          18         89.4          11.2       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)             71             78           9        148.3           6.7       1.7X

BloomFilterAggregate Exec (cardinality 1024):      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         111            128          11         94.7          10.6       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  110            135          15         95.7          10.4       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)             69             78          12        152.9           6.5       1.6X

BloomFilterAggregate Exec (cardinality 1048576):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         165            183          14         63.6          15.7       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  169            184          11         62.0          16.1       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)            117            126           9         89.2          11.2       1.4X
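
For reading the tables above, a small sketch of the arithmetic behind the Relative column, assuming it is the ratio of the baseline's best time to each row's best time. The function names are hypothetical. Because the table prints times rounded to whole milliseconds, values recomputed this way can differ from the printed ones in the last digit.

```rust
/// Speedup of a candidate over a baseline, from best times in milliseconds.
/// Assumption: the "Relative" column is baseline best time / row best time.
pub fn relative_speedup(baseline_ms: f64, candidate_ms: f64) -> f64 {
    baseline_ms / candidate_ms
}

/// Fraction of wall-clock time saved relative to the baseline.
pub fn time_reduction(baseline_ms: f64, candidate_ms: f64) -> f64 {
    1.0 - candidate_ms / baseline_ms
}
```

For the cardinality 1048576 case, `relative_speedup(165.0, 117.0)` is roughly 1.41, in line with the 1.4X shown, while `time_reduction(165.0, 117.0)` is roughly 0.29. In other words, about 1.4 times the throughput corresponds to about 29% less time per run.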

mbutrovich avatar Oct 02 '24 14:10 mbutrovich

> Results from the benchmark I just added:

Looks like a 40% improvement.

alamb avatar Oct 02 '24 16:10 alamb

Just putting notes for the test failure. It's failing one Spark test in InjectRuntimeFilterSuite. The test is "Merge runtime bloom filters". It's failing in a check in CometArrayImporter when it's bringing Arrow data from Native back over to JVM.

The plan is a bit of a monster, but I'll provide it below:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [c1#45920, b1#45919], [c2#45926, b2#45925], Inner
   :- Sort [c1#45920 ASC NULLS FIRST, b1#45919 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1#45920, b1#45919, 5), ENSURE_REQUIREMENTS, [plan_id=739]
   :     +- Filter (((isnotnull(c1#45920) AND isnotnull(b1#45919)) AND might_contain(Subquery subquery#45946, [id=#610].bloomFilter, xxhash64(c1#45920, 42))) AND might_contain(Subquery subquery#45949, [id=#678].bloomFilter, xxhash64(b1#45919, 42)))
   :        :  :- Subquery subquery#45946, [id=#610]
   :        :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :        :  :     +- CometProject [mergedValue#45952], [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :  :        +- !CometHashAggregate [buf#45954, buf#45955], Final, [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :  :           +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=605]
   :        :  :              +- !CometHashAggregate [c2#45926, b2#45925], Partial, [partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :  :                 +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :  :                    +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :  :                       +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        :  +- Subquery subquery#45949, [id=#678]
   :        :     +- AdaptiveSparkPlan isFinalPlan=false
   :        :        +- CometProject [mergedValue#45952], [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :           +- !CometHashAggregate [buf#45954, buf#45955], Final, [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :              +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=673]
   :        :                 +- !CometHashAggregate [c2#45926, b2#45925], Partial, [partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :                    +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :                       +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :                          +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        +- CometScan parquet spark_catalog.default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] Batched: true, DataFilters: [isnotnull(c1#45920), isnotnull(b1#45919)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/matt/git/spark/spark-warehouse/org.apache.spark.sql.Inject..., PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(b1)], ReadSchema: struct<a1:int,b1:int,c1:int,d1:int,e1:int,f1:int>
   +- CometSort [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], [c2#45926 ASC NULLS FIRST, b2#45925 ASC NULLS FIRST]
      +- CometExchange hashpartitioning(c2#45926, b2#45925, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=742]
         +- CometFilter [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
            +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>

This is what it looks like on the main branch:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [c1#45920, b1#45919], [c2#45926, b2#45925], Inner
   :- Sort [c1#45920 ASC NULLS FIRST, b1#45919 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1#45920, b1#45919, 5), ENSURE_REQUIREMENTS, [plan_id=729]
   :     +- Filter (((isnotnull(c1#45920) AND isnotnull(b1#45919)) AND might_contain(Subquery subquery#45946, [id=#605].bloomFilter, xxhash64(c1#45920, 42))) AND might_contain(Subquery subquery#45949, [id=#668].bloomFilter, xxhash64(b1#45919, 42)))
   :        :  :- Subquery subquery#45946, [id=#605]
   :        :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :        :  :     +- Project [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :  :        +- ObjectHashAggregate(keys=[], functions=[bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=601]
   :        :  :              +- ObjectHashAggregate(keys=[], functions=[partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :  :                 +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :  :                    +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :  :                       +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/matt/git/spark/spark-warehouse/org.apache.spark.sql.Inject..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        :  +- Subquery subquery#45949, [id=#668]
   :        :     +- AdaptiveSparkPlan isFinalPlan=false
   :        :        +- Project [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :           +- ObjectHashAggregate(keys=[], functions=[bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=664]
   :        :                 +- ObjectHashAggregate(keys=[], functions=[partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :                    +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :                       +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :                          +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        +- CometScan parquet spark_catalog.default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] Batched: true, DataFilters: [isnotnull(c1#45920), isnotnull(b1#45919)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(b1)], ReadSchema: struct<a1:int,b1:int,c1:int,d1:int,e1:int,f1:int>
   +- CometSort [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], [c2#45926 ASC NULLS FIRST, b2#45925 ASC NULLS FIRST]
      +- CometExchange hashpartitioning(c2#45926, b2#45925, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=732]
         +- CometFilter [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
            +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>

mbutrovich avatar Oct 03 '24 20:10 mbutrovich

Debugger output from the failing state in CometArrayImporter.

this = {CometArrayImporter@18451} 
snapshot = {ArrowArray$Snapshot@18448} 
 length = 1
 null_count = 0
 offset = 0
 n_buffers = 1
 n_children = 2
 buffers = 105553139081104
 children = 105553139081088
 dictionary = 0
 release = 6002611972
 private_data = 105553130135808
children = {long[2]@18449} [105553174820048, 105553174820208]
 0 = 105553174820048
 1 = 105553174820208
childVectors = {ArrayList@18450}  size = 1
 0 = {VarBinaryVector@22881} "[]"
vector = {StructVector@18452} "[]"
 reader = {NullableStructReaderImpl@22883} 
 writer = {NullableStructWriter@22884} "org.apache.comet.shaded.arrow.vector.complex.impl.NullableStructWriter@8a493b[index = 0]"
 validityBuffer = {ArrowBuf@22885} "ArrowBuf[1], address:0, capacity:0"
 validityAllocationSizeInBytes = 497
 NonNullableStructVector.reader = {SingleStructReaderImpl@22886} 
 field = {Field@22887} "Struct not null"
 valueCount = 0
 ephPair = null
 vectors = {PromotableMultiMapWithOrdinal@22888} 
 allowConflictPolicyChanges = true
 conflictPolicy = {AbstractStructVector$ConflictPolicy@22889} "CONFLICT_REPLACE"
 name = null
 allocator = {RootAllocator@18453} "Allocator(ROOT) 0/2176/4196904/9223372036854775807 (res/actual/peak/limit)\n"
 callBack = null
children.length = 2

The entire subquery runs in native code now, so my guess is that the output from that projection, which looks like it should be a struct with two binary values in it, is wrong. I'm not sure if it's a bug in the projection, or something further downstream.
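
To make the mismatch in the debugger output concrete: the exported array declares `n_children = 2`, but only one child vector exists on the importing side. The sketch below shows the kind of consistency check that would reject such a state. It is a simplified illustration with hypothetical names, not the actual check in CometArrayImporter.

```rust
/// Compare the child count declared by the exported array with the number of
/// child vectors materialized by the importer (hypothetical helper).
pub fn check_child_count(declared: usize, materialized: usize) -> Result<(), String> {
    if declared == materialized {
        Ok(())
    } else {
        Err(format!(
            "struct array declares {} children but importer has {} child vectors",
            declared, materialized
        ))
    }
}
```

With the values from the dump, `check_child_count(2, 1)` returns an error, while a struct whose two binary fields both arrive as child vectors would pass with `check_child_count(2, 2)`.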

mbutrovich avatar Oct 04 '24 13:10 mbutrovich

I do not have time to look at this error yet. I may take a look after the conference.

viirya avatar Oct 09 '24 03:10 viirya

Can't say I see a huge difference in TPC-H or TPC-DS locally, but the plans I looked at were typically building filters over very small relations.

mbutrovich avatar Oct 09 '24 21:10 mbutrovich

@mbutrovich Is it possible to trace back where children and childVectors are populated?

kazuyukitanimura avatar Oct 10 '24 22:10 kazuyukitanimura

The Spark SQL test failure can be fixed by #1016.

viirya avatar Oct 12 '24 08:10 viirya

I merged the fix. You can rebase and re-trigger CI now.

viirya avatar Oct 13 '24 07:10 viirya

Merged in updated main, thanks for the fix!

mbutrovich avatar Oct 13 '24 15:10 mbutrovich

I tested with TPC-H q5 and see that we are now running the bloom filter agg natively.

andygrove avatar Oct 18 '24 20:10 andygrove