Remove spark.memory.useLegacyMode and StaticMemoryManager
srowen committed Jan 8, 2019
commit 69ca3a9e0a6cd121d749cd0255580bbba0a66fa5
core/src/main/scala/org/apache/spark/SparkConf.scala (1 addition, 26 deletions)
@@ -532,38 +532,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
}

// Validate memory fractions
-    val deprecatedMemoryKeys = Seq(
-      "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction",
-      "spark.shuffle.safetyFraction",
-      "spark.storage.unrollFraction",
-      "spark.storage.safetyFraction")
-    val memoryKeys = Seq(
-      "spark.memory.fraction",
-      "spark.memory.storageFraction") ++
-      deprecatedMemoryKeys
-    for (key <- memoryKeys) {
+    for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) {
val value = getDouble(key, 0.5)
if (value > 1 || value < 0) {
throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
}
}

-    // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
-    val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
-    val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
-    if (!legacyMemoryManagement) {
-      val keyset = deprecatedMemoryKeys.toSet
-      val detected = settings.keys().asScala.filter(keyset.contains)
-      if (detected.nonEmpty) {
-        logWarning("Detected deprecated memory fraction settings: " +
-          detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
-          "memory management are unified. All memory fractions used in the old model are " +
-          "now deprecated and no longer read. If you wish to use the old memory management, " +
-          s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
-      }
-    }

if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
"instead use \"yarn\" with specified deploy mode."
core/src/main/scala/org/apache/spark/SparkEnv.scala (2 additions, 8 deletions)
@@ -31,7 +31,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -322,13 +322,7 @@ object SparkEnv extends Logging {
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

-    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
-    val memoryManager: MemoryManager =
-      if (useLegacyMemoryManager) {
-        new StaticMemoryManager(conf, numUsableCores)
-      } else {
-        UnifiedMemoryManager(conf, numUsableCores)
-      }
+    val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)

val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala (154 deletions)

This file was deleted.

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -46,7 +46,7 @@ import org.apache.spark.storage.BlockId
* it if necessary. Cached blocks can be evicted only if actual
* storage memory usage exceeds this region.
*/
-private[spark] class UnifiedMemoryManager private[memory] (
+private[spark] class UnifiedMemoryManager(
Review comment (srowen, Member Author):
Tests and other internals now need to use this constructor directly

conf: SparkConf,
val maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
core/src/main/scala/org/apache/spark/memory/package.scala (1 addition, 6 deletions)
@@ -61,15 +61,10 @@ package org.apache.spark
* }}}
*
*
- * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how
- * they handle the sizing of their memory pools:
+ * There is one implementation of [[org.apache.spark.memory.MemoryManager]]:
*
* - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft
Review comment (Member):
nit: since there is just one implementation now, do we still need to mention that it's the default?

* boundaries between storage and execution memory, allowing requests for memory in one region
* to be fulfilled by borrowing memory from the other.
- *  - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage
- *    and execution memory by statically partitioning Spark's memory and preventing storage and
- *    execution from borrowing memory from each other. This mode is retained only for legacy
- *    compatibility purposes.
*/
package object memory
core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite {
@Test
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
-        new StaticMemoryManager(
+        new UnifiedMemoryManager(
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MAX_VALUE,
Long.MAX_VALUE,
Review comment (Member):
What do these mean after the change?

maxHeapMemory = Long.MAX_VALUE and onHeapStorageRegionSize = Long.MAX_VALUE?

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -145,8 +145,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with EncryptionFunSuite
encryptionTest("Cache broadcast to disk") { conf =>
conf.setMaster("local")
.setAppName("test")
.set("spark.memory.useLegacyMode", "true")
.set("spark.storage.memoryFraction", "0.0")
.set("spark.memory.storageFraction", "0.0")
sc = new SparkContext(conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
@@ -173,8 +172,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with EncryptionFunSuite
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("test")
.set("spark.memory.useLegacyMode", "true")
.set("spark.storage.memoryFraction", "0.0")
.set("spark.memory.storageFraction", "0.0")

sc = new SparkContext(conf)
val list = List[Int](1, 2, 3, 4)