[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468
Changes from 1 commit
ExecutorPodFactory.scala
@@ -47,7 +47,6 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
  private val executorExtraClasspath = sparkConf.get(
    org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
  private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)

  private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
    sparkConf,

@@ -94,7 +93,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
      MEMORY_OVERHEAD_MIN_MIB))
  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB

  private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d)
  private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
  private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)

  override def createExecutorPod(

@@ -108,7 +107,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
    // name as the hostname. This preserves uniqueness since the end of name contains
    // executorId and applicationId
    // executorId
    val hostname = name.substring(Math.max(0, name.length - 63))
Contributor: What is hostname used for here?

Contributor: I think we might have been relying on it before, but we certainly don't now - this should be removed. We probably want to try it on our fork first and run it through our integration tests to verify that this is the case.

Contributor: I am fine with deferring this to a later PR as long as we track it somewhere: it might so happen that this is a requirement anyway and we can't use random names.
    val resolvedExecutorLabels = Map(
      SPARK_EXECUTOR_ID_LABEL -> executorId,

@@ -139,15 +138,14 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
        new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
      }
    }.getOrElse(Seq.empty[EnvVar])
Contributor: How is this getting used? I see it getting set, but not used anywhere.

Contributor: This is used in the executor Dockerfile included in #19717.

Contributor: Thanks, somehow it did not show up in my searches.
    val executorEnv = (Seq(
    val executorEnv = Seq(
      (ENV_EXECUTOR_PORT, executorPort.toString),
      (ENV_DRIVER_URL, driverUrl),
      // Executor backend expects integral value for executor cores, so round it up to an int.
      (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
      (ENV_EXECUTOR_MEMORY, executorMemoryString),
      (ENV_APPLICATION_ID, applicationId),
      (ENV_EXECUTOR_ID, executorId),
      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
      (ENV_EXECUTOR_ID, executorId))
      .map(env => new EnvVarBuilder()
        .withName(env._1)
        .withValue(env._2)
KubernetesClusterSchedulerBackend.scala
@@ -20,6 +20,7 @@ import java.io.Closeable
import java.net.InetAddress
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
import javax.annotation.concurrent.GuardedBy

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}

@@ -49,9 +50,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
  // Indexed by executor IDs
  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
Contributor: nit: you could use @GuardedBy instead of the comment.
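As background for the nit: `javax.annotation.concurrent.GuardedBy` is documentation plus a hook for static analysis; it does not enforce locking at runtime. A minimal sketch with made-up names, assuming the JSR-305/JCIP annotations are on the classpath:

```scala
// Illustrative sketch: @GuardedBy records which lock protects a field so reviewers and
// static-analysis tools can check that every access happens inside that lock.
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

class ExecutorRegistrySketch {
  private val lock = new Object

  @GuardedBy("lock")
  private val executorIdToPodName = new mutable.HashMap[String, String]

  def register(executorId: String, podName: String): Unit = lock.synchronized {
    executorIdToPodName.put(executorId, podName)
  }

  def podNameFor(executorId: String): Option[String] = lock.synchronized {
    executorIdToPodName.get(executorId)
  }
}
```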
  // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
  // Indexed by executor pod names
  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
  private val runningPodsToExecutors = new mutable.HashMap[String, String]
  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
@@ -105,21 +108,44 @@ private[spark] class KubernetesClusterSchedulerBackend(
    override def run(): Unit = {
      handleDisconnectedExecutors()
      val executorsToAllocate = mutable.Map[String, Pod]()
      val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
      val currentTotalExpectedExecutors = totalExpectedExecutors.get
      val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts
      if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
        logDebug("Waiting for pending executors before scaling")
      } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
      } else {
        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
        for (i <- 0 until math.min(
          currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
          val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
          val executorPod = executorPodFactory.createExecutorPod(
            executorId,
            applicationId(),
            driverUrl,
            conf.getExecutorEnv,
            driverPod,
            nodeToLocalTaskCount)
          executorsToAllocate(executorId) = executorPod
          logInfo(
            s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
        }
      }
      val allocatedExecutors = executorsToAllocate.mapValues { pod =>
        Utils.tryLog {
          kubernetesClient.pods().create(pod)
        }
      }
      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
        if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
          logDebug("Waiting for pending executors before scaling")
        } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
        } else {
          val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
          for (i <- 0 until math.min(
            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
            val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
            runningExecutorsToPods.put(executorId, pod)
            runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
            logInfo(
              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
          }
        allocatedExecutors.map {
          case (executorId, attemptedAllocatedExecutor) =>
            attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
              runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
              runningPodsToExecutors.put(
                successfullyAllocatedExecutor.getMetadata.getName, executorId)
            }
          }
        }
      }
    }
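The reworked allocator follows a "prepare under the lock, call the API outside the lock, record successes back under the lock" shape, so slow or failing pod-creation calls never block other users of RUNNING_EXECUTOR_PODS_LOCK. A minimal standalone sketch of that pattern, using scala.util.Try in place of Spark's Utils.tryLog and made-up names throughout:

```scala
// Sketch of "prepare under lock, do I/O outside the lock, record successes under lock".
// createRemote stands in for the Kubernetes client call; all names are illustrative.
import scala.collection.mutable
import scala.util.Try

class AllocatorSketch(createRemote: String => String) {
  private val lock = new Object
  private val running = mutable.HashMap[String, String]() // executorId -> remote resource name

  def allocate(wanted: Int): Unit = {
    // 1. Decide what to create while holding the lock (cheap, no network calls).
    val toCreate = lock.synchronized {
      (running.size until wanted).map(_.toString)
    }
    // 2. Perform the slow, fallible remote calls without holding the lock.
    val attempts = toCreate.map(id => id -> Try(createRemote(id)))
    // 3. Record only the successful creations back under the lock.
    lock.synchronized {
      attempts.foreach { case (id, attempt) =>
        attempt.foreach(resourceName => running.put(id, resourceName))
      }
    }
  }
}
```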
@@ -128,25 +154,25 @@ private[spark] class KubernetesClusterSchedulerBackend(
      // For each disconnected executor, synchronize with the loss reasons that may have been found
      // by the executor pod watcher. If the loss reason was discovered by the watcher,
      // inform the parent class with removeExecutor.
      disconnectedPodsByExecutorIdPendingRemoval.keys().asScala.foreach { case (executorId) =>
        val executorPod = disconnectedPodsByExecutorIdPendingRemoval.get(executorId)
        val knownExitReason = Option(podsWithKnownExitReasons.remove(
          executorPod.getMetadata.getName))
        knownExitReason.fold {
          removeExecutorOrIncrementLossReasonCheckCount(executorId)
        } { executorExited =>
          logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
          removeExecutor(executorId, executorExited)
          // We keep around executors that have exit conditions caused by the application. This
          // allows them to be debugged later on. Otherwise, mark them as to be deleted from the
          // the API server.
          if (!executorExited.exitCausedByApp) {
            logInfo(s"Executor $executorId failed because of a framework error.")
            deleteExecutorFromClusterAndDataStructures(executorId)
          } else {
            logInfo(s"Executor $executorId exited because of the application.")
      disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
        case (executorId, executorPod) =>
          val knownExitReason = Option(podsWithKnownExitReasons.remove(
            executorPod.getMetadata.getName))
          knownExitReason.fold {
            removeExecutorOrIncrementLossReasonCheckCount(executorId)
          } { executorExited =>
            logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
            removeExecutor(executorId, executorExited)
            // We keep around executors that have exit conditions caused by the application. This
            // allows them to be debugged later on. Otherwise, mark them as to be deleted from the
            // the API server.
            if (!executorExited.exitCausedByApp) {
              logInfo(s"Executor $executorId failed because of a framework error.")
              deleteExecutorFromClusterAndDataStructures(executorId)
            } else {
              logInfo(s"Executor $executorId exited because of the application.")
            }
          }
        }
      }
    }
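The branching above relies on `Option.fold(ifEmpty)(ifDefined)`: the first block runs when no exit reason has been recorded yet, the second when the watcher has supplied one. A tiny illustration with invented values:

```scala
// Quick illustration (not the PR's code) of the Option.fold branching used above.
object OptionFoldSketch {
  def describeExit(maybeExitReason: Option[String], executorId: String): String =
    maybeExitReason.fold {
      s"No exit reason known yet for executor $executorId; will check again later."
    } { reason =>
      s"Removing executor $executorId with loss reason: $reason"
    }

  def main(args: Array[String]): Unit = {
    println(describeExit(None, "1"))
    println(describeExit(Some("OOMKilled"), "2"))
  }
}
```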
@@ -163,12 +189,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
    def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
      disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
      executorReasonCheckAttemptCounts -= executorId
Contributor: Remove from podsWithKnownExitReasons?
      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      podsWithKnownExitReasons -= executorId
      val maybeExecutorPodToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
        runningExecutorsToPods.remove(executorId).map { pod =>
          kubernetesClient.pods().delete(pod)
          runningPodsToExecutors.remove(pod.getMetadata.getName)
        }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
          pod
        }.orElse {
          logWarning(s"Unable to remove pod for unknown executor $executorId")
          None
        }
      }
      maybeExecutorPodToDelete.foreach(pod => kubernetesClient.pods().delete(pod))
    }
  }
@@ -203,25 +234,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
      // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
      // When using Utils.tryLogNonFatalError some of the code fails but without any logs or
      // indication as to why.
      try {
        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
          runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
      Utils.tryLogNonFatalError {
        val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
          val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
          runningExecutorsToPods.clear()
          runningPodsToExecutors.clear()
          runningExecutorPodsCopy
        }
        kubernetesClient.pods().delete(executorPodsToDelete: _*)
        executorPodsByIPs.clear()
        val resource = executorWatchResource.getAndSet(null)
        if (resource != null) {
          resource.close()
        }
Contributor: I am not very sure of the semantics of the watcher here - should we close the watcher before the executor deletes here?

Contributor: The watcher will receive a DELETE event for each deleted executor pod, and the event is handled in …

Contributor: Is the actual … ? Is this a correct assumption?

Contributor: What about … ? If this is the case, in the current code the watcher will not be notified of the deletes.

Contributor: Ah ok - looking it up a bit, I think we want the order to be reversed here. We should first close the watch to ensure we don't get any deleted events, then delete the pods themselves. We probably want to ensure the pods are deleted even if we fail to close the watch.

Contributor: Yes, it depends on whether we require the watcher to receive and act on the DELETE events in this case. If not, moving …

Contributor: Reversed the order in c386186.
      } catch {
        case e: Throwable => logError("Uncaught exception while shutting down controllers.", e)
      }
      try {
      Utils.tryLogNonFatalError {
        logInfo("Closing kubernetes client")
        kubernetesClient.close()
      } catch {
        case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
      }
      }
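The thread above settles an ordering question: close the watch first so the backend does not react to DELETE events it triggered itself, but still attempt the pod deletions even if closing the watch fails. A minimal sketch of that shutdown ordering with placeholder types (not the fabric8 API):

```scala
// Sketch of the shutdown ordering discussed above: close the watch, then delete the pods,
// and isolate failures so one step cannot skip the other. Types and names are illustrative.
import java.io.Closeable
import scala.util.control.NonFatal

class ShutdownSketch(watch: Closeable, deleteAllPods: () => Unit) {
  def stop(): Unit = {
    // 1. Stop watching before deleting, so the watcher never sees the deletes we trigger.
    try {
      watch.close()
    } catch {
      case NonFatal(e) => println(s"Failed to close the watch: ${e.getMessage}")
    }
    // 2. Delete the executor pods regardless of whether the watch closed cleanly.
    try {
      deleteAllPods()
    } catch {
      case NonFatal(e) => println(s"Failed to delete executor pods: ${e.getMessage}")
    }
  }
}
```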
@@ -231,7 +260,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     */
    private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
      val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
        KubernetesClusterSchedulerBackend.this.synchronized {
        synchronized {
          hostToLocalTaskCount
        }
      for (pod <- executorPodsByIPs.values().asScala) {

@@ -247,58 +276,31 @@ private[spark] class KubernetesClusterSchedulerBackend(
      nodeToLocalTaskCount.toMap[String, Int]
    }
    /**
     * Allocates a new executor pod
     *
     * @param nodeToLocalTaskCount A map of K8s cluster nodes to the number of tasks that could
     *                             benefit from data locality if an executor launches on the cluster
     *                             node.
     * @return A tuple of the new executor name and the Pod data structure.
     */
    private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
      val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
      val executorPod = executorPodFactory.createExecutorPod(
        executorId,
        applicationId(),
        driverUrl,
        conf.getExecutorEnv,
        driverPod,
        nodeToLocalTaskCount)
      try {
        (executorId, kubernetesClient.pods.create(executorPod))
      } catch {
        case throwable: Throwable =>
          logError("Failed to allocate executor pod.", throwable)
          throw throwable
      }
    }
    override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
      totalExpectedExecutors.set(requestedTotal)
      true
    }
    override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
      val podsToDelete = mutable.Buffer[Pod]()
      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
        for (executor <- executorIds) {
          val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
          maybeRemovedExecutor.foreach { executorPod =>
            kubernetesClient.pods().delete(executorPod)
            disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
            runningPodsToExecutors.remove(executorPod.getMetadata.getName)
            podsToDelete += executorPod
          }
          if (maybeRemovedExecutor.isEmpty) {
            logWarning(s"Unable to remove pod for unknown executor $executor")
          }
        }
      }
      kubernetesClient.pods().delete(podsToDelete: _*)
      true
    }
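doKillExecutors applies the same lock discipline: bookkeeping is updated under RUNNING_EXECUTOR_PODS_LOCK, and the pods are then removed with a single batched delete outside the lock instead of one API call per executor. A standalone sketch with invented types:

```scala
// Sketch of the doKillExecutors shape: mutate bookkeeping under the lock, batch the
// actual deletions into one call performed outside it. deletePods is a stand-in.
import scala.collection.mutable

class KillExecutorsSketch(deletePods: Seq[String] => Unit) {
  private val lock = new Object
  private val runningExecutorsToPods = mutable.HashMap[String, String]() // executorId -> pod name

  def killExecutors(executorIds: Seq[String]): Boolean = {
    val podsToDelete = mutable.Buffer[String]()
    lock.synchronized {
      executorIds.foreach { executorId =>
        runningExecutorsToPods.remove(executorId) match {
          case Some(podName) => podsToDelete += podName
          case None => println(s"Unable to remove pod for unknown executor $executorId")
        }
      }
    }
    deletePods(podsToDelete.toSeq) // single batched call, outside the lock
    true
  }
}
```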
    def getExecutorPodByIP(podIP: String): Option[Pod] = {
      // Note: Per https://github.com/databricks/scala-style-guide#concurrency, we don't
      // want to be switching to scala.collection.concurrent.Map on
      // executorPodsByIPs.
      val pod = executorPodsByIPs.get(podIP)
      Option(pod)
    }
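The `Option(...)` wrapper above converts the Java null that ConcurrentHashMap.get returns on a miss into None, which is why no switch to scala.collection.concurrent.Map is needed. A tiny illustration with made-up keys:

```scala
// Option(null) is None, so callers never see the Java null returned by get() on a miss.
import java.util.concurrent.ConcurrentHashMap

object NullSafeLookupSketch {
  private val podsByIp = new ConcurrentHashMap[String, String]()

  def podForIp(ip: String): Option[String] = Option(podsByIp.get(ip))

  def main(args: Array[String]): Unit = {
    podsByIp.put("10.0.0.7", "spark-exec-1")
    println(podForIp("10.0.0.7")) // Some(spark-exec-1)
    println(podForIp("10.0.0.8")) // None
  }
}
```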
nit: