Rename stageAttemptId to stageAttemptNumber
advancedxy committed Dec 29, 2017
commit 9266cd8d4558b675b081a7282c626d79bb6bb786
core/src/main/scala/org/apache/spark/TaskContext.scala (4 additions & 4 deletions)

@@ -151,11 +151,11 @@ abstract class TaskContext extends Serializable {
   def stageId(): Int
 
   /**
-   * An ID that is unique to the stage attempt that this task belongs to. It represents how many
-   * times the stage has been attempted. The first stage attempt will be assigned stageAttemptId = 0
-   * , and subsequent attempts will increasing stageAttemptId one by one.
+   * How many times the stage that this task belongs to has been attempted. The first stage attempt
+   * will be assigned stageAttemptNumber = 0, and subsequent attempts will have increasing attempt
+   * numbers.
    */
-  def stageAttemptId(): Int
+  def stageAttemptNumber(): Int
 
   /**
    * The ID of the RDD partition that is computed by this task.
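As a rough usage sketch (editor's illustration, not part of the diff; it assumes a running SparkContext named sc), a job can observe the new getter from inside its tasks:

    import org.apache.spark.TaskContext

    // On a healthy first run, every task should see stage attempt 0.
    val attempts = sc.parallelize(1 to 4, 4).mapPartitions { _ =>
      Iterator(TaskContext.get().stageAttemptNumber())
    }.collect()
    assert(attempts.forall(_ == 0))
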
core/src/main/scala/org/apache/spark/TaskContextImpl.scala (1 addition & 1 deletion)

@@ -42,7 +42,7 @@ import org.apache.spark.util._
  */
 private[spark] class TaskContextImpl(
     override val stageId: Int,
-    override val stageAttemptId: Int,
+    override val stageAttemptNumber: Int,
     override val partitionId: Int,
     override val taskAttemptId: Long,
     override val attemptNumber: Int,
core/src/main/scala/org/apache/spark/scheduler/Task.scala (1 addition & 1 deletion)

@@ -79,7 +79,7 @@ private[spark] abstract class Task[T](
     SparkEnv.get.blockManager.registerTask(taskAttemptId)
     context = new TaskContextImpl(
       stageId,
-      stageAttemptId,
+      stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
Contributor: How much work would we need to rename the internal stageAttemptId to stageAttemptNumber?

Contributor (author): The modification may not be too much (100+ occurrences in 20+ files); however, it may break the event log's JsonProtocol backward compatibility (not sure).

@squito, you may have more knowledge on this since you introduced stageAttemptId.

Contributor: Ah, so stageAttemptId is already exposed in the developer API; we can't change it.

       partitionId,
       taskAttemptId,
       attemptNumber,
@@ -38,7 +38,7 @@ public static void test() {
     tc.attemptNumber();
     tc.partitionId();
     tc.stageId();
-    tc.stageAttemptId();
+    tc.stageAttemptNumber();
     tc.taskAttemptId();
   }
 
@@ -52,7 +52,7 @@ public void onTaskCompletion(TaskContext context) {
     context.isCompleted();
     context.isInterrupted();
     context.stageId();
-    context.stageAttemptId();
+    context.stageAttemptNumber();
     context.partitionId();
     context.addTaskCompletionListener(this);
   }
@@ -29,7 +29,7 @@ object MemoryTestingUtils {
     val taskMemoryManager = new TaskMemoryManager(env.memoryManager, 0)
     new TaskContextImpl(
       stageId = 0,
-      stageAttemptId = 0,
+      stageAttemptNumber = 0,
       partitionId = 0,
       taskAttemptId = 0,
       attemptNumber = 0,
@@ -159,28 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
   }
 
-  test("TaskContext.stageAttemptId getter") {
+  test("TaskContext.stageAttemptNumber getter") {
     sc = new SparkContext("local[1,2]", "test")
 
-    // Check stage attemptIds are 0 for initial stage
-    val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ =>
-      Seq(TaskContext.get().stageAttemptId()).iterator
+    // Check stageAttemptNumbers are 0 for initial stage
+    val stageAttemptNumbers = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ =>
+      Seq(TaskContext.get().stageAttemptNumber()).iterator
     }.collect()
-    assert(stageAttemptIds.toSet === Set(0))
+    assert(stageAttemptNumbers.toSet === Set(0))
 
-    // Check stage attemptIds that are resubmitted when tasks have FetchFailedException
-    val stageAttemptIdsWithFailedStage =
+    // Check stageAttemptNumbers that are resubmitted when tasks have FetchFailedException
+    val stageAttemptNumbersWithFailedStage =
       sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
Contributor: You don't need repartition here; just use sc.parallelize(Seq(1, 2, 3, 4), 1).mapPartitions {...}.

-        val stageAttemptId = TaskContext.get().stageAttemptId()
-        if (stageAttemptId < 2) {
+        val stageAttemptNumber = TaskContext.get().stageAttemptNumber()
+        if (stageAttemptNumber < 2) {
           // Throw FetchFailedException to explicitly trigger stage resubmission. A normal exception
           // will only trigger task resubmission in the same stage.
           throw new FetchFailedException(null, 0, 0, 0, "Fake")
Contributor: Emmm... is just throwing an Exception enough here?

Contributor (author): Related to the repartition part: I use FetchFailedException to explicitly trigger a stage resubmission. Otherwise, the task would be resubmitted in the same stage, IIRC.

Contributor: Oh, right~

Contributor: Please add a comment to explain that FetchFailedException will trigger a new stage attempt, while a common Exception will only trigger a task retry.

Contributor (author): Will do.

         }
-        Seq(stageAttemptId).iterator
+        Seq(stageAttemptNumber).iterator
       }.collect()
 
-    assert(stageAttemptIdsWithFailedStage.toSet === Set(2))
+    assert(stageAttemptNumbersWithFailedStage.toSet === Set(2))
   }
 
   test("accumulators are updated on exception failures") {
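For contrast (a sketch, not part of the diff, using the same "local[1,2]" master as the test above so a task may be attempted twice): a plain exception only retries the failing task within the same stage, so TaskContext.attemptNumber() grows while stageAttemptNumber() stays 0.

    val results = sc.parallelize(Seq(1), 1).mapPartitions { _ =>
      val tc = TaskContext.get()
      if (tc.attemptNumber() < 1) {
        // Not a fetch failure: the task is retried, the stage is not resubmitted.
        throw new RuntimeException("Fake task failure")
      }
      Iterator((tc.attemptNumber(), tc.stageAttemptNumber()))
    }.collect()
    // Only the successful second attempt produces output.
    assert(results.toSet === Set((1, 0)))
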
project/MimaExcludes.scala (1 addition & 1 deletion)

@@ -37,7 +37,7 @@ object MimaExcludes {
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
     // [SPARK-22897] Expose stageAttemptId in TaskContext
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptId"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptNumber"),
 
     // SPARK-22789: Map-only continuous processing execution
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$8"),
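Background on this filter (editor's note, based on MiMa's general behavior rather than anything stated in the PR): ReversedMissingMethodProblem is reported when a new method is added to a public type that external code may extend, because pre-existing subclasses cannot implement it. Since the renamed method has not shipped in a release yet, only the name in the existing exclusion needs updating. A hypothetical third-party subclass illustrates what the filter suppresses:

    // Hypothetical code compiled against an older Spark: it extends TaskContext
    // but cannot override stageAttemptNumber(), which did not exist yet.
    abstract class LegacyTaskContext extends org.apache.spark.TaskContext
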
@@ -70,7 +70,7 @@ class UnsafeFixedWidthAggregationMapSuite
 
     TaskContext.setTaskContext(new TaskContextImpl(
       stageId = 0,
-      stageAttemptId = 0,
+      stageAttemptNumber = 0,
       partitionId = 0,
       taskAttemptId = Random.nextInt(10000),
       attemptNumber = 0,
@@ -116,7 +116,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
     val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
     TaskContext.setTaskContext(new TaskContextImpl(
       stageId = 0,
-      stageAttemptId = 0,
+      stageAttemptNumber = 0,
       partitionId = 0,
       taskAttemptId = 98456,
       attemptNumber = 0,