[SPARK-13601] call failure callbacks before writer.close() #11450
Changes from 3 commits
@@ -247,11 +247,9 @@ private[sql] class DefaultWriterContainer(
     executorSideSetup(taskContext)
     val configuration = taskAttemptContext.getConfiguration
     configuration.set("spark.sql.sources.output.path", outputPath)
-    val writer = newOutputWriter(getWorkPath)
+    var writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
-    var writerClosed = false

     // If anything below fails, we should abort the task.
     try {
       while (iterator.hasNext) {
@@ -263,16 +261,17 @@ private[sql] class DefaultWriterContainer(
     } catch {
       case cause: Throwable =>
         logError("Aborting task.", cause)
+        // call failure callbacks first, so we could have a chance to cleanup the writer.
+        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
Contributor: maybe more clear if we move this into abortTask?

Contributor: actually never mind - more clear here

Contributor (author): We can't access …
         abortTask()
         throw new SparkException("Task failed while writing rows.", cause)
     }

     def commitTask(): Unit = {
       try {
-        assert(writer != null, "OutputWriter instance should have been initialized")
-        if (!writerClosed) {
+        if (writer != null) {
           writer.close()
-          writerClosed = true
+          writer = null
         }
         super.commitTask()
       } catch {
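For context on the line discussed in the thread above: markTaskFailed(cause) runs the task's registered failure callbacks, so a data source gets a chance to put its writer into a safe state before abortTask() calls writer.close(). A minimal sketch of how such a callback could be registered, assuming the TaskContext.addTaskFailureListener API that these callbacks invoke; BufferingWriter and its methods are hypothetical, not part of this PR:

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskFailureListener

object FailureAwareWriterExample {
  // Hypothetical writer: buffers rows and flushes them in close().
  class BufferingWriter {
    @volatile private var failed = false
    def noteFailure(): Unit = failed = true
    // If the task already failed, skip the flush so a broken buffer
    // cannot throw again and shadow the original error.
    def close(): Unit = if (!failed) flush()
    private def flush(): Unit = () // write buffered bytes to the output file
  }

  // Called from task code: the listener fires inside markTaskFailed(cause),
  // i.e. before abortTask() reaches writer.close() in the catch block above.
  def openWriter(): BufferingWriter = {
    val writer = new BufferingWriter
    TaskContext.get().addTaskFailureListener(new TaskFailureListener {
      override def onTaskFailure(context: TaskContext, error: Throwable): Unit =
        writer.noteFailure()
    })
    writer
  }
}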
@@ -285,9 +284,8 @@ private[sql] class DefaultWriterContainer(
     def abortTask(): Unit = {
       try {
-        if (!writerClosed) {
+        if (writer != null) {
           writer.close()
-          writerClosed = true
         }
       } finally {
         super.abortTask()
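Taken together, the three hunks above drop the separate writerClosed flag and instead null out a now-mutable writer reference after closing it, so the commit and abort paths can both run without double-closing. A standalone sketch of that pattern; openWriter() is a hypothetical stand-in for newOutputWriter(getWorkPath):

object WriterCleanupExample {
  // Hypothetical stand-in for newOutputWriter(getWorkPath).
  private def openWriter(): AutoCloseable = new AutoCloseable {
    override def close(): Unit = println("writer closed")
  }

  private var writer: AutoCloseable = openWriter()

  // Shared shape of commitTask()/abortTask(): whichever runs first closes
  // the writer and nulls the reference, making any later call a safe no-op.
  def closeWriterOnce(): Unit = {
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  def main(args: Array[String]): Unit = {
    closeWriterOnce() // prints "writer closed"
    closeWriterOnce() // no-op: reference is already null
  }
}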
@@ -393,57 +391,62 @@ private[sql] class DynamicPartitionWriterContainer(
     val getPartitionString =
       UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)

-    // If anything below fails, we should abort the task.
-    try {
-      // Sorts the data before write, so that we only need one writer at the same time.
-      // TODO: inject a local sort operator in planning.
-      val sorter = new UnsafeKVExternalSorter(
-        sortingKeySchema,
-        StructType.fromAttributes(dataColumns),
-        SparkEnv.get.blockManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes)
-
-      while (iterator.hasNext) {
-        val currentRow = iterator.next()
-        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-      }
-
-      logInfo(s"Sorting complete. Writing out partition files one at a time.")
-
-      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
-        identity
-      } else {
-        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
-          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
-        })
-      }
-
-      val sortedIterator = sorter.sortedIterator()
+    // Sorts the data before write, so that we only need one writer at the same time.
+    // TODO: inject a local sort operator in planning.
+    val sorter = new UnsafeKVExternalSorter(
+      sortingKeySchema,
+      StructType.fromAttributes(dataColumns),
+      SparkEnv.get.blockManager,
+      TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+    while (iterator.hasNext) {
+      val currentRow = iterator.next()
+      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+    }
+    logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+    val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
+      identity
+    } else {
+      UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
+        case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
+      })
+    }
+
+    val sortedIterator = sorter.sortedIterator()
+
+    // If anything below fails, we should abort the task.
+    var currentWriter: OutputWriter = null
+    try {
       var currentKey: UnsafeRow = null
-      var currentWriter: OutputWriter = null
-      try {
-        while (sortedIterator.next()) {
-          val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-          if (currentKey != nextKey) {
-            if (currentWriter != null) {
-              currentWriter.close()
-            }
-            currentKey = nextKey.copy()
-            logDebug(s"Writing partition: $currentKey")
-
-            currentWriter = newOutputWriter(currentKey, getPartitionString)
+      while (sortedIterator.next()) {
+        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+        if (currentKey != nextKey) {
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
           }
-          currentWriter.writeInternal(sortedIterator.getValue)
+          currentKey = nextKey.copy()
+          logDebug(s"Writing partition: $currentKey")
+
+          currentWriter = newOutputWriter(currentKey, getPartitionString)
         }
-      } finally {
-        if (currentWriter != null) { currentWriter.close() }
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
      }

       commitTask()
     } catch {
       case cause: Throwable =>
         logError("Aborting task.", cause)
+        // call failure callbacks first, so we could have a chance to cleanup the writer.
+        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        if (currentWriter != null) {
+          currentWriter.close()
+        }
         abortTask()
         throw new SparkException("Task failed while writing rows.", cause)
     }
Review comment: remove this comment
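In DynamicPartitionWriterContainer the same fix needs one extra step, because a partition writer may still be open on a half-written file when the failure happens. A condensed sketch of the error-handling order the new catch block establishes; names follow the diff, and writeAllPartitions() is a hypothetical stand-in for the sorted-iterator write loop:

// Condensed from the hunk above, not the full method body.
try {
  writeAllPartitions() // may leave currentWriter open mid-partition
  commitTask()
} catch {
  case cause: Throwable =>
    logError("Aborting task.", cause)
    // 1. Failure callbacks run first, while the writer is still open,
    //    so registered listeners get a chance to clean up writer state.
    TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
    // 2. Only then is the half-written file closed...
    if (currentWriter != null) {
      currentWriter.close()
    }
    // 3. ...and the task's output aborted.
    abortTask()
    throw new SparkException("Task failed while writing rows.", cause)
}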