Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
further rename
  • Loading branch information
xuanyuanking committed Sep 18, 2019
commit c86f6cc4d0fded370621e73e4ac910e8f1b01dd3
Original file line number Diff line number Diff line change
Expand Up @@ -321,30 +321,30 @@ private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {
private final String appId;
private final String execId;
private final int shuffleId;
private final long[] mapTaskIds;
private final long[] mapIds;
private final int[][] reduceIds;

ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
appId = msg.appId;
execId = msg.execId;
shuffleId = msg.shuffleId;
mapTaskIds = msg.mapTaskIds;
mapIds = msg.mapIds;
reduceIds = msg.reduceIds;
}

@Override
public boolean hasNext() {
// mapTaskIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
// must have non-empty mapTaskIds and reduceIds, see the checking logic in
// mapIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add check logic here to be safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Xingbo mean a double check here? Basically there's existing checking for both the length and non-empty.

if (blockIds.length == 0) {
throw new IllegalArgumentException("Zero-sized blockIds array");
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea if the place is not super performance critical I'd prefer a double check here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done the double-check in 00e78b2.

// must have non-empty mapIds and reduceIds, see the checking logic in
// OneForOneBlockFetcher.
assert(mapTaskIds.length != 0 && mapTaskIds.length == reduceIds.length);
return mapIdx < mapTaskIds.length && reduceIdx < reduceIds[mapIdx].length;
assert(mapIds.length != 0 && mapIds.length == reduceIds.length);
return mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(
appId, execId, shuffleId, mapTaskIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
if (reduceIdx < reduceIds[mapIdx].length - 1) {
reduceIdx += 1;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,21 @@ public void registerExecutor(
}

/**
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapTaskId, reduceId). We make assumptions
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
* about how the hash and sort based shuffles store their data.
*/
public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
long mapTaskId,
long mapId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuanyuanking why change this from int to long? Is it possible that a mapId can be greater than 2^31?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previous the map id is the index of the mapper, and can get conflicts when we re-run the task. Now the map id is the task id, which is unique. task id needs to be long.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, after this patch, we set mapId by using the taskAttemptId of map task, which is a unique Id within the same SparkContext. You can see the comment #25620 (comment)

int reduceId) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}
return getSortBasedShuffleBlockData(executor, shuffleId, mapTaskId, reduceId);
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
}

public ManagedBuffer getRddBlockData(
Expand Down Expand Up @@ -291,23 +291,22 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
}

/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapTaskId_0.index" into a data
* file called "shuffle_ShuffleId_MapTaskId_0.data".
* This logic is from IndexShuffleBlockResolver, and the block id format is from
* ShuffleDataBlockId and ShuffleIndexBlockId.
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, long mapTaskId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId, long mapId, int reduceId) {
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapTaskId + "_0.index");
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapTaskId + "_0.data"),
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private boolean isShuffleBlocks(String[] blockIds) {

/**
* Analyze the pass in blockIds and create FetchShuffleBlocks message.
* The blockIds has been sorted by mapTaskId and reduceId. It's produced in
* The blockIds has been sorted by mapId and reduceId. It's produced in
* org.apache.spark.MapOutputTracker.convertMapStatuses.
*/
private FetchShuffleBlocks createFetchShuffleBlocksMsg(
Expand All @@ -121,21 +121,21 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
long mapTaskId = blockIdParts.middle;
if (!mapIdToReduceIds.containsKey(mapTaskId)) {
mapIdToReduceIds.put(mapTaskId, new ArrayList<>());
long mapId = blockIdParts.middle;
if (!mapIdToReduceIds.containsKey(mapId)) {
mapIdToReduceIds.put(mapId, new ArrayList<>());
}
mapIdToReduceIds.get(mapTaskId).add(blockIdParts.right);
mapIdToReduceIds.get(mapId).add(blockIdParts.right);
}
long[] mapTaskIds = Longs.toArray(mapIdToReduceIds.keySet());
int[][] reduceIdArr = new int[mapTaskIds.length][];
for (int i = 0; i < mapTaskIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapTaskIds[i]));
long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
int[][] reduceIdArr = new int[mapIds.length][];
for (int i = 0; i < mapIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
}
return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIdArr);
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr);
}

