[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468
- Move Kubernetes client calls out of synchronized blocks so that slow HTTP connections do not stall threads holding the lock.
- Fix a bug where pods that fail to launch through the API are not retried.
- Remove the map from executor pod name to executor ID by using the pod's labels to get the same information without having to track extra state (a lookup sketch follows below).
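For context, here is a minimal sketch of the label-based lookup the last bullet describes, assuming the fabric8 Pod model and a placeholder value for the label key (this is not the PR's exact code):

```scala
import io.fabric8.kubernetes.api.model.Pod

object ExecutorIdLookup {
  // Assumed label key; the real constant lives in the Kubernetes backend's config.
  val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"

  // Recover the executor ID from the pod's own labels instead of maintaining a
  // separate pod-name -> executor-ID map in the scheduler backend.
  def executorIdForPod(pod: Pod): Option[String] =
    Option(pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL))
}
```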
@@ -50,12 +50,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val RUNNING_EXECUTOR_PODS_LOCK = new Object
// Indexed by executor IDs
@GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
Contributor: nit: you could use GuardedBy instead of a comment.
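The diff above already follows this suggestion; as a standalone reference, here is a minimal sketch of annotating a lock-protected field with GuardedBy (assuming the javax.annotation.concurrent annotation and the fabric8 Pod model on the classpath; not the PR's exact code):

```scala
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod

class ExecutorPodRegistry {
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object

  // The annotation records which monitor must be held when touching this map,
  // instead of relying on a free-form comment.
  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]

  def podForExecutor(executorId: String): Option[Pod] =
    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      runningExecutorsToPods.get(executorId)
    }
}
```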
// Indexed by executor pod names
@GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
private val runningPodsToExecutors = new mutable.HashMap[String, String]
private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
@@ -117,7 +113,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
} else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
@@ -127,7 +122,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
driverUrl,
conf.getExecutorEnv,
driverPod,
nodeToLocalTaskCount)
currentNodeToLocalTaskCount)
require(executorPod.getMetadata.getLabels.containsKey(SPARK_EXECUTOR_ID_LABEL),
s"Illegal internal state for pod with name ${executorPod.getMetadata.getName} - all" +
s" executor pods must contain the label $SPARK_EXECUTOR_ID_LABEL.")
val resolvedExecutorIdLabel = executorPod.getMetadata.getLabels.get(
SPARK_EXECUTOR_ID_LABEL)
require(resolvedExecutorIdLabel == executorId,
s"Illegal internal state for pod with name ${executorPod.getMetadata.getName} - all" +
s" executor pods must map the label with key ${SPARK_EXECUTOR_ID_LABEL} to the" +
s" executor's ID. This label mapped instead to: $resolvedExecutorIdLabel.")
executorsToAllocate(executorId) = executorPod
logInfo(
s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
@@ -143,8 +147,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
case (executorId, attemptedAllocatedExecutor) =>
attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
runningPodsToExecutors.put(
successfullyAllocatedExecutor.getMetadata.getName, executorId)
}
}
}
@@ -166,11 +168,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
// We keep around executors that have exit conditions caused by the application. This
// allows them to be debugged later on. Otherwise, mark them as to be deleted from the
// the API server.
if (!executorExited.exitCausedByApp) {
if (executorExited.exitCausedByApp) {
Contributor: It's not very clear to me when these executors will go away. Will the app eventually clean them up? Will the user have to manually do something?

Contributor: Executor pods are deleted when

Contributor (Author): There are two different cases here. If the application succeeds, then as @liyinan926 said, we'll clean up all executors in
There are owner references between the driver pod and the executors, such that the driver pod is the root of the ownership graph of all resources associated with a Spark application. So, deleting the driver from the API automatically removes all executors. So, post-inspection, the expectation is that the user will delete the driver to clean up the application entirely. (We will include this in the documentation.) In the future (not in the Spark 2.3 timeframe), the root of the owner reference graph may be a CRD representing a Spark application, which will provide more transparency.
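To illustrate the ownership chain described above, here is a hypothetical sketch using the fabric8 client's builder API (not the PR's exact code; the helper name is made up): each executor pod carries an owner reference to the driver pod, so deleting the driver from the API server garbage-collects the executors.

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

object OwnerReferenceExample {
  // Attach an owner reference so the executor pod is garbage-collected by
  // Kubernetes when the driver pod is deleted from the API server.
  def withDriverOwnerReference(executorPod: Pod, driverPod: Pod): Pod =
    new PodBuilder(executorPod)
      .editMetadata()
        .addNewOwnerReference()
          .withApiVersion("v1")
          .withKind("Pod")
          .withName(driverPod.getMetadata.getName)
          .withUid(driverPod.getMetadata.getUid)
          .withController(true)
        .endOwnerReference()
      .endMetadata()
      .build()
}
```

With such a reference in place, deleting the driver pod cascades to the executor pods through Kubernetes garbage collection.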
Contributor: I understand the desire to make debugging easier, but isn't that bad, especially for long-running applications? Won't you potentially end up with a bunch of containers holding on to CPU and memory, but no executor actually running on them?

Contributor: Actually no, the failed driver/executor containers have already finished running and won't take CPU/memory resources. The pod objects of failed drivers/executors, however, are not deleted from the API server, so users can use

Contributor: Ah, makes sense. It would be good to make the comment explain that.

Contributor: Done.

Contributor: Shouldn't this not be done only when in debug mode?

Contributor: We delete the in-memory executor object from

Contributor: I was under the impression that the client API has overhead to maintain the state of the failed executors (not the scheduler code, where it is removed from). If the references are maintained external to the Spark application, and within the Kubernetes infrastructure, this should be fine.

Contributor: Yes, the executor pod objects are persisted in etcd until they are explicitly deleted by the user. So in that sense, such references are external within Kubernetes.

Contributor: Sounds great; as long as Spark driver memory is not impacted by having these around, it should be fine.
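As an illustration of the "explicitly deleted by the user" step above, here is a hypothetical sketch using the fabric8 client (the namespace and label selector are assumptions, not values taken from this PR): the leftover pod objects can be deleted by label once the user is done inspecting them.

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient

object CleanupFinishedExecutorPods {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    try {
      // The finished pods' containers are no longer running and hold no CPU or
      // memory; only the pod objects remain in the API server until deleted.
      client.pods()
        .inNamespace("default")
        .withLabel("spark-role", "executor")
        .delete()
    } finally {
      client.close()
    }
  }
}
```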
logInfo(s"Executor $executorId exited because of the application.")
deleteExecutorFromDataStructures(executorId)
} else {
logInfo(s"Executor $executorId failed because of a framework error.")
deleteExecutorFromClusterAndDataStructures(executorId)
} else {
logInfo(s"Executor $executorId exited because of the application.")
}
}
}
@@ -187,19 +190,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
deleteExecutorFromDataStructures(executorId)
.foreach(pod => kubernetesClient.pods().delete(pod))
}

def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
executorReasonCheckAttemptCounts -= executorId
Contributor: Remove from podsWithKnownExitReasons?
podsWithKnownExitReasons -= executorId
val maybeExecutorPodToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).map { pod =>
runningPodsToExecutors.remove(pod.getMetadata.getName)
pod
}.orElse {
podsWithKnownExitReasons.remove(executorId)
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).orElse {
logWarning(s"Unable to remove pod for unknown executor $executorId")
None
}
}
maybeExecutorPodToDelete.foreach(pod => kubernetesClient.pods().delete(pod))
}
}
@@ -231,14 +235,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.stop()

// then delete the executor pods
// TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
// When using Utils.tryLogNonFatalError some of the code fails but without any logs or
// indication as to why.
Utils.tryLogNonFatalError {
val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
runningExecutorsToPods.clear()
runningPodsToExecutors.clear()
runningExecutorPodsCopy
}
kubernetesClient.pods().delete(executorPodsToDelete: _*)
@@ -288,7 +288,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
maybeRemovedExecutor.foreach { executorPod =>
disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
runningPodsToExecutors.remove(executorPod.getMetadata.getName)
podsToDelete += executorPod
}
if (maybeRemovedExecutor.isEmpty) {
@@ -300,11 +299,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
true
}

def getExecutorPodByIP(podIP: String): Option[Pod] = {
val pod = executorPodsByIPs.get(podIP)
Option(pod)
}

private class ExecutorPodsWatcher extends Watcher[Pod] {

private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
@@ -316,21 +310,33 @@ private[spark] class KubernetesClusterSchedulerBackend(
val clusterNodeName = pod.getSpec.getNodeName
logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
executorPodsByIPs.put(podIP, pod)
} else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
action == Action.DELETED || action == Action.ERROR) {
} else if (action == Action.DELETED || action == Action.ERROR) {
val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
require(executorId != null, "Unexpected pod metadata; expected all executor pods" +
s" to have label $SPARK_EXECUTOR_ID_LABEL.")
val podName = pod.getMetadata.getName
val podIP = pod.getStatus.getPodIP
logDebug(s"Executor pod $podName at IP $podIP was at $action.")
if (podIP != null) {
executorPodsByIPs.remove(podIP)
}
if (action == Action.ERROR) {
val executorExitReason = if (action == Action.ERROR) {
logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
handleErroredPod(pod)
executorExitReasonOnError(pod)
} else if (action == Action.DELETED) {
logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
handleDeletedPod(pod)
executorExitReasonOnDelete(pod)
} else {
throw new IllegalStateException(
s"Unknown action that should only be DELETED or ERROR: $action")
}
podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
s" watch received an event of type $action for this executor. The executor may" +
s" have failed to start in the first place and never registered with the driver.")
}
disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
}
}
@@ -356,15 +362,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

def isPodAlreadyReleased(pod: Pod): Boolean = {
val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
!runningPodsToExecutors.contains(pod.getMetadata.getName)
!runningExecutorsToPods.contains(executorId)
}
}

def handleErroredPod(pod: Pod): Unit = {
def executorExitReasonOnError(pod: Pod): ExecutorExited = {
val containerExitStatus = getExecutorExitStatus(pod)
// container was probably actively killed by the driver.
val exitReason = if (isPodAlreadyReleased(pod)) {
if (isPodAlreadyReleased(pod)) {
ExecutorExited(containerExitStatus, exitCausedByApp = false,
s"Container in pod ${pod.getMetadata.getName} exited from explicit termination" +
" request.")

@@ -373,18 +380,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
s"exited with exit status code $containerExitStatus."
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
}

def handleDeletedPod(pod: Pod): Unit = {
def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
val exitMessage = if (isPodAlreadyReleased(pod)) {
s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
} else {
s"Pod ${pod.getMetadata.getName} deleted or lost."
}
val exitReason = ExecutorExited(
ExecutorExited(
getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
}
}
How is this getting used? I see it getting set, but not used anywhere.

This is used in the executor Docker file included in #19717.

Thanks, somehow it did not show up in my searches.