From 4101eb9a8efbed2c58d780a2e704d73b2b71d3c4 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Wed, 17 Aug 2016 18:03:43 -0700 Subject: [PATCH] [SPARK-17113][Shuffle] Job failure due to Executor OOM in offheap mode --- .../util/collection/unsafe/sort/UnsafeExternalSorter.java | 2 +- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 8d596f87d213..ccf76643db2b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -522,7 +522,7 @@ public long spill() throws IOException { // is accessing the current record. We free this page in that caller's next loadNext() // call. for (MemoryBlock page : allocatedPages) { - if (!loaded || page.getBaseObject() != upstream.getBaseObject()) { + if (!loaded || page.pageNumber != ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) { released += page.size(); freePage(page); } else { diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 78da38927878..30d0f3006a04 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -248,6 +248,7 @@ public final class SortedIterator extends UnsafeSorterIterator implements Clonea private long baseOffset; private long keyPrefix; private int recordLength; + private long currentPageNumber; private SortedIterator(int numRecords, int offset) { this.numRecords = numRecords; @@ -262,6 +263,7 @@ public SortedIterator clone() { iter.baseOffset = baseOffset; iter.keyPrefix = keyPrefix; iter.recordLength = recordLength; + iter.currentPageNumber = currentPageNumber; return iter; } @@ -279,6 +281,7 @@ public boolean hasNext() { public void loadNext() { // This pointer points to a 4-byte record length, followed by the record's bytes final long recordPointer = array.get(offset + position); + currentPageNumber = memoryManager.decodePageNumber(recordPointer); baseObject = memoryManager.getPage(recordPointer); baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length recordLength = Platform.getInt(baseObject, baseOffset - 4); @@ -292,6 +295,10 @@ public void loadNext() { @Override public long getBaseOffset() { return baseOffset; } + public long getCurrentPageNumber() { + return currentPageNumber; + } + @Override public int getRecordLength() { return recordLength; }