
Commit dd38f6d

fix
1 parent e15ae60 commit dd38f6d

3 files changed: 20 additions & 2 deletions

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 15 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.sort;
 
 import java.nio.channels.Channels;
+import java.util.Arrays;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import java.io.*;
@@ -274,6 +275,8 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
         // Here, we don't need to perform any metrics updates because the bytes written to this
         // output file would have already been counted as shuffle bytes written.
         partitionLengths = spills[0].partitionLengths;
+        logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
+          partitionLengths.length);
         maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
       } else {
         partitionLengths = mergeSpillsUsingStandardWriter(spills);
@@ -360,6 +363,7 @@ private void mergeSpillsWithFileStream(
       SpillInfo[] spills,
       ShuffleMapOutputWriter mapWriter,
       @Nullable CompressionCodec compressionCodec) throws IOException {
+    logger.debug("Merge shuffle spills with FileStream for mapId {}", mapId);
     final int numPartitions = partitioner.numPartitions();
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
@@ -369,6 +373,11 @@ private void mergeSpillsWithFileStream(
         spillInputStreams[i] = new NioBufferedFileInputStream(
           spills[i].file,
           inputBufferSizeInBytes);
+        // Only convert the partitionLengths when debug level is enabled.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+            Arrays.toString(spills[i].partitionLengths));
+        }
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         boolean copyThrewException = true;
@@ -431,6 +440,7 @@ private void mergeSpillsWithFileStream(
   private void mergeSpillsWithTransferTo(
       SpillInfo[] spills,
       ShuffleMapOutputWriter mapWriter) throws IOException {
+    logger.debug("Merge shuffle spills with TransferTo for mapId {}", mapId);
     final int numPartitions = partitioner.numPartitions();
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
     final long[] spillInputChannelPositions = new long[spills.length];
@@ -439,6 +449,11 @@ private void mergeSpillsWithTransferTo(
     try {
       for (int i = 0; i < spills.length; i++) {
         spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+        // Only convert the partitionLengths when debug level is enabled.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+            Arrays.toString(spills[i].partitionLengths));
+        }
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         boolean copyThrewException = true;
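
Note: the two guarded blocks above use the standard SLF4J pattern of checking isDebugEnabled() before building an expensive log argument, since Arrays.toString(...) can produce a large String for writers with many partitions. A minimal standalone sketch of the same pattern, assuming only SLF4J on the classpath (the class and method names below are illustrative, not part of Spark):

import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch of the isDebugEnabled() guard used in the diff above.
public class DebugGuardExample {
  private static final Logger logger = LoggerFactory.getLogger(DebugGuardExample.class);

  static void logPartitionLengths(long mapId, int spillIndex, long[] partitionLengths) {
    // Arrays.toString allocates a potentially large String; only pay that cost
    // when debug output will actually be emitted.
    if (logger.isDebugEnabled()) {
      logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, spillIndex,
        Arrays.toString(partitionLengths));
    }
  }

  public static void main(String[] args) {
    logPartitionLengths(0L, 0, new long[] {128L, 0L, 4096L});
  }
}

The {} placeholders already defer String formatting inside SLF4J, but the Arrays.toString call itself would still run eagerly on every spill without the explicit guard, which is why the check is worth the extra lines here.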

core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java

Lines changed: 4 additions & 2 deletions
@@ -113,6 +113,8 @@ public MapOutputCommitMessage commitAllPartitions() throws IOException {
     }
     cleanUp();
     File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
+    log.debug("Writing shuffle index file for mapId {} with length {}", mapId,
+      partitionLengths.length);
     blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
     return MapOutputCommitMessage.of(partitionLengths);
   }
@@ -211,14 +213,14 @@ public long getNumBytesWritten() {
 
   private class PartitionWriterStream extends OutputStream {
     private final int partitionId;
-    private int count = 0;
+    private long count = 0;
     private boolean isClosed = false;
 
     PartitionWriterStream(int partitionId) {
       this.partitionId = partitionId;
     }
 
-    public int getCount() {
+    public long getCount() {
       return count;
     }
 
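Note: widening count from int to long matters because the field accumulates the number of bytes written for one reduce partition. Once a partition exceeds Integer.MAX_VALUE bytes (just under 2 GiB), an int counter wraps to a negative value and the reported partition length becomes invalid. A small standalone sketch of the failure mode the change avoids (illustrative, not Spark code):

// Demonstrates int overflow for a byte counter versus a long counter.
public class CounterOverflowExample {
  public static void main(String[] args) {
    int intCount = Integer.MAX_VALUE;    // 2,147,483,647 bytes, just under 2 GiB
    long longCount = Integer.MAX_VALUE;

    // Simulate writing one more byte to the partition.
    intCount += 1;
    longCount += 1;

    System.out.println(intCount);   // prints -2147483648: wrapped, an invalid length
    System.out.println(longCount);  // prints 2147483648: still correct
  }
}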

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 1 addition & 0 deletions
@@ -302,6 +302,7 @@ private[spark] class IndexShuffleBlockResolver(
         }
       }
     } finally {
+      logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
