From fe31a7d61ecabca76356b313211bb7b769e02b5b Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 15 May 2018 11:48:51 -0500 Subject: [PATCH 1/5] [SPARK-24296][CORE] Replicate large blocks as a stream. When replicating large cached RDD blocks, it can be helpful to replicate them as a stream, to avoid using large amounts of memory during the transfer. This also allows blocks larger than 2GB to be replicated. Added unit tests in DistributedSuite. Also ran tests on a cluster for blocks > 2gb. --- .../protocol/BlockTransferMessage.java | 3 +- .../shuffle/protocol/UploadBlockStream.java | 89 +++++++++++++++++++ .../org/apache/spark/executor/Executor.scala | 4 +- .../spark/internal/config/package.scala | 6 ++ .../spark/network/BlockDataManager.scala | 12 +++ .../network/netty/NettyBlockRpcServer.scala | 26 +++++- .../netty/NettyBlockTransferService.scala | 39 ++++---- .../apache/spark/storage/BlockManager.scala | 52 ++++++++++- .../storage/BlockManagerManagedBuffer.scala | 7 +- .../org/apache/spark/storage/DiskStore.scala | 5 +- .../spark/util/io/ChunkedByteBuffer.scala | 4 + .../org/apache/spark/DistributedSuite.scala | 25 +++++- .../spark/security/EncryptionFunSuite.scala | 12 ++- .../apache/spark/storage/DiskStoreSuite.scala | 3 +- project/MimaExcludes.scala | 3 +- 15 files changed, 253 insertions(+), 37 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index 9af6759f5d5f..a68a297519b6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -42,7 +42,7 @@ public abstract class BlockTransferMessage implements Encodable { /** Preceding every serialized message is its type, which allows us to deserialize it. */ public enum Type { OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4), - HEARTBEAT(5); + HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6); private final byte id; @@ -67,6 +67,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 3: return StreamHandle.decode(buf); case 4: return RegisterDriver.decode(buf); case 5: return ShuffleServiceHeartbeat.decode(buf); + case 6: return UploadBlockStream.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java new file mode 100644 index 000000000000..67bdd681b6a7 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol; + +import java.util.Arrays; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.protocol.Encoders; + +// Needed by ScalaDoc. See SPARK-7726 +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; + +/** + * A request to Upload a block, which the destintation should receive as a stream. + * + * The actual block data is not contained here. It will be passed to the StreamCallbackWithID + * that is returned from RpcHandler.receiveStream() + */ +public class UploadBlockStream extends BlockTransferMessage { + public final String blockId; + public final byte[] metadata; + + public UploadBlockStream(String blockId, byte[] metadata) { + this.blockId = blockId; + this.metadata = metadata; + } + + @Override + protected Type type() { return Type.UPLOAD_BLOCK_STREAM; } + + @Override + public int hashCode() { + int objectsHashCode = Objects.hashCode(blockId); + return objectsHashCode * 41 + Arrays.hashCode(metadata); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockId", blockId) + .add("metadata size", metadata.length) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other != null && other instanceof UploadBlockStream) { + UploadBlockStream o = (UploadBlockStream) other; + return Objects.equal(blockId, o.blockId) + && Arrays.equals(metadata, o.metadata); + } + return false; + } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(blockId) + + Encoders.ByteArrays.encodedLength(metadata); + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, blockId); + Encoders.ByteArrays.encode(buf, metadata); + } + + public static UploadBlockStream decode(ByteBuf buf) { + String blockId = Encoders.Strings.decode(buf); + byte[] metadata = Encoders.ByteArrays.decode(buf); + return new UploadBlockStream(blockId, metadata); + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b1856ff0f324..86b19578037d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -363,14 +363,14 @@ private[spark] class Executor( threadMXBean.getCurrentThreadCpuTime } else 0L var threwException = true - val value = try { + val value = Utils.tryWithSafeFinally { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false res - } finally { + } { val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId) val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8fef2aa6863c..5005bf9466de 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -567,4 +567,10 @@ package object config { .intConf .checkValue(v => v > 0, "The value should be a positive integer.") .createWithDefault(2000) + + private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS = + ConfigBuilder("spark.storage.memoryMapLimitForTests") + .internal() + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index b3f8bfe8b1d4..e94a01244474 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.network import scala.reflect.ClassTag import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.storage.{BlockId, StorageLevel} private[spark] @@ -43,6 +44,17 @@ trait BlockDataManager { level: StorageLevel, classTag: ClassTag[_]): Boolean + /** + * Put the given block that will be received as a stream. + * + * When this method is called, the block data itself is not available -- it will be passed to the + * returned StreamCallbackWithID. + */ + def putBlockDataAsStream( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[_]): StreamCallbackWithID + /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. */ diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index eb4cf94164fd..a320af64b4bf 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -26,9 +26,9 @@ import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.BlockDataManager import org.apache.spark.network.buffer.NioManagedBuffer -import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithID, TransportClient} import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager} -import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock} +import org.apache.spark.network.shuffle.protocol._ import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, StorageLevel} @@ -73,10 +73,32 @@ class NettyBlockRpcServer( } val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData)) val blockId = BlockId(uploadBlock.blockId) + logInfo(s"Receiving replicated block $blockId with level ${level} " + + s"from ${client.getSocketAddress}") blockManager.putBlockData(blockId, data, level, classTag) responseContext.onSuccess(ByteBuffer.allocate(0)) } } + override def receiveStream( + client: TransportClient, + messageHeader: ByteBuffer, + responseContext: RpcResponseCallback): StreamCallbackWithID = { + val message = + BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream] + val (level: StorageLevel, classTag: ClassTag[_]) = { + serializer + .newInstance() + .deserialize(ByteBuffer.wrap(message.metadata)) + .asInstanceOf[(StorageLevel, ClassTag[_])] + } + val blockId = BlockId(message.blockId) + logInfo(s"Receiving replicated block $blockId with level 
${level} as stream " + + s"from ${client.getSocketAddress}") + // This will return immediately, but will setup a callback on streamData which will still + // do all the processing in the netty thread. + blockManager.putBlockDataAsStream(blockId, level, classTag) + } + override def getStreamManager(): StreamManager = streamManager } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index b7d8c3503276..ff7640af78c5 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -27,13 +27,14 @@ import scala.reflect.ClassTag import com.codahale.metrics.{Metric, MetricSet} import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.internal.config import org.apache.spark.network._ -import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory} import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager} -import org.apache.spark.network.shuffle.protocol.UploadBlock +import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage.{BlockId, StorageLevel} @@ -148,20 +149,28 @@ private[spark] class NettyBlockTransferService( // Everything else is encoded using our binary protocol. val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) - // Convert or copy nio buffer into array in order to serialize it. - val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) + val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) + val callback = new RpcResponseCallback { + override def onSuccess(response: ByteBuffer): Unit = { + logTrace(s"Successfully uploaded block $blockId${if (asStream) "as stream" else ""}") + result.success((): Unit) + } - client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer, - new RpcResponseCallback { - override def onSuccess(response: ByteBuffer): Unit = { - logTrace(s"Successfully uploaded block $blockId") - result.success((): Unit) - } - override def onFailure(e: Throwable): Unit = { - logError(s"Error while uploading block $blockId", e) - result.failure(e) - } - }) + override def onFailure(e: Throwable): Unit = { + logError(s"Error while uploading $blockId${if (asStream) "as stream" else ""}", e) + result.failure(e) + } + } + if (asStream) { + val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBuffer + client.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback) + } else { + // Convert or copy nio buffer into array in order to serialize it. 
+ val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) + + client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer, + callback) + } result.future } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1db032711ce4..e1cc3a9af8c2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import java.io._ import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference} import java.nio.ByteBuffer -import java.nio.channels.Channels +import java.nio.channels.{Channels, WritableByteChannel} import java.util.Collections import java.util.concurrent.ConcurrentHashMap @@ -41,6 +41,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager} import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo @@ -404,6 +405,47 @@ private[spark] class BlockManager( putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag) } + override def putBlockDataAsStream( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[_]): StreamCallbackWithID = { + // TODO if we're going to only put the data in the disk store, we should just write it directly + // to the final location, but that would require a deeper refactor of this code. So instead + // we just write to a temp file, and call putBytes on the data in that file. + val tmpFile = diskBlockManager.createTempLocalBlock()._2 + new StreamCallbackWithID { + val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile)) + + override def getID: String = blockId.name + + override def onData(streamId: String, buf: ByteBuffer): Unit = { + while (buf.hasRemaining) { + channel.write(buf) + } + } + + override def onComplete(streamId: String): Unit = { + // Read the contents of the downloaded file as a buffer to put into the blockManager. + // Note this is all happening inside the netty thread as soon as it reads the end of the + // stream. + channel.close() + // TODO Even if we're only going to write the data to disk after this, we end up using a lot + // of memory here. We wont' get a jvm OOM, but might get killed by the OS / cluster + // manager. We could at least read the tmp file as a stream. + val buffer = ChunkedByteBuffer.map(tmpFile, + conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) + putBytes(blockId, buffer, level)(classTag) + tmpFile.delete() + } + + override def onFailure(streamId: String, cause: Throwable): Unit = { + // the framework handles the connection itself, we just need to do local cleanup + channel.close() + tmpFile.delete() + } + } + } + /** * Get the BlockStatus for the block identified by the given ID, if it exists. * NOTE: This is mainly for testing. @@ -665,7 +707,7 @@ private[spark] class BlockManager( // TODO if we change this method to return the ManagedBuffer, then getRemoteValues // could just use the inputStream on the temp file, rather than memory-mapping the file. 
// Until then, replication can cause the process to use too much memory and get killed - // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though + // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though // we've read the data to disk. logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") @@ -1349,12 +1391,16 @@ private[spark] class BlockManager( try { val onePeerStartTime = System.nanoTime logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") + // This thread keeps a lock on the block, so we do not want the netty thread to unlock + // block when it finishes sending the message. + val buffer = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false, + unlockOnDeallocate = false) blockTransferService.uploadBlockSync( peer.host, peer.port, peer.executorId, blockId, - new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false), + buffer, tLevel, classTag) logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala index 3d3806126676..5c12b5cee4d2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala @@ -38,7 +38,8 @@ private[storage] class BlockManagerManagedBuffer( blockInfoManager: BlockInfoManager, blockId: BlockId, data: BlockData, - dispose: Boolean) extends ManagedBuffer { + dispose: Boolean, + unlockOnDeallocate: Boolean = true) extends ManagedBuffer { private val refCount = new AtomicInteger(1) @@ -58,7 +59,9 @@ private[storage] class BlockManagerManagedBuffer( } override def release(): ManagedBuffer = { - blockInfoManager.unlock(blockId) + if (unlockOnDeallocate) { + blockInfoManager.unlock(blockId) + } if (refCount.decrementAndGet() == 0 && dispose) { data.dispose() } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 39249d411b58..3fc8c014cc5e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -29,7 +29,7 @@ import com.google.common.io.Closeables import io.netty.channel.DefaultFileRegion import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.Utils @@ -44,8 +44,7 @@ private[spark] class DiskStore( securityManager: SecurityManager) extends Logging { private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") - private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", - Int.MaxValue.toString) + private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS) private val blockSizes = new ConcurrentHashMap[BlockId, Long]() def getSize(blockId: BlockId): Long = blockSizes.get(blockId) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index efed90cb7678..39f050f6ca5a 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -181,6 +181,10 @@ object ChunkedByteBuffer { } } + def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = { + map(file, maxChunkSize, 0, file.length()) + } + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel => var remaining = length diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 28ea0c6f0bdb..629a323042ff 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.Matchers import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Millis, Span} +import org.apache.spark.internal.config import org.apache.spark.security.EncryptionFunSuite import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.io.ChunkedByteBuffer @@ -154,6 +155,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc.parallelize(1 to 10).count() } + private def testCaching(testName: String, conf: SparkConf, storageLevel: StorageLevel): Unit = { + test(testName) { + testCaching(conf, storageLevel) + } + if (storageLevel.replication > 1) { + // also try with block replication as a stream + val uploadStreamConf = new SparkConf() + uploadStreamConf.setAll(conf.getAll) + uploadStreamConf.set(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L) + test(s"$testName (with replication as stream)") { + testCaching(uploadStreamConf, storageLevel) + } + } + } + private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = { sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test")) TestUtils.waitUntilExecutorsUp(sc, 2, 30000) @@ -169,7 +185,10 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val blockManager = SparkEnv.get.blockManager val blockTransfer = blockManager.blockTransferService val serializerManager = SparkEnv.get.serializerManager - blockManager.master.getLocations(blockId).foreach { cmId => + val locations = blockManager.master.getLocations(blockId) + assert(locations.size === storageLevel.replication, + s"; got ${locations.size} replicas instead of ${storageLevel.replication}") + locations.foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, blockId.toString, null) val deserialized = serializerManager.dataDeserializeStream(blockId, @@ -189,8 +208,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2, "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2 ).foreach { case (testName, storageLevel) => - encryptionTest(testName) { conf => - testCaching(conf, storageLevel) + encryptionTestHelper(testName) { case (name, conf) => + testCaching(name, conf, storageLevel) } } diff --git a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala index 3f52dc41abf6..85918479cc89 100644 --- a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala @@ -28,11 +28,15 @@ trait EncryptionFunSuite { * for the test 
to modify the provided SparkConf. */ final protected def encryptionTest(name: String)(fn: SparkConf => Unit) { + encryptionTestHelper(name) { case (name, conf) => + test(name)(fn(conf)) + } + } + + final protected def encryptionTestHelper(name: String)(fn: (String, SparkConf) => Unit): Unit = { Seq(false, true).foreach { encrypt => - test(s"$name (encryption = ${ if (encrypt) "on" else "off" })") { - val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, encrypt) - fn(conf) - } + val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, encrypt) + fn(s"$name (encryption = ${ if (encrypt) "on" else "off" })", conf) } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index efdd02fff787..ed4f76e2beae 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -24,6 +24,7 @@ import com.google.common.io.{ByteStreams, Files} import io.netty.channel.FileRegion import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config import org.apache.spark.network.util.{ByteArrayWritableChannel, JavaUtils} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.Utils @@ -94,7 +95,7 @@ class DiskStoreSuite extends SparkFunSuite { test("blocks larger than 2gb") { val conf = new SparkConf() - .set("spark.storage.memoryMapLimitForTests", "10k" ) + .set(config.MEMORY_MAP_LIMIT_FOR_TESTS.key, "10k") val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8f96bb0f3384..ff799a23c75c 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,11 +36,12 @@ object MimaExcludes { // Exclude rules for 2.4.x lazy val v24excludes = v23excludes ++ Seq( + // [SPARK-24296][CORE] Replicate large blocks as a stream. 
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"), // [SPARK-23528] Add numIter to ClusteringSummary ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.ClusteringSummary.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansSummary.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.BisectingKMeansSummary.this"), - // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"), From 6d059f25f3595243a8dd6195a5ee938a78e40d99 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 6 Aug 2018 12:24:01 -0500 Subject: [PATCH 2/5] review feedback --- .../scala/org/apache/spark/internal/config/package.scala | 1 + .../apache/spark/network/netty/NettyBlockRpcServer.scala | 6 +++--- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5005bf9466de..8e6d7fa2710e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -571,6 +571,7 @@ package object config { private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS = ConfigBuilder("spark.storage.memoryMapLimitForTests") .internal() + .doc("For testing only, controls the size of chunks when memory mapping a file") .bytesConf(ByteUnit.BYTE) .createWithDefault(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index a320af64b4bf..7076701421e2 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -73,7 +73,7 @@ class NettyBlockRpcServer( } val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData)) val blockId = BlockId(uploadBlock.blockId) - logInfo(s"Receiving replicated block $blockId with level ${level} " + + logDebug(s"Receiving replicated block $blockId with level ${level} " + s"from ${client.getSocketAddress}") blockManager.putBlockData(blockId, data, level, classTag) responseContext.onSuccess(ByteBuffer.allocate(0)) @@ -83,7 +83,7 @@ class NettyBlockRpcServer( override def receiveStream( client: TransportClient, messageHeader: ByteBuffer, - responseContext: RpcResponseCallback): StreamCallbackWithID = { + responseContext: RpcResponseCallback): StreamCallbackWithID = { val message = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream] val (level: StorageLevel, classTag: ClassTag[_]) = { @@ -93,7 +93,7 @@ class NettyBlockRpcServer( .asInstanceOf[(StorageLevel, ClassTag[_])] } val blockId = BlockId(message.blockId) - logInfo(s"Receiving replicated block $blockId with level ${level} as stream " + + logDebug(s"Receiving replicated block $blockId with level ${level} as stream " + s"from ${client.getSocketAddress}") // This will return immediately, but will setup a callback on streamData which will still // do all the processing in the netty thread. 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e1cc3a9af8c2..6ce440b282d7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -429,9 +429,9 @@ private[spark] class BlockManager( // Note this is all happening inside the netty thread as soon as it reads the end of the // stream. channel.close() - // TODO Even if we're only going to write the data to disk after this, we end up using a lot - // of memory here. We wont' get a jvm OOM, but might get killed by the OS / cluster - // manager. We could at least read the tmp file as a stream. + // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up + // using a lot of memory here. We won't get a jvm OOM, but might get killed by the + // OS / cluster manager. We could at least read the tmp file as a stream. val buffer = ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) putBytes(blockId, buffer, level)(classTag) From 034acb40036b50e459b72e6a4b4156dc33244ae9 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 9 Aug 2018 16:03:11 -0500 Subject: [PATCH 3/5] fix --- .../server/TransportRequestHandler.java | 2 +- .../shuffle/protocol/UploadBlockStream.java | 2 +- .../netty/NettyBlockTransferService.scala | 4 ++-- .../apache/spark/storage/BlockManager.scala | 20 ++++++++++++++++--- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index e1d7b2dbff60..eb0ef11a390d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -234,7 +234,7 @@ public void onComplete(String streamId) throws IOException { callback.onSuccess(ByteBuffer.allocate(0)); } catch (Exception ex) { IOException ioExc = new IOException("Failure post-processing complete stream;" + - " failing this rpc and leaving channel active"); + " failing this rpc and leaving channel active", ex); callback.onFailure(ioExc); streamHandler.onFailure(streamId, ioExc); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java index 67bdd681b6a7..9df30967d5bb 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java @@ -28,7 +28,7 @@ import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** - * A request to Upload a block, which the destintation should receive as a stream. + * A request to Upload a block, which the destination should receive as a stream. * * The actual block data is not contained here. 
It will be passed to the StreamCallbackWithID * that is returned from RpcHandler.receiveStream() diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index ff7640af78c5..1905632a936d 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -152,12 +152,12 @@ private[spark] class NettyBlockTransferService( val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) val callback = new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { - logTrace(s"Successfully uploaded block $blockId${if (asStream) "as stream" else ""}") + logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") result.success((): Unit) } override def onFailure(e: Throwable): Unit = { - logError(s"Error while uploading $blockId${if (asStream) "as stream" else ""}", e) + logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e) result.failure(e) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6ce440b282d7..f6bcccede2c0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -413,8 +413,10 @@ private[spark] class BlockManager( // to the final location, but that would require a deeper refactor of this code. So instead // we just write to a temp file, and call putBytes on the data in that file. val tmpFile = diskBlockManager.createTempLocalBlock()._2 + val channel = new CountingWritableChannel( + Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile)))) + logTrace(s"Streaming block $blockId to tmp file $tmpFile") new StreamCallbackWithID { - val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile)) override def getID: String = blockId.name @@ -425,6 +427,7 @@ private[spark] class BlockManager( } override def onComplete(streamId: String): Unit = { + logTrace(s"Done receiving block $blockId, now putting into local blockManager") // Read the contents of the downloaded file as a buffer to put into the blockManager. // Note this is all happening inside the netty thread as soon as it reads the end of the // stream. @@ -432,8 +435,19 @@ private[spark] class BlockManager( // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up // using a lot of memory here. We won't get a jvm OOM, but might get killed by the // OS / cluster manager. We could at least read the tmp file as a stream. 
- val buffer = ChunkedByteBuffer.map(tmpFile, - conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) + val buffer = securityManager.getIOEncryptionKey() match { + case Some(key) => + // we need to pass in the size of the unencrypted block + val blockSize = channel.getCount + val allocator = level.memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator) + + case None => + ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) + } putBytes(blockId, buffer, level)(classTag) tmpFile.delete() } From 4664def1ce02ebf4019c49c336304f739c594696 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 13 Aug 2018 14:31:32 -0500 Subject: [PATCH 4/5] cleanup --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f6bcccede2c0..901f916a73ae 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import java.io._ import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference} import java.nio.ByteBuffer -import java.nio.channels.{Channels, WritableByteChannel} +import java.nio.channels.Channels import java.util.Collections import java.util.concurrent.ConcurrentHashMap From 44149a51fd817324488508ba006da2d3272db490 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 Aug 2018 20:56:37 -0500 Subject: [PATCH 5/5] review feedback --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 ++++-- .../org/apache/spark/security/EncryptionFunSuite.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c138c0643bea..e7cdfab99b34 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -435,8 +435,10 @@ private[spark] class BlockManager( // stream. channel.close() // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up - // using a lot of memory here. We won't get a jvm OOM, but might get killed by the - // OS / cluster manager. We could at least read the tmp file as a stream. + // using a lot of memory here. With encryption, we'll read the whole file into a regular + // byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm + // OOM, but might get killed by the OS / cluster manager. We could at least read the tmp + // file as a stream in both cases. 
val buffer = securityManager.getIOEncryptionKey() match { case Some(key) => // we need to pass in the size of the unencrypted block diff --git a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala index 85918479cc89..be6b8a6b5b10 100644 --- a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala @@ -29,7 +29,7 @@ trait EncryptionFunSuite { */ final protected def encryptionTest(name: String)(fn: SparkConf => Unit) { encryptionTestHelper(name) { case (name, conf) => - test(name)(fn(conf)) + test(name)(fn(conf)) } }
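
A minimal sketch (not part of the patch series above) of how the new streaming replication path can be exercised end to end, mirroring the DistributedSuite change in the first commit: lowering `spark.maxRemoteBlockSizeFetchToMem` below the block size makes `NettyBlockTransferService` send an `UploadBlockStream` header and stream the block bytes, and the receiving `BlockManager` spools them to a temp file via `putBlockDataAsStream` before calling `putBytes`. The demo object name, app name, and cluster sizing are illustrative; the config constant is `private[spark]`, so the sketch assumes it is compiled inside the `org.apache.spark` package, just as the suite is.

```scala
// Hypothetical demo, assuming it lives in the org.apache.spark package so the
// private[spark] config constant and typed SparkConf.set are accessible.
package org.apache.spark

import org.apache.spark.internal.config
import org.apache.spark.storage.StorageLevel

object ReplicateAsStreamDemo {
  def main(args: Array[String]): Unit = {
    // With a 1-byte threshold, any replicated block is uploaded as a stream
    // (UploadBlockStream + putBlockDataAsStream) instead of one UploadBlock RPC.
    val conf = new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]") // illustrative two-executor cluster
      .setAppName("replicate-as-stream-demo")
      .set(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L)
    val sc = new SparkContext(conf)
    try {
      // MEMORY_ONLY_2 requests one extra replica, which the block manager now
      // pushes to the peer executor over the streaming upload path because of
      // the tiny threshold set above.
      val rdd = sc.parallelize(1 to 1000, 4).persist(StorageLevel.MEMORY_ONLY_2)
      assert(rdd.count() == 1000)
    } finally {
      sc.stop()
    }
  }
}
```

In normal operation the threshold stays high, so only blocks too large to buffer comfortably in memory take the streaming path; the 1-byte setting here only forces it for a small test RDD, which is exactly how the patched DistributedSuite covers the new code.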