Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit e81871c

Browse files
author
Anton Kirillov
committed
[DCOS-39150][SPARK] Support unique Executor IDs in cluster managers
Using incremental integers as Executor IDs leads to a situation where Spark Executors launched by different Drivers have the same IDs. This in turn causes the Mesos Task IDs for multiple Spark Executors to be the same. This PR prepends a UUID, unique per CoarseGrainedSchedulerBackend instance, in front of the numeric ID, thus allowing Executors belonging to different Drivers to be distinguished. This PR reverts commit ebe3c7f ("[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…").
1 parent a1d7e7a commit e81871c

File tree

6 files changed

+27
-36
lines changed

6 files changed

+27
-36
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ private[spark] object CoarseGrainedClusterMessages {
3636
hadoopDelegationCreds: Option[Array[Byte]])
3737
extends CoarseGrainedClusterMessage
3838

39-
case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
40-
4139
// Driver to executors
4240
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
4341

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
9393
@GuardedBy("CoarseGrainedSchedulerBackend.this")
9494
protected var localityAwareTasks = 0
9595

96-
// The num of current max ExecutorId used to re-register appMaster
97-
@volatile protected var currentExecutorIdCounter = 0
98-
9996
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
10097
extends ThreadSafeRpcEndpoint with Logging {
10198

@@ -189,9 +186,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
189186
// in this block are read when requesting executors
190187
CoarseGrainedSchedulerBackend.this.synchronized {
191188
executorDataMap.put(executorId, data)
192-
if (currentExecutorIdCounter < executorId.toInt) {
193-
currentExecutorIdCounter = executorId.toInt
194-
}
195189
if (numPendingExecutors > 0) {
196190
numPendingExecutors -= 1
197191
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.scheduler.cluster.mesos
1919

2020
import java.io.File
21-
import java.util.{Collections, List => JList}
21+
import java.util.{Collections, UUID, List => JList}
2222
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2323
import java.util.concurrent.locks.ReentrantLock
2424

@@ -162,6 +162,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
162162

163163
private val metricsSource = new MesosCoarseGrainedSchedulerSource(this)
164164

165+
private val schedulerUuid: String = UUID.randomUUID().toString
166+
165167
private var nextMesosTaskId = 0
166168

167169
@volatile var appId: String = _
@@ -469,7 +471,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
469471
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
470472

471473
val taskBuilder = MesosTaskInfo.newBuilder()
472-
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
474+
.setTaskId(TaskID.newBuilder().setValue( s"$schedulerUuid-$taskId").build())
473475
.setSlaveId(offer.getSlaveId)
474476
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
475477
.setName(s"${sc.appName} $taskId")

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717

1818
package org.apache.spark.scheduler.cluster.mesos
1919

20+
import java.util.UUID
2021
import java.util.concurrent.TimeUnit
2122

2223
import scala.collection.JavaConverters._
2324
import scala.concurrent.duration._
24-
25+
import scala.util.Try
2526
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
2627
import org.apache.mesos.Protos._
2728
import org.mockito.Matchers
@@ -30,14 +31,13 @@ import org.mockito.Mockito._
3031
import org.scalatest.concurrent.ScalaFutures
3132
import org.scalatest.mock.MockitoSugar
3233
import org.scalatest.BeforeAndAfter
33-
3434
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
3535
import org.apache.spark.deploy.mesos.config._
3636
import org.apache.spark.internal.config._
3737
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
3838
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
3939
import org.apache.spark.scheduler.TaskSchedulerImpl
40-
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
40+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
4141
import org.apache.spark.scheduler.cluster.mesos.Utils._
4242

4343
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -598,6 +598,21 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
598598
assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
599599
}
600600

601+
test("mesos sets different task ids across executions") {
602+
setBackend()
603+
var offers = List(Resources(backend.executorMemory(sc), 1))
604+
offerResources(offers)
605+
val firstLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue
606+
sc.stop()
607+
608+
setBackend()
609+
offers = List(Resources(backend.executorMemory(sc), 1))
610+
offerResources(offers)
611+
val secondLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue
612+
613+
assert(firstLaunchTaskId != secondLaunchTaskId)
614+
}
615+
601616
test("mesos sets configurable labels on tasks") {
602617
val taskLabelsString = "mesos:test,label:test"
603618
setBackend(Map(

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20-
import java.util.Collections
20+
import java.util.{Collections, UUID}
2121
import java.util.concurrent._
2222
import java.util.concurrent.atomic.AtomicInteger
2323
import java.util.regex.Pattern
@@ -26,12 +26,10 @@ import scala.collection.mutable
2626
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
2727
import scala.collection.JavaConverters._
2828
import scala.util.control.NonFatal
29-
3029
import org.apache.hadoop.yarn.api.records._
3130
import org.apache.hadoop.yarn.client.api.AMRMClient
3231
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
3332
import org.apache.hadoop.yarn.conf.YarnConfiguration
34-
3533
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
3634
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
3735
import org.apache.spark.deploy.yarn.config._
@@ -40,7 +38,6 @@ import org.apache.spark.internal.config._
4038
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
4139
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
4240
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
43-
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
4441
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
4542

4643
/**
@@ -84,21 +81,9 @@ private[yarn] class YarnAllocator(
8481

8582
private val numExecutorsStarting = new AtomicInteger(0)
8683

87-
/**
88-
* Used to generate a unique ID per executor
89-
*
90-
* Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
91-
* the id of new executor will start from 1, this will conflict with the executor has
92-
* already created before. So, we should initialize the `executorIdCounter` by getting
93-
* the max executorId from driver.
94-
*
95-
* And this situation of executorId conflict is just in yarn client mode, so this is an issue
96-
* in yarn client mode. For more details, can check in jira.
97-
*
98-
* @see SPARK-12864
99-
*/
100-
private var executorIdCounter: Int =
101-
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
84+
// Used to generate a unique ID per executor
85+
private val allocatorUuid: String = UUID.randomUUID().toString
86+
private var executorIdCounter: Int = 0
10287

10388
// Queue to store the timestamp of failed executors
10489
private val failedExecutorsTimeStamps = new Queue[Long]()
@@ -495,7 +480,7 @@ private[yarn] class YarnAllocator(
495480
executorIdCounter += 1
496481
val executorHostname = container.getNodeId.getHost
497482
val containerId = container.getId
498-
val executorId = executorIdCounter.toString
483+
val executorId = s"$allocatorUuid-$executorIdCounter"
499484
assert(container.getResource.getMemory >= resource.getMemory)
500485
logInfo(s"Launching container $containerId on host $executorHostname " +
501486
s"for executor with ID $executorId")

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,6 @@ private[spark] abstract class YarnSchedulerBackend(
311311
logWarning("Attempted to kill executors before the AM has registered!")
312312
context.reply(false)
313313
}
314-
315-
case RetrieveLastAllocatedExecutorId =>
316-
context.reply(currentExecutorIdCounter)
317314
}
318315

319316
override def onDisconnected(remoteAddress: RpcAddress): Unit = {

0 commit comments

Comments (0)