From 36c49da5bbc503ddadce8b3b88e5748a5f4163c9 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Tue, 25 Nov 2014 18:15:56 -0800 Subject: [PATCH] [SPARK-4516] Avoid allocating Netty PooledByteBufAllocators unnecessarily It turns out we were allocating an allocator pool for every TransportClient (which means that the number of pools increases with the number of nodes in the cluster), when really we should just reuse one for all clients. This patch, as expected, massively decreases off-heap memory allocation, and appears to make allocation proportional only to the number of cores. --- .../spark/network/client/TransportClientFactory.java | 12 +++++------- .../org/apache/spark/network/util/NettyUtils.java | 6 +++--- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 76bce8592816..9afd5decd5e6 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -19,7 +19,6 @@ import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; @@ -37,7 +36,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.util.internal.PlatformDependent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable { private final Class socketChannelClass; private EventLoopGroup workerGroup; + private PooledByteBufAllocator pooledAllocator; public TransportClientFactory( TransportContext context, @@ -80,6 +79,8 @@ public TransportClientFactory( this.socketChannelClass = 
NettyUtils.getClientChannelClass(ioMode); // TODO: Make thread pool name configurable. this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); + this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( + conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); } /** @@ -115,11 +116,8 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO // Disable Nagle's Algorithm since we don't want packets to wait .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()); - - // Use pooled buffers to reduce temporary buffer allocation - bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator( - conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads())); + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) + .option(ChannelOption.ALLOCATOR, pooledAllocator); final AtomicReference clientRef = new AtomicReference(); diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 5c654a6fd6eb..b3991a6577cf 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -109,9 +109,9 @@ public static String getRemoteAddress(Channel channel) { /** * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches - * are disabled because the ByteBufs are allocated by the event loop thread, but released by the - * executor thread rather than the event loop thread. Those thread-local caches actually delay - * the recycling of buffers, leading to larger memory usage. 
+ * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread, + * but released by the executor thread rather than the event loop thread. Those thread-local + * caches actually delay the recycling of buffers, leading to larger memory usage. */ public static PooledByteBufAllocator createPooledByteBufAllocator( boolean allowDirectBufs,