Skip to content

Commit 36c49da

Browse files
committed
[SPARK-4516] Avoid unnecessarily allocating Netty PooledByteBufAllocators
Turns out we are allocating an allocator pool for every TransportClient (which means that the number increases with the number of nodes in the cluster), when really we should just reuse one for all clients. This patch, as expected, massively decreases off-heap memory allocation, and appears to make allocation only proportional to the number of cores.
1 parent bf1a6aa commit 36c49da

File tree

2 files changed

+8
-10
lines changed

2 files changed

+8
-10
lines changed

network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22-
import java.lang.reflect.Field;
2322
import java.net.InetSocketAddress;
2423
import java.net.SocketAddress;
2524
import java.util.List;
@@ -37,7 +36,6 @@
3736
import io.netty.channel.ChannelOption;
3837
import io.netty.channel.EventLoopGroup;
3938
import io.netty.channel.socket.SocketChannel;
40-
import io.netty.util.internal.PlatformDependent;
4139
import org.slf4j.Logger;
4240
import org.slf4j.LoggerFactory;
4341

@@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable {
6765

6866
private final Class<? extends Channel> socketChannelClass;
6967
private EventLoopGroup workerGroup;
68+
private PooledByteBufAllocator pooledAllocator;
7069

7170
public TransportClientFactory(
7271
TransportContext context,
@@ -80,6 +79,8 @@ public TransportClientFactory(
8079
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
8180
// TODO: Make thread pool name configurable.
8281
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
82+
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
83+
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
8384
}
8485

8586
/**
@@ -115,11 +116,8 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
115116
// Disable Nagle's Algorithm since we don't want packets to wait
116117
.option(ChannelOption.TCP_NODELAY, true)
117118
.option(ChannelOption.SO_KEEPALIVE, true)
118-
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
119-
120-
// Use pooled buffers to reduce temporary buffer allocation
121-
bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
122-
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
119+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
120+
.option(ChannelOption.ALLOCATOR, pooledAllocator);
123121

124122
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
125123

network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ public static String getRemoteAddress(Channel channel) {
109109

110110
/**
111111
* Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
112-
* are disabled because the ByteBufs are allocated by the event loop thread, but released by the
113-
* executor thread rather than the event loop thread. Those thread-local caches actually delay
114-
* the recycling of buffers, leading to larger memory usage.
112+
* are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
113+
* but released by the executor thread rather than the event loop thread. Those thread-local
114+
* caches actually delay the recycling of buffers, leading to larger memory usage.
115115
*/
116116
public static PooledByteBufAllocator createPooledByteBufAllocator(
117117
boolean allowDirectBufs,

0 commit comments

Comments (0)