Commit 9cd1aaf

Merge remote-tracking branch 'upstream/master'
2 parents: 9828158 + 15b7333

130 files changed, +5074 -1246 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 2 deletions

@@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // In order to prevent SparkContext from being created in executors.
-  SparkContext.assertOnDriver()
+  if (!config.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) {
+    // In order to prevent SparkContext from being created in executors.
+    SparkContext.assertOnDriver()
+  }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
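For context, a minimal sketch of the user-facing side of this guard. The config key comes from the package.scala diff below; the master and app name are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Default (false): SparkContext.assertOnDriver() throws if construction
// happens inside an executor. Setting the flag added in this commit
// restores the old, permissive behavior.
val conf = new SparkConf()
  .setMaster("local[2]")                              // illustrative
  .setAppName("allow-spark-context-demo")             // illustrative
  .set("spark.executor.allowSparkContext", "true")

val sc = new SparkContext(conf)
sc.stop()

Note that on the driver the guard never fires; the flag only matters for code that (usually by accident) constructs a SparkContext inside a task.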

core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

Lines changed: 2 additions & 1 deletion

@@ -275,7 +275,8 @@ private[spark] object ConfigEntry {
 
   val UNDEFINED = "<undefined>"
 
-  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+  private[spark] val knownConfigs =
+    new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
 
   def registerEntry(entry: ConfigEntry[_]): Unit = {
     val existing = knownConfigs.putIfAbsent(entry.key, entry)
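The registry relies on ConcurrentHashMap.putIfAbsent for lock-free, first-writer-wins registration. A standalone sketch of the same pattern (SimpleEntry is a made-up stand-in for ConfigEntry[_]):

import java.util.concurrent.ConcurrentHashMap

final case class SimpleEntry(key: String, doc: String)  // stand-in for ConfigEntry[_]

val registry = new ConcurrentHashMap[String, SimpleEntry]()

def register(entry: SimpleEntry): Unit = {
  // putIfAbsent returns null if the key was free, or the existing value
  // otherwise, so concurrent duplicate registrations are detected without locking.
  val existing = registry.putIfAbsent(entry.key, entry)
  require(existing == null, s"Config entry ${entry.key} already registered!")
}

register(SimpleEntry("spark.executor.allowSparkContext", "demo"))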

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions

@@ -1908,4 +1908,11 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val EXECUTOR_ALLOW_SPARK_CONTEXT =
+    ConfigBuilder("spark.executor.allowSparkContext")
+      .doc("If set to true, SparkContext can be created in executors.")
+      .version("3.0.1")
+      .booleanConf
+      .createWithDefault(false)
 }
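Entries built this way are read with the typed get on SparkConf, which is how the SparkContext change above consumes the flag. A hedged sketch; note the entry-based get overload is private[spark], so this assumes code living under the org.apache.spark package:

package org.apache.spark.demo  // assumption: must sit under org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT

object ConfigReadDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // The typed get returns Boolean here, falling back to the
    // createWithDefault(false) value when the key is unset.
    assert(!conf.get(EXECUTOR_ALLOW_SPARK_CONTEXT))
    conf.set("spark.executor.allowSparkContext", "true")
    assert(conf.get(EXECUTOR_ALLOW_SPARK_CONTEXT))
  }
}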

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 6 additions & 5 deletions

@@ -664,16 +664,17 @@ private[spark] class TaskSchedulerImpl(
       // in order to provision more executors to make them schedulable
       if (Utils.isDynamicAllocationEnabled(conf)) {
         if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
-          logInfo(s"Notifying ExecutorAllocationManager to allocate more executors to" +
-            s" schedule the unschedulable task before aborting $taskSet.")
+          logInfo("Notifying ExecutorAllocationManager to allocate more executors to" +
+            " schedule the unschedulable task before aborting" +
+            s" stage ${taskSet.stageId}.")
           dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
             taskSet.taskSet.stageAttemptId)
           updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
         }
       } else {
         // Abort Immediately
         logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
-          s" executors can be found to kill. Aborting $taskSet.")
+          s" executors can be found to kill. Aborting stage ${taskSet.stageId}.")
         taskSet.abortSinceCompletelyBlacklisted(taskIndex)
       }
     }

@@ -744,7 +745,7 @@ private[spark] class TaskSchedulerImpl(
       val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
       unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
       logInfo(s"Waiting for $timeout ms for completely " +
-        s"blacklisted task to be schedulable again before aborting $taskSet.")
+        s"blacklisted task to be schedulable again before aborting stage ${taskSet.stageId}.")
       abortTimer.schedule(
         createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
     }

@@ -757,7 +758,7 @@ private[spark] class TaskSchedulerImpl(
       if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
           unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
         logInfo("Cannot schedule any task because of complete blacklisting. " +
-          s"Wait time for scheduling expired. Aborting $taskSet.")
+          s"Wait time for scheduling expired. Aborting stage ${taskSet.stageId}.")
         taskSet.abortSinceCompletelyBlacklisted(taskIndex)
       } else {
         this.cancel()
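One subtlety worth keeping in mind for log-message edits like these: ${...} only interpolates inside an s-prefixed string; in a plain string literal it is emitted verbatim. A two-line illustration (the stageId value is made up):

val stageId = 7
println(" stage ${stageId}.")   // prints the literal text: stage ${stageId}.
println(s" stage ${stageId}.")  // prints: stage 7.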

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 34 additions & 9 deletions

@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._

@@ -868,13 +868,17 @@ private[spark] class AppStatusListener(
     // check if there is a new peak value for any of the executor level memory metrics
     // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
     // for the live UI.
-    event.executorUpdates.foreach { case (_, peakUpdates) =>
+    event.executorUpdates.foreach { case (key, peakUpdates) =>
       liveExecutors.get(event.execId).foreach { exec =>
         if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(peakUpdates)) {
-          maybeUpdate(exec, now)
+          update(exec, now)
         }
       }
+
+      // Update stage level peak executor metrics.
+      updateStageLevelPeakExecutorMetrics(key._1, key._2, event.execId, peakUpdates, now)
     }
+
     // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
     // here to ensure the staleness of Spark UI doesn't last more than
     // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.

@@ -885,17 +889,38 @@
     }
   }
 
-  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
+  override def onStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Unit = {
     val now = System.nanoTime()
 
     // check if there is a new peak value for any of the executor level memory metrics,
     // while reading from the log. SparkListenerStageExecutorMetrics are only processed
     // when reading logs.
-    liveExecutors.get(executorMetrics.execId).orElse(
-      deadExecutors.get(executorMetrics.execId)).foreach { exec =>
-      if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
-        update(exec, now)
-      }
+    liveExecutors.get(event.execId).orElse(
+      deadExecutors.get(event.execId)).foreach { exec =>
+      if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(event.executorMetrics)) {
+        update(exec, now)
+      }
+    }
+
+    // Update stage level peak executor metrics.
+    updateStageLevelPeakExecutorMetrics(
+      event.stageId, event.stageAttemptId, event.execId, event.executorMetrics, now)
+  }
+
+  private def updateStageLevelPeakExecutorMetrics(
+      stageId: Int,
+      stageAttemptId: Int,
+      executorId: String,
+      executorMetrics: ExecutorMetrics,
+      now: Long): Unit = {
+    Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+      if (stage.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics)) {
+        update(stage, now)
+      }
+      val esummary = stage.executorSummary(executorId)
+      if (esummary.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics)) {
+        update(esummary, now)
+      }
     }
   }
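The compareAndUpdatePeakValues idiom above is a read-modify-write running maximum: it merges a new sample into the stored peaks and reports whether anything grew, so the listener only writes to the status store when a genuine new peak appears. A self-contained sketch of that idiom (PeakTracker is a made-up stand-in, not Spark's ExecutorMetrics):

// Hypothetical stand-in for ExecutorMetrics' peak tracking.
final class PeakTracker(nMetrics: Int) {
  private val peaks = Array.fill(nMetrics)(Long.MinValue)

  // Returns true iff at least one metric reached a new peak, mirroring
  // the contract compareAndUpdatePeakValues is used with here.
  def compareAndUpdatePeakValues(sample: Array[Long]): Boolean = {
    var updated = false
    var i = 0
    while (i < nMetrics) {
      if (sample(i) > peaks(i)) { peaks(i) = sample(i); updated = true }
      i += 1
    }
    updated
  }
}

val tracker = new PeakTracker(2)
assert(tracker.compareAndUpdatePeakValues(Array(10L, 5L)))  // first sample: new peaks
assert(!tracker.compareAndUpdatePeakValues(Array(3L, 4L)))  // nothing grew: no store write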

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 2 additions & 1 deletion

@@ -506,7 +506,8 @@ private[spark] class AppStatusStore(
       tasks = Some(tasks),
       executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
       killedTasksSummary = stage.killedTasksSummary,
-      resourceProfileId = stage.resourceProfileId)
+      resourceProfileId = stage.resourceProfileId,
+      peakExecutorMetrics = stage.peakExecutorMetrics)
   }
 
   def rdd(rddId: Int): v1.RDDStorageInfo = {

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 8 additions & 2 deletions

@@ -365,6 +365,8 @@ private class LiveExecutorStageSummary(
 
   var metrics = createMetrics(default = 0L)
 
+  val peakExecutorMetrics = new ExecutorMetrics()
+
   override protected def doUpdate(): Any = {
     val info = new v1.ExecutorStageSummary(
       taskTime,

@@ -381,7 +383,8 @@
       metrics.shuffleWriteMetrics.recordsWritten,
       metrics.memoryBytesSpilled,
       metrics.diskBytesSpilled,
-      isBlacklisted)
+      isBlacklisted,
+      Some(peakExecutorMetrics).filter(_.isSet))
     new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
   }

@@ -420,6 +423,8 @@ private class LiveStage extends LiveEntity {
 
   var blackListedExecutors = new HashSet[String]()
 
+  val peakExecutorMetrics = new ExecutorMetrics()
+
   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
   @volatile var cleaning = false
   var savedTasks = new AtomicInteger(0)

@@ -484,7 +489,8 @@
       tasks = None,
       executorSummary = None,
       killedTasksSummary = killedSummary,
-      resourceProfileId = info.resourceProfileId)
+      resourceProfileId = info.resourceProfileId,
+      Some(peakExecutorMetrics).filter(_.isSet))
   }
 
   override protected def doUpdate(): Any = {
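The Some(peakExecutorMetrics).filter(_.isSet) pattern turns a mutable accumulator that may never have received data into an Option that stays None until something was recorded, so untouched metrics are omitted from the API output. A hedged miniature (Metrics and its isSet flag are made up, standing in for ExecutorMetrics):

// Made-up stand-in: a metrics holder that knows whether it was ever written to.
final class Metrics {
  var isSet = false
  def record(): Unit = { isSet = true }
}

val m = new Metrics
assert(Some(m).filter(_.isSet).isEmpty)      // nothing recorded => None
m.record()
assert(Some(m).filter(_.isSet).contains(m))  // recorded => Some(m)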

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 8 additions & 2 deletions

@@ -82,7 +82,10 @@ class ExecutorStageSummary private[spark](
     val shuffleWriteRecords : Long,
     val memoryBytesSpilled : Long,
     val diskBytesSpilled : Long,
-    val isBlacklistedForStage: Boolean)
+    val isBlacklistedForStage: Boolean,
+    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+    val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class ExecutorSummary private[spark](
   val id: String,

@@ -259,7 +262,10 @@ class StageData private[spark](
     val tasks: Option[Map[Long, TaskData]],
     val executorSummary: Option[Map[String, ExecutorStageSummary]],
     val killedTasksSummary: Map[String, Int],
-    val resourceProfileId: Int)
+    val resourceProfileId: Int,
+    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+    val peakExecutorMetrics: Option[ExecutorMetrics])
 
 class TaskData private[spark](
   val taskId: Long,
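For readers unfamiliar with the annotation pair: @JsonSerialize(using = ...) swaps in a handwritten Jackson serializer for just that field. A minimal sketch of the shape such a serializer takes (OptionLongSerializer and its field layout are illustrative, not Spark's actual ExecutorMetricsJsonSerializer):

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}

// Illustrative only: writes Option[Long] as the number, or JSON null for None.
class OptionLongSerializer extends JsonSerializer[Option[Long]] {
  override def serialize(
      value: Option[Long],
      gen: JsonGenerator,
      serializers: SerializerProvider): Unit = {
    value match {
      case Some(v) => gen.writeNumber(v)
      case None => gen.writeNull()
    }
  }
}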

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 23 additions & 6 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.ui
 
-import java.net.{URI, URL}
+import java.net.{URI, URL, URLDecoder}
 import java.util.EnumSet
 import javax.servlet.DispatcherType
 import javax.servlet.http._

@@ -377,8 +377,7 @@ private[spark] object JettyUtils extends Logging {
       if (baseRequest.isSecure) {
         return
       }
-      val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort,
-        baseRequest.getRequestURI, baseRequest.getQueryString)
+      val httpsURI = createRedirectURI(scheme, securePort, baseRequest)
       response.setContentLength(0)
       response.sendRedirect(response.encodeRedirectURL(httpsURI))
       baseRequest.setHandled(true)

@@ -440,16 +439,34 @@
     handler.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
   }
 
+  private def decodeURL(url: String, encoding: String): String = {
+    if (url == null) {
+      null
+    } else {
+      URLDecoder.decode(url, encoding)
+    }
+  }
+
   // Create a new URI from the arguments, handling IPv6 host encoding and default ports.
-  private def createRedirectURI(
-      scheme: String, server: String, port: Int, path: String, query: String) = {
+  private def createRedirectURI(scheme: String, port: Int, request: Request): String = {
+    val server = request.getServerName
     val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
       s"[${server}]"
     } else {
       server
     }
     val authority = s"$redirectServer:$port"
-    new URI(scheme, authority, path, query, null).toString
+    val queryEncoding = if (request.getQueryEncoding != null) {
+      request.getQueryEncoding
+    } else {
+      // By default decoding the URI as "UTF-8" should be enough for SparkUI
+      "UTF-8"
+    }
+    // The request URL can be raw or encoded here. To avoid the request URL being
+    // encoded twice, let's decode it here.
+    val requestURI = decodeURL(request.getRequestURI, queryEncoding)
+    val queryString = decodeURL(request.getQueryString, queryEncoding)
+    new URI(scheme, authority, requestURI, queryString, null).toString
   }
 
   def toVirtualHosts(connectors: String*): Array[String] = connectors.map("@" + _).toArray
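The decode-then-rebuild step matters because the multi-argument java.net.URI constructor percent-encodes its path and query arguments; handing it an already-encoded request URI would encode the escape sequences themselves. A small self-contained illustration (host and path are made up):

import java.net.{URI, URLDecoder}

val rawPath = "/jobs/job%20name"                   // as received, already percent-encoded
val decoded = URLDecoder.decode(rawPath, "UTF-8")  // "/jobs/job name"

// Encoding the raw path again double-escapes the '%':
new URI("https", "host:8443", rawPath, null, null).toString
// => https://host:8443/jobs/job%2520name   (wrong: %20 became %2520)

// Decoding first lets the constructor encode exactly once:
new URI("https", "host:8443", decoded, null, null).toString
// => https://host:8443/jobs/job%20name     (correct)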

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

Lines changed: 2 additions & 1 deletion

@@ -255,7 +255,8 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
       tasks = None,
       executorSummary = None,
       killedTasksSummary = Map(),
-      ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
+      ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID,
+      peakExecutorMetrics = None)
   }
 }
