Introduce MemoryManager + StaticMemoryManager
This commit does not represent any change in behavior. The
MemoryManagers introduced are not actually used anywhere yet;
that will come in the next commit.
Andrew Or committed Sep 30, 2015
commit 9e73981280b6dfdfdd6132e2a213b6c4a95c026c
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/MemoryManager.scala
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
Review comment (Contributor): Can you add a comment to clarify whether this is a one-per-JVM component or one-per-task component?

*
* In this context, execution memory refers to that used for computation in shuffles, joins,
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster.
*/
private[spark] abstract class MemoryManager {
Review comment (Contributor): Can you move this somewhere else that's not at the top level? I'd like to minimize the number of private[spark] classes that appear at the top level.

Review comment (Contributor): +1 on moving this out of the top level and into a new package (maybe spark.memory?). This will also provide a convenient way to group the memory-related classes together post-refactoring.


/**
* Acquire N bytes of memory for execution.
* @return whether all N bytes are successfully granted.
*/
def acquireExecutionMemory(numBytes: Long): Boolean

/**
* Acquire N bytes of memory for storage.
* @return whether all N bytes are successfully granted.
*/
def acquireStorageMemory(numBytes: Long): Boolean

/**
* Release N bytes of execution memory.
*/
def releaseExecutionMemory(numBytes: Long): Unit

/**
* Release N bytes of storage memory.
*/
def releaseStorageMemory(numBytes: Long): Unit

}
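
To make the contract above concrete, here is a minimal usage sketch. It is illustrative only: the grantOrSpill helper and its spill fallback are assumptions made for demonstration, not part of this commit.

// Illustrative sketch of the acquire/release contract (not part of this commit).
// A real caller would obtain the MemoryManager from SparkEnv.
def grantOrSpill(memoryManager: MemoryManager, numBytes: Long): Unit = {
  if (memoryManager.acquireExecutionMemory(numBytes)) {
    try {
      // ... do work with the granted memory ...
    } finally {
      // Release exactly what was acquired; the interface does not track
      // per-caller usage.
      memoryManager.releaseExecutionMemory(numBytes)
    }
  } else {
    // The grant is all-or-nothing: a denial means none of the N bytes were
    // reserved, so a real caller might spill to disk or retry with fewer bytes.
  }
}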
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -69,6 +69,8 @@ class SparkEnv (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
// TODO: unify these *MemoryManager classes
Review comment (Contributor): There's a JIRA for this, so you could mention the number here.

val memoryManager: MemoryManager,
val shuffleMemoryManager: ShuffleMemoryManager,
val executorMemoryManager: ExecutorMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
@@ -323,6 +325,7 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val memoryManager = new StaticMemoryManager(conf)
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)

val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
@@ -407,6 +410,7 @@ object SparkEnv extends Logging {
httpFileServer,
sparkFilesDir,
metricsSystem,
memoryManager,
shuffleMemoryManager,
executorMemoryManager,
outputCommitCoordinator,
77 changes: 77 additions & 0 deletions core/src/main/scala/org/apache/spark/StaticMemoryManager.scala
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.storage.BlockManager


/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
*
* The sizes of the execution and storage regions are determined through
* `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
* regions are cleanly separated such that neither usage can borrow memory from the other.
*/
private[spark] class StaticMemoryManager(conf: SparkConf) extends MemoryManager {
private val maxExecutionMemory = ShuffleMemoryManager.getMaxMemory(conf)
private val maxStorageMemory = BlockManager.getMaxMemory(conf)
@volatile private var executionMemoryUsed: Long = 0
@volatile private var storageMemoryUsed: Long = 0

/**
* Acquire N bytes of memory for execution.
* @return whether all N bytes are successfully granted.
*/
override def acquireExecutionMemory(numBytes: Long): Boolean = {
if (executionMemoryUsed + numBytes <= maxExecutionMemory) {
executionMemoryUsed += numBytes
true
} else {
false
}
}

/**
* Acquire N bytes of memory for storage.
* @return whether all N bytes are successfully granted.
*/
override def acquireStorageMemory(numBytes: Long): Boolean = {
if (storageMemoryUsed + numBytes <= maxStorageMemory) {
storageMemoryUsed += numBytes
true
} else {
false
}
}

/**
* Release N bytes of execution memory.
*/
override def releaseExecutionMemory(numBytes: Long): Unit = {
executionMemoryUsed -= numBytes
}

/**
* Release N bytes of storage memory.
*/
override def releaseStorageMemory(numBytes: Long): Unit = {
storageMemoryUsed -= numBytes
}

}
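
One caveat worth flagging in the code above: @volatile guarantees visibility across threads but not atomicity, so the check-then-increment in the two acquire methods can race if they are called concurrently. A minimal thread-safe variant, sketched here purely as an illustration and not part of this commit, would synchronize the read-modify-write (the release methods would need the same treatment):

// Illustrative only: a synchronized version of acquireExecutionMemory.
// @volatile alone does not make the read-compare-increment sequence atomic.
override def acquireExecutionMemory(numBytes: Long): Boolean = synchronized {
  if (executionMemoryUsed + numBytes <= maxExecutionMemory) {
    executionMemoryUsed += numBytes
    true
  } else {
    false
  }
}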
core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -158,7 +158,7 @@ private[spark] object ShuffleMemoryManager {
* of the memory pool and a safety factor since collections can sometimes grow bigger than
* the size we target before we estimate their sizes again.
*/
private def getMaxMemory(conf: SparkConf): Long = {
private[spark] def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1268,7 +1268,7 @@ private[spark] object BlockManager extends Logging {
private val ID_GENERATOR = new IdGenerator

/** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
private[spark] def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
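
As a worked example of the two getMaxMemory formulas above, assuming a hypothetical 4 GB (4096 MB) heap and the default fractions:

// Worked example; the heap size is an assumption for illustration.
val maxHeapMb = 4096.0
val executionCapMb = maxHeapMb * 0.2 * 0.8 // shuffle memoryFraction * safetyFraction ≈ 655 MB
val storageCapMb = maxHeapMb * 0.6 * 0.9   // storage memoryFraction * safetyFraction ≈ 2212 MB
// Reserved totals: 16% + 54% = 70% of the heap. The remaining ~30% is the
// safety headroom left for user data structures and Spark internals.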