-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle … #31934
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,6 @@ | |
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
| import java.util.Iterator; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ConcurrentMap; | ||
|
|
@@ -445,9 +444,9 @@ static class PushBlockStreamCallback implements StreamCallbackWithID { | |
| private final AppShufflePartitionInfo partitionInfo; | ||
| private int length = 0; | ||
| // This indicates that this stream got the opportunity to write the blocks to the merged file. | ||
| // Once this is set to true and the stream encounters a failure then it will take necessary | ||
| // action to overwrite any partial written data. This is reset to false when the stream | ||
| // completes without any failures. | ||
| // Once this is set to true and the stream encounters a failure then it will unset the | ||
| // currentMapId of the partition so that another stream can start merging the blocks to the | ||
| // partition. This is reset to false when the stream completes. | ||
| private boolean isWriting = false; | ||
| // Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly | ||
| private List<ByteBuffer> deferredBufs; | ||
|
|
@@ -477,16 +476,11 @@ public String getID() { | |
| */ | ||
| private void writeBuf(ByteBuffer buf) throws IOException { | ||
| while (buf.hasRemaining()) { | ||
| if (partitionInfo.isEncounteredFailure()) { | ||
| long updatedPos = partitionInfo.getDataFilePos() + length; | ||
| logger.debug( | ||
| "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}", | ||
| partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, | ||
| partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); | ||
| length += partitionInfo.dataChannel.write(buf, updatedPos); | ||
| } else { | ||
| length += partitionInfo.dataChannel.write(buf); | ||
| } | ||
| long updatedPos = partitionInfo.getDataFilePos() + length; | ||
| logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}", | ||
| partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, | ||
| partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); | ||
| length += partitionInfo.dataChannel.write(buf, updatedPos); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -581,7 +575,6 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { | |
| } | ||
| // Check whether we can write to disk | ||
| if (allowedToWrite()) { | ||
| isWriting = true; | ||
| // Identify duplicate block generated by speculative tasks. We respond success to | ||
| // the client in cases of duplicate even though no data is written. | ||
| if (isDuplicateBlock()) { | ||
|
|
@@ -598,6 +591,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { | |
|
|
||
| // If we got here, it's safe to write the block data to the merged shuffle file. We | ||
| // first write any deferred block. | ||
| isWriting = true; | ||
| try { | ||
| if (deferredBufs != null && !deferredBufs.isEmpty()) { | ||
| writeDeferredBufs(); | ||
|
|
@@ -609,16 +603,6 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { | |
| // back to the client so the block could be retried. | ||
| throw ioe; | ||
| } | ||
| // If we got here, it means we successfully write the current chunk of block to merged | ||
| // shuffle file. If we encountered failure while writing the previous block, we should | ||
| // reset the file channel position and the status of partitionInfo to indicate that we | ||
| // have recovered from previous disk write failure. However, we do not update the | ||
| // position tracked by partitionInfo here. That is only updated while the entire block | ||
| // is successfully written to merged shuffle file. | ||
| if (partitionInfo.isEncounteredFailure()) { | ||
| partitionInfo.dataChannel.position(partitionInfo.getDataFilePos() + length); | ||
| partitionInfo.setEncounteredFailure(false); | ||
| } | ||
| } else { | ||
| logger.trace("{} shuffleId {} reduceId {} onData deferred", | ||
| partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, | ||
|
|
@@ -639,7 +623,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { | |
| // written to disk due to this reason. We thus decide to optimize for server | ||
| // throughput and memory usage. | ||
| if (deferredBufs == null) { | ||
| deferredBufs = new LinkedList<>(); | ||
| deferredBufs = new ArrayList<>(); | ||
| } | ||
| // Write the buffer to the in-memory deferred cache. Since buf is a slice of a larger | ||
| // byte buffer, we cache only the relevant bytes not the entire large buffer to save | ||
|
|
@@ -670,7 +654,6 @@ public void onComplete(String streamId) throws IOException { | |
| } | ||
| // Check if we can commit this block | ||
| if (allowedToWrite()) { | ||
| isWriting = true; | ||
| // Identify duplicate block generated by speculative tasks. We respond success to | ||
| // the client in cases of duplicate even though no data is written. | ||
| if (isDuplicateBlock()) { | ||
|
|
@@ -681,6 +664,7 @@ public void onComplete(String streamId) throws IOException { | |
| try { | ||
| if (deferredBufs != null && !deferredBufs.isEmpty()) { | ||
| abortIfNecessary(); | ||
| isWriting = true; | ||
| writeDeferredBufs(); | ||
| } | ||
| } catch (IOException ioe) { | ||
|
|
@@ -738,14 +722,14 @@ public void onFailure(String streamId, Throwable throwable) throws IOException { | |
| Map<Integer, AppShufflePartitionInfo> shufflePartitions = | ||
| mergeManager.partitions.get(partitionInfo.appShuffleId); | ||
| if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) { | ||
| logger.debug("{} shuffleId {} reduceId {} set encountered failure", | ||
| logger.debug("{} shuffleId {} reduceId {} encountered failure", | ||
| partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, | ||
| partitionInfo.reduceId); | ||
| partitionInfo.setCurrentMapIndex(-1); | ||
| partitionInfo.setEncounteredFailure(true); | ||
| } | ||
| } | ||
| } | ||
| isWriting = false; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Move this into the …
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I can move this to …
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok, keeping it consistent sounds fine. We can leave it as it is since it's trivial. |
||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -802,8 +786,6 @@ public static class AppShufflePartitionInfo { | |
| public FileChannel dataChannel; | ||
| // Location offset of the last successfully merged block for this shuffle partition | ||
| private long dataFilePos; | ||
| // Indicating whether failure was encountered when merging the previous block | ||
| private boolean encounteredFailure; | ||
| // Track the map index whose block is being merged for this shuffle partition | ||
| private int currentMapIndex; | ||
| // Bitmap tracking which mapper's blocks have been merged for this shuffle partition | ||
|
|
@@ -836,7 +818,6 @@ public static class AppShufflePartitionInfo { | |
| // Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex() | ||
| updateChunkInfo(0L, -1); | ||
| this.dataFilePos = 0; | ||
| this.encounteredFailure = false; | ||
| this.mapTracker = new RoaringBitmap(); | ||
| this.chunkTracker = new RoaringBitmap(); | ||
| } | ||
|
|
@@ -851,14 +832,6 @@ public void setDataFilePos(long dataFilePos) { | |
| this.dataFilePos = dataFilePos; | ||
| } | ||
|
|
||
| boolean isEncounteredFailure() { | ||
| return encounteredFailure; | ||
| } | ||
|
|
||
| void setEncounteredFailure(boolean encounteredFailure) { | ||
| this.encounteredFailure = encounteredFailure; | ||
| } | ||
|
|
||
| int getCurrentMapIndex() { | ||
| return currentMapIndex; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.