Commit 55aa8bc

Merge branch 'master' into SPARK-23288
2 parents 22e6ca1 + 25c2776

98 files changed, +3141 -1829 lines changed

core/src/main/resources/org/apache/spark/ui/static/webui.js

Lines changed: 1 addition & 0 deletions
@@ -72,6 +72,7 @@ $(function() {
   collapseTablePageLoad('collapse-aggregated-allActiveStages','aggregated-allActiveStages');
   collapseTablePageLoad('collapse-aggregated-allPendingStages','aggregated-allPendingStages');
   collapseTablePageLoad('collapse-aggregated-allCompletedStages','aggregated-allCompletedStages');
+  collapseTablePageLoad('collapse-aggregated-allSkippedStages','aggregated-allSkippedStages');
   collapseTablePageLoad('collapse-aggregated-allFailedStages','aggregated-allFailedStages');
   collapseTablePageLoad('collapse-aggregated-activeStages','aggregated-activeStages');
   collapseTablePageLoad('collapse-aggregated-pendingOrSkippedStages','aggregated-pendingOrSkippedStages');

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 8 additions & 7 deletions
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
-    replace: Boolean = false,
+    adjustTargetNumExecutors: Boolean,
+    countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
   /**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
+      countFailures = false)
     killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }

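The call sites updated in the rest of this commit boil down to a few flag combinations. The sketch below is illustrative only (ExecutorAllocationClient is private[spark], so it compiles only inside Spark's own package, and the `client` value and executor ids are placeholders); the values mirror the call sites changed elsewhere in this commit.

    import org.apache.spark.ExecutorAllocationClient

    val client: ExecutorAllocationClient = ???  // placeholder for a real scheduler backend

    // Dynamic allocation scaling down: the target was already lowered when the task
    // backlog shrank, and killing an idle executor is not a task failure.
    client.killExecutors(Seq("7"), adjustTargetNumExecutors = false, countFailures = false,
      force = false)

    // User-requested kill (SparkContext.killExecutors): shrink the target too, so the
    // cluster manager does not immediately allocate a replacement.
    client.killExecutors(Seq("7"), adjustTargetNumExecutors = true, countFailures = false,
      force = true)

    // Kill-and-replace (SparkContext.killAndReplaceExecutor): keep the target so a
    // replacement comes up, and count the killed tasks toward failure limits.
    client.killExecutors(Seq("7"), adjustTargetNumExecutors = false, countFailures = true,
      force = true)
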
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 15 additions & 5 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -81,7 +82,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
-    conf: SparkConf)
+    conf: SparkConf,
+    blockManagerMaster: BlockManagerMaster)
   extends Logging {
 
   allocationManager =>
@@ -151,7 +153,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener
+  val listener = new ExecutorAllocationListener
 
   // Executor that handles the scheduling task.
   private val executor =
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
 
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet. Killing is
+      // controlled separately by an idle timeout. It's still helpful to reduce the target number
+      // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
+      // preempts it) -- in that case, there is no point in trying to immediately get a new
+      // executor, since we wouldn't even use it yet.
       client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
       logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
         s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
@@ -455,7 +462,10 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we already did that
+      // when the task backlog decreased.
+      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+        countFailures = false, force = false)
     }
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.
@@ -575,7 +585,7 @@ private[spark] class ExecutorAllocationManager(
       // Note that it is not necessary to query the executors since all the cached
       // blocks we are concerned with are reported to the driver. Note that this
       // does not include broadcast blocks.
-      val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+      val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
       val now = clock.getTimeMillis()
       val timeout = {
         if (hasCachedBlocks) {
@@ -610,7 +620,7 @@ private[spark] class ExecutorAllocationManager(
    * This class is intentionally conservative in its assumptions about the relative ordering
    * and consistency of events returned by the listener.
    */
-  private class ExecutorAllocationListener extends SparkListener {
+  private[spark] class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     // Number of running tasks per stage including speculative tasks.

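Injecting BlockManagerMaster into the manager (instead of reaching through the SparkEnv singleton) is what the hasCachedBlocks hunk above relies on. The helper below is a sketch only, with made-up parameter names; it shows the pattern of choosing an idle timeout based on whether the driver still knows of cached blocks on the executor.

    import org.apache.spark.storage.BlockManagerMaster

    // Illustrative helper, not Spark code: pick the idle timeout for an executor.
    def pickIdleTimeoutMs(
        blockManagerMaster: BlockManagerMaster,
        executorId: String,
        idleTimeoutMs: Long,        // e.g. value of spark.dynamicAllocation.executorIdleTimeout
        cachedIdleTimeoutMs: Long   // e.g. value of spark.dynamicAllocation.cachedExecutorIdleTimeout
      ): Long = {
      // Cached blocks are reported to the driver, so no executor round-trip is needed
      // (broadcast blocks are not included in this check).
      if (blockManagerMaster.hasCachedBlocks(executorId)) cachedIdleTimeoutMs else idleTimeoutMs
    }
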
core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 13 additions & 6 deletions
@@ -520,19 +520,25 @@ private[spark] class SecurityManager(
    *
    * If authentication is disabled, do nothing.
    *
-   * In YARN mode, generate a new secret and store it in the current user's credentials.
+   * In YARN and local mode, generate a new secret and store it in the current user's credentials.
    *
    * In other modes, assert that the auth secret is set in the configuration.
    */
   def initializeAuth(): Unit = {
+    import SparkMasterRegex._
+
     if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
       return
     }
 
-    if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
-      require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
-        s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
-      return
+    val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "")
+    master match {
+      case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) =>
+        // Secret generation allowed here
+      case _ =>
+        require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+          s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
+        return
     }
 
     val rnd = new SecureRandom()
@@ -541,7 +547,8 @@ private[spark] class SecurityManager(
     rnd.nextBytes(secretBytes)
 
     val creds = new Credentials()
-    creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes)
+    val secretStr = HashCodes.fromBytes(secretBytes).toString()
+    creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8))
     UserGroupInformation.getCurrentUser().addCredentials(creds)
   }

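Secret auto-generation is now keyed off the master URL. The standalone sketch below uses inline copies of the local-mode patterns (an assumption for illustration; the real ones live in SparkMasterRegex) to show which master strings fall through to the "generate a secret" branch.

    // Illustrative only: recognizing master URLs that allow auto-generating an auth secret.
    object MasterMatchSketch {
      private val LocalN = """local\[([0-9]+|\*)\]""".r                          // e.g. "local[4]", "local[*]"
      private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r   // e.g. "local[4,2]"

      def canGenerateSecret(master: String): Boolean = master match {
        case "yarn" | "local" | LocalN(_) | LocalNFailures(_, _) => true
        case _ => false  // caller must instead require an explicitly configured secret
      }
    }

    // MasterMatchSketch.canGenerateSecret("local[2]")      => true
    // MasterMatchSketch.canGenerateSecret("spark://host")  => false
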
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 3 deletions
@@ -534,7 +534,8 @@ class SparkContext(config: SparkConf) extends Logging {
       schedulerBackend match {
         case b: ExecutorAllocationClient =>
           Some(new ExecutorAllocationManager(
-            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
+            _env.blockManager.master))
         case _ =>
           None
       }
@@ -1633,6 +1634,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
    *
+   * This is not supported when dynamic allocation is turned on.
+   *
    * @note This is an indication to the cluster manager that the application wishes to adjust
    * its resource usage downwards. If the application wishes to replace the executors it kills
    * through this method with new ones, it should follow up explicitly with a call to
@@ -1644,7 +1647,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
+          "killExecutors() unsupported with Dynamic Allocation turned on")
+        b.killExecutors(executorIds, adjustTargetNumExecutors = true, countFailures = false,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
@@ -1682,7 +1688,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false

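For callers of the developer API, the visible change is the new require(). A brief, hypothetical usage sketch (the executor ids are made up):

    // Only valid when dynamic allocation is off; with it on, this now throws
    // IllegalArgumentException instead of silently fighting the allocation manager
    // over the executor target.
    val sc: org.apache.spark.SparkContext = ???  // an existing context
    sc.killExecutors(Seq("1", "2"))
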
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 26 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.python
 
-import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
+import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
 import java.nio.charset.StandardCharsets
 import java.util.Arrays
@@ -182,7 +182,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+        val command = Arrays.asList(pythonExec, "-m", daemonModule)
+        val pb = new ProcessBuilder(command)
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars.asJava)
         workerEnv.put("PYTHONPATH", pythonPath)
@@ -191,7 +192,29 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         daemon = pb.start()
 
         val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
+        try {
+          daemonPort = in.readInt()
+        } catch {
+          case _: EOFException =>
+            throw new SparkException(s"No port number in $daemonModule's stdout")
+        }
+
+        // test that the returned port number is within a valid range.
+        // note: this does not cover the case where the port number
+        // is arbitrary data but is also coincidentally within range
+        if (daemonPort < 1 || daemonPort > 0xffff) {
+          val exceptionMessage = f"""
+            |Bad data in $daemonModule's standard output. Invalid port number:
+            |  $daemonPort (0x$daemonPort%08x)
+            |Python command to execute the daemon was:
+            |  ${command.asScala.mkString(" ")}
+            |Check that you don't have any unexpected modules or libraries in
+            |your PYTHONPATH:
+            |  $pythonPath
+            |Also, check if you have a sitecustomize.py module in your python path,
+            |or in your python installation, that is printing to standard output"""
+          throw new SparkException(exceptionMessage.stripMargin)
+        }
 
         // Redirect daemon stdout and stderr
         redirectStreamsToStderr(in, daemon.getErrorStream)

core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 2 additions & 1 deletion
@@ -152,7 +152,8 @@ private[scheduler] class BlacklistTracker (
       case Some(a) =>
         logInfo(s"Killing blacklisted executor id $exec " +
           s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-        a.killExecutors(Seq(exec), true, true)
+        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+          force = true)
       case None =>
         logWarning(s"Not attempting to kill blacklisted executor id $exec " +
           s"since allocation client is not defined.")

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 12 additions & 10 deletions
@@ -147,7 +147,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     case KillExecutorsOnHost(host) =>
       scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
-        killExecutors(exec.toSeq, replace = true, force = true)
+        killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
+          force = true)
       }
 
     case UpdateDelegationTokens(newDelegationTokens) =>
@@ -584,18 +585,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   final override def killExecutors(
       executorIds: Seq[String],
-      replace: Boolean,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val executorsToKill = knownExecutors
         .filter { id => !executorsPendingToRemove.contains(id) }
         .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
 
       logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
 
       // If we do not wish to replace the executors we kill, sync the target number of executors
       // with the cluster manager to avoid allocating new ones. When computing the new target,
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
-        if (!replace) {
+        if (adjustTargetNumExecutors) {
           requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
           if (requestedTotalExecutors !=
             (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
             logDebug(
-              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
+                 |Executor counts do not match:
                  |requestedTotalExecutors = $requestedTotalExecutors
                  |numExistingExecutors = $numExistingExecutors
                  |numPendingExecutors = $numPendingExecutors

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 4 additions & 1 deletion
@@ -915,7 +915,10 @@ private[spark] class AppStatusListener(
       return
     }
 
-    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+    // As the completion time of a skipped stage is always -1, we will remove skipped stages first.
+    // This is safe since the job itself contains enough information to render skipped stages in the
+    // UI.
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime")
     val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
     }

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 5 additions & 1 deletion
@@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
   }
 
   def lastStageAttempt(stageId: Int): v1.StageData = {
-    val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+    val it = store.view(classOf[StageDataWrapper])
+      .index("stageId")
+      .reverse()
+      .first(stageId)
+      .last(stageId)
       .closeableIterator()
     try {
       if (it.hasNext()) {
