Commits (20)
735eca6
Split MemoryEntry into two separate classes (serialized and deseriali…
JoshRosen Mar 15, 2016
8f08289
Add ChunkedByteBuffer and use it in storage layer.
JoshRosen Mar 15, 2016
79b1a6a
Add test cases and fix bug in ChunkedByteBuffer.toInputStream()
JoshRosen Mar 15, 2016
7dbcd5a
WIP towards understanding destruction.
JoshRosen Mar 15, 2016
3fbec21
Small fixes to dispose behavior.
JoshRosen Mar 15, 2016
e5e663f
Modify BlockManager.dataSerialize to write ChunkedByteBuffers.
JoshRosen Mar 15, 2016
de62f0d
Merge remote-tracking branch 'origin/master' into chunked-block-seria…
JoshRosen Mar 16, 2016
0a347fd
Fix test compilation in streaming.
JoshRosen Mar 16, 2016
6852c48
Merge remote-tracking branch 'origin/master' into chunked-block-seria…
JoshRosen Mar 16, 2016
43f8fa6
Allow ChunkedByteBuffer to contain no chunks.
JoshRosen Mar 16, 2016
25e6884
Document toByteBuffer() and toArray() size limitations.
JoshRosen Mar 16, 2016
325c83d
Move dispose() from BlockManager to StorageUtils.
JoshRosen Mar 16, 2016
4f5074e
Better documentation for dispose() methods.
JoshRosen Mar 16, 2016
b6ddf3e
Rename limit to size.
JoshRosen Mar 16, 2016
719ad3c
Implement missing InputStream methods.
JoshRosen Mar 16, 2016
2300607
More comments.
JoshRosen Mar 16, 2016
3fc0b66
Fix confusing getChunks().head
JoshRosen Mar 16, 2016
c747c85
Merge remote-tracking branch 'origin/master' into chunked-block-seria…
JoshRosen Mar 17, 2016
cb9311b
Fix logging import.
JoshRosen Mar 17, 2016
2970932
Clean up dispose logic to address review comments.
JoshRosen Mar 18, 2016
@@ -28,7 +28,7 @@
/**
* A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
*/
public final class NettyManagedBuffer extends ManagedBuffer {
public class NettyManagedBuffer extends ManagedBuffer {
private final ByteBuf buf;

public NettyManagedBuffer(ByteBuf buf) {
@@ -29,7 +29,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}

/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -106,18 +106,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
val pieceId = BroadcastBlockId(id, "piece" + i)
if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
val bytes = new ChunkedByteBuffer(block.duplicate())
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
blocks.length
}

/** Fetch torrent blocks from the driver and/or other executors. */
private def readBlocks(): Array[ByteBuffer] = {
private def readBlocks(): Array[ChunkedByteBuffer] = {
// Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
// to the driver, so other executors can pull these chunks from this executor as well.
val blocks = new Array[ByteBuffer](numBlocks)
val blocks = new Array[ChunkedByteBuffer](numBlocks)
val bm = SparkEnv.get.blockManager

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
@@ -181,7 +182,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()
val blocks = readBlocks().flatMap(_.getChunks())
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

val obj = TorrentBroadcast.unBlockifyObject[T](
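Put differently, the read path above now deals in one ChunkedByteBuffer per broadcast piece and flattens them back down. A rough sketch of that flow, with names taken from the diff and the surrounding method body assumed:

// Each piece fetched from the BlockManager may span several ByteBuffer chunks;
// flattening recovers the Array[ByteBuffer] shape that unBlockifyObject consumes.
val pieces: Array[ChunkedByteBuffer] = readBlocks()
val chunks: Array[ByteBuffer] = pieces.flatMap(_.getChunks())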
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,6 +35,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTa
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer

/**
* Spark executor, backed by a threadpool to run tasks.
@@ -296,7 +297,7 @@ private[spark] class Executor(
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
@@ -81,8 +81,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
return
}
// TODO(josh): assumption that there is only one chunk here is a hack
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
serializedTaskResult.get.getChunks().head)
Review thread on the line above:

Contributor: This is not safe to do, is it?

Contributor: I guess it is, because on the write side we enforced it.

Contributor (author): This is a messy temporary hack caused by some interface mismatching; I'll see about fixing this so that we don't make any assumptions about there only being one chunk here.

Contributor (author): Since there's only ever (in expectation) going to be one chunk here, calling .toByteBuffer should have no real performance or correctness issues, so I'll just do that instead.

sparkEnv.blockManager.master.removeBlock(blockId)
(deserializedResult, size)
}
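A sketch of the follow-up the author describes in the thread above, swapping the single-chunk assumption for toByteBuffer (illustrative; this exact line is not part of the diff shown here):

// Convert the whole ChunkedByteBuffer into one ByteBuffer instead of assuming
// it has exactly one chunk; cheap when, as expected, there is a single chunk.
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
  serializedTaskResult.get.toByteBuffer)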
83 changes: 46 additions & 37 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -33,7 +33,7 @@ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.MemoryManager
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
@@ -42,6 +42,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
import org.apache.spark.util._
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}

/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@@ -295,7 +296,7 @@ private[spark] class BlockManager(
* Put the block locally, using the given storage level.
*/
override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
putBytes(blockId, data.nioByteBuffer(), level)
putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
}

/**
@@ -443,7 +444,7 @@
/**
* Get block from the local block manager as serialized bytes.
*/
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting local block $blockId as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
@@ -452,7 +453,8 @@
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
new ChunkedByteBuffer(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
} else {
blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}
@@ -464,7 +466,7 @@
* Must be called while holding a read lock on the block.
* Releases the read lock upon exception; keeps the read lock upon successful return.
*/
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
val level = info.level
logDebug(s"Level for block $blockId is $level")
// In order, try to read the serialized bytes from memory, then from disk, then fall back to
@@ -503,7 +505,7 @@
*/
def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit)
}
}

@@ -520,7 +522,7 @@
/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
@@ -566,7 +568,7 @@
}

if (data != null) {
return Some(data)
return Some(new ChunkedByteBuffer(data))
}
logDebug(s"The value of block $blockId is null")
}
@@ -704,7 +706,7 @@
*/
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
@@ -724,7 +726,7 @@
*/
private def doPutBytes(
blockId: BlockId,
bytes: ByteBuffer,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
@@ -733,25 +735,22 @@
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (level.replication > 1) {
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = bytes.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, level)
replicate(blockId, bytes, level)
}(futureExecutionContext)
} else {
null
}

bytes.rewind()
val size = bytes.limit()
val size = bytes.limit

if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values = dataDeserialize(blockId, bytes.duplicate())
val values = dataDeserialize(blockId, bytes)
memoryStore.putIterator(blockId, values, level) match {
case Right(_) => true
case Left(iter) =>
@@ -921,7 +920,7 @@
try {
replicate(blockId, bytesToReplicate, level)
} finally {
BlockManager.dispose(bytesToReplicate)
bytesToReplicate.dispose()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
@@ -943,29 +942,27 @@
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
diskBytes: ByteBuffer): ByteBuffer = {
diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
require(!level.deserialized)
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
blockInfo.synchronized {
if (memoryStore.contains(blockId)) {
BlockManager.dispose(diskBytes)
diskBytes.dispose()
memoryStore.getBytes(blockId).get
} else {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
// this action is put into a `() => ByteBuffer` and created lazily.
val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
copyForMemory.put(diskBytes)
// this action is put into a `() => ChunkedByteBuffer` and created lazily.
diskBytes.copy()
})
if (putSucceeded) {
BlockManager.dispose(diskBytes)
diskBytes.dispose()
memoryStore.getBytes(blockId).get
} else {
diskBytes.rewind()
diskBytes
}
}
@@ -1031,7 +1028,7 @@
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
@@ -1084,11 +1081,15 @@
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
logTrace(s"Trying to replicate $blockId of ${data.limit} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
peer.host,
peer.port,
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
tLevel)
logTrace(s"Replicated $blockId of ${data.limit} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
@@ -1111,7 +1112,7 @@
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
logDebug(s"Replicating $blockId of ${data.limit} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
@@ -1153,7 +1154,7 @@
*/
def dropFromMemory(
blockId: BlockId,
data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
var blockIsUpdated = false
@@ -1280,11 +1281,11 @@
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}

/** Serializes into a byte buffer. */
def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
val byteStream = new ByteBufferOutputStream(4096)
dataSerializeStream(blockId, byteStream, values)
byteStream.toByteBuffer
/** Serializes into a chunked byte buffer. */
def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
}

/**
@@ -1296,6 +1297,14 @@
dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
}

/**
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = {
dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
}

/**
* Deserializes a InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
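To make the new serialization path concrete, a minimal round-trip sketch using the methods as they appear in this diff (blockManager and blockId are assumed to be in scope):

// Serialize an iterator into 4 MB chunks rather than one contiguous buffer,
// then read it back. Inside dataDeserialize, toInputStream(dispose = true)
// frees the chunks once the returned iterator has been fully consumed.
val values: Iterator[Any] = Iterator("a", "b", "c")
val bytes: ChunkedByteBuffer = blockManager.dataSerialize(blockId, values)
assert(bytes.limit > 0)
val roundTripped: Iterator[Any] = blockManager.dataDeserialize(blockId, bytes)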
@@ -17,12 +17,11 @@

package org.apache.spark.storage

import java.nio.ByteBuffer

import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.util.io.ChunkedByteBuffer

/**
* This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
* This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
* so that the corresponding block's read lock can be released once this buffer's references
* are released.
*
@@ -32,7 +31,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
private[storage] class BlockManagerManagedBuffer(
blockManager: BlockManager,
blockId: BlockId,
buf: ByteBuffer) extends NioManagedBuffer(buf) {
chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {

override def retain(): ManagedBuffer = {
super.retain()
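The diff is cut off here, but the class comment above states the intent: tie the buffer's reference count to the block's read lock. A hypothetical usage sketch of that contract (release() is not shown in this diff and its behavior here is an assumption):

// Each retain() is expected to take another read lock on the block, and each
// release() to give one back, so the block cannot be evicted or removed while
// the network layer still references its bytes.
val buf: ManagedBuffer = new BlockManagerManagedBuffer(blockManager, blockId, chunkedBuffer)
buf.retain()   // additional read lock taken on blockId
// ... hand buf to the transport layer for an upload or remote fetch ...
buf.release()  // assumed to release that read lock once the transfer completes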