@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
private final TaskContext taskContext;
private final ShuffleWriteMetrics writeMetrics;

/** Force this sorter to spill when there are this many elements in memory. For testing only */
/**
* Force this sorter to spill when there are this many elements in memory. The default value is
* 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
*/
private final long numElementsForSpillThreshold;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.numElementsForSpillThreshold =
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -372,7 +375,8 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p

// for tests
assert(inMemSorter != null);
if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
spill();
}
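For reference, the threshold checked above is read from spark.shuffle.spill.numElementsForceSpillThreshold. A minimal, hypothetical sketch of lowering it in a test so the forced-spill branch is actually exercised (the value below is made up for illustration; the shipped default is 1024 * 1024 * 1024):

val conf = new org.apache.spark.SparkConf()
  // Illustrative value only: small enough that a modest insert loop triggers spill()
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (4 * 1024).toString)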

@@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
@@ -59,6 +60,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;

/**
* Force this sorter to spill when there are this many elements in memory. The default value is
* 1024 * 1024 * 1024 / 2, which allows the maximum size of the pointer array to be 8G.
*/
public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;

private final long numElementsForSpillThreshold;
/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -88,9 +96,10 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
long numElementsForSpillThreshold,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
pageSizeBytes, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -107,9 +116,10 @@ public static UnsafeExternalSorter create(
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
long numElementsForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
canUseRadixSort);
}

@@ -122,6 +132,7 @@ private UnsafeExternalSorter(
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
long numElementsForSpillThreshold,
@Nullable UnsafeInMemorySorter existingInMemorySorter,
boolean canUseRadixSort) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -143,6 +154,7 @@ private UnsafeExternalSorter(
this.inMemSorter = existingInMemorySorter;
}
this.peakMemoryUsedBytes = getMemoryUsage();
this.numElementsForSpillThreshold = numElementsForSpillThreshold;

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -373,6 +385,12 @@ public void insertRecord(
Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
throws IOException {

assert(inMemSorter != null);
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
spill();
}

growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int required = length + 4;
@@ -384,7 +402,6 @@ public void insertRecord(
pageCursor += 4;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
}
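As a rough sanity check of the 8G figures quoted in the comments above (a sketch, assuming ShuffleInMemorySorter packs one 8-byte entry per record while UnsafeInMemorySorter stores two 8-byte entries, pointer plus prefix, per record):

// Shuffle sorter: 1024 * 1024 * 1024 records * 8 bytes per entry  = 8 GiB pointer array
val shuffleArrayBytes = 1024L * 1024 * 1024 * 8
// Unsafe sorter: (1024 * 1024 * 1024 / 2) records * 16 bytes each = 8 GiB pointer array
val unsafeArrayBytes = (1024L * 1024 * 1024 / 2) * 16
assert(shuffleArrayBytes == unsafeArrayBytes) // both defaults cap the array at 8 GiB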

@@ -176,6 +176,7 @@ private UnsafeExternalSorter newSorter() throws IOException {
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes,
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
}

@@ -399,6 +400,7 @@ public void forcedSpillingWithoutComparator() throws Exception {
null,
/* initialSize */ 1024,
pageSizeBytes,
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
long[] record = new long[100];
int recordSize = record.length * 8;
@@ -435,6 +437,7 @@ public void testPeakMemoryUsed() throws Exception {
prefixComparator,
1024,
pageSizeBytes,
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());

// Peak memory should be monotonically increasing. More specifically, every time
@@ -87,6 +87,8 @@ public UnsafeExternalRowSorter(
prefixComparator,
/* initialSize */ 4096,
pageSizeBytes,
SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
canUseRadixSort
);
}
@@ -29,6 +29,7 @@
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;

/**
* Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOExcepti
SparkEnv.get().blockManager(),
SparkEnv.get().serializerManager(),
map.getPageSizeBytes(),
SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
map);
}
}
@@ -54,8 +54,9 @@ public UnsafeKVExternalSorter(
StructType valueSchema,
BlockManager blockManager,
SerializerManager serializerManager,
long pageSizeBytes) throws IOException {
this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
long pageSizeBytes,
long numElementsForSpillThreshold) throws IOException {
this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
}

public UnsafeKVExternalSorter(
@@ -64,6 +65,7 @@ public UnsafeKVExternalSorter(
BlockManager blockManager,
SerializerManager serializerManager,
long pageSizeBytes,
long numElementsForSpillThreshold,
@Nullable BytesToBytesMap map) throws IOException {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
@@ -88,6 +90,7 @@ public UnsafeKVExternalSorter(
prefixComparator,
/* initialSize */ 4096,
pageSizeBytes,
numElementsForSpillThreshold,
canUseRadixSort);
} else {
// The array will be used to do in-place sort, which require half of the space to be empty.
@@ -133,6 +136,7 @@ public UnsafeKVExternalSorter(
prefixComparator,
/* initialSize */ 4096,
pageSizeBytes,
numElementsForSpillThreshold,
inMemSorter);

// reset the map, so we can re-use it to insert new records. the inMemSorter will not used
@@ -345,6 +345,8 @@ case class WindowExec(
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)
rows.foreach { r =>
sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter


/** A container for all the details required when writing to a table. */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
StructType.fromAttributes(dataColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get().taskMemoryManager().pageSizeBytes)
TaskContext.get().taskMemoryManager().pageSizeBytes,
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))

while (iterator.hasNext) {
val currentRow = iterator.next()
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)

val partition = split.asInstanceOf[CartesianPartition]
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
StructType.fromAttributes(writeColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get().taskMemoryManager().pageSizeBytes)
TaskContext.get().taskMemoryManager().pageSizeBytes,
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))

while (iterator.hasNext) {
val currentRow = iterator.next()
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -123,7 +124,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
metricsSystem = null))

val sorter = new UnsafeKVExternalSorter(
keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)

// Insert the keys and values into the sorter
inputData.foreach { case (k, v) =>
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableJobConf
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
StructType.fromAttributes(dataOutput),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get().taskMemoryManager().pageSizeBytes)
TaskContext.get().taskMemoryManager().pageSizeBytes,
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))

while (iterator.hasNext) {
val inputRow = iterator.next()