
Commit bec4ea2

rxin authored and aarondav committed
Removed OIO and added num threads settings.
1 parent 1bdd7ee commit bec4ea2


3 files changed: +15 −28 lines changed


core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala

Lines changed: 3 additions & 10 deletions
@@ -25,10 +25,8 @@ import io.netty.buffer.PooledByteBufAllocator
 import io.netty.channel._
 import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
 import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.oio.OioEventLoopGroup
 import io.netty.channel.socket.SocketChannel
 import io.netty.channel.socket.nio.NioSocketChannel
-import io.netty.channel.socket.oio.OioSocketChannel
 import io.netty.util.internal.PlatformDependent

 import org.apache.spark.{Logging, SparkConf}
@@ -65,23 +63,18 @@ class BlockClientFactory(val conf: NettyConfig) extends Logging with Closeable {

   /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
   private def init(): Unit = {
-    def initOio(): Unit = {
-      socketChannelClass = classOf[OioSocketChannel]
-      workerGroup = new OioEventLoopGroup(0, threadFactory)
-    }
     def initNio(): Unit = {
       socketChannelClass = classOf[NioSocketChannel]
-      workerGroup = new NioEventLoopGroup(0, threadFactory)
+      workerGroup = new NioEventLoopGroup(conf.clientThreads, threadFactory)
     }
     def initEpoll(): Unit = {
       socketChannelClass = classOf[EpollSocketChannel]
-      workerGroup = new EpollEventLoopGroup(0, threadFactory)
+      workerGroup = new EpollEventLoopGroup(conf.clientThreads, threadFactory)
     }

     // For auto mode, first try epoll (only available on Linux), then nio.
     conf.ioMode match {
       case "nio" => initNio()
-      case "oio" => initOio()
       case "epoll" => initEpoll()
       case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
     }
@@ -102,7 +95,7 @@ class BlockClientFactory(val conf: NettyConfig) extends Logging with Closeable {
       return cachedClient
     }

-    logInfo(s"Creating new connection to $remoteHost:$remotePort")
+    logDebug(s"Creating new connection to $remoteHost:$remotePort")

     // There is a chance two threads are creating two different clients connecting to the same host.
     // But that's probably ok ...
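
Note on the thread-count change above: in Netty 4, passing 0 as the nThreads argument to an event loop group constructor makes Netty fall back to its default pool size of 2x the available cores, so leaving conf.clientThreads at its default of 0 keeps the old behaviour, while any positive value pins the pool size. A minimal standalone sketch of that behaviour (the object name and the "shuffle-client" label are illustrative, not from the patch):

import java.util.concurrent.ThreadFactory

import io.netty.channel.nio.NioEventLoopGroup
import io.netty.util.concurrent.DefaultThreadFactory

object ClientThreadsSketch {
  // 0 => Netty picks its default (2 * #cores); any positive value is used as-is.
  def makeWorkerGroup(numThreads: Int, threadFactory: ThreadFactory): NioEventLoopGroup =
    new NioEventLoopGroup(numThreads, threadFactory)

  def main(args: Array[String]): Unit = {
    val group = makeWorkerGroup(0, new DefaultThreadFactory("shuffle-client"))
    println(s"event loop threads: ${group.executorCount()}")
    group.shutdownGracefully()
  }
}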

core/src/main/scala/org/apache/spark/network/netty/BlockServer.scala

Lines changed: 2 additions & 10 deletions
@@ -24,10 +24,8 @@ import io.netty.bootstrap.ServerBootstrap
 import io.netty.buffer.PooledByteBufAllocator
 import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollServerSocketChannel}
 import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.oio.OioEventLoopGroup
 import io.netty.channel.socket.SocketChannel
 import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.channel.socket.oio.OioServerSocketChannel
 import io.netty.channel.{ChannelInitializer, ChannelFuture, ChannelOption}

 import org.apache.spark.Logging
@@ -60,24 +58,18 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataManager)

     // Use only one thread to accept connections, and 2 * num_cores for worker.
     def initNio(): Unit = {
-      val bossGroup = new NioEventLoopGroup(0, threadFactory)
+      val bossGroup = new NioEventLoopGroup(conf.serverThreads, threadFactory)
       val workerGroup = bossGroup
       bootstrap.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
     }
-    def initOio(): Unit = {
-      val bossGroup = new OioEventLoopGroup(0, threadFactory)
-      val workerGroup = bossGroup
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[OioServerSocketChannel])
-    }
     def initEpoll(): Unit = {
-      val bossGroup = new EpollEventLoopGroup(0, threadFactory)
+      val bossGroup = new EpollEventLoopGroup(conf.serverThreads, threadFactory)
       val workerGroup = bossGroup
       bootstrap.group(bossGroup, workerGroup).channel(classOf[EpollServerSocketChannel])
     }

     conf.ioMode match {
       case "nio" => initNio()
-      case "oio" => initOio()
       case "epoll" => initEpoll()
       case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
     }
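
The server side follows the same pattern but deliberately shares a single event loop group between the boss (accept) role and the worker role, sized by the new conf.serverThreads setting. A rough sketch of that wiring, outside the patch (the object name, thread-pool label, and the empty channel initializer are placeholders):

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.util.concurrent.DefaultThreadFactory

object ServerThreadsSketch {
  def bootstrap(serverThreads: Int): ServerBootstrap = {
    // One group plays both roles; serverThreads = 0 means Netty's default size.
    val group = new NioEventLoopGroup(serverThreads, new DefaultThreadFactory("shuffle-server"))
    new ServerBootstrap()
      .group(group, group)
      .channel(classOf[NioServerSocketChannel])
      .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = ()  // pipeline setup omitted
      })
  }
}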

core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala

Lines changed: 10 additions & 8 deletions
@@ -31,18 +31,20 @@ class NettyConfig(conf: SparkConf) {
   /** IO mode: nio, oio, epoll, or auto (try epoll first and then nio). */
   private[netty] val ioMode = conf.get("spark.shuffle.io.mode", "nio").toLowerCase

-  /** Connect timeout in secs. Default 60 secs. */
-  private[netty] val connectTimeoutMs = conf.getInt("spark.shuffle.io.connectionTimeout", 60) * 1000
-
-  /**
-   * Percentage of the desired amount of time spent for I/O in the child event loops.
-   * Only applicable in nio and epoll.
-   */
-  private[netty] val ioRatio = conf.getInt("spark.shuffle.io.netty.ioRatio", 80)
+  /** Connect timeout in secs. Default 120 secs. */
+  private[netty] val connectTimeoutMs = {
+    conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000
+  }

   /** Requested maximum length of the queue of incoming connections. */
   private[netty] val backLog: Option[Int] = conf.getOption("spark.shuffle.io.backLog").map(_.toInt)

+  /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
+  private[netty] val serverThreads: Int = conf.getInt("spark.shuffle.io.serverThreads", 0)
+
+  /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
+  private[netty] val clientThreads: Int = conf.getInt("spark.shuffle.io.clientThreads", 0)
+
   /**
    * Receive buffer size (SO_RCVBUF).
    * Note: the optimal size for receive buffer and send buffer should be
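
Taken together, the configuration surface after this commit looks roughly like the following from the application side. The values below are examples only; leaving either thread setting at 0 (the default) lets Netty size the pool at 2x the core count:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.io.mode", "auto")               // nio, epoll, or auto; oio is no longer accepted
  .set("spark.shuffle.io.serverThreads", "8")         // 0 (default) => 2 x #cores
  .set("spark.shuffle.io.clientThreads", "8")         // 0 (default) => 2 x #cores
  .set("spark.shuffle.io.connectionTimeout", "120")   // seconds; default raised from 60 to 120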
