Skip to content

Commit db538b2

Browse files
author
Marcelo Vanzin
committed
[SPARK-24552][CORE][SQL][BRANCH-2.3] Use unique id instead of attempt number for writes.
This passes a unique attempt id, instead of the attempt number, to v2 data sources and Hadoop APIs, because attempt numbers are reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data, because the same attempt number can be both committed and aborted. Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #21615 from vanzin/SPARK-24552-2.3.
1 parent a1e9640 commit db538b2

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
7676
// Try to write all RDD partitions as a Hadoop OutputFormat.
7777
try {
7878
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
79+
// SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
80+
// Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
81+
val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
82+
7983
executeTask(
8084
context = context,
8185
config = config,
8286
jobTrackerId = jobTrackerId,
8387
commitJobId = commitJobId,
8488
sparkPartitionId = context.partitionId,
85-
sparkAttemptNumber = context.attemptNumber,
89+
sparkAttemptNumber = attemptId,
8690
committer = committer,
8791
iterator = iter)
8892
})

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ object DataWritingSparkTask extends Logging {
123123
writeTask: DataWriterFactory[InternalRow],
124124
context: TaskContext,
125125
iter: Iterator[InternalRow]): WriterCommitMessage = {
126-
val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
126+
// SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
127+
// Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
128+
val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
129+
val dataWriter = writeTask.createDataWriter(context.partitionId(), attemptId)
127130

128131
// write the data and commit this writer.
129132
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

0 commit comments

Comments
 (0)