Merge remote-tracking branch 'origin/master' into listener
cloud-fan committed Dec 19, 2017
commit 5b64f881adcf34b958aa659e62a9ce93171cf109
4 changes: 3 additions & 1 deletion appveyor.yml
@@ -42,7 +42,9 @@ install:
  # Install maven and dependencies
  - ps: .\dev\appveyor-install-dependencies.ps1
  # Required package for R unit tests
- - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+ - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+ # Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
+ - cmd: R -e "devtools::install_version('testthat', version = '1.0.2', repos='http://cran.us.r-project.org')"
  - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"

  build_script:
12 changes: 3 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,12 +53,11 @@ import org.apache.spark.rpc.RpcEndpointRef
  import org.apache.spark.scheduler._
  import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
  import org.apache.spark.scheduler.local.LocalSchedulerBackend
- import org.apache.spark.status.{AppStatusListener, AppStatusStore}
+ import org.apache.spark.status.AppStatusStore
  import org.apache.spark.storage._
  import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
  import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
  import org.apache.spark.util._
- import org.apache.spark.util.kvstore.InMemoryStore

  /**
   * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -417,13 +416,8 @@ class SparkContext(config: SparkConf) extends Logging {

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
- _statusStore = {
- // Create an in-memory store for a live application.
- val store = new InMemoryStore()
- val listener = new AppStatusListener(store, conf, true)
- listenerBus.addToStatusQueue(listener)
- new AppStatusStore(store, listener = Some(listener))
- }
+ _statusStore = AppStatusStore.createLiveStore(conf)
+ listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  _env = createSparkEnv(_conf, isLocal, listenerBus)
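Illustration, not part of the diff: the deleted inline block above hints at what the new AppStatusStore.createLiveStore factory presumably encapsulates. The following is only a sketch reconstructed from the removed lines plus the ElementTrackingStore wrapper introduced elsewhere in this commit; the real method body may differ.

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.util.kvstore.InMemoryStore

// Sketch only: a plausible shape for the factory that replaces the inline
// wiring removed from SparkContext. Not the verbatim implementation.
private[spark] object AppStatusStoreSketch {

  def createLiveStore(conf: SparkConf): AppStatusStore = {
    // In-memory store for a live application, wrapped in the tracking store
    // so retention limits can be enforced (assumption based on this commit).
    val store = new ElementTrackingStore(new InMemoryStore(), conf)
    val listener = new AppStatusListener(store, conf, true)
    // The listener is exposed on the returned store so the caller can register
    // it on the bus, as SparkContext now does via _statusStore.listener.get.
    new AppStatusStore(store, listener = Some(listener))
  }
}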
@@ -304,6 +304,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  val (kvstore, needReplay) = uiStorePath match {
  case Some(path) =>
  try {
+ // The store path is not guaranteed to exist - maybe it hasn't been created, or was
+ // invalidated because changes to the event log were detected. Need to replay in that
+ // case.
  val _replay = !path.isDirectory()
  (createDiskStore(path, conf), _replay)
  } catch {
@@ -319,27 +322,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  }

  val plugins = ServiceLoader.load(
- classOf[SparkHistoryUIPlugin], Utils.getContextOrSparkClassLoader).asScala
-
+ classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
+ val trackingStore = new ElementTrackingStore(kvstore, conf)
Contributor Author (cloud-fan):
do we really need to limit the UI data for history server? cc @vanzin

Contributor:
Yes, both because it's the old behavior, and to limit the app's history data growth. Also because the UI code itself doesn't scale to arbitrarily large lists of things like jobs and stages.
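Illustration of the point above, not code from this diff: the ElementTrackingStore introduced here is what makes such limits enforceable, by letting a listener register count-based triggers against the store. A minimal sketch, assuming an addTrigger(klass, threshold)(action) style API and a purely hypothetical entity class:

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.util.kvstore.InMemoryStore

// Sketch only: how a count-based retention trigger could bound the data the
// UI has to render. DemoTaskData and the trigger semantics are assumptions.
private[spark] object RetentionSketch {

  private case class DemoTaskData(taskId: Long)

  def boundedStore(conf: SparkConf): ElementTrackingStore = {
    val store = new ElementTrackingStore(new InMemoryStore(), conf)
    val maxTasks = conf.getInt("spark.ui.retainedTasks", 100000)
    // Ask the store to call back once more than maxTasks entities of this type
    // have been written, so the oldest entries can be evicted.
    store.addTrigger(classOf[DemoTaskData], maxTasks) { count =>
      println(s"would evict ${count - maxTasks} of $count tracked tasks")
    }
    store
  }
}

With triggers of this kind, caps such as spark.ui.retainedTasks (removed from core's config object further down in this commit) can presumably be enforced next to the store rather than by each listener's own bookkeeping.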

  if (needReplay) {
  val replayBus = new ReplayListenerBus()
- val listener = new AppStatusListener(kvstore, conf, false,
+ val listener = new AppStatusListener(trackingStore, conf, false,
  lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
  replayBus.addListener(listener)
  for {
  plugin <- plugins
- listener <- plugin.createListeners(conf, kvstore)
+ listener <- plugin.createListeners(conf, trackingStore)
  } replayBus.addListener(listener)
  try {
  val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
  replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
- listener.flush()
+ trackingStore.close(false)
  } catch {
  case e: Exception =>
- try {
- kvstore.close()
- } catch {
- case _e: Exception => logInfo("Error closing store.", _e)
+ Utils.tryLogNonFatalError {
+ trackingStore.close()
  }
  uiStorePath.foreach(Utils.deleteRecursively)
  if (e.isInstanceOf[FileNotFoundException]) {
@@ -240,11 +240,6 @@ package object config {
  .stringConf
  .createOptional

- // To limit memory usage, we only track information for a fixed number of tasks
- private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
- .intConf
- .createWithDefault(100000)
-
  // To limit how many applications are shown in the History Server summary ui
  private[spark] val HISTORY_UI_MAX_APPS =
  ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
19 changes: 0 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -30,7 +30,6 @@ import org.apache.spark.executor.TaskMetrics
  import org.apache.spark.scheduler.cluster.ExecutorInfo
  import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
  import org.apache.spark.ui.SparkUI
- import org.apache.spark.util.kvstore.KVStore

  @DeveloperApi
  @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
@@ -168,24 +167,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
  @DeveloperApi
  case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
-
- /**
- * An interface for creating history listeners(to replay event logs) defined in other modules like
- * SQL, and setup the UI of the plugin to rebuild the history UI.
- */
- private[spark] trait SparkHistoryUIPlugin {
-
- /**
- * Creates listeners to replay the event logs.
- */
- def createListeners(conf: SparkConf, store: KVStore): Seq[SparkListener]
-
- /**
- * Sets up UI of this plugin to rebuild the history UI.
- */
- def setupUI(ui: SparkUI): Unit
- }
-

  /**
   * Interface for listening to events from the Spark scheduler. Most applications should probably
   * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
@@ -488,11 +488,11 @@ private[spark] class TaskSetManager(
  abort(s"$msg Exception during serialization: $e")
  throw new TaskNotSerializableException(e)
  }
- if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+ if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
  !emittedTaskSizeWarning) {
  emittedTaskSizeWarning = true
  logWarning(s"Stage ${task.stageId} contains a task of very large size " +
- s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+ s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
  s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
  }
  addRunningTask(taskId)
@@ -502,7 +502,7 @@ private[spark] class TaskSetManager(
  // val timeTaken = clock.getTime() - startTime
  val taskName = s"task ${info.id} in stage ${taskSet.id}"
  logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
- s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
+ s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")

  sched.dagScheduler.taskStarted(task, info)
  new TaskDescription(
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.ui.SparkUI

/**
* An interface for creating history listeners(to replay event logs) defined in other modules like
* SQL, and setup the UI of the plugin to rebuild the history UI.
*/
private[spark] trait AppHistoryServerPlugin {
/**
* Creates listeners to replay the event logs.
*/
def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener]

/**
* Sets up UI of this plugin to rebuild the history UI.
*/
def setupUI(ui: SparkUI): Unit
}
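To make the contract concrete, here is a hypothetical implementation sketch (not part of this commit); a real plugin, such as one provided by the SQL module, would supply its own replay listener and UI tabs:

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.ui.SparkUI

// Hypothetical example implementation; the class name and bodies are placeholders.
private[spark] class ExampleHistoryServerPlugin extends AppHistoryServerPlugin {

  override def createListeners(
      conf: SparkConf,
      store: ElementTrackingStore): Seq[SparkListener] = {
    // A real plugin would return a listener that writes its module's entities
    // into `store` while the event log is replayed.
    Seq(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = ()
    })
  }

  override def setupUI(ui: SparkUI): Unit = {
    // A real plugin would attach its tabs/pages to `ui` here, reading back the
    // data it wrote into the store during replay.
  }
}

Since FsHistoryProvider discovers plugins with ServiceLoader (see the hunk above), such a class would also need to be registered in a META-INF/services/org.apache.spark.status.AppHistoryServerPlugin resource file.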