Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Unify calls to getFile on BlockId and log the root cause of failures in doPut
  • Loading branch information
caneGuy committed Sep 5, 2017
commit 32f0b75df434703b0fd253a75a07c06cbac8276a
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,12 @@ private[spark] class BlockManager(
logWarning(s"Putting block $blockId failed")
}
res
} catch {
// Since removeBlockInternal may throw exception,
// we should print exception first to show root cause.
case e: Throwable =>
logWarning(s"Putting block $blockId failed due to exception $e.")
throw e
} finally {
// This cleanup is performed in a finally block rather than a `catch` to avoid having to
// catch and properly re-throw InterruptedException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
private def getFileInternal(filename: String): File = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this rename?

// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
Expand All @@ -77,11 +77,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
new File(subDir, filename)
}

/**
 * Looks up the on-disk file backing the given block.
 *
 * Delegates to `getFileInternal`, which hashes the block's name into one of the
 * local subdirectories. Kept as the single public entry point so callers pass a
 * `BlockId` rather than a raw filename string.
 */
def getFile(blockId: BlockId): File = getFileInternal(blockId.name)

/**
 * Check if the disk block manager has a file on disk for the given block.
 *
 * Uses `getFileInternal` directly (rather than the public `getFile(BlockId)`)
 * since only the block's name is needed for the filesystem lookup.
 */
def containsBlock(blockId: BlockId): Boolean = {
  getFileInternal(blockId.name).exists()
}

/** List all the files currently stored on disk by the disk manager. */
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private[spark] class DiskStore(
}

def getBytes(blockId: BlockId): BlockData = {
val file = diskManager.getFile(blockId.name)
val file = diskManager.getFile(blockId)
val blockSize = getSize(blockId)

securityManager.getIOEncryptionKey() match {
Expand All @@ -116,7 +116,7 @@ private[spark] class DiskStore(

def remove(blockId: BlockId): Boolean = {
blockSizes.remove(blockId.name)
val file = diskManager.getFile(blockId.name)
val file = diskManager.getFile(blockId)
if (file.exists()) {
val ret = file.delete()
if (!ret) {
Expand All @@ -129,7 +129,7 @@ private[spark] class DiskStore(
}

/**
 * Check whether a file for the given block currently exists on disk.
 *
 * Resolves the block's file via the disk manager's `BlockId`-based lookup
 * (consistent with `getBytes` and `remove`) and tests for its presence.
 */
def contains(blockId: BlockId): Boolean = {
  val file = diskManager.getFile(blockId)
  file.exists()
}

Expand Down