Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b7c9c23
Move Unsafe mem. mgrs. to spark-core subproject.
JoshRosen Oct 14, 2015
25ba4b5
Merge ExecutorMemoryManager into MemoryManager.
JoshRosen Oct 14, 2015
3d997ce
Naming and formatting fixes.
JoshRosen Oct 16, 2015
d9e6b84
Move Tungsten-related methods to end of MemoryManager file.
JoshRosen Oct 16, 2015
98ef86b
Add taskAttemptId to TaskMemoryManager constructor.
JoshRosen Oct 16, 2015
8f93e94
Move ShuffleMemoryManager into memory package.
JoshRosen Oct 16, 2015
3bbc54d
Merge remote-tracking branch 'origin/master' into SPARK-10984
JoshRosen Oct 16, 2015
88a7970
Fix bug in AbstractBytesToBytesMapSuite.
JoshRosen Oct 16, 2015
ec48ff9
Refactor the existing Tungsten TaskMemoryManager interactions so Tung…
JoshRosen Oct 16, 2015
6f98bc4
Move TaskMemoryManager from unsafe to memory.
JoshRosen Oct 16, 2015
6459397
Further minimization of ShuffleMemoryManager usage.
JoshRosen Oct 16, 2015
60c66b2
Merge ShuffleMemoryManager into MemoryManager.
JoshRosen Oct 17, 2015
7d6a37f
Clean up interaction between TaskMemoryManager and MemoryManager.
JoshRosen Oct 17, 2015
0dc21dc
Merge remote-tracking branch 'origin/master' into SPARK-10984
JoshRosen Oct 22, 2015
f21b767
Fix compilation.
JoshRosen Oct 22, 2015
46ad693
Fix Scalastyle
JoshRosen Oct 22, 2015
c33e330
Fix import ordering in Executor.scala
JoshRosen Oct 22, 2015
ef45d91
Fix import ordering in Task.scala
JoshRosen Oct 22, 2015
c7eac69
Fix import ordering in TaskContextImpl
JoshRosen Oct 22, 2015
d86f435
Fix spillable collection tests
JoshRosen Oct 22, 2015
bba5550
Integrate TaskMemoryManager acquire/releasePage with MemoryManager bo…
JoshRosen Oct 22, 2015
66ae259
Move pooling logic into allocators themselves.
JoshRosen Oct 22, 2015
b1d5151
Scaladoc updates.
JoshRosen Oct 22, 2015
d0c0dd9
Update Spillable to properly integrate with TaskMemoryManager.
JoshRosen Oct 22, 2015
48149fc
Move pageSizeBytes to Tungsten section
JoshRosen Oct 23, 2015
c8ba196
Cleanup after merging of ShuffleMemoryManager into MemoryManager.
JoshRosen Oct 23, 2015
63a6cbc
Rename getMemoryConsumptionForThisTask to getExecutionMemoryUsageForTask
JoshRosen Oct 23, 2015
6ec9c30
Properly thread numCores to memory manager.
JoshRosen Oct 23, 2015
1593fad
Explain why MemoryBlock.pageNumber is public
JoshRosen Oct 23, 2015
64bec0b
Fix TaskMemoryManagerSuite tests.
JoshRosen Oct 23, 2015
f9240e9
Fix compilation
JoshRosen Oct 23, 2015
a95bc08
Fix a memory leak in UnsafeShuffleWriter's sorter
JoshRosen Oct 23, 2015
b3ad761
Remove println
JoshRosen Oct 23, 2015
a7e8320
Fix Scalastyle.
JoshRosen Oct 23, 2015
e874a45
Fix remaining TODOs in UnsafeShuffleWriterSuite.
JoshRosen Oct 23, 2015
2ba6e51
Fix DeveloperAPI change
JoshRosen Oct 23, 2015
0c13723
Address comments in MemoryManager
JoshRosen Oct 23, 2015
04ec429
Release memory acquired after unsuccessful allocatePage() call
JoshRosen Oct 23, 2015
e56d039
Fix EAOM compilation.
JoshRosen Oct 23, 2015
aa14113
Port tests from ShuffleMemoryManagerSuite
JoshRosen Oct 23, 2015
7addf8b
Remove unused non-page-memory allocation methods.
JoshRosen Oct 23, 2015
5af0b17
Update Tungsten tests
JoshRosen Oct 23, 2015
a264703
Fix execution memory leaks in Spillable collections
JoshRosen Oct 24, 2015
f2ab708
Fix NPE in UnsafeRowSerializerSuite
JoshRosen Oct 24, 2015
0b5c72f
Update EAOM tests to reflect fact that iterator() is destructive.
JoshRosen Oct 24, 2015
f68fdb1
Fix streaming test compilation
JoshRosen Oct 26, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move pooling logic into allocators themselves.
  • Loading branch information
