
Commit 5ea4df6

[SPARK-3796] Create external service which can serve shuffle files
This patch introduces the tooling necessary to construct an external shuffle service which is independent of Spark executors, and then uses that service from inside Spark. An example (just for the sake of this PR) of the service creation can be found in Worker, and the service itself is used by plugging in the StandaloneShuffleClient as Spark's ShuffleClient (set up in BlockManager).

This PR continues the work from apache#2753, which extracted the transport layer of Spark's block transfers into an independent package within Spark. A new package was created which contains the Spark business logic necessary to retrieve the actual shuffle data; it is completely independent of the transport layer introduced in the previous patch. Like the transport layer, this package must not depend on Spark, as we anticipate plugging this service in as a lightweight process within, say, the YARN ApplicationManager, and do not wish to include Spark's dependencies (including Scala itself).

There are several outstanding tasks which must be completed before this PR can be merged:
- [ ] Complete unit testing of the network/shuffle package.
- [ ] Performance and correctness testing on a real cluster.
- [ ] Documentation of the feature in the Spark docs.
- [ ] Remove the example service instantiation from Worker.scala.

There are further shortcomings of this PR which should be addressed in follow-up patches:
- Don't use the Java serializer for the RPC layer! It is not cross-version compatible.
- Handle shuffle file cleanup for dead executors once the application terminates or the ContextCleaner triggers.
- Integrate unit testing with Spark's tests (currently only runnable via Maven).
- Improve behavior if the shuffle service itself goes down (right now we don't blacklist it, and new executors cannot spawn on that machine).
- SSL and SASL integration.
- Nice to have: handle shuffle file consolidation (this would require changes to Spark's implementation).
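For context, a minimal sketch of how the external service can be stood up as its own process, mirroring the demonstration code added to Worker.scala in this commit. The object name is illustrative; the class names, SystemPropertyConfigProvider, and the default port 7337 come from this patch.

    import org.apache.spark.network.TransportContext
    import org.apache.spark.network.shuffle.StandaloneShuffleBlockHandler
    import org.apache.spark.network.util.{SystemPropertyConfigProvider, TransportConf}

    // Sketch only: stands up the shuffle service outside of any executor.
    object ExternalShuffleServiceSketch {
      def main(args: Array[String]): Unit = {
        val port = 7337  // default of spark.shuffle.service.port in this patch
        val transportConf = new TransportConf(new SystemPropertyConfigProvider())
        // Handler that serves shuffle files independently of Spark executors.
        val rpcHandler = new StandaloneShuffleBlockHandler()
        val transportContext = new TransportContext(transportConf, rpcHandler)
        transportContext.createServer(port)  // starts the Netty-based transport server
      }
    }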
1 parent 087e31a commit 5ea4df6

File tree: 48 files changed (+1021 −246 lines)


core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -49,6 +49,11 @@
       <artifactId>spark-network-common_2.10</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>network-shuffle</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.netty.{NettyBlockTransferService}
+import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 18 additions & 1 deletion
@@ -40,6 +40,12 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
+// TODO: Remove me before merge!
+import org.apache.spark.network.util.SystemPropertyConfigProvider
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.shuffle.StandaloneShuffleBlockHandler
+import org.apache.spark.network.util.TransportConf
+
 /**
  * @param masterUrls Each url should look like spark://host:port.
  */
@@ -186,11 +192,11 @@ private[spark] class Worker(
   private def retryConnectToMaster() {
     Utils.tryOrExit {
       connectionAttemptCount += 1
-      logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount")
       if (registered) {
         registrationRetryTimer.foreach(_.cancel())
         registrationRetryTimer = None
       } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
+        logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
         tryRegisterAllMasters()
         if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
           registrationRetryTimer.foreach(_.cancel())
@@ -428,6 +434,17 @@ private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val conf = new SparkConf
+
+    // Create external shuffle server
+    // TODO: Remove this before PR goes in -- this is just to demonstrate how it looks!
+    scala.util.Try {
+      val port = conf.getInt("spark.shuffle.service.port", 7337)
+      val transportConf = new TransportConf(new SystemPropertyConfigProvider())
+      val rpcHandler = new StandaloneShuffleBlockHandler()
+      val transportContext = new TransportContext(transportConf, rpcHandler)
+      transportContext.createServer(port)
+    }
+
     val args = new WorkerArguments(argStrings, conf)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
       args.memory, args.masters, args.workDir)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ private[spark] class Executor(
   val executorSource = new ExecutorSource(this, executorId)
 
   // Initialize Spark environment (using system properties read above)
-  conf.set("spark.executor.id", "executor." + executorId)
+  conf.set("spark.executor.id", executorId)
   private val env = {
     if (!isLocal) {
       val port = conf.getInt("spark.executor.port", 0)

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ private[spark] class MetricsSystem private (
    */
   def buildRegistryName(source: Source): String = {
     val appId = conf.getOption("spark.app.id")
-    val executorId = conf.getOption("spark.executor.id")
+    val executorId = conf.getOption("spark.executor.id").map("executor." + _)
     val defaultName = MetricRegistry.name(source.sourceName)
 
     if (instance == "driver" || instance == "executor") {

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 18 additions & 37 deletions
@@ -20,16 +20,16 @@ package org.apache.spark.network
 import java.io.Closeable
 import java.nio.ByteBuffer
 
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Promise, Await, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.Logging
 import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ShuffleClient, BlockFetchingListener}
+import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}
 
 private[spark]
-abstract class BlockTransferService extends Closeable with Logging {
+abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
 
   /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -60,10 +60,11 @@ abstract class BlockTransferService extends Closeable with Logging {
    * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
    * the data of a block is fetched, rather than waiting for all blocks to be fetched.
    */
-  def fetchBlocks(
-      hostName: String,
+  override def fetchBlocks(
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit
 
   /**
@@ -81,43 +82,23 @@ abstract class BlockTransferService extends Closeable with Logging {
    *
    * It is also only available after [[init]] is invoked.
    */
-  def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
+  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
     // A monitor for the thread to wait on.
-    val lock = new Object
-    @volatile var result: Either[ManagedBuffer, Throwable] = null
-    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
-      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
-        lock.synchronized {
-          result = Right(exception)
-          lock.notify()
+    val result = Promise[ManagedBuffer]()
+    fetchBlocks(host, port, execId, Array(blockId),
+      new BlockFetchingListener {
+        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
+          result.failure(exception)
         }
-      }
-      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-        lock.synchronized {
+        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
           val ret = ByteBuffer.allocate(data.size.toInt)
           ret.put(data.nioByteBuffer())
           ret.flip()
-          result = Left(new NioManagedBuffer(ret))
-          lock.notify()
+          result.success(new NioManagedBuffer(ret))
         }
-      }
-    })
+      })
 
-    // Sleep until result is no longer null
-    lock.synchronized {
-      while (result == null) {
-        try {
-          lock.wait()
-        } catch {
-          case e: InterruptedException =>
-        }
-      }
-    }
-
-    result match {
-      case Left(data) => data
-      case Right(e) => throw e
-    }
+    Await.result(result.future, Duration.Inf)
   }
 
   /**
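The rewrite above drops the hand-rolled lock/wait synchronization in fetchBlockSync in favor of a Promise that either callback completes, with the caller blocking on the resulting future. A small, self-contained illustration of that pattern, using toy names rather than the Spark APIs:

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration

    object PromiseBridgeExample {
      // Toy callback-based API, standing in for fetchBlocks + BlockFetchingListener.
      def fetchAsync(id: String)(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit =
        onSuccess(s"data-for-$id".getBytes("UTF-8"))

      // Bridge the callbacks into a blocking call, as the new fetchBlockSync does.
      def fetchSync(id: String): Array[Byte] = {
        val result = Promise[Array[Byte]]()
        fetchAsync(id)(
          bytes => result.success(bytes),  // success callback completes the promise
          err => result.failure(err))      // failure callback fails it
        Await.result(result.future, Duration.Inf)
      }

      def main(args: Array[String]): Unit =
        println(new String(fetchSync("shuffle_0_1_2"), "UTF-8"))
    }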

core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala

Lines changed: 0 additions & 95 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 15 additions & 11 deletions
@@ -19,39 +19,41 @@ package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.Logging
 import org.apache.spark.network.BlockDataManager
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
+import org.apache.spark.network.shuffle.ShuffleStreamHandle
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.client.{TransportClient, RpcResponseCallback}
-import org.apache.spark.network.server.{DefaultStreamManager, RpcHandler}
-import org.apache.spark.storage.{StorageLevel, BlockId}
-
-import scala.collection.JavaConversions._
+import org.apache.spark.storage.{BlockId, StorageLevel}
 
 object NettyMessages {
-
   /** Request to read a set of blocks. Returns [[ShuffleStreamHandle]] to identify the stream. */
   case class OpenBlocks(blockIds: Seq[BlockId])
 
   /** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
   case class UploadBlock(blockId: BlockId, blockData: Array[Byte], level: StorageLevel)
-
-  /** Identifier for a fixed number of chunks to read from a stream created by [[OpenBlocks]]. */
-  case class ShuffleStreamHandle(streamId: Long, numChunks: Int)
 }
 
 /**
  * Serves requests to open blocks by simply registering one chunk per block requested.
+ * Handles opening and uploading arbitrary BlockManager blocks.
+ *
+ * Opened blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
+ * is equivalent to one Spark-level shuffle block.
  */
 class NettyBlockRpcServer(
     serializer: Serializer,
-    streamManager: DefaultStreamManager,
     blockManager: BlockDataManager)
   extends RpcHandler with Logging {
 
   import NettyMessages._
 
+  private val streamManager = new OneForOneStreamManager()
+
   override def receive(
       client: TransportClient,
       messageBytes: Array[Byte],
@@ -73,4 +75,6 @@ class NettyBlockRpcServer(
         responseContext.onSuccess(new Array[Byte](0))
     }
   }
+
+  override def getStreamManager(): StreamManager = streamManager
 }

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 18 additions & 15 deletions
@@ -17,17 +17,21 @@
 
 package org.apache.spark.network.netty
 
-import scala.concurrent.{Promise, Future}
+import org.apache.spark.network.shuffle.{OneForOneBlockFetcher, BlockFetchingListener, StandaloneShuffleMessages}
+
+import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.duration._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.network._
+import StandaloneShuffleMessages.{RegisterExecutor}
 import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory}
-import org.apache.spark.network.netty.NettyMessages.UploadBlock
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClientFactory}
+import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
 import org.apache.spark.network.server._
 import org.apache.spark.network.util.{ConfigProvider, TransportConf}
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockManagerId, StorageLevel}
 import org.apache.spark.util.Utils
 
 /**
@@ -37,30 +41,29 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   val serializer = new JavaSerializer(conf)
 
-  // Create a TransportConfig using SparkConf.
-  private[this] val transportConf = new TransportConf(
-    new ConfigProvider { override def get(name: String) = conf.get(name) })
-
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _
   private[this] var clientFactory: TransportClientFactory = _
 
   override def init(blockDataManager: BlockDataManager): Unit = {
-    val streamManager = new DefaultStreamManager
-    val rpcHandler = new NettyBlockRpcServer(serializer, streamManager, blockDataManager)
-    transportContext = new TransportContext(transportConf, streamManager, rpcHandler)
+    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+    transportContext = new TransportContext(SparkTransportConf.fromSparkConf(conf), rpcHandler)
    clientFactory = transportContext.createClientFactory()
    server = transportContext.createServer()
+    logInfo("Server created on " + server.getPort)
   }
 
   override def fetchBlocks(
-      hostname: String,
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit = {
+    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
-      val client = clientFactory.createClient(hostname, port)
-      new NettyBlockFetcher(serializer, client, blockIds, listener).start()
+      val client = clientFactory.createClient(host, port)
+      new OneForOneBlockFetcher(client, blockIds.toArray, listener)
+        .start(OpenBlocks(blockIds.map(BlockId.apply)))
     } catch {
       case e: Exception =>
         logError("Exception while beginning fetchBlocks", e)