Use the taskAttemptId in ShuffleId
xuanyuanking committed Sep 17, 2019
commit bbce8b417a0b9827c7ee947ba8025d8ebc6b8f0a
Encoders.java
@@ -112,4 +112,27 @@ public static int[] decode(ByteBuf buf) {
return ints;
}
}

/** Long integer arrays are encoded with their length followed by long integers. */
public static class LongArrays {
public static int encodedLength(long[] longs) {
return 4 + 8 * longs.length;
}

public static void encode(ByteBuf buf, long[] longs) {
buf.writeInt(longs.length);
for (long i : longs) {
buf.writeLong(i);
}
}

public static long[] decode(ByteBuf buf) {
int numLongs = buf.readInt();
long[] longs = new long[numLongs];
for (int i = 0; i < longs.length; i ++) {
longs[i] = buf.readLong();
}
return longs;
}
}
}
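A minimal round-trip sketch of the new LongArrays encoder (not part of the patch; the wrapper class here is hypothetical): the wire format is a 4-byte length prefix followed by 8 bytes per element, so values beyond the int range survive intact.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Arrays;
import org.apache.spark.network.protocol.Encoders;

public class LongArraysRoundTrip {
  public static void main(String[] args) {
    long[] mapIds = { 0L, 1L, 1L << 33 };  // includes a value that cannot fit in an int
    // Size the buffer exactly: 4 bytes for the length + 8 bytes per long.
    ByteBuf buf = Unpooled.buffer(Encoders.LongArrays.encodedLength(mapIds));
    Encoders.LongArrays.encode(buf, mapIds);
    long[] decoded = Encoders.LongArrays.decode(buf);
    System.out.println(Arrays.equals(mapIds, decoded));  // true
    buf.release();
  }
}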
ExternalShuffleBlockHandler.java
@@ -300,7 +300,7 @@ private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
}

ManagedBufferIterator(FetchShuffleBlocks msg, int numBlockIds) {
final int[] mapIdAndReduceIds = new int[2 * numBlockIds];
final long[] mapIdAndReduceIds = new long[2 * numBlockIds];
Contributor: Maybe we should use one long[] for the map ids and one int[] for the reduce ids.

Member Author: Actually, we already have a long[] for the map ids and an int[] for the reduce ids in the message; what we need here is assembly work, flattening each reduce id next to its corresponding map id. The current way wastes memory; we could instead do it in a CPU-consuming way, computing for each index which map id and reduce id correspond to it.

Member Author: After taking a further look, I split out the managed buffer iterator for the new protocol in 539d725, which gives us more flexibility in controlling the iterator, with no extra array created.
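For intuition, the no-extra-array approach can be sketched as a cursor-based iterator over the (mapId, reduceId) pairs already present in the message. This is an illustrative simplification, not the actual code in 539d725:

// Walks msg.mapIds and msg.reduceIds in place: mapIdx selects the current map
// task, reduceIdx the position within that map task's reduce ids. No flattened
// 2 * numBlockIds array is ever materialized.
public class BlockPairIterator {
  private final long[] mapIds;      // one entry per map task in the request
  private final int[][] reduceIds;  // reduceIds[i] lists the reduce ids for mapIds[i]
  private int mapIdx = 0;
  private int reduceIdx = 0;

  BlockPairIterator(long[] mapIds, int[][] reduceIds) {
    this.mapIds = mapIds;
    this.reduceIds = reduceIds;
  }

  boolean hasNext() {
    // Skip any map tasks whose reduce id list is exhausted (or empty).
    while (mapIdx < mapIds.length && reduceIdx >= reduceIds[mapIdx].length) {
      mapIdx++;
      reduceIdx = 0;
    }
    return mapIdx < mapIds.length;
  }

  // Returns {mapId, reduceId}; the reduce id always fits in an int.
  // Callers must check hasNext() first, which also advances past empty rows.
  long[] next() {
    long mapId = mapIds[mapIdx];
    long reduceId = reduceIds[mapIdx][reduceIdx++];
    return new long[] { mapId, reduceId };
  }

  public static void main(String[] args) {
    BlockPairIterator it = new BlockPairIterator(
        new long[] { 10L, 42L }, new int[][] {{ 0, 1 }, { 3 }});
    while (it.hasNext()) {
      long[] pair = it.next();
      System.out.println("mapId=" + pair[0] + " reduceId=" + pair[1]);
    }
  }
}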

int idx = 0;
for (int i = 0; i < msg.mapIds.length; i++) {
for (int reduceId : msg.reduceIds[i]) {
@@ -311,7 +311,7 @@ private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
assert(idx == 2 * numBlockIds);
size = mapIdAndReduceIds.length;
blockDataForIndexFn = index -> blockManager.getBlockData(msg.appId, msg.execId,
msg.shuffleId, mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
msg.shuffleId, mapIdAndReduceIds[index], (int) mapIdAndReduceIds[index + 1]);
}

@Override
ExternalShuffleBlockResolver.java
@@ -172,7 +172,7 @@ public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
long mapId,
Member: @xuanyuanking why change this from int to long? Is it possible for a mapId to be greater than 2^31?

Contributor: Previously the map id was the index of the mapper, which can conflict when we re-run a task. Now the map id is the task id, which is unique; task ids need to be long.

Member Author: Yes, after this patch we set mapId using the taskAttemptId of the map task, which is a unique id within the same SparkContext. See the comment at #25620 (comment).
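To make the collision concrete: under the old scheme, two attempts of the same map task produce identical shuffle block ids, while task-attempt-based ids stay distinct. A toy illustration follows (the counter is a stand-in for Spark's per-SparkContext task id allocation, not the scheduler code):

import java.util.concurrent.atomic.AtomicLong;

public class MapIdCollisionDemo {
  // Stand-in for the SparkContext-wide task id counter, which hands out
  // monotonically increasing longs and can exceed Integer.MAX_VALUE on
  // long-running applications.
  static final AtomicLong nextTaskAttemptId = new AtomicLong();

  public static void main(String[] args) {
    int mapIndex = 7;  // the map task's index within its stage

    // Old scheme: block id built from the map index, so a re-run attempt
    // produces the same "shuffle_<shuffleId>_<mapId>_<reduceId>" name.
    String oldAttempt1 = "shuffle_0_" + mapIndex + "_0";
    String oldAttempt2 = "shuffle_0_" + mapIndex + "_0";
    System.out.println(oldAttempt1.equals(oldAttempt2));  // true: ambiguous

    // New scheme: block id built from the unique task attempt id.
    String newAttempt1 = "shuffle_0_" + nextTaskAttemptId.getAndIncrement() + "_0";
    String newAttempt2 = "shuffle_0_" + nextTaskAttemptId.getAndIncrement() + "_0";
    System.out.println(newAttempt1.equals(newAttempt2));  // false: distinguishable
  }
}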

int reduceId) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
@@ -296,7 +296,7 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId, long mapId, int reduceId) {
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

OneForOneBlockFetcher.java
@@ -24,6 +24,8 @@
import java.util.HashMap;

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -111,21 +113,21 @@ private boolean isShuffleBlocks(String[] blockIds) {
*/
private FetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId, String execId, String[] blockIds) {
int shuffleId = splitBlockId(blockIds[0])[0];
HashMap<Integer, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
int shuffleId = splitBlockId(blockIds[0]).left;
HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
for (String blockId : blockIds) {
int[] blockIdParts = splitBlockId(blockId);
if (blockIdParts[0] != shuffleId) {
ImmutableTriple<Integer, Long, Integer> blockIdParts = splitBlockId(blockId);
if (blockIdParts.left != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
int mapId = blockIdParts[1];
long mapId = blockIdParts.middle;
if (!mapIdToReduceIds.containsKey(mapId)) {
mapIdToReduceIds.put(mapId, new ArrayList<>());
}
mapIdToReduceIds.get(mapId).add(blockIdParts[2]);
mapIdToReduceIds.get(mapId).add(blockIdParts.right);
}
int[] mapIds = Ints.toArray(mapIdToReduceIds.keySet());
long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
int[][] reduceIdArr = new int[mapIds.length][];
for (int i = 0; i < mapIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
@@ -134,17 +136,16 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
}

/** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
private int[] splitBlockId(String blockId) {
private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
String[] blockIdParts = blockId.split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
throw new IllegalArgumentException(
"Unexpected shuffle block id format: " + blockId);
}
return new int[] {
Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]),
Integer.parseInt(blockIdParts[3])
};
return new ImmutableTriple<>(
Integer.parseInt(blockIdParts[1]),
Long.parseLong(blockIdParts[2]),
Integer.parseInt(blockIdParts[3]));
}
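Concretely, a block id whose map id exceeds the int range now parses cleanly. A standalone sketch of the same logic (splitBlockId itself is private to OneForOneBlockFetcher, so this hypothetical demo inlines it):

import org.apache.commons.lang3.tuple.ImmutableTriple;

public class SplitBlockIdDemo {
  // Mirrors the parser above: the second component (the map id) is now
  // parsed as a long rather than an int.
  static ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shuffle")) {
      throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
    }
    return new ImmutableTriple<>(
        Integer.parseInt(parts[1]),
        Long.parseLong(parts[2]),
        Integer.parseInt(parts[3]));
  }

  public static void main(String[] args) {
    ImmutableTriple<Integer, Long, Integer> parts = splitBlockId("shuffle_3_9876543210_5");
    // Prints: shuffleId=3 mapId=9876543210 reduceId=5
    System.out.println("shuffleId=" + parts.left + " mapId=" + parts.middle
        + " reduceId=" + parts.right);
  }
}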

/** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
FetchShuffleBlocks.java
@@ -34,14 +34,14 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
public final int shuffleId;
// The length of mapIds must equal reduceIds.size(); the i-th mapId in mapIds
// corresponds to the i-th int[] in reduceIds, which contains all reduce ids for this map id.
public final int[] mapIds;
public final long[] mapIds;
public final int[][] reduceIds;

public FetchShuffleBlocks(
String appId,
String execId,
int shuffleId,
int[] mapIds,
long[] mapIds,
int[][] reduceIds) {
this.appId = appId;
this.execId = execId;
@@ -98,7 +98,7 @@ public int encodedLength() {
return Encoders.Strings.encodedLength(appId)
+ Encoders.Strings.encodedLength(execId)
+ 4 /* encoded length of shuffleId */
+ Encoders.IntArrays.encodedLength(mapIds)
+ Encoders.LongArrays.encodedLength(mapIds)
+ 4 /* encoded length of reduceIds.size() */
+ encodedLengthOfReduceIds;
}
@@ -108,7 +108,7 @@ public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
Encoders.Strings.encode(buf, execId);
buf.writeInt(shuffleId);
Encoders.IntArrays.encode(buf, mapIds);
Encoders.LongArrays.encode(buf, mapIds);
buf.writeInt(reduceIds.length);
for (int[] ids: reduceIds) {
Encoders.IntArrays.encode(buf, ids);
@@ -119,7 +119,7 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
String execId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int[] mapIds = Encoders.IntArrays.decode(buf);
long[] mapIds = Encoders.LongArrays.decode(buf);
int reduceIdsSize = buf.readInt();
int[][] reduceIds = new int[reduceIdsSize][];
for (int i = 0; i < reduceIdsSize; i++) {
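As a worked size check for the new layout, consider the message from the serialization test below: appId "app-1", execId "exec-2", shuffleId 0, mapIds {0, 1}, reduceIds {{0, 1}, {0, 1, 2}}. Strings encode as a 4-byte length plus UTF-8 bytes. The arithmetic here is a hypothetical walkthrough, not code from the patch:

public class FetchShuffleBlocksSizeDemo {
  public static void main(String[] args) {
    int appId = 4 + "app-1".length();            // 9
    int execId = 4 + "exec-2".length();          // 10
    int shuffleId = 4;                           // one int
    int mapIds = 4 + 8 * 2;                      // long[] {0, 1} -> 20
    int reduceIdsSize = 4;                       // reduceIds.length
    int reduceIds = (4 + 4 * 2) + (4 + 4 * 3);   // {{0,1},{0,1,2}} -> 28
    // Matches FetchShuffleBlocks.encodedLength() for this instance: 75 bytes.
    System.out.println(appId + execId + shuffleId + mapIds + reduceIdsSize + reduceIds);
  }
}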
BlockTransferMessagesSuite.java
@@ -29,7 +29,7 @@ public class BlockTransferMessagesSuite {
public void serializeOpenShuffleBlocks() {
checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
checkSerializeDeserialize(new FetchShuffleBlocks(
"app-1", "exec-2", 0, new int[] {0, 1},
"app-1", "exec-2", 0, new long[] {0, 1},
new int[][] {{ 0, 1 }, { 0, 1, 2 }}));
checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
ExternalShuffleBlockHandlerSuite.java
@@ -101,7 +101,7 @@ public void testFetchShuffleBlocks() {
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(blockMarkers[1]);

FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks(
"app0", "exec1", 0, new int[] { 0 }, new int[][] {{ 0, 1 }});
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }});
checkOpenBlocksReceive(fetchShuffleBlocks, blockMarkers);

verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
OneForOneBlockFetcherSuite.java
@@ -64,7 +64,7 @@ public void testFetchOne() {
BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new int[] { 0 }, new int[][] {{ 0 }}),
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[] { 0 }, new int[][] {{ 0 }}),
conf);

verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
@@ -100,7 +100,7 @@ public void testFetchThreeShuffleBlocks() {
BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new int[] { 0 }, new int[][] {{ 0, 1, 2 }}),
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[] { 0 }, new int[][] {{ 0, 1, 2 }}),
conf);

for (int i = 0; i < 3; i ++) {
ShuffleExecutorComponents.java
@@ -40,9 +40,7 @@ public interface ShuffleExecutorComponents {
/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
*
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
@@ -51,7 +49,6 @@ public interface ShuffleExecutorComponents {
*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId,
Contributor: Shall we name it mapId, to be consistent with the codebase?

Member Author: The original mapId is still used in mapOutputTracker and the scheduler; I doubt anybody will be confused by these two ids sharing the same name?

Member Author: I changed all the mapTaskAttemptId usages to mapId in 539d725. After the change, mapId is the unique id for a map task. If we think it is confusing to have mapId represent the map index within a stage or a task set, mapIndex may be a much better name.

int numPartitions) throws IOException;

@@ -64,14 +61,12 @@ ShuffleMapOutputWriter createMapOutputWriter(
* preserving an optimization in the local disk shuffle storage implementation.
*
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
*/
default Optional<SingleSpillShuffleMapOutputWriter> createSingleFileMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId) throws IOException {
return Optional.empty();
}
BypassMergeSortShuffleWriter.java
@@ -85,7 +85,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final Partitioner partitioner;
private final ShuffleWriteMetricsReporter writeMetrics;
private final int shuffleId;
private final int mapId;
private final long mapTaskAttemptId;
private final Serializer serializer;
private final ShuffleExecutorComponents shuffleExecutorComponents;
@@ -106,7 +105,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BypassMergeSortShuffleWriter(
BlockManager blockManager,
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
long mapTaskAttemptId,
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics,
@@ -116,7 +114,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
this.mapTaskAttemptId = mapTaskAttemptId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
@@ -130,11 +127,12 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
.createMapOutputWriter(shuffleId, mapTaskAttemptId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapTaskAttemptId);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -167,7 +165,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

partitionLengths = writePartitionedData(mapOutputWriter);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapTaskAttemptId);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
UnsafeShuffleWriter.java
@@ -78,7 +78,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final ShuffleWriteMetricsReporter writeMetrics;
private final ShuffleExecutorComponents shuffleExecutorComponents;
private final int shuffleId;
private final int mapId;
private final long mapTaskAttemptId;
private final TaskContext taskContext;
private final SparkConf sparkConf;
private final boolean transferToEnabled;
@@ -109,7 +109,6 @@ public UnsafeShuffleWriter(
BlockManager blockManager,
TaskMemoryManager memoryManager,
SerializedShuffleHandle<K, V> handle,
int mapId,
TaskContext taskContext,
SparkConf sparkConf,
ShuffleWriteMetricsReporter writeMetrics,
@@ -123,7 +122,7 @@ public UnsafeShuffleWriter(
}
this.blockManager = blockManager;
this.memoryManager = memoryManager;
this.mapId = mapId;
this.mapTaskAttemptId = taskContext.taskAttemptId();
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.shuffleId = dep.shuffleId();
this.serializer = dep.serializer().newInstance();
@@ -228,7 +227,8 @@ void closeAndWriteOutput() throws IOException {
}
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapTaskAttemptId);
}

@VisibleForTesting
@@ -266,14 +266,13 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
.createMapOutputWriter(
shuffleId,
mapId,
taskContext.taskAttemptId(),
partitioner.numPartitions());
return mapWriter.commitAllPartitions();
} else if (spills.length == 1) {
Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
shuffleExecutorComponents.createSingleFileMapOutputWriter(
shuffleId, mapId, taskContext.taskAttemptId());
shuffleId, taskContext.taskAttemptId());
if (maybeSingleFileWriter.isPresent()) {
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
@@ -300,7 +299,6 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOException {
final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
.createMapOutputWriter(
shuffleId,
mapId,
taskContext.taskAttemptId(),
partitioner.numPartitions());
try {
LocalDiskShuffleExecutorComponents.java
@@ -61,22 +61,20 @@ public void initializeExecutor(String appId, String execId) {
@Override
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId,
int numPartitions) {
if (blockResolver == null) {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
}
return new LocalDiskShuffleMapOutputWriter(
shuffleId, mapId, numPartitions, blockResolver, sparkConf);
shuffleId, mapTaskAttemptId, numPartitions, blockResolver, sparkConf);
}

@Override
public Optional<SingleSpillShuffleMapOutputWriter> createSingleFileMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId) {
long mapId) {
if (blockResolver == null) {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");