Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
df48e01
LIHADOOP-48527 Magnet shuffle service block transfer netty protocol
Victsm May 9, 2020
6472267
LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush…
otterc May 12, 2020
1c78e1d
LIHADOOP-53496 Not logging all block push exceptions on the client
otterc May 15, 2020
71f3246
LIHADOOP-53700 Separate configuration for caching the merged index fi…
zhouyejoe Jun 1, 2020
221178f
LIHADOOP-53940 Logging the data file and index file path when shuffle…
otterc Jun 10, 2020
55b4a5f
LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc…
otterc Jun 15, 2020
f9d0e86
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
548e2c0
LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of…
otterc Jul 24, 2020
50efba9
LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile
otterc Aug 26, 2020
8a6e01b
LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc…
zhouyejoe Sep 9, 2020
ae5ffac
LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong…
zhouyejoe Sep 12, 2020
e51042b
Further prune changes that should go into a later PR.
Victsm Sep 23, 2020
83aca99
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
04e0efe
LIHADOOP-55022 Disable the merged shuffle file cleanup in stopApplica…
zhouyejoe Aug 11, 2020
71dfd48
Tests and cleanup
otterc Oct 6, 2020
0c411c1
LIHADOOP-55948 Failure in the push stream should not change the curre…
otterc Oct 1, 2020
d029463
Minor style corrections
otterc Oct 15, 2020
8f3839f
Fixed style issues
otterc Oct 15, 2020
1cd2d03
Renamed variables, methods, fixed indentation, addressed other review…
otterc Oct 19, 2020
3356c19
Addressing review comments
otterc Oct 23, 2020
d879beb
Changed the partitions map and addressed other review comments
otterc Oct 26, 2020
48ae819
Added support for subdirs under merge_manager dirs and removed the ya…
otterc Oct 28, 2020
9b031f7
Addressed test failure and other review comments in RemoteBlockPushRe…
otterc Oct 29, 2020
807cc7b
Minor change in finalization
otterc Oct 29, 2020
5b169bc
Removing the partition from inner map after the files are closed
otterc Oct 30, 2020
9ece587
Server side configuration to specify the implementation of MergedShuf…
otterc Oct 30, 2020
d13c7ad
Change the Push block stream to not encode shuffle Id, map index, and…
otterc Nov 2, 2020
63843bb
Fixed typos, address review comments, made NoOp the default impl, and…
otterc Nov 2, 2020
d35aa4b
Addressed review comments
otterc Nov 3, 2020
ba92311
Fix IndexOutOfBoundsException and avoid instantiating AppsPathInfo mu…
otterc Nov 4, 2020
9be25b3
Removed duplicate declaration of shuffle push prefix
otterc Nov 4, 2020
ba51796
Added UT for collision with 2 streams
otterc Nov 4, 2020
1f4fcfe
Removed unnecessary TODOs, marked MergedShuffleFileManager evolving, …
otterc Nov 4, 2020
28edaae
Changed the serialization on chunktracker and removed serializedSizeI…
otterc Nov 6, 2020
cb1881c
Use RandomAccessFile
otterc Nov 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Tests and cleanup
  • Loading branch information
otterc committed Nov 6, 2020
commit 71dfd48d32b9ce797744a0fb2cb2db6d52e1e422
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,25 @@ public boolean useOldFetchProtocol() {
}

/**
* The minimum size of a chunk when dividing a merged shuffle file in push based shuffle into
* multiple chunks. This is an optimization so that when push based shuffle merges multiple
* shuffle blocks belonging to the same shuffle partition into a merged shuffle file, it
* divides the merged shuffle file into multiple MB-sized chunks to optimize reading it later.
* A corresponding index file for each merged shuffle file will be generated indicating chunk
* boundaries.
* The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
* push-based shuffle.
* A merged shuffle file consists of multiple small shuffle blocks. Fetching the
* complete merged shuffle file in a single response increases the memory requirements for the
* clients. Instead of serving the entire merged file, the shuffle service serves the
* merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this
* configuration controls how big a chunk can get. A corresponding index file for each merged
* shuffle file will be generated indicating chunk boundaries.
*/
public int minChunkSizeInMergedShuffleFile() {
  // Read the configured chunk size (e.g. "2m") and convert it to bytes; checkedCast
  // guards against a configured value that overflows an int.
  String configuredSize =
      conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m");
  long sizeInBytes = JavaUtils.byteStringAsBytes(configuredSize);
  return Ints.checkedCast(sizeInBytes);
}

/**
 * Size in bytes of the in-memory cache used by push-based shuffle to hold
 * merged shuffle index files.
 */
