From dd38f6d1bf348e419108f88eac1b8207c9c792f0 Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Tue, 18 Aug 2020 23:39:45 -0700
Subject: [PATCH] fix

---
 .../spark/shuffle/sort/UnsafeShuffleWriter.java    | 15 +++++++++++++++
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |  6 ++++--
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  1 +
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 5515a85295d7..79e38a824fea 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.sort;
 
 import java.nio.channels.Channels;
+import java.util.Arrays;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import java.io.*;
@@ -274,6 +275,8 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
       // Here, we don't need to perform any metrics updates because the bytes written to this
       // output file would have already been counted as shuffle bytes written.
       partitionLengths = spills[0].partitionLengths;
+      logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
+        partitionLengths.length);
       maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
     } else {
       partitionLengths = mergeSpillsUsingStandardWriter(spills);
@@ -360,6 +363,7 @@ private void mergeSpillsWithFileStream(
       SpillInfo[] spills,
       ShuffleMapOutputWriter mapWriter,
       @Nullable CompressionCodec compressionCodec) throws IOException {
+    logger.debug("Merge shuffle spills with FileStream for mapId {}", mapId);
     final int numPartitions = partitioner.numPartitions();
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
@@ -369,6 +373,11 @@ private void mergeSpillsWithFileStream(
         spillInputStreams[i] = new NioBufferedFileInputStream(
           spills[i].file,
           inputBufferSizeInBytes);
+        // Only convert the partitionLengths when debug level is enabled.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+            Arrays.toString(spills[i].partitionLengths));
+        }
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         boolean copyThrewException = true;
@@ -431,6 +440,7 @@ private void mergeSpillsWithTransferTo(
       SpillInfo[] spills,
       ShuffleMapOutputWriter mapWriter) throws IOException {
+    logger.debug("Merge shuffle spills with TransferTo for mapId {}", mapId);
     final int numPartitions = partitioner.numPartitions();
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
     final long[] spillInputChannelPositions = new long[spills.length];
 
@@ -439,6 +449,11 @@ private void mergeSpillsWithTransferTo(
     try {
       for (int i = 0; i < spills.length; i++) {
         spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+        // Only convert the partitionLengths when debug level is enabled.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+            Arrays.toString(spills[i].partitionLengths));
+        }
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         boolean copyThrewException = true;
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index eea6c762f5c6..0b286264be43 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -113,6 +113,8 @@ public MapOutputCommitMessage commitAllPartitions() throws IOException {
     }
     cleanUp();
     File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
+    log.debug("Writing shuffle index file for mapId {} with length {}", mapId,
+      partitionLengths.length);
     blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
     return MapOutputCommitMessage.of(partitionLengths);
   }
@@ -211,14 +213,14 @@ public long getNumBytesWritten() {
 
   private class PartitionWriterStream extends OutputStream {
     private final int partitionId;
-    private int count = 0;
+    private long count = 0;
     private boolean isClosed = false;
 
     PartitionWriterStream(int partitionId) {
       this.partitionId = partitionId;
     }
 
-    public int getCount() {
+    public long getCount() {
       return count;
     }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 0d0dad6d77ac..a019a3382d5b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -302,6 +302,7 @@ private[spark] class IndexShuffleBlockResolver(
         }
       }
     } finally {
+      logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
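
Note (illustrative, not part of the patch): every message added above is emitted at DEBUG
level, so nothing changes under Spark's default INFO logging. Assuming the stock log4j 1.x
setup that Spark 3.0 ships (conf/log4j.properties, copied from the bundled template), the
new output can be surfaced per class with entries like:

    log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=DEBUG
    log4j.logger.org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter=DEBUG
    log4j.logger.org.apache.spark.shuffle.IndexShuffleBlockResolver=DEBUG

This is also why the patch wraps the Arrays.toString(spills[i].partitionLengths) calls in
logger.isDebugEnabled(): the per-spill string conversion is only paid when one of these
loggers is actually at DEBUG.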
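
Note (illustrative, not part of the patch): the int -> long widening of
PartitionWriterStream.count and getCount() appears to be the substantive change here:
count accumulates the bytes written to a single partition, and a Java int wraps negative
once a partition exceeds Integer.MAX_VALUE (~2 GiB) bytes, which would then flow into the
partition lengths logged and written to the shuffle index. A self-contained sketch of that
wraparound (class name and values are hypothetical, for illustration only):

    // CounterWidthSketch.java - shows the int wraparound a long counter avoids; not Spark code.
    public class CounterWidthSketch {
      public static void main(String[] args) {
        int narrow = Integer.MAX_VALUE;            // counter after ~2 GiB written: 2147483647
        narrow += 1;                               // one more byte: wraps to -2147483648
        long wide = (long) Integer.MAX_VALUE + 1;  // long counter stays correct: 2147483648
        System.out.println("int count:  " + narrow);
        System.out.println("long count: " + wide);
      }
    }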