core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -18,7 +18,9 @@
package org.apache.spark.network

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.StorageLevel

private[spark]
trait BlockDataManager {
@@ -29,6 +31,12 @@ trait BlockDataManager {
*/
def getBlockData(blockId: BlockId): ManagedBuffer

/**
* Interface to get local block data managed by the given BlockManagerId.
* Throws an exception if the block cannot be found or cannot be read successfully.
*/
def getBlockData(blockId: BlockId, blockManagerId: BlockManagerId): ManagedBuffer

/**
* Put the block locally, using the given storage level.
*/
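The two-argument overload added above lets a caller say whose local storage a block should be resolved against, instead of always assuming the current block manager. A minimal usage sketch (the helper is hypothetical and not part of this patch; it assumes code living inside Spark, since BlockDataManager is private[spark]):

```scala
import org.apache.spark.network.BlockDataManager
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, BlockManagerId}

// Hypothetical helper: read a block written by another executor on the same host
// by telling the data manager whose local directories to resolve the block in.
def readBlockOwnedBy(
    dataManager: BlockDataManager,
    blockId: BlockId,
    owner: BlockManagerId): ManagedBuffer = {
  dataManager.getBlockData(blockId, owner)
}
```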
core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -98,8 +98,9 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
}

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
val file = blockManager.diskBlockManager.getFile(blockId)
override def getBlockData(blockId: ShuffleBlockId, blockManagerId: BlockManagerId)
: ManagedBuffer = {
val file = blockManager.diskBlockManager.getFile(blockId, blockManagerId)
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -49,13 +49,19 @@ private[spark] class IndexShuffleBlockResolver(

private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")

def getDataFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
def getDataFile(shuffleId: Int, mapId: Int): File =
getDataFile(shuffleId, mapId, blockManager.blockManagerId)

private def getIndexFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
private def getDataFile(shuffleId: Int, mapId: Int, blockManagerId: BlockManagerId): File =
blockManager.diskBlockManager.getFile(
ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID), blockManagerId)

private def getIndexFile(shuffleId: Int, mapId: Int): File =
getIndexFile(shuffleId, mapId, blockManager.blockManagerId)

private def getIndexFile(shuffleId: Int, mapId: Int, blockManagerId: BlockManagerId): File =
blockManager.diskBlockManager.getFile(
ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), blockManagerId)

/**
* Remove data file and index file that contain the output data from one map.
@@ -183,10 +189,12 @@
}
}

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
override def getBlockData(
blockId: ShuffleBlockId,
blockManagerId: BlockManagerId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId, blockManagerId)

val in = new DataInputStream(new FileInputStream(indexFile))
try {
@@ -195,7 +203,7 @@
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
getDataFile(blockId.shuffleId, blockId.mapId),
getDataFile(blockId.shuffleId, blockId.mapId, blockManagerId),
offset,
nextOffset - offset)
} finally {
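For context on the lookup above: the index file is a flat sequence of Long offsets into the map output's data file, where entry r marks the start of reducer r's segment and entry r + 1 marks its end. A self-contained sketch of that read (illustrative only, not the resolver's actual code):

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Return the [start, end) byte range of one reducer's segment in the data file.
def segmentRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // Skip the offsets of the reducers that come before this one.
    var i = 0
    while (i < reduceId) {
      in.readLong()
      i += 1
    }
    val offset = in.readLong()      // where this reducer's data begins
    val nextOffset = in.readLong()  // where the next reducer's data begins
    (offset, nextOffset)
  } finally {
    in.close()
  }
}
```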
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -17,9 +17,8 @@

package org.apache.spark.shuffle

import java.nio.ByteBuffer
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId}

private[spark]
/**
@@ -35,7 +34,7 @@ trait ShuffleBlockResolver {
* Retrieve the data for the specified block. If the data for that block is not available,
* throws an unspecified exception.
*/
def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
def getBlockData(blockId: ShuffleBlockId, blockManagerId: BlockManagerId): ManagedBuffer

def stop(): Unit
}
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (13 additions & 5 deletions)
@@ -187,7 +187,8 @@ private[spark] class BlockManager(
blockManagerId
}

master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
master.registerBlockManager(
blockManagerId, maxMemory, diskBlockManager.getLocalDirsPath(), slaveEndpoint)

// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
@@ -250,7 +251,8 @@
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo("BlockManager re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
master.registerBlockManager(
blockManagerId, maxMemory, diskBlockManager.getLocalDirsPath(), slaveEndpoint)
reportAllBlocks()
}

@@ -286,9 +288,10 @@
* Interface to get local block data. Throws an exception if the block cannot be found or
* cannot be read successfully.
*/
override def getBlockData(blockId: BlockId): ManagedBuffer = {
override def getBlockData(blockId: BlockId, blockManagerId: BlockManagerId): ManagedBuffer = {
if (blockId.isShuffle) {
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
shuffleManager.shuffleBlockResolver.getBlockData(
blockId.asInstanceOf[ShuffleBlockId], blockManagerId)
} else {
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
.asInstanceOf[Option[ByteBuffer]]
@@ -301,6 +304,10 @@
}
}

override def getBlockData(blockId: BlockId): ManagedBuffer = {
getBlockData(blockId, this.blockManagerId)
}

/**
* Put the block locally, using the given storage level.
*/
@@ -432,7 +439,8 @@
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId], blockManagerId)
.nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,9 +43,12 @@ class BlockManagerMaster(

/** Register the BlockManager's id with the driver. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
blockManagerId: BlockManagerId,
maxMemSize: Long,
localDirsPath: Array[String],
slaveEndpoint: RpcEndpointRef): Unit = {
logInfo("Trying to register BlockManager")
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
tell(RegisterBlockManager(blockManagerId, maxMemSize, localDirsPath, slaveEndpoint))
logInfo("Registered BlockManager")
}

@@ -74,6 +77,12 @@
GetLocationsMultipleBlockIds(blockIds))
}

/** Return the local dirs of other block managers on the same host as the given blockManagerId. */
def getLocalDirsPath(blockManagerId: BlockManagerId): Map[BlockManagerId, Array[String]] = {
driverEndpoint.askWithRetry[Map[BlockManagerId, Array[String]]](
GetLocalDirsPath(blockManagerId))
}

/**
* Check if block manager master has a block. Note that this can be used to check for only
* those blocks that are reported to block manager master.
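The reply is keyed by BlockManagerId, so one round trip to the driver is enough to learn the directories of every other block manager on the executor's host. A rough consumer-side sketch (master and blockManager are assumed to be in scope; this code is not part of the patch):

```scala
import org.apache.spark.storage.BlockManagerId

// Ask the driver for the local dirs of the other block managers on this host.
val dirsByPeer: Map[BlockManagerId, Array[String]] =
  master.getLocalDirsPath(blockManager.blockManagerId)

dirsByPeer.foreach { case (peerId, dirs) =>
  println(s"Executor ${peerId.executorId} on ${peerId.host} stores blocks under " +
    dirs.mkString(", "))
}
```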
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -19,17 +19,16 @@ package org.apache.spark.storage

import java.util.{HashMap => JHashMap}

import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, ThreadSafeRpcEndpoint}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.{Logging, SparkConf}

/**
* BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
@@ -56,8 +55,8 @@ class BlockManagerMasterEndpoint(
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
register(blockManagerId, maxMemSize, slaveEndpoint)
case RegisterBlockManager(blockManagerId, maxMemSize, localDirsPath, slaveEndpoint) =>
register(blockManagerId, maxMemSize, localDirsPath, slaveEndpoint)
context.reply(true)

case _updateBlockInfo @ UpdateBlockInfo(
@@ -81,6 +80,9 @@
case GetMemoryStatus =>
context.reply(memoryStatus)

case GetLocalDirsPath(blockManagerId) =>
context.reply(getLocalDirsPath(blockManagerId))

case GetStorageStatus =>
context.reply(storageStatus)

@@ -235,11 +237,20 @@

// Return a map from the block manager id to max memory and remaining memory.
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
blockManagerInfo.map { case(blockManagerId, info) =>
blockManagerInfo.map { case (blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
}

// Return the local dirs of other block managers on the same host as the given blockManagerId
private def getLocalDirsPath(blockManagerId: BlockManagerId)
: Map[BlockManagerId, Array[String]] = {
blockManagerInfo
.filter { case (id, _) => id != blockManagerId && id.host == blockManagerId.host }
.mapValues { info => info.localDirsPath }
.toMap
}

private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
@@ -299,7 +310,11 @@
).map(_.flatten.toSeq)
}

private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
private def register(
id: BlockManagerId,
maxMemSize: Long,
localDirsPath: Array[String],
slaveEndpoint: RpcEndpointRef): Unit = {
val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
@@ -316,7 +331,7 @@
blockManagerIdByExecutor(id.executorId) = id

blockManagerInfo(id) = new BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
id, System.currentTimeMillis(), maxMemSize, localDirsPath, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
@@ -423,6 +438,7 @@ private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val localDirsPath: Array[String],
val slaveEndpoint: RpcEndpointRef)
extends Logging {

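The endpoint answers GetLocalDirsPath by filtering its registry down to managers that share the asker's host while excluding the asker itself. A self-contained toy version of that filter (PeerId stands in for the real BlockManagerId; the paths are invented):

```scala
// Stand-in for BlockManagerId, just enough for the host-based filter.
case class PeerId(executorId: String, host: String)

val registered: Map[PeerId, Array[String]] = Map(
  PeerId("exec-1", "host-a") -> Array("/tmp/spark-local-aaa"),
  PeerId("exec-2", "host-a") -> Array("/tmp/spark-local-bbb"),
  PeerId("exec-3", "host-b") -> Array("/tmp/spark-local-ccc"))

// Keep only same-host peers, excluding the asker.
def localDirsOfPeers(asker: PeerId): Map[PeerId, Array[String]] =
  registered.filter { case (id, _) => id != asker && id.host == asker.host }

// localDirsOfPeers(PeerId("exec-1", "host-a"))
//   => Map(PeerId("exec-2", "host-a") -> Array("/tmp/spark-local-bbb"))
```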
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -50,6 +50,7 @@ private[spark] object BlockManagerMessages {
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
localDirsPath: Array[String],
sender: RpcEndpointRef)
extends ToBlockManagerMaster

@@ -109,4 +110,6 @@
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster

case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
}
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.storage
import java.util.UUID
import java.io.{IOException, File}

import scala.collection.mutable

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -51,16 +53,26 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

// Cache local directories for other block managers
private val localDirsByOtherBlkMgr = new mutable.HashMap[BlockManagerId, Array[String]]

private val shutdownHook = addShutdownHook()

def blockManagerId: BlockManagerId = blockManager.blockManagerId

def getLocalDirsPath(): Array[String] = {
localDirs.map(_.getAbsolutePath)
}

def getLocalDirsPath(blockManagerId: BlockManagerId): Map[BlockManagerId, Array[String]] = {
blockManager.master.getLocalDirsPath(blockManagerId)
}

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val (dirId, subDirId) = getDirInfo(filename, localDirs.length)

// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
@@ -82,6 +94,39 @@

def getFile(blockId: BlockId): File = getFile(blockId.name)

def getFile(blockId: BlockId, blockManagerId: BlockManagerId): File = {
if (this.blockManagerId == blockManagerId) {
getFile(blockId)
} else {
// Get a file from another block manager with the given blockManagerId
val dirs = localDirsByOtherBlkMgr.synchronized {
localDirsByOtherBlkMgr.getOrElse(blockManagerId, {
localDirsByOtherBlkMgr ++= getLocalDirsPath(this.blockManagerId)
localDirsByOtherBlkMgr.getOrElse(blockManagerId, {
throw new IOException(s"Block manager (${blockManagerId}) not found " +
s"in host '${this.blockManagerId.host}'")
})
})
}
val (dirId, subDirId) = getDirInfo(blockId.name, dirs.length)
val file = new File(new File(dirs(dirId), "%02x".format(subDirId)), blockId.name)
if (!file.exists()) {
throw new IOException(s"File '${file}' not found in local dir")
}
logInfo(s"${this.blockManagerId} bypasses network access and " +
s"directly reads file '${file}' in local dir")
file
}
}

def getDirInfo(filename: String, numDirs: Int): (Int, Int) = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % numDirs
val subDirId = (hash / numDirs) % subDirsPerLocalDir
(dirId, subDirId)
}

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
getFile(blockId.name).exists()
@@ -166,7 +211,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
// Only perform cleanup if an external service is not serving our shuffle files.
// Also blockManagerId could be null if block manager is not initialized properly.
if (!blockManager.externalShuffleServiceEnabled ||
(blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
(this.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
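A peer's file can be located without contacting that peer because every DiskBlockManager places files with the same deterministic hash scheme, so a peer's local dirs (in their original order) are enough to recompute the exact path. A rough sketch of that computation (the hash function stands in for Spark's Utils.nonNegativeHash, and 64 mirrors the default spark.diskStore.subDirectories; both are assumptions for illustration):

```scala
import java.io.File

// Stand-in for org.apache.spark.util.Utils.nonNegativeHash.
def nonNegativeHash(s: String): Int = {
  val h = s.hashCode
  if (h != Int.MinValue) math.abs(h) else 0
}

// Recompute the path a peer DiskBlockManager would have used for this block name.
// peerLocalDirs must be in the order the peer created them (as returned by the master).
def fileInPeerDirs(
    blockName: String,
    peerLocalDirs: Array[String],
    subDirsPerLocalDir: Int = 64): File = {
  val hash = nonNegativeHash(blockName)
  val dirId = hash % peerLocalDirs.length                            // top-level local dir
  val subDirId = (hash / peerLocalDirs.length) % subDirsPerLocalDir  // "%02x" sub dir
  new File(new File(peerLocalDirs(dirId), "%02x".format(subDirId)), blockName)
}
```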