From 942763555eafaddd5dd9ef2f9a1117c4987c6e88 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Dec 2015 16:07:30 -0800 Subject: [PATCH 01/11] Remove MapOutputTracker cleaner. --- .../org/apache/spark/MapOutputTracker.scala | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 72355cdfa68b3..2e94ab844195d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -18,7 +18,6 @@ package org.apache.spark import java.io._ -import java.util.Arrays import java.util.concurrent.ConcurrentHashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} @@ -267,8 +266,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } /** - * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map - * output information, which allows old output information based on a TTL. + * MapOutputTracker for the driver. */ private[spark] class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) { @@ -291,17 +289,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) // can be read locally, but may lead to more delay in scheduling if those locations are busy. private val REDUCER_PREF_LOCS_FRACTION = 0.2 - /** - * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver, - * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set). - * Other than these two scenarios, nothing should be dropped from this HashMap. - */ - protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]() - private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]() - - // For cleaning up TimeStampedHashMaps - private val metadataCleaner = - new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf) + // HashMaps for storing mapStatuses and cached serialized statuses in the driver. + // Statuses are dropped only by explicit de-registering. 
+ protected val mapStatuses = new HashMap[Int, Array[MapStatus]]() + private val cachedSerializedStatuses = new HashMap[Int, Array[Byte]]() def registerShuffle(shuffleId: Int, numMaps: Int) { if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) { @@ -462,14 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) sendTracker(StopMapOutputTracker) mapStatuses.clear() trackerEndpoint = null - metadataCleaner.cancel() cachedSerializedStatuses.clear() } - - private def cleanup(cleanupTime: Long) { - mapStatuses.clearOldValues(cleanupTime) - cachedSerializedStatuses.clearOldValues(cleanupTime) - } } /** From 23669a7f04c801da5e23fe6ac1f479e28016af2e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Dec 2015 16:11:59 -0800 Subject: [PATCH 02/11] Remove from HttpBroadcast --- .../spark/broadcast/HttpBroadcast.scala | 27 +++---------------- .../apache/spark/util/MetadataCleaner.scala | 2 +- 2 files changed, 4 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index b69af639f7862..c31de99dbd299 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -22,12 +22,13 @@ import java.io.{BufferedInputStream, BufferedOutputStream} import java.net.{URL, URLConnection, URI} import java.util.concurrent.TimeUnit +import scala.collection.mutable import scala.reflect.ClassTag import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.io.CompressionCodec import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils} +import org.apache.spark.util.Utils /** * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses HTTP server @@ -112,10 +113,9 @@ private[broadcast] object HttpBroadcast extends Logging { private var securityManager: SecurityManager = null // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist - private val files = new TimeStampedHashSet[File] + private val files = new mutable.HashSet[File] private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt private var compressionCodec: CompressionCodec = null - private var cleaner: MetadataCleaner = null def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { synchronized { @@ -128,7 +128,6 @@ private[broadcast] object HttpBroadcast extends Logging { conf.set("spark.httpBroadcast.uri", serverUri) } serverUri = conf.get("spark.httpBroadcast.uri") - cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf) compressionCodec = CompressionCodec.createCodec(conf) initialized = true } @@ -141,10 +140,6 @@ private[broadcast] object HttpBroadcast extends Logging { server.stop() server = null } - if (cleaner != null) { - cleaner.cancel() - cleaner = null - } compressionCodec = null initialized = false } @@ -236,22 +231,6 @@ private[broadcast] object HttpBroadcast extends Logging { } } - /** - * Periodically clean up old broadcasts by removing the associated map entries and - * deleting the associated files. 
- */ - private def cleanup(cleanupTime: Long) { - val iterator = files.internalMap.entrySet().iterator() - while(iterator.hasNext) { - val entry = iterator.next() - val (file, time) = (entry.getKey, entry.getValue) - if (time < cleanupTime) { - iterator.remove() - deleteBroadcastFile(file) - } - } - } - private def deleteBroadcastFile(file: File) { try { if (file.exists) { diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index a8bbad086849e..e5ee7e3a77082 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -62,7 +62,7 @@ private[spark] class MetadataCleaner( private[spark] object MetadataCleanerType extends Enumeration { - val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, BLOCK_MANAGER, + val SPARK_CONTEXT, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value type MetadataCleanerType = Value From f2c2f5dd5820a41e31ef73b5b918299649f8cd72 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Dec 2015 16:14:03 -0800 Subject: [PATCH 03/11] Remove from BlockManager. --- .../apache/spark/storage/BlockManager.scala | 39 +------------------ .../apache/spark/util/MetadataCleaner.scala | 3 +- 2 files changed, 2 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b5b7804d54ce2..ff64c0bf0b818 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -75,7 +75,7 @@ private[spark] class BlockManager( val diskBlockManager = new DiskBlockManager(this, conf) - private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] + private val blockInfo = new HashMap[BlockId, BlockInfo] private val futureExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) @@ -147,11 +147,6 @@ private[spark] class BlockManager( private var asyncReregisterTask: Future[Unit] = null private val asyncReregisterLock = new Object - private val metadataCleaner = new MetadataCleaner( - MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) - private val broadcastCleaner = new MetadataCleaner( - MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - // Field related to peer block managers that are necessary for block replication @volatile private var cachedPeers: Seq[BlockManagerId] = _ private val peerFetchLock = new Object @@ -1141,36 +1136,6 @@ private[spark] class BlockManager( } } - private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = { - logInfo(s"Dropping non broadcast blocks older than $cleanupTime") - dropOldBlocks(cleanupTime, !_.isBroadcast) - } - - private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = { - logInfo(s"Dropping broadcast blocks older than $cleanupTime") - dropOldBlocks(cleanupTime, _.isBroadcast) - } - - private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = { - val iterator = blockInfo.getEntrySet.iterator - while (iterator.hasNext) { - val entry = iterator.next() - val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp) - if (time < cleanupTime && shouldDrop(id)) { - info.synchronized { - val level = info.level - if (level.useMemory) { memoryStore.remove(id) } - if (level.useDisk) { diskStore.remove(id) 
} - if (level.useOffHeap) { externalBlockStore.remove(id) } - iterator.remove() - logInfo(s"Dropped block $id") - } - val status = getCurrentBlockStatus(id, info) - reportBlockStatus(id, info, status) - } - } - } - private def shouldCompress(blockId: BlockId): Boolean = { blockId match { case _: ShuffleBlockId => compressShuffle @@ -1248,8 +1213,6 @@ private[spark] class BlockManager( if (externalBlockStoreInitialized) { externalBlockStore.clear() } - metadataCleaner.cancel() - broadcastCleaner.cancel() futureExecutionContext.shutdownNow() logInfo("BlockManager stopped") } diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index e5ee7e3a77082..06920c4acdad5 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -62,8 +62,7 @@ private[spark] class MetadataCleaner( private[spark] object MetadataCleanerType extends Enumeration { - val SPARK_CONTEXT, BLOCK_MANAGER, - SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value + val SPARK_CONTEXT, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value type MetadataCleanerType = Value From 3940e976005cc6064ba903082d8e9918ed5708ff Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Dec 2015 16:19:54 -0800 Subject: [PATCH 04/11] Delete TimeStampedHashSet --- .../spark/util/TimeStampedHashSet.scala | 86 ------------------- 1 file changed, 86 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala deleted file mode 100644 index 65efeb1f4c19c..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ -import scala.collection.mutable.Set - -private[spark] class TimeStampedHashSet[A] extends Set[A] { - val internalMap = new ConcurrentHashMap[A, Long]() - - def contains(key: A): Boolean = { - internalMap.contains(key) - } - - def iterator: Iterator[A] = { - val jIterator = internalMap.entrySet().iterator() - jIterator.asScala.map(_.getKey) - } - - override def + (elem: A): Set[A] = { - val newSet = new TimeStampedHashSet[A] - newSet ++= this - newSet += elem - newSet - } - - override def - (elem: A): Set[A] = { - val newSet = new TimeStampedHashSet[A] - newSet ++= this - newSet -= elem - newSet - } - - override def += (key: A): this.type = { - internalMap.put(key, currentTime) - this - } - - override def -= (key: A): this.type = { - internalMap.remove(key) - this - } - - override def empty: Set[A] = new TimeStampedHashSet[A]() - - override def size(): Int = internalMap.size() - - override def foreach[U](f: (A) => U): Unit = { - val iterator = internalMap.entrySet().iterator() - while(iterator.hasNext) { - f(iterator.next.getKey) - } - } - - /** - * Removes old values that have timestamp earlier than `threshTime` - */ - def clearOldValues(threshTime: Long) { - val iterator = internalMap.entrySet().iterator() - while(iterator.hasNext) { - val entry = iterator.next() - if (entry.getValue < threshTime) { - iterator.remove() - } - } - } - - private def currentTime: Long = System.currentTimeMillis() -} From 98b732a554216e0164bf01bdb547387f25dea7d4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Dec 2015 16:41:31 -0800 Subject: [PATCH 05/11] All of the rest of the changes. --- .../scala/org/apache/spark/SparkContext.scala | 17 +- .../shuffle/FileShuffleBlockResolver.scala | 20 +- .../apache/spark/storage/BlockManager.scala | 26 +-- .../apache/spark/util/MetadataCleaner.scala | 109 ----------- .../util/TimeStampedWeakValueHashMap.scala | 171 ------------------ .../spark/util/TimeStampedHashMapSuite.scala | 86 --------- docs/configuration.md | 13 +- .../apache/spark/streaming/Checkpoint.scala | 3 +- .../spark/streaming/dstream/DStream.scala | 14 +- .../streaming/StreamingContextSuite.scala | 1 + 10 files changed, 25 insertions(+), 435 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala delete mode 100644 core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bbdc9158d8e2b..21650671df146 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -33,6 +33,7 @@ import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal +import com.google.common.collect.MapMaker import org.apache.commons.lang.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -221,7 +222,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None private var _env: SparkEnv = _ - private var _metadataCleaner: MetadataCleaner = _ private var _jobProgressListener: JobProgressListener = _ private var _statusTracker: SparkStatusTracker = _ private var _progressBar: Option[ConsoleProgressBar] = None @@ -295,8 
+295,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]] - private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner + private[spark] val persistentRdds = new MapMaker().weakValues().makeMap[Int, RDD[_]]().asScala private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener def statusTracker: SparkStatusTracker = _statusTracker @@ -463,8 +462,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _conf.set("spark.repl.class.uri", replUri) } - _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf) - _statusTracker = new SparkStatusTracker(this) _progressBar = @@ -1721,11 +1718,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli env.metricsSystem.report() } } - if (metadataCleaner != null) { - Utils.tryLogNonFatalError { - metadataCleaner.cancel() - } - } Utils.tryLogNonFatalError { _cleaner.foreach(_.stop()) } @@ -2193,11 +2185,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ - private[spark] def cleanup(cleanupTime: Long) { - persistentRdds.clearOldValues(cleanupTime) - } - // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having finished construction. // NOTE: this must be placed at the end of the SparkContext constructor. diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index cc5f933393adf..ed877aa0a7759 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -26,7 +26,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.serializer.Serializer import org.apache.spark.storage._ -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} +import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkConf, SparkEnv} /** A group of writers for a ShuffleMapTask, one writer per reducer. 
*/ @@ -63,10 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) val completedMapTasks = new ConcurrentLinkedQueue[Int]() } - private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] - - private val metadataCleaner = - new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) + private val shuffleStates = new scala.collection.mutable.HashMap[ShuffleId, ShuffleState] /** * Get a ShuffleWriterGroup for the given map task, which will register it as complete @@ -75,9 +72,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer, writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { new ShuffleWriterGroup { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers)) - private val shuffleState = shuffleStates(shuffleId) - + private val shuffleState = + shuffleStates.getOrElseUpdate(shuffleId, new ShuffleState(numReducers)) val openStartTime = System.nanoTime val serializerInstance = serializer.newInstance() val writers: Array[DiskBlockObjectWriter] = { @@ -131,11 +127,5 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) } } - private def cleanup(cleanupTime: Long) { - shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) - } - - override def stop() { - metadataCleaner.cancel() - } + override def stop(): Unit = {} } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index ff64c0bf0b818..0dcef7827410f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,7 +19,9 @@ package org.apache.spark.storage import java.io._ import java.nio.{ByteBuffer, MappedByteBuffer} +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -75,7 +77,7 @@ private[spark] class BlockManager( val diskBlockManager = new DiskBlockManager(this, conf) - private val blockInfo = new HashMap[BlockId, BlockInfo] + private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo] private val futureExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) @@ -227,7 +229,7 @@ private[spark] class BlockManager( */ private def reportAllBlocks(): Unit = { logInfo(s"Reporting ${blockInfo.size} blocks to the master.") - for ((blockId, info) <- blockInfo) { + for ((blockId, info) <- blockInfo.asScala) { val status = getCurrentBlockStatus(blockId, info) if (!tryToReportBlockStatus(blockId, info, status)) { logError(s"Failed to report $blockId to master; giving up.") @@ -308,7 +310,7 @@ private[spark] class BlockManager( * NOTE: This is mainly for testing, and it doesn't fetch information from external block store. */ def getStatus(blockId: BlockId): Option[BlockStatus] = { - blockInfo.get(blockId).map { info => + blockInfo.asScala.get(blockId).map { info => val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L // Assume that block is not in external block store @@ -322,7 +324,7 @@ private[spark] class BlockManager( * may not know of). 
*/ def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = { - (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq + (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq } /** @@ -434,7 +436,7 @@ private[spark] class BlockManager( } private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { - val info = blockInfo.get(blockId).orNull + val info = blockInfo.get(blockId) if (info != null) { info.synchronized { // Double check to make sure the block is still there. There is a small chance that the @@ -442,7 +444,7 @@ private[spark] class BlockManager( // Note that this only checks metadata tracking. If user intentionally deleted the block // on disk or from off heap storage without using removeBlock, this conditional check will // still pass but eventually we will get an exception because we can't find the block. - if (blockInfo.get(blockId).isEmpty) { + if (blockInfo.asScala.get(blockId).isEmpty) { logWarning(s"Block $blockId had been removed") return None } @@ -726,7 +728,7 @@ private[spark] class BlockManager( val putBlockInfo = { val tinfo = new BlockInfo(level, tellMaster) // Do atomically ! - val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) + val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo)) if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { logWarning(s"Block $blockId already exists on this machine; not re-adding it") @@ -1027,7 +1029,7 @@ private[spark] class BlockManager( data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { logInfo(s"Dropping block $blockId from memory") - val info = blockInfo.get(blockId).orNull + val info = blockInfo.get(blockId) // If the block has not already been dropped if (info != null) { @@ -1038,7 +1040,7 @@ private[spark] class BlockManager( // If we get here, the block write failed. logWarning(s"Block $blockId was marked as failure. Nothing to drop") return None - } else if (blockInfo.get(blockId).isEmpty) { + } else if (blockInfo.asScala.get(blockId).isEmpty) { logWarning(s"Block $blockId was already dropped.") return None } @@ -1090,7 +1092,7 @@ private[spark] class BlockManager( def removeRdd(rddId: Int): Int = { // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. logInfo(s"Removing RDD $rddId") - val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) + val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } blocksToRemove.size } @@ -1100,7 +1102,7 @@ private[spark] class BlockManager( */ def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = { logDebug(s"Removing broadcast $broadcastId") - val blocksToRemove = blockInfo.keys.collect { + val blocksToRemove = blockInfo.asScala.keys.collect { case bid @ BroadcastBlockId(`broadcastId`, _) => bid } blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) } @@ -1112,7 +1114,7 @@ private[spark] class BlockManager( */ def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { logDebug(s"Removing block $blockId") - val info = blockInfo.get(blockId).orNull + val info = blockInfo.get(blockId) if (info != null) { info.synchronized { // Removals are idempotent in disk store and memory store. At worst, we get a warning. 
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala deleted file mode 100644 index 06920c4acdad5..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.util.{Timer, TimerTask} - -import org.apache.spark.{Logging, SparkConf} - -/** - * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries) - */ -private[spark] class MetadataCleaner( - cleanerType: MetadataCleanerType.MetadataCleanerType, - cleanupFunc: (Long) => Unit, - conf: SparkConf) - extends Logging -{ - val name = cleanerType.toString - - private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType) - private val periodSeconds = math.max(10, delaySeconds / 10) - private val timer = new Timer(name + " cleanup timer", true) - - - private val task = new TimerTask { - override def run() { - try { - cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) - logInfo("Ran metadata cleaner for " + name) - } catch { - case e: Exception => logError("Error running cleanup task for " + name, e) - } - } - } - - if (delaySeconds > 0) { - logDebug( - "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " + - "and period of " + periodSeconds + " secs") - timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000) - } - - def cancel() { - timer.cancel() - } -} - -private[spark] object MetadataCleanerType extends Enumeration { - - val SPARK_CONTEXT, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value - - type MetadataCleanerType = Value - - def systemProperty(which: MetadataCleanerType.MetadataCleanerType): String = { - "spark.cleaner.ttl." + which.toString - } -} - -// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the -// initialization of StreamingContext. It's okay for users trying to configure stuff themselves. -private[spark] object MetadataCleaner { - def getDelaySeconds(conf: SparkConf): Int = { - conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt - } - - def getDelaySeconds( - conf: SparkConf, - cleanerType: MetadataCleanerType.MetadataCleanerType): Int = { - conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt - } - - def setDelaySeconds( - conf: SparkConf, - cleanerType: MetadataCleanerType.MetadataCleanerType, - delay: Int) { - conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString) - } - - /** - * Set the default delay time (in seconds). 
- * @param conf SparkConf instance - * @param delay default delay time to set - * @param resetAll whether to reset all to default - */ - def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) { - conf.set("spark.cleaner.ttl", delay.toString) - if (resetAll) { - for (cleanerType <- MetadataCleanerType.values) { - System.clearProperty(MetadataCleanerType.systemProperty(cleanerType)) - } - } - } -} - diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala deleted file mode 100644 index 310c0c109416c..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.lang.ref.WeakReference -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable -import scala.language.implicitConversions - -import org.apache.spark.Logging - -/** - * A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped. - * - * If the value is garbage collected and the weak reference is null, get() will return a - * non-existent value. These entries are removed from the map periodically (every N inserts), as - * their values are no longer strongly reachable. Further, key-value pairs whose timestamps are - * older than a particular threshold can be removed using the clearOldValues method. - * - * TimeStampedWeakValueHashMap exposes a scala.collection.mutable.Map interface, which allows it - * to be a drop-in replacement for Scala HashMaps. Internally, it uses a Java ConcurrentHashMap, - * so all operations on this HashMap are thread-safe. - * - * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed. - */ -private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false) - extends mutable.Map[A, B]() with Logging { - - import TimeStampedWeakValueHashMap._ - - private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet) - private val insertCount = new AtomicInteger(0) - - /** Return a map consisting only of entries whose values are still strongly reachable. 
*/ - private def nonNullReferenceMap = internalMap.filter { case (_, ref) => ref.get != null } - - def get(key: A): Option[B] = internalMap.get(key) - - def iterator: Iterator[(A, B)] = nonNullReferenceMap.iterator - - override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = { - val newMap = new TimeStampedWeakValueHashMap[A, B1] - val oldMap = nonNullReferenceMap.asInstanceOf[mutable.Map[A, WeakReference[B1]]] - newMap.internalMap.putAll(oldMap.toMap) - newMap.internalMap += kv - newMap - } - - override def - (key: A): mutable.Map[A, B] = { - val newMap = new TimeStampedWeakValueHashMap[A, B] - newMap.internalMap.putAll(nonNullReferenceMap.toMap) - newMap.internalMap -= key - newMap - } - - override def += (kv: (A, B)): this.type = { - internalMap += kv - if (insertCount.incrementAndGet() % CLEAR_NULL_VALUES_INTERVAL == 0) { - clearNullValues() - } - this - } - - override def -= (key: A): this.type = { - internalMap -= key - this - } - - override def update(key: A, value: B): Unit = this += ((key, value)) - - override def apply(key: A): B = internalMap.apply(key) - - override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = nonNullReferenceMap.filter(p) - - override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]() - - override def size: Int = internalMap.size - - override def foreach[U](f: ((A, B)) => U): Unit = nonNullReferenceMap.foreach(f) - - def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value) - - def toMap: Map[A, B] = iterator.toMap - - /** Remove old key-value pairs with timestamps earlier than `threshTime`. */ - def clearOldValues(threshTime: Long): Unit = internalMap.clearOldValues(threshTime) - - /** Remove entries with values that are no longer strongly reachable. */ - def clearNullValues() { - val it = internalMap.getEntrySet.iterator - while (it.hasNext) { - val entry = it.next() - if (entry.getValue.value.get == null) { - logDebug("Removing key " + entry.getKey + " because it is no longer strongly reachable.") - it.remove() - } - } - } - - // For testing - - def getTimestamp(key: A): Option[Long] = { - internalMap.getTimeStampedValue(key).map(_.timestamp) - } - - def getReference(key: A): Option[WeakReference[B]] = { - internalMap.getTimeStampedValue(key).map(_.value) - } -} - -/** - * Helper methods for converting to and from WeakReferences. - */ -private object TimeStampedWeakValueHashMap { - - // Number of inserts after which entries with null references are removed - val CLEAR_NULL_VALUES_INTERVAL = 100 - - /* Implicit conversion methods to WeakReferences. */ - - implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v) - - implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = { - kv match { case (k, v) => (k, toWeakReference(v)) } - } - - implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = { - (kv: (K, WeakReference[V])) => p(kv) - } - - /* Implicit conversion methods from WeakReferences. 
*/ - - implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get - - implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = { - v match { - case Some(ref) => Option(fromWeakReference(ref)) - case None => None - } - } - - implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = { - kv match { case (k, v) => (k, fromWeakReference(v)) } - } - - implicit def fromWeakReferenceIterator[K, V]( - it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = { - it.map(fromWeakReferenceTuple) - } - - implicit def fromWeakReferenceMap[K, V]( - map: mutable.Map[K, WeakReference[V]]) : mutable.Map[K, V] = { - mutable.Map(map.mapValues(fromWeakReference).toSeq: _*) - } -} diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala index 9b3169026cda3..25fc15dd54d04 100644 --- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.util -import java.lang.ref.WeakReference - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -34,10 +32,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite { testMap(new TimeStampedHashMap[String, String]()) testMapThreadSafety(new TimeStampedHashMap[String, String]()) - // Test TimeStampedWeakValueHashMap basic functionality - testMap(new TimeStampedWeakValueHashMap[String, String]()) - testMapThreadSafety(new TimeStampedWeakValueHashMap[String, String]()) - test("TimeStampedHashMap - clearing by timestamp") { // clearing by insertion time val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false) @@ -68,86 +62,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite { assert(map1.get("k2").isDefined) } - test("TimeStampedWeakValueHashMap - clearing by timestamp") { - // clearing by insertion time - val map = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = false) - map("k1") = "v1" - assert(map("k1") === "v1") - Thread.sleep(10) - val threshTime = System.currentTimeMillis - assert(map.getTimestamp("k1").isDefined) - assert(map.getTimestamp("k1").get < threshTime) - map.clearOldValues(threshTime) - assert(map.get("k1") === None) - - // clearing by modification time - val map1 = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = true) - map1("k1") = "v1" - map1("k2") = "v2" - assert(map1("k1") === "v1") - Thread.sleep(10) - val threshTime1 = System.currentTimeMillis - Thread.sleep(10) - assert(map1("k2") === "v2") // access k2 to update its access time to > threshTime - assert(map1.getTimestamp("k1").isDefined) - assert(map1.getTimestamp("k1").get < threshTime1) - assert(map1.getTimestamp("k2").isDefined) - assert(map1.getTimestamp("k2").get >= threshTime1) - map1.clearOldValues(threshTime1) // should only clear k1 - assert(map1.get("k1") === None) - assert(map1.get("k2").isDefined) - } - - test("TimeStampedWeakValueHashMap - clearing weak references") { - var strongRef = new Object - val weakRef = new WeakReference(strongRef) - val map = new TimeStampedWeakValueHashMap[String, Object] - map("k1") = strongRef - map("k2") = "v2" - map("k3") = "v3" - val isEquals = map("k1") == strongRef - assert(isEquals) - - // clear strong reference to "k1" - strongRef = null - val startTime = System.currentTimeMillis - System.gc() // Make a best effort to run the garbage collection. 
It *usually* runs GC. - System.runFinalization() // Make a best effort to call finalizer on all cleaned objects. - while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) { - System.gc() - System.runFinalization() - Thread.sleep(100) - } - assert(map.getReference("k1").isDefined) - val ref = map.getReference("k1").get - assert(ref.get === null) - assert(map.get("k1") === None) - - // operations should only display non-null entries - assert(map.iterator.forall { case (k, v) => k != "k1" }) - assert(map.filter { case (k, v) => k != "k2" }.size === 1) - assert(map.filter { case (k, v) => k != "k2" }.head._1 === "k3") - assert(map.toMap.size === 2) - assert(map.toMap.forall { case (k, v) => k != "k1" }) - val buffer = new ArrayBuffer[String] - map.foreach { case (k, v) => buffer += v.toString } - assert(buffer.size === 2) - assert(buffer.forall(_ != "k1")) - val plusMap = map + (("k4", "v4")) - assert(plusMap.size === 3) - assert(plusMap.forall { case (k, v) => k != "k1" }) - val minusMap = map - "k2" - assert(minusMap.size === 1) - assert(minusMap.head._1 == "k3") - - // clear null values - should only clear k1 - map.clearNullValues() - assert(map.getReference("k1") === None) - assert(map.get("k1") === None) - assert(map.get("k2").isDefined) - assert(map.get("k2").get === "v2") - } - /** Test basic operations of a Scala mutable Map. */ def testMap(hashMapConstructor: => mutable.Map[String, String]) { def newMap() = hashMapConstructor diff --git a/docs/configuration.md b/docs/configuration.md index a9ef37a9b1cd9..2341852d1b18b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -830,17 +830,6 @@ Apart from these, the following properties are also available, and may be useful Which broadcast implementation to use. - - spark.cleaner.ttl - (infinite) - - Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks - generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be - forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in - case of Spark Streaming applications). Note that any RDD that persists in memory for more than - this duration will be cleared as well. - - spark.executor.cores 1 in YARN mode, all the available cores on the worker in standalone mode. @@ -1454,7 +1443,7 @@ Apart from these, the following properties are also available, and may be useful A comma separated list of ciphers. The specified ciphers must be supported by JVM. The reference list of protocols one can find on - this + a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this page. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index d0046afdeb447..98e1112d2b3d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkException, SparkConf, Logging} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec -import org.apache.spark.util.{MetadataCleaner, Utils} +import org.apache.spark.util.Utils import org.apache.spark.streaming.scheduler.JobGenerator @@ -41,7 +41,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray - val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll def createSparkConf(): SparkConf = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 91a43e14a8b1b..c59348a89d34f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -32,7 +32,7 @@ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext.rddToFileName import org.apache.spark.streaming.scheduler.Job import org.apache.spark.streaming.ui.UIUtils -import org.apache.spark.util.{CallSite, MetadataCleaner, Utils} +import org.apache.spark.util.{CallSite, Utils} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -271,18 +271,6 @@ abstract class DStream[T: ClassTag] ( checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." ) - val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) - logInfo("metadataCleanupDelay = " + metadataCleanerDelay) - require( - metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, - "It seems you are doing some DStream window operation or setting a checkpoint interval " + - "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + - "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + - "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + - "set the Java cleaner delay to more than " + - math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." - ) - dependencies.foreach(_.validateAtStart()) logInfo("Slide time = " + slideDuration) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 860fac29c0ee0..f756ea2f3e71a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -81,6 +81,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("from conf with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) + // TODO(josh): Update these exmaples to use a different configuration. 
myConf.set("spark.cleaner.ttl", "10s") ssc = new StreamingContext(myConf, batchDuration) assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) From e6482fad7da812fd3fe775f064e19893717f7a88 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 2 Jan 2016 14:26:55 -0800 Subject: [PATCH 06/11] Thread-safety fixes. --- .../scala/org/apache/spark/MapOutputTracker.scala | 4 ++-- .../main/scala/org/apache/spark/SparkContext.scala | 6 +++++- .../spark/shuffle/FileShuffleBlockResolver.scala | 14 +++++++++----- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 2e94ab844195d..a2675db7a5bd2 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -291,8 +291,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) // HashMaps for storing mapStatuses and cached serialized statuses in the driver. // Statuses are dropped only by explicit de-registering. - protected val mapStatuses = new HashMap[Int, Array[MapStatus]]() - private val cachedSerializedStatuses = new HashMap[Int, Array[Byte]]() + protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala + private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala def registerShuffle(shuffleId: Int, numMaps: Int) { if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c2ca31fc83f5a..26d018b048a17 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -23,6 +23,7 @@ import java.io._ import java.lang.reflect.Constructor import java.net.URI import java.util.{Arrays, Properties, UUID} +import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger} import java.util.UUID.randomUUID @@ -295,7 +296,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new MapMaker().weakValues().makeMap[Int, RDD[_]]().asScala + private[spark] val persistentRdds = { + val map : ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]() + map.asScala + } private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener def statusTracker: SparkStatusTracker = _statusTracker diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index ed877aa0a7759..b2f4a730d5b32 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -17,7 +17,7 @@ package org.apache.spark.shuffle -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} import scala.collection.JavaConverters._ @@ -63,7 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) val completedMapTasks = new ConcurrentLinkedQueue[Int]() } - private val shuffleStates = new scala.collection.mutable.HashMap[ShuffleId, ShuffleState] + private val 
shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState] /** * Get a ShuffleWriterGroup for the given map task, which will register it as complete @@ -72,8 +72,12 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer, writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { new ShuffleWriterGroup { - private val shuffleState = - shuffleStates.getOrElseUpdate(shuffleId, new ShuffleState(numReducers)) + private val shuffleState: ShuffleState = { + // Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use + // .getOrElseUpdate() because that's actually NOT atomic. + shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers)) + shuffleStates.get(shuffleId) + } val openStartTime = System.nanoTime val serializerInstance = serializer.newInstance() val writers: Array[DiskBlockObjectWriter] = { @@ -110,7 +114,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) /** Remove all the blocks / files related to a particular shuffle. */ private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = { - shuffleStates.get(shuffleId) match { + Option(shuffleStates.get(shuffleId)) match { case Some(state) => for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) { val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) From 1d96791a0949ab8d79333ce0f38b08261a1f776a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 2 Jan 2016 23:07:23 -0800 Subject: [PATCH 07/11] Update StreamingContextSuite tests. --- .../streaming/StreamingContextSuite.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f756ea2f3e71a..19b79da117df9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -82,9 +82,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("from conf with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) // TODO(josh): Update these exmaples to use a different configuration. 
- myConf.set("spark.cleaner.ttl", "10s") + myConf.set("spark.cleaner.periodicGC.interval", "10s") ssc = new StreamingContext(myConf, batchDuration) - assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) + assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) } test("from existing SparkContext") { @@ -94,26 +94,27 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("from existing SparkContext with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.ttl", "10s") + myConf.set("spark.cleaner.periodicGC.interval", "10s") ssc = new StreamingContext(myConf, batchDuration) - assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) + assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) } test("from checkpoint") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.ttl", "10s") + myConf.set("spark.cleaner.periodicGC.interval", "10s") val ssc1 = new StreamingContext(myConf, batchDuration) addInputStream(ssc1).register() ssc1.start() val cp = new Checkpoint(ssc1, Time(1000)) assert( Utils.timeStringAsSeconds(cp.sparkConfPairs - .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10) + .toMap.getOrElse("spark.cleaner.periodicGC.interval", "-1")) === 10) ssc1.stop() val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) - assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) + assert( + newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) ssc = new StreamingContext(null, newCp, null) - assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) + assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) } test("checkPoint from conf") { @@ -289,7 +290,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("stop gracefully") { val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("spark.cleaner.ttl", "3600s") + conf.set("spark.cleaner.periodicGC.interval", "3600s") sc = new SparkContext(conf) for (i <- 1 to 4) { logInfo("==================================\n\n\n") From 75ac2185a911a2b456d7d33e3f279a9b3091d77e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 6 Jan 2016 14:28:36 -0800 Subject: [PATCH 08/11] Update tests to use dummy time config --- .../streaming/StreamingContextSuite.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 19b79da117df9..ed4db6fc253b2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -82,9 +82,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("from conf with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) // TODO(josh): Update these exmaples to use a different configuration. 
- myConf.set("spark.cleaner.periodicGC.interval", "10s") + myConf.set("spark.dummyTimeConfig", "10s") ssc = new StreamingContext(myConf, batchDuration) - assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) + assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10) } test("from existing SparkContext") { @@ -94,27 +94,27 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("from existing SparkContext with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.periodicGC.interval", "10s") + myConf.set("spark.dummyTimeConfig", "10s") ssc = new StreamingContext(myConf, batchDuration) - assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) + assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10) } test("from checkpoint") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.periodicGC.interval", "10s") + myConf.set("spark.dummyTimeConfig", "10s") val ssc1 = new StreamingContext(myConf, batchDuration) addInputStream(ssc1).register() ssc1.start() val cp = new Checkpoint(ssc1, Time(1000)) assert( Utils.timeStringAsSeconds(cp.sparkConfPairs - .toMap.getOrElse("spark.cleaner.periodicGC.interval", "-1")) === 10) + .toMap.getOrElse("spark.dummyTimeConfig", "-1")) === 10) ssc1.stop() val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) assert( - newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) + newCp.createSparkConf().getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10) ssc = new StreamingContext(null, newCp, null) - assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10) + assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10) } test("checkPoint from conf") { @@ -290,7 +290,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("stop gracefully") { val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("spark.cleaner.periodicGC.interval", "3600s") + conf.set("spark.dummyTimeConfig", "3600s") sc = new SparkContext(conf) for (i <- 1 to 4) { logInfo("==================================\n\n\n") From 8b7ca1c973531caaf2eb89e3d544c409150e5826 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 4 Jan 2016 13:57:58 -0800 Subject: [PATCH 09/11] Remove old todo --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index ed4db6fc253b2..0ae4c45988032 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -81,7 +81,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("from conf with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - // TODO(josh): Update these exmaples to use a different configuration. 
myConf.set("spark.dummyTimeConfig", "10s") ssc = new StreamingContext(myConf, batchDuration) assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10) From 9704bc2da914f5a06978eca3ba0b96802684cd2a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 6 Jan 2016 14:34:50 -0800 Subject: [PATCH 10/11] Add missing import. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03ec4a01717e2..98075cef112db 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -21,6 +21,7 @@ import java.io._ import java.lang.reflect.Constructor import java.net.URI import java.util.{Arrays, Properties, UUID} +import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} import java.util.UUID.randomUUID @@ -272,7 +273,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Keeps track of all persisted RDDs private[spark] val persistentRdds = { - val map : ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]() + val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]() map.asScala } private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener From 24519631c722d337af938102842d09e3d6f1b132 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 6 Jan 2016 15:29:27 -0800 Subject: [PATCH 11/11] Update configuration.md --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 53ed2813f5b6b..3ffc77dcc62e0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1428,7 +1428,7 @@ Apart from these, the following properties are also available, and may be useful A comma separated list of ciphers. The specified ciphers must be supported by JVM. The reference list of protocols one can find on - a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this + this page.