fix
squito committed Aug 13, 2018
commit 034acb40036b50e459b72e6a4b4156dc33244ae9
@@ -234,7 +234,7 @@ public void onComplete(String streamId) throws IOException {
callback.onSuccess(ByteBuffer.allocate(0));
} catch (Exception ex) {
IOException ioExc = new IOException("Failure post-processing complete stream;" +
" failing this rpc and leaving channel active");
" failing this rpc and leaving channel active", ex);
callback.onFailure(ioExc);
streamHandler.onFailure(streamId, ioExc);
}
@@ -28,7 +28,7 @@
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;

/**
* A request to Upload a block, which the destintation should receive as a stream.
* A request to Upload a block, which the destination should receive as a stream.
*
* The actual block data is not contained here. It will be passed to the StreamCallbackWithID
* that is returned from RpcHandler.receiveStream()
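For orientation, a minimal sketch (not code from this PR) of how a receiver might implement the StreamCallbackWithID returned from RpcHandler.receiveStream(), spooling the streamed bytes to a temp file as they arrive. The class and field names are illustrative, and it assumes the onData/onComplete/onFailure signatures from StreamCallback plus the getID seen in the BlockManager diff below.

import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

import org.apache.spark.network.client.StreamCallbackWithID

// Illustrative handler: write each arriving chunk straight to a temp file so the
// block never has to be buffered whole in memory while it is being received.
class SpoolingStreamCallback(blockId: String, tmpFile: File) extends StreamCallbackWithID {

  private val channel: WritableByteChannel =
    Channels.newChannel(new FileOutputStream(tmpFile))

  override def getID: String = blockId

  // Invoked repeatedly on the netty thread as stream chunks arrive.
  override def onData(streamId: String, buf: ByteBuffer): Unit = {
    while (buf.hasRemaining) {
      channel.write(buf)
    }
  }

  // Invoked once the sender signals end-of-stream; hand tmpFile off for storage.
  override def onComplete(streamId: String): Unit = {
    channel.close()
  }

  // Invoked if the stream fails; clean up the partial file.
  override def onFailure(streamId: String, cause: Throwable): Unit = {
    channel.close()
    tmpFile.delete()
  }
}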
@@ -152,12 +152,12 @@ private[spark] class NettyBlockTransferService(
val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
val callback = new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId${if (asStream) "as stream" else ""}")
logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
result.success((): Unit)
}

override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading $blockId${if (asStream) "as stream" else ""}", e)
logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)
result.failure(e)
}
}
20 changes: 17 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -413,8 +413,10 @@ private[spark] class BlockManager(
// to the final location, but that would require a deeper refactor of this code. So instead
// we just write to a temp file, and call putBytes on the data in that file.
val tmpFile = diskBlockManager.createTempLocalBlock()._2
val channel = new CountingWritableChannel(
Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
logTrace(s"Streaming block $blockId to tmp file $tmpFile")
new StreamCallbackWithID {
val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))

override def getID: String = blockId.name

@@ -425,15 +427,27 @@
}

override def onComplete(streamId: String): Unit = {
logTrace(s"Done receiving block $blockId, now putting into local blockManager")
// Read the contents of the downloaded file as a buffer to put into the blockManager.
// Note this is all happening inside the netty thread as soon as it reads the end of the
// stream.
channel.close()
// TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
// using a lot of memory here. We won't get a jvm OOM, but might get killed by the
// OS / cluster manager. We could at least read the tmp file as a stream.
val buffer = ChunkedByteBuffer.map(tmpFile,
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
val buffer = securityManager.getIOEncryptionKey() match {
case Some(key) =>
// we need to pass in the size of the unencrypted block
val blockSize = channel.getCount
val allocator = level.memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
Contributor:

toChunkedByteBuffer is also pretty memory-hungry, right? You'll end up needing enough memory to hold the entire file in memory, if I read the code right.

This is probably OK for now, but you should probably mention it in your TODO above.

Contributor Author:

Yeah, you store the entire file in memory (after decrypting). It's not memory-mapped either, so it'll probably be a regular OOM (depending on memory mode). Updated the comment.
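To make the memory concern in this thread concrete, here is a small self-contained sketch in plain NIO (deliberately not using Spark's ChunkedByteBuffer or EncryptedBlockData): a memory-mapped read lets the OS page the file in lazily, while decrypting into freshly allocated buffers has to materialize the whole block, so memory use grows with the block size. The helper names are illustrative only.

import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Memory-mapped path: the returned buffer is backed by the file itself, so the
// JVM does not need heap (or direct memory) proportional to the block size.
def mappedRead(tmpFile: File): ByteBuffer = {
  val ch = FileChannel.open(tmpFile.toPath, StandardOpenOption.READ)
  try ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size())
  finally ch.close()
}

// Materialized path: allocate a buffer for the whole (decrypted) block up front
// and copy the file into it. This is roughly what reading an encrypted block
// into regular buffers implies: memory use scales with the block size.
def materializedRead(tmpFile: File, allocator: Int => ByteBuffer): ByteBuffer = {
  val ch = FileChannel.open(tmpFile.toPath, StandardOpenOption.READ)
  try {
    // For the sketch, assume the block fits in a single Int-sized buffer.
    val buf = allocator(ch.size().toInt)
    while (buf.hasRemaining && ch.read(buf) >= 0) {}
    buf.flip()
    buf
  } finally ch.close()
}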


case None =>
ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
}
putBytes(blockId, buffer, level)(classTag)
tmpFile.delete()
}
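One more clarifying sketch, not the PR's code: the first BlockManager hunk above wraps the encrypting output channel in a CountingWritableChannel, and onComplete later reads channel.getCount as the block size. Assuming that wrapper simply tallies bytes as they pass through, an equivalent would look roughly like this; because the wrapper sits in front of the encrypting stream, the tally is the plaintext (unencrypted) size, which is what EncryptedBlockData needs.

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical byte-counting channel wrapper: delegates every write and keeps a
// running total of how many bytes were written through it.
class CountingChannelSketch(delegate: WritableByteChannel) extends WritableByteChannel {
  private var count = 0L

  def getCount: Long = count

  override def write(src: ByteBuffer): Int = {
    val written = delegate.write(src)
    count += written
    written
  }

  override def isOpen: Boolean = delegate.isOpen

  override def close(): Unit = delegate.close()
}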