MemoryConsumer.java:
@@ -83,7 +83,13 @@ public void spill() throws IOException {
   public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
 
   /**
-   * Allocates a LongArray of `size`.
+   * Allocates a LongArray of `size`. Note that this method may throw `OutOfMemoryError` if Spark
+   * doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
+   * `LongArray` is too large to fit in a single page. The caller side should take care of these
+   * two exceptions, or make sure the `size` is small enough that it won't trigger exceptions.
+   *
+   * @throws OutOfMemoryError
+   * @throws TooLargePageException
    */
   public LongArray allocateArray(long size) {
     long required = size * 8L;
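
Since the new contract pushes handling onto callers, a minimal sketch of a conforming caller may help (`GrowableConsumer` and `growArray` are hypothetical; the constructor shape and the no-arg `spill()` contract appear elsewhere in this diff):

    import java.io.IOException;

    import org.apache.spark.memory.MemoryConsumer;
    import org.apache.spark.memory.TaskMemoryManager;
    import org.apache.spark.memory.TooLargePageException;
    import org.apache.spark.unsafe.array.LongArray;

    // Hypothetical consumer showing the caller-side handling the javadoc asks for.
    final class GrowableConsumer extends MemoryConsumer {
      GrowableConsumer(TaskMemoryManager manager, long pageSize) {
        super(manager, pageSize, manager.getTungstenMemoryMode());
      }

      @Override
      public long spill(long size, MemoryConsumer trigger) throws IOException {
        return 0L; // a real consumer would write its in-memory data to disk here
      }

      LongArray growArray(LongArray old) throws IOException {
        try {
          // Doubling may throw either exception documented above.
          return allocateArray(old.size() * 2);
        } catch (TooLargePageException e) {
          spill();  // the doubled array no longer fits in one page
          return null;
        } catch (OutOfMemoryError e) {
          spill();  // not enough execution memory; free some by spilling
          return null;
        }
      }
    }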
TaskMemoryManager.java:
@@ -270,13 +270,14 @@ public long pageSizeBytes() {
    *
    * Returns `null` if there was not enough memory to allocate the page. May return a page that
    * contains fewer bytes than requested, so callers should verify the size of returned pages.
+   *
+   * @throws TooLargePageException
    */
   public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
     assert(consumer != null);
     assert(consumer.getMode() == tungstenMemoryMode);
     if (size > MAXIMUM_PAGE_SIZE_BYTES) {
-      throw new IllegalArgumentException(
-        "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
+      throw new TooLargePageException(size);
     }
 
     long acquired = acquireExecutionMemory(size, consumer);
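
The behavioral point of this hunk: an oversized page request used to blow up as a generic IllegalArgumentException that no caller could sensibly distinguish from a bug; it is now a dedicated, catchable signal that the sorters below translate into a forced spill instead of a task failure. Note that `allocatePage` still returns `null`, rather than throwing, when the request fits within the page limit but the memory cannot be acquired.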
TooLargePageException.java (new file):
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+public class TooLargePageException extends RuntimeException {
+  TooLargePageException(long size) {
+    super("Cannot allocate a page of " + size + " bytes.");
+  }
+}
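
Two details are worth noting. The class extends RuntimeException, so `allocateArray` and `allocatePage` keep their existing signatures and no checked exception ripples through every MemoryConsumer call site. And the constructor is package-private, so only the memory-management code in `org.apache.spark.memory` can raise it.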
ShuffleExternalSorter.java:
@@ -31,8 +31,10 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManager;
@@ -43,7 +45,6 @@
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
-import org.apache.spark.internal.config.package$;
 
 /**
  * An external sorter that is specialized for sort-based shuffle.
@@ -75,10 +76,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final ShuffleWriteMetrics writeMetrics;
 
   /**
-   * Force this sorter to spill when there are this many elements in memory. The default value is
-   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
+   * Force this sorter to spill when there are this many elements in memory.
    */
-  private final long numElementsForSpillThreshold;
+  private final int numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
@@ -123,7 +123,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     this.fileBufferSizeBytes =
       (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.numElementsForSpillThreshold =
-      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
+      (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
       this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -325,14 +325,18 @@ public void cleanupResources() {
    * array and grows the array if additional space is required. If the required space cannot be
    * obtained, then the in-memory data will be spilled to disk.
    */
-  private void growPointerArrayIfNecessary() {
+  private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       long used = inMemSorter.getMemoryUsage();
       LongArray array;
       try {
         // could trigger spilling
         array = allocateArray(used / 8 * 2);
+      } catch (TooLargePageException e) {
+        // The pointer array is too big to fit in a single page, spill.
+        spill();
+        return;
       } catch (OutOfMemoryError e) {
         // should have triggered spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
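
For concreteness: the in-memory sorter keeps an 8-byte pointer per record, and the growth path doubles the array (`used / 8 * 2` entries, i.e. `2 * used` bytes given the `size * 8L` sizing in `allocateArray`). Under the old default threshold of 1024 * 1024 * 1024 elements, the pointer array could legitimately grow toward 8 GB, well past a single page, so the doubling died with an unhandled IllegalArgumentException and failed the task. After this patch the same doubling raises TooLargePageException, the sorter spills and returns, and insertion continues against the emptied in-memory sorter.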
UnsafeExternalSorter.java:
@@ -32,6 +32,7 @@
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
@@ -68,12 +69,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final int fileBufferSizeBytes;
 
   /**
-   * Force this sorter to spill when there are this many elements in memory. The default value is
-   * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G.
+   * Force this sorter to spill when there are this many elements in memory.
    */
-  public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;
+  private final int numElementsForSpillThreshold;
 
-  private final long numElementsForSpillThreshold;
   /**
    * Memory pages that hold the records being sorted. The pages in this list are freed when
    * spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -103,11 +102,11 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
       serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
-      numElementsForSpillThreshold, pageSizeBytes, inMemorySorter, false /* ignored */);
+      pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
     // The external sorter will be used to insert records, in-memory sorter is not needed.
     sorter.inMemSorter = null;
@@ -123,7 +122,7 @@ public static UnsafeExternalSorter create(
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       boolean canUseRadixSort) {
     return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
       taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes,
@@ -139,7 +138,7 @@ private UnsafeExternalSorter(
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       @Nullable UnsafeInMemorySorter existingInMemorySorter,
       boolean canUseRadixSort) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -338,14 +337,18 @@ public void cleanupResources() {
    * array and grows the array if additional space is required. If the required space cannot be
    * obtained, then the in-memory data will be spilled to disk.
    */
-  private void growPointerArrayIfNecessary() {
+  private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       long used = inMemSorter.getMemoryUsage();
       LongArray array;
       try {
         // could trigger spilling
         array = allocateArray(used / 8 * 2);
+      } catch (TooLargePageException e) {
+        // The pointer array is too big to fit in a single page, spill.
+        spill();
+        return;
       } catch (OutOfMemoryError e) {
         // should have triggered spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
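
Aside from the long-to-int narrowing, note the argument-order fix in createWithExistingInMemorySorter: the old call passed numElementsForSpillThreshold into the constructor's pageSizeBytes slot and pageSizeBytes into the threshold slot, and since both parameters were `long` the compiler could not flag the swap. Narrowing the threshold to `int` is also what turns any future mix-up of the two into a compile error.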
core/src/main/scala/org/apache/spark/internal/config/package.scala (10 additions, 0 deletions):
@@ -475,4 +475,14 @@ package object config {
       .stringConf
       .toSequence
       .createOptional
+
+  private[spark] val SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD =
+    ConfigBuilder("spark.shuffle.spill.numElementsForceSpillThreshold")
+      .internal()
+      .doc("The maximum number of elements in memory before forcing the shuffle sorter to spill. " +
+        "By default it's Integer.MAX_VALUE, which means we never force the sorter to spill, " +
+        "until we reach some limitations, like the max page size limitation for the pointer " +
+        "array in the sorter.")
+      .intConf
+      .createWithDefault(Integer.MAX_VALUE)
 }
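
The key is unchanged from the old hard-coded string, so existing user settings keep working; only the default moves from 1024 * 1024 * 1024 (or half that, depending on the call site) to Integer.MAX_VALUE. A sketch of overriding it from application code (the app name and the 16777216 value are arbitrary placeholders):

    import org.apache.spark.SparkConf;

    public final class SpillThresholdDemo {
      public static void main(String[] args) {
        // Force the sorters to spill every 16M in-memory elements instead of
        // relying solely on the single-page size ceiling.
        SparkConf conf = new SparkConf()
            .setAppName("spill-threshold-demo")
            .set("spark.shuffle.spill.numElementsForceSpillThreshold", "16777216");
        System.out.println(conf.get("spark.shuffle.spill.numElementsForceSpillThreshold"));
      }
    }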
UnsafeExternalSorterSuite.java:
@@ -36,6 +36,7 @@
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.JavaSerializer;
@@ -86,6 +87,9 @@ public int compare(
 
   private final long pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "4m");
 
+  private final int spillThreshold =
+    (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
+
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
@@ -159,7 +163,7 @@ private UnsafeExternalSorter newSorter() throws IOException {
       prefixComparator,
       /* initialSize */ 1024,
       pageSizeBytes,
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
+      spillThreshold,
       shouldUseRadixSort());
   }
 
@@ -383,7 +387,7 @@ public void forcedSpillingWithoutComparator() throws Exception {
       null,
       /* initialSize */ 1024,
       pageSizeBytes,
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
+      spillThreshold,
       shouldUseRadixSort());
     long[] record = new long[100];
     int recordSize = record.length * 8;
@@ -445,7 +449,7 @@ public void testPeakMemoryUsed() throws Exception {
       prefixComparator,
       1024,
       pageSizeBytes,
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
+      spillThreshold,
       shouldUseRadixSort());
 
     // Peak memory should be monotonically increasing. More specifically, every time
@@ -548,4 +552,3 @@ private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
     }
   }
 }
-
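
A note on the `package$.MODULE$` idiom used in the Java files: Scala compiles the `org.apache.spark.internal.config` package object to a class named `package$` whose singleton instance lives in the static `MODULE$` field, and each `val` becomes a method on that instance. A sketch of reading the entry from standalone Java, assuming Spark's core jar is on the classpath (this leans on `private[spark]` being a scalac-only check that javac does not enforce, so treat it as illustration, not supported API):

    import org.apache.spark.SparkConf;
    import org.apache.spark.internal.config.package$;

    public final class ReadThresholdDemo {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // The (int) cast unboxes the value, which reaches Java as a boxed
        // Object because of erasure on the Scala side.
        int threshold = (int) conf.get(
            package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
        System.out.println(threshold); // Integer.MAX_VALUE unless overridden
      }
    }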
UnsafeExternalRowSorter.java:
@@ -27,6 +27,7 @@
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.StructType;
@@ -89,8 +90,8 @@ public UnsafeExternalRowSorter(
       sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
         DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
-      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      (int) SparkEnv.get().conf().get(
+        package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
       canUseRadixSort
     );
   }
SQLConf.scala:
@@ -884,7 +884,7 @@ object SQLConf {
     .internal()
     .doc("Threshold for number of rows to be spilled by window operator")
     .intConf
-    .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+    .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
@@ -899,7 +899,7 @@
     .internal()
     .doc("Threshold for number of rows to be spilled by sort merge join operator")
     .intConf
-    .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+    .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
   val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
@@ -914,7 +914,7 @@
     .internal()
     .doc("Threshold for number of rows to be spilled by cartesian product operator")
     .intConf
-    .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+    .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
   val SUPPORT_QUOTED_REGEX_COLUMN_NAME = buildConf("spark.sql.parser.quotedRegexColumnNames")
     .doc("When true, quoted Identifiers (using backticks) in SELECT statement are interpreted" +
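
A quiet behavior change rides along here: these three SQL spill thresholds previously defaulted to DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt, i.e. 536870912 rows, and they now inherit Integer.MAX_VALUE through the core entry's defaultValue. Window, sort-merge join, and cartesian product operators therefore no longer force-spill by row count unless their spill-threshold entries are set explicitly.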
UnsafeFixedWidthAggregationMap.java:
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.spark.SparkEnv;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -29,7 +30,6 @@
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
 
 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -238,8 +238,8 @@ public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
       SparkEnv.get().blockManager(),
       SparkEnv.get().serializerManager(),
       map.getPageSizeBytes(),
-      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      (int) SparkEnv.get().conf().get(
+        package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
       map);
   }
 }
UnsafeKVExternalSorter.java:
@@ -57,7 +57,7 @@ public UnsafeKVExternalSorter(
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
-      long numElementsForSpillThreshold) throws IOException {
+      int numElementsForSpillThreshold) throws IOException {
     this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes,
       numElementsForSpillThreshold, null);
   }
@@ -68,7 +68,7 @@ public UnsafeKVExternalSorter(
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;
SortBasedAggregator (org.apache.spark.sql.execution.aggregate):
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -315,9 +315,7 @@ class SortBasedAggregator(
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
       TaskContext.get().taskMemoryManager().pageSizeBytes,
-      SparkEnv.get.conf.getLong(
-        "spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      SparkEnv.get.conf.get(config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD),
       null
     )
   }
ObjectAggregationMap (org.apache.spark.sql.execution.aggregate):
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import java.{util => ju}
 
 import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.config
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, TypedImperativeAggregate}
@@ -73,9 +74,7 @@ class ObjectAggregationMap() {
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
       TaskContext.get().taskMemoryManager().pageSizeBytes,
-      SparkEnv.get.conf.getLong(
-        "spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      SparkEnv.get.conf.get(config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD),
       null
     )
 