CR
sameeragarwal committed Apr 8, 2016
commit e41cae8a8a3fb248e643d68d5e9d144292b6ad2f
@@ -1111,9 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
         recordsWritten += 1
       }
-    } {
-      writer.close(hadoopContext)
-    }
+    }(finallyBlock = writer.close(hadoopContext))
     committer.commitTask(hadoopContext)
     outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
       om.setBytesWritten(callback())
@@ -1200,9 +1198,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
         recordsWritten += 1
       }
-    } {
-      writer.close()
-    }
+    }(finallyBlock = writer.close())
     writer.commit()
     outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
       om.setBytesWritten(callback())
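Both hunks above make the same mechanical change: the cleanup block that used to be passed as a bare second curried argument is now passed by name as finallyBlock. Once the helper takes two defaulted by-name parameters, call sites must name whichever callback they pass. A minimal sketch of the signature change, using illustrative names (helper, helper2, work, cleanup are not from the patch):

  // Before: one curried by-name parameter; the cleanup block is positional.
  def helper[T](block: => T)(finallyBlock: => Unit): T =
    try block finally finallyBlock

  // After: two defaulted by-name parameters; callers name the one they pass.
  def helper2[T](block: => T)(catchBlock: => Unit = (), finallyBlock: => Unit = ()): T =
    try block
    catch { case t: Throwable => catchBlock; throw t }
    finally finallyBlock

  def work(): Int = 42
  def cleanup(): Unit = println("cleaned up")

  helper { work() } { cleanup() }                // positional block argument
  helper2 { work() }(finallyBlock = cleanup())   // must be named now
  helper2 { work() }(catchBlock = cleanup())     // or pick the other callback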
60 changes: 19 additions & 41 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1260,26 +1260,35 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code, call the failure callbacks before finally block if there is any
-   * exceptions happen. But if exceptions happen in the finally block, do not suppress the original
-   * exception.
+   * Execute a block of code and call the failure callbacks in the catch block. If exceptions occur
+   * in either the catch or the finally block, they are appended to the list of suppressed
+   * exceptions in the original exception, which is then rethrown.
    *
-   * This is primarily an issue with `finally { out.close() }` blocks, where
-   * close needs to be called to clean up `out`, but if an exception happened
-   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * This is primarily an issue with `catch { abort() }` or `finally { out.close() }` blocks,
+   * where the abort/close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `abort` or `out.close` will
    * fail as well. This would then suppress the original/likely more meaningful
    * exception from the original `out.write` call.
    */
-  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => Unit): T = {
+  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
+      (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
     var originalThrowable: Throwable = null
     try {
       block
     } catch {
-      case t: Throwable =>
+      case cause: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
-        originalThrowable = t
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t)
+        originalThrowable = cause
+        try {
+          logError("Aborting task", originalThrowable)
+          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
+          catchBlock
+        } catch {
+          case t: Throwable =>
+            originalThrowable.addSuppressed(t)
+            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
+        }
         throw originalThrowable
     } finally {
       try {
@@ -1297,37 +1306,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /**
-   * Execute a block of code, then a catch block, but if exceptions happen in
-   * the catch block, do not suppress the original exception.
-   *
-   * This is primarily an issue with `catch { out.close() }` blocks, where
-   * close needs to be called to clean up `out`, but if an exception happened
-   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
-   * fail as well. This would then suppress the original/likely more meaningful
-   * exception from the original `out.write` call.
-   */
-  def tryWithSafeCatchAndFailureCallbacks[T](block: => T)(catchBlock: => Unit): T = {
-    try {
-      block
-    } catch {
-      case cause: Throwable =>
-        // Purposefully not using NonFatal, because for even fatal exceptions
-        // we don't want to have our catchBlock suppress
-        val originalThrowable = cause
-        try {
-          logError("Aborting task", originalThrowable)
-          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
-          catchBlock
-        } catch {
-          case t: Throwable =>
-            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
-            originalThrowable.addSuppressed(t)
-        }
-        throw originalThrowable
-    }
-  }
-
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def sparkInternalExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the internal Spark API's
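Taken together, the Utils.scala change merges the two near-duplicate helpers into one with defaulted catch and finally callbacks. The contract is easiest to see in isolation. Below is a minimal standalone sketch of the same pattern (the Spark-specific TaskContext.markTaskFailed and logging calls are omitted; all names and messages are illustrative): the primary exception always wins, and failures from the callbacks ride along as suppressed exceptions instead of masking it.

  def tryWithCallbacks[T](block: => T)
      (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case cause: Throwable =>
        original = cause
        try {
          catchBlock                     // e.g. abortTask()
        } catch {
          case t: Throwable => original.addSuppressed(t)
        }
        throw original
    } finally {
      try {
        finallyBlock                     // e.g. writer.close()
      } catch {
        // Only suppress if the main block already failed; otherwise
        // let the finally-block failure propagate on its own.
        case t: Throwable if original != null => original.addSuppressed(t)
      }
    }
  }

  // The original `out.write`-style failure surfaces; abort/close failures
  // become suppressed exceptions attached to it.
  try {
    tryWithCallbacks { throw new RuntimeException("write failed") }(
      catchBlock = throw new IllegalStateException("abort failed"),
      finallyBlock = throw new IllegalStateException("close failed"))
  } catch {
    case t: Throwable =>
      println(t.getMessage)              // prints: write failed
      t.getSuppressed.foreach(s => println("suppressed: " + s.getMessage))
  }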
@@ -255,15 +255,13 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      Utils.tryWithSafeCatchAndFailureCallbacks {
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
         while (iterator.hasNext) {
           val internalRow = iterator.next()
           writer.writeInternal(internalRow)
         }
         commitTask()
-      } {
-        abortTask()
-      }
+      }(catchBlock = abortTask())
     } catch {
       case t: Throwable =>
         throw new SparkException("Task failed while writing rows", t)
@@ -420,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      Utils.tryWithSafeCatchAndFailureCallbacks {
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
         var currentKey: UnsafeRow = null
         while (sortedIterator.next()) {
           val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
@@ -442,12 +440,12 @@ private[sql] class DynamicPartitionWriterContainer(
         }
 
         commitTask()
-      } {
+      }(catchBlock = {
         if (currentWriter != null) {
           currentWriter.close()
         }
         abortTask()
-      }
+      })
     } catch {
       case t: Throwable =>
         throw new SparkException("Task failed while writing rows", t)
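In the dynamic-partition container the cleanup is richer than a plain abortTask(): a failure can hit while a partition's writer is still open, so the catchBlock first closes currentWriter (when non-null) and only then aborts the task. Note also that both call sites keep their outer try/catch, so after the helper has recorded any abort/close failures as suppressed exceptions, the original error still propagates wrapped in a SparkException("Task failed while writing rows", ...).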