14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -80,10 +80,16 @@ private[spark] abstract class Task[T](
}
try {
runTask(context)
} catch { case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
context.markTaskFailed(e)
throw e
} catch {
case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
try {
context.markTaskFailed(e)
} catch {
case t: Throwable =>
e.addSuppressed(t)
Contributor: nice - didn't know this existed
}
throw e
} finally {
// Call the task completion callbacks.
context.markTaskCompleted()
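The `addSuppressed` call that the comment above refers to is the standard `java.lang.Throwable` API (available since Java 7): if `markTaskFailed` itself throws, that secondary exception is recorded on the original task failure instead of replacing it. A minimal, self-contained sketch of that behavior (not code from this PR):

```scala
object AddSuppressedDemo {
  def main(args: Array[String]): Unit = {
    val original = new RuntimeException("task body failed")
    try {
      try {
        throw original
      } catch {
        case e: Throwable =>
          // Simulate a failure callback that itself blows up while handling `e`.
          try {
            throw new IllegalStateException("failure callback failed")
          } catch {
            case t: Throwable => e.addSuppressed(t)
          }
          throw e
      }
    } catch {
      case e: Throwable =>
        println(e.getMessage)                                // task body failed
        e.getSuppressed.foreach(s => println(s.getMessage))  // failure callback failed
    }
  }
}
```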
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1297,6 +1297,37 @@ private[spark] object Utils extends Logging {
}
}

/**
* Execute a block of code; if it fails, run the task failure callbacks and then the
* given catch block, but if exceptions happen in the catch block, do not let them
* suppress the original exception.
*
* This is primarily an issue with `catch { out.close() }` blocks, where `close` needs
* to be called to clean up `out`, but if an exception happened in `out.write`, `out`
* is likely corrupted and `out.close` will fail as well. That failure would then
* suppress the original, and likely more meaningful, exception from the `out.write` call.
*/
def tryWithSafeCatchAndFailureCallbacks[T](block: => T)(catchBlock: => Unit): T = {
Member: This seems redundant with the method above; they can be unified right?

Member Author: yes, thanks. folded these changes in the method above

try {
block
} catch {
case cause: Throwable =>
// Purposefully not using NonFatal, because even for fatal exceptions we
// don't want the catchBlock to suppress the original exception.
val originalThrowable = cause
try {
logError("Aborting task", originalThrowable)
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
catchBlock
} catch {
case t: Throwable =>
logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
originalThrowable.addSuppressed(t)
}
throw originalThrowable
}
}

/** Default filtering function for finding call sites using `getCallSite`. */
private def sparkInternalExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the internal Spark API's
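A hedged usage sketch of the helper added above, mirroring the `out.write` / `out.close` scenario from its doc comment. The object, package, and stream handling here are illustrative only, the call is assumed to happen inside a running task (the helper looks up `TaskContext.get()` internally), and per the review thread the method may end up merged into an existing Utils method rather than keeping this name:

```scala
// Utils is private[spark], so this illustrative caller is placed under org.apache.spark.
package org.apache.spark.examples

import java.io.OutputStream

import org.apache.spark.util.Utils

object SafeWriteSketch {
  // Writes all chunks to `out`. On any failure the task failure callbacks run first,
  // then the cleanup block; the original write exception is rethrown with any
  // exception from the cleanup close() attached via addSuppressed.
  def writeAll(out: OutputStream, chunks: Iterator[Array[Byte]]): Unit = {
    Utils.tryWithSafeCatchAndFailureCallbacks {
      chunks.foreach(c => out.write(c))
      out.close()
    } {
      // Cleanup path, only run if the block above failed; `out` may already be corrupted.
      out.close()
    }
  }
}
```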
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, Utils}

/** A container for all the details required when writing to a table. */
case class WriteRelation(
@@ -255,19 +255,18 @@ private[sql] class DefaultWriterContainer(

// If anything below fails, we should abort the task.
try {
while (iterator.hasNext) {
val internalRow = iterator.next()
writer.writeInternal(internalRow)
Utils.tryWithSafeCatchAndFailureCallbacks {
while (iterator.hasNext) {
val internalRow = iterator.next()
writer.writeInternal(internalRow)
}
commitTask()
} {
abortTask()
}

commitTask()
} catch {
case cause: Throwable =>
logError("Aborting task.", cause)
// call failure callbacks first, so we could have a chance to cleanup the writer.
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
abortTask()
throw new SparkException("Task failed while writing rows.", cause)
case t: Throwable =>
throw new SparkException("Task failed while writing rows", t)
}

def commitTask(): Unit = {
@@ -421,37 +420,37 @@ private[sql] class DynamicPartitionWriterContainer(
// If anything below fails, we should abort the task.
var currentWriter: OutputWriter = null
try {
var currentKey: UnsafeRow = null
while (sortedIterator.next()) {
val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
if (currentKey != nextKey) {
if (currentWriter != null) {
currentWriter.close()
currentWriter = null
Utils.tryWithSafeCatchAndFailureCallbacks {
var currentKey: UnsafeRow = null
while (sortedIterator.next()) {
val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
if (currentKey != nextKey) {
if (currentWriter != null) {
currentWriter.close()
currentWriter = null
}
currentKey = nextKey.copy()
logDebug(s"Writing partition: $currentKey")

currentWriter = newOutputWriter(currentKey, getPartitionString)
}
currentKey = nextKey.copy()
logDebug(s"Writing partition: $currentKey")

currentWriter = newOutputWriter(currentKey, getPartitionString)
currentWriter.writeInternal(sortedIterator.getValue)
}
if (currentWriter != null) {
currentWriter.close()
currentWriter = null
}
currentWriter.writeInternal(sortedIterator.getValue)
}
if (currentWriter != null) {
currentWriter.close()
currentWriter = null
}

commitTask()
} catch {
case cause: Throwable =>
logError("Aborting task.", cause)
// call failure callbacks first, so we could have a chance to cleanup the writer.
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
commitTask()
} {
if (currentWriter != null) {
currentWriter.close()
}
abortTask()
throw new SparkException("Task failed while writing rows.", cause)
}
} catch {
case t: Throwable =>
throw new SparkException("Task failed while writing rows", t)
}
}
}