Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
boolean batchFetchEnabled = firstBlock.length == 5;

HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
ArrayList<Long> orderedMapId = new ArrayList<>();
for (String blockId : blockIds) {
String[] blockIdParts = splitBlockId(blockId);
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
long mapId = Long.parseLong(blockIdParts[2]);
assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));
if (!mapIdToReduceIds.containsKey(mapId)) {
mapIdToReduceIds.put(mapId, new ArrayList<>());
orderedMapId.add(mapId);
}
mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[3]));
ArrayList<Integer> reduceIdsByMapId = mapIdToReduceIds.get(mapId);
int reduceId = Integer.parseInt(blockIdParts[3]);
assert(reduceIdsByMapId.isEmpty() || reduceId > reduceIdsByMapId.get(reduceIdsByMapId.size() - 1));
reduceIdsByMapId.add(reduceId);
if (batchFetchEnabled) {
// When we read continuous shuffle blocks in batch, we will reuse reduceIds in
// FetchShuffleBlocks to store the start and end reduce id for range
Expand All @@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
}
}
long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
long[] mapIds = Longs.toArray(orderedMapId);
int[][] reduceIdArr = new int[mapIds.length][];
for (int i = 0; i < mapIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
}

return new FetchShuffleBlocks(
appId, execId, shuffleId, mapIds, reduceIdArr, batchFetchEnabled);
}
Expand Down