
[SPARK-47547] BloomFilter fpp degradation

Open ishnagy opened this issue 7 months ago • 9 comments

What changes were proposed in this pull request?

This change fixes a performance degradation issue in the current BloomFilter implementation.

The current bit index calculation logic does not use any part of the indexable space above the first 31 bits, so when the inserted item count approaches (or exceeds) Integer.MAX_VALUE, it produces significantly worse collision rates than an (ideal) uniformly distributing hash function would.
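The 31-bit ceiling is easy to demonstrate in isolation. The following is an illustrative sketch of the failure mode, not the PR's code; the sign-flip step and the exact index computation are assumptions modeled on the Guava-style scheme the description refers to:

```java
import java.util.Random;

public class IndexCeilingDemo {

    // Sketch of the pre-fix index derivation: all arithmetic happens in
    // 32-bit ints, so after the sign is cleared the index can never exceed
    // Integer.MAX_VALUE, no matter how large bitSize is.
    static long bitIndex(int h1, int h2, int i, long bitSize) {
        int combinedHash = h1 + (i * h2);               // wraps around at 2^31
        if (combinedHash < 0) combinedHash = ~combinedHash;
        return combinedHash % bitSize;                  // always < 2^31
    }

    public static void main(String[] args) {
        long bitSize = 64L * Integer.MAX_VALUE;         // the ~137G-bit upper bound
        long maxIndex = 0;
        Random rnd = new Random(42);
        for (int trial = 0; trial < 1_000_000; trial++) {
            int h1 = rnd.nextInt(), h2 = rnd.nextInt();
            for (int i = 1; i <= 7; i++) {
                maxIndex = Math.max(maxIndex, bitIndex(h1, h2, i, bitSize));
            }
        }
        // Only ~2G of the ~137G addressable bit positions are ever touched.
        System.out.println(maxIndex <= Integer.MAX_VALUE); // prints: true
    }
}
```

Growing bitSize past 2^31 therefore only allocates memory that this indexing scheme can never reach.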

Why are the changes needed?

This should qualify as a bug.

The upper bound on the bit capacity of the current BloomFilter implementation in Spark is approximately 137G bits (64-bit longs in an Integer.MAX_VALUE-sized array). The current indexing scheme can only address about 2G of these bits.

On the other hand, due to the way BloomFilters are used, the bug won't cause any logical errors; it will instead gradually render the BloomFilter instance useless by forcing more and more queries onto the slow path.

Does this PR introduce any user-facing change?

No

How was this patch tested?

new test

One new Java test class was added to the sketch module to test different combinations of item counts and expected fpp rates.

common/sketch/src/test/java/org/apache/spark/util/sketch/TestSparkBloomFilter.java

testAccuracyEvenOdd, over N iterations, inserts the N even numbers (2*i) into the BloomFilter and leaves out the N odd numbers (2*i+1).

The test checks that mightContain=true holds for 100% of the inserted even items, and measures the mightContain=true (false positive) rate on the not-inserted odd numbers.
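The measurement loop can be sketched with a small toy filter standing in for Spark's BloomFilter (the sizing, the splitmix64-style mixer, and the 32-bit index scheme below are all illustrative assumptions, not the PR's actual test code):

```java
import java.util.BitSet;

public class EvenOddSketch {
    static final int NUM_BITS = 1 << 20;   // toy sizing, not Spark's optimal m
    static final int NUM_HASHES = 5;

    // splitmix64 finalizer, used here as a stand-in for Murmur3
    static long mix(long x) {
        x += 0x9E3779B97F4A7C15L;
        x = (x ^ (x >>> 30)) * 0xBF58476D1CE4E5B9L;
        x = (x ^ (x >>> 27)) * 0x94D049BB133111EBL;
        return x ^ (x >>> 31);
    }

    static int index(long item, int i) {
        long h = mix(item);
        int h1 = (int) h, h2 = (int) (h >>> 32);
        int combined = h1 + i * h2;
        if (combined < 0) combined = ~combined;
        return combined % NUM_BITS;
    }

    static boolean mightContain(BitSet bits, long item) {
        for (int k = 1; k <= NUM_HASHES; k++) {
            if (!bits.get(index(item, k))) return false;
        }
        return true;
    }

    // Insert the even numbers 2*i, then demand zero false negatives on them
    // and measure the false-positive rate on the never-inserted odds 2*i+1.
    public static double evenOddFpp(int n) {
        BitSet bits = new BitSet(NUM_BITS);
        for (int i = 0; i < n; i++) {
            for (int k = 1; k <= NUM_HASHES; k++) bits.set(index(2L * i, k));
        }
        int falsePositives = 0;
        for (int i = 0; i < n; i++) {
            if (!mightContain(bits, 2L * i)) throw new AssertionError("false negative");
            if (mightContain(bits, 2L * i + 1)) falsePositives++;
        }
        return (double) falsePositives / n;
    }

    public static void main(String[] args) {
        System.out.println(evenOddFpp(100_000));
    }
}
```

The even/odd split guarantees the inserted and left-out sets are disjoint, so every mightContain=true on an odd number is unambiguously a false positive.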

testAccuracyRandom, over 2N iterations, inserts N pseudorandomly generated numbers into two differently seeded (theoretically independent) BloomFilter instances. All the random numbers generated in an even iteration are inserted into both filters; all the random numbers generated in an odd iteration are left out of both.

The test checks that mightContain=true holds for all of the items inserted in an even iteration. It counts as false positives the odd-iteration items for which the primary filter reports mightContain=true but the secondary reports mightContain=false. Since the same elements were inserted into both instances, and the secondary reports non-insertion, the mightContain=true from the primary can only be a false positive.
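The same two-filter counting trick, sketched with a seeded toy filter (all names, sizing, and the seeding scheme below are illustrative assumptions, not the PR's code):

```java
import java.util.BitSet;
import java.util.Random;

public class RandomPairSketch {
    // Minimal seeded toy filter standing in for Spark's BloomFilter.
    static class ToyFilter {
        static final int M = 1 << 20;
        static final int K = 5;
        final BitSet bits = new BitSet(M);
        final long seed;
        ToyFilter(long seed) { this.seed = seed; }

        int index(long item, int i) {
            long x = (item + seed) * 0x9E3779B97F4A7C15L;   // seed-dependent mix
            x = (x ^ (x >>> 30)) * 0xBF58476D1CE4E5B9L;
            x = (x ^ (x >>> 27)) * 0x94D049BB133111EBL;
            x ^= (x >>> 31);
            int combined = (int) x + i * (int) (x >>> 32);
            if (combined < 0) combined = ~combined;
            return combined % M;
        }

        void put(long item) { for (int i = 1; i <= K; i++) bits.set(index(item, i)); }

        boolean mightContain(long item) {
            for (int i = 1; i <= K; i++) if (!bits.get(index(item, i))) return false;
            return true;
        }
    }

    // Even-iteration values go into both filters, odd-iteration values into
    // neither; primary=true with secondary=false can then only be a false
    // positive, as argued in the description.
    public static double pairedFpp(int n, long randomSeed) {
        ToyFilter primary = new ToyFilter(1), secondary = new ToyFilter(2);
        Random rnd = new Random(randomSeed);
        long[] inserted = new long[n], leftOut = new long[n];
        for (int i = 0; i < n; i++) {
            inserted[i] = rnd.nextLong();   // even iteration: insert into both
            leftOut[i] = rnd.nextLong();    // odd iteration: insert into neither
            primary.put(inserted[i]);
            secondary.put(inserted[i]);
        }
        int falsePositives = 0;
        for (int i = 0; i < n; i++) {
            if (!primary.mightContain(inserted[i])) throw new AssertionError("false negative");
            if (primary.mightContain(leftOut[i]) && !secondary.mightContain(leftOut[i])) {
                falsePositives++;
            }
        }
        return (double) falsePositives / n;
    }

    public static void main(String[] args) {
        System.out.println(pairedFpp(50_000, 42L));
    }
}
```

This construction tolerates duplicates in the random stream: a left-out value that happens to collide with an inserted one is reported as contained by the secondary filter too, so it is not miscounted.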

patched

One minor (test) issue was fixed in

common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala

where potential repetitions in the randomly generated stream of insertable items resulted in slightly worse measured fpp rates than the actual ones. The problem affected those test cases more where the cardinality of the tested type is low (so the chance of repetition is high), e.g. Byte and Short.

removed from the default runs

Running these tests as part of the default build process was turned off by adding the @Disabled annotation to the new test class.

Was this patch authored or co-authored using generative AI tooling?

No

ishnagy avatar May 19 '25 09:05 ishnagy

Can you please post the output of the new TestSparkBloomFilter here when the 4GB limit of REQUIRED_HEAP_UPPER_BOUND_IN_BYTES is lifted? And summarize the actual false positive rate (FPP) before and after this fix when numItems = {1000000, 1000000000, 5000000000} and expected FPP is the default 3%?

peter-toth avatar May 20 '25 09:05 peter-toth

the tests with the 4GB limit are still running; I'll post a summary from the results tomorrow and start a new run that covers all of the 5G element count cases.

ishnagy avatar May 20 '25 17:05 ishnagy

The filter-from-hex-constant test started to make me worry about compatibility with serialized instances created with the older logic. Even if we can deserialize the buffer and the seed properly, the actual bits will be set in completely different positions. That is, there's no point in trying to use an old (serialized) buffer with the new logic.

Should we create a dedicated BloomFilterImplV2 class for the fixed logic, just so we can keep the old V1 implementation for deserializing old byte streams?

ishnagy avatar May 21 '25 16:05 ishnagy

Should we create a dedicated BloomFilterImplV2 class for the fixed logic, just so we can keep the old V1 implementation for deserializing old byte streams?

I don't think we need to keep the old implementation just to support old serialized versions. It seems we use our bloom filter implementation only in BloomFilterAggregate.

cc @cloud-fan

peter-toth avatar May 22 '25 09:05 peter-toth

I ran into some trouble with generating the test results (running on a single thread, the whole batch takes ~10h on my machine). I'll try to make an update on Monday.

ishnagy avatar May 23 '25 07:05 ishnagy

version testName n fpp seed allocatedBitCount setBitCount saturation expectedFpp% actualFpp% runningTime
OLD testAccuracyEvenOdd 1000000 0.05 0 6235264 (0 MB) 2952137 0.473458 5.000000 % 5.025400 % PT19.267149499S
OLD testAccuracyEvenOdd 1000000 0.03 0 7298496 (0 MB) 3618475 0.495784 3.000000 % 3.022900 % PT19.628671953S
OLD testAccuracyEvenOdd 1000000 0.01 0 9585088 (1 MB) 4968111 0.518317 1.000000 % 0.994700 % PT19.476457289S
OLD testAccuracyEvenOdd 1000000 0.001 0 14377600 (1 MB) 7203887 0.501049 0.100000 % 0.102200 % PT19.944492903S
OLD testAccuracyEvenOdd 1000000000 0.05 0 6235224256 (743 MB) 1814052150 0.290936 5.000000 % 50.920521 % PT28M6.091484671S
OLD testAccuracyEvenOdd 1000000000 0.03 0 7298440896 (870 MB) 1938187323 0.265562 3.000000 % 59.888499 % PT30M26.383544378S
OLD testAccuracyEvenOdd 1000000000 0.01 0 9585058432 (1142 MB) 2065015223 0.215441 1.000000 % 76.025548 % PT36M30.827858084S
OLD testAccuracyEvenOdd 1000000000 0.001 0 14377587584 (1713 MB) 2127081112 0.147944 0.100000 % 90.896130 % PT45M58.403282401S
OLD testAccuracyEvenOdd 5000000000 0.05 0 31176121152 (3716 MB) 2147290054 0.068876 5.000000 % 99.963940 % PT1H28M39.598973373S
OLD testAccuracyEvenOdd 5000000000 0.03 0 36492204224 (4350 MB) 2147464804 0.058847 3.000000 % 99.995623 % PT1H41M22.171084285S
OLD testAccuracyEvenOdd 5000000000 0.01 0 47925291904 (5713 MB) 2147483464 0.044809 1.000000 % 99.999939 % PT1H59M42.481346242S
OLD testAccuracyEvenOdd 5000000000 0.001 0 71887937856 (8569 MB) 2147483648 0.029873 0.100000 % 100.000000 % PT2H32M41.743734635S

ishnagy avatar May 26 '25 22:05 ishnagy

version testName n fpp seed allocatedBitCount setBitCount saturation expectedFpp% actualFpp% runningTime
NEW testAccuracyEvenOdd 1000000 0.05 0 6235264 (0 MB) 2952282 0.473481 5.000000 % 5.046800 % PT13.599525353S
NEW testAccuracyEvenOdd 1000000 0.03 0 7298496 (0 MB) 3619967 0.495988 3.000000 % 3.018000 % PT14.086955381S
NEW testAccuracyEvenOdd 1000000 0.01 0 9585088 (1 MB) 4968081 0.518314 1.000000 % 1.013400 % PT14.300125629S
NEW testAccuracyEvenOdd 1000000 0.001 0 14377600 (1 MB) 7205256 0.501145 0.100000 % 0.095100 % PT14.746387272S
NEW testAccuracyEvenOdd 1000000000 0.05 0 6235224256 (743 MB) 2963568196 0.475295 5.000000 % 4.889721 % PT35M6.22696009S
NEW testAccuracyEvenOdd 1000000000 0.03 0 7298440896 (870 MB) 3628684972 0.497186 3.000000 % 2.963030 % PT37M31.833552669S
NEW testAccuracyEvenOdd 1000000000 0.01 0 9585058432 (1142 MB) 4973807865 0.518913 1.000000 % 1.001407 % PT43M23.782325058S
NEW testAccuracyEvenOdd 1000000000 0.001 0 14377587584 (1713 MB) 7210348423 0.501499 0.100000 % 0.100803 % PT57M35.474342424S
NEW testAccuracyEvenOdd 5000000000 0.05 0 31176121152 (3716 MB) 14360939834 0.460639 5.000000 % 6.727508 % PT2H21M2.643592951S
NEW testAccuracyEvenOdd 5000000000 0.03 0 36492204224 (4350 MB) 17711039216 0.485338 3.000000 % 3.806971 % PT2H29M18.334864292S
NEW testAccuracyEvenOdd 5000000000 0.01 0 47925291904 (5713 MB) 24462662240 0.510433 1.000000 % 1.321482 % PT2H56M51.935983408S
NEW testAccuracyEvenOdd 5000000000 0.001 0 71887937856 (8569 MB) 35637830341 0.495741 0.100000 % 0.176216 % PT3H38M21.888031962S

ishnagy avatar May 26 '25 23:05 ishnagy

@cloud-fan, as you added the original bloom filter implementation to Spark, could you please take a look at this PR?

peter-toth avatar May 27 '25 10:05 peter-toth

the only relevant difference between the OLD and the NEW versions is in the logic to derive the k hash bits:

OLD

    for (int i = 1; i <= numHashFunctions; i++) {
      int combinedHash = h1 + (i * h2);
      // ...
    }

NEW

    long combinedHash = (long) h1 * Integer.MAX_VALUE;
    for (long i = 0; i < numHashFunctions; i++) {
      combinedHash += h2;
      // ...
    }

ishnagy avatar May 27 '25 14:05 ishnagy

Now I have added a new tag, org.apache.spark.tags.SlowTest, which can be used in inclusion/exclusion rules (none of the currently available test tags seemed to be a great match for a slow test in the sketch module):

% find common/tags/src/test -type f | sort
common/tags/src/test/java/org/apache/spark/tags/AmmoniteTest.java
common/tags/src/test/java/org/apache/spark/tags/ChromeUITest.java
common/tags/src/test/java/org/apache/spark/tags/DockerTest.java
common/tags/src/test/java/org/apache/spark/tags/ExtendedHiveTest.java
common/tags/src/test/java/org/apache/spark/tags/ExtendedLevelDBTest.java
common/tags/src/test/java/org/apache/spark/tags/ExtendedSQLTest.java
common/tags/src/test/java/org/apache/spark/tags/ExtendedYarnTest.java
common/tags/src/test/java/org/apache/spark/tags/SlowHiveTest.java
common/tags/src/test/java/org/apache/spark/tags/SlowSQLTest.java
common/tags/src/test/java/org/apache/spark/tags/WebBrowserTest.java

The problem is, I don't quite know where exactly those inclusions/exclusions should be configured. I found

.github/workflows/maven_test.yml
.github/workflows/build_and_test.yml

but I'm not quite sure how to add the new configuration to them for the sketch project only.

ishnagy avatar Jun 16 '25 12:06 ishnagy

@LuciferYang @peter-toth please have a look at my updates / comments.

ishnagy avatar Jun 16 '25 12:06 ishnagy

but I'm not quite sure, how to add the new configuration to them for the sketch project only.

... so I expect the running time of the pre-merge build to blow up on the next run.

ishnagy avatar Jun 16 '25 12:06 ishnagy

Those tags are only used for grouping the tests; they don't imply that tests labeled @SlowTest won't be executed. Moreover, since the sketch module doesn't have many test cases to begin with, there's no need to group them. I think it's fine as long as we can ensure that the test finishes within a few minutes.

LuciferYang avatar Jun 16 '25 13:06 LuciferYang

Those tags are only used for grouping the tests; they don't imply that tests labeled @SlowTest won't be executed.

I totally get that. @Disabled guaranteed non-execution; these new grouping options just make it possible to configure some workflows to pick up these tests and run them regularly, even if we exclude them from the fast path.

The workflows still have to be adjusted to consider the new tag, but (I think) this can be the cleanest way to isolate this very specialized and very slow test from the rest of the regular tests. Perhaps we could configure the benchmark workflow to pick it up and run it among the other long-running benchmark cases?

[...] I think it's enough as long as we can ensure that the test can be finished within a few minutes.

Although I already have some performance improvement ideas in mind, I don't think the slowest test case (5G elements, 0.1% fpp) can be completed in under an hour.

If we could somehow guarantee (in code or documentation) that the impl class is never instantiated with an error rate lower than 3%, then we could drop the 1% and 0.1% cases, which constitute the bulk of the runtime. IIRC, one round of the 3% case takes around half an hour, so with the other parameter combinations we could bring the total runtime down to around 3h.

If we can run the suite on multiple cores (and configure Maven to use them), we should be able to fit conveniently into the 2h GitHub Actions execution limit. Do we know anything about the number of cores in the runners?

ishnagy avatar Jun 16 '25 14:06 ishnagy

GitHub-hosted runners generally have 4 vcores. Currently, the submit pipeline uses sbt for testing. Additionally, I would like to reiterate my viewpoint: we should strive to have this test complete within a few minutes (5~10 mins), rather than taking 2 hours. Otherwise, we ought to optimize it or temporarily remove this test case.

LuciferYang avatar Jun 17 '25 02:06 LuciferYang

cutting down the test cases to the bare minimum (3% fpp and 1G items), the test now completes in a little over 10 minutes on my machine. would this be an acceptable running time?

if the testing concerns are adequately addressed, can we please have a look at the serialization/compatibility questions that came up earlier? in hindsight, it feels really sketchy to deserialize old byte streams into the updated implementation without any errors or warnings (query results from the inconsistently deserialized object won't make any sense).

adding a new version enum feels like the clean solution, but I'm not sure it isn't overkill (e.g. if a serialized BloomFilter never gets shared between different application runs).

ishnagy avatar Jun 24 '25 15:06 ishnagy

Yeah, I believe a 10-minute runtime is acceptable, but if I were you I would test with 100M items; maybe the improvement is visible there as well and 1 minute is enough.

can we please have a look on the serialization/compatibility questions that came up earlier? in hindsight, it feels really sketchy to deserialize old bytestreams into the updated implementation without any errors or warnings (query results from the inconsistently deserialized object won't make any sense).

Bloom filter functions (BloomFilterAggregate / BloomFilterMightContain) seem like internal ones so I wouldn't expect serialized filters to be shared between different application runs, but it seems some native accelerators reimplemented Spark's logic (including its flaws) so we should probably use a new version with this improvement.

peter-toth avatar Jun 26 '25 12:06 peter-toth

Bloom filter functions (BloomFilterAggregate / BloomFilterMightContain) seem like internal ones so I wouldn't expect serialized filters to be shared between different application runs, but it seems some native accelerators reimplemented Spark's logic (including its flaws) so we should probably use a new version with this improvement.

ok, I'll try to provide an update by EOW.

ishnagy avatar Jul 02 '25 08:07 ishnagy

I like the fixed bloom filter logic in this PR but I'm not sure we need such convoluted logging in tests.

peter-toth avatar Jul 06 '25 17:07 peter-toth

cc @dongjoon-hyun

peter-toth avatar Jul 06 '25 17:07 peter-toth

I like the fixed bloom filter logic in this PR but I'm not sure we need such convoluted logging in tests.

the test should be silent now by default.

you should only see the logging when you explicitly enable verbose mode with a sysprop.

any other output is written to files, which I'd like to keep in case we want to turn this into a benchmark later.

ishnagy avatar Jul 06 '25 18:07 ishnagy

In addition, there is a lot of duplicated code after this PR. I'm not sure this is the best way to extend the existing framework. Do you think we can minimize the code duplication instead of copying the whole file?

The original idea was to fix V1 in place, so that the (not too extensive) differences could be easily validated. We only opted for creating a V2 version so as not to break serialization compatibility, and copying over the whole class still seemed easier to validate than a change with class refactoring mixed in.

I can quickly pull the common methods up into a base class if that's preferred. I'm not 100% sure how well inheritance would play with the serialization, but it shouldn't be anything unsolvable.

ishnagy avatar Jul 07 '25 18:07 ishnagy

Before verifying any performance benefit, [...]

Mind you, any performance benefit won't be visible from the sketch project alone. If anything, it can be somewhat slower, since the filter will access the full bit array instead of just the first 2^31 bits. More page faults, the cost of correctness.

The perf benefit could only be seen in client libraries (e.g. Spark SQL), where a BloomFilter lookup can direct 97% of the mightContain queries onto the fast path (with a 3% fpp rate, that is), instead of e.g. 40% (with a 60% error rate for 10^9 inserted elements). And of course the number of inserted elements has to be high enough for the improvement to be noticeable. I haven't had the means to conduct high-scale benchmarks, but given some pointers, I can try to put one together to tease out real query improvements.

ishnagy avatar Jul 07 '25 19:07 ishnagy

One additional thing regarding performance.

The first versions of the new test code did both the inserts and the queries in serial order, which didn't do performance any favors. The queries (reads) were quite straightforward to parallelize, and this reduced the test execution time by about 50%. It's a bit trickier with the inserts (writes): there are some synchronization issues with flipping bits in the same long "block" concurrently, but nothing unmanageable. If there's interest, I can look into speeding up the writes as well.

(It may require an incompatible V3 implementation, but I think we already crossed that boundary by introducing V2.)
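For reference, the write-side issue mentioned above (two threads flipping bits in the same 64-bit word) is usually handled with a compare-and-set retry loop. A minimal sketch, assuming an AtomicLongArray-backed bit array; Spark's actual BitArray uses a plain long[], so this is not the PR's code:

```java
import java.util.concurrent.atomic.AtomicLongArray;

public class ConcurrentBitArray {
    private final AtomicLongArray words;

    public ConcurrentBitArray(long numBits) {
        this.words = new AtomicLongArray((int) ((numBits + 63) / 64));
    }

    // Returns true if the bit was newly set. The CAS-retry loop prevents a
    // lost update when two writers target the same 64-bit word concurrently.
    public boolean set(long index) {
        int wordIndex = (int) (index >>> 6);
        long mask = 1L << (index & 63);
        while (true) {
            long oldWord = words.get(wordIndex);
            if ((oldWord & mask) != 0) return false;  // bit already set
            if (words.compareAndSet(wordIndex, oldWord, oldWord | mask)) return true;
        }
    }

    public boolean get(long index) {
        return (words.get((int) (index >>> 6)) & (1L << (index & 63))) != 0;
    }
}
```

With this shape the insert loop can be split across threads without locking; the retry only triggers on actual word-level contention.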

ishnagy avatar Jul 07 '25 19:07 ishnagy

Before verifying any performance benefit, I requested several basic comments (indentation, code style, simple logic).

In addition, there is a lot of duplicated code after this PR. I'm not sure this is the best way to extend the existing framework. Do you think we can minimize the code duplication instead of copying the whole file?

I agree, decreasing code duplication would be great.

Mind you, any performance benefit won't be visible from the sketch project alone. If anything, it can be somewhat slower, since the filter will access the full bit array instead of just the first 2^31 bits. More page faults, the cost of correctness.

The perf benefit could only be seen in client libraries (e.g. Spark SQL), where a BloomFilter lookup can direct 97% of the mightContain queries onto the fast path (with a 3% fpp rate, that is), instead of e.g. 40% (with a 60% error rate for 10^9 inserted elements). And of course the number of inserted elements has to be high enough for the improvement to be noticeable. I haven't had the means to conduct high-scale benchmarks, but given some pointers, I can try to put one together to tease out real query improvements.

I just wanted to highlight that this isn't a performance issue but a correctness issue of Bloom filters. Although the internal bloom filter functions are not publicly available via SQL, bloomFilter is available as a public API on DataFrames. So if a filter with a certain N and FPP is requested, then we should return one that can satisfy those needs.

Indeed there are some performance implications of the fix like:

  • the correct implementation of put and might_contain operations can be slightly slower
  • but on the other hand, a correct FPP bloom filter used within injected runtime filters (see InjectRuntimeFilter rule) can be more selective and so reduce the number of rows on the application side and so improve query performance

but I think we should treat this issue as a correctness one essentially.

The queries (reads) were quite straightforward to parallelize, and this reduced the test execution time by about 50%. It's a bit more tricky with the inserts (writes), there are some synchronization issues with flipping up bits in the same long "block" concurrently, but nothing unmanageable. If there's some interest for it, I can look into speeding up the writes as well.

IMO let's focus on the correctness side in this PR and speed improvements can come in a follow-up ones.

peter-toth avatar Jul 08 '25 09:07 peter-toth

I looked into how we could reduce the code duplication and I see two high level options.

a quick-n-dirty monkey patch

where BloomFilterImplV2 extends BloomFilterImpl and overrides the methods we'd like to change. Its only advantage is its simplicity; other than that, it confuses the class hierarchy even more, and we'd have to be extra careful around the is-a asymmetry between two incompatible classes. This can cause subtle bugs around instanceof uses (equals, isCompatible, serialization).

a more thorough refactor

where we pull up the duplicated code into a common base class.

(Note: to preserve compatibility, we'll have to keep the implementation name BloomFilterImpl)

We can introduce a new abstract base class between BloomFilter and BloomFilterImpl, say BloomFilterBase. Everything common to BloomFilterImpl and BloomFilterImplV2 would go into this base class, and the put/mightContain and serialization methods would be overridden in the descendant classes. Problem: we already have such an abstract base class, BloomFilter, and it is already overloaded with three different roles:

  1. the API/interface role (the abstract methods)
  2. an abstract base class (home of the default implementations that apply to all BloomFilters)
  3. the factory role (utility methods for creating new BF instances without direct access to their constructors)

So, to keep the change small, we could pull the duplicated code up into BloomFilter. But littering implementation details into a class that's practically used as an interface feels wrong. The distinction between the original BloomFilter and BloomFilterImpl was intentional; it would rather defeat the purpose if we moved most of the impl code back into its base class.

We could also break the three roles of the current BloomFilter class out into an interface, an abstract base class, and a (static?) factory. This way it wouldn't feel so awkward to move most of the current logic into the abstract base class. The only problem with this approach is that either the naming/hierarchy becomes really counterintuitive if we have to keep the old names matched with their old functionality (the abstract base class would carry the name that should belong to the new interface, the V1 impl would carry the name that should be the base class's, and the names of the V1 and V2 impls wouldn't be symmetric), OR we break compatibility.

Not the greatest of choices either way. I'd be happy to do the whole refactor (maybe implementing a new, more future-proof hierarchy in a new package, e.g. org.apache.spark.util.sketch.[bloom]), but I think the clean way to implement this is too broad in scope for this particular PR (which is practically a few-line bugfix).

summary

The options I see

  1. going about the refactor in this PR (cleaner in the end, but more complex to implement and verify)
  2. accepting some tradeoffs now (code duplication / monkey-patching ugliness) and breaking the proper refactoring out into a separate task (which I would be happy to pick up right away)

which one would you prefer?

ishnagy avatar Jul 08 '25 13:07 ishnagy

A new BloomFilterImplBase between BloomFilter and BloomFilterImpl / BloomFilterImplV2 sounds good to me.

peter-toth avatar Jul 11 '25 09:07 peter-toth

@dongjoon-hyun , @peter-toth , the requested changes to reduce code repetition are done, please have a look, when you have the time.

ishnagy avatar Jul 15 '25 04:07 ishnagy

should I resolve the open conversations myself when I think I'm done, or will they be resolved by the reviewer who started them once the changes are confirmed acceptable?

ishnagy avatar Jul 15 '25 11:07 ishnagy

Looks good from my side. @dongjoon-hyun, @LuciferYang do you think we can merge this PR?

peter-toth avatar Jul 15 '25 13:07 peter-toth