Better way to manage eventProcessActor

Update the comments and remove unnecessary checking.

Make SparkContext.dagScheduler private and immutable.

Work around the MIMA issue; MIMA fix.

CodingCat committed Oct 24, 2014
commit ec16a8c0e2cf4d9ccee8f0d9bcfd93ccd883713e
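For context: the commit's central move is to stop sending events straight to a private eventProcessActor and instead route everything through a DAGSchedulerActorSupervisor, which creates the event-processing actor itself and forwards DAGSchedulerEvent messages to it. Below is a minimal, self-contained sketch of that forwarding pattern; the names (Event, EventProcessor, Supervisor, Demo) are illustrative stand-ins, not Spark's classes.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Illustrative stand-in for DAGSchedulerEvent.
sealed trait Event
case class JobSubmitted(id: Int) extends Event

// Stand-in for DAGSchedulerEventProcessActor: the only place events are handled.
class EventProcessor extends Actor {
  def receive = {
    case JobSubmitted(id) => println(s"processing job $id")
  }
}

// The supervisor owns the processor and forwards every Event to it, so callers
// only ever hold a reference to the supervisor.
class Supervisor extends Actor {
  private val processor: ActorRef = context.actorOf(Props(new EventProcessor))

  def receive = {
    case e: Event => processor ! e
  }
}

object Demo extends App {
  val system = ActorSystem("demo")
  val supervisor = system.actorOf(Props(new Supervisor))
  supervisor ! JobSubmitted(1) // routed through the supervisor to the child
  // (actor-system shutdown omitted for brevity)
}
```

The payoff of this indirection is that the child can crash and be restarted, or the whole context shut down, without any caller ever holding a stale ActorRef.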
17 changes: 8 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -292,13 +292,13 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

+  @volatile private[spark] var dagSchedulerStopped = false
+
   @volatile private[spark] var dagScheduler: DAGScheduler = _
-  try {
-    dagScheduler = new DAGScheduler(this)
-  } catch {
-    case e: Exception => throw
-      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
-  }
+
+  // to work around the current MIMA issue
+  dagScheduler = new DAGScheduler(this)

   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
   // constructor
@@ -1027,16 +1027,15 @@ class SparkContext(config: SparkConf) extends Logging {
       metadataCleaner.cancel()
       env.actorSystem.stop(heartbeatReceiver)
       cleaner.foreach(_.stop())
-      dagSchedulerCopy.stop()
+      dagSchedulerStopped = true
+      dagScheduler.stop()
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
       listenerBus.stop()
       eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
     } else {
       logInfo("SparkContext already stopped")
     }
   }

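The stop() change above trades the old copy-and-null idiom (dagSchedulerCopy) for a @volatile flag, so the dagScheduler field can stay assigned, which is what keeps MIMA satisfied about binary compatibility, while stop() remains safe to call more than once. A rough sketch of that stop-once idiom, under assumed names (Service, stopped); this is not SparkContext's actual code:

```scala
class Service {
  // @volatile makes a stop() performed on one thread visible to readers on others.
  @volatile private var stopped = false

  def stop(): Unit = synchronized {
    if (!stopped) {
      stopped = true
      // ... stop schedulers, listener buses, environments here ...
      println("Successfully stopped Service")
    } else {
      println("Service already stopped")
    }
  }
}
```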
32 changes: 18 additions & 14 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -138,12 +138,12 @@ class DAGScheduler(

  // Called by TaskScheduler to report task's starting.
  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! BeginEvent(task, taskInfo)
+    dagSchedulerActorSupervisor ! BeginEvent(task, taskInfo)
  }

  // Called to report that a task has completed and results are being fetched remotely.
  def taskGettingResult(taskInfo: TaskInfo) {
-    eventProcessActor ! GettingResultEvent(taskInfo)
+    dagSchedulerActorSupervisor ! GettingResultEvent(taskInfo)
  }

  // Called by TaskScheduler to report task completions or failures.
@@ -154,7 +154,8 @@
      accumUpdates: Map[Long, Any],
      taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) {
-    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
+    dagSchedulerActorSupervisor !
+      CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
  }

/**
@@ -176,18 +177,18 @@

  // Called by TaskScheduler when an executor fails.
  def executorLost(execId: String) {
-    eventProcessActor ! ExecutorLost(execId)
+    dagSchedulerActorSupervisor ! ExecutorLost(execId)
  }

  // Called by TaskScheduler when a host is added
  def executorAdded(execId: String, host: String) {
-    eventProcessActor ! ExecutorAdded(execId, host)
+    dagSchedulerActorSupervisor ! ExecutorAdded(execId, host)
  }

  // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
  // cancellation of the job itself.
  def taskSetFailed(taskSet: TaskSet, reason: String) {
-    eventProcessActor ! TaskSetFailed(taskSet, reason)
+    dagSchedulerActorSupervisor ! TaskSetFailed(taskSet, reason)
  }

  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
@@ -493,7 +494,7 @@
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
-    eventProcessActor ! JobSubmitted(
+    dagSchedulerActorSupervisor ! JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
    waiter
  }
@@ -534,7 +535,7 @@
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val partitions = (0 until rdd.partitions.size).toArray
    val jobId = nextJobId.getAndIncrement()
-    eventProcessActor ! JobSubmitted(
+    dagSchedulerActorSupervisor ! JobSubmitted(
      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
    listener.awaitResult() // Will throw an exception if the job fails
  }
@@ -544,19 +545,19 @@
   */
  def cancelJob(jobId: Int) {
    logInfo("Asked to cancel job " + jobId)
-    eventProcessActor ! JobCancelled(jobId)
+    dagSchedulerActorSupervisor ! JobCancelled(jobId)
  }

  def cancelJobGroup(groupId: String) {
    logInfo("Asked to cancel job group " + groupId)
-    eventProcessActor ! JobGroupCancelled(groupId)
+    dagSchedulerActorSupervisor ! JobGroupCancelled(groupId)
  }

  /**
   * Cancel all jobs that are running or waiting in the queue.
   */
  def cancelAllJobs() {
-    eventProcessActor ! AllJobsCancelled
+    dagSchedulerActorSupervisor ! AllJobsCancelled
  }

  private[scheduler] def doCancelAllJobs() {
@@ -572,7 +573,7 @@
   * Cancel all jobs associated with a running or scheduled stage.
   */
  def cancelStage(stageId: Int) {
-    eventProcessActor ! StageCancelled(stageId)
+    dagSchedulerActorSupervisor ! StageCancelled(stageId)
  }

/**
@@ -1063,7 +1064,6 @@
        markStageAsFinished(failedStage, Some("Fetch failure"))
        runningStages -= failedStage
      }
-
      if (failedStages.isEmpty && eventProcessActor != null) {
        // Don't schedule an event to resubmit failed stages if failed isn't empty, because
        // in that case the event will already have been scheduled. eventProcessActor may be
Contributor review comment on the lines above: "Comment needs update"

@@ -1073,7 +1073,7 @@
      logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
        s"$failedStage (${failedStage.name}) due to fetch failure")
      env.actorSystem.scheduler.scheduleOnce(
-        RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+        RESUBMIT_TIMEOUT, dagSchedulerActorSupervisor, ResubmitFailedStages)
    }
    failedStages += failedStage
    failedStages += mapStage
@@ -1330,6 +1330,9 @@
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

+  private val eventProcessActor: ActorRef =
+    context.actorOf(Props(new DAGSchedulerEventProcessActor(dagScheduler)))
+
  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
@@ -1345,6 +1348,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)

  def receive = {
    case p: Props => sender ! context.actorOf(p)
+    case msg: DAGSchedulerEvent => eventProcessActor ! msg
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}
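The OneForOneStrategy partially visible above is Akka's standard mechanism for deciding a child's fate when it throws. The following generic sketch shows how such a strategy is wired up; the Restart directive here is illustrative only, since Spark's actual strategy in this file reacts to an Exception by logging and shutting work down rather than blindly restarting.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart

// A child that fails on demand, so the strategy has something to handle.
class Worker extends Actor {
  def receive = {
    case "boom" => throw new RuntimeException("simulated failure")
    case msg    => println(s"worker got: $msg")
  }
}

class Parent extends Actor {
  private val worker: ActorRef = context.actorOf(Props(new Worker))

  // One-for-one: only the failing child is affected; sibling actors keep running.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Restart
  }

  def receive = {
    case msg => worker ! msg
  }
}

object SupervisionDemo extends App {
  val system = ActorSystem("supervision-demo")
  val parent = system.actorOf(Props(new Parent))
  parent ! "boom"  // Worker throws; Parent's strategy restarts it.
  parent ! "hello" // The restarted Worker handles this normally.
  // (actor-system shutdown omitted for brevity)
}
```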
@@ -725,7 +725,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    watch(child)
    child ! "hi"
    expectMsgPF(){ case Terminated(child) => () }
-    assert(scheduler.sc.dagScheduler === null)
+    assert(scheduler.sc.dagSchedulerStopped === true)
  }

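The test keeps its TestKit shape: watch an actor, provoke its death, wait for Terminated, then assert on the resulting state (now the dagSchedulerStopped flag instead of a nulled-out field). Below is a minimal, era-appropriate sketch of that watch-and-expect pattern, assuming Akka classic TestKit and a ScalaTest 2.x-style FunSuiteLike; the suite and actor names are illustrative, not DAGSchedulerSuite itself.

```scala
import akka.actor.{Actor, ActorSystem, Props, Terminated}
import akka.testkit.TestKit
import org.scalatest.FunSuiteLike

class WatchSuite extends TestKit(ActorSystem("WatchSuite")) with FunSuiteLike {
  test("a watched actor's death is observed as Terminated") {
    // An actor that stops itself on any message, standing in for a crashed child.
    val victim = system.actorOf(Props(new Actor {
      def receive = { case _ => context.stop(self) }
    }))
    watch(victim)   // register the test probe as the watcher
    victim ! "die"
    // Block until the death notification arrives, then pattern-match on it.
    expectMsgPF() { case Terminated(`victim`) => () }
  }
}
```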