From d3edd9598055abd684b58eb9bb86990e470cffb6 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Sep 2021 13:38:01 +0800 Subject: [PATCH 1/3] fix --- .../storage/BlockManagerMasterEndpoint.scala | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 2df26f3530ef..00e456af4e8c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -117,15 +117,20 @@ class BlockManagerMasterEndpoint( case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => - val response = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) - response.foreach { isSuccess => + @inline def handleResult(success: Boolean): Unit = { // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo // returns false since the block info would be updated again later. 
- if (isSuccess) { + if (success) { listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo))) } - context.reply(isSuccess) + context.reply(success) + } + + if (blockId.isShuffle) { + updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult) + } else { + handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)) } case GetLocations(blockId) => @@ -571,46 +576,48 @@ class BlockManagerMasterEndpoint( id } + private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId) + : Future[Boolean] = { + blockId match { + case ShuffleIndexBlockId(shuffleId, mapId, _) => + Future { + // We need to update this at index file because there exists the index-only block + logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.") + mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) + true + } + case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => + logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.") + Future.successful(true) + case _ => + logDebug(s"Unexpected shuffle block type ${blockId}" + + s"as ${blockId.getClass().getSimpleName()}") + Future.successful(false) + } + } + + private def updateBlockInfo( blockManagerId: BlockManagerId, blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): Future[Boolean] = { + diskSize: Long): Boolean = { logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}") - if (blockId.isShuffle) { - blockId match { - case ShuffleIndexBlockId(shuffleId, mapId, _) => - // We need to update this at index file because there exists the index-only block - return Future { - logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.") - mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) - true - } - case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => - logDebug(s"Received shuffle data 
block update for ${shuffleId} ${mapId}, ignore.") - return Future.successful(true) - case _ => - logDebug(s"Unexpected shuffle block type ${blockId}" + - s"as ${blockId.getClass().getSimpleName()}") - return Future.successful(false) - } - } - if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. - return Future.successful(true) + return true } else { - return Future.successful(false) + return false } } if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - return Future.successful(true) + return true } blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) @@ -642,7 +649,7 @@ class BlockManagerMasterEndpoint( if (locations.size == 0) { blockLocations.remove(blockId) } - Future.successful(true) + true } private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { From d8c4302424ff95680bd68639b0f9e23ef746ec37 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Sep 2021 13:39:45 +0800 Subject: [PATCH 2/3] remove line --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 00e456af4e8c..54f099c5b0e4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -596,7 +596,6 @@ class BlockManagerMasterEndpoint( } } - private def updateBlockInfo( blockManagerId: BlockManagerId, blockId: BlockId, From dd32bc2e85bc831a159edd4ec467cdda3944e457 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Sep 2021 13:55:31 +0800 Subject: [PATCH 3/3] add comment --- .../apache/spark/storage/BlockManagerMasterEndpoint.scala | 7 +++++++ 1 file 
changed, 7 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 54f099c5b0e4..6f043da76d5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -580,6 +580,13 @@ class BlockManagerMasterEndpoint( : Future[Boolean] = { blockId match { case ShuffleIndexBlockId(shuffleId, mapId, _) => + // SPARK-36782: Invoking `MapOutputTracker.updateMapOutput` within the thread + // `dispatcher-BlockManagerMaster` could lead to a deadlock when + // `MapOutputTracker.serializeOutputStatuses` broadcasts the serialized map statuses under + // the acquired write lock. The broadcast block would report its status to + // `BlockManagerMasterEndpoint`, while the `BlockManagerMasterEndpoint` is occupied by + // `updateMapOutput` since it's waiting for the write lock. Thus, we use `Future` to call + // `updateMapOutput` in a separate thread to avoid the deadlock. Future { // We need to update this at index file because there exists the index-only block logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")