Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,31 +116,44 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
int shuffleId = Integer.parseInt(firstBlock[1]);
boolean batchFetchEnabled = firstBlock.length == 5;

HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
HashMap<Long, ArrayList<String[]>> mapIdToBlockIds = new HashMap<>();
for (String blockId : blockIds) {
String[] blockIdParts = splitBlockId(blockId);
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
long mapId = Long.parseLong(blockIdParts[2]);
if (!mapIdToReduceIds.containsKey(mapId)) {
mapIdToReduceIds.put(mapId, new ArrayList<>());
}
mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[3]));
if (batchFetchEnabled) {
// When we read continuous shuffle blocks in batch, we will reuse reduceIds in
// FetchShuffleBlocks to store the start and end reduce id for range
// [startReduceId, endReduceId).
assert(blockIdParts.length == 5);
mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
if (!mapIdToBlockIds.containsKey(mapId)) {
mapIdToBlockIds.put(mapId, new ArrayList<>());
}
ArrayList<String[]> blocks = mapIdToBlockIds.get(mapId);
// override the useless prefix "shuffle" to reuse the blockIdParts
blockIdParts[0] = blockId;
blocks.add(blockIdParts);
}
long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
long[] mapIds = Longs.toArray(mapIdToBlockIds.keySet());
int[][] reduceIdArr = new int[mapIds.length][];
int blockIdIndex = 0;
for (int i = 0; i < mapIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
ArrayList<String[]> blocks = mapIdToBlockIds.get(mapIds[i]);
reduceIdArr[i] = batchFetchEnabled ? new int[2] : new int[blocks.size()];
for (int j = 0; j < blocks.size(); j++) {
String[] curBlock = blocks.get(j);
// ensure the order between `blockIds` and `FetchShuffleBlocks` is consistent
this.blockIds[blockIdIndex++] = curBlock[0];
reduceIdArr[i][j] = Integer.parseInt(curBlock[3]);
if (batchFetchEnabled) {
// When we read continuous shuffle blocks in batch, we will reuse reduceIds in
// FetchShuffleBlocks to store the start and end reduce id for range
// [startReduceId, endReduceId).
assert(curBlock.length == 5 && j == 0);
reduceIdArr[i][1] = (Integer.parseInt(curBlock[4]));
}
}
}
assert(blockIdIndex == this.blockIds.length);

return new FetchShuffleBlocks(
appId, execId, shuffleId, mapIds, reduceIdArr, batchFetchEnabled);
}
Expand Down