[SPARK-3958] TorrentBroadcast cleanup / debugging improvements. #2844
This PR makes several changes to TorrentBroadcast in order to make it easier to reason about, which should help when debugging SPARK-3958. The key changes:

- Remove all state from the global TorrentBroadcast object. This state consisted mainly of configuration options, like the block size and compression codec, and was read by the blockify / unblockify methods. Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block size was always determined by the first SparkConf that TorrentBroadcast was initialized with; as a result, unit tests could not properly test TorrentBroadcast with different block sizes.

  Instead, blockifyObject and unBlockifyObject now accept compression codecs and blockSizes as arguments. These arguments are supplied at the call sites inside of TorrentBroadcast instances. Each TorrentBroadcast instance determines these values from SparkEnv's SparkConf. I was careful to ensure that we do not accidentally serialize CompressionCodec or SparkConf objects as part of the TorrentBroadcast object.
- Remove special-case handling of local-mode in TorrentBroadcast. I don't think that broadcast implementations should know about whether we're running in local mode. If we want to optimize the performance of broadcast in local mode, then we should detect this at a higher level and use a dummy LocalBroadcastFactory implementation instead.

  Removing this code fixes a subtle error condition: in the old local mode code, a failure to find the broadcast in the local BlockManager would lead to an attempt to deblockify zero blocks, which could lead to confusing deserialization or decompression errors when we attempted to decompress an empty byte array. This should never have happened, though: a failure to find the block in local mode is evidence of some other error. The changes here will make it easier to debug those errors if they ever happen.
- Add a check that throws an exception when attempting to deblockify an empty array.
- Use ScalaCheck to add a test to check that TorrentBroadcast's blockifyObject and unBlockifyObject methods are inverses.
- Misc. cleanup and logging improvements.
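The `lazy val` problem described in the first change can be reproduced in isolation. The following is a minimal sketch, not Spark's code: `Conf`, `GlobalState`, `PerCall`, and `currentConf` are hypothetical names standing in for SparkConf, the global TorrentBroadcast object, and a per-call-site alternative.

```scala
// Hypothetical names throughout; this is an illustration, not Spark's code.
final case class Conf(blockSizeKb: Int)

object GlobalState {
  // Stand-in for "whatever configuration is currently active".
  @volatile var currentConf: Conf = Conf(blockSizeKb = 4096)

  // Evaluated once, on first access, then cached for the life of the JVM.
  lazy val BLOCK_SIZE: Int = currentConf.blockSizeKb * 1024
}

object PerCall {
  // The block size is derived from the conf supplied at the call site.
  def blockSize(conf: Conf): Int = conf.blockSizeKb * 1024
}

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    val first = GlobalState.BLOCK_SIZE               // initialized from the 4096 KB conf
    GlobalState.currentConf = Conf(blockSizeKb = 1)  // a later test wants 1 KB blocks
    val second = GlobalState.BLOCK_SIZE              // still the cached first value
    println(s"lazy val: first=$first second=$second")
    println(s"per call: ${PerCall.blockSize(Conf(blockSizeKb = 1))}")
  }
}
```

Because the cached value never changes after first access, a second test in the same JVM cannot observe a different block size unless the size is passed in explicitly.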
48c98c1 to
618a872
Compare
|
/cc @rxin for review. I'd like to apply this to |
|
Also, /cc @davies, who helped me to spot the "local mode might deblockify an empty array" bug and who's been working on TorrentBroadcast optimizations. |
|
QA tests have started for PR 2844 at commit
|
|
QA tests have finished for PR 2844 at commit
|
|
Test FAILed. |
This makes them easier to test, since they now have no dependency on SparkEnv.
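As a rough sketch of what "no dependency on SparkEnv" can look like, the helpers below take the block size and codec as plain arguments and reject an empty block array up front. This is an assumption-laden illustration rather than the PR's implementation: `Codec`, `GzipCodec`, and `Blockifier` are hypothetical names, and Java serialization with GZIP stands in for Spark's serializer and CompressionCodec.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical stand-in for a compression codec; not Spark's CompressionCodec.
trait Codec {
  def compress(out: OutputStream): OutputStream
  def decompress(in: InputStream): InputStream
}

object GzipCodec extends Codec {
  def compress(out: OutputStream): OutputStream = new GZIPOutputStream(out)
  def decompress(in: InputStream): InputStream = new GZIPInputStream(in)
}

object Blockifier {
  // Serializes obj (optionally compressed) and splits the bytes into blocks
  // of at most blockSize bytes. All configuration arrives as arguments.
  def blockify(obj: AnyRef, blockSize: Int, codec: Option[Codec]): Array[Array[Byte]] = {
    require(blockSize > 0, "blockSize must be positive")
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(codec.map(_.compress(bytes)).getOrElse(bytes))
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray.grouped(blockSize).toArray
  }

  // Reassembles the blocks and deserializes them. An empty input fails fast
  // with a clear message instead of a confusing decompression error.
  def unblockify[T](blocks: Array[Array[Byte]], codec: Option[Codec]): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    val raw = new ByteArrayInputStream(blocks.flatten)
    val in = new ObjectInputStream(codec.map(_.decompress(raw)).getOrElse(raw))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object BlockifyDemo {
  def main(args: Array[String]): Unit = {
    val value = (0 until 1000).toArray
    val blocks = Blockifier.blockify(value, blockSize = 256, codec = Some(GzipCodec))
    val restored = Blockifier.unblockify[Array[Int]](blocks, Some(GzipCodec))
    println(s"blocks=${blocks.length}, roundTrip=${restored.sameElements(value)}")
  }
}
```

Since nothing here reads global state, a test can call the helpers with any block size or codec it likes.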
|
QA tests have started for PR 2844 at commit
|
|
It looks like this build is going to fail a ReplSuite test:

    test("broadcast vars") {
      // Test that the value that a broadcast var had when it was created is used,
      // even if that variable is then modified in the driver program
      // TODO: This doesn't actually work for arrays when we run in local mode!
      val output = runInterpreter("local",
        """
          |var array = new Array[Int](5)
          |val broadcastArray = sc.broadcast(array)
          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
          |array(0) = 5
          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
        """.stripMargin)
      assertDoesNotContain("error:", output)
      assertDoesNotContain("Exception", output)
      assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
      assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
    }

I see now that my change to remove the special local-mode handling inadvertently leads to a duplication of the variable in the driver program. This could be a performance issue, since we would now use 2x the memory in the driver for each broadcast variable. I'll restore the line that stores the local copy of the broadcast variable when it's created.
This is necessary to avoid extra memory usage for broadcast variables in driver tasks.
|
QA tests have finished for PR 2844 at commit
|
|
Test FAILed. |
|
QA tests have started for PR 2844 at commit
|
|
QA tests have finished for PR 2844 at commit
|
|
Test FAILed. |
|
This most recent test failure is another side effect of removing TorrentBroadcast's optimizations for local mode. This time, the error comes from a check that asserts that broadcast pieces are not stored into the driver's block manager when running in local mode. I don't think that this optimization necessarily makes sense, since we'll have to store those blocks anyway when running in distributed mode. Therefore, I'm going to change these tests to remove this local-mode special-casing.
|
QA tests have started for PR 2844 at commit
|
|
QA tests have finished for PR 2844 at commit
|
|
Test PASSed. |
I suspect that storing a serialized copy in local mode will not help anything. If we fail to fetch the original copy of the value from the blockManager, we will not be able to fetch the serialized copy either.
The reason for this store is to avoid creating two copies of _value in the driver. If we serialize and deserialize a broadcast variable on the driver and then attempt to access its value, then without this code we will end up going through the regular de-chunking code path, which will cause us to deserialize the serialized copy of _value and waste memory.
I believe that this serialization and deserialization can take place when tasks are run in local mode, since we still serialize tasks in order to help users be aware of serialization issues that would impact them if they moved to a cluster. This complexity is another reason why I'm in favor of just scrapping all local-mode special-casing and configuring Spark to use a dummy LocalBroadcastFactory for local mode instead of whichever setting the user specified. That would be a larger, more-invasive change, which is why I opted for the simpler fix here.
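The difference between keeping the local reference and going through the de-chunking path can be illustrated with plain Java serialization. This is a simplified sketch under that assumption; `DriverCopyDemo` and `roundTrip` are hypothetical names and none of this is Spark's code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object DriverCopyDemo {
  // Round-trips a value through Java serialization, as a stand-in for
  // re-reading the serialized blocks of a broadcast variable.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val array = new Array[Int](5)
    val localCopy = array                // a stored local reference: no extra memory
    val deserialized = roundTrip(array)  // a second, independent copy in memory

    array(0) = 5
    println(s"local reference is the same object: ${localCopy eq array}")
    println(s"deserialized copy is the same object: ${deserialized eq array}")
    println(s"local sees the mutation: ${localCopy(0)}, deserialized does not: ${deserialized(0)}")
  }
}
```

The stored reference costs nothing extra but aliases the driver's array, which is consistent with the ReplSuite test expecting the mutated value in local mode; the deserialized copy is isolated but doubles the memory held for that value.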
|
LGTM now, thanks! |
what does this do?
This is from ScalaCheck; see http://www.scalatest.org/user_guide/generator_driven_property_checks
|
QA tests have started for PR 2844 at commit
|
|
QA tests have finished for PR 2844 at commit
|
|
Test PASSed. |
as discussed offline, maybe just use a random number generator here since Gen brings extra complexity but not much benefit in this specific case.
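A round-trip check driven by a seeded random number generator instead of ScalaCheck's `Gen` might look like the sketch below. It is only an illustration of the suggestion: `RoundTripCheck`, `split`, and `join` are hypothetical, simplified stand-ins that operate on raw bytes, whereas the real methods also serialize and compress.

```scala
import scala.util.Random

object RoundTripCheck {
  // Simplified stand-in for blockifyObject: splits raw bytes into blocks.
  def split(bytes: Array[Byte], blockSize: Int): Array[Array[Byte]] = {
    require(blockSize > 0, "blockSize must be positive")
    bytes.grouped(blockSize).toArray
  }

  // Simplified stand-in for unBlockifyObject: rejects empty input, then joins.
  def join(blocks: Array[Array[Byte]]): Array[Byte] = {
    require(blocks.nonEmpty, "Cannot join an empty array of blocks")
    blocks.flatten
  }

  def main(args: Array[String]): Unit = {
    val rand = new Random(42) // fixed seed so that any failure is reproducible
    for (_ <- 1 to 100) {
      val size = 1 + rand.nextInt(5000)
      val blockSize = 1 + rand.nextInt(1024)
      val data = new Array[Byte](size)
      rand.nextBytes(data)

      val blocks = split(data, blockSize)
      val expectedBlocks = (size + blockSize - 1) / blockSize // ceiling division
      assert(blocks.length == expectedBlocks, s"size=$size blockSize=$blockSize")
      assert(join(blocks).sameElements(data), s"size=$size blockSize=$blockSize")
    }
    println("all round trips passed")
  }
}
```

Including the randomly chosen sizes in the assertion message keeps failures debuggable without a generator framework.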
|
QA tests have started for PR 2844 at commit
|
|
QA tests have finished for PR 2844 at commit
|
|
Test PASSed. |
|
I've merged this into master. |