JoshRosen committed Oct 22, 2015
commit 66ae259cf123e3744e436eece7a222bf2c36e24c
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public MemoryBlock allocatePage(long size) {
}
return null;
}
final MemoryBlock page = memoryManager.allocateMemoryBlock(size);
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
Expand All @@ -187,7 +187,7 @@ public void freePage(MemoryBlock page) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
long pageSize = page.size();
memoryManager.releaseMemoryBlock(page);
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize);
}

Expand All @@ -202,7 +202,7 @@ public void freePage(MemoryBlock page) {
*/
public MemoryBlock allocate(long size) throws OutOfMemoryError {
assert(size > 0) : "Size must be positive, but got " + size;
final MemoryBlock memory = memoryManager.allocateMemoryBlock(size);
final MemoryBlock memory = memoryManager.tungstenMemoryAllocator().allocate(size);
synchronized(allocatedNonPageMemory) {
allocatedNonPageMemory.add(memory);
}
Expand All @@ -214,7 +214,7 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
*/
public void free(MemoryBlock memory) {
assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()";
memoryManager.releaseMemoryBlock(memory);
memoryManager.tungstenMemoryAllocator().free(memory);
synchronized(allocatedNonPageMemory) {
final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
Expand Down Expand Up @@ -312,7 +312,7 @@ public long cleanUpAllAllocatedMemory() {
freedBytes += memory.size();
// We don't call free() here because that calls Set.remove, which would lead to a
// ConcurrentModificationException here.
memoryManager.releaseMemoryBlock(memory);
memoryManager.tungstenMemoryAllocator().free(memory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these just be memoryManager.free(memory)? That is, we would still keep the actual allocation code in a separate class, but provide a helper method that calls the internal memory allocator. Then you don't need to say tungstenMemoryAllocator everywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Everywhere" is only five places, all in TaskMemoryManager. To address your comment, I think I'd have to add two methods, allocateTungstenMemory() and freeTungstenMemory(), which only forwarded to that field. Given that, I'm not sure that I want to fix this; it's a net increase in code and I don't feel that it necessarily buys us any increase in clarity or correctness.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

iter.remove();
}
}
Expand Down
72 changes: 2 additions & 70 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.memory

import java.lang.ref.WeakReference
import java.util
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
Expand All @@ -27,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkException, TaskContext, SparkConf, Logging}
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.{MemoryAllocator, MemoryBlock}
import org.apache.spark.unsafe.memory.MemoryAllocator

/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
Expand Down Expand Up @@ -313,72 +311,6 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int = 1)
* Allocates memory for use by Unsafe/Tungsten code. Exposed to enable untracked allocations of
* temporary data structures.
*/
final val tungstenMemoryAllocator: MemoryAllocator =
private[memory] final val tungstenMemoryAllocator: MemoryAllocator =
if (tungstenMemoryIsAllocatedInHeap) MemoryAllocator.HEAP else MemoryAllocator.UNSAFE

private val POOLING_THRESHOLD_BYTES: Int = 1024 * 1024

/**
* Returns true if allocations of the given size should go through the pooling mechanism and
* false otherwise.
*/
private def shouldPool(size: Long): Boolean = {
// Very small allocations are less likely to benefit from pooling.
// At some point, we should explore supporting pooling for off-heap memory, but for now we'll
// ignore that case in the interest of simplicity.
size >= POOLING_THRESHOLD_BYTES && tungstenMemoryIsAllocatedInHeap
}

@GuardedBy("this")
private val bufferPoolsBySize: util.Map[Long, util.LinkedList[WeakReference[MemoryBlock]]] =
new util.HashMap[Long, util.LinkedList[WeakReference[MemoryBlock]]]

/**
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
* to be zeroed out (call `zero()` on the result if this is necessary). This method does
* not integrate with the memory bookkeeping system, so callers (i.e. TaskMemoryManager) should
* call those methods at appropriate times.
*/
@throws(classOf[OutOfMemoryError])
private[memory] final def allocateMemoryBlock(size: Long): MemoryBlock = {
if (shouldPool(size)) {
this synchronized {
val pool: util.LinkedList[WeakReference[MemoryBlock]] = bufferPoolsBySize.get(size)
if (pool != null) {
while (!pool.isEmpty) {
val blockReference: WeakReference[MemoryBlock] = pool.pop
val memory: MemoryBlock = blockReference.get
if (memory != null) {
assert(memory.size == size)
return memory
}
}
bufferPoolsBySize.remove(size)
}
}
tungstenMemoryAllocator.allocate(size)
} else {
tungstenMemoryAllocator.allocate(size)
}
}

/**
* Releases the given memory block, either freeing it immediately or storing it in a pool for
* re-use by other tasks.
*/
private[memory] final def releaseMemoryBlock(memory: MemoryBlock) {
val size: Long = memory.size
if (shouldPool(size)) {
this synchronized {
var pool: util.LinkedList[WeakReference[MemoryBlock]] = bufferPoolsBySize.get(size)
if (pool == null) {
pool = new util.LinkedList[WeakReference[MemoryBlock]]
bufferPoolsBySize.put(size, pool)
}
pool.add(new WeakReference[MemoryBlock](memory))
}
} else {
tungstenMemoryAllocator.free(memory)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,71 @@

package org.apache.spark.unsafe.memory;

import javax.annotation.concurrent.GuardedBy;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

/**
 * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
 *
 * <p>Blocks of at least {@code POOLING_THRESHOLD_BYTES} bytes that are passed to
 * {@link #free(MemoryBlock)} are kept in a per-size pool and handed out again by
 * {@link #allocate(long)} when a request for exactly the same size arrives. The pool holds the
 * blocks only through {@link WeakReference}s. A recycled block is returned as-is; this class
 * does not clear its contents before reuse.
 *
 * <p>All access to the pool happens while holding this allocator's monitor.
 */
public class HeapMemoryAllocator implements MemoryAllocator {

  /** Freed blocks that are candidates for reuse, keyed by their exact size in bytes. */
  @GuardedBy("this")
  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
    new HashMap<>();

  /** Minimum block size, in bytes, for which freed blocks are pooled. */
  private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;

  /**
   * Returns true if allocations of the given size should go through the pooling mechanism and
   * false otherwise.
   */
  private boolean shouldPool(long size) {
    // Very small allocations are less likely to benefit from pooling.
    return size >= POOLING_THRESHOLD_BYTES;
  }

  /**
   * Returns a block of exactly {@code size} bytes, reusing a pooled block of that size when one
   * is still reachable and otherwise backing the block with a freshly created {@code long[]}.
   *
   * @param size requested size in bytes; must be a multiple of 8
   * @return a block whose {@code size()} equals {@code size}
   * @throws IllegalArgumentException if {@code size} is not a multiple of 8, or if a new array
   *         is needed and {@code size / 8} does not fit a non-negative {@code int} array length
   * @throws OutOfMemoryError if the JVM cannot allocate the backing array
   */
  @Override
  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    if (size % 8 != 0) {
      throw new IllegalArgumentException("Size " + size + " was not a multiple of 8");
    }
    if (shouldPool(size)) {
      synchronized (this) {
        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
        if (pool != null) {
          // Skip over entries whose referent has already been cleared.
          while (!pool.isEmpty()) {
            final WeakReference<MemoryBlock> blockReference = pool.pop();
            final MemoryBlock memory = blockReference.get();
            if (memory != null) {
              assert (memory.size() == size);
              return memory;
            }
          }
          // Every entry was stale; drop the empty list so the map does not grow without bound.
          bufferPoolsBySize.remove(size);
        }
      }
    }
    // Validate before narrowing: a plain (int) cast would silently wrap for sizes above
    // 8 * Integer.MAX_VALUE bytes and could yield an array smaller than requested.
    final long numWords = size / 8;
    if (numWords < 0 || numWords > Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
        "Size " + size + " cannot be backed by a single long[] array");
    }
    return MemoryBlock.fromLongArray(new long[(int) numWords]);
  }

  /**
   * Releases a block. Blocks large enough to be pooled are queued for reuse by a later
   * {@link #allocate(long)} call of the same size; smaller blocks are not retained here.
   *
   * @param memory the block being released
   */
  @Override
  public void free(MemoryBlock memory) {
    final long size = memory.size();
    if (!shouldPool(size)) {
      // Nothing to do: this allocator keeps no reference to small blocks.
      return;
    }
    synchronized (this) {
      LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
      if (pool == null) {
        pool = new LinkedList<>();
        bufferPoolsBySize.put(size, pool);
      }
      pool.add(new WeakReference<>(memory));
    }
  }
}