From 666fb4c5d343a1ea439ecc284d047810d6189c23 Mon Sep 17 00:00:00 2001 From: sharkdtu Date: Thu, 28 Jun 2018 15:35:52 +0800 Subject: [PATCH 1/4] give priority in use of 'PROCESS_LOCAL' for spark-streaming --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index df1a4bef616b..d20857369f55 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1569,7 +1569,7 @@ private[spark] object BlockManager { val blockManagers = new HashMap[BlockId, Seq[String]] for (i <- 0 until blockIds.length) { - blockManagers(blockIds(i)) = blockLocations(i).map(_.host) + blockManagers(blockIds(i)) = blockLocations(i).map(b => s"executor_${b.host}_${b.executorId}") } blockManagers.toMap } From adf39a53b24687154513028b4104239233c5c760 Mon Sep 17 00:00:00 2001 From: sharkdtu Date: Fri, 6 Jul 2018 16:39:04 +0800 Subject: [PATCH 2/4] optimize the code style --- core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 2 +- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 4e036c2ed49b..23cf19d55b4a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -30,7 +30,7 @@ private[spark] class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId]) extends RDD[T](sc, Nil) { - @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get) + @transient lazy val _locations = BlockManager.blockIdsToLocations(blockIds, SparkEnv.get) @volatile private var _isValid = true override def getPartitions: Array[Partition] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d20857369f55..b37fbb3b651f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -45,6 +45,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager} import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.rpc.RpcEnv +import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ @@ -1554,7 +1555,7 @@ private[spark] class BlockManager( private[spark] object BlockManager { private val ID_GENERATOR = new IdGenerator - def blockIdsToHosts( + def blockIdsToLocations( blockIds: Array[BlockId], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = { @@ -1569,7 +1570,8 @@ private[spark] object BlockManager { val blockManagers = new HashMap[BlockId, Seq[String]] for (i <- 0 until blockIds.length) { - blockManagers(blockIds(i)) = blockLocations(i).map(b => s"executor_${b.host}_${b.executorId}") + blockManagers(blockIds(i)) = blockLocations(i).map( + loc => ExecutorCacheTaskLocation(loc.host, loc.executorId).toString) } blockManagers.toMap } From 47502603d0e2116fb3b789335bf6ebf7836c61de Mon Sep 17 00:00:00 2001 From: sharkdtu Date: Mon, 9 Jul 2018 10:40:46 +0800 Subject: [PATCH 3/4] add UT --- .../apache/spark/storage/BlockManager.scala | 5 +++-- .../spark/storage/BlockManagerSuite.scala | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b37fbb3b651f..0e1c7d5fd3fa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1570,8 +1570,9 @@ private[spark] object BlockManager { val blockManagers = new HashMap[BlockId, Seq[String]] for (i <- 0 until blockIds.length) { - blockManagers(blockIds(i)) = blockLocations(i).map( - loc => ExecutorCacheTaskLocation(loc.host, loc.executorId).toString) + blockManagers(blockIds(i)) = blockLocations(i).map { loc => + ExecutorCacheTaskLocation(loc.host, loc.executorId).toString + } } blockManagers.toMap } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index b19d8ebf72c6..f8dd0a4a068b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1422,6 +1422,24 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(mockBlockTransferService.tempFileManager === store.remoteBlockTempFileManager) } + test("query locations of blockIds") { + val mockBlockManagerMaster = mock(classOf[BlockManagerMaster]) + val blockLocations = Seq(BlockManagerId("1", "host1", 100), BlockManagerId("2", "host2", 200)) + when(mockBlockManagerMaster.getLocations(mc.any[Array[BlockId]])) + .thenReturn(Array(blockLocations)) + + val bm = mock(classOf[BlockManager]) + when(bm.master).thenReturn(mockBlockManagerMaster) + + val env = mock(classOf[SparkEnv]) + when(env.blockManager).thenReturn(bm) + + val blockId = StreamBlockId(1, 2) + val locs = BlockManager.blockIdsToLocations(Array(blockId), env) + val expectedLocs = Seq("executor_host1_1", "executor_host2_2") + assert(locs(blockId) == expectedLocs) + } + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0 var tempFileManager: TempFileManager = null From 380f242cea5ef1d43c0ec42f57598cf21b24c3e2 Mon Sep 17 00:00:00 2001 From: sharkdtu Date: Tue, 10 Jul 2018 12:49:30 +0800 Subject: [PATCH 4/4] fix UT --- .../org/apache/spark/storage/BlockManagerSuite.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f8dd0a4a068b..08172f0b07b7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1427,17 +1427,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockLocations = Seq(BlockManagerId("1", "host1", 100), BlockManagerId("2", "host2", 200)) when(mockBlockManagerMaster.getLocations(mc.any[Array[BlockId]])) .thenReturn(Array(blockLocations)) - - val bm = mock(classOf[BlockManager]) - when(bm.master).thenReturn(mockBlockManagerMaster) - val env = mock(classOf[SparkEnv]) - when(env.blockManager).thenReturn(bm) - val blockId = StreamBlockId(1, 2) - val locs = BlockManager.blockIdsToLocations(Array(blockId), env) + val blockIds: Array[BlockId] = Array(StreamBlockId(1, 2)) + val locs = BlockManager.blockIdsToLocations(blockIds, env, mockBlockManagerMaster) val expectedLocs = Seq("executor_host1_1", "executor_host2_2") - assert(locs(blockId) == expectedLocs) + assert(locs(blockIds(0)) == expectedLocs) } class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {