diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 2df26f3530ef..6f043da76d5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -117,15 +117,20 @@ class BlockManagerMasterEndpoint( case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => - val response = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) - response.foreach { isSuccess => + @inline def handleResult(success: Boolean): Unit = { // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo // returns false since the block info would be updated again later. - if (isSuccess) { + if (success) { listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo))) } - context.reply(isSuccess) + context.reply(success) + } + + if (blockId.isShuffle) { + updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult) + } else { + handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)) } case GetLocations(blockId) => @@ -571,46 +576,54 @@ class BlockManagerMasterEndpoint( id } + private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId) + : Future[Boolean] = { + blockId match { + case ShuffleIndexBlockId(shuffleId, mapId, _) => + // SPARK-36782: Invoking `MapOutputTracker.updateMapOutput` within the thread + // `dispatcher-BlockManagerMaster` could lead to a deadlock when + // `MapOutputTracker.serializeOutputStatuses` broadcasts the serialized map statuses under + // the acquired write lock. The broadcast block would report its status to + // `BlockManagerMasterEndpoint`, while the `BlockManagerMasterEndpoint` is occupied by + // `updateMapOutput` since it's waiting for the write lock. 
Thus, we use `Future` to call + // `updateMapOutput` in a separate thread to avoid the deadlock. + Future { + // We need to update this at index file because there exists the index-only block + logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.") + mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) + true + } + case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => + logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.") + Future.successful(true) + case _ => + logDebug(s"Unexpected shuffle block type ${blockId} " + + s"as ${blockId.getClass().getSimpleName()}") + Future.successful(false) + } + } + private def updateBlockInfo( blockManagerId: BlockManagerId, blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): Future[Boolean] = { + diskSize: Long): Boolean = { logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}") - if (blockId.isShuffle) { - blockId match { - case ShuffleIndexBlockId(shuffleId, mapId, _) => - // We need to update this at index file because there exists the index-only block - return Future { - logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.") - mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) - true - } - case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => - logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.") - return Future.successful(true) - case _ => - logDebug(s"Unexpected shuffle block type ${blockId}" + - s"as ${blockId.getClass().getSimpleName()}") - return Future.successful(false) - } - } - if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. 
- return Future.successful(true) + return true } else { - return Future.successful(false) + return false } } if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - return Future.successful(true) + return true } blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) @@ -642,7 +655,7 @@ class BlockManagerMasterEndpoint( if (locations.size == 0) { blockLocations.remove(blockId) } - Future.successful(true) + true } private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {