This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit f63527a

skonto authored and susanxhuynh committed
[SPARK-21502][MESOS] fix --supervise for mesos in cluster mode
## What changes were proposed in this pull request?

With supervise enabled for a driver, re-launching it was failing because the driver had the same framework ID. This patch creates a new driver framework ID every time a driver is re-launched, but keeps the driver submission ID the same, since it matches the Mesos task ID the driver was launched with, and the retry state and other info in the Dispatcher's data structures use it as a key.

We append a "-retry-%4d" string as a suffix to the framework ID that the dispatcher passes to the driver, and the same value to the app_id created by each driver, except on the first launch, where no retry suffix is needed. The previous format for the framework ID was 'DispatcherFId-DriverSubmissionId'.

We also detect the case where multiple Spark contexts are started from within the same driver, and set proper names for their corresponding app IDs. The old practice was to unset the framework ID passed from the dispatcher after the driver framework started for the first time, and to let Mesos decide the framework ID for subsequent Spark contexts; the decided framework ID was then passed as an app ID. This patch heavily affects the history server.

Note that we do not have the issue of the standalone case, where the driver ID must be different, since the dispatcher re-launches a driver (a Mesos task) only if it gets an update that the driver is dead, and this is verified by Mesos implicitly. We also do not fix fine-grained mode, which is deprecated and of no use.

## How was this patch tested?

Manually tested on DC/OS: launched a driver, stopped its container, and verified the expected behavior.
Initial retry of the driver, with the driver in pending state:
![image](https://user-images.githubusercontent.com/7945591/28473862-1088b736-6e4f-11e7-8d7d-7b785b1da6a6.png)

Driver re-launched:
![image](https://user-images.githubusercontent.com/7945591/28473885-26e02d16-6e4f-11e7-9eb8-6bf7bdb10cb8.png)

Another retry:
![image](https://user-images.githubusercontent.com/7945591/28473897-35702318-6e4f-11e7-9585-fd295ad7c6b6.png)

The resulting entries in the history server, at the bottom:
![image](https://user-images.githubusercontent.com/7945591/28473910-4946dabc-6e4f-11e7-90a6-fa4f80893c61.png)

Regarding multiple Spark contexts, here is the end result in the Spark history server; for the second Spark context we add an increasing number as a suffix:
![image](https://user-images.githubusercontent.com/7945591/28474432-69cf8b06-6e51-11e7-93c7-e6c0b04dec93.png)

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>

Closes apache#18705 from skonto/fix_supervise_flag.
1 parent e416704 commit f63527a

File tree

2 files changed: +20 −3 lines

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -370,7 +370,8 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
-    s"${frameworkId}-${desc.submissionId}"
+    val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
+    s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
   private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
```
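The change above can be illustrated with a minimal, standalone sketch. Note that `FrameworkIdSketch`, `DriverDesc`, and the `Option[Int]` stand-in for Mesos retry state are hypothetical names invented for this example, not Spark APIs; the real code reads the retry count from `desc.retryState`.

```scala
// Hypothetical, self-contained sketch of the retry-suffix scheme:
// the framework ID gains "-retry-N" only when retry state exists,
// so the first launch keeps the plain 'DispatcherFId-DriverSubmissionId' form.
object FrameworkIdSketch {
  // Stand-in for MesosDriverDescription: retries is None on first launch.
  final case class DriverDesc(submissionId: String, retries: Option[Int])

  def driverFrameworkId(dispatcherFrameworkId: String, desc: DriverDesc): String = {
    val retrySuffix = desc.retries.map(n => s"-retry-$n").getOrElse("")
    s"$dispatcherFrameworkId-${desc.submissionId}$retrySuffix"
  }
}
```

This keeps the submission ID stable as a lookup key in the Dispatcher while making each re-launch register with a fresh framework ID.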

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 18 additions & 2 deletions

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
@@ -168,6 +169,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   override def start() {
     super.start()
+
+    val startedBefore = IdHelper.startedBefore.getAndSet(true)
+
+    val suffix = if (startedBefore) {
+      f"-${IdHelper.nextSCNumber.incrementAndGet()}%04d"
+    } else {
+      ""
+    }
+
     val driver = createSchedulerDriver(
       master,
       MesosCoarseGrainedSchedulerBackend.this,
@@ -177,10 +187,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
       None,
       Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
-      sc.conf.getOption("spark.mesos.driver.frameworkId")
+      sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )
 
-    unsetFrameworkID(sc)
     startScheduler(driver)
   }
 
@@ -269,6 +278,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       driver: org.apache.mesos.SchedulerDriver,
       frameworkId: FrameworkID,
       masterInfo: MasterInfo) {
+
     this.appId = frameworkId.getValue
     this.mesosExternalShuffleClient.foreach(_.init(appId))
     this.schedulerDriver = driver
@@ -670,3 +680,9 @@ private class Slave(val hostname: String) {
   var taskFailures = 0
   var shuffleRegistered = false
 }
+
+object IdHelper {
+  // Use atomic values since Spark contexts can be initialized in parallel
+  private[mesos] val nextSCNumber = new AtomicLong(0)
+  private[mesos] val startedBefore = new AtomicBoolean(false)
+}
```
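The `IdHelper` logic from the diff can be exercised in isolation. The sketch below (with a hypothetical `AppIdSuffixSketch` name) shows why the first Spark context in a driver keeps the framework ID unchanged while subsequent contexts get a zero-padded numeric suffix, and why atomics make this safe when contexts start in parallel:

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical sketch of the per-context suffix logic: getAndSet(true)
// atomically distinguishes the first caller from all later ones, and the
// AtomicLong hands out a unique number to each later context.
object AppIdSuffixSketch {
  private val nextSCNumber = new AtomicLong(0)
  private val startedBefore = new AtomicBoolean(false)

  def nextSuffix(): String =
    if (startedBefore.getAndSet(true)) f"-${nextSCNumber.incrementAndGet()}%04d"
    else ""  // first Spark context: no suffix, ID stays as the dispatcher set it
}
```

Calling `nextSuffix()` repeatedly yields `""`, then `"-0001"`, `"-0002"`, and so on, matching the history-server screenshots where the second context's app ID carries an increasing numeric suffix.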
