diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0634b362c1db..e431c29ef863 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -36,8 +36,6 @@ private[spark] object CoarseGrainedClusterMessages {
       hadoopDelegationCreds: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
-  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
-
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ea513776cec5..bc149b20f0cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0
 
-  // The num of current max ExecutorId used to re-register appMaster
-  @volatile protected var currentExecutorIdCounter = 0
-
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -189,9 +186,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
-          if (currentExecutorIdCounter < executorId.toInt) {
-            currentExecutorIdCounter = executorId.toInt
-          }
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 085857299b47..36a93c988224 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{Collections, UUID, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -162,6 +162,8 @@
 
   private val metricsSource = new MesosCoarseGrainedSchedulerSource(this)
 
+  private val schedulerUuid: String = UUID.randomUUID().toString
+
   private var nextMesosTaskId = 0
 
   @volatile var appId: String = _
@@ -469,7 +471,7 @@
         partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
 
       val taskBuilder = MesosTaskInfo.newBuilder()
-        .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+        .setTaskId(TaskID.newBuilder().setValue(s"$schedulerUuid-$taskId").build())
        .setSlaveId(offer.getSlaveId)
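The Mesos change above makes task IDs unique across driver restarts: every backend instance generates its own `schedulerUuid`, and `nextMesosTaskId` only disambiguates tasks within that instance. A minimal standalone sketch of the scheme; the `TaskIdGenerator` class and `next()` helper are illustrative names, not code from this patch:

```scala
import java.util.UUID

// Illustrative stand-in for the backend's ID scheme: a fresh UUID per
// scheduler instance plus a per-instance counter, joined as "<uuid>-<n>".
class TaskIdGenerator {
  private val schedulerUuid: String = UUID.randomUUID().toString
  private var nextTaskId = 0

  def next(): String = {
    val id = s"$schedulerUuid-$nextTaskId"
    nextTaskId += 1
    id
  }
}

object TaskIdGeneratorDemo extends App {
  val firstRun = new TaskIdGenerator
  val secondRun = new TaskIdGenerator // simulates a restarted driver
  // Both counters start at 0, but the UUID prefix keeps the IDs distinct.
  assert(firstRun.next() != secondRun.next())
}
```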
s"$schedulerUuid-$taskId").build()) .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) .setName(s"${sc.appName} $taskId") diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 2fa8e420b954..8aacd9ac7123 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark.scheduler.cluster.mesos +import java.util.UUID import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.concurrent.duration._ - +import scala.util.Try import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos._ import org.mockito.Matchers @@ -30,14 +31,13 @@ import org.mockito.Mockito._ import org.scalatest.concurrent.ScalaFutures import org.scalatest.mock.MockitoSugar import org.scalatest.BeforeAndAfter - import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor import org.apache.spark.scheduler.cluster.mesos.Utils._ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite @@ -598,6 +598,21 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0") } + test("mesos sets different task ids across executions") { + setBackend() + var offers = List(Resources(backend.executorMemory(sc), 1)) + offerResources(offers) + val firstLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue + sc.stop() + + setBackend() + offers = List(Resources(backend.executorMemory(sc), 1)) + offerResources(offers) + val secondLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue + + assert(firstLaunchTaskId != secondLaunchTaskId) + } + test("mesos sets configurable labels on tasks") { val taskLabelsString = "mesos:test,label:test" setBackend(Map( diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index cc571c330f8d..3a579eb4c675 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.util.Collections +import java.util.{Collections, UUID} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern @@ -26,12 +26,10 @@ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConverters._ import 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index cc571c330f8d..3a579eb4c675 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.deploy.yarn
 
-import java.util.Collections
+import java.util.{Collections, UUID}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
@@ -26,12 +26,10 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
-
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
@@ -40,7 +38,6 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
@@ -84,21 +81,9 @@
 
   private val numExecutorsStarting = new AtomicInteger(0)
 
-  /**
-   * Used to generate a unique ID per executor
-   *
-   * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
-   * the id of new executor will start from 1, this will conflict with the executor has
-   * already created before. So, we should initialize the `executorIdCounter` by getting
-   * the max executorId from driver.
-   *
-   * And this situation of executorId conflict is just in yarn client mode, so this is an issue
-   * in yarn client mode. For more details, can check in jira.
-   *
-   * @see SPARK-12864
-   */
-  private var executorIdCounter: Int =
-    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
+  // Used to generate a unique ID per executor
+  private val allocatorUuid: String = UUID.randomUUID().toString
+  private var executorIdCounter: Int = 0
 
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()
@@ -495,7 +480,7 @@
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = s"$allocatorUuid-$executorIdCounter"
       assert(container.getResource.getMemory >= resource.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 8452f4377419..3044bc481534 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -311,9 +311,6 @@
           logWarning("Attempted to kill executors before the AM has registered!")
           context.reply(false)
         }
-
-      case RetrieveLastAllocatedExecutorId =>
-        context.reply(currentExecutorIdCounter)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
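With this change, executor and task IDs take the shape `<uuid>-<counter>` and are no longer parseable as integers, which is why the `executorId.toInt` bookkeeping in `CoarseGrainedSchedulerBackend` is deleted outright rather than adapted. A REPL-style sketch of the new shape; the `IdPattern` regex is illustrative only, not part of the patch:

```scala
import java.util.UUID
import scala.util.Try

val id = s"${UUID.randomUUID().toString}-0"

// 36 UUID characters, a separator, then the counter.
val IdPattern = "([0-9a-f-]{36})-(\\d+)".r
val IdPattern(uuid, counter) = id

// Code that assumed purely numeric IDs would now throw instead.
assert(Try(id.toInt).isFailure)
```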