Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.{NextIterator, SerializableConfiguration, Utils}
import org.apache.spark.util.ArrayImplicits._


Expand Down Expand Up @@ -401,9 +401,10 @@ object FileFormatWriter extends Logging {
}

try {
val queryFailureCapturedIterator = new QueryFailureCapturedIterator(iterator)
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
// Execute the task to write rows out and commit the task.
dataWriter.writeWithIterator(iterator)
dataWriter.writeWithIterator(queryFailureCapturedIterator)
dataWriter.commit()
})(catchBlock = {
// If there is an error, abort the task
Expand All @@ -413,6 +414,8 @@ object FileFormatWriter extends Logging {
dataWriter.close()
})
} catch {
case e: QueryFailureDuringWrite =>
throw e.queryFailure
case e: FetchFailedException =>
throw e
case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
Expand Down Expand Up @@ -452,3 +455,25 @@ object FileFormatWriter extends Logging {
}
}
}

// A exception wrapper to indicate that the error was thrown when executing the query, not writing
// the data
private class QueryFailureDuringWrite(val queryFailure: Throwable) extends Throwable

// An iterator wrapper to rethrow any error from the given iterator with `QueryFailureDuringWrite`.
private class QueryFailureCapturedIterator(data: Iterator[InternalRow])
extends NextIterator[InternalRow] {

override protected def getNext(): InternalRow = try {
if (data.hasNext) {
data.next()
} else {
finished = true
null
}
} catch {
case t: Throwable => throw new QueryFailureDuringWrite(t)
}

override protected def close(): Unit = {}
}
Loading