Skip to content
Prev Previous commit
Next Next commit
add test case
  • Loading branch information
yuhaiyang authored and yuhaiyang committed Feb 26, 2021
commit a08ca1da29103ce8b16bc81fc208992f4561fb1f
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,50 @@ public void testEmptyBlockFetch() {
}
}

@Test
public void testFetchShuffleBlocksOrder() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
blocks.put("shuffle_0_2_1", new NioManagedBuffer(ByteBuffer.wrap(new byte[2])));
blocks.put("shuffle_0_10_2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[3])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);

BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[]{0}, new int[][]{{0, 3}}, false),
conf);

for (int chunkIndex = 0; chunkIndex < blockIds.length; chunkIndex++) {
String blockId = blockIds[chunkIndex];
verify(listener).onBlockFetchSuccess(blockId, blocks.get(blockId));
}
}

@Test
public void testBatchFetchShuffleBlocksOrder() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("shuffle_0_0_1_2", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
blocks.put("shuffle_0_2_2_3", new NioManagedBuffer(ByteBuffer.wrap(new byte[2])));
blocks.put("shuffle_0_10_3_4", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[3])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);

BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new OpenBlocks("app-id", "exec-id", blockIds),
new TransportConf("shuffle", new MapConfigProvider(
new HashMap<String, String>() {{
put("spark.shuffle.useOldFetchProtocol", "true");
}}
)));

for (int chunkIndex = 0; chunkIndex < blockIds.length; chunkIndex++) {
String blockId = blockIds[chunkIndex];
verify(listener).onBlockFetchSuccess(blockId, blocks.get(blockId));
}
}

/**
* Begins a fetch on the given set of blocks by mocking out the server side of the RPC which
* simply returns the given (BlockId, Block) pairs.
Expand Down