35 commits
df48e01
LIHADOOP-48527 Magnet shuffle service block transfer netty protocol
Victsm May 9, 2020
6472267
LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush…
otterc May 12, 2020
1c78e1d
LIHADOOP-53496 Not logging all block push exceptions on the client
otterc May 15, 2020
71f3246
LIHADOOP-53700 Separate configuration for caching the merged index fi…
zhouyejoe Jun 1, 2020
221178f
LIHADOOP-53940 Logging the data file and index file path when shuffle…
otterc Jun 10, 2020
55b4a5f
LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc…
otterc Jun 15, 2020
f9d0e86
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
548e2c0
LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of…
otterc Jul 24, 2020
50efba9
LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile
otterc Aug 26, 2020
8a6e01b
LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc…
zhouyejoe Sep 9, 2020
ae5ffac
LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong…
zhouyejoe Sep 12, 2020
e51042b
Further prune changes that should go into a later PR.
Victsm Sep 23, 2020
83aca99
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
04e0efe
LIHADOOP-55022 Disable the merged shuffle file cleanup in stopApplica…
zhouyejoe Aug 11, 2020
71dfd48
Tests and cleanup
otterc Oct 6, 2020
0c411c1
LIHADOOP-55948 Failure in the push stream should not change the curre…
otterc Oct 1, 2020
d029463
Minor style corrections
otterc Oct 15, 2020
8f3839f
Fixed style issues
otterc Oct 15, 2020
1cd2d03
Renamed variables, methods, fixed indentation, addressed other review…
otterc Oct 19, 2020
3356c19
Addressing review comments
otterc Oct 23, 2020
d879beb
Changed the partitions map and addressed other review comments
otterc Oct 26, 2020
48ae819
Added support for subdirs under merge_manager dirs and removed the ya…
otterc Oct 28, 2020
9b031f7
Addressed test failure and other review comments in RemoteBlockPushRe…
otterc Oct 29, 2020
807cc7b
Minor change in finalization
otterc Oct 29, 2020
5b169bc
Removing the partition from inner map after the files are closed
otterc Oct 30, 2020
9ece587
Server side configuration to specify the implementation of MergedShuf…
otterc Oct 30, 2020
d13c7ad
Change the Push block stream to not encode shuffle Id, map index, and…
otterc Nov 2, 2020
63843bb
Fixed typos, address review comments, made NoOp the default impl, and…
otterc Nov 2, 2020
d35aa4b
Addressed review comments
otterc Nov 3, 2020
ba92311
Fix IndexOutOfBoundsException and avoid instantiating AppsPathInfo mu…
otterc Nov 4, 2020
9be25b3
Removed duplicate declaration of shuffle push prefix
otterc Nov 4, 2020
ba51796
Added UT for collision with 2 streams
otterc Nov 4, 2020
1f4fcfe
Removed unnecessary TODOs, marked MergedShuffleFileManager evolving, …
otterc Nov 4, 2020
28edaae
Changed the serialization on chunktracker and removed serializedSizeI…
otterc Nov 6, 2020
cb1881c
Use RandomAccessFile
otterc Nov 8, 2020
LIHADOOP-55948 Failure in the push stream should not change the currentMap if it's not the active stream

RB=2308973
BUG=LIHADOOP-55948
G=spark-reviewers
R=mshen,vsowrira,yezhou,mmuralid
A=mshen
otterc committed Nov 6, 2020
commit 0c411c15c6f7c7ccd6e8a370884e8b912c1b939a
TransportConf.java
@@ -365,7 +365,7 @@ public boolean useOldFetchProtocol() {

/**
* The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
* push-based shuffle.
* A merged shuffle file consists of multiple small shuffle blocks. Fetching the
* complete merged shuffle file in a single response increases the memory requirements for the
* clients. Instead of serving the entire merged file, the shuffle service serves the
@@ -383,6 +383,6 @@ public int minChunkSizeInMergedShuffleFile() {
*/
public long mergedIndexCacheSize() {
return JavaUtils.byteStringAsBytes(
conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
}
}
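The javadoc above motivates serving a merged shuffle file as multiple chunks rather than as a single response. As a rough illustration of the idea, and not code from this PR (all names below are hypothetical), chunk boundaries can be cut at block boundaries once at least minChunkSize bytes have accumulated:

import java.util.ArrayList;
import java.util.List;

class ChunkBoundarySketch {
  // Returns chunk boundary offsets for blocks appended to a merged shuffle file.
  // Every boundary falls on a block boundary, so no block is split across chunks.
  static List<Long> chunkOffsets(long[] blockLengths, long minChunkSize) {
    List<Long> offsets = new ArrayList<>();
    offsets.add(0L);                 // the first chunk starts at offset 0
    long pos = 0;                    // current end of the merged file
    long openChunkBytes = 0;         // bytes accumulated in the open chunk
    for (long len : blockLengths) {
      pos += len;
      openChunkBytes += len;
      if (openChunkBytes >= minChunkSize) {
        offsets.add(pos);            // close the chunk once it reaches minChunkSize
        openChunkBytes = 0;
      }
    }
    return offsets;
  }
}

Each client fetch then covers one chunk instead of the whole merged file, which keeps the per-response memory footprint bounded by roughly minChunkSize plus one block.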
RemoteBlockPushResolver.java
@@ -37,7 +37,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@ -80,6 +79,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private final TransportConf conf;
private final int minChunkSize;
private final String relativeMergeDirPathPattern;
private final ErrorHandler.BlockPushErrorHandler errorHandler;

@SuppressWarnings("UnstableApiUsage")
private final LoadingCache<File, ShuffleIndexInformation> indexCache;
@@ -104,6 +104,7 @@ public ShuffleIndexInformation load(File file) throws IOException {
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.build(indexCacheLoader);
this.relativeMergeDirPathPattern = relativeMergeDirPathPattern;
this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
}
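// Illustrative sketch, not part of this diff: indexCache above is a size-weighted
// Guava LoadingCache whose total weight is capped by mergedIndexCacheSize and where
// each entry weighs its index size in bytes. Simplified here with a byte[] value
// type; buildIndexCacheSketch is a hypothetical name (assumes the CacheLoader and
// Weigher imports used elsewhere in this file).
private static LoadingCache<File, byte[]> buildIndexCacheSketch(long maxWeightBytes) {
  return CacheBuilder.newBuilder()
      .maximumWeight(maxWeightBytes)       // cap the total bytes held in the cache
      .weigher((Weigher<File, byte[]>) (file, bytes) -> bytes.length)
      .build(new CacheLoader<File, byte[]>() {
        @Override
        public byte[] load(File file) throws IOException {
          // read the index file contents on a cache miss
          return java.nio.file.Files.readAllBytes(file.toPath());
        }
      });
}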

/**
@@ -125,7 +126,8 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
if (mergedShuffleFile.exists()) {
return null;
} else {
return new AppShufflePartitionInfo(mergedShuffleFile, mergedIndexFile, mergedMetaFile);
return new AppShufflePartitionInfo(id, mergedShuffleFile, mergedIndexFile,
mergedMetaFile);
}
} catch (IOException e) {
logger.error(
@@ -354,7 +356,11 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {

return new StreamCallbackWithID() {
private int length = 0;
private boolean canWrite = true;
// This indicates that this stream got the opportunity to write the blocks to the merged file.
// Once this is set to true, if the stream encounters a failure, the necessary action is
// taken to overwrite the partially written data. This is reset to false when the stream
// completes without any failures.
private boolean startBlockWrite = false;
// Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly
private List<ByteBuffer> deferredBufs;

@@ -372,13 +378,61 @@ public String getID() {
private void writeBuf(ByteBuffer buf) throws IOException {
while (buf.hasRemaining()) {
if (partitionInfo.isEncounteredFailure()) {
length += partitionInfo.channel.write(buf, partitionInfo.getPosition() + length);
long updatedPos = partitionInfo.getPosition() + length;
logger.debug(
"{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}",
partitionId.appId, partitionId.shuffleId, partitionId.reduceId,
partitionInfo.getPosition(), updatedPos);
length += partitionInfo.channel.write(buf, updatedPos);
} else {
length += partitionInfo.channel.write(buf);
}
}
}

/**
* There will be multiple streams of map blocks belonging to the same reduce partition.
* At any given point in time, only a single map stream can write its data to the merged
* file. Until this stream is completed, the other streams defer writing. This prevents
* corruption of merged data.
* This returns whether this stream is the active stream that can write to the merged file.
*/
private boolean allowedToWrite() {
assert partitionInfo != null;
return partitionInfo.getCurrentMapId() < 0 || partitionInfo.getCurrentMapId() == mapId;
}

/**
* Returns whether this is a duplicate block generated by speculative tasks. With speculative
* tasks, we could receive the same block from 2 different sources at the same time.
* One of them is going to be the first to set the currentMapId. When that block does
* so, it's going to see the currentMapId initially as -1. After it sets the
* currentMapId, it's going to write some data to disk, thus increasing the length
* counter. The other duplicate block is going to see the currentMapId already set to
* its mapId. However, it hasn't written any data yet. If the first block gets written
* completely and resets the currentMapId to -1 before the processing for the second
* block finishes, we can just check the bitmap to identify the second as a duplicate.
*/
private boolean isDuplicateBlock() {
assert partitionInfo != null;
return (partitionInfo.getCurrentMapId() == mapId && length == 0)
|| partitionInfo.mapTracker.contains(mapId);
}
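// Worked example of the race described above (illustrative, not from the diff):
//   t0: stream A (mapId 5) sees currentMapId == -1, claims it, and writes data,
//       so its length counter becomes > 0.
//   t1: stream B, a speculative duplicate of mapId 5, sees currentMapId == 5 while
//       its own length == 0, so the first clause flags it as a duplicate.
//   t2: stream A completes, records mapId 5 in mapTracker, and resets currentMapId.
//   t3: any later duplicate of mapId 5 is caught by mapTracker.contains(5).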

/**
* This is only invoked when the stream is able to write.
* The stream first writes any deferred block parts buffered in memory.
*/
private void writeAnyDeferredBlocks() throws IOException {
assert partitionInfo != null;
if (deferredBufs != null && !deferredBufs.isEmpty()) {
for (ByteBuffer deferredBuf : deferredBufs) {
writeBuf(deferredBuf);
}
deferredBufs = null;
}
}
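// Illustrative sketch, not part of this diff: the defer-then-flush pattern that
// allowedToWrite(), isDuplicateBlock(), and writeAnyDeferredBlocks() combine to
// implement. onChunkSketch is a hypothetical method; the field names mirror the
// real ones above.
private void onChunkSketch(ByteBuffer buf) throws IOException {
  if (allowedToWrite()) {
    if (isDuplicateBlock()) {
      deferredBufs = null;                 // duplicate: drop buffers, respond success
      return;
    }
    writeAnyDeferredBlocks();              // flush earlier chunks of this block first
    writeBuf(buf);                         // then append the current chunk
  } else {
    if (deferredBufs == null) {
      deferredBufs = new java.util.LinkedList<>();
    }
    ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
    copy.put(buf);                         // copy out: the network buffer is recycled
    copy.flip();
    deferredBufs.add(copy);                // best effort; may never reach disk
  }
}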

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
// If partition info is null, ignore the requests. It could only be
@@ -412,34 +466,23 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
return;
}
// Check whether we can write to disk
if (partitionInfo.getCurrentMapId() < 0 || partitionInfo.getCurrentMapId() == mapId) {
// Check if this is a duplicate block generated by speculative tasks. With speculative
// tasks, we could receive the same block from 2 different sources at the same time.
// One of them is going to be the first to set the currentMapId. When that block does
// so, it's going to see the currentMapId initially as -1. After it sets the
// currentMapId, it's going to write some data to disk, thus increasing the length
// counter. The other duplicate block is going to see the currentMapId already set to
// its mapId. However, it hasn't written any data yet. If the first block gets written
// completely and resets the currentMapId to -1 before the processing for the second
// block finishes, we can just check the bitmap to identify the second as a duplicate.
if ((partitionInfo.getCurrentMapId() == mapId && length == 0) ||
partitionInfo.mapTracker.contains(mapId)) {
if (allowedToWrite()) {
startBlockWrite = true;
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
deferredBufs = null;
return;
}
logger.trace("{} shuffleId {} reduceId {} onData writable", partitionId.appId,
partitionId.shuffleId, partitionId.reduceId);
if (partitionInfo.getCurrentMapId() < 0) {
partitionInfo.setCurrentMapId(mapId);
}

// If we got here, it's safe to write the block data to the merged shuffle file. We
// first write any deferred block chunk buffered in memory, then write the remaining
// of the block.
if (deferredBufs != null && !deferredBufs.isEmpty()) {
for (ByteBuffer deferredBuf : deferredBufs) {
writeBuf(deferredBuf);
}
deferredBufs = null;
}
// first write any deferred block.
writeAnyDeferredBlocks();
writeBuf(buf);
// If we got here, it means we successfully wrote the current chunk of block to the merged
// shuffle file. If we encountered failure while writing the previous block, we should
@@ -452,6 +495,8 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
partitionInfo.setEncounteredFailure(false);
}
} else {
logger.trace("{} shuffleId {} reduceId {} onData deferred", partitionId.appId,
partitionId.shuffleId, partitionId.reduceId);
// If we cannot write to disk, we buffer the current block chunk in memory so it could
// potentially be written to disk later. We make a best effort, with no guarantee
// that the block will be written to disk. If the block data is divided into multiple
@@ -481,12 +526,14 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {

@Override
public void onComplete(String streamId) throws IOException {
logger.trace("{} shuffleId {} reduceId {} onComplete invoked", partitionId.appId,
partitionId.shuffleId, partitionId.reduceId);
if (partitionInfo == null) {
if (isTooLate) {
// Throw an exception here so the block data is drained from channel and server
// responds RpcFailure to the client.
throw new RuntimeException(String.format("Block %s %s", msg.blockId,
BlockPushException.TOO_LATE_MESSAGE_SUFFIX));
ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
} else {
// For duplicate block that is received before the shuffle merge finalizes, the
// server should respond success to the client.
@@ -505,23 +552,19 @@ public void onComplete(String streamId) throws IOException {
if (!partitions.containsKey(partitionId)) {
deferredBufs = null;
throw new RuntimeException(String.format("Block %s %s", msg.blockId,
BlockPushException.TOO_LATE_MESSAGE_SUFFIX));
ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
}
// Check if we can commit this block
if (partitionInfo.getCurrentMapId() < 0 || partitionInfo.getCurrentMapId() == mapId) {
if (allowedToWrite()) {
startBlockWrite = true;
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if ((partitionInfo.getCurrentMapId() == mapId && length == 0) ||
partitionInfo.mapTracker.contains(mapId)) {
if (isDuplicateBlock()) {
deferredBufs = null;
return;
}
if (partitionInfo.getCurrentMapId() < 0 && deferredBufs != null
&& !deferredBufs.isEmpty()) {
for (ByteBuffer deferredBuf : deferredBufs) {
writeBuf(deferredBuf);
}
deferredBufs = null;
if (partitionInfo.getCurrentMapId() < 0) {
writeAnyDeferredBlocks();
}
long updatedPos = partitionInfo.getPosition() + length;
boolean indexUpdated = false;
@@ -539,25 +582,17 @@ public void onComplete(String streamId) throws IOException {
}
} else {
deferredBufs = null;
canWrite = false;
throw new RuntimeException(String.format("%s %s to merged shuffle",
BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX, msg.blockId));
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX,
msg.blockId));
}
}
startBlockWrite = false;
}

@Override
public void onFailure(String streamId, Throwable throwable) throws IOException {
if ((throwable.getMessage() != null &&
(throwable.getMessage().contains(
BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) ||
throwable.getMessage().contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX))) ||

(throwable.getCause() != null && throwable.getCause().getMessage() != null &&
(throwable.getCause().getMessage().contains(
BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) ||
throwable.getCause().getMessage().contains(
BlockPushException.TOO_LATE_MESSAGE_SUFFIX)))) {
if (!errorHandler.shouldLogError(throwable)) {
logger.debug("Encountered issue when merging shuffle partition block {}", msg, throwable);
} else {
logger.error("Encountered issue when merging shuffle partition block {}", msg, throwable);
@@ -566,12 +601,12 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
// request is too late, i.e. received after shuffle merge finalize, #onFailure will
// also be triggered, and we can just ignore. Also, if we couldn't find an opportunity
// to write the block data to disk, we should also ignore here.
if (canWrite && partitionInfo != null && partitions.containsKey(partitionId)) {
if (startBlockWrite && partitionInfo != null && partitions.containsKey(partitionId)) {
synchronized (partitionInfo) {
if (partitionInfo.getCurrentMapId() >= 0 && partitionInfo.getCurrentMapId() == mapId) {
partitionInfo.setCurrentMapId(-1);
partitionInfo.setEncounteredFailure(true);
}
logger.debug("{} shuffleId {} reduceId {} set encountered failure", partitionId.appId,
partitionId.shuffleId, partitionId.reduceId);
partitionInfo.setCurrentMapId(-1);
partitionInfo.setEncounteredFailure(true);
}
}
}
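The replacement of canWrite with startBlockWrite above is the heart of this commit: startBlockWrite is only true for a stream that actually acquired the partition and began writing, so a failure in a deferred (non-active) stream can no longer reset currentMapId out from under the active writer. A condensed model of that invariant, with hypothetical names and assuming the synchronization on partitionInfo that the real code uses:

class ActiveWriterSketch {
  private int currentMapId = -1;           // -1 means no active writer
  private boolean encounteredFailure = false;

  // Mirrors allowedToWrite() plus setCurrentMapId(): at most one map stream
  // holds the partition at a time; everyone else defers.
  synchronized boolean tryAcquire(int mapId) {
    if (currentMapId < 0 || currentMapId == mapId) {
      currentMapId = mapId;                // this stream becomes the active writer
      return true;
    }
    return false;                          // caller buffers its data in memory
  }

  // Mirrors the fixed onFailure(): only a stream that started writing rolls back.
  synchronized void onStreamFailure(boolean startBlockWrite) {
    if (startBlockWrite) {
      currentMapId = -1;                   // free the partition for other streams
      encounteredFailure = true;           // the next writer overwrites partial data
    }
    // A failure in a stream that never started writing is a no-op here.
  }
}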
@@ -645,11 +680,6 @@ public void registerExecutor(String appId, String[] localDirs) {
targetAppId, relativeMergeDirPathPattern, localDirs));
}

@VisibleForTesting
public Path[] getLocalDirs() {
return localDirs;
}

/**
* ID that uniquely identifies a shuffle partition for an application. This is used to key
* the metadata tracked for each shuffle partition that's being actively merged.
@@ -713,6 +743,8 @@ boolean compareAppShuffleId(String appId, int shuffleId) {
* Metadata tracked for an actively merged shuffle partition
*/
public static class AppShufflePartitionInfo {

private final AppShufflePartitionId partitionId;
// The merged shuffle data file
final File targetFile;
public final FileChannel channel;
@@ -740,7 +772,11 @@ public static class AppShufflePartitionInfo {
private RoaringBitmap chunkTracker;
ByteBuf trackerBuf = null;

AppShufflePartitionInfo(File targetFile, File indexFile, File metaFile) throws IOException {
AppShufflePartitionInfo(AppShufflePartitionId partitionId,
File targetFile,
File indexFile,
File metaFile) throws IOException {
this.partitionId = Preconditions.checkNotNull(partitionId, "partition id");
targetFile.createNewFile();
this.targetFile = targetFile;
this.channel = new FileOutputStream(targetFile, true).getChannel();
@@ -764,6 +800,8 @@ public long getPosition() {
}

public void setPosition(long position) {
logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", partitionId.appId,
partitionId.shuffleId, partitionId.reduceId, this.position, position);
this.position = position;
}

@@ -780,6 +818,8 @@ int getCurrentMapId() {
}

void setCurrentMapId(int mapId) {
logger.trace("{} shuffleId {} reduceId {} updated mapId {} current mapId {}",
partitionId.appId, partitionId.shuffleId, partitionId.reduceId, currentMapId, mapId);
this.currentMapId = mapId;
}

@@ -788,6 +828,8 @@ long getLastChunkOffset() {
}

void blockMerged(int mapId) {
logger.debug("{} shuffleId {} reduceId {} updated merging mapId {}", partitionId.appId,
partitionId.shuffleId, partitionId.reduceId, mapId);
mapTracker.add(mapId);
chunkTracker.add(mapId);
lastMergedMapId = mapId;
@@ -808,7 +850,9 @@ void updateChunkInfo(long chunkOffset, int mapId) throws IOException {
// update the chunk tracker to meta file before index file
writeChunkTracker(mapId);
idxStartPos = indexChannel.position();
logger.trace("{} updated index with offset {}", targetFile.getName(), chunkOffset);
logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}",
partitionId.appId, partitionId.shuffleId, partitionId.reduceId, this.lastChunkOffset,
chunkOffset);
indexWriteStream.writeLong(chunkOffset);
} catch (IOException ioe) {
if (idxStartPos != -1) {
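The updateChunkInfo excerpt above records the index channel's position in idxStartPos before appending a new chunk offset, so a failed append can be undone. A minimal sketch of that write-then-rollback pattern, with hypothetical names and assuming rollback by truncating to the recorded position:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class RollbackAppendSketch {
  // Appends one long to the channel; on failure, restores the previous length
  // so a partially written entry never becomes visible.
  static void appendLong(FileChannel ch, long value) throws IOException {
    long startPos = ch.position();         // remember where the write began
    try {
      ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
      buf.putLong(value);
      buf.flip();
      while (buf.hasRemaining()) {
        ch.write(buf);                     // may fail partway through
      }
    } catch (IOException ioe) {
      ch.truncate(startPos);               // drop the partial write
      ch.position(startPos);
      throw ioe;
    }
  }
}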
YarnShuffleServiceSuite.scala
@@ -411,14 +411,4 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
))
}

test("NM local directories will be sorted") {
s1 = new YarnShuffleService
val localDir1 = "/tmp/b"
val localDir2 = "tmp/a"
yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2)

s1.init(yarnConfig)
val expected = Array(localDir1, localDir2).sorted
s1.shuffleMergeManager.getLocalDirs.map(path => path.toString) should equal(expected)
}
}