Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:

- Remove all state from the global TorrentBroadcast object.  This state
  consisted mainly of configuration options, like the block size and
  compression codec, and was read by the blockify / unblockify methods.
  Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
  size was always determined by the first SparkConf that TorrentBroadast was
  initialized with; as a result, unit tests could not properly test
  TorrentBroadcast with different block sizes.

  Instead, blockifyObject and unBlockifyObject now accept compression codecs
  and blockSizes as arguments.  These arguments are supplied at the call sites
  inside of TorrentBroadcast instances.  Each TorrentBroadcast instance
  determines these values from SparkEnv's SparkConf.  I was careful to ensure
  that we do not accidentally serialize CompressionCodec or SparkConf objects
  as part of the TorrentBroadcast object.

- Remove special-case handling of local-mode in TorrentBroadcast.  I don't
  think that broadcast implementations should know about whether we're running
  in local mode.  If we want to optimize the performance of broadcast in local
  mode, then we should detect this at a higher level and use a dummy
  LocalBroadcastFactory implementation instead.

  Removing this code fixes a subtle error condition: in the old local mode
  code, a failure to find the broadcast in the local BlockManager would lead
  to an attempt to deblockify zero blocks, which could lead to confusing
  deserialization or decompression errors when we attempted to decompress
  an empty byte array.  This should never have happened, though: a failure to
  find the block in local mode is evidence of some other error.  The changes
  here will make it easier to debug those errors if they ever happen.

- Add a check that throws an exception when attempting to deblockify an
  empty array.

- Use ScalaCheck to add a test to check that TorrentBroadcast's
  blockifyObject and unBlockifyObject methods are inverses.

- Misc. cleanup and logging improvements.
  • Loading branch information
JoshRosen committed Oct 19, 2014
commit 618a87260faaebf353c1d9b4abc17af9f0cfa472
120 changes: 51 additions & 69 deletions core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,9 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream
* broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
*
* @param obj object to broadcast
* @param isLocal whether Spark is running in local mode (single JVM process).
* @param id A unique identifier for the broadcast variable.
*/
private[spark] class TorrentBroadcast[T: ClassTag](
obj : T,
@transient private val isLocal: Boolean,
id: Long)
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

