Skip to content
Closed
Prev Previous commit
Next Next commit
Remove from BlockManager.
  • Loading branch information
JoshRosen committed Dec 31, 2015
commit f2c2f5dd5820a41e31ef73b5b918299649f8cd72
39 changes: 1 addition & 38 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private[spark] class BlockManager(

val diskBlockManager = new DiskBlockManager(this, conf)

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
private val blockInfo = new HashMap[BlockId, BlockInfo]

private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
Expand Down Expand Up @@ -147,11 +147,6 @@ private[spark] class BlockManager(
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object

private val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
Expand Down Expand Up @@ -1141,36 +1136,6 @@ private[spark] class BlockManager(
}
}

private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = {
logInfo(s"Dropping non broadcast blocks older than $cleanupTime")
dropOldBlocks(cleanupTime, !_.isBroadcast)
}

private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = {
logInfo(s"Dropping broadcast blocks older than $cleanupTime")
dropOldBlocks(cleanupTime, _.isBroadcast)
}

private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = {
val iterator = blockInfo.getEntrySet.iterator
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
if (time < cleanupTime && shouldDrop(id)) {
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
}
}
}

private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
Expand Down Expand Up @@ -1248,8 +1213,6 @@ private[spark] class BlockManager(
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ private[spark] class MetadataCleaner(

private[spark] object MetadataCleanerType extends Enumeration {

val SPARK_CONTEXT, BLOCK_MANAGER,
SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
val SPARK_CONTEXT, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value

type MetadataCleanerType = Value

Expand Down