[SPARK-36782] Avoid blocking dispatcher-BlockManagerMaster during UpdateBlockInfo

Delegate the task to a thread pool and register a callback for successful
completion. Reply to the caller once the future has finished successfully.

To avoid java.util.ConcurrentModificationException we have to protect
the blockLocations map with locks.
f-thiele committed Sep 19, 2021
commit 1278c41e88e18fdabdcbc3b0938f827327ecf262
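
As a rough illustration of the approach the commit message describes, here is a minimal, self-contained sketch (the BlockLocationRegistry class, the RpcCallContext trait, and the four-thread pool are illustrative assumptions, not the actual Spark types): the update runs in a Future on a separate thread pool, the reply is sent from the completion callback rather than from the dispatcher thread, and the shared map is guarded by a ReentrantReadWriteLock.

  import java.util.{HashMap => JHashMap}
  import java.util.concurrent.Executors
  import java.util.concurrent.locks.ReentrantReadWriteLock

  import scala.collection.mutable
  import scala.concurrent.{ExecutionContext, Future}

  // Hypothetical stand-ins for Spark's BlockId, BlockManagerId and RpcCallContext.
  final case class BlockId(name: String)
  final case class BlockManagerId(executorId: String)
  trait RpcCallContext { def reply(response: Any): Unit }

  class BlockLocationRegistry {
    // Updates are delegated to this pool so the RPC dispatcher thread is never blocked.
    private val pool = Executors.newFixedThreadPool(4)
    private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    // Shared map, guarded by a read/write lock to avoid ConcurrentModificationException.
    private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

    private val (readLock, writeLock) = {
      val lock = new ReentrantReadWriteLock()
      (lock.readLock(), lock.writeLock())
    }

    private def withRead[T](fn: => T): T = {
      readLock.lock()
      try fn finally readLock.unlock()
    }

    private def withWrite[T](fn: => T): T = {
      writeLock.lock()
      try fn finally writeLock.unlock()
    }

    // Message handler: run the update off the dispatcher thread and reply to the
    // caller only once the Future has completed successfully.
    def handleUpdateBlockInfo(bmId: BlockManagerId, blockId: BlockId, context: RpcCallContext): Unit = {
      val response = Future { updateBlockInfo(bmId, blockId) }
      response.foreach { isSuccess => context.reply(isSuccess) }
    }

    private def updateBlockInfo(bmId: BlockManagerId, blockId: BlockId): Boolean = withWrite {
      val locations = Option(blockLocations.get(blockId)).getOrElse {
        val fresh = new mutable.HashSet[BlockManagerId]
        blockLocations.put(blockId, fresh)
        fresh
      }
      locations += bmId
      true
    }

    def getLocations(blockId: BlockId): Seq[BlockManagerId] = withRead {
      Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
    }
  }

Note that, as in the commit, the reply is only sent on successful completion; handling a failed future (for example, replying with the failure) is left out of this sketch.
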
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.IOException
import java.util.{HashMap => JHashMap}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -74,6 +75,30 @@ class BlockManagerMasterEndpoint(
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

private val (readBlockLoc, writeBlockLoc) = {
val lock = new ReentrantReadWriteLock()
(lock.readLock(), lock.writeLock())
}

// All accesses to blockLocations must be guarded with withReadBlockLoc or withWriteBlockLoc.
private def withReadBlockLoc[B](fn: => B): B = {
readBlockLoc.lock()
try {
fn
} finally {
readBlockLoc.unlock()
}
}

private def withWriteBlockLoc[B](fn: => B): B = {
writeBlockLoc.lock()
try {
fn
} finally {
writeBlockLoc.unlock()
}
}

// Mapping from host name to shuffle (mergers) services where the current app
// registered an executor in the past. Older hosts are removed when the
// maxRetainedMergerLocations size is reached in favor of newer locations.
@@ -117,12 +142,17 @@ class BlockManagerMasterEndpoint(

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
context.reply(isSuccess)
// SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
// returns false since the block info would be updated again later.
if (isSuccess) {
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
val response = Future {
updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
}

response.foreach { isSuccess =>
// SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
// returns false since the block info would be updated again later.
if (isSuccess) {
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
}
context.reply(isSuccess)
@Ngone51 (Member) commented on Sep 23, 2021:

This causes the responses of non-shuffle blocks to also be handled in the thread pool. I'm afraid this introduces unexpected overhead. Shall we do this only for shuffle blocks and leave the non-shuffle blocks with the same behavior as before?

Contributor replied:

Did not realize this, thanks for pointing it out!
So if I understood it right, the proposal is:

  def handleResult(success: Boolean): Unit = {
    if (success) {
      // post
    }
    context.reply(success)
  }

  if (blockId.isShuffle) {
    updateShuffleBlockInfo( ... ).foreach(handleResult(_))
  } else {
    handleResult(updateBlockInfo( ... ))
  }

?

Member replied:

Yes!

Contributor commented:

Given @gengliangwang has merged it, can you create a follow-up PR? We can merge it pretty quickly and possibly make that into the current 3.2 RC as well :)

Member replied:

Sure!
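
For reference, a hedged sketch of what the agreed follow-up could look like inside the handler (updateShuffleBlockInfo returning a Future[Boolean] and the endpoint's implicit ExecutionContext are assumptions here, not part of this commit): only shuffle blocks take the asynchronous path, while non-shuffle blocks keep the original synchronous reply on the dispatcher thread.

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    def handleResult(success: Boolean): Unit = {
      // SPARK-30594: only post the event when the update succeeded, since the block
      // info would be updated again later otherwise.
      if (success) {
        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
      }
      context.reply(success)
    }
    if (blockId.isShuffle) {
      // Assumption: updateShuffleBlockInfo(blockId, blockManagerId) returns Future[Boolean].
      // Shuffle block updates can be slow, so run them off the dispatcher thread
      // and reply from the callback once the future succeeds.
      updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult)
    } else {
      // Non-shuffle blocks stay synchronous to avoid thread-pool overhead.
      handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    }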

}

case GetLocations(blockId) =>
@@ -251,12 +281,16 @@ class BlockManagerMasterEndpoint(
// Find all blocks for the given RDD, remove the block from both blockLocations and
// the blockManagerInfo that is tracking the blocks and create the futures which asynchronously
// remove the blocks from storage endpoints and gives back the number of removed blocks
val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
val blocks = withReadBlockLoc {
blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
}
val blocksToDeleteByShuffleService =
new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]

blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] = blockLocations.remove(blockId)
val bms: mutable.HashSet[BlockManagerId] = withWriteBlockLoc {
blockLocations.remove(blockId)
}

val (bmIdsExtShuffle, bmIdsExecutor) = bms.partition(_.port == externalShuffleServicePort)
val liveExecutorsForBlock = bmIdsExecutor.map(_.executorId).toSet
@@ -347,15 +381,15 @@ class BlockManagerMasterEndpoint(
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
val locations = withReadBlockLoc { blockLocations.get(blockId) }
locations -= blockManagerId
// De-register the block if none of the block managers have it. Otherwise, if pro-active
// replication is enabled, and a block is either an RDD or a test block (the latter is used
// for unit testing), we send a message to a randomly chosen executor location to replicate
// the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
// etc.) as replication doesn't make much sense in that context.
if (locations.size == 0) {
blockLocations.remove(blockId)
withWriteBlockLoc { blockLocations.remove(blockId) }
logWarning(s"No more replicas available for $blockId !")
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
// As a heuristic, assume single executor failure to find out the number of replicas that
@@ -404,7 +438,7 @@ class BlockManagerMasterEndpoint(

val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val currentBlockLocations = withReadBlockLoc { blockLocations.get(blockId) }
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
@@ -420,7 +454,7 @@
// Remove a block from the workers that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
val locations = blockLocations.get(blockId)
val locations = withReadBlockLoc { blockLocations.get(blockId) }
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
@@ -611,11 +645,13 @@ class BlockManagerMasterEndpoint(
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
locations = blockLocations.get(blockId)
} else {
locations = new mutable.HashSet[BlockManagerId]
blockLocations.put(blockId, locations)
withWriteBlockLoc {
if (blockLocations.containsKey(blockId)) {
locations = blockLocations.get(blockId)
} else {
locations = new mutable.HashSet[BlockManagerId]
blockLocations.put(blockId, locations)
}
}

if (storageLevel.isValid) {
Expand All @@ -635,19 +671,24 @@ class BlockManagerMasterEndpoint(

// Remove the block from master tracking if it has been removed on all endpoints.
if (locations.size == 0) {
blockLocations.remove(blockId)
withWriteBlockLoc {
blockLocations.remove(blockId)
}
}
true
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
private def getLocations(blockId: BlockId): Seq[BlockManagerId] =
withReadBlockLoc {
if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}

private def getLocationsAndStatus(
blockId: BlockId,
requesterHost: String): Option[BlockLocationsAndStatus] = {
val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
val locations = withReadBlockLoc {
Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
}
val status = locations.headOption.flatMap { bmId =>
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
@@ -99,7 +99,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
runDecomTest(true, true, JobEnded)
}

test(s"SPARK-XXX not deadlock if MapOutput uses broadcast") {
test(s"SPARK-36782 not deadlock if MapOutput uses broadcast") {
runDecomTest(false, true, JobEnded, forceMapOutputBroadcast = true)
}
