Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove streamIds from TransportRequestHandler.
  • Loading branch information
viirya committed Apr 29, 2015
commit 37a4b6c442a63a3d36f139cc82fd3692a10f70ea
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,6 +41,9 @@ public class OneForOneStreamManager extends StreamManager {
private final AtomicLong nextStreamId;
private final Map<Long, StreamState> streams;

/** List of all stream ids that are associated to specified channel. **/
private final Map<Channel, Set<Long>> streamIds;

/** State of a single stream. */
private static class StreamState {
final Iterator<ManagedBuffer> buffers;
Expand All @@ -56,11 +62,15 @@ public OneForOneStreamManager() {
// This does not need to be globally unique, only unique to this class.
nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
streams = new ConcurrentHashMap<Long, StreamState>();
streamIds = new ConcurrentHashMap<Channel, Set<Long>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if you wouldn't mind, add a Preconditions.checkNotNull(buffers) to line 56 (to ensure buffers is not null).

}

@Override
public boolean streamHasNext(long streamId) {
return streams.containsKey(streamId);
public void registerChannel(Channel channel, long streamId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can avoid the new field by doing

streams.get(streamId).associatedChannel = channel;

here and

// Close all streams which have been associated with the channel.
Iterator<Map.Entry<Long, StreamState>> streamIterator = streams.iterator()
while (streamIterator.hasNext()) {
  StreamState state = streamIterator.next().getValue()
  if (state.associatedChannel == channel) {
    streamIterator.remove();

    // Release all remaining buffers.
    while (state.buffers.hasNext()) {
      state.buffers.next().release();
    }
  }
}

Allowing the removal of the other connectionTerminated().

if (!streamIds.containsKey(channel)) {
streamIds.put(channel, Sets.newHashSet());
}
streamIds.get(channel).add(streamId);
}

@Override
Expand All @@ -84,6 +94,17 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
return nextChunk;
}

@Override
public void connectionTerminated(Channel channel) {
// Release all associated streams
if (streamIds.containsKey(channel)) {
for (long streamId : streamIds.get(channel)) {
connectionTerminated(streamId);
}
streamIds.remove(channel);
}
}

@Override
public void connectionTerminated(long streamId) {
// Release all remaining buffers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.network.server;

import io.netty.channel.Channel;

import org.apache.spark.network.buffer.ManagedBuffer;

/**
Expand Down Expand Up @@ -44,9 +46,15 @@ public abstract class StreamManager {
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

/**
* Indicates that if the specified stream has next chunks to read further.
* Register the given stream to the associated channel. So these streams can be cleaned up later.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's beef up the documentation:

Associates a stream with a single client connection, which is guaranteed to be the only reader of the stream. The getChunk() method will be called serially on this connection and once the connection is closed, the stream will never be used again, enabling cleanup.

This must be called before the first getChunk() on the stream, but it may be invoked multiple times with the same channel and stream id.

*/
public void registerChannel(Channel channel, long streamId) { }

/**
* Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
* to read from the associated streams again, so any state can be cleaned up.
*/
public boolean streamHasNext(long streamId) { return true; }
public void connectionTerminated(Channel channel) { }

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this method, it should no longer be used.

* Indicates that the TCP connection that was tied to the given stream has been terminated. After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
/** Returns each chunk part of a stream. */
private final StreamManager streamManager;

/** List of all stream ids that have been read on this handler, used for cleanup. */
private final Set<Long> streamIds;

public TransportRequestHandler(
Channel channel,
TransportClient reverseClient,
Expand All @@ -73,7 +70,6 @@ public TransportRequestHandler(
this.reverseClient = reverseClient;
this.rpcHandler = rpcHandler;
this.streamManager = rpcHandler.getStreamManager();
this.streamIds = Sets.newHashSet();
}

@Override
Expand All @@ -82,10 +78,8 @@ public void exceptionCaught(Throwable cause) {

@Override
public void channelUnregistered() {
// Inform the StreamManager that these streams will no longer be read from.
for (long streamId : streamIds) {
streamManager.connectionTerminated(streamId);
}
// Inform the StreamManager that this channel is unregistered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can remove this comment, the new code is straightforward enough.

streamManager.connectionTerminated(channel);
rpcHandler.connectionTerminated(reverseClient);
}

Expand All @@ -102,16 +96,14 @@ public void handle(RequestMessage request) {

private void processFetchRequest(final ChunkFetchRequest req) {
final String client = NettyUtils.getRemoteAddress(channel);
streamIds.add(req.streamChunkId.streamId);

streamManager.registerChannel(channel, req.streamChunkId.streamId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we don't know from this point in the code whether the registerChannel method will throw an exception, let's move it inside the try-catch for the getChunk(). Otherwise we could leave a message unresponded-to, which would be bad.


logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);

ManagedBuffer buf;
try {
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
if (!streamManager.streamHasNext(req.streamChunkId.streamId)) {
streamIds.remove(req.streamChunkId.streamId);
}
} catch (Exception e) {
logger.error(String.format(
"Error opening block %s for request from %s", req.streamChunkId, client), e);
Expand Down