diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 289b109a4274..b4dd007cde6e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -590,14 +590,12 @@ private[spark] class MesosClusterScheduler(
       partitionResources(remainingResources.asJava, "mem", desc.mem)
     offer.remainingResources = finalResources.asJava
 
-    val appName = desc.conf.get("spark.app.name")
-
     val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
       .getOrElse(""))
 
     TaskInfo.newBuilder()
       .setTaskId(taskId)
-      .setName(s"Driver for ${appName}")
+      .setName(s"Driver for ${desc.name}")
       .setSlaveId(offer.offer.getSlaveId)
       .setCommand(buildDriverCommand(desc))
       .setContainer(getContainerInfo(desc))
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e916125ffdb6..a81d0e647717 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{Collections, UUID, List => JList}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
@@ -175,17 +175,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
   }
 
-  private var nextMesosTaskId = 0
-
   @volatile var appId: String = _
 
   private var schedulerDriver: SchedulerDriver = _
 
-  def newMesosTaskId(): String = {
-    val id = nextMesosTaskId
-    nextMesosTaskId += 1
-    id.toString
-  }
+  private val schedulerUuid: String = UUID.randomUUID().toString
+  private val nextExecutorNumber = new AtomicLong()
 
   override def start(): Unit = {
     super.start()
@@ -505,7 +500,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         if (canLaunchTask(slaveId, offer.getHostname, resources)) {
           // Create a task
           launchTasks = true
-          val taskId = newMesosTaskId()
+          val taskSeqNumber = nextExecutorNumber.getAndIncrement()
+          val taskId = s"${schedulerUuid}-$taskSeqNumber"
           val offerCPUs = getResource(resources, "cpus").toInt
           val taskGPUs = Math.min(
             Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
@@ -519,10 +515,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
 
           val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setTaskId(TaskID.newBuilder().setValue(taskId).build())
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
-            .setName(s"${sc.appName} $taskId")
+            .setName(s"${sc.appName} $taskSeqNumber")
             .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
             .addAllResources(resourcesToUse.asJava)
             .setContainer(getContainerInfo(sc.conf))
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index ed3bd358d408..f5bef456c541 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -552,6 +552,7 @@ trait MesosSchedulerUtils extends Logging {
    * the same frameworkID. To enforce that only the first driver registers with the configured
    * framework ID, the driver calls this method after the first registration.
    */
+  @deprecated("Multiple Spark Contexts and fine-grained scheduler are deprecated")
   def unsetFrameworkID(sc: SparkContext): Unit = {
     sc.conf.remove(mesosConfig.DRIVER_FRAMEWORK_ID)
     System.clearProperty(mesosConfig.DRIVER_FRAMEWORK_ID.key)
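
The core of the patch is the task ID scheme: the old newMesosTaskId() counter started at 0 in every scheduler instance, so a second instance (for example, the same driver after a restart) would emit the same IDs "0", "1", ... again. The patch instead prefixes each ID with a UUID generated once per scheduler, while the human-readable task name keeps only the short sequence number. Below is a minimal, self-contained sketch of that scheme under stated assumptions: it is not Spark code, the TaskIdGenerator and TaskIdDemo names are hypothetical, and only the UUID-plus-AtomicLong pattern mirrors the patch.

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the backend's new schedulerUuid/nextExecutorNumber fields.
final class TaskIdGenerator {
  // One UUID per scheduler instance, fixed for its lifetime.
  private val schedulerUuid: String = UUID.randomUUID().toString
  // Thread-safe sequence number; getAndIncrement never repeats within an instance.
  private val nextExecutorNumber = new AtomicLong()

  // Mirrors the patch's format: "<scheduler UUID>-<sequence number>".
  def next(): String = s"$schedulerUuid-${nextExecutorNumber.getAndIncrement()}"
}

object TaskIdDemo extends App {
  val firstDriver = new TaskIdGenerator
  val secondDriver = new TaskIdGenerator // e.g. the same driver after a restart

  // Both counters start at 0, but the per-instance UUID prefix keeps the full
  // task IDs distinct; the old bare counter would have produced "0" twice here.
  println(firstDriver.next())  // <uuid-a>-0
  println(secondDriver.next()) // <uuid-b>-0
  println(firstDriver.next())  // <uuid-a>-1
}

Note the split in the patch between display and identity: setName uses only taskSeqNumber so the Mesos UI stays readable, while setTaskId carries the full UUID-prefixed string that must be unique.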