Skip to content
Prev Previous commit
Next Next commit
preserve the memory in BytsToBytesMap
  • Loading branch information
Davies Liu committed Oct 29, 2015
commit 7bf76e5a29b49c97fd2d2e22ba244e2dba74df51
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public BytesToBytesMap(
TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
}
allocate(initialCapacity);
acquireMemory(longArray.memoryBlock().size());
// preserve the same amount of memory for UnsafeInMemorySorter
acquireMemory(longArray.memoryBlock().size() * 2);
}

public BytesToBytesMap(
Expand Down Expand Up @@ -668,7 +669,8 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
long capacity = Math.min(MAX_CAPACITY,
ByteArrayMethods.nextPowerOf2(growthStrategy.nextCapacity((int) bitset.capacity())));
try {
acquireMemory(capacity * 16L);
// preserve the same amount of memory for UnsafeInMemorySorter
acquireMemory(capacity * 16L * 2);
} catch (OutOfMemoryError e) {
return false;
}
Expand Down Expand Up @@ -701,7 +703,7 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
if (toGrow) {
long usedMemory = longArray.memoryBlock().size();
growAndRehash();
releaseMemory(usedMemory);
releaseMemory(usedMemory * 2);
}
return true;
}
Expand Down Expand Up @@ -758,7 +760,7 @@ private void allocate(int capacity) {
public void free() {
updatePeakMemoryUsed();
if (longArray != null) {
releaseMemory(longArray.memoryBlock().size());
releaseMemory(longArray.memoryBlock().size() * 2);
longArray = null;
}
bitset = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ private UnsafeExternalSorter(
if (existingInMemorySorter == null) {
this.inMemSorter =
new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
acquireMemory(inMemSorter.getMemoryUsage());
} else {
this.inMemSorter = existingInMemorySorter;
// will acquire after free the map
}
acquireMemory(inMemSorter.getMemoryUsage());

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public void randomizedTestWithRecordsLargerThanPageSize() {

@Test
public void failureToAllocateFirstPage() {
memoryManager.limit(1024); // longArray
memoryManager.limit(2048); // longArray
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 1, PAGE_SIZE_BYTES);
try {
final long[] emptyArray = new long[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ public UnsafeKVExternalSorter(
/* initialSize */ 4096,
pageSizeBytes);
} else {
// Insert the records into the in-memory sorter.
// We will use the number of elements in the map as the initialSize of the
// UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
// we will use 1 as its initial size if the map is empty.
// TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474)
// The memory have been preserved in BytesToBytesMap
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));

Expand Down Expand Up @@ -127,6 +123,7 @@ public UnsafeKVExternalSorter(

sorter.spill();
map.free();
taskContext.taskMemoryManager().acquireExecutionMemory(inMemSorter.getMemoryUsage(), sorter);
}
}

Expand Down