diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 283390814a6c..76fbd2797caa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -38,8 +38,6 @@ private[spark] object CoarseGrainedClusterMessages {
       resourceProfile: ResourceProfile)
     extends CoarseGrainedClusterMessage
 
-  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
-
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8b55e2c7dbee..e018f1199a5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -110,9 +110,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0
 
-  // The num of current max ExecutorId used to re-register appMaster
-  @volatile protected var currentExecutorIdCounter = 0
-
   // Current set of delegation tokens to send to executors.
   private val delegationTokens = new AtomicReference[Array[Byte]]()
 
@@ -241,9 +238,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
-          if (currentExecutorIdCounter < executorId.toInt) {
-            currentExecutorIdCounter = executorId.toInt
-          }
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 289b109a4274..b4dd007cde6e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -590,14 +590,12 @@ private[spark] class MesosClusterScheduler(
       partitionResources(remainingResources.asJava, "mem", desc.mem)
     offer.remainingResources = finalResources.asJava
 
-    val appName = desc.conf.get("spark.app.name")
-
     val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
       .getOrElse(""))
 
     TaskInfo.newBuilder()
       .setTaskId(taskId)
-      .setName(s"Driver for ${appName}")
+      .setName(s"Driver for ${desc.name}")
       .setSlaveId(offer.offer.getSlaveId)
       .setCommand(buildDriverCommand(desc))
       .setContainer(getContainerInfo(desc))
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e916125ffdb6..a81d0e647717 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{Collections, UUID, List => JList}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
@@ -175,17 +175,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
   }
 
-  private var nextMesosTaskId = 0
-
   @volatile var appId: String = _
 
   private var schedulerDriver: SchedulerDriver = _
 
-  def newMesosTaskId(): String = {
-    val id = nextMesosTaskId
-    nextMesosTaskId += 1
-    id.toString
-  }
+  private val schedulerUuid: String = UUID.randomUUID().toString
+  private val nextExecutorNumber = new AtomicLong()
 
   override def start(): Unit = {
     super.start()
@@ -505,7 +500,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         if (canLaunchTask(slaveId, offer.getHostname, resources)) {
           // Create a task
           launchTasks = true
-          val taskId = newMesosTaskId()
+          val taskSeqNumber = nextExecutorNumber.getAndIncrement()
+          val taskId = s"${schedulerUuid}-$taskSeqNumber"
           val offerCPUs = getResource(resources, "cpus").toInt
           val taskGPUs = Math.min(
             Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
@@ -519,10 +515,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
 
           val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setTaskId(TaskID.newBuilder().setValue(taskId).build())
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
-            .setName(s"${sc.appName} $taskId")
+            .setName(s"${sc.appName} $taskSeqNumber")
             .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
             .addAllResources(resourcesToUse.asJava)
             .setContainer(getContainerInfo(sc.conf))
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index ed3bd358d408..f5bef456c541 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -552,6 +552,7 @@ trait MesosSchedulerUtils extends Logging {
    * the same frameworkID. To enforce that only the first driver registers with the configured
    * framework ID, the driver calls this method after the first registration.
    */
+  @deprecated("Multiple Spark Contexts and fine-grained scheduler are deprecated")
   def unsetFrameworkID(sc: SparkContext): Unit = {
     sc.conf.remove(mesosConfig.DRIVER_FRAMEWORK_ID)
     System.clearProperty(mesosConfig.DRIVER_FRAMEWORK_ID.key)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 1876861700fc..542f859e606b 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
+import java.util.UUID
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -37,7 +38,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -544,6 +545,21 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
   }
 
+  test("mesos sets different task ids across executions") {
+    setBackend()
+    var offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val firstLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue
+    sc.stop()
+
+    setBackend()
+    offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val secondLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue
+
+    assert(firstLaunchTaskId != secondLaunchTaskId)
+  }
+
   test("mesos sets configurable labels on tasks") {
     val taskLabelsString = "mesos:test,label:test"
     setBackend(Map(
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 09414cbbe50a..a77d14f09604 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.Collections
+import java.util.{Collections, UUID}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -42,7 +42,6 @@
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
@@ -89,21 +88,9 @@ private[yarn] class YarnAllocator(
 
   private val numExecutorsStarting = new AtomicInteger(0)
 
-  /**
-   * Used to generate a unique ID per executor
-   *
-   * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
-   * the id of new executor will start from 1, this will conflict with the executor has
-   * already created before. So, we should initialize the `executorIdCounter` by getting
-   * the max executorId from driver.
-   *
-   * And this situation of executorId conflict is just in yarn client mode, so this is an issue
-   * in yarn client mode. For more details, can check in jira.
-   *
-   * @see SPARK-12864
-   */
-  private var executorIdCounter: Int =
-    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
+  // Used to generate a unique ID per executor
+  private val allocatorUuid: String = UUID.randomUUID().toString
+  private var executorIdCounter: Int = 0
 
   private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
 
@@ -533,7 +520,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = s"$allocatorUuid-$executorIdCounter"
       assert(container.getResource.getMemory >= resource.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 471ee58d05cb..19d26d52e7e6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -311,9 +311,6 @@ private[spark] abstract class YarnSchedulerBackend(
           context.reply(false)
       }
 
-    case RetrieveLastAllocatedExecutorId =>
-      context.reply(currentExecutorIdCounter)
-
    case RetrieveDelegationTokens =>
      context.reply(currentDelegationTokens)
  }
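Taken together, the Mesos and YARN changes above adopt a single ID scheme: each scheduler/allocator instance draws a random UUID once and appends a per-instance sequence number. IDs minted by two different instances (for example, before and after an application-master restart, the collision the removed SPARK-12864 scaladoc describes) can then never clash, so the RetrieveLastAllocatedExecutorId round trip that re-seeded a numeric counter from the driver can be deleted outright. A minimal, self-contained Scala sketch of the pattern, with illustrative names that are not part of the patch:

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

// Sketch (not Spark code) of the uniqueness scheme this patch adopts:
// a per-instance random UUID plus a monotonically increasing sequence
// number, so no cross-instance coordination is needed.
final class UniqueIdGenerator {
  private val instanceUuid: String = UUID.randomUUID().toString
  private val nextNumber = new AtomicLong()

  // Produces IDs such as "6b1c...-0", "6b1c...-1", ...
  def nextId(): String = s"$instanceUuid-${nextNumber.getAndIncrement()}"
}

object UniqueIdGeneratorDemo extends App {
  val original = new UniqueIdGenerator
  val restarted = new UniqueIdGenerator // simulates a restarted allocator
  println(original.nextId())  // e.g. 6b1c...-0
  println(original.nextId())  // e.g. 6b1c...-1
  println(restarted.nextId()) // different UUID prefix, so it cannot collide
}
```

The trade-off is that executor IDs are no longer plain integers, which is why the patch also removes the `executorId.toInt` bookkeeping in CoarseGrainedSchedulerBackend and names Mesos tasks with the bare sequence number (`taskSeqNumber`) rather than the full UUID-prefixed ID.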