Skip to content

Commit c685c43

Browse files
cloud-fandongjoon-hyun
authored andcommitted
[SPARK-46700][CORE] Count the last spilling for the shuffle disk spilling bytes metric
### What changes were proposed in this pull request? This PR fixes a long-standing bug in ShuffleExternalSorter about the "spilled disk bytes" metrics. When we close the sorter, we will spill the remaining data in the buffer, with a flag `isLastFile = true`. This flag means the spilling will not increase the "spilled disk bytes" metrics. This makes sense if the sorter has never spilled before, then the final spill file will be used as the final shuffle output file, and we should keep the "spilled disk bytes" metrics as 0. However, if spilling did happen before, then we simply miscount the final spill file for the "spilled disk bytes" metrics today. This PR fixes this issue, by setting that flag when closing the sorter only if this is the first spilling. ### Why are the changes needed? make metrics accurate ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? updated tests ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#44709 from cloud-fan/shuffle. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 4ea3742) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent a88eb7e commit c685c43

File tree

3 files changed

+33
-27
lines changed

3 files changed

+33
-27
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,21 @@ public long[] getChecksums() {
150150
* Sorts the in-memory records and writes the sorted records to an on-disk file.
151151
* This method does not free the sort data structures.
152152
*
153-
* @param isLastFile if true, this indicates that we're writing the final output file and that the
154-
* bytes written should be counted towards shuffle spill metrics rather than
155-
* shuffle write metrics.
153+
* @param isFinalFile if true, this indicates that we're writing the final output file and that
154+
* the bytes written should be counted towards shuffle write metrics rather
155+
* than shuffle spill metrics.
156156
*/
157-
private void writeSortedFile(boolean isLastFile) {
157+
private void writeSortedFile(boolean isFinalFile) {
158+
// Only emit the log if this is an actual spilling.
159+
if (!isFinalFile) {
160+
logger.info(
161+
"Task {} on Thread {} spilling sort data of {} to disk ({} {} so far)",
162+
taskContext.taskAttemptId(),
163+
Thread.currentThread().getId(),
164+
Utils.bytesToString(getMemoryUsage()),
165+
spills.size(),
166+
spills.size() != 1 ? " times" : " time");
167+
}
158168

159169
// This call performs the actual sort.
160170
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
@@ -167,13 +177,14 @@ private void writeSortedFile(boolean isLastFile) {
167177

168178
final ShuffleWriteMetricsReporter writeMetricsToUse;
169179

170-
if (isLastFile) {
180+
if (isFinalFile) {
171181
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
172182
writeMetricsToUse = writeMetrics;
173183
} else {
174184
// We're spilling, so bytes written should be counted towards spill rather than write.
175185
// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
176186
// them towards shuffle bytes written.
187+
// The actual shuffle bytes written will be counted when we merge the spill files.
177188
writeMetricsToUse = new ShuffleWriteMetrics();
178189
}
179190

@@ -246,7 +257,7 @@ private void writeSortedFile(boolean isLastFile) {
246257
spills.add(spillInfo);
247258
}
248259

249-
if (!isLastFile) { // i.e. this is a spill file
260+
if (!isFinalFile) { // i.e. this is a spill file
250261
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
251262
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
252263
// relies on its `recordWritten()` method being called in order to trigger periodic updates to
@@ -281,12 +292,6 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
281292
return 0L;
282293
}
283294

284-
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
285-
Thread.currentThread().getId(),
286-
Utils.bytesToString(getMemoryUsage()),
287-
spills.size(),
288-
spills.size() > 1 ? " times" : " time");
289-
290295
writeSortedFile(false);
291296
final long spillSize = freeMemory();
292297
inMemSorter.reset();
@@ -440,8 +445,9 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
440445
*/
441446
public SpillInfo[] closeAndGetSpills() throws IOException {
442447
if (inMemSorter != null) {
443-
// Do not count the final file towards the spill count.
444-
writeSortedFile(true);
448+
// Here we are spilling the remaining data in the buffer. If there is no spill before, this
449+
// final spill file will be the final shuffle output file.
450+
writeSortedFile(/* isFinalFile = */spills.isEmpty());
445451
freeMemory();
446452
inMemSorter.free();
447453
inMemSorter = null;

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,12 +327,6 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep
327327
logger.debug("Using slow merge");
328328
mergeSpillsWithFileStream(spills, mapWriter, compressionCodec);
329329
}
330-
// When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
331-
// in-memory records, we write out the in-memory records to a file but do not count that
332-
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
333-
// to be counted as shuffle write, but this will lead to double-counting of the final
334-
// SpillInfo's bytes.
335-
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
336330
partitionLengths = mapWriter.commitAllPartitions(sorter.getChecksums()).getPartitionLengths();
337331
} catch (Exception e) {
338332
try {

core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
6969
File tempDir;
7070
long[] partitionSizesInMergedFile;
7171
final LinkedList<File> spillFilesCreated = new LinkedList<>();
72+
long totalSpilledDiskBytes = 0;
7273
SparkConf conf;
7374
final Serializer serializer =
7475
new KryoSerializer(new SparkConf().set("spark.kryo.unsafe", "false"));
@@ -96,6 +97,7 @@ public void setUp() throws Exception {
9697
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
9798
partitionSizesInMergedFile = null;
9899
spillFilesCreated.clear();
100+
totalSpilledDiskBytes = 0;
99101
conf = new SparkConf()
100102
.set(package$.MODULE$.BUFFER_PAGESIZE().key(), "1m")
101103
.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)
@@ -160,7 +162,11 @@ public void setUp() throws Exception {
160162

161163
when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> {
162164
TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
163-
File file = File.createTempFile("spillFile", ".spill", tempDir);
165+
File file = spy(File.createTempFile("spillFile", ".spill", tempDir));
166+
when(file.delete()).thenAnswer(inv -> {
167+
totalSpilledDiskBytes += file.length();
168+
return inv.callRealMethod();
169+
});
164170
spillFilesCreated.add(file);
165171
return Tuple2$.MODULE$.apply(blockId, file);
166172
});
@@ -284,6 +290,9 @@ public void writeWithoutSpilling() throws Exception {
284290
final Option<MapStatus> mapStatus = writer.stop(true);
285291
assertTrue(mapStatus.isDefined());
286292
assertTrue(mergedOutputFile.exists());
293+
// Even if there is no spill, the sorter still writes its data to a spill file at the end,
294+
// which will become the final shuffle file.
295+
assertEquals(1, spillFilesCreated.size());
287296

288297
long sumOfPartitionSizes = 0;
289298
for (long size: partitionSizesInMergedFile) {
@@ -425,9 +434,8 @@ private void testMergingSpills(
425434
assertSpillFilesWereCleanedUp();
426435
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
427436
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
428-
assertTrue(taskMetrics.diskBytesSpilled() > 0L);
429-
assertTrue(taskMetrics.diskBytesSpilled() < mergedOutputFile.length());
430437
assertTrue(taskMetrics.memoryBytesSpilled() > 0L);
438+
assertEquals(totalSpilledDiskBytes, taskMetrics.diskBytesSpilled());
431439
assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
432440
}
433441

@@ -517,9 +525,8 @@ public void writeEnoughDataToTriggerSpill() throws Exception {
517525
assertSpillFilesWereCleanedUp();
518526
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
519527
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
520-
assertTrue(taskMetrics.diskBytesSpilled() > 0L);
521-
assertTrue(taskMetrics.diskBytesSpilled() < mergedOutputFile.length());
522528
assertTrue(taskMetrics.memoryBytesSpilled()> 0L);
529+
assertEquals(totalSpilledDiskBytes, taskMetrics.diskBytesSpilled());
523530
assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
524531
}
525532

@@ -550,9 +557,8 @@ private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exc
550557
assertSpillFilesWereCleanedUp();
551558
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
552559
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
553-
assertTrue(taskMetrics.diskBytesSpilled() > 0L);
554-
assertTrue(taskMetrics.diskBytesSpilled() < mergedOutputFile.length());
555560
assertTrue(taskMetrics.memoryBytesSpilled()> 0L);
561+
assertEquals(totalSpilledDiskBytes, taskMetrics.diskBytesSpilled());
556562
assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
557563
}
558564

0 commit comments

Comments
 (0)