public long mergedIndexCacheSize() {
  // Default to 100 MiB when the server configuration does not set a size.
  String configuredSize = conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m");
  return JavaUtils.byteStringAsBytes(configuredSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.apache.spark.network.util.TransportConf;

/**
* An implementation of MergedShuffleFileManager that provides the most essential shuffle
* An implementation of {@link MergedShuffleFileManager} that provides the most essential shuffle
* service processing logic to support push based shuffle.
*/
public class RemoteBlockPushResolver implements MergedShuffleFileManager {
Expand All @@ -81,6 +81,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private final int minChunkSize;
private final String relativeMergeDirPathPattern;

@SuppressWarnings("UnstableApiUsage")
private final LoadingCache<File, ShuffleIndexInformation> indexCache;

@SuppressWarnings("UnstableApiUsage")
Expand All @@ -92,15 +93,14 @@ public RemoteBlockPushResolver(TransportConf conf, String relativeMergeDirPathPa
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
String indexCacheSize = conf.get("spark.shuffle.service.mergedIndex.cache.size", "100m");
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
indexCache = CacheBuilder.newBuilder()
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
.maximumWeight(conf.mergedIndexCacheSize())
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.build(indexCacheLoader);
this.relativeMergeDirPathPattern = relativeMergeDirPathPattern;
Expand Down Expand Up @@ -208,7 +208,7 @@ private File getFile(String appId, String filename) {
String relativePath = getRelativePath(appPathsInfo.user, appId);
Path filePath = localDir.resolve(relativePath);
File targetFile = new File(filePath.toFile(), filename);
logger.debug("Get the file for {}", targetFile.getAbsolutePath());
logger.debug("Get merged file {}", targetFile.getAbsolutePath());
return targetFile;
}

Expand Down Expand Up @@ -526,7 +526,7 @@ public void onComplete(String streamId) throws IOException {
long updatedPos = partitionInfo.getPosition() + length;
boolean indexUpdated = false;
if (updatedPos - partitionInfo.getLastChunkOffset() >= minChunkSize) {
partitionInfo.updateLastChunkOffset(updatedPos, mapId);
partitionInfo.updateChunkInfo(updatedPos, mapId);
indexUpdated = true;
}
partitionInfo.setPosition(updatedPos);
Expand Down Expand Up @@ -568,8 +568,10 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
// to write the block data to disk, we should also ignore here.
if (canWrite && partitionInfo != null && partitions.containsKey(partitionId)) {
synchronized (partitionInfo) {
partitionInfo.setCurrentMapId(-1);
partitionInfo.setEncounteredFailure(true);
if (partitionInfo.getCurrentMapId() >= 0 && partitionInfo.getCurrentMapId() == mapId) {
partitionInfo.setCurrentMapId(-1);
partitionInfo.setEncounteredFailure(true);
}
}
}
}
Expand Down Expand Up @@ -597,7 +599,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
try {
partition.channel.truncate(partition.getPosition());
if (partition.getPosition() != partition.getLastChunkOffset()) {
partition.updateLastChunkOffset(partition.getPosition(), partition.lastMergedMapId);
partition.updateChunkInfo(partition.getPosition(), partition.lastMergedMapId);
}
bitmaps.add(partition.mapTracker);
reduceIds.add(partitionId.reduceId);
Expand Down Expand Up @@ -722,15 +724,19 @@ public static class AppShufflePartitionInfo {
private int currentMapId;
// Bitmap tracking which mapper's blocks have been merged for this shuffle partition
private RoaringBitmap mapTracker;
// The merged shuffle index file
// The index file for a particular merged shuffle contains the chunk offsets.
private final FileChannel indexChannel;
/**
* The meta file for a particular merged shuffle contains all the map ids that belong to every
* chunk. The entry per chunk is a serialized bitmap.
*/
private final FileChannel metaChannel;
private final DataOutputStream indexWriteStream;
// The offset for the last chunk tracked in the index file for this shuffle partition
private long lastChunkOffset;
private int lastMergedMapId;
private int lastMergedMapId = -1;

// Bitmap tracking which mapper's blocks are in shuffle chunk
// Bitmap tracking which mapper's blocks are in the current shuffle chunk
private RoaringBitmap chunkTracker;
ByteBuf trackerBuf = null;

Expand All @@ -746,7 +752,7 @@ public static class AppShufflePartitionInfo {
metaChannel = new FileOutputStream(metaFile, true).getChannel();
this.currentMapId = -1;
// Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex()
updateLastChunkOffset(0L, -1);
updateChunkInfo(0L, -1);
this.position = 0;
this.encounteredFailure = false;
this.mapTracker = new RoaringBitmap();
Expand Down Expand Up @@ -791,14 +797,19 @@ void resetChunkTracker() {
chunkTracker.clear();
}

void updateLastChunkOffset(long lastChunkOffset, int mapId) throws IOException {
/**
* Appends the chunk offset to the index file and adds the mapId to the chunk tracker.
* @param chunkOffset the offset of the chunk in the data file.
* @param mapId the mapId to be added to chunk tracker.
*/
void updateChunkInfo(long chunkOffset, int mapId) throws IOException {
long idxStartPos = -1;
try {
// update the chunk tracker to meta file before index file
writeChunkTracker(mapId);
idxStartPos = indexChannel.position();
logger.trace("{} updated index with offset {}", targetFile.getName(), lastChunkOffset);
indexWriteStream.writeLong(lastChunkOffset);
logger.trace("{} updated index with offset {}", targetFile.getName(), chunkOffset);
indexWriteStream.writeLong(chunkOffset);
} catch (IOException ioe) {
if (idxStartPos != -1) {
// reset the position to avoid corrupting index files during exception.
Expand All @@ -807,7 +818,7 @@ void updateLastChunkOffset(long lastChunkOffset, int mapId) throws IOException {
}
throw ioe;
}
this.lastChunkOffset = lastChunkOffset;
this.lastChunkOffset = chunkOffset;
}

private void writeChunkTracker(int mapId) throws IOException {
Expand All @@ -834,7 +845,7 @@ private void writeChunkTracker(int mapId) throws IOException {
}

/**
* Wraps all the information related to the merge_dir of an application.
* Wraps all the information related to the merge directory of an application.
*/
private static class AppPathsInfo {

Expand Down
Loading