Commit fc7f9f5

Author: Andrew Or (committed)
Address comments
1 parent 9de2e20 commit fc7f9f5

8 files changed: +70 additions, -78 deletions

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ class SparkEnv (
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
-    // TODO: unify these *MemoryManager classes
+    // TODO: unify these *MemoryManager classes (SPARK-10984)
     val memoryManager: MemoryManager,
     val shuffleMemoryManager: ShuffleMemoryManager,
     val executorMemoryManager: ExecutorMemoryManager,

core/src/main/scala/org/apache/spark/memory/MemoryManager.scala

Lines changed: 19 additions & 2 deletions
@@ -19,18 +19,35 @@ package org.apache.spark.memory
 
 import scala.collection.mutable
 
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
 
 
 /**
  * An abstract memory manager that enforces how memory is shared between execution and storage.
  *
  * In this context, execution memory refers to that used for computation in shuffles, joins,
  * sorts and aggregations, while storage memory refers to that used for caching and propagating
- * internal data across the cluster.
+ * internal data across the cluster. There exists one of these per JVM.
  */
 private[spark] abstract class MemoryManager {
 
+  // The memory store used to evict cached blocks
+  private var _memoryStore: MemoryStore = _
+  protected def memoryStore: MemoryStore = {
+    if (_memoryStore == null) {
+      throw new IllegalArgumentException("memory store not initialized yet")
+    }
+    _memoryStore
+  }
+
+  /**
+   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+   * This must be set after construction due to initialization ordering constraints.
+   */
+  def setMemoryStore(store: MemoryStore): Unit = {
+    _memoryStore = store
+  }
+
   /**
    * Acquire N bytes of memory for execution.
    * @return number of bytes successfully granted (<= N).
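
Hoisting the store handling into the abstract class means every concrete manager inherits the same injection point and the same fail-fast accessor. The following is a self-contained sketch of just that pattern; EvictingStore, ToyMemoryManager and ToyStaticManager are illustrative names, not Spark classes:

  // Illustrative sketch only; mirrors the protected `memoryStore` accessor in the hunk above.
  class EvictingStore {
    def ensureFreeSpace(numBytes: Long): Unit = println(s"evict until $numBytes bytes are free")
  }

  abstract class ToyMemoryManager {
    // Injected after construction, because the real store cannot exist yet at that point.
    private var _memoryStore: EvictingStore = _

    protected def memoryStore: EvictingStore = {
      if (_memoryStore == null) {
        throw new IllegalArgumentException("memory store not initialized yet")
      }
      _memoryStore
    }

    def setMemoryStore(store: EvictingStore): Unit = { _memoryStore = store }
  }

  // Subclasses only see the protected accessor, never the raw field.
  class ToyStaticManager extends ToyMemoryManager {
    def acquireStorage(numBytes: Long): Unit = memoryStore.ensureFreeSpace(numBytes)
  }

Calling acquireStorage before setMemoryStore now throws immediately, instead of silently reaching into a half-built SparkEnv the way the old StaticMemoryManager lookup did (see the next file).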

core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala

Lines changed: 30 additions & 59 deletions
@@ -19,8 +19,8 @@ package org.apache.spark.memory
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.{BlockId, BlockStatus}
 
 
 /**
@@ -41,27 +41,10 @@ private[spark] class StaticMemoryManager(
     (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
   }
 
-  // Amount of execution memory in use. Accesses must be synchronized on `executionLock`.
+  // Amount of execution / storage memory in use
+  // Accesses must be synchronized on `this`
   private var _executionMemoryUsed: Long = 0
-  private val executionLock = new Object
-
-  // Amount of storage memory in use. Accesses must be synchronized on `storageLock`.
   private var _storageMemoryUsed: Long = 0
-  private val storageLock = new Object
-
-  // The memory store used to evict cached blocks
-  private var _memoryStore: MemoryStore = _
-  private def memoryStore: MemoryStore = {
-    if (_memoryStore == null) {
-      _memoryStore = SparkEnv.get.blockManager.memoryStore
-    }
-    _memoryStore
-  }
-
-  // For testing only
-  def setMemoryStore(store: MemoryStore): Unit = {
-    _memoryStore = store
-  }
 
   def this(conf: SparkConf) {
     this(
@@ -74,13 +57,11 @@
    * Acquire N bytes of memory for execution.
    * @return number of bytes successfully granted (<= N).
    */
-  override def acquireExecutionMemory(numBytes: Long): Long = {
-    executionLock.synchronized {
-      assert(_executionMemoryUsed <= maxExecutionMemory)
-      val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
-      _executionMemoryUsed += bytesToGrant
-      bytesToGrant
-    }
+  override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
+    assert(_executionMemoryUsed <= maxExecutionMemory)
+    val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
+    _executionMemoryUsed += bytesToGrant
+    bytesToGrant
   }
 
   /**
@@ -130,7 +111,7 @@ private[spark] class StaticMemoryManager(
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
     // Note: Keep this outside synchronized block to avoid potential deadlocks!
     memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
-    storageLock.synchronized {
+    synchronized {
       assert(_storageMemoryUsed <= maxStorageMemory)
       val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
       if (enoughMemory) {
@@ -143,40 +124,34 @@
   /**
    * Release N bytes of execution memory.
    */
-  override def releaseExecutionMemory(numBytes: Long): Unit = {
-    executionLock.synchronized {
-      if (numBytes > _executionMemoryUsed) {
-        logWarning(s"Attempted to release $numBytes bytes of execution " +
-          s"memory when we only have ${_executionMemoryUsed} bytes")
-        _executionMemoryUsed = 0
-      } else {
-        _executionMemoryUsed -= numBytes
-      }
+  override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
+    if (numBytes > _executionMemoryUsed) {
+      logWarning(s"Attempted to release $numBytes bytes of execution " +
+        s"memory when we only have ${_executionMemoryUsed} bytes")
+      _executionMemoryUsed = 0
+    } else {
+      _executionMemoryUsed -= numBytes
     }
   }
 
   /**
    * Release N bytes of storage memory.
    */
-  override def releaseStorageMemory(numBytes: Long): Unit = {
-    storageLock.synchronized {
-      if (numBytes > _storageMemoryUsed) {
-        logWarning(s"Attempted to release $numBytes bytes of storage " +
-          s"memory when we only have ${_storageMemoryUsed} bytes")
-        _storageMemoryUsed = 0
-      } else {
-        _storageMemoryUsed -= numBytes
-      }
+  override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
+    if (numBytes > _storageMemoryUsed) {
+      logWarning(s"Attempted to release $numBytes bytes of storage " +
+        s"memory when we only have ${_storageMemoryUsed} bytes")
+      _storageMemoryUsed = 0
+    } else {
+      _storageMemoryUsed -= numBytes
    }
  }
 
   /**
    * Release all storage memory acquired.
    */
-  override def releaseStorageMemory(): Unit = {
-    storageLock.synchronized {
-      _storageMemoryUsed = 0
-    }
+  override def releaseStorageMemory(): Unit = synchronized {
+    _storageMemoryUsed = 0
   }
 
   /**
@@ -189,19 +164,15 @@ private[spark] class StaticMemoryManager(
   /**
    * Amount of execution memory currently in use, in bytes.
    */
-  override def executionMemoryUsed: Long = {
-    executionLock.synchronized {
-      _executionMemoryUsed
-    }
+  override def executionMemoryUsed: Long = synchronized {
+    _executionMemoryUsed
  }
 
   /**
    * Amount of storage memory currently in use, in bytes.
    */
-  override def storageMemoryUsed: Long = {
-    storageLock.synchronized {
-      _storageMemoryUsed
-    }
+  override def storageMemoryUsed: Long = synchronized {
+    _storageMemoryUsed
   }
 
 }
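
The separate executionLock and storageLock objects are gone; both counters are now guarded by the manager's own monitor via synchronized on this. A stripped-down sketch of that accounting style (ToyAccounting is an illustrative name, not the real StaticMemoryManager):

  // Illustrative sketch only: one monitor (`this`) guards both counters.
  class ToyAccounting(maxExecutionMemory: Long, maxStorageMemory: Long) {
    private var _executionMemoryUsed: Long = 0
    private var _storageMemoryUsed: Long = 0

    def acquireExecutionMemory(numBytes: Long): Long = synchronized {
      // Grant as much as is available, never more than requested.
      val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
      _executionMemoryUsed += bytesToGrant
      bytesToGrant
    }

    def acquireStorageMemory(numBytes: Long): Boolean = synchronized {
      val fits = _storageMemoryUsed + numBytes <= maxStorageMemory
      if (fits) { _storageMemoryUsed += numBytes }
      fits
    }

    def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
      _executionMemoryUsed = math.max(0L, _executionMemoryUsed - numBytes)
    }

    def executionMemoryUsed: Long = synchronized { _executionMemoryUsed }
    def storageMemoryUsed: Long = synchronized { _storageMemoryUsed }
  }

Note that the real acquireStorageMemory still calls memoryStore.ensureFreeSpace outside the synchronized block, per the retained comment, to avoid potential deadlocks.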

core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala

Lines changed: 2 additions & 1 deletion
@@ -165,7 +165,8 @@ private[spark] object ShuffleMemoryManager {
    */
   def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
     val conf = new SparkConf
-    val memoryManager = new StaticMemoryManager(conf, maxMemory, Long.MaxValue)
+    val memoryManager = new StaticMemoryManager(
+      conf, maxExecutionMemory = maxMemory, maxStorageMemory = Long.MaxValue)
     new ShuffleMemoryManager(memoryManager, pageSizeBytes)
   }
 
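
Both limits are plain Long values, so naming them at the call site guards against transposition. Reusing the conf and maxMemory values from create above, a hypothetical positional call that swaps them would compile just as happily:

  // Hypothetical misuse: compiles, but silently swaps the two limits.
  val swapped = new StaticMemoryManager(conf, Long.MaxValue, maxMemory)
  // With named arguments the intent is explicit and order-independent.
  val explicit = new StaticMemoryManager(
    conf, maxStorageMemory = Long.MaxValue, maxExecutionMemory = maxMemory)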

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@ private[spark] class BlockManager(
     externalBlockStoreInitialized = true
     new ExternalBlockStore(this, executorId)
   }
+  memoryManager.setMemoryStore(memoryStore)
 
   private val maxMemory = memoryManager.maxStorageMemory
 
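
This is where the "initialization ordering constraint" from the MemoryManager scaladoc is resolved: the MemoryStore takes the MemoryManager in its constructor, while the manager needs the store to evict blocks, so the back-reference is injected once both exist. A simplified sketch of that ordering, with illustrative SketchManager / SketchStore names rather than the real Spark classes:

  // Illustrative sketch only: the manager is built first, the store is built with a
  // reference to it, and the back-reference is injected afterwards.
  class SketchManager {
    private var store: SketchStore = _
    def setMemoryStore(s: SketchStore): Unit = { store = s }
    def evictIfNeeded(bytes: Long): Unit = store.ensureFreeSpace(bytes)
  }

  class SketchStore(manager: SketchManager) {
    def ensureFreeSpace(bytes: Long): Unit = println(s"freeing $bytes bytes")
  }

  object SketchWiring {
    def wire(): SketchManager = {
      val manager = new SketchManager
      val store = new SketchStore(manager)  // the store needs the manager up front
      manager.setMemoryStore(store)         // back-reference injected afterwards, as BlockManager does here
      manager
    }
  }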

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 1 addition & 0 deletions
@@ -496,6 +496,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       memory: Long,
       droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
     accountingLock.synchronized {
+      // Note: all acquisitions of unroll memory must be synchronized on `accountingLock`
       val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()

core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala

Lines changed: 14 additions & 12 deletions
@@ -38,9 +38,9 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
     assert(mm.acquireExecutionMemory(100L) === 100L)
     // Acquire up to the max
     assert(mm.acquireExecutionMemory(1000L) === 890L)
-    assert(mm.executionMemoryUsed === 1000L)
+    assert(mm.executionMemoryUsed === maxExecutionMem)
     assert(mm.acquireExecutionMemory(1L) === 0L)
-    assert(mm.executionMemoryUsed === 1000L)
+    assert(mm.executionMemoryUsed === maxExecutionMem)
     mm.releaseExecutionMemory(800L)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
@@ -54,35 +54,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
   test("basic storage memory") {
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("you can see the world you brought to live")
-    val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L, dummyBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
     // `ensureFreeSpace` should be called with the number of bytes requested
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L)
     assert(mm.storageMemoryUsed === 10L)
-    assert(dummyBlocks.isEmpty)
-    assert(mm.acquireStorageMemory(dummyBlock, 100L, dummyBlocks))
+    assert(evictedBlocks.isEmpty)
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+    assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, 1000L, dummyBlocks))
+    assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L)
     assert(mm.storageMemoryUsed === 110L)
-    assert(mm.acquireStorageMemory(dummyBlock, 890L, dummyBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L)
     assert(mm.storageMemoryUsed === 1000L)
-    assert(!mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks))
+    assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
     assert(mm.storageMemoryUsed === 1000L)
     mm.releaseStorageMemory(800L)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
@@ -150,7 +151,8 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
   private def makeThings(
       maxExecutionMem: Long,
       maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
-    val mm = new StaticMemoryManager(conf, maxExecutionMem, maxStorageMem)
+    val mm = new StaticMemoryManager(
+      conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
     val ms = mock(classOf[MemoryStore])
     mm.setMemoryStore(ms)
     (mm, ms)
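
Because setMemoryStore now lives on the base MemoryManager, the suite can hand the manager a Mockito mock of MemoryStore directly, with no BlockManager or SparkEnv in the picture. A hedged usage sketch of that wiring, assuming the suite's existing imports and SparkFunSuite test DSL; the test name and the 1000L / 400L values are illustrative, not part of this commit:

  test("standalone manager with a mocked store (illustrative)") {
    val mm = new StaticMemoryManager(
      new SparkConf(loadDefaults = false), maxExecutionMemory = 1000L, maxStorageMemory = 1000L)
    mm.setMemoryStore(mock(classOf[MemoryStore]))  // no real BlockManager or SparkEnv required
    // Execution accounting works without ever touching the mocked store.
    assert(mm.acquireExecutionMemory(400L) === 400L)
    assert(mm.executionMemoryUsed === 400L)
  }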

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -1045,13 +1045,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("reserve/release unroll memory") {
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
-    val dummyBlock = TestBlockId("")
-    val dummyEvictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     assert(memoryStore.currentUnrollMemory === 0)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
-      memoryStore.reserveUnrollMemoryForThisTask(dummyBlock, memory, dummyEvictedBlocks)
+      memoryStore.reserveUnrollMemoryForThisTask(
+        TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)])
     }
 
     // Reserve
