Only track partitionId instead of Task
suyanNone committed Jul 21, 2015
commit 314873af23f391254dc89cf4e233d2571f3358d4
@@ -305,7 +305,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)

if (mapStatuses.contains(shuffleId)) {
val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty && statuses.exists(_ != null)) {
+      if (statuses.nonEmpty) {
// HashMap to add up sizes of all blocks at the same location
val locs = new HashMap[BlockManagerId, Long]
var totalOutputSize = 0L
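For context, the block this guard protects sums map-output sizes per block manager (e.g. to compute preferred locations for a reducer). A minimal standalone sketch of that aggregation, assuming — as the removed check suggests — that the statuses array may still contain null entries for unregistered map outputs; the types below are simplified stand-ins for Spark's BlockManagerId and MapStatus, not the real classes:

object MapOutputSizesSketch {
  import scala.collection.mutable.HashMap

  // Simplified stand-ins for illustration only.
  case class BlockManagerId(executorId: String, host: String, port: Int)
  case class MapStatus(location: BlockManagerId, sizeForBlock: Int => Long)

  def outputSizesByLocation(statuses: Array[MapStatus],
                            reducerId: Int): (HashMap[BlockManagerId, Long], Long) = {
    // HashMap to add up sizes of all blocks at the same location.
    val locs = new HashMap[BlockManagerId, Long]
    var totalOutputSize = 0L
    for (status <- statuses if status != null) {
      val blockSize = status.sizeForBlock(reducerId)
      locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
      totalOutputSize += blockSize
    }
    // Callers can compare each location's share against totalOutputSize to pick preferred hosts.
    (locs, totalOutputSize)
  }
}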
18 changes: 8 additions & 10 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -855,7 +855,7 @@ class DAGScheduler(
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
-    stage.pendingTasks.clear()
+    stage.pendingPartitions.clear()

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = {
@@ -938,8 +938,8 @@ class DAGScheduler(

if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
-      stage.pendingTasks ++= tasks
-      logDebug("New pending tasks: " + stage.pendingTasks)
+      stage.pendingPartitions ++= tasks.map(_.partitionId)
+      logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1027,7 +1027,7 @@ class DAGScheduler(
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
-        stage.pendingTasks -= task
+        stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
@@ -1073,7 +1073,7 @@
shuffleStage.addOutputLoc(smt.partitionId, status)
}

-            if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
+            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
@@ -1126,7 +1126,7 @@

case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
-        stage.pendingTasks += task
+        stage.pendingPartitions += task.partitionId

case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
@@ -1217,10 +1217,8 @@
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
-      if (!runningStages.contains(stage)) {
-        val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
-        mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
-      }
+      val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
+      mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
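Taken together, the DAGScheduler hunks above change the bookkeeping from "which Task objects are still pending" to "which partition IDs are still pending". A minimal self-contained sketch of that lifecycle, using simplified stand-ins rather than Spark's real Stage and Task classes:

object PendingPartitionsSketch {
  import scala.collection.mutable.HashSet

  // Simplified stand-ins for illustration only.
  class SketchTask(val partitionId: Int)
  class SketchStage { val pendingPartitions = new HashSet[Int] }

  def submitMissingTasks(stage: SketchStage, tasks: Seq[SketchTask]): Unit = {
    stage.pendingPartitions.clear()
    // Remember the partitions that still need output, not the task objects themselves.
    stage.pendingPartitions ++= tasks.map(_.partitionId)
  }

  def handleTaskSuccess(stage: SketchStage, task: SketchTask): Unit = {
    // Any successful attempt for a partition clears it, whichever Task instance actually ran.
    stage.pendingPartitions -= task.partitionId
    if (stage.pendingPartitions.isEmpty) {
      println("stage finished") // markStageAsFinished(stage) in the real scheduler
    }
  }

  def handleResubmitted(stage: SketchStage, task: SketchTask): Unit = {
    // A resubmitted task marks its partition as pending again.
    stage.pendingPartitions += task.partitionId
  }
}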
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -60,7 +60,7 @@ private[spark] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

-  var pendingTasks = new HashSet[Task[_]]
+  var pendingPartitions = new HashSet[Int]
Contributor review comment on this line: I know this was a var before, but it can actually be a val

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
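As the reviewer notes, the new collection is only mutated, never reassigned, so the field could be a val. A trimmed-down, hypothetical Stage declaration with that suggestion applied (the real class has many more parameters and fields):

import scala.collection.mutable.HashSet

abstract class Stage(val id: Int) {
  /** Partition IDs for which this stage still has outstanding tasks. */
  val pendingPartitions = new HashSet[Int]
}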
11 changes: 0 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -136,17 +136,6 @@ private[spark] abstract class Task[T](
taskThread.interrupt()
}
}

-  override def hashCode(): Int = {
-    31 * stageId.hashCode() + partitionId.hashCode()
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: Task[_] =>
-      stageId == that.stageId &&
-      partitionId == that.partitionId
-    case _ => false
-  }
}

/**
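With pending work keyed by partition ID, the scheduler no longer needs structural equality on Task, which is why the hashCode/equals overrides above can be dropped: Task falls back to default reference identity, so distinct attempts for the same partition are distinct objects. A small sketch of that difference (SketchTask is a stand-in, not Spark's Task):

object TaskEqualitySketch {
  class SketchTask(val stageId: Int, val partitionId: Int)

  def main(args: Array[String]): Unit = {
    val attempt1 = new SketchTask(0, 3)
    val attempt2 = new SketchTask(0, 3) // e.g. a resubmitted or speculative attempt
    // Without the removed equals/hashCode, two attempts are distinct objects...
    assert(attempt1 != attempt2)
    // ...but they still clear the same pending partition.
    assert(attempt1.partitionId == attempt2.partitionId)
  }
}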
@@ -697,7 +697,10 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
// pretend we were told hostA went away
+    val oldEpoch = mapOutputTracker.getEpoch
     runEvent(ExecutorLost("exec-hostA"))
+    val newEpoch = mapOutputTracker.getEpoch
+    assert(newEpoch > oldEpoch)

val taskSet = taskSets(0)
// should be ignored for being too old
@@ -709,8 +712,8 @@
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
-    // should work because the host is re-add
-    runEvent(ExecutorAdded("exec-hostA", "hostA"))
+    // should work because it's a new epoch
+    taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
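The new assertions exercise the tracker's epoch mechanism: losing an executor bumps the epoch, and map outputs reported by tasks launched before that bump are treated as stale, while tasks tagged with the new epoch are accepted. A minimal hand-rolled sketch of that kind of check (not Spark's actual implementation), assuming the scheduler remembers the epoch at which each executor was marked lost:

object EpochFilterSketch {
  import scala.collection.mutable.HashMap

  // Epoch at which each executor was last marked lost (assumed bookkeeping for illustration).
  private val failedEpoch = new HashMap[String, Long]
  private var currentEpoch = 0L

  def executorLost(execId: String): Unit = {
    currentEpoch += 1
    failedEpoch(execId) = currentEpoch
  }

  /** Accept a map-output registration only if the task ran after the executor's last failure. */
  def acceptOutput(execId: String, taskEpoch: Long): Boolean =
    failedEpoch.get(execId).forall(taskEpoch > _)

  def main(args: Array[String]): Unit = {
    val oldEpoch = currentEpoch
    executorLost("exec-hostA")
    assert(currentEpoch > oldEpoch)
    assert(!acceptOutput("exec-hostA", taskEpoch = oldEpoch))     // too old, ignored
    assert(acceptOutput("exec-hostA", taskEpoch = currentEpoch))  // new epoch, accepted
  }
}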