Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,27 @@ public static int[] decode(ByteBuf buf) {
return ints;
}
}

/** Long integer arrays are encoded with their length followed by long integers. */
public static class LongArrays {
public static int encodedLength(long[] longs) {
return 4 + 8 * longs.length;
}

public static void encode(ByteBuf buf, long[] longs) {
buf.writeInt(longs.length);
for (long i : longs) {
buf.writeLong(i);
}
}

public static long[] decode(ByteBuf buf) {
int numLongs = buf.readInt();
long[] longs = new long[numLongs];
for (int i = 0; i < longs.length; i ++) {
longs[i] = buf.readLong();
}
return longs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected void handleMessage(
numBlockIds += ids.length;
}
streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg, numBlockIds), client.getChannel());
new ShuffleManagedBufferIterator(msg), client.getChannel());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also remove

numBlockIds = 0;
          for (int[] ids: msg.reduceIds) {
            numBlockIds += ids.length;
          }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The numBlockIds used in callback:

callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());

} else {
// For the compatibility with the old version, still keep the support for OpenBlocks.
OpenBlocks msg = (OpenBlocks) msgObj;
Expand Down Expand Up @@ -299,21 +299,6 @@ private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
return mapIdAndReduceIds;
}

ManagedBufferIterator(FetchShuffleBlocks msg, int numBlockIds) {
final int[] mapIdAndReduceIds = new int[2 * numBlockIds];
int idx = 0;
for (int i = 0; i < msg.mapIds.length; i++) {
for (int reduceId : msg.reduceIds[i]) {
mapIdAndReduceIds[idx++] = msg.mapIds[i];
mapIdAndReduceIds[idx++] = reduceId;
}
}
assert(idx == 2 * numBlockIds);
size = mapIdAndReduceIds.length;
blockDataForIndexFn = index -> blockManager.getBlockData(msg.appId, msg.execId,
msg.shuffleId, mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
}

@Override
public boolean hasNext() {
return index < size;
Expand All @@ -328,6 +313,49 @@ public ManagedBuffer next() {
}
}

private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {

private int mapIdx = 0;
private int reduceIdx = 0;

private final String appId;
private final String execId;
private final int shuffleId;
private final long[] mapTaskIds;
private final int[][] reduceIds;

ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
appId = msg.appId;
execId = msg.execId;
shuffleId = msg.shuffleId;
mapTaskIds = msg.mapTaskIds;
reduceIds = msg.reduceIds;
}

@Override
public boolean hasNext() {
// mapTaskIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
// must have non-empty mapTaskIds and reduceIds, see the checking logic in
// OneForOneBlockFetcher.
assert(mapTaskIds.length != 0 && mapTaskIds.length == reduceIds.length);
return mapIdx < mapTaskIds.length && reduceIdx < reduceIds[mapIdx].length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(
appId, execId, shuffleId, mapTaskIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
if (reduceIdx < reduceIds[mapIdx].length - 1) {
reduceIdx += 1;
} else {
reduceIdx = 0;
mapIdx += 1;
}
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
}

@Override
public void channelActive(TransportClient client) {
metrics.activeConnections.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,21 @@ public void registerExecutor(
}

/**
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapTaskId, reduceId). We make assumptions
* about how the hash and sort based shuffles store their data.
*/
public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
long mapTaskId,
int reduceId) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
return getSortBasedShuffleBlockData(executor, shuffleId, mapTaskId, reduceId);
}

public ManagedBuffer getRddBlockData(
Expand Down Expand Up @@ -291,22 +291,23 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
}

/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapTaskId_0.index" into a data
* file called "shuffle_ShuffleId_MapTaskId_0.data".
* This logic is from IndexShuffleBlockResolver, and the block id format is from
* ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId, long mapTaskId, int reduceId) {
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
"shuffle_" + shuffleId + "_" + mapTaskId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
"shuffle_" + shuffleId + "_" + mapTaskId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.HashMap;

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,45 +108,44 @@ private boolean isShuffleBlocks(String[] blockIds) {

/**
* Analyze the pass in blockIds and create FetchShuffleBlocks message.
* The blockIds has been sorted by mapId and reduceId. It's produced in
* The blockIds has been sorted by mapTaskId and reduceId. It's produced in
* org.apache.spark.MapOutputTracker.convertMapStatuses.
*/
private FetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId, String execId, String[] blockIds) {
int shuffleId = splitBlockId(blockIds[0])[0];
HashMap<Integer, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
int shuffleId = splitBlockId(blockIds[0]).left;
HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
for (String blockId : blockIds) {
int[] blockIdParts = splitBlockId(blockId);
if (blockIdParts[0] != shuffleId) {
ImmutableTriple<Integer, Long, Integer> blockIdParts = splitBlockId(blockId);
if (blockIdParts.left != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
int mapId = blockIdParts[1];
if (!mapIdToReduceIds.containsKey(mapId)) {
mapIdToReduceIds.put(mapId, new ArrayList<>());
long mapTaskId = blockIdParts.middle;
if (!mapIdToReduceIds.containsKey(mapTaskId)) {
mapIdToReduceIds.put(mapTaskId, new ArrayList<>());
}
mapIdToReduceIds.get(mapId).add(blockIdParts[2]);
mapIdToReduceIds.get(mapTaskId).add(blockIdParts.right);
}
int[] mapIds = Ints.toArray(mapIdToReduceIds.keySet());
int[][] reduceIdArr = new int[mapIds.length][];
for (int i = 0; i < mapIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
long[] mapTaskIds = Longs.toArray(mapIdToReduceIds.keySet());
int[][] reduceIdArr = new int[mapTaskIds.length][];
for (int i = 0; i < mapTaskIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapTaskIds[i]));
}
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr);
return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIdArr);
}

/** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
private int[] splitBlockId(String blockId) {
/** Split the shuffleBlockId and return shuffleId, mapTaskId and reduceId. */
private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
String[] blockIdParts = blockId.split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
throw new IllegalArgumentException(
"Unexpected shuffle block id format: " + blockId);
}
return new int[] {
Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]),
Integer.parseInt(blockIdParts[3])
};
return new ImmutableTriple<>(
Integer.parseInt(blockIdParts[1]),
Long.parseLong(blockIdParts[2]),
Integer.parseInt(blockIdParts[3]));
}

/** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
public final String appId;
public final String execId;
public final int shuffleId;
// The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
// it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
public final int[] mapIds;
// The length of mapTaskIds must equal to reduceIds.size(), for the i-th mapTaskId in mapTaskIds,
// it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map task.
public final long[] mapTaskIds;
public final int[][] reduceIds;

public FetchShuffleBlocks(
String appId,
String execId,
int shuffleId,
int[] mapIds,
long[] mapTaskIds,
int[][] reduceIds) {
this.appId = appId;
this.execId = execId;
this.shuffleId = shuffleId;
this.mapIds = mapIds;
this.mapTaskIds = mapTaskIds;
this.reduceIds = reduceIds;
assert(mapIds.length == reduceIds.length);
assert(mapTaskIds.length == reduceIds.length);
}

@Override
Expand All @@ -60,7 +60,7 @@ public String toString() {
.add("appId", appId)
.add("execId", execId)
.add("shuffleId", shuffleId)
.add("mapIds", Arrays.toString(mapIds))
.add("mapTaskIds", Arrays.toString(mapTaskIds))
.add("reduceIds", Arrays.deepToString(reduceIds))
.toString();
}
Expand All @@ -75,7 +75,7 @@ public boolean equals(Object o) {
if (shuffleId != that.shuffleId) return false;
if (!appId.equals(that.appId)) return false;
if (!execId.equals(that.execId)) return false;
if (!Arrays.equals(mapIds, that.mapIds)) return false;
if (!Arrays.equals(mapTaskIds, that.mapTaskIds)) return false;
return Arrays.deepEquals(reduceIds, that.reduceIds);
}

Expand All @@ -84,7 +84,7 @@ public int hashCode() {
int result = appId.hashCode();
result = 31 * result + execId.hashCode();
result = 31 * result + shuffleId;
result = 31 * result + Arrays.hashCode(mapIds);
result = 31 * result + Arrays.hashCode(mapTaskIds);
result = 31 * result + Arrays.deepHashCode(reduceIds);
return result;
}
Expand All @@ -98,7 +98,7 @@ public int encodedLength() {
return Encoders.Strings.encodedLength(appId)
+ Encoders.Strings.encodedLength(execId)
+ 4 /* encoded length of shuffleId */
+ Encoders.IntArrays.encodedLength(mapIds)
+ Encoders.LongArrays.encodedLength(mapTaskIds)
+ 4 /* encoded length of reduceIds.size() */
+ encodedLengthOfReduceIds;
}
Expand All @@ -108,7 +108,7 @@ public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
Encoders.Strings.encode(buf, execId);
buf.writeInt(shuffleId);
Encoders.IntArrays.encode(buf, mapIds);
Encoders.LongArrays.encode(buf, mapTaskIds);
buf.writeInt(reduceIds.length);
for (int[] ids: reduceIds) {
Encoders.IntArrays.encode(buf, ids);
Expand All @@ -119,12 +119,12 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
String execId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int[] mapIds = Encoders.IntArrays.decode(buf);
long[] mapTaskIds = Encoders.LongArrays.decode(buf);
int reduceIdsSize = buf.readInt();
int[][] reduceIds = new int[reduceIdsSize][];
for (int i = 0; i < reduceIdsSize; i++) {
reduceIds[i] = Encoders.IntArrays.decode(buf);
}
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds);
return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class BlockTransferMessagesSuite {
public void serializeOpenShuffleBlocks() {
checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
checkSerializeDeserialize(new FetchShuffleBlocks(
"app-1", "exec-2", 0, new int[] {0, 1},
"app-1", "exec-2", 0, new long[] {0, 1},
new int[][] {{ 0, 1 }, { 0, 1, 2 }}));
checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testFetchShuffleBlocks() {
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(blockMarkers[1]);

FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks(
"app0", "exec1", 0, new int[] { 0 }, new int[][] {{ 0, 1 }});
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }});
checkOpenBlocksReceive(fetchShuffleBlocks, blockMarkers);

verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testFetchOne() {
BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new int[] { 0 }, new int[][] {{ 0 }}),
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[] { 0 }, new int[][] {{ 0 }}),
conf);

verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
Expand Down Expand Up @@ -100,7 +100,7 @@ public void testFetchThreeShuffleBlocks() {
BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new int[] { 0 }, new int[][] {{ 0, 1, 2 }}),
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[] { 0 }, new int[][] {{ 0, 1, 2 }}),
conf);

for (int i = 0; i < 3; i ++) {
Expand Down
Loading