DAGScheduler.scala
@@ -947,7 +947,13 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    outputCommitCoordinator.stageStart(stage.id)
+    stage match {
+      case s: ShuffleMapStage =>
+        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

Contributor Author:
A shuffle map stage's maximum partition id is determined by the number of partitions in the RDD being computed.
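
As a minimal sketch of that point, assuming a SparkContext `sc` (the job below is hypothetical and not part of this patch):

  val rdd = sc.parallelize(1 to 100, numSlices = 4)          // 4 partitions, ids 0 to 3
  val counts = rdd.map(x => (x % 2, x)).reduceByKey(_ + _)   // introduces a shuffle
  counts.count()
  // The shuffle map stage runs one task per partition of the RDD it computes, and each
  // task's context.partitionId is that 0-based partition index, so the largest value it
  // can take is rdd.partitions.length - 1 = 3, i.e. numPartitions - 1.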

Contributor:
As I was reviewing this, I was wondering if a ShuffleMapStage could have a different maximum partitionId if it came from a skipped stage. I'm now convinced it cannot, but it might be a bit clearer to change the constructor to not take a numTasks argument at all, since it should always be rdd.partitions.length. Not necessary for this change, just a thought while you are touching this.

Also -- isn't the output commit coordinator irrelevant for ShuffleMapStages anyway? If not, then I think there might be another bug there for skipped stages. Since it indexes by stageId, you can have two different stages that really represent the exact same shuffle, so you could have two different task attempts authorized to commit output for the same shuffle. (That wouldn't be a problem introduced by this change, but it seemed worth mentioning.)
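
A minimal sketch of that scenario, driving the coordinator directly the way OutputCommitCoordinatorSuite does (the stage ids, partition count, and attempt numbers below are made up):

  val coordinator = new OutputCommitCoordinator(new SparkConf(), isDriver = true)
  // Two stage ids that, in the skipped-stage scenario, would represent the same shuffle.
  coordinator.stageStart(stage = 3, maxPartitionId = 9)
  coordinator.stageStart(stage = 7, maxPartitionId = 9)
  // Each stage id gets its own committer table, so both attempts end up authorized
  // for what is logically the same partition of the same shuffle output.
  coordinator.canCommit(3, 0, 0)   // returns true
  coordinator.canCommit(7, 0, 1)   // also returns true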

Contributor Author:
Yeah, it should be irrelevant for ShuffleMapStages. I was just being overly-conservative here.

+      case s: ResultStage =>
+        outputCommitCoordinator.stageStart(
+          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)

Contributor Author:
This result stage case is trickier: in the cases where the OutputCommitCoordinator actually gets invoked, I think all partitions are generally being computed, but I guess it's hypothetically possible that a result stage could write results for only a subset of the RDD's partitions. In that case, the partition ids can be larger than numPartitions - 1 (see the ResultStage constructor).
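
A hedged sketch of that hypothetical case, assuming a SparkContext `sc` (the job and partition numbers are illustrative only):

  val rdd = sc.parallelize(1 to 100, numSlices = 10)        // partition ids 0 to 9
  sc.runJob(rdd, (it: Iterator[Int]) => it.size, Seq(7))    // a ResultStage with a single task
  // That task's context.partitionId is 7 even though the stage has only one task, so
  // sizing the committer array by s.rdd.partitions.length (10) keeps index 7 in bounds,
  // whereas sizing it by the number of tasks in the stage (1) would not.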

+    }
     val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
       stage match {
         case s: ShuffleMapStage =>

OutputCommitCoordinator.scala
@@ -47,6 +47,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private type PartitionId = Int
   private type TaskAttemptNumber = Int

+  private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+
   /**
    * Map from active stages's id => partition id => task attempt with exclusive lock on committing
    * output for that partition.
@@ -56,9 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    *
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
-  private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap =
-    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
+  private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()

   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -95,9 +95,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }

-  // Called by DAGScheduler
-  private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
+  /**
+   * Called by the DAGScheduler when a stage starts.
+   *
+   * @param stage the stage id.
+   * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
+   *                       the maximum possible value of `context.partitionId`).
+   */
+  private[scheduler] def stageStart(
+      stage: StageId,
+      maxPartitionId: Int): Unit = {
+    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
+    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
+    synchronized {
+      authorizedCommittersByStage(stage) = arr
+    }
   }

   // Called by DAGScheduler
@@ -122,10 +134,10 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
           s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+        if (authorizedCommitters(partition) == attemptNumber) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          authorizedCommitters.remove(partition)
+          authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
         }
     }
   }
@@ -145,16 +157,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
-        authorizedCommitters.get(partition) match {
-          case Some(existingCommitter) =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
-          case None =>
+        authorizedCommitters(partition) match {
+          case NO_AUTHORIZED_COMMITTER =>
             logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
               s"partition=$partition")
             authorizedCommitters(partition) = attemptNumber
             true
+          case existingCommitter =>
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
+            false
         }
       case None =>
         logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +

OutputCommitCoordinatorSuite.scala
@@ -171,7 +171,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
-    outputCommitCoordinator.stageStart(stage)
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)

     assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
     assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))