Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make UnsafeExternalSorter's page size configurable
  • Loading branch information
JoshRosen committed Jul 29, 2015
commit ba54d4b203621b5577a76abacaf895fa2836b9bb
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ public final class UnsafeExternalSorter {

private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);

private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
@VisibleForTesting
static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;

private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
private final int initialSize;
Expand Down Expand Up @@ -91,6 +88,7 @@ public UnsafeExternalSorter(
this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
}

Expand Down Expand Up @@ -147,9 +145,12 @@ public void spill() throws IOException {
}

private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this loop necessary? are page sizes different?

If this is called a lot, it might be better to just maintain an incremental view of this, rather than calculating it each time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're not different yet, but this was just some forward-thinking work when I consider how we'l handle large records.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only called when spilling and the corresponding methods in the other classes are only called in debugging code which isn't called in normal operation. Therefore, I'm not planning to change this in this patch.

totalPageSize += page.size();
}
return sorter.getMemoryUsage() + totalPageSize;
}

@VisibleForTesting
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
Expand Down Expand Up @@ -214,23 +215,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
if (requiredSpace > PAGE_SIZE) {
if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
PAGE_SIZE + ")");
pageSizeBytes + ")");
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquired < PAGE_SIZE) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = memoryManager.allocatePage(PAGE_SIZE);
currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = PAGE_SIZE;
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}
Expand Down