Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
use weak reference for torrent broadcast
  • Loading branch information
Brandon Krieger committed Nov 8, 2018
commit a2683b62985fc9c7d15fb92f3bb170a4b5225058
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.util.zip.Adler32

import scala.collection.JavaConverters._
import scala.ref.WeakReference
import scala.reflect.ClassTag
import scala.util.Random

Expand Down Expand Up @@ -61,9 +62,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
* Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
* which builds this value by reading blocks from the driver and/or other executors.
*
* On the driver, if the value is required, it is read lazily from the block manager.
* On the driver, if the value is required, it is read lazily from the block manager. We hold
* a weak reference so that it can be garbage collected if required, as we can always reconstruct
* in the future.
*/
@transient private lazy val _value: T = readBroadcastBlock()
@transient private var _value: WeakReference[T] = new WeakReference()

/** The compression codec to use, or None if compression is disabled */
@transient private var compressionCodec: Option[CompressionCodec] = _
Expand Down Expand Up @@ -93,7 +96,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
private var checksums: Array[Int] = _

override protected def getValue() = {
_value
val memoized: Option[T] = _value.get
if (memoized.isDefined) {
memoized.get
} else {
val newlyRead = readBroadcastBlock()
_value = new WeakReference(newlyRead)
newlyRead
}
}

private def calcChecksum(block: ByteBuffer): Int = {
Expand Down