/** Split the shuffleBlockId and return shuffleId, mapTaskId and reduceId. */
/** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
String[] blockIdParts = blockId.split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
public final String appId;
public final String execId;
public final int shuffleId;
// The length of mapTaskIds must equal to reduceIds.size(), for the i-th mapTaskId in mapTaskIds,
// it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map task.
public final long[] mapTaskIds;
// The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
// it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
public final long[] mapIds;
public final int[][] reduceIds;

public FetchShuffleBlocks(
String appId,
String execId,
int shuffleId,
long[] mapTaskIds,
long[] mapIds,
int[][] reduceIds) {
this.appId = appId;
this.execId = execId;
this.shuffleId = shuffleId;
this.mapTaskIds = mapTaskIds;
this.mapIds = mapIds;
this.reduceIds = reduceIds;
assert(mapTaskIds.length == reduceIds.length);
assert(mapIds.length == reduceIds.length);
}

@Override
Expand All @@ -60,7 +60,7 @@ public String toString() {
.add("appId", appId)
.add("execId", execId)
.add("shuffleId", shuffleId)
.add("mapTaskIds", Arrays.toString(mapTaskIds))
.add("mapIds", Arrays.toString(mapIds))
.add("reduceIds", Arrays.deepToString(reduceIds))
.toString();
}
Expand All @@ -75,7 +75,7 @@ public boolean equals(Object o) {
if (shuffleId != that.shuffleId) return false;
if (!appId.equals(that.appId)) return false;
if (!execId.equals(that.execId)) return false;
if (!Arrays.equals(mapTaskIds, that.mapTaskIds)) return false;
if (!Arrays.equals(mapIds, that.mapIds)) return false;
return Arrays.deepEquals(reduceIds, that.reduceIds);
}

Expand All @@ -84,7 +84,7 @@ public int hashCode() {
int result = appId.hashCode();
result = 31 * result + execId.hashCode();
result = 31 * result + shuffleId;
result = 31 * result + Arrays.hashCode(mapTaskIds);
result = 31 * result + Arrays.hashCode(mapIds);
result = 31 * result + Arrays.deepHashCode(reduceIds);
return result;
}
Expand All @@ -98,7 +98,7 @@ public int encodedLength() {
return Encoders.Strings.encodedLength(appId)
+ Encoders.Strings.encodedLength(execId)
+ 4 /* encoded length of shuffleId */
+ Encoders.LongArrays.encodedLength(mapTaskIds)
+ Encoders.LongArrays.encodedLength(mapIds)
+ 4 /* encoded length of reduceIds.size() */
+ encodedLengthOfReduceIds;
}
Expand All @@ -108,7 +108,7 @@ public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
Encoders.Strings.encode(buf, execId);
buf.writeInt(shuffleId);
Encoders.LongArrays.encode(buf, mapTaskIds);
Encoders.LongArrays.encode(buf, mapIds);
buf.writeInt(reduceIds.length);
for (int[] ids: reduceIds) {
Encoders.IntArrays.encode(buf, ids);
Expand All @@ -119,12 +119,12 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
String execId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
long[] mapTaskIds = Encoders.LongArrays.decode(buf);
long[] mapIds = Encoders.LongArrays.decode(buf);
int reduceIdsSize = buf.readInt();
int[][] reduceIds = new int[reduceIdsSize][];
for (int i = 0; i < reduceIdsSize; i++) {
reduceIds[i] = Encoders.IntArrays.decode(buf);
}
return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIds);
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final Partitioner partitioner;
private final ShuffleWriteMetricsReporter writeMetrics;
private final int shuffleId;
private final long mapTaskId;
private final long mapId;
private final Serializer serializer;
private final ShuffleExecutorComponents shuffleExecutorComponents;

