-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-47547][CORE] Add BloomFilter V2 and use it as default
#50933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3c5a843
08cbfeb
e3cb08e
c4e3f58
1a0b66f
d912b66
f589e2c
f597c76
4ea633d
6696106
b75e187
2d8a9f1
4a30794
d9d6980
39a46c9
7f235e7
16be3a9
e91b5ca
897c1d4
013bfe4
6d44c1e
925bf12
6f28882
ed6caac
7d4ef74
c52ead3
0ab8276
d2477bf
413c4fe
4599fcb
1ee2e13
c501b2a
1f5cfb6
f60d55f
0314963
f2df338
4aaff83
e214bd7
99f7343
58e3066
c06cb38
b99ef3a
ce3ad76
626e459
b0f5b45
6849dbe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
…eckstyle errors, renaming test vars
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,16 +18,12 @@ | |
| package org.apache.spark.util.sketch; | ||
|
|
||
| import org.junit.jupiter.api.*; | ||
| import org.junit.jupiter.params.provider.ValueSource; | ||
| import org.junitpioneer.jupiter.cartesian.CartesianTest; | ||
| import org.junitpioneer.jupiter.cartesian.CartesianTest.Values; | ||
|
|
||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Random; | ||
| import java.util.stream.Stream; | ||
|
|
||
| @Disabled | ||
| public class TestSparkBloomFilter { | ||
|
|
@@ -38,6 +34,7 @@ public class TestSparkBloomFilter { | |
| final double FPP_RANDOM_ERROR_FACTOR = 0.04; | ||
|
|
||
| final long ONE_GB = 1024L * 1024L * 1024L; | ||
| final long REQUIRED_HEAP_UPPER_BOUND_IN_BYTES = 4 * ONE_GB; | ||
|
|
||
| private static Instant START; | ||
| private Instant start; | ||
|
|
@@ -64,6 +61,18 @@ public void afterEach(TestInfo testInfo) { | |
| System.err.println(duration + " " + testInfo.getDisplayName()); | ||
| } | ||
|
|
||
| /** | ||
| * This test, in N number of iterations, inserts N even numbers (2*i) int, | ||
| * and leaves out N odd numbers (2*i+1) from the tested BloomFilter instance. | ||
| * | ||
| * It checks the 100% accuracy of mightContain=true on all of the even items, | ||
| * and measures the mightContain=true (false positive) rate on the not-inserted odd numbers. | ||
| * | ||
| * @param numItems the number of items to be inserted | ||
| * @param expectedFpp the expected fpp rate of the tested BloomFilter instance | ||
| * @param deterministicSeed the deterministic seed to use to initialize | ||
| * the primary BloomFilter instance. | ||
| */ | ||
| @CartesianTest | ||
| public void testAccuracyEvenOdd( | ||
| @Values(longs = {1_000_000L, 1_000_000_000L, 5_000_000_000L}) long numItems, | ||
|
|
@@ -77,8 +86,10 @@ public void testAccuracyEvenOdd( | |
| optimalNumOfBits / Byte.SIZE / 1024 / 1024 | ||
| ); | ||
| Assumptions.assumeTrue( | ||
| optimalNumOfBits / Byte.SIZE < 4 * ONE_GB, | ||
| "this testcase would require allocating more than 4GB of heap mem (" + optimalNumOfBits + " bits)" | ||
| optimalNumOfBits / Byte.SIZE < REQUIRED_HEAP_UPPER_BOUND_IN_BYTES, | ||
| "this testcase would require allocating more than 4GB of heap mem (" | ||
| + optimalNumOfBits | ||
| + " bits)" | ||
| ); | ||
|
|
||
| BloomFilter bloomFilter = BloomFilter.create(numItems, optimalNumOfBits, deterministicSeed); | ||
|
|
@@ -152,6 +163,25 @@ public void testAccuracyEvenOdd( | |
| ); | ||
| } | ||
|
|
||
| /** | ||
| * This test inserts N pseudorandomly generated numbers in 2N number of iterations in two | ||
| * differently seeded (theoretically independent) BloomFilter instances. All the random | ||
| * numbers generated in an even-iteration will be inserted into both filters, all the | ||
| * random numbers generated in an odd-iteration will be left out from both. | ||
| * | ||
| * The test checks the 100% accuracy of 'mightContain=true' for all the items inserted | ||
| * in an even-loop. It counts the false positives as the number of odd-loop items for | ||
| * which the primary filter reports 'mightContain=true', but secondary reports | ||
| * 'mightContain=false'. Since we inserted the same elements into both instances, | ||
| * and the secondary reports non-insertion, the 'mightContain=true' from the primary | ||
| * can only be a false positive. | ||
| * | ||
| * @param numItems the number of items to be inserted | ||
| * @param expectedFpp the expected fpp rate of the tested BloomFilter instance | ||
| * @param deterministicSeed the deterministic seed to use to initialize | ||
| * the primary BloomFilter instance. (The secondary will be | ||
| * initialized with the constant seed of 0xCAFEBABE) | ||
| */ | ||
| @CartesianTest | ||
| public void testAccuracyRandom( | ||
| @Values(longs = {1_000_000L, 1_000_000_000L}) long numItems, | ||
|
|
@@ -165,12 +195,17 @@ public void testAccuracyRandom( | |
| optimalNumOfBits / Byte.SIZE / 1024 / 1024 | ||
| ); | ||
| Assumptions.assumeTrue( | ||
| 2 * optimalNumOfBits / Byte.SIZE < 4 * ONE_GB, | ||
| "this testcase would require allocating more than 4GB of heap mem (2x " + optimalNumOfBits + " bits)" | ||
| 2 * optimalNumOfBits / Byte.SIZE < REQUIRED_HEAP_UPPER_BOUND_IN_BYTES, | ||
| "this testcase would require allocating more than 4GB of heap mem (2x " | ||
| + optimalNumOfBits | ||
| + " bits)" | ||
| ); | ||
|
|
||
| BloomFilter bloomFilterPrimary = BloomFilter.create(numItems, optimalNumOfBits, deterministicSeed); | ||
| BloomFilter bloomFilterSecondary = BloomFilter.create(numItems, optimalNumOfBits, 0xCAFEBABE); | ||
| BloomFilter bloomFilterPrimary = | ||
| BloomFilter.create(numItems, optimalNumOfBits, deterministicSeed); | ||
| BloomFilter bloomFilterSecondary = | ||
| BloomFilter.create(numItems, optimalNumOfBits, 0xCAFEBABE); | ||
|
|
||
| System.err.printf( | ||
| "allocated bitArray: %d (%d MB)\n", | ||
| bloomFilterPrimary.bitSize(), | ||
|
|
@@ -199,8 +234,8 @@ public void testAccuracyRandom( | |
| } | ||
| } | ||
|
|
||
| long mightContainEven = 0; | ||
| long mightContainOdd = 0; | ||
| long mightContainEvenIndexed = 0; | ||
| long mightContainOddIndexed = 0; | ||
|
|
||
| pseudoRandom.setSeed(deterministicSeed); | ||
| for (long i = 0; i < iterationCount; i++) { | ||
|
|
@@ -212,26 +247,26 @@ public void testAccuracyRandom( | |
| long candidate = pseudoRandom.nextLong(); | ||
| if (bloomFilterPrimary.mightContainLong(candidate)) { | ||
| if (i % 2 == 0) { | ||
| mightContainEven++; | ||
| mightContainEvenIndexed++; | ||
| } else { | ||
| // only count those cases as false positives, | ||
| // where the secondary has confirmed, | ||
| // that we haven't inserted before | ||
| // (mitigating duplicates in input sequence) | ||
| if (!bloomFilterSecondary.mightContainLong(candidate)) { | ||
| mightContainOdd++; | ||
| mightContainOddIndexed++; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| System.err.println(); | ||
|
|
||
| Assertions.assertEquals( | ||
| numItems, mightContainEven, | ||
| numItems, mightContainEvenIndexed, | ||
| "mightContainLong must return true for all inserted numbers" | ||
| ); | ||
|
|
||
| double actualFpp = (double) mightContainOdd / numItems; | ||
| double actualFpp = (double) mightContainOddIndexed / numItems; | ||
|
||
| double acceptableFpp = expectedFpp * (1 + FPP_RANDOM_ERROR_FACTOR); | ||
|
|
||
| System.err.printf("expectedFpp: %f %%\n", 100 * expectedFpp); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Submitting a test case and directly disabling it is not an ideal approach. Why can't it undergo regular validation through GitHub Actions?
Additionally, I suggest first creating a micro-benchmark relevant to this scenario and recording the results without this pr. Then, update the code and the new benchmark results in this pr to demonstrate the optimization effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or can the scenarios in
org.apache.spark.sql.execution.benchmark.BloomFilterBenchmarkreflect the optimizations brought about by the current pr?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, in spirit, this test code I submitted is much more close to a benchmark (measurement rather than validation) than to an actual test case, with the emphasis on expectations and assertions.
The reason I disabled it by default, because on a single thread, it takes 10+ hours to run the all the cases, and I didn't want to interfere with running time of the regular test suites.
I wasn't aware of the benchmark workflow, I will have a look whether I can fit this logic in there. Not sure, if it will be a snap in fit, because the code focuses on obtaining a Bloom filter specific measure, the false pos rate, not some more usual or generic measures like running time or resource consumption.
Moreover, the performance gains won't be directly apparent on the sketch level. If anything, it will have a slightly worse running time (but shouldn't consume more mem than the previous logic). The gains should only be measurable in client code (like sql) that uses the sketch implementation. E.g. with a reasonably low error rate in the implementation won't force almost any queried element (in a client) on the slow path when the filter is saturated.
This may or may not be measurable with the current benchmarks, I haven't looked into that yet. As a rule of thumb, in the current implementation, after a few hundred million elements the false pos rate gets noticeably (a few percents) higher than expected, around about a billion (2^30) it diverges significantly (a few tens of percents), and above 2G (2^31) items, it gets prohibitively high (90%+). With the proposed new logic the error rate remains within a few percents off of the expected on all scales.
If the current benchmarks already use Bloom filters with more than a few hundred million items inserted, then the performance improvements should be visible there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to adapt the current tests into the benchmark workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, unfortunately Spark benchmarks can measure only time, but can't measure qualities like the false positive rate of a bloom filter.
I wonder shall we remove
TestSparkBloomFilterfrom this PR or add some comments to it to explain why it is disabled?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are other test cases that already cover the changes in the current pull request? If so, I agree to remove
TestSparkBloomFilter(as per Spark's coding conventions, it should actually be namedSparkBloomFilterSuite). There's no point in adding a test case that cannot be continuously verified by GitHub Actions, as it's likely that no one will remember to execute it later on.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth I suspect the reason it was marked with
@Disabledis that the execution time was too long. I tried running it using GitHub Actions, but it was eventually terminated because it exceeded the two-hour execution limit...Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Another option is to reduce the test cases to still validate the improvement of the PR, but with reasoable runtimes.
It seems like the degradation of false positive rate is not yet visible at
n = 1M. But whenn = 1000Mthe actual FPP is much higher than the expected. (Actuals are 50.920521%, 59.888499% and 76.025548% when expecteds are 5%, 3% and 1%). Unfortunately it seems those test cases took 30-40 mins to complete each.So how about testing only 1
nbetween those 2 where the improvement of the PR is visible but the test completes in let's say 5-10 mins. It shoule be enough to test the 3% default FPP case.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be excellent if the verification could be carried out within a relatively concise case.
Moreover, it would be even better if the test case could be rewritten in Scala.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply, I got a bit lost in the test configuration of the project, it took a while until I could come up with something reasonable to address the concerns.
I think I have already mentioned why I disabled the test in the first place, just for the sake of completeness repeating it here, indeed, the main reason is the impractical running time. If not parallelized properly, running all the slower testcases one after the other, the total running time could easily end up at dozens of hours.
The intention wasn't removing it from regular runs altogether, but to err on the safe side, and not to add an extra 10+ hours of runtime accidentally to e.g. pre-merge runs (supposedly on a fast path).
Fortunately, the individual cases can be run concurrently, so if there are enough threads to run the suite, even the slowest cases can complete in ~2.5h
possibly, yes, but I haven't managed to run the sql benchmarks, and we would still have to solve the problem of capturing a custom measure (error rate) in the benchmarks, instead of the currently supported (e.g. running time).
other than that, everything seems functional.
I would rather not remove the new tests, in the end, at the moment these are the only piece of logic that can demonstrate the error with the current implementation. Rewriting the tests in scala may be an option, but I'm not that comfortable with my scala skills, to confidently jump into that.