Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Get remaining() before consuming ByteBuffer
  • Loading branch information
zsxwing committed Dec 7, 2015
commit 4cdbc7c06d29abc8a0df40979b2ba05566354ac6
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,17 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
val serializedSize = serializedData.remaining()
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] =>
if (!taskSetManager.canFetchMoreResults(serializedData.remaining())) {
if (!taskSetManager.canFetchMoreResults(serializedSize)) {
return
}
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
directResult.value()
(directResult, serializedData.remaining())
(directResult, serializedSize)
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
// dropped by executor if size is larger than maxResultSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,11 @@ private[spark] class BlockManager(

if (data != null) {
if (asBlockResult) {
val dataSize = data.remaining()
return Some(new BlockResult(
dataDeserialize(blockId, data),
DataReadMethod.Network,
data.remaining()))
dataSize))
} else {
return Some(data)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
if (externalBlockManager.isDefined) {
val byteBuffer = bytes.duplicate()
byteBuffer.rewind()
externalBlockManager.get.putBytes(blockId, byteBuffer)
val size = bytes.remaining()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting one. The original bytes is reset to position 0 and put, so it means that the size that was put really is bytes.limit or byteBuffer.remaining (before it's put). I think this might have to be adjusted but I'm also not sure why bytes is returned here instead of what was put into the block manager. Maybe it's really assumed position is 0, but then why rewind?

externalBlockManager.get.putBytes(blockId, byteBuffer)
val data = if (returnValues) {
Right(bytes)
} else {
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
putIterator(blockId, values, level, returnValues = true)
} else {
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
tryToPut(blockId, bytes, bytes.remaining(), deserialized = false, droppedBlocks)
PutResult(bytes.remaining(), Right(bytes.duplicate()), droppedBlocks)
val size = bytes.remaining()
tryToPut(blockId, bytes, size, deserialized = false, droppedBlocks)
PutResult(size, Right(bytes.duplicate()), droppedBlocks)
}
}

Expand Down Expand Up @@ -134,8 +135,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.remaining(), deserialized = false, droppedBlocks)
PutResult(bytes.remaining(), Right(bytes.duplicate()), droppedBlocks)
val size = bytes.remaining()
tryToPut(blockId, bytes, size, deserialized = false, droppedBlocks)
PutResult(size, Right(bytes.duplicate()), droppedBlocks)
}
}

Expand Down