-
Notifications
You must be signed in to change notification settings - Fork 29k
SPARK-1686: keep schedule() calling in the main thread #639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,6 +104,8 @@ private[spark] class Master( | |
|
|
||
| var leaderElectionAgent: ActorRef = _ | ||
|
|
||
| private var recoverCallable: Cancellable = _ | ||
|
|
||
| // As a temporary workaround before better ways of configuring memory, we allow users to set | ||
| // a flag that will perform round-robin scheduling across the nodes (spreading out each app | ||
| // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. | ||
|
|
@@ -152,6 +154,10 @@ private[spark] class Master( | |
| } | ||
|
|
||
| override def postStop() { | ||
| // prevent the CompleteRecovery message from being sent to a restarted master | ||
| if (recoverCallable != null) { | ||
| recoverCallable.cancel() | ||
| } | ||
| webUi.stop() | ||
| fileSystemsUsed.foreach(_.close()) | ||
| masterMetricsSystem.stop() | ||
|
|
@@ -171,10 +177,13 @@ private[spark] class Master( | |
| logInfo("I have been elected leader! New state: " + state) | ||
| if (state == RecoveryState.RECOVERING) { | ||
| beginRecovery(storedApps, storedDrivers, storedWorkers) | ||
| context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } | ||
| recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There's actually an alternate syntax for just sending a message — I think it's something like `scheduleOnce(WORKER_TIMEOUT millis, self, CompleteRecovery)`. |
||
| { self ! CompleteRecovery } | ||
| } | ||
| } | ||
|
|
||
| case CompleteRecovery => completeRecovery() | ||
|
|
||
| case RevokedLeadership => { | ||
| logError("Leadership has been revoked -- master shutting down.") | ||
| System.exit(0) | ||
|
|
@@ -465,7 +474,7 @@ private[spark] class Master( | |
| * Schedule the currently available resources among waiting apps. This method will be called | ||
| * every time a new app joins or resource availability changes. | ||
| */ | ||
| def schedule() { | ||
| private def schedule() { | ||
| if (state != RecoveryState.ALIVE) { return } | ||
|
|
||
| // First schedule drivers, they take strict precedence over applications | ||
|
|
@@ -485,7 +494,7 @@ private[spark] class Master( | |
| // Try to spread out each app among all the nodes, until it has all its cores | ||
| for (app <- waitingApps if app.coresLeft > 0) { | ||
| val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) | ||
| .filter(canUse(app, _)).sortBy(_.coresFree).reverse | ||
| .filter(canUse(app, _)).sortBy(_.coresFree).reverse | ||
| val numUsable = usableWorkers.length | ||
| val assigned = new Array[Int](numUsable) // Number of cores to give on each node | ||
| var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,4 +39,6 @@ private[master] object MasterMessages { | |
| case object RequestWebUIPort | ||
|
|
||
| case class WebUIPortResponse(webUIBoundPort: Int) | ||
|
|
||
| case object TriggerSchedule | ||
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Perhaps this would be better named "completeRecoveryTask" (we use the term "task" in a couple other places in the code base)