
Commit 07f46af

Sital Kedia authored and davies committed
[SPARK-13850] Force the sorter to spill when the number of elements in the pointer array reaches a certain size
## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around the issue of TimSort failing on large buffer sizes.

## How was this patch tested?

Tested by running a job which was failing without this change due to the TimSort bug.

Author: Sital Kedia <skedia@fb.com>

Closes #13107 from sitalkedia/fix_TimSort.
1 parent 5344bad commit 07f46af
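The force-spill limit remains configurable through spark.shuffle.spill.numElementsForceSpillThreshold, so jobs that still hit the TimSort failure can spill even earlier. A minimal tuning sketch in Scala (the 64M value is an arbitrary illustration, not a number taken from this commit):

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning example: force the sorters to spill well before the
// new defaults (2^30 elements for the shuffle sorter, 2^29 for UnsafeExternalSorter).
val conf = new SparkConf()
  .setAppName("force-spill-tuning-example")
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (64 * 1024 * 1024).toString)
```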

File tree (12 files changed: +60 / -12 lines)


core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 7 additions & 3 deletions
```diff
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.numElementsForSpillThreshold =
-      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
       this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -372,7 +375,8 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
 
     // for tests
     assert(inMemSorter != null);
-    if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+      logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
       spill();
     }
 
```
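The "8G" figure in the new Javadoc follows directly from the element count if each record in the in-memory shuffle sorter occupies one 8-byte packed pointer; that per-record size is an assumption about the sorter's layout, not something stated in this diff.

```scala
// Back-of-the-envelope check of the new ShuffleExternalSorter default,
// assuming 8 bytes per pointer-array entry.
val threshold = 1024L * 1024 * 1024        // 2^30 elements, the new default
val pointerArrayBytes = threshold * 8L     // ~8 GiB before a forced spill
```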

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 20 additions & 3 deletions
```diff
@@ -27,6 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
@@ -59,6 +60,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
 
+  /**
+   * Force this sorter to spill when there are this many elements in memory. The default value is
+   * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G.
+   */
+  public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;
+
+  private final long numElementsForSpillThreshold;
   /**
    * Memory pages that hold the records being sorted. The pages in this list are freed when
    * spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -88,9 +96,10 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
-      serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+      serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
       pageSizeBytes, inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
     // The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -107,9 +116,10 @@ public static UnsafeExternalSorter create(
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       boolean canUseRadixSort) {
     return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
-      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null,
+      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
       canUseRadixSort);
   }
 
@@ -122,6 +132,7 @@ private UnsafeExternalSorter(
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable UnsafeInMemorySorter existingInMemorySorter,
       boolean canUseRadixSort) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -143,6 +154,7 @@ private UnsafeExternalSorter(
       this.inMemSorter = existingInMemorySorter;
     }
     this.peakMemoryUsedBytes = getMemoryUsage();
+    this.numElementsForSpillThreshold = numElementsForSpillThreshold;
 
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
     // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -373,6 +385,12 @@ public void insertRecord(
       Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
     throws IOException {
 
+    assert(inMemSorter != null);
+    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+      logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
+      spill();
+    }
+
     growPointerArrayIfNecessary();
     // Need 4 bytes to store the record length.
     final int required = length + 4;
@@ -384,7 +402,6 @@ public void insertRecord(
     pageCursor += 4;
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
-    assert(inMemSorter != null);
     inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
   }
 
```
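UnsafeExternalSorter halves the element threshold; the same 8G bound works out if each record takes two 8-byte slots (record pointer plus key prefix) in UnsafeInMemorySorter, which is again an assumption about the in-memory layout rather than something shown in this hunk.

```scala
// Back-of-the-envelope check of DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
// assuming 16 bytes (pointer + prefix) per entry.
val threshold = 1024L * 1024 * 1024 / 2    // 2^29 elements
val pointerArrayBytes = threshold * 16L    // ~8 GiB, matching the Javadoc's "8G"
```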

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -176,6 +176,7 @@ private UnsafeExternalSorter newSorter() throws IOException {
       prefixComparator,
       /* initialSize */ 1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
   }
 
@@ -399,6 +400,7 @@ public void forcedSpillingWithoutComparator() throws Exception {
       null,
       /* initialSize */ 1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
     long[] record = new long[100];
     int recordSize = record.length * 8;
@@ -435,6 +437,7 @@ public void testPeakMemoryUsed() throws Exception {
       prefixComparator,
       1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
 
     // Peak memory should be monotonically increasing. More specifically, every time
```

sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -89,6 +89,8 @@ public UnsafeExternalRowSorter(
       sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
         DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+        .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       canUseRadixSort
     );
   }
```
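The SQL-side call sites in the rest of this commit all obtain the threshold the same way: read spark.shuffle.spill.numElementsForceSpillThreshold from the active SparkConf and fall back to the new default. Roughly, in Scala:

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

// Sketch of the lookup repeated at the SQL call sites below; assumes a running SparkEnv.
val numElementsForSpillThreshold = SparkEnv.get.conf.getLong(
  "spark.shuffle.spill.numElementsForceSpillThreshold",
  UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)
```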

sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -29,6 +29,7 @@
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
 
 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOExcepti
       SparkEnv.get().blockManager(),
       SparkEnv.get().serializerManager(),
       map.getPageSizeBytes(),
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+        .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       map);
   }
 }
```

sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java

Lines changed: 6 additions & 2 deletions
```diff
@@ -55,8 +55,9 @@ public UnsafeKVExternalSorter(
       StructType valueSchema,
       BlockManager blockManager,
       SerializerManager serializerManager,
-      long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
+      long pageSizeBytes,
+      long numElementsForSpillThreshold) throws IOException {
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
   }
 
   public UnsafeKVExternalSorter(
@@ -65,6 +66,7 @@ public UnsafeKVExternalSorter(
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;
@@ -90,6 +92,7 @@ public UnsafeKVExternalSorter(
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
           UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
       // The array will be used to do in-place sort, which require half of the space to be empty.
@@ -136,6 +139,7 @@ public UnsafeKVExternalSorter(
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
           UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         inMemSorter);
 
       // reset the map, so we can re-use it to insert new records. the inMemSorter will not used
```

sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -345,6 +345,8 @@ case class WindowExec(
             null,
             1024,
             SparkEnv.get.memoryManager.pageSizeBytes,
+            SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+              UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
             false)
           rows.foreach { r =>
             sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 
 /** A container for all the details required when writing to a table. */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
         StructType.fromAttributes(dataColumns),
         SparkEnv.get.blockManager,
         SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes)
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
       while (iterator.hasNext) {
         val currentRow = iterator.next()
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
       null,
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       false)
 
     val partition = split.asInstanceOf[CartesianPartition]
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
       StructType.fromAttributes(writeColumns),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
     while (iterator.hasNext) {
       val currentRow = iterator.next()
```
