diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 681e4378a4dd5..f27df505fa5ff 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -532,38 +532,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } // Validate memory fractions - val deprecatedMemoryKeys = Seq( - "spark.storage.memoryFraction", - "spark.shuffle.memoryFraction", - "spark.shuffle.safetyFraction", - "spark.storage.unrollFraction", - "spark.storage.safetyFraction") - val memoryKeys = Seq( - "spark.memory.fraction", - "spark.memory.storageFraction") ++ - deprecatedMemoryKeys - for (key <- memoryKeys) { + for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').") } } - // Warn against deprecated memory fractions (unless legacy memory management mode is enabled) - val legacyMemoryManagementKey = "spark.memory.useLegacyMode" - val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false) - if (!legacyMemoryManagement) { - val keyset = deprecatedMemoryKeys.toSet - val detected = settings.keys().asScala.filter(keyset.contains) - if (detected.nonEmpty) { - logWarning("Detected deprecated memory fraction settings: " + - detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " + - "memory management are unified. All memory fractions used in the old model are " + - "now deprecated and no longer read. If you wish to use the old memory management, " + - s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).") - } - } - if (contains("spark.master") && get("spark.master").startsWith("yarn-")) { val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " + "instead use \"yarn\" with specified deploy mode." 
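For context, after this hunk the validation loop reads only the two unified-model settings, `spark.memory.fraction` and `spark.memory.storageFraction`. A minimal sketch of configuring them (values and app name are illustrative placeholders, not recommendations from the patch):

```scala
import org.apache.spark.SparkConf

// Illustrative values only. Per the validation loop above, each fraction must
// fall in [0, 1]; anything outside that range fails with an
// IllegalArgumentException.
val conf = new SparkConf()
  .setAppName("unified-memory-example")        // hypothetical app name
  .set("spark.memory.fraction", "0.6")         // execution + storage share of usable heap
  .set("spark.memory.storageFraction", "0.5")  // portion of that region protected from eviction
```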
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 9222781fa0833..ba5ed8ab1f302 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -31,7 +31,7 @@ import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} +import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} @@ -322,13 +322,7 @@ object SparkEnv extends Logging { shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) - val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) - val memoryManager: MemoryManager = - if (useLegacyMemoryManager) { - new StaticMemoryManager(conf, numUsableCores) - } else { - UnifiedMemoryManager(conf, numUsableCores) - } + val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores) val blockManagerPort = if (isDriver) { conf.get(DRIVER_BLOCK_MANAGER_PORT) diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala deleted file mode 100644 index 7e052c02c9376..0000000000000 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.memory - -import org.apache.spark.SparkConf -import org.apache.spark.internal.config -import org.apache.spark.internal.config.Tests.TEST_MEMORY -import org.apache.spark.storage.BlockId - -/** - * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. - * - * The sizes of the execution and storage regions are determined through - * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two - * regions are cleanly separated such that neither usage can borrow memory from the other. 
- */ -private[spark] class StaticMemoryManager( - conf: SparkConf, - maxOnHeapExecutionMemory: Long, - override val maxOnHeapStorageMemory: Long, - numCores: Int) - extends MemoryManager( - conf, - numCores, - maxOnHeapStorageMemory, - maxOnHeapExecutionMemory) { - - def this(conf: SparkConf, numCores: Int) { - this( - conf, - StaticMemoryManager.getMaxExecutionMemory(conf), - StaticMemoryManager.getMaxStorageMemory(conf), - numCores) - } - - // The StaticMemoryManager does not support off-heap storage memory: - offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize) - offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize) - - // Max number of bytes worth of blocks to evict when unrolling - private val maxUnrollMemory: Long = { - (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong - } - - override def maxOffHeapStorageMemory: Long = 0L - - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - memoryMode: MemoryMode): Boolean = synchronized { - require(memoryMode != MemoryMode.OFF_HEAP, - "StaticMemoryManager does not support off-heap storage memory") - if (numBytes > maxOnHeapStorageMemory) { - // Fail fast if the block simply won't fit - logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxOnHeapStorageMemory bytes)") - false - } else { - onHeapStorageMemoryPool.acquireMemory(blockId, numBytes) - } - } - - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - memoryMode: MemoryMode): Boolean = synchronized { - require(memoryMode != MemoryMode.OFF_HEAP, - "StaticMemoryManager does not support off-heap unroll memory") - if (numBytes > maxOnHeapStorageMemory) { - // Fail fast if the block simply won't fit - logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxOnHeapStorageMemory bytes)") - false - } else { - val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory - val freeMemory = onHeapStorageMemoryPool.memoryFree - // When unrolling, we will use all of the existing free memory, and, if necessary, - // some extra space freed from evicting cached blocks. We must place a cap on the - // amount of memory to be evicted by unrolling, however, otherwise unrolling one - // big block can blow away the entire cache. - val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) - // Keep it within the range 0 <= X <= maxNumBytesToFree - val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) - onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) - } - } - - private[memory] - override def acquireExecutionMemory( - numBytes: Long, - taskAttemptId: Long, - memoryMode: MemoryMode): Long = synchronized { - memoryMode match { - case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) - case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) - } - } -} - - -private[spark] object StaticMemoryManager { - - private val MIN_MEMORY_BYTES = 32 * 1024 * 1024 - - /** - * Return the total amount of memory available for the storage region, in bytes. 
- */ - private def getMaxStorageMemory(conf: SparkConf): Long = { - val systemMaxMemory = conf.get(TEST_MEMORY) - val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) - (systemMaxMemory * memoryFraction * safetyFraction).toLong - } - - /** - * Return the total amount of memory available for the execution region, in bytes. - */ - private def getMaxExecutionMemory(conf: SparkConf): Long = { - val systemMaxMemory = conf.get(TEST_MEMORY) - - if (systemMaxMemory < MIN_MEMORY_BYTES) { - throw new IllegalArgumentException(s"System memory $systemMaxMemory must " + - s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " + - s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.") - } - if (conf.contains(config.EXECUTOR_MEMORY)) { - val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key) - if (executorMemory < MIN_MEMORY_BYTES) { - throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + - s"$MIN_MEMORY_BYTES. Please increase executor memory using the " + - s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.") - } - } - val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) - val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) - (systemMaxMemory * memoryFraction * safetyFraction).toLong - } - -} diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 7801bb87050f6..a0fbbbdebd028 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -46,7 +46,7 @@ import org.apache.spark.storage.BlockId * it if necessary. Cached blocks can be evicted only if actual * storage memory usage exceeds this region. */ -private[spark] class UnifiedMemoryManager private[memory] ( +private[spark] class UnifiedMemoryManager( conf: SparkConf, val maxHeapMemory: Long, onHeapStorageRegionSize: Long, diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala b/core/src/main/scala/org/apache/spark/memory/package.scala index 3d00cd9cb6377..7f782193f246f 100644 --- a/core/src/main/scala/org/apache/spark/memory/package.scala +++ b/core/src/main/scala/org/apache/spark/memory/package.scala @@ -61,15 +61,10 @@ package org.apache.spark * }}} * * - * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how - * they handle the sizing of their memory pools: + * There is one implementation of [[org.apache.spark.memory.MemoryManager]]: * - * - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft + * - [[org.apache.spark.memory.UnifiedMemoryManager]] enforces soft * boundaries between storage and execution memory, allowing requests for memory in one region * to be fulfilled by borrowing memory from the other. - * - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage - * and execution memory by statically partitioning Spark's memory and preventing storage and - * execution from borrowing memory from each other. This mode is retained only for legacy - * compatibility purposes. 
*/ package object memory diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index a0664b30d6cc2..dc1fe774f7961 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite { @Test public void leakedPageMemoryIsDetected() { final TaskMemoryManager manager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set("spark.memory.offHeap.enabled", "false"), Long.MAX_VALUE, Long.MAX_VALUE, diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 9ad2e9a5e74ac..6976464e8ab5d 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -145,8 +145,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio encryptionTest("Cache broadcast to disk") { conf => conf.setMaster("local") .setAppName("test") - .set("spark.memory.useLegacyMode", "true") - .set("spark.storage.memoryFraction", "0.0") + .set("spark.memory.storageFraction", "0.0") sc = new SparkContext(conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) @@ -173,8 +172,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio val conf = new SparkConf() .setMaster("local[4]") .setAppName("test") - .set("spark.memory.useLegacyMode", "true") - .set("spark.storage.memoryFraction", "0.0") + .set("spark.memory.storageFraction", "0.0") sc = new SparkContext(conf) val list = List[Int](1, 2, 3, 4) diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala deleted file mode 100644 index c3275add50f48..0000000000000 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.memory - -import org.mockito.Mockito.when - -import org.apache.spark.SparkConf -import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE -import org.apache.spark.internal.config.Tests.TEST_MEMORY -import org.apache.spark.storage.TestBlockId -import org.apache.spark.storage.memory.MemoryStore - -class StaticMemoryManagerSuite extends MemoryManagerSuite { - private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") - - /** - * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies. 
- */ - private def makeThings( - maxExecutionMem: Long, - maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { - val mm = new StaticMemoryManager( - conf, - maxOnHeapExecutionMemory = maxExecutionMem, - maxOnHeapStorageMemory = maxStorageMem, - numCores = 1) - val ms = makeMemoryStore(mm) - (mm, ms) - } - - override protected def createMemoryManager( - maxOnHeapExecutionMemory: Long, - maxOffHeapExecutionMemory: Long): StaticMemoryManager = { - new StaticMemoryManager( - conf.clone - .set("spark.memory.fraction", "1") - .set(TEST_MEMORY, maxOnHeapExecutionMemory) - .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory), - maxOnHeapExecutionMemory = maxOnHeapExecutionMemory, - maxOnHeapStorageMemory = 0, - numCores = 1) - } - - test("basic execution memory") { - val maxExecutionMem = 1000L - val taskAttemptId = 0L - val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) - val memoryMode = MemoryMode.ON_HEAP - assert(mm.executionMemoryUsed === 0L) - assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L) - assert(mm.executionMemoryUsed === 10L) - assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L) - // Acquire up to the max - assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L) - assert(mm.executionMemoryUsed === maxExecutionMem) - assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L) - assert(mm.executionMemoryUsed === maxExecutionMem) - mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode) - assert(mm.executionMemoryUsed === 200L) - // Acquire after release - assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L) - assert(mm.executionMemoryUsed === 201L) - // Release beyond what was acquired - mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode) - assert(mm.executionMemoryUsed === 0L) - } - - test("basic storage memory") { - val maxStorageMem = 1000L - val dummyBlock = TestBlockId("you can see the world you brought to live") - val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - val memoryMode = MemoryMode.ON_HEAP - assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 10L) - - assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 110L) - // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 110L) - // Acquire up to the max, requests after this are still granted due to LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode)) - assertEvictBlocksToFreeSpaceCalled(ms, 110L) - assert(mm.storageMemoryUsed === 1000L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) - assertEvictBlocksToFreeSpaceCalled(ms, 1L) - assert(evictedBlocks.nonEmpty) - evictedBlocks.clear() - // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at - // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted - // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. 
- assert(mm.storageMemoryUsed === 1000L) - mm.releaseStorageMemory(800L, memoryMode) - assert(mm.storageMemoryUsed === 200L) - // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 201L) - mm.releaseAllStorageMemory() - assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 1L) - // Release beyond what was acquired - mm.releaseStorageMemory(100L, memoryMode) - assert(mm.storageMemoryUsed === 0L) - } - - test("execution and storage isolation") { - val maxExecutionMem = 200L - val maxStorageMem = 1000L - val taskAttemptId = 0L - val dummyBlock = TestBlockId("ain't nobody love like you do") - val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) - val memoryMode = MemoryMode.ON_HEAP - // Only execution memory should increase - assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L) - assert(mm.storageMemoryUsed === 0L) - assert(mm.executionMemoryUsed === 100L) - assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 100L) - assert(mm.storageMemoryUsed === 0L) - assert(mm.executionMemoryUsed === 200L) - // Only storage memory should increase - assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 50L) - assert(mm.executionMemoryUsed === 200L) - // Only execution memory should be released - mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode) - assert(mm.storageMemoryUsed === 50L) - assert(mm.executionMemoryUsed === 67L) - // Only storage memory should be released - mm.releaseAllStorageMemory() - assert(mm.storageMemoryUsed === 0L) - assert(mm.executionMemoryUsed === 67L) - } - - test("unroll memory") { - val maxStorageMem = 1000L - val dummyBlock = TestBlockId("lonely water") - val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - val memoryMode = MemoryMode.ON_HEAP - assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode)) - when(ms.currentUnrollMemory).thenReturn(100L) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 100L) - mm.releaseUnrollMemory(40L, memoryMode) - assert(mm.storageMemoryUsed === 60L) - when(ms.currentUnrollMemory).thenReturn(60L) - assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode)) - assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 860L) - // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. - // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes. - // Requesting 240 more bytes of unroll memory will leave our total unroll memory at - // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted. - assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode)) - assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000 - when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240 - assert(mm.storageMemoryUsed === 1000L) - evictedBlocks.clear() - // We already have 300 bytes of unroll memory, so requesting 150 more will leave us - // above the 400-byte limit. Since there is not enough free memory, this request will - // fail even after evicting as much as we can (400 - 300 = 100 bytes). 
- assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode)) - assertEvictBlocksToFreeSpaceCalled(ms, 100L) - assert(mm.storageMemoryUsed === 900L) - // Release beyond what was acquired - mm.releaseUnrollMemory(maxStorageMem, memoryMode) - assert(mm.storageMemoryUsed === 0L) - } - -} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 480e07fb9399a..2250ae2f771ed 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -91,7 +91,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite conf.set(IS_TESTING, true) conf.set("spark.memory.fraction", "1") conf.set("spark.memory.storageFraction", "1") - conf.set("spark.storage.unrollFraction", "0.4") conf.set("spark.storage.unrollMemoryThreshold", "512") // to make a replication attempt to inactive store fail fast diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index bda81365b0792..c23264191e124 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -120,7 +120,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set("spark.memory.fraction", "1") .set("spark.memory.storageFraction", "1") .set("spark.kryoserializer.buffer", "1m") - .set("spark.storage.unrollFraction", "0.4") .set("spark.storage.unrollMemoryThreshold", "512") rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index b02af2bfe7acc..7cdcd0fea2ed4 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import org.scalatest._ import org.apache.spark._ -import org.apache.spark.memory.{MemoryMode, StaticMemoryManager} +import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager} import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator} import org.apache.spark.util._ @@ -39,7 +39,6 @@ class MemoryStoreSuite with ResetSystemProperties { var conf: SparkConf = new SparkConf(false) - .set("spark.storage.unrollFraction", "0.4") .set("spark.storage.unrollMemoryThreshold", "512") // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test @@ -60,7 +59,7 @@ class MemoryStoreSuite } def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = { - val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) + val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1) val blockInfoManager = new BlockInfoManager val blockEvictionHandler = new BlockEvictionHandler { var memoryStore: MemoryStore = _ @@ -239,7 +238,7 @@ class MemoryStoreSuite } test("safely unroll blocks through putIteratorAsBytes") { - val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val (memoryStore, blockInfoManager) = makeMemoryStore(8400) val smallList = List.fill(40)(new Array[Byte](100)) val 
bigList = List.fill(40)(new Array[Byte](1000)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] @@ -290,11 +289,11 @@ class MemoryStoreSuite blockInfoManager.removeBlock("b3") putIteratorAsBytes("b3", smallIterator, ClassTag.Any) - // Unroll huge block with not enough space. This should fail. + // Unroll huge block with not enough space. This should fail and kick out b2 in the process. val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any) assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) - assert(memoryStore.contains("b2")) + assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator @@ -417,7 +416,7 @@ class MemoryStoreSuite val bytesPerSmallBlock = memStoreSize / numInitialBlocks def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = { val tc = TaskContext.empty() - val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1) + val memManager = new UnifiedMemoryManager(conf, memStoreSize, memStoreSize.toInt, 1) val blockInfoManager = new BlockInfoManager blockInfoManager.registerTask(tc.taskAttemptId) var droppedSoFar = 0 diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 6211399005e1a..1e0399809ba87 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -551,12 +551,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("force to spill for external aggregation") { val conf = createSparkConf(loadDefaults = false) - .set("spark.shuffle.memoryFraction", "0.01") - .set("spark.memory.useLegacyMode", "true") - .set(TEST_MEMORY, 100000000L) + .set("spark.memory.storageFraction", "0.999") + .set(TEST_MEMORY, 471859200L) .set("spark.shuffle.sort.bypassMergeThreshold", "0") sc = new SparkContext("local", "test", conf) - val N = 2e5.toInt + val N = 200000 sc.parallelize(1 to N, 2) .map { i => (i, i) } .groupByKey() diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index aa400dd74e9ca..14148e0e67fa7 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -638,12 +638,11 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("force to spill for external sorter") { val conf = createSparkConf(loadDefaults = false, kryo = false) - .set("spark.shuffle.memoryFraction", "0.01") - .set("spark.memory.useLegacyMode", "true") - .set(TEST_MEMORY, 100000000L) + .set("spark.memory.storageFraction", "0.999") + .set(TEST_MEMORY, 471859200L) .set("spark.shuffle.sort.bypassMergeThreshold", "0") sc = new SparkContext("local", "test", conf) - val N = 2e5.toInt + val N = 200000 val p = new org.apache.spark.HashPartitioner(2) val p2 = new org.apache.spark.HashPartitioner(3) sc.parallelize(1 to N, 3) diff --git a/docs/configuration.md b/docs/configuration.md index ff9b802617f08..efd74c3add5e2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1198,51 +1198,6 @@ Apart from these, the 
following properties are also available, and may be useful This must be set to a positive value when spark.memory.offHeap.enabled=true. - - spark.memory.useLegacyMode - false - - Whether to enable the legacy memory management mode used in Spark 1.5 and before. - The legacy mode rigidly partitions the heap space into fixed-size regions, - potentially leading to excessive spilling if the application was not tuned. - The following deprecated memory fraction configurations are not read unless this is enabled: - spark.shuffle.memoryFraction
- spark.storage.memoryFraction
- spark.storage.unrollFraction - - - - spark.shuffle.memoryFraction - 0.2 - - (deprecated) This is read only if spark.memory.useLegacyMode is enabled. - Fraction of Java heap to use for aggregation and cogroups during shuffles. - At any given time, the collective size of - all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will - begin to spill to disk. If spills are often, consider increasing this value at the expense of - spark.storage.memoryFraction. - - - - spark.storage.memoryFraction - 0.6 - - (deprecated) This is read only if spark.memory.useLegacyMode is enabled. - Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 0.6 of the heap, but you can - increase it if you configure your own old generation size. - - - - spark.storage.unrollFraction - 0.2 - - (deprecated) This is read only if spark.memory.useLegacyMode is enabled. - Fraction of spark.storage.memoryFraction to use for unrolling blocks in memory. - This is dynamically allocated by dropping existing blocks when there is not enough free - storage space to unroll the new block in its entirety. - - spark.storage.replication.proactive false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 7c21062c4cec3..a708926dd1f85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -94,7 +94,7 @@ private[execution] object HashedRelation { taskMemoryManager: TaskMemoryManager = null): HashedRelation = { val mm = Option(taskMemoryManager).getOrElse { new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -227,7 +227,7 @@ private[joins] class UnsafeHashedRelation( // TODO(josh): This needs to be revisited before we merge this patch; making this change now // so that tests compile: val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -392,7 +392,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap def this() = { this( new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index b7d28988274bf..334f0275d4ebf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -22,7 +22,7 @@ import java.util.HashMap import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.internal.config._ -import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} +import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap import 
org.apache.spark.sql.execution.vectorized.AggregateHashMap @@ -473,7 +473,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) value.setInt(0, 555) val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -504,7 +504,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { Seq("off", "on").foreach { heap => benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ => val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}") .set(MEMORY_OFFHEAP_SIZE.key, "102400000"), Long.MaxValue, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala index bdf753debe62a..0b356a9e34c58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED -import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} +import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection} import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap @@ -43,7 +43,7 @@ object HashedRelationMetricsBenchmark extends SqlBasedBenchmark { val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows, output = output) benchmark.addCase("LongToUnsafeRowMap") { iter => val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index d9b34dcd16476..7b55e839e3b4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED -import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} +import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -36,7 +36,7 @@ import org.apache.spark.util.collection.CompactBuffer class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { val mm = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -85,7 +85,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { test("test serialization empty hash 
map") { val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -157,7 +157,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { test("LongToUnsafeRowMap with very wide range") { val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -202,7 +202,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { test("LongToUnsafeRowMap with random keys") { val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, @@ -256,7 +256,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { test("SPARK-24257: insert big values into LongToUnsafeRowMap") { val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( + new UnifiedMemoryManager( new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), Long.MaxValue, Long.MaxValue, diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index fe65353b9d502..d1a6e8a89acce 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.memory.StaticMemoryManager +import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus @@ -215,8 +215,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) test("Test Block - isFullyConsumed") { val sparkConf = new SparkConf().set("spark.app.id", "streaming-test") sparkConf.set("spark.storage.unrollMemoryThreshold", "512") - // spark.storage.unrollFraction set to 0.4 for BlockManager - sparkConf.set("spark.storage.unrollFraction", "0.4") sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption) // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll @@ -282,7 +280,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) maxMem: Long, conf: SparkConf, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { - val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) + val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1) val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)