Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.Map;

import com.codahale.metrics.Gauge;
Expand All @@ -30,7 +30,6 @@
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,14 +92,25 @@ protected void handleMessage(
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);

List<ManagedBuffer> blocks = Lists.newArrayList();
long totalBlockSize = 0;
for (String blockId : msg.blockIds) {
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
totalBlockSize += block != null ? block.size() : 0;
blocks.add(block);
}
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
private int index = 0;

@Override
public boolean hasNext() {
return index < msg.blockIds.length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
msg.blockIds[index]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to look more to verify, but I don't think you can hang onto the msg here without duplicating it. TransportRequestHandler.processRpcRequest is going to release the request so I think it could get reused. @rxin can perhaps verify.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tgravescs
Thanks a lot for taking time looking into this :)
In my understanding, the OpenBlocks will be kept in heap after initialization(https://github.com/apache/spark/blob/master/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java#L84).
Yes, TransportRequestHandler.processRpcRequest will release the ByteBuf, but the OpenBlocks will not be released.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right I missed that when I did a quick skim originally of this.

index++;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
};

long streamId = streamManager.registerStream(client.getClientId(), iter);
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
Expand All @@ -109,7 +119,6 @@ protected void handleMessage(
getRemoteAddress(client.getChannel()));
}
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
metrics.blockTransferRateBytes.mark(totalBlockSize);
} finally {
responseDelayContext.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ public void testOpenShuffleBlocks() {
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
.toByteBuffer();
handler.receive(client, openBlocks, callback);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");

ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
verify(callback, times(1)).onSuccess(response.capture());
Expand All @@ -107,6 +105,8 @@ public void testOpenShuffleBlocks() {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");

// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ public void testFetchWrongExecutor() throws Exception {
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
FetchResult execFetch = fetchBlocks("exec-0",
new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
// Both still fail, as we start by checking for all block.
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ class NettyBlockRpcServer(

message match {
case openBlocks: OpenBlocks =>
val blocks: Seq[ManagedBuffer] =
openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
val blocksNum = openBlocks.blockIds.length
val blocks = for (i <- (0 until blocksNum).view)
yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
logTrace(s"Registered streamId $streamId with $blocksNum buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
Expand Down