From 3b3f38a0f3eee2301a85f67b10658724864bfb4e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 28 Apr 2015 18:14:34 +0800 Subject: [PATCH 1/7] Fix memory leak of TransportRequestHandler.streamIds. --- .../apache/spark/network/server/OneForOneStreamManager.java | 5 +++++ .../java/org/apache/spark/network/server/StreamManager.java | 5 +++++ .../apache/spark/network/server/TransportRequestHandler.java | 3 +++ 3 files changed, 13 insertions(+) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index a6d390e13f39..baf23da7b6a0 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -58,6 +58,11 @@ public OneForOneStreamManager() { streams = new ConcurrentHashMap(); } + @Override + public boolean streamHasNext(long streamId) { + return streams.containsKey(streamId); + } + @Override public ManagedBuffer getChunk(long streamId, int chunkIndex) { StreamState state = streams.get(streamId); diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java index 5a9a14a180c1..8085e63909dd 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java @@ -43,6 +43,11 @@ public abstract class StreamManager { */ public abstract ManagedBuffer getChunk(long streamId, int chunkIndex); + /** + * Indicates that if the specified stream has next chunks to read further. + */ + public boolean streamHasNext(long streamId) { return true; } + /** * Indicates that the TCP connection that was tied to the given stream has been terminated. After * this occurs, we are guaranteed not to read from the stream again, so any state can be cleaned diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 1580180cc17e..73898fe2fd46 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -109,6 +109,9 @@ private void processFetchRequest(final ChunkFetchRequest req) { ManagedBuffer buf; try { buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); + if (!streamManager.streamHasNext(req.streamChunkId.streamId)) { + streamIds.remove(req.streamChunkId.streamId); + } } catch (Exception e) { logger.error(String.format( "Error opening block %s for request from %s", req.streamChunkId, client), e); From 37a4b6c442a63a3d36f139cc82fd3692a10f70ea Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 29 Apr 2015 10:18:53 +0800 Subject: [PATCH 2/7] Remove streamIds from TransportRequestHandler. --- .../server/OneForOneStreamManager.java | 25 +++++++++++++++++-- .../spark/network/server/StreamManager.java | 12 +++++++-- .../server/TransportRequestHandler.java | 16 +++--------- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index baf23da7b6a0..4fb16c98f291 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -20,9 +20,12 @@ import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.Sets; +import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +41,9 @@ public class OneForOneStreamManager extends StreamManager { private final AtomicLong nextStreamId; private final Map streams; + /** List of all stream ids that are associated to specified channel. **/ + private final Map> streamIds; + /** State of a single stream. */ private static class StreamState { final Iterator buffers; @@ -56,11 +62,15 @@ public OneForOneStreamManager() { // This does not need to be globally unique, only unique to this class. nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000); streams = new ConcurrentHashMap(); + streamIds = new ConcurrentHashMap>(); } @Override - public boolean streamHasNext(long streamId) { - return streams.containsKey(streamId); + public void registerChannel(Channel channel, long streamId) { + if (!streamIds.containsKey(channel)) { + streamIds.put(channel, Sets.newHashSet()); + } + streamIds.get(channel).add(streamId); } @Override @@ -84,6 +94,17 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) { return nextChunk; } + @Override + public void connectionTerminated(Channel channel) { + // Release all associated streams + if (streamIds.containsKey(channel)) { + for (long streamId : streamIds.get(channel)) { + connectionTerminated(streamId); + } + streamIds.remove(channel); + } + } + @Override public void connectionTerminated(long streamId) { // Release all remaining buffers. diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java index 8085e63909dd..3c53b4dfa567 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java @@ -17,6 +17,8 @@ package org.apache.spark.network.server; +import io.netty.channel.Channel; + import org.apache.spark.network.buffer.ManagedBuffer; /** @@ -44,9 +46,15 @@ public abstract class StreamManager { public abstract ManagedBuffer getChunk(long streamId, int chunkIndex); /** - * Indicates that if the specified stream has next chunks to read further. + * Register the given stream to the associated channel. So these streams can be cleaned up later. + */ + public void registerChannel(Channel channel, long streamId) { } + + /** + * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not + * to read from the associated streams again, so any state can be cleaned up. */ - public boolean streamHasNext(long streamId) { return true; } + public void connectionTerminated(Channel channel) { } /** * Indicates that the TCP connection that was tied to the given stream has been terminated. After diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 73898fe2fd46..38be7225d2be 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -62,9 +62,6 @@ public class TransportRequestHandler extends MessageHandler { /** Returns each chunk part of a stream. */ private final StreamManager streamManager; - /** List of all stream ids that have been read on this handler, used for cleanup. */ - private final Set streamIds; - public TransportRequestHandler( Channel channel, TransportClient reverseClient, @@ -73,7 +70,6 @@ public TransportRequestHandler( this.reverseClient = reverseClient; this.rpcHandler = rpcHandler; this.streamManager = rpcHandler.getStreamManager(); - this.streamIds = Sets.newHashSet(); } @Override @@ -82,10 +78,8 @@ public void exceptionCaught(Throwable cause) { @Override public void channelUnregistered() { - // Inform the StreamManager that these streams will no longer be read from. - for (long streamId : streamIds) { - streamManager.connectionTerminated(streamId); - } + // Inform the StreamManager that this channel is unregistered. + streamManager.connectionTerminated(channel); rpcHandler.connectionTerminated(reverseClient); } @@ -102,16 +96,14 @@ public void handle(RequestMessage request) { private void processFetchRequest(final ChunkFetchRequest req) { final String client = NettyUtils.getRemoteAddress(channel); - streamIds.add(req.streamChunkId.streamId); + + streamManager.registerChannel(channel, req.streamChunkId.streamId); logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId); ManagedBuffer buf; try { buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); - if (!streamManager.streamHasNext(req.streamChunkId.streamId)) { - streamIds.remove(req.streamChunkId.streamId); - } } catch (Exception e) { logger.error(String.format( "Error opening block %s for request from %s", req.streamChunkId, client), e); From 17f020f53f7f77a9e896be985c21e34bf3a8fb9c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 29 Apr 2015 10:37:45 +0800 Subject: [PATCH 3/7] Remove unused import. --- .../apache/spark/network/server/OneForOneStreamManager.java | 4 ++-- .../apache/spark/network/server/TransportRequestHandler.java | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 4fb16c98f291..e886ec857d1a 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -17,6 +17,7 @@ package org.apache.spark.network.server; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -24,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.collect.Sets; import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +68,7 @@ public OneForOneStreamManager() { @Override public void registerChannel(Channel channel, long streamId) { if (!streamIds.containsKey(channel)) { - streamIds.put(channel, Sets.newHashSet()); + streamIds.put(channel, new HashSet()); } streamIds.get(channel).add(streamId); } diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 38be7225d2be..3e4468a2858d 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -17,10 +17,7 @@ package org.apache.spark.network.server; -import java.util.Set; - import com.google.common.base.Throwables; -import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; From 45908b7ad5ac2efd08a00602c3e97f3a5434b227 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 29 Apr 2015 16:19:23 +0800 Subject: [PATCH 4/7] for style. --- .../org/apache/spark/network/server/OneForOneStreamManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index e886ec857d1a..379fe9278d67 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -41,7 +41,7 @@ public class OneForOneStreamManager extends StreamManager { private final AtomicLong nextStreamId; private final Map streams; - /** List of all stream ids that are associated to specified channel. **/ + /** List of all stream ids that are associated to specified channel. */ private final Map> streamIds; /** State of a single stream. */ From d35f19ac751696f41d07cbec23c3a2aa7c696b69 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 May 2015 17:55:49 +0800 Subject: [PATCH 5/7] For comments. --- .../server/OneForOneStreamManager.java | 41 ++++++++----------- .../spark/network/server/StreamManager.java | 14 +++---- 2 files changed, 24 insertions(+), 31 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 379fe9278d67..1d0491c0f222 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -31,6 +31,8 @@ import org.apache.spark.network.buffer.ManagedBuffer; +import com.google.common.base.Preconditions; + /** * StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually * fetched as chunks by the client. Each registered buffer is one chunk. @@ -41,18 +43,19 @@ public class OneForOneStreamManager extends StreamManager { private final AtomicLong nextStreamId; private final Map streams; - /** List of all stream ids that are associated to specified channel. */ - private final Map> streamIds; - /** State of a single stream. */ private static class StreamState { final Iterator buffers; + // The channel associated to the stream + Channel associatedChannel = null; + // Used to keep track of the index of the buffer that the user has retrieved, just to ensure // that the caller only requests each chunk one at a time, in order. int curChunk = 0; StreamState(Iterator buffers) { + Preconditions.checkNotNull(buffers); this.buffers = buffers; } } @@ -62,15 +65,13 @@ public OneForOneStreamManager() { // This does not need to be globally unique, only unique to this class. nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000); streams = new ConcurrentHashMap(); - streamIds = new ConcurrentHashMap>(); } @Override public void registerChannel(Channel channel, long streamId) { - if (!streamIds.containsKey(channel)) { - streamIds.put(channel, new HashSet()); + if (streams.containsKey(streamId)) { + streams.get(streamId).associatedChannel = channel; } - streamIds.get(channel).add(streamId); } @Override @@ -96,22 +97,16 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) { @Override public void connectionTerminated(Channel channel) { - // Release all associated streams - if (streamIds.containsKey(channel)) { - for (long streamId : streamIds.get(channel)) { - connectionTerminated(streamId); - } - streamIds.remove(channel); - } - } - - @Override - public void connectionTerminated(long streamId) { - // Release all remaining buffers. - StreamState state = streams.remove(streamId); - if (state != null && state.buffers != null) { - while (state.buffers.hasNext()) { - state.buffers.next().release(); + // Close all streams which have been associated with the channel. + for (Map.Entry entry: streams.entrySet()) { + StreamState state = entry.getValue(); + if (state.associatedChannel == channel) { + streams.remove(entry.getKey()); + + // Release all remaining buffers. + while (state.buffers.hasNext()) { + state.buffers.next().release(); + } } } } diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java index 3c53b4dfa567..929f789bf9d2 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java @@ -46,7 +46,12 @@ public abstract class StreamManager { public abstract ManagedBuffer getChunk(long streamId, int chunkIndex); /** - * Register the given stream to the associated channel. So these streams can be cleaned up later. + * Associates a stream with a single client connection, which is guaranteed to be the only reader + * of the stream. The getChunk() method will be called serially on this connection and once the + * connection is closed, the stream will never be used again, enabling cleanup. + * + * This must be called before the first getChunk() on the stream, but it may be invoked multiple + * times with the same channel and stream id. */ public void registerChannel(Channel channel, long streamId) { } @@ -55,11 +60,4 @@ public void registerChannel(Channel channel, long streamId) { } * to read from the associated streams again, so any state can be cleaned up. */ public void connectionTerminated(Channel channel) { } - - /** - * Indicates that the TCP connection that was tied to the given stream has been terminated. After - * this occurs, we are guaranteed not to read from the stream again, so any state can be cleaned - * up. - */ - public void connectionTerminated(long streamId) { } } From 97e205c3832ea30f87ca8c607fbf418fbd837fd5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 May 2015 19:54:19 +0800 Subject: [PATCH 6/7] Remove unused import. --- .../org/apache/spark/network/server/OneForOneStreamManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 1d0491c0f222..10ab9ec356af 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -17,7 +17,6 @@ package org.apache.spark.network.server; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Random; From cf2c086cd7dcdb0f047dd1eca73e1708d812f25a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 2 May 2015 01:03:28 +0800 Subject: [PATCH 7/7] For comments. --- .../apache/spark/network/server/OneForOneStreamManager.java | 5 ++--- .../apache/spark/network/server/TransportRequestHandler.java | 4 +--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 10ab9ec356af..c95e64e8e2cd 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -40,7 +40,7 @@ public class OneForOneStreamManager extends StreamManager { private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class); private final AtomicLong nextStreamId; - private final Map streams; + private final ConcurrentHashMap streams; /** State of a single stream. */ private static class StreamState { @@ -54,8 +54,7 @@ private static class StreamState { int curChunk = 0; StreamState(Iterator buffers) { - Preconditions.checkNotNull(buffers); - this.buffers = buffers; + this.buffers = Preconditions.checkNotNull(buffers); } } diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 3e4468a2858d..e5159ab56d0d 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -75,7 +75,6 @@ public void exceptionCaught(Throwable cause) { @Override public void channelUnregistered() { - // Inform the StreamManager that this channel is unregistered. streamManager.connectionTerminated(channel); rpcHandler.connectionTerminated(reverseClient); } @@ -94,12 +93,11 @@ public void handle(RequestMessage request) { private void processFetchRequest(final ChunkFetchRequest req) { final String client = NettyUtils.getRemoteAddress(channel); - streamManager.registerChannel(channel, req.streamChunkId.streamId); - logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId); ManagedBuffer buf; try { + streamManager.registerChannel(channel, req.streamChunkId.streamId); buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); } catch (Exception e) { logger.error(String.format(