Expand All @@ -105,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BypassMergeSortShuffleWriter(
BlockManager blockManager,
BypassMergeSortShuffleHandle<K, V> handle,
long mapTaskId,
long mapId,
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics,
ShuffleExecutorComponents shuffleExecutorComponents) {
Expand All @@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapTaskId = mapTaskId;
this.mapId = mapId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
Expand All @@ -127,12 +127,12 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapTaskId, numPartitions);
.createMapOutputWriter(shuffleId, mapId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapTaskId);
blockManager.shuffleServerId(), partitionLengths, mapId);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
Expand Down Expand Up @@ -166,7 +166,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

partitionLengths = writePartitionedData(mapOutputWriter);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapTaskId);
blockManager.shuffleServerId(), partitionLengths, mapId);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final ShuffleWriteMetricsReporter writeMetrics;
private final ShuffleExecutorComponents shuffleExecutorComponents;
private final int shuffleId;
private final long mapTaskId;
private final long mapId;
private final TaskContext taskContext;
private final SparkConf sparkConf;
private final boolean transferToEnabled;
Expand Down Expand Up @@ -122,7 +122,7 @@ public UnsafeShuffleWriter(
}
this.blockManager = blockManager;
this.memoryManager = memoryManager;
this.mapTaskId = taskContext.taskAttemptId();
this.mapId = taskContext.taskAttemptId();
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.shuffleId = dep.shuffleId();
this.serializer = dep.serializer().newInstance();
Expand Down Expand Up @@ -228,7 +228,7 @@ void closeAndWriteOutput() throws IOException {
}
}
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapTaskId);
blockManager.shuffleServerId(), partitionLengths, mapId);
}

@VisibleForTesting
Expand Down Expand Up @@ -264,11 +264,11 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
long[] partitionLengths;
if (spills.length == 0) {
final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapTaskId, partitioner.numPartitions());
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
return mapWriter.commitAllPartitions();
} else if (spills.length == 1) {
Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapTaskId);
shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
if (maybeSingleFileWriter.isPresent()) {
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
Expand All @@ -293,7 +293,7 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep
CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapTaskId, partitioner.numPartitions());
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
try {
// There are multiple spills to merge, so none of these spill files' lengths were counted
// towards our shuffle write count or shuffle write time. If we use the slow merge path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ class NettyBlockRpcServer(
responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

case fetchShuffleBlocks: FetchShuffleBlocks =>
val blocks = fetchShuffleBlocks.mapTaskIds.zipWithIndex.flatMap {
case (mapTaskId, index) =>
fetchShuffleBlocks.reduceIds.apply(index).map { reduceId =>
blockManager.getBlockData(
ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapTaskId, reduceId))
}
val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>
fetchShuffleBlocks.reduceIds.apply(index).map { reduceId =>
blockManager.getBlockData(
ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
}
}
val numBlockIds = fetchShuffleBlocks.reduceIds.map(_.length).sum
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,26 @@ private[spark] class IndexShuffleBlockResolver(

private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")

def getDataFile(shuffleId: Int, mapTaskId: Long): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapTaskId, NOOP_REDUCE_ID))
def getDataFile(shuffleId: Int, mapId: Long): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

private def getIndexFile(shuffleId: Int, mapTaskId: Long): File = {
blockManager.diskBlockManager.getFile(
ShuffleIndexBlockId(shuffleId, mapTaskId, NOOP_REDUCE_ID))
private def getIndexFile(shuffleId: Int, mapId: Long): File = {
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

/**
* Remove data file and index file that contain the output data from one map.
*/
def removeDataByMap(shuffleId: Int, mapTaskId: Long): Unit = {
var file = getDataFile(shuffleId, mapTaskId)
def removeDataByMap(shuffleId: Int, mapId: Long): Unit = {
var file = getDataFile(shuffleId, mapId)
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting data ${file.getPath()}")
}
}

file = getIndexFile(shuffleId, mapTaskId)
file = getIndexFile(shuffleId, mapId)
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting index ${file.getPath()}")
Expand Down Expand Up @@ -136,13 +135,13 @@ private[spark] class IndexShuffleBlockResolver(
*/
def writeIndexFileAndCommit(
shuffleId: Int,
mapTaskId: Long,
mapId: Long,
lengths: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapTaskId)
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val dataFile = getDataFile(shuffleId, mapTaskId)
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
synchronized {
Expand Down
Loading