-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-7183][Network] Fix memory leak of TransportRequestHandler.streamIds #5743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
3b3f38a
37a4b6c
17f020f
45908b7
f9a0c37
d35f19a
97e205c
cf2c086
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,14 +20,18 @@ | |
| import java.util.Iterator; | ||
| import java.util.Map; | ||
| import java.util.Random; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import io.netty.channel.Channel; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.spark.network.buffer.ManagedBuffer; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
|
|
||
| /** | ||
| * StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually | ||
| * fetched as chunks by the client. Each registered buffer is one chunk. | ||
|
|
@@ -42,11 +46,15 @@ public class OneForOneStreamManager extends StreamManager { | |
| private static class StreamState { | ||
| final Iterator<ManagedBuffer> buffers; | ||
|
|
||
| // The channel associated to the stream | ||
| Channel associatedChannel = null; | ||
|
|
||
| // Used to keep track of the index of the buffer that the user has retrieved, just to ensure | ||
| // that the caller only requests each chunk one at a time, in order. | ||
| int curChunk = 0; | ||
|
|
||
| StreamState(Iterator<ManagedBuffer> buffers) { | ||
| Preconditions.checkNotNull(buffers); | ||
| this.buffers = buffers; | ||
| } | ||
| } | ||
|
|
@@ -58,6 +66,13 @@ public OneForOneStreamManager() { | |
| streams = new ConcurrentHashMap<Long, StreamState>(); | ||
| } | ||
|
|
||
| @Override | ||
| public void registerChannel(Channel channel, long streamId) { | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can avoid the new field by doing
streams.get(streamId).associatedChannel = channel;
here, and
// Close all streams which have been associated with the channel.
Iterator<Map.Entry<Long, StreamState>> streamIterator = streams.entrySet().iterator();
while (streamIterator.hasNext()) {
  StreamState state = streamIterator.next().getValue();
  if (state.associatedChannel == channel) {
    streamIterator.remove();
    // Release all remaining buffers.
    while (state.buffers.hasNext()) {
      state.buffers.next().release();
    }
  }
}
in connectionTerminated(Channel). That allows the removal of the other connectionTerminated(). |
||
| if (streams.containsKey(streamId)) { | ||
| streams.get(streamId).associatedChannel = channel; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public ManagedBuffer getChunk(long streamId, int chunkIndex) { | ||
| StreamState state = streams.get(streamId); | ||
|
|
@@ -80,12 +95,17 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) { | |
| } | ||
|
|
||
| @Override | ||
| public void connectionTerminated(long streamId) { | ||
| // Release all remaining buffers. | ||
| StreamState state = streams.remove(streamId); | ||
| if (state != null && state.buffers != null) { | ||
| while (state.buffers.hasNext()) { | ||
| state.buffers.next().release(); | ||
| public void connectionTerminated(Channel channel) { | ||
| // Close all streams which have been associated with the channel. | ||
| for (Map.Entry<Long, StreamState> entry: streams.entrySet()) { | ||
| StreamState state = entry.getValue(); | ||
| if (state.associatedChannel == channel) { | ||
| streams.remove(entry.getKey()); | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good point, this is safe because our Map is a ConcurrentHashMap (otherwise you would need to use an iterator to remove the entry safely). Would you mind making the left-hand type of the declaration of `streams` [presumably: a ConcurrentHashMap, so that this safety is evident from the field itself; the rest of this sentence was lost in extraction]
Member
Author
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Updated. |
||
|
|
||
| // Release all remaining buffers. | ||
| while (state.buffers.hasNext()) { | ||
| state.buffers.next().release(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,7 @@ | |
|
|
||
| package org.apache.spark.network.server; | ||
|
|
||
| import java.util.Set; | ||
|
|
||
| import com.google.common.base.Throwables; | ||
| import com.google.common.collect.Sets; | ||
| import io.netty.channel.Channel; | ||
| import io.netty.channel.ChannelFuture; | ||
| import io.netty.channel.ChannelFutureListener; | ||
|
|
@@ -62,9 +59,6 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { | |
| /** Returns each chunk part of a stream. */ | ||
| private final StreamManager streamManager; | ||
|
|
||
| /** List of all stream ids that have been read on this handler, used for cleanup. */ | ||
| private final Set<Long> streamIds; | ||
|
|
||
| public TransportRequestHandler( | ||
| Channel channel, | ||
| TransportClient reverseClient, | ||
|
|
@@ -73,7 +67,6 @@ public TransportRequestHandler( | |
| this.reverseClient = reverseClient; | ||
| this.rpcHandler = rpcHandler; | ||
| this.streamManager = rpcHandler.getStreamManager(); | ||
| this.streamIds = Sets.newHashSet(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -82,10 +75,8 @@ public void exceptionCaught(Throwable cause) { | |
|
|
||
| @Override | ||
| public void channelUnregistered() { | ||
| // Inform the StreamManager that these streams will no longer be read from. | ||
| for (long streamId : streamIds) { | ||
| streamManager.connectionTerminated(streamId); | ||
| } | ||
| // Inform the StreamManager that this channel is unregistered. | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think we can remove this comment; the new code is straightforward enough. |
||
| streamManager.connectionTerminated(channel); | ||
| rpcHandler.connectionTerminated(reverseClient); | ||
| } | ||
|
|
||
|
|
@@ -102,7 +93,8 @@ public void handle(RequestMessage request) { | |
|
|
||
| private void processFetchRequest(final ChunkFetchRequest req) { | ||
| final String client = NettyUtils.getRemoteAddress(channel); | ||
| streamIds.add(req.streamChunkId.streamId); | ||
|
|
||
| streamManager.registerChannel(channel, req.streamChunkId.streamId); | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we cannot tell from this point in the code whether registerChannel() will throw an exception, let's move the call inside the try-catch that wraps getChunk(). Otherwise an exception could leave the request without a response, which would be bad. |
||
|
|
||
| logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
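nit: the null check and the assignment can be a single statement, since Preconditions.checkNotNull returns its argument:
this.buffers = Preconditions.checkNotNull(buffers);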