@@ -83,6 +83,11 @@ final class ShuffleExternalSorter extends MemoryConsumer {
*/
private final int numElementsForSpillThreshold;

/**
* Force this sorter to spill when the size in memory is beyond this threshold.
*/
private final long recordsSizeForSpillThreshold;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;

@@ -106,6 +111,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
@Nullable private ShuffleInMemorySorter inMemSorter;
@Nullable private MemoryBlock currentPage = null;
private long pageCursor = -1;
private long inMemRecordsSize = 0;

ShuffleExternalSorter(
TaskMemoryManager memoryManager,
@@ -127,6 +133,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
(int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.numElementsForSpillThreshold =
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
this.recordsSizeForSpillThreshold =
(long) conf.get(package$.MODULE$.SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD());
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()));
@@ -316,6 +324,7 @@ private long freeMemory() {
allocatedPages.clear();
currentPage = null;
pageCursor = 0;
inMemRecordsSize = 0;
return memoryFreed;
}

@@ -397,11 +406,14 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
// for tests
assert(inMemSorter != null);
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " +
numElementsForSpillThreshold);
logger.info("Spilling data because number of spilledRecords ({}) crossed the threshold: {}",
inMemSorter.numRecords(), numElementsForSpillThreshold);
spill();
} else if (inMemRecordsSize >= recordsSizeForSpillThreshold) {
logger.info("Spilling data because size of spilledRecords ({}) crossed the threshold: {}",
inMemRecordsSize, recordsSizeForSpillThreshold);
spill();
}

growPointerArrayIfNecessary();
final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
@@ -416,6 +428,7 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, partitionId);
inMemRecordsSize += length;
Contributor: Should we also include the uaoSize?

Contributor: +1, the pageCursor is also increased by uaoSize and length.
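To make the bookkeeping concrete, here is a self-contained sketch (hypothetical class and names, not the actual Spark sorter) of the size-based force-spill counter this hunk introduces, counting the uaoSize length prefix as the reviewers suggest:

```scala
// Illustrative only: mirrors the new inMemRecordsSize bookkeeping in miniature.
final class SizeTrackingSorterSketch(recordsSizeForSpillThreshold: Long) {
  private var inMemRecordsSize: Long = 0L
  private var spillCount: Int = 0

  def insert(uaoSize: Int, recordLength: Int): Unit = {
    if (inMemRecordsSize >= recordsSizeForSpillThreshold) {
      spill()
    }
    // The real sorter advances pageCursor by uaoSize + recordLength bytes,
    // so counting both keeps the tracked size in step with actual page usage.
    inMemRecordsSize += uaoSize + recordLength
  }

  def spill(): Unit = {
    spillCount += 1
    inMemRecordsSize = 0L // reset after the in-memory data has been written out
  }
}
```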

}

/**
@@ -74,6 +74,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
*/
private final int numElementsForSpillThreshold;

/**
* Force this sorter to spill when the size in memory is beyond this threshold.
*/
private final long maxRecordsSizeForSpillThreshold;

/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -86,6 +91,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {

// These variables are reset after spilling:
@Nullable private volatile UnsafeInMemorySorter inMemSorter;
private long inMemRecordsSize = 0;

private MemoryBlock currentPage = null;
private long pageCursor = -1;
@@ -104,10 +110,12 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
long maxRecordsSizeForSpillThreshold,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
pageSizeBytes, numElementsForSpillThreshold, maxRecordsSizeForSpillThreshold,
inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
sorter.inMemSorter = null;
@@ -124,10 +132,11 @@ public static UnsafeExternalSorter create(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
long maxRecordsSizeForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes,
numElementsForSpillThreshold, null, canUseRadixSort);
numElementsForSpillThreshold, maxRecordsSizeForSpillThreshold, null, canUseRadixSort);
}

private UnsafeExternalSorter(
@@ -140,6 +149,7 @@ private UnsafeExternalSorter(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
long maxRecordsSizeForSpillThreshold,
@Nullable UnsafeInMemorySorter existingInMemorySorter,
boolean canUseRadixSort) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -170,6 +180,7 @@ private UnsafeExternalSorter(
}
this.peakMemoryUsedBytes = getMemoryUsage();
this.numElementsForSpillThreshold = numElementsForSpillThreshold;
this.maxRecordsSizeForSpillThreshold = maxRecordsSizeForSpillThreshold;

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -228,7 +239,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory
// pages, we might not be able to get memory for the pointer array.

inMemRecordsSize = 0;
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
totalSpillBytes += spillSize;
@@ -396,8 +407,11 @@ public void insertRecord(
logger.info("Spilling data because number of spilledRecords crossed the threshold " +
numElementsForSpillThreshold);
spill();
} else if (inMemRecordsSize >= maxRecordsSizeForSpillThreshold) {
logger.info("Spilling data because size of spilledRecords crossed the threshold " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also include the number of records and threshold here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxRecordsSizeForSpillThreshold);
spill();
}

growPointerArrayIfNecessary();
int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
@@ -411,6 +425,7 @@ public void insertRecord(
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
inMemRecordsSize += length;
Contributor: ditto, uaoSize.

}

/**
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -998,6 +998,26 @@ package object config {
.intConf
.createWithDefault(Integer.MAX_VALUE)

private[spark] val SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD =
ConfigBuilder("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold")
.internal()
.doc("The maximum size in memory before forcing the map-side shuffle sorter to spill. " +
"By default it is Long.MAX_VALUE, which means we never force the sorter to spill, " +
"until we reach some limitations, like the max page size limitation for the pointer " +
"array in the sorter.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

Contributor: What does the "map" mean inside this config name?

Contributor: Why is it necessary to have a different threshold for map tasks and reduce tasks?

Contributor: I have the same question. What's the use case for separating them?

Member: @amanomer, you can make this configuration optional via createOptional to represent no limit.

Member: Just a reminder, we need to attach version info for the new configuration now. Just use .version().
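For illustration only, a sketch of what the two reviewers above are asking for, as it might look inside core's config package object — createOptional instead of a Long.MaxValue default, plus version info (the version string is an assumption, not taken from this PR):

```scala
private[spark] val SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD =
  ConfigBuilder("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold")
    .internal()
    .doc("The maximum size in memory before forcing the map-side shuffle sorter to spill. " +
      "If unset, the sorter is never force-spilled based on size.")
    .version("3.1.0") // assumed target version, per the .version() reminder above
    .bytesConf(ByteUnit.BYTE)
    .createOptional // no default: absence means no size-based force spill
```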


private[spark] val SHUFFLE_SPILL_REDUCE_MAX_SIZE_FORCE_SPILL_THRESHOLD =
ConfigBuilder("spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold")
.internal()
.doc("The maximum size in memory before forcing the reduce-side to spill. " +
"By default it is Long.MAX_VALUE, which means we never force the sorter to spill, " +
"until we reach some limitations, like the max page size limitation for the pointer " +
"array in the sorter.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
Member: ditto.
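As a usage sketch (the values here are hypothetical, and both entries are internal), the two thresholds would be set like any other byte-valued Spark conf:

```scala
import org.apache.spark.SparkConf

// Force a spill once roughly 512 MiB of record bytes are buffered in memory.
val conf = new SparkConf()
  .set("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold", "512m")
  .set("spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold", "512m")
```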


private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
.internal()
@@ -53,6 +53,10 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
private[this] val initialMemoryThreshold: Long =
SparkEnv.get.conf.get(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD)

// Force this collection to spill when its size is greater than this threshold
private[this] val maxSizeForceSpillThreshold: Long =
SparkEnv.get.conf.get(SHUFFLE_SPILL_REDUCE_MAX_SIZE_FORCE_SPILL_THRESHOLD)

// Force this collection to spill when there are this many elements in memory
// For testing only
private[this] val numElementsForceSpillThreshold: Int =
@@ -81,7 +85,11 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
*/
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Check number of elements or memory usage limits, whichever is hit first
if (_elementsRead > numElementsForceSpillThreshold
|| currentMemory > maxSizeForceSpillThreshold) {
Member: I just wonder if we need maxSizeForceSpillThreshold here, since we already have memory-based control here?

shouldSpill = true
} else if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
@@ -90,11 +98,10 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
logSpillage(currentMemory, elementsRead)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
@@ -141,10 +148,10 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
*
* @param size number of bytes spilled
*/
@inline private def logSpillage(size: Long): Unit = {
@inline private def logSpillage(size: Long, elements: Int): Unit = {
val threadId = Thread.currentThread().getId
logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
.format(threadId, org.apache.spark.util.Utils.bytesToString(size),
logInfo("Thread %d spilling in-memory map of %s (elements: %d) to disk (%d time%s so far)"
.format(threadId, org.apache.spark.util.Utils.bytesToString(size), elements,
_spillCount, if (_spillCount > 1) "s" else ""))
}
}
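To make the new decision order concrete, here is a simplified, self-contained sketch (a hypothetical helper, not the Spillable API) of how maybeSpill now decides: the forced element-count and size thresholds are checked on every record, and the sampled memory-acquisition path only runs otherwise.

```scala
// Simplified model of the updated maybeSpill logic.
def shouldSpill(
    elementsRead: Long,
    currentMemory: Long,
    myMemoryThreshold: Long,
    numElementsForceSpillThreshold: Long,
    maxSizeForceSpillThreshold: Long,
    acquireMemory: Long => Long): Boolean = {
  if (elementsRead > numElementsForceSpillThreshold ||
      currentMemory > maxSizeForceSpillThreshold) {
    true
  } else if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Try to claim up to double the current memory; spill only if still short.
    val granted = acquireMemory(2 * currentMemory - myMemoryThreshold)
    currentMemory >= myMemoryThreshold + granted
  } else {
    false
  }
}
```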
@@ -91,9 +91,12 @@ public int compare(
private final long pageSizeBytes = conf.getSizeAsBytes(
package$.MODULE$.BUFFER_PAGESIZE().key(), "4m");

private final int spillThreshold =
private final int spillElementsThreshold =
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());

private final long spillSizeThreshold =
(long) conf.get(package$.MODULE$.SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD());

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
@@ -167,7 +170,8 @@ private UnsafeExternalSorter newSorter() throws IOException {
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes,
spillThreshold,
spillElementsThreshold,
spillSizeThreshold,
shouldUseRadixSort());
}

@@ -394,7 +398,8 @@ public void forcedSpillingWithoutComparator() throws Exception {
null,
/* initialSize */ 1024,
pageSizeBytes,
spillThreshold,
spillElementsThreshold,
spillSizeThreshold,
shouldUseRadixSort());
long[] record = new long[100];
int recordSize = record.length * 8;
@@ -456,7 +461,8 @@ public void testPeakMemoryUsed() throws Exception {
prefixComparator,
1024,
pageSizeBytes,
spillThreshold,
spillElementsThreshold,
spillSizeThreshold,
shouldUseRadixSort());

// Peak memory should be monotonically increasing. More specifically, every time
@@ -1516,6 +1516,13 @@ object SQLConf {
.intConf
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)

val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.spill.size.threshold")
.internal()
.doc("Threshold for size of rows to be spilled by window operator")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(SHUFFLE_SPILL_REDUCE_MAX_SIZE_FORCE_SPILL_THRESHOLD.defaultValue.get)

val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
.internal()
@@ -1531,6 +1538,13 @@ object SQLConf {
.intConf
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)

val SORT_MERGE_JOIN_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.size.threshold")
.internal()
.doc("Threshold for size of rows to be spilled by sort merge join operator")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD.defaultValue.get)

val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
.internal()
@@ -1546,6 +1560,15 @@ object SQLConf {
.intConf
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)


val CARTESIAN_PRODUCT_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.spill.size.threshold")
.internal()
.doc("Threshold for size of rows to be spilled by cartesian product operator")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD.defaultValue.get)
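For illustration (the values are hypothetical and all three entries are internal), these per-operator size thresholds can be tuned per session like any other SQLConf entry, assuming a spark SparkSession is in scope:

```scala
// Spill window, sort-merge-join and cartesian-product buffers once they hold ~256 MiB of rows.
spark.conf.set("spark.sql.windowExec.buffer.spill.size.threshold", "256m")
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.spill.size.threshold", "256m")
spark.conf.set("spark.sql.cartesianProductExec.buffer.spill.size.threshold", "256m")
```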


val SUPPORT_QUOTED_REGEX_COLUMN_NAME = buildConf("spark.sql.parser.quotedRegexColumnNames")
.doc("When true, quoted Identifiers (using backticks) in SELECT statement are interpreted" +
" as regular expressions.")
@@ -2647,18 +2670,26 @@ class SQLConf extends Serializable with Logging {

def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

def windowExecBufferSpillSizeThreshold: Long = getConf(WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD)

def sortMergeJoinExecBufferInMemoryThreshold: Int =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)

def sortMergeJoinExecBufferSpillThreshold: Int =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)

def sortMergeJoinExecBufferSpillSizeThreshold: Long =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SIZE_SPILL_THRESHOLD)

def cartesianProductExecBufferInMemoryThreshold: Int =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)

def cartesianProductExecBufferSpillThreshold: Int =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)

def cartesianProductExecBufferSizeSpillThreshold: Long =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SIZE_SPILL_THRESHOLD)

def codegenSplitAggregateFunc: Boolean = getConf(SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC)

def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
@@ -120,6 +120,8 @@ private UnsafeExternalRowSorter(
pageSizeBytes,
(int) SparkEnv.get().conf().get(
package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
(long) SparkEnv.get().conf().get(
package$.MODULE$.SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD()),
canUseRadixSort
);
}
@@ -247,6 +247,8 @@ public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOExcepti
map.getPageSizeBytes(),
(int) SparkEnv.get().conf().get(
package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
(long) SparkEnv.get().conf().get(
package$.MODULE$.SHUFFLE_SPILL_MAP_MAX_SIZE_FORCE_SPILL_THRESHOLD()),
map);
}
}