/**
Expand All @@ -62,6 +58,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* blocks from the driver and/or other executors.
*/
@transient private var _value: T = obj
/** The compression codec to use, or None if compression is disabled */
@transient private var compressionCodec: Option[CompressionCodec] = _
/** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
@transient private var blockSize: Int = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about move these two as part of Constructor? Reading the Conf in TorrentBroadcastFactor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this and agree that it might be cleaner, but this will require more refactoring of other code. One design goal here was to minimize the serialized size of TorrentBroadcast objects, so we can't serialize the SparkConf or CompressionCodec instances (which contain SparkConfs). SparkEnv.conf determines these values anyways.


private def setConf(conf: SparkConf) {
compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
Some(CompressionCodec.createCodec(conf))
} else {
None
}
blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
}
setConf(SparkEnv.get.conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the javadoc for this class to make it very obvious that at init time, this class reads configuration from SparkEnv.get.conf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


private val broadcastId = BroadcastBlockId(id)

Expand All @@ -76,23 +86,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* @return number of blocks this broadcast variable is divided into
*/
private def writeBlocks(): Int = {
// For local mode, just put the object in the BlockManager so we can find it later.
SparkEnv.get.blockManager.putSingle(
broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

if (!isLocal) {
val blocks = TorrentBroadcast.blockifyObject(_value)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
} else {
0
val blocks = TorrentBroadcast.blockifyObject(_value, blockSize, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
}

/** Fetch torrent blocks from the driver and/or other executors. */
Expand All @@ -104,29 +106,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)

// First try getLocalBytes because there is a chance that previous attempts to fetch the
logDebug(s"Reading piece $pieceId of $broadcastId")
// First try getLocalBytes because there is a chance that previous attempts to fetch the
// broadcast blocks have already fetched some of the blocks. In that case, some blocks
// would be available locally (on this executor).
var blockOpt = bm.getLocalBytes(pieceId)
if (!blockOpt.isDefined) {
blockOpt = bm.getRemoteBytes(pieceId)
blockOpt match {
case Some(block) =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)

case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
bm.getRemoteBytes(pieceId).map { block =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given this block is long, can we avoid using map.getOrElse? Just make it more explicit so we know it is a failure case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be more explicit, i'm suggesting the old style is easier to understand

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I agree with @rxin

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like me to revert back to the old code layout then? FWIW, I prefer the style here to the old code, which used a var and had this "if we get here, the option is defined" comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm -- The thing I want the code to reflect is that there are three cases

  1. We get it locally
  2. If not, we get it from remote
  3. If that fails, we throw an exception.

Right now it looks like one big block instead of this three way switch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a new commit that simplifies this code. I think that the problem was the use of nested getOrElse calls. I replaced this with a series of defs that show how to get the bytes locally and remotely, followed by a non-nested orElse chain. I think this is a lot cleaner now, since the core logic is a one-liner:

getLocal.orElse(getRemote).getOrElse(
        throw new SparkException(s"Failed to get $pieceId of $broadcastId"))

// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
block
}.getOrElse(throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
}
// If we get here, the option is defined.
blocks(pid) = blockOpt.get
blocks(pid) = block
}
blocks
}
Expand Down Expand Up @@ -156,6 +152,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wired, how can we make sure that this conf is equals to the one used when create the Broadcast?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conf is application-scoped. The same conf should be present on this application's executors, where this task will be deserialized. This assumption is used elsewhere, too.

SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) =>
_value = x.asInstanceOf[T]
Expand All @@ -167,7 +164,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
val time = (System.nanoTime() - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")

_value = TorrentBroadcast.unBlockifyObject[T](blocks)
_value = TorrentBroadcast.unBlockifyObject[T](blocks, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle(
Expand All @@ -179,42 +176,26 @@ private[spark] class TorrentBroadcast[T: ClassTag](


private object TorrentBroadcast extends Logging {
/** Size of each block. Default value is 4MB. */
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
private var compress: Boolean = false
private var compressionCodec: CompressionCodec = null

def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
compress = conf.getBoolean("spark.broadcast.compress", true)
compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
}

def stop() {
initialized = false
}

def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
def blockifyObject[T: ClassTag](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conf has been moved into class Broadcast, maybe blockifyObject and unblockify also should be moved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two methods, blockifyObject and unBlockifyObject, now accept all of their dependencies directly, which makes it easier to unit-test them.

obj: T,
blockSize: Int,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(blockSize)
val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
bos.toArrays.map(ByteBuffer.wrap)
}

def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = {
def unBlockifyObject[T: ClassTag](
blocks: Array[ByteBuffer],
compressionCodec: Option[CompressionCodec]): T = {
require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
val is = new SequenceInputStream(
asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block))))
val in: InputStream = if (compress) compressionCodec.compressedInputStream(is) else is

val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
Expand All @@ -227,6 +208,7 @@ private object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
logInfo(s"Unpersisting TorrentBroadcast $id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be chatty. logdebug?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is useful for debugging. I'd suggest keeping this at info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HttpBroadcast has info-level logging for this. I'm going to leave this at info for now while we debug TorrentBroadcast issues; we can always revisit later as part of a larger log-level cleanup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel super strongly over this one, but I feel given this is for "debugging" of exceptional cases, it should be in debug. If your worry is that the broadcast cleaner might clean up stuff prematurely, then I think we should log in the cleaner instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its mostly for debugging what broadcasts have been removed and what has not. It can be probably be made debug once we have a UI for this (#2851), but right now this is the only way to figure out if a broadcast variable has been removed by looking at the driver logs.
Also its just one line per broadcast variable (we have 2-3 lines per variable when it is created)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to get #2851 merged this week; I'm in the middle of some significant UI code cleanup and I'm planning to merge most of the existing UI patches or to re-implement them myself.

SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = {
new TorrentBroadcast[T](value_, id)
}

override def stop() { TorrentBroadcast.stop() }
override def stop() { }

/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.spark.broadcast

import scala.util.Random

import org.scalacheck.Gen
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import org.scalatest.FunSuite
import org.scalatest.prop.GeneratorDrivenPropertyChecks

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.storage._


class BroadcastSuite extends FunSuite with LocalSparkContext {
class BroadcastSuite extends FunSuite with LocalSparkContext with GeneratorDrivenPropertyChecks {

private val httpConf = broadcastConf("HttpBroadcastFactory")
private val torrentConf = broadcastConf("TorrentBroadcastFactory")
Expand Down Expand Up @@ -84,6 +88,20 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
}

test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") {
import org.apache.spark.broadcast.TorrentBroadcast._
val blockSize = 1024
val snappy = Some(new SnappyCompressionCodec(new SparkConf()))
val objects = for (size <- Gen.choose(1, 1024 * 10)) yield {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed offline, maybe just use a random number generator here since Gen brings extra complexity but not much benefit in this specific case.

val data: Array[Byte] = new Array[Byte](size)
Random.nextBytes(data)
data
}
forAll (objects) { (obj: Array[Byte]) =>
assert(unBlockifyObject[Array[Byte]](blockifyObject(obj, blockSize, snappy), snappy) === obj)
}
}

test("Unpersisting HttpBroadcast on executors only in local mode") {
testUnpersistHttpBroadcast(distributed = false, removeFromDriver = false)
}
Expand Down