Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[SPARK-4516] Avoid unnecessarily allocating Netty PooledByteBufAllocators

It turns out we are allocating a separate allocator pool for every TransportClient (which means that the number of pools increases with the number of nodes in the cluster), when we should really just reuse a single pool for all clients.

This patch, as expected, massively decreases off-heap memory allocation, and appears to make allocation proportional only to the number of cores.
  • Loading branch information
aarondav committed Nov 26, 2014
commit 36c49da5bbc503ddadce8b3b88e5748a5f4163c9
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
Expand All @@ -37,7 +36,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable {

private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;

public TransportClientFactory(
TransportContext context,
Expand All @@ -80,6 +79,8 @@ public TransportClientFactory(
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
// TODO: Make thread pool name configurable.
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}

/**
Expand Down Expand Up @@ -115,11 +116,8 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());

// Use pooled buffers to reduce temporary buffer allocation
bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);

final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ public static String getRemoteAddress(Channel channel) {

/**
* Create a pooled ByteBuf allocator but disable the thread-local cache. Thread-local caches
* are disabled because the ByteBufs are allocated by the event loop thread, but released by the
* executor thread rather than the event loop thread. Those thread-local caches actually delay
* the recycling of buffers, leading to larger memory usage.
* are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
* but released by the executor thread rather than the event loop thread. Those thread-local
* caches actually delay the recycling of buffers, leading to larger memory usage.
*/
public static PooledByteBufAllocator createPooledByteBufAllocator(
boolean allowDirectBufs,
Expand Down