Skip to content

Commit c12db0d

Browse files
sameeragarwaldavies
authored andcommitted
[SPARK-14454] [1.6] Better exception handling while marking tasks as failed
Backports apache#12234 to 1.6. Original description below: ## What changes were proposed in this pull request? This patch adds support for better handling of exceptions inside catch blocks if the code within the block throws an exception. For instance here is the code in a catch block before this change in `WriterContainer.scala`: ```scala logError("Aborting task.", cause) // call failure callbacks first, so we could have a chance to cleanup the writer. TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause) if (currentWriter != null) { currentWriter.close() } abortTask() throw new SparkException("Task failed while writing rows.", cause) ``` If `markTaskFailed` or `currentWriter.close` throws an exception, we currently lose the original cause. This PR fixes this problem by implementing a utility function `Utils.tryWithSafeCatch` that suppresses (`Throwable.addSuppressed`) the exception that are thrown within the catch block and rethrowing the original exception. ## How was this patch tested? No new functionality added Author: Sameer Agarwal <[email protected]> Closes apache#12272 from sameeragarwal/fix-exception-1.6.
1 parent baf2985 commit c12db0d

File tree

4 files changed

+103
-95
lines changed

4 files changed

+103
-95
lines changed

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,9 +1116,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
11161116
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
11171117
recordsWritten += 1
11181118
}
1119-
} {
1120-
writer.close(hadoopContext)
1121-
}
1119+
}(finallyBlock = writer.close(hadoopContext))
11221120
committer.commitTask(hadoopContext)
11231121
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
11241122
outputMetrics.setRecordsWritten(recordsWritten)
@@ -1202,9 +1200,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
12021200
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
12031201
recordsWritten += 1
12041202
}
1205-
} {
1206-
writer.close()
1207-
}
1203+
}(finallyBlock = writer.close())
12081204
writer.commit()
12091205
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
12101206
outputMetrics.setRecordsWritten(recordsWritten)

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,16 @@ private[spark] abstract class Task[T](
8787
}
8888
try {
8989
(runTask(context), context.collectAccumulators())
90-
} catch { case e: Throwable =>
91-
// Catch all errors; run task failure callbacks, and rethrow the exception.
92-
context.markTaskFailed(e)
93-
throw e
90+
} catch {
91+
case e: Throwable =>
92+
// Catch all errors; run task failure callbacks, and rethrow the exception.
93+
try {
94+
context.markTaskFailed(e)
95+
} catch {
96+
case t: Throwable =>
97+
e.addSuppressed(t)
98+
}
99+
throw e
94100
} finally {
95101
// Call the task completion callbacks.
96102
context.markTaskCompleted()

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,26 +1259,35 @@ private[spark] object Utils extends Logging {
12591259
}
12601260

12611261
/**
1262-
* Execute a block of code, call the failure callbacks before finally block if there is any
1263-
* exceptions happen. But if exceptions happen in the finally block, do not suppress the original
1264-
* exception.
1262+
* Execute a block of code and call the failure callbacks in the catch block. If exceptions occur
1263+
* in either the catch or the finally block, they are appended to the list of suppressed
1264+
* exceptions in original exception which is then rethrown.
12651265
*
1266-
* This is primarily an issue with `finally { out.close() }` blocks, where
1267-
* close needs to be called to clean up `out`, but if an exception happened
1268-
* in `out.write`, it's likely `out` may be corrupted and `out.close` will
1266+
* This is primarily an issue with `catch { abort() }` or `finally { out.close() }` blocks,
1267+
* where the abort/close needs to be called to clean up `out`, but if an exception happened
1268+
* in `out.write`, it's likely `out` may be corrupted and `abort` or `out.close` will
12691269
* fail as well. This would then suppress the original/likely more meaningful
12701270
* exception from the original `out.write` call.
12711271
*/
1272-
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => Unit): T = {
1272+
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
1273+
(catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
12731274
var originalThrowable: Throwable = null
12741275
try {
12751276
block
12761277
} catch {
1277-
case t: Throwable =>
1278+
case cause: Throwable =>
12781279
// Purposefully not using NonFatal, because even fatal exceptions
12791280
// we don't want to have our finallyBlock suppress
1280-
originalThrowable = t
1281-
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t)
1281+
originalThrowable = cause
1282+
try {
1283+
logError("Aborting task", originalThrowable)
1284+
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
1285+
catchBlock
1286+
} catch {
1287+
case t: Throwable =>
1288+
originalThrowable.addSuppressed(t)
1289+
logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
1290+
}
12821291
throw originalThrowable
12831292
} finally {
12841293
try {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 72 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
3434
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
3535
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
3636
import org.apache.spark.sql.types.{StructType, StringType}
37-
import org.apache.spark.util.SerializableConfiguration
37+
import org.apache.spark.util.{SerializableConfiguration, Utils}
3838

3939

4040
private[sql] abstract class BaseWriterContainer(
@@ -257,19 +257,16 @@ private[sql] class DefaultWriterContainer(
257257

258258
// If anything below fails, we should abort the task.
259259
try {
260-
while (iterator.hasNext) {
261-
val internalRow = iterator.next()
262-
writer.writeInternal(internalRow)
263-
}
264-
265-
commitTask()
260+
Utils.tryWithSafeFinallyAndFailureCallbacks {
261+
while (iterator.hasNext) {
262+
val internalRow = iterator.next()
263+
writer.writeInternal(internalRow)
264+
}
265+
commitTask()
266+
}(catchBlock = abortTask())
266267
} catch {
267-
case cause: Throwable =>
268-
logError("Aborting task.", cause)
269-
// call failure callbacks first, so we could have a chance to cleanup the writer.
270-
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
271-
abortTask()
272-
throw new SparkException("Task failed while writing rows.", cause)
268+
case t: Throwable =>
269+
throw new SparkException("Task failed while writing rows", t)
273270
}
274271

275272
def commitTask(): Unit = {
@@ -343,81 +340,81 @@ private[sql] class DynamicPartitionWriterContainer(
343340
// If anything below fails, we should abort the task.
344341
var currentWriter: OutputWriter = null
345342
try {
346-
// This will be filled in if we have to fall back on sorting.
347-
var sorter: UnsafeKVExternalSorter = null
348-
while (iterator.hasNext && sorter == null) {
349-
val inputRow = iterator.next()
350-
val currentKey = getPartitionKey(inputRow)
351-
currentWriter = outputWriters.get(currentKey)
352-
353-
if (currentWriter == null) {
354-
if (outputWriters.size < maxOpenFiles) {
355-
currentWriter = newOutputWriter(currentKey)
356-
outputWriters.put(currentKey.copy(), currentWriter)
357-
currentWriter.writeInternal(getOutputRow(inputRow))
343+
Utils.tryWithSafeFinallyAndFailureCallbacks {
344+
// This will be filled in if we have to fall back on sorting.
345+
var sorter: UnsafeKVExternalSorter = null
346+
while (iterator.hasNext && sorter == null) {
347+
val inputRow = iterator.next()
348+
val currentKey = getPartitionKey(inputRow)
349+
currentWriter = outputWriters.get(currentKey)
350+
351+
if (currentWriter == null) {
352+
if (outputWriters.size < maxOpenFiles) {
353+
currentWriter = newOutputWriter(currentKey)
354+
outputWriters.put(currentKey.copy(), currentWriter)
355+
currentWriter.writeInternal(getOutputRow(inputRow))
356+
} else {
357+
logInfo(s"Maximum partitions reached, falling back on sorting.")
358+
sorter = new UnsafeKVExternalSorter(
359+
StructType.fromAttributes(partitionColumns),
360+
StructType.fromAttributes(dataColumns),
361+
SparkEnv.get.blockManager,
362+
TaskContext.get().taskMemoryManager().pageSizeBytes)
363+
sorter.insertKV(currentKey, getOutputRow(inputRow))
364+
}
358365
} else {
359-
logInfo(s"Maximum partitions reached, falling back on sorting.")
360-
sorter = new UnsafeKVExternalSorter(
361-
StructType.fromAttributes(partitionColumns),
362-
StructType.fromAttributes(dataColumns),
363-
SparkEnv.get.blockManager,
364-
TaskContext.get().taskMemoryManager().pageSizeBytes)
365-
sorter.insertKV(currentKey, getOutputRow(inputRow))
366+
currentWriter.writeInternal(getOutputRow(inputRow))
366367
}
367-
} else {
368-
currentWriter.writeInternal(getOutputRow(inputRow))
369-
}
370-
}
371-
// current writer is included in outputWriters
372-
currentWriter = null
373-
374-
// If the sorter is not null that means that we reached the maxFiles above and need to finish
375-
// using external sort.
376-
if (sorter != null) {
377-
while (iterator.hasNext) {
378-
val currentRow = iterator.next()
379-
sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
380368
}
369+
// current writer is included in outputWriters
370+
currentWriter = null
371+
372+
// If the sorter is not null that means that we reached the maxFiles above and need to
373+
// finish using external sort.
374+
if (sorter != null) {
375+
while (iterator.hasNext) {
376+
val currentRow = iterator.next()
377+
sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
378+
}
381379

382-
logInfo(s"Sorting complete. Writing out partition files one at a time.")
383-
384-
val sortedIterator = sorter.sortedIterator()
385-
var currentKey: InternalRow = null
386-
while (sortedIterator.next()) {
387-
if (currentKey != sortedIterator.getKey) {
388-
if (currentWriter != null) {
389-
currentWriter.close()
390-
currentWriter = null
380+
logInfo(s"Sorting complete. Writing out partition files one at a time.")
381+
382+
val sortedIterator = sorter.sortedIterator()
383+
var currentKey: InternalRow = null
384+
while (sortedIterator.next()) {
385+
if (currentKey != sortedIterator.getKey) {
386+
if (currentWriter != null) {
387+
currentWriter.close()
388+
currentWriter = null
389+
}
390+
currentKey = sortedIterator.getKey.copy()
391+
logDebug(s"Writing partition: $currentKey")
392+
393+
// Either use an existing file from before, or open a new one.
394+
currentWriter = outputWriters.remove(currentKey)
395+
if (currentWriter == null) {
396+
currentWriter = newOutputWriter(currentKey)
397+
}
391398
}
392-
currentKey = sortedIterator.getKey.copy()
393-
logDebug(s"Writing partition: $currentKey")
394399

395-
// Either use an existing file from before, or open a new one.
396-
currentWriter = outputWriters.remove(currentKey)
397-
if (currentWriter == null) {
398-
currentWriter = newOutputWriter(currentKey)
399-
}
400+
currentWriter.writeInternal(sortedIterator.getValue)
401+
}
402+
if (currentWriter != null) {
403+
currentWriter.close()
404+
currentWriter = null
400405
}
401-
402-
currentWriter.writeInternal(sortedIterator.getValue)
403-
}
404-
if (currentWriter != null) {
405-
currentWriter.close()
406-
currentWriter = null
407406
}
408-
}
409407

410-
commitTask()
411-
} catch {
412-
case cause: Throwable =>
413-
logError("Aborting task.", cause)
414-
// call failure callbacks first, so we could have a chance to cleanup the writer.
415-
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
408+
commitTask()
409+
}(catchBlock = {
416410
if (currentWriter != null) {
417411
currentWriter.close()
418412
}
419413
abortTask()
420-
throw new SparkException("Task failed while writing rows.", cause)
414+
})
415+
} catch {
416+
case t: Throwable =>
417+
throw new SparkException("Task failed while writing rows", t)
421418
}
422419

423420
/** Open and returns a new OutputWriter given a partition key. */

0 commit comments

Comments
 (0)