Make UnsafeShuffle's page size configurable
JoshRosen committed Jul 29, 2015
commit 0045aa29b1489a7d36d285a845fb79cca39c61ee
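This commit replaces the sorter's hard-coded PAGE_SIZE constant with a new `spark.buffer.pageSize` setting (default `64m`), clamped to `PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES`. A minimal usage sketch, assuming a plain Java driver (the app name is illustrative; the `128m` value mirrors the test setup below):

```java
import org.apache.spark.SparkConf;

public class PageSizeExample {
  public static void main(String[] args) {
    // "spark.buffer.pageSize" accepts size strings with units; the default is "64m".
    // Values above PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES are clamped down.
    SparkConf conf = new SparkConf()
      .setAppName("page-size-example")       // illustrative
      .set("spark.buffer.pageSize", "128m"); // the same override the test suite below uses
    System.out.println(conf.get("spark.buffer.pageSize"));
  }
}
```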
UnsafeShuffleExternalSorter.java
@@ -59,14 +59,14 @@ final class UnsafeShuffleExternalSorter {

   private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
 
-  private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
   @VisibleForTesting
   static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
-  @VisibleForTesting
-  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
 
   private final int initialSize;
   private final int numPartitions;
+  private final int pageSizeBytes;
+  @VisibleForTesting
+  final int maxRecordSizeBytes;
   private final TaskMemoryManager memoryManager;
   private final ShuffleMemoryManager shuffleMemoryManager;
   private final BlockManager blockManager;
@@ -109,7 +109,10 @@ public UnsafeShuffleExternalSorter(
     this.numPartitions = numPartitions;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-
+    this.pageSizeBytes = (int) Math.min(
+      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
+      conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+    this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
   }
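For concreteness, a worked example of the clamping above. It assumes `PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES` is 2^27 bytes (128 MB) and that the 4 subtracted bytes hold the per-record length prefix; both are assumptions about code outside this diff:

```java
public class ClampExample {
  public static void main(String[] args) {
    long requested = 256L * 1024 * 1024;        // as if spark.buffer.pageSize=256m
    long maximum = 1L << 27;                    // assumed MAXIMUM_PAGE_SIZE_BYTES (128 MB)
    int pageSizeBytes = (int) Math.min(maximum, requested);
    int maxRecordSizeBytes = pageSizeBytes - 4; // 4 bytes assumed for the record-length prefix
    // Prints 134217728 / 134217724: an oversized request is capped at 128 MB.
    System.out.println(pageSizeBytes + " / " + maxRecordSizeBytes);
  }
}
```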
@@ -272,7 +275,11 @@ void spill() throws IOException {
   }
 
   private long getMemoryUsage() {
-    return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+    long totalPageSize = 0;
+    for (MemoryBlock page : allocatedPages) {
+      totalPageSize += page.size();
+    }
+    return sorter.getMemoryUsage() + totalPageSize;
   }
 
   private long freeMemory() {
@@ -346,23 +353,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
     // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
     // without using the free space at the end of the current page. We should also do this for
     // BytesToBytesMap.
-    if (requiredSpace > PAGE_SIZE) {
+    if (requiredSpace > pageSizeBytes) {
       throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
-        PAGE_SIZE + ")");
+        pageSizeBytes + ")");
     } else {
-      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-      if (memoryAcquired < PAGE_SIZE) {
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+      if (memoryAcquired < pageSizeBytes) {
         shuffleMemoryManager.release(memoryAcquired);
         spill();
-        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-        if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+        if (memoryAcquiredAfterSpilling != pageSizeBytes) {
           shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
-          throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+          throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
         }
       }
-      currentPage = memoryManager.allocatePage(PAGE_SIZE);
+      currentPage = memoryManager.allocatePage(pageSizeBytes);
       currentPagePosition = currentPage.getBaseOffset();
-      freeSpaceInCurrentPage = PAGE_SIZE;
+      freeSpaceInCurrentPage = pageSizeBytes;
       allocatedPages.add(currentPage);
     }
   }
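The hunk above is the acquire-spill-retry protocol that this change parameterizes: on a short grant, release the partial reservation, spill to free this task's own pages, and retry exactly once before giving up. A standalone sketch of the same control flow, where `Grants` and `Spillable` are hypothetical stand-ins rather than Spark's real ShuffleMemoryManager and sorter interfaces:

```java
import java.io.IOException;

interface Grants {
  long tryToAcquire(long bytes); // may grant fewer bytes than requested
  void release(long bytes);
}

interface Spillable {
  void spill() throws IOException; // frees memory by writing buffered records to disk
}

final class PageAcquire {
  static long acquireOrThrow(Grants grants, Spillable sorter, long pageSizeBytes)
      throws IOException {
    long acquired = grants.tryToAcquire(pageSizeBytes);
    if (acquired < pageSizeBytes) {
      grants.release(acquired); // hand back the partial grant so it is not leaked
      sorter.spill();           // spilling releases pages this task already holds
      acquired = grants.tryToAcquire(pageSizeBytes);
      if (acquired != pageSizeBytes) {
        grants.release(acquired); // still short even after spilling: fail fast
        throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
      }
    }
    return acquired;
  }
}
```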
UnsafeShuffleWriter.java
@@ -129,6 +129,11 @@ public UnsafeShuffleWriter(
     open();
   }
 
+  @VisibleForTesting
+  public int maxRecordSizeBytes() {
+    return sorter.maxRecordSizeBytes;
+  }
+
   /**
    * This convenience method should only be called in test code.
    */
UnsafeShuffleWriterSuite.java
@@ -111,7 +111,7 @@ public void setUp() throws IOException {
     mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
     partitionSizesInMergedFile = null;
     spillFilesCreated.clear();
-    conf = new SparkConf();
+    conf = new SparkConf().set("spark.buffer.pageSize", "128m");
     taskMetrics = new TaskMetrics();
 
     when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
@@ -512,12 +512,12 @@ public void close() { }
     writer.insertRecordIntoSorter(new Tuple2<Object, Object>(new byte[1], new byte[1]));
     writer.forceSorterToSpill();
     // We should be able to write a record that's right _at_ the max record size
-    final byte[] atMaxRecordSize = new byte[UnsafeShuffleExternalSorter.MAX_RECORD_SIZE];
+    final byte[] atMaxRecordSize = new byte[writer.maxRecordSizeBytes()];
     new Random(42).nextBytes(atMaxRecordSize);
     writer.insertRecordIntoSorter(new Tuple2<Object, Object>(new byte[0], atMaxRecordSize));
     writer.forceSorterToSpill();
     // Inserting a record that's larger than the max record size should fail:
-    final byte[] exceedsMaxRecordSize = new byte[UnsafeShuffleExternalSorter.MAX_RECORD_SIZE + 1];
+    final byte[] exceedsMaxRecordSize = new byte[writer.maxRecordSizeBytes() + 1];
     new Random(42).nextBytes(exceedsMaxRecordSize);
     Product2<Object, Object> hugeRecord =
       new Tuple2<Object, Object>(new byte[0], exceedsMaxRecordSize);