Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
For comments.
  • Loading branch information
viirya committed May 1, 2015
commit cf2c086cd7dcdb0f047dd1eca73e1708d812f25a
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class OneForOneStreamManager extends StreamManager {
private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);

private final AtomicLong nextStreamId;
private final Map<Long, StreamState> streams;
private final ConcurrentHashMap<Long, StreamState> streams;

/** State of a single stream. */
private static class StreamState {
Expand All @@ -54,8 +54,7 @@ private static class StreamState {
int curChunk = 0;

StreamState(Iterator<ManagedBuffer> buffers) {
Preconditions.checkNotNull(buffers);
this.buffers = buffers;
this.buffers = Preconditions.checkNotNull(buffers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public void exceptionCaught(Throwable cause) {

@Override
public void channelUnregistered() {
// Inform the StreamManager that this channel is unregistered.
streamManager.connectionTerminated(channel);
rpcHandler.connectionTerminated(reverseClient);
}
Expand All @@ -94,12 +93,11 @@ public void handle(RequestMessage request) {
private void processFetchRequest(final ChunkFetchRequest req) {
final String client = NettyUtils.getRemoteAddress(channel);

streamManager.registerChannel(channel, req.streamChunkId.streamId);

logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);

ManagedBuffer buf;
try {
streamManager.registerChannel(channel, req.streamChunkId.streamId);
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format(
Expand Down