
Commit 29c6dcf

[SPARK-3453] Netty-based BlockTransferService, extracted from Spark core
This PR encapsulates apache#2330, which is itself a continuation of apache#2240.

The first goal of this PR is to provide an alternate, simpler implementation of the ConnectionManager which is based on Netty. In addition, we want to resolve [SPARK-3796](https://issues.apache.org/jira/browse/SPARK-3796), which calls for a standalone shuffle service that can be integrated into the YARN NodeManager or the Standalone Worker, or run on its own. This PR takes the first step in that direction by ensuring that the actual Netty service is as small as possible and extracted from Spark core. Given this, we should be able to construct a standalone jar which can be included in other JVMs without incurring significant dependency or runtime issues. The actual work to ensure that such a standalone shuffle service works in Spark is left for a future PR, however.

In order to minimize dependencies and allow the service to be long-running (possibly much longer-running than Spark, and possibly having to support multiple versions of Spark simultaneously), the entire service has been ported to Java, where we have full control over the binary compatibility of the components and do not depend on the Scala runtime or version.

The following JIRA issues have been addressed by folding in apache#2330:

- SPARK-3453: Refactor Netty module to use BlockTransferService interface
- SPARK-3018: Release all buffers upon task completion/failure
- SPARK-3002: Create a connection pool and reuse clients across different threads
- SPARK-3017: Integration tests and unit tests for connection failures
- SPARK-3049: Make sure client doesn't block when server/connection has error(s)
- SPARK-3502: SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option
- SPARK-3503: Disable thread local cache in PooledByteBufAllocator (this and SPARK-3502 are sketched below)

TODO before mergeable:

- [ ] Implement uploadBlock()
- [ ] Unit tests for RPC side of code
- [ ] Performance testing
- [ ] Turn OFF by default (currently on for unit testing)
1 parent f7e7568 commit 29c6dcf
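Two of the folded-in fixes (SPARK-3502 and SPARK-3503) are pure Netty tuning. A hedged sketch of what they amount to against the Netty 4.x API; the buffer sizes, arena counts, and backlog below are illustrative assumptions, not values from this commit:

```scala
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
import io.netty.channel.ChannelOption

// SPARK-3503: build a pooled allocator with the per-thread caches disabled.
// Netty 4.0.x exposes the cache sizes as constructor arguments; zeros turn
// the thread-local caching off. All numeric values here are illustrative.
val allocator = new PooledByteBufAllocator(
  true,   // preferDirect
  2, 2,   // nHeapArena, nDirectArena
  8192,   // pageSize
  11,     // maxOrder
  0, 0, 0 // tinyCacheSize, smallCacheSize, normalCacheSize -> caches disabled
)

// SPARK-3502: SO_RCVBUF/SO_SNDBUF describe the per-connection sockets, so on a
// ServerBootstrap they belong in childOption(...) (applied to each accepted
// channel); option(...) only configures the listening socket itself.
val bootstrap = new ServerBootstrap()
  .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 100)
  .childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, 1024 * 1024)
  .childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, 1024 * 1024)
  .childOption[ByteBufAllocator](ChannelOption.ALLOCATOR, allocator)
```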

File tree

76 files changed: +3579 −1899 lines


core/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -44,6 +44,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>network</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 9 additions & 8 deletions

@@ -32,15 +32,14 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.netty.{NettyBlockTransferService}
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
-
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),

@@ -233,12 +232,14 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
-    // TODO(rxin): Config option based on class name, similar to shuffle mgr and compression codec.
-    val blockTransferService = if (conf.getBoolean("spark.shuffle.use.netty", false)) {
-      new NettyBlockTransferService(conf)
-    } else {
-      new NioBlockTransferService(conf, securityManager)
-    }
+    // TODO: This is only netty by default for initial testing -- it should not be merged as such!!!
+    val blockTransferService =
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
+        case "netty" =>
+          new NettyBlockTransferService(conf)
+        case "nio" =>
+          new NioBlockTransferService(conf, securityManager)
+      }
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",

core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

Lines changed: 4 additions & 4 deletions

@@ -17,8 +17,8 @@
 
 package org.apache.spark.network
 
-import org.apache.spark.storage.StorageLevel
-
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.storage.{BlockId, StorageLevel}
 
 private[spark]
 trait BlockDataManager {

@@ -27,10 +27,10 @@ trait BlockDataManager {
    * Interface to get local block data. Throws an exception if the block cannot be found or
    * cannot be read successfully.
    */
-  def getBlockData(blockId: String): ManagedBuffer
+  def getBlockData(blockId: BlockId): ManagedBuffer
 
   /**
    * Put the block locally, using the given storage level.
    */
-  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
+  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
 }
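The key-type change means implementations receive an already-parsed `BlockId` instead of a raw string. A toy in-memory implementation, purely illustrative (the class name and TrieMap storage are assumptions, and the real trait is `private[spark]`):

```scala
import scala.collection.concurrent.TrieMap

import org.apache.spark.network.BlockDataManager
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}

// Hypothetical in-memory block store, for illustration only.
class InMemoryBlockDataManager extends BlockDataManager {
  private val blocks = TrieMap.empty[BlockId, ManagedBuffer]

  // Throws if the block is missing, matching the documented contract above.
  override def getBlockData(blockId: BlockId): ManagedBuffer =
    blocks.getOrElse(blockId,
      throw new IllegalArgumentException(s"Block $blockId not found"))

  // The storage level is ignored in this toy; a real store would honor it.
  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit =
    blocks.put(blockId, data)
}
```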

core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala

Lines changed: 2 additions & 0 deletions

@@ -19,6 +19,8 @@ package org.apache.spark.network
 
 import java.util.EventListener
 
+import org.apache.spark.network.buffer.ManagedBuffer
+
 
 /**
  * Listener callback interface for [[BlockTransferService.fetchBlocks]].
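Callers typically supply this listener as an anonymous class. A sketch of one (the success signature matches the `BlockTransferService` diff below; the failure signature, `onBlockFetchFailure(exception: Throwable)`, is assumed from this era of the trait and may differ):

```scala
import org.apache.spark.network.BlockFetchingListener
import org.apache.spark.network.buffer.ManagedBuffer

val listener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    // The buffer may be released after this callback returns, so consume
    // (or copy) it here rather than stashing the reference for later.
    println(s"fetched $blockId (${data.size} bytes)")
  }
  override def onBlockFetchFailure(exception: Throwable): Unit = {
    // Assumed signature; see the lead-in above.
    println(s"fetch failed: ${exception.getMessage}")
  }
}
```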

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 6 additions & 7 deletions

@@ -18,16 +18,18 @@
 package org.apache.spark.network
 
 import java.io.Closeable
-import java.nio.ByteBuffer
+
+import org.apache.spark.network.buffer.ManagedBuffer
 
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
 
+import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
-
+import org.apache.spark.util.Utils
 
 private[spark]
-abstract class BlockTransferService extends Closeable {
+abstract class BlockTransferService extends Closeable with Logging {
 
   /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch

@@ -92,10 +94,7 @@
     }
     override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
       lock.synchronized {
-        val ret = ByteBuffer.allocate(data.size.toInt)
-        ret.put(data.nioByteBuffer())
-        ret.flip()
-        result = Left(new NioManagedBuffer(ret))
+        result = Left(data)
         lock.notify()
       }
     }
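The second hunk hands the listener's `ManagedBuffer` straight to the blocked caller instead of copying it into a freshly allocated NIO buffer. The surrounding helper is a classic monitor: the fetch callback deposits a result under a lock and notifies the waiting thread. A self-contained sketch of that pattern (the names and `Either` shape are illustrative, not Spark's exact internals):

```scala
import org.apache.spark.network.buffer.ManagedBuffer

// Block the calling thread until an asynchronous callback delivers a result.
// `register` wires our callback into some async fetch; the callback runs on
// an I/O thread and wakes us up via notify().
def awaitResult(
    register: (Either[ManagedBuffer, Throwable] => Unit) => Unit): Either[ManagedBuffer, Throwable] = {
  val lock = new Object
  var result: Either[ManagedBuffer, Throwable] = null

  register { r =>
    lock.synchronized {
      result = r
      lock.notify() // wake the waiting caller
    }
  }

  lock.synchronized {
    while (result == null) {
      lock.wait() // loop guards against spurious wakeups
    }
  }
  result
}
```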

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala

Lines changed: 0 additions & 187 deletions
This file was deleted; the `ManagedBuffer` abstraction now lives in the extracted network module as `org.apache.spark.network.buffer.ManagedBuffer` (see the updated imports above).
