diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 76bce8592816..9afd5decd5e6 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -19,7 +19,6 @@ import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; @@ -37,7 +36,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.util.internal.PlatformDependent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable { private final Class socketChannelClass; private EventLoopGroup workerGroup; + private PooledByteBufAllocator pooledAllocator; public TransportClientFactory( TransportContext context, @@ -80,6 +79,8 @@ public TransportClientFactory( this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); // TODO: Make thread pool name configurable. this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); + this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( + conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); } /** @@ -115,11 +116,8 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO // Disable Nagle's Algorithm since we don't want packets to wait .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()); - - // Use pooled buffers to reduce temporary buffer allocation - bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator( - conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads())); + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) + .option(ChannelOption.ALLOCATOR, pooledAllocator); final AtomicReference clientRef = new AtomicReference(); diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 5c654a6fd6eb..b3991a6577cf 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -109,9 +109,9 @@ public static String getRemoteAddress(Channel channel) { /** * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches - * are disabled because the ByteBufs are allocated by the event loop thread, but released by the - * executor thread rather than the event loop thread. Those thread-local caches actually delay - * the recycling of buffers, leading to larger memory usage. + * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread, + * but released by the executor thread rather than the event loop thread. Those thread-local + * caches actually delay the recycling of buffers, leading to larger memory usage. */ public static PooledByteBufAllocator createPooledByteBufAllocator( boolean allowDirectBufs,