diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
new file mode 100644
index 0000000000000..646496f313507
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+/**
+ * Java clients should extend this class instead of implementing
+ * SparkListener directly. This prevents Java clients from
+ * breaking when new events are added to the SparkListener
+ * trait.
+ *
+ * This is a concrete class rather than an abstract one, to
+ * ensure that new events are added to both the SparkListener
+ * trait and this adapter in lockstep.
+ */
+public class JavaSparkListener implements SparkListener {
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) { }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
+
+ @Override
+ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
+
+ @Override
+ public void onJobStart(SparkListenerJobStart jobStart) { }
+
+ @Override
+ public void onJobEnd(SparkListenerJobEnd jobEnd) { }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
+
+ @Override
+ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
+
+ @Override
+ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ad7d81747c377..ede0a9dbefb8d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
extends Serializable {
@transient var state: ApplicationState.Value = _
- @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
- @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
+ @transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
+ @transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(
private def init() {
state = ApplicationState.WAITING
- executors = new mutable.HashMap[Int, ExecutorInfo]
+ executors = new mutable.HashMap[Int, ExecutorDesc]
coresGranted = 0
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
- removedExecutors = new ArrayBuffer[ExecutorInfo]
+ removedExecutors = new ArrayBuffer[ExecutorDesc]
}
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
}
}
- def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
- val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+ def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
+ val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec
}
- def removeExecutor(exec: ExecutorInfo) {
+ def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
rename to core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
index d417070c51016..5d620dfcabad5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
-private[spark] class ExecutorInfo(
+private[spark] class ExecutorDesc(
val id: Int,
val application: ApplicationInfo,
val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(
override def equals(other: Any): Boolean = {
other match {
- case info: ExecutorInfo =>
+ case info: ExecutorDesc =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e8a5cfc746fed..c3c24e8b7a983 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,7 +581,7 @@ private[spark] class Master(
}
}
- def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 473ddc23ff0f3..e94aae93e4495 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
- @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+ @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
@transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
host + ":" + port
}
- def addExecutor(exec: ExecutorInfo) {
+ def addExecutor(exec: ExecutorDesc) {
executors(exec.fullId) = exec
coresUsed += exec.cores
memoryUsed += exec.memory
}
- def removeExecutor(exec: ExecutorInfo) {
+ def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.fullId)) {
executors -= exec.fullId
coresUsed -= exec.cores
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 4588c130ef439..3aae2b95d7396 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -27,7 +27,7 @@ import org.json4s.JValue
import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}
- private def executorRow(executor: ExecutorInfo): Seq[Node] = {
+ private def executorRow(executor: ExecutorDesc): Seq[Node] = {
<tr>
  <td>{executor.id}</td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 27bf4f1599076..30075c172bdb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -168,6 +168,10 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
logEvent(event, flushLogger = true)
+ override def onExecutorAdded(event: SparkListenerExecutorAdded) =
+ logEvent(event, flushLogger = true)
+ override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
+ logEvent(event, flushLogger = true)
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b62b0c1312693..4840d8bd2d2f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Distribution, Utils}
@@ -84,6 +85,14 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExecutorRemoved(executorId: String)
+ extends SparkListenerEvent
+
/**
* Periodic updates from executors.
* @param execId executor id
@@ -109,7 +118,8 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
- * interface which might change in different Spark releases.
+ * interface which might change in different Spark releases. Java clients should extend
+ * {@link JavaSparkListener}.
*/
@DeveloperApi
trait SparkListener {
@@ -183,6 +193,16 @@ trait SparkListener {
* Called when the driver receives task metrics from an executor in a heartbeat.
*/
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
+
+ /**
+ * Called when the driver registers a new executor.
+ */
+ def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
+
+ /**
+ * Called when the driver removes an executor.
+ */
+ def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
}
/**
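Because the trait supplies no-op defaults, Scala listeners only override the callbacks they care about. As a minimal sketch of consuming the two new events (the class name and tracking logic are illustrative, not part of this patch):

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded,
      SparkListenerExecutorRemoved}

    // Hypothetical listener that tracks the set of live executor IDs.
    // Only the callbacks of interest are overridden; the trait supplies
    // no-op defaults for everything else.
    class ExecutorTrackingListener extends SparkListener {
      private val live = mutable.Set.empty[String]

      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit =
        synchronized { live += executorAdded.executorId }

      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit =
        synchronized { live -= executorRemoved.executorId }

      def numLiveExecutors: Int = synchronized { live.size }
    }

Such a listener would be registered with sc.addSparkListener(new ExecutorTrackingListener). Java clients get the same effect by extending JavaSparkListener (above) and overriding onExecutorAdded/onExecutorRemoved there.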
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e79ffd7a3587d..e700c6af542f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,6 +70,10 @@ private[spark] trait SparkListenerBus extends Logging {
foreachListener(_.onApplicationEnd(applicationEnd))
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+ case executorAdded: SparkListenerExecutorAdded =>
+ foreachListener(_.onExecutorAdded(executorAdded))
+ case executorRemoved: SparkListenerExecutorRemoved =>
+ foreachListener(_.onExecutorRemoved(executorRemoved))
case SparkListenerShutdown =>
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index fe9914b50bc54..5786d367464f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -28,7 +28,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
-import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
@@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
// Number of executors requested from the cluster manager that have not registered yet
private var numPendingExecutors = 0
+ private val listenerBus = scheduler.sc.listenerBus
+
// Executors we have requested the cluster manager to kill that have not died yet
private val executorsPendingToRemove = new HashSet[String]
@@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
+ listenerBus.post(SparkListenerExecutorAdded(executorId, data))
makeOffers()
}
@@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, SlaveLost(reason))
+ listenerBus.post(SparkListenerExecutorRemoved(executorId))
case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b71bd5783d6df..eb52ddfb1eab1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -31,7 +31,7 @@ import akka.actor.{Address, ActorRef}
private[cluster] class ExecutorData(
val executorActor: ActorRef,
val executorAddress: Address,
- val executorHost: String ,
+ override val executorHost: String,
var freeCores: Int,
- val totalCores: Int
-)
+ override val totalCores: Int
+) extends ExecutorInfo(executorHost, totalCores)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
new file mode 100644
index 0000000000000..b4738e64c9391
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about an executor to pass from the scheduler to SparkListeners.
+ */
+@DeveloperApi
+class ExecutorInfo(
+ val executorHost: String,
+ val totalCores: Int
+) {
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: ExecutorInfo =>
+ (that canEqual this) &&
+ executorHost == that.executorHost &&
+ totalCores == that.totalCores
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(executorHost, totalCores)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
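Although ExecutorInfo is a plain class rather than a case class, the overridden equals/hashCode give it value semantics, presumably so a deserialized event compares equal to the original and mock expectations can match by value (the EasyMock expectation in MesosSchedulerBackendSuite below relies on this). A quick illustrative sketch:

    import org.apache.spark.scheduler.cluster.ExecutorInfo

    val a = new ExecutorInfo("host1", 4)
    val b = new ExecutorInfo("host1", 4)
    assert(a == b)                          // value equality despite not being a case class
    assert(a.hashCode == b.hashCode)        // consistent with equals
    assert(a != new ExecutorInfo("host1", 8))

Note also that canEqual only checks isInstanceOf[ExecutorInfo], and ExecutorData (previous file) extends ExecutorInfo without overriding equals, so the backend's internal ExecutorData posted in an event compares equal to a bare ExecutorInfo with the same host and core count.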
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 10e6886c16a4f..7e5428198e2e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -27,9 +27,11 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
+ ExecutorInfo => MesosExecutorInfo, _}
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -62,6 +64,9 @@ private[spark] class MesosSchedulerBackend(
var classLoader: ClassLoader = null
+ // The listener bus on which executor added/removed events are posted.
+ val listenerBus = sc.listenerBus
+
@volatile var appId: String = _
override def start() {
@@ -87,7 +92,7 @@ private[spark] class MesosSchedulerBackend(
}
}
- def createExecutorInfo(execId: String): ExecutorInfo = {
+ def createExecutorInfo(execId: String): MesosExecutorInfo = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
@@ -141,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
- ExecutorInfo.newBuilder()
+ MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
@@ -237,6 +242,7 @@ private[spark] class MesosSchedulerBackend(
}
val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
+ val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
@@ -260,6 +266,10 @@ private[spark] class MesosSchedulerBackend(
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
mesosTasks.foreach { case (slaveId, tasks) =>
+ slaveIdToWorkerOffer.get(slaveId).foreach(o =>
+ listenerBus.post(SparkListenerExecutorAdded(slaveId,
+ new ExecutorInfo(o.host, o.cores)))
+ )
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
@@ -315,7 +325,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+ removeExecutor(taskIdToSlaveId(tid))
}
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
@@ -344,12 +354,20 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+ /**
+ * Remove the executor associated with slaveId in a thread-safe manner.
+ */
+ private def removeExecutor(slaveId: String) = {
+ synchronized {
+ listenerBus.post(SparkListenerExecutorRemoved(slaveId))
+ slaveIdsWithExecutors -= slaveId
+ }
+ }
+
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
inClassLoader() {
logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
- }
+ removeExecutor(slaveId.getValue)
scheduler.executorLost(slaveId.getValue, reason)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index d94e8252650d2..a025011006156 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util
import java.util.{Properties, UUID}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
import scala.collection.JavaConverters._
import scala.collection.Map
@@ -83,7 +85,10 @@ private[spark] object JsonProtocol {
applicationStartToJson(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
applicationEndToJson(applicationEnd)
-
+ case executorAdded: SparkListenerExecutorAdded =>
+ executorAddedToJson(executorAdded)
+ case executorRemoved: SparkListenerExecutorRemoved =>
+ executorRemovedToJson(executorRemoved)
// These aren't used, but keeps compiler happy
case SparkListenerShutdown => JNothing
case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
@@ -194,6 +199,16 @@ private[spark] object JsonProtocol {
("Timestamp" -> applicationEnd.time)
}
+ def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
+ ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+ ("Executor ID" -> executorAdded.executorId) ~
+ ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
+ }
+
+ def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
+ ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+ ("Executor ID" -> executorRemoved.executorId)
+ }
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
@@ -362,6 +377,10 @@ private[spark] object JsonProtocol {
("Disk Size" -> blockStatus.diskSize)
}
+ def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
+ ("Host" -> executorInfo.executorHost) ~
+ ("Total Cores" -> executorInfo.totalCores)
+ }
/** ------------------------------ *
* Util JSON serialization methods |
@@ -416,6 +435,8 @@ private[spark] object JsonProtocol {
val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
+ val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
+ val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -431,6 +452,8 @@ private[spark] object JsonProtocol {
case `unpersistRDD` => unpersistRDDFromJson(json)
case `applicationStart` => applicationStartFromJson(json)
case `applicationEnd` => applicationEndFromJson(json)
+ case `executorAdded` => executorAddedFromJson(json)
+ case `executorRemoved` => executorRemovedFromJson(json)
}
}
@@ -523,6 +546,16 @@ private[spark] object JsonProtocol {
SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
}
+ def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
+ val executorId = (json \ "Executor ID").extract[String]
+ val executorInfo = executorInfoFromJson(json \ "Executor Info")
+ SparkListenerExecutorAdded(executorId, executorInfo)
+ }
+
+ def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
+ val executorId = (json \ "Executor ID").extract[String]
+ SparkListenerExecutorRemoved(executorId)
+ }
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
@@ -745,6 +778,11 @@ private[spark] object JsonProtocol {
BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
}
+ def executorInfoFromJson(json: JValue): ExecutorInfo = {
+ val executorHost = (json \ "Host").extract[String]
+ val totalCores = (json \ "Total Cores").extract[Int]
+ new ExecutorInfo(executorHost, totalCores)
+ }
/** -------------------------------- *
* Util JSON deserialization methods |
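Taken together, the new methods let the executor events round-trip through JSON (this is what the event log written by EventLoggingListener contains). A minimal sketch, assuming code compiled inside an org.apache.spark package since JsonProtocol is private[spark] (e.g. a test; the object name here is illustrative):

    package org.apache.spark.util

    import org.json4s.jackson.JsonMethods._

    import org.apache.spark.scheduler.SparkListenerExecutorAdded
    import org.apache.spark.scheduler.cluster.ExecutorInfo

    object ExecutorEventRoundTrip {
      def main(args: Array[String]): Unit = {
        val event = SparkListenerExecutorAdded("exec1", new ExecutorInfo("host1", 4))
        val json = JsonProtocol.sparkEventToJson(event)
        // {"Event":"SparkListenerExecutorAdded","Executor ID":"exec1",
        //  "Executor Info":{"Host":"host1","Total Cores":4}}
        println(compact(render(json)))
        // Deserializing yields an equal event, thanks to ExecutorInfo.equals.
        assert(JsonProtocol.sparkEventFromJson(json) == event)
      }
    }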
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 1de7e130039a5..437d8693c0b1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -160,7 +160,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
*/
private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
val conf = getLoggingConf(testDirPath, compressionCodec)
- val sc = new SparkContext("local", "test", conf)
+ val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val expectedLogDir = testDir.toURI().toString()
@@ -184,6 +184,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
val eventSet = mutable.Set(
SparkListenerApplicationStart,
SparkListenerBlockManagerAdded,
+ SparkListenerExecutorAdded,
SparkListenerEnvironmentUpdate,
SparkListenerJobStart,
SparkListenerJobEnd,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
new file mode 100644
index 0000000000000..623a687c359a2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.{SparkContext, LocalSparkContext}
+
+import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
+
+import scala.collection.mutable
+
+/**
+ * Unit tests for SparkListener that require a local cluster.
+ */
+class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
+ with BeforeAndAfter with BeforeAndAfterAll {
+
+ /** Length of time to wait while draining listener events. */
+ val WAIT_TIMEOUT_MILLIS = 10000
+
+ before {
+ sc = new SparkContext("local-cluster[2,1,512]", "SparkListenerSuite")
+ }
+
+ test("SparkListener sends executor added message") {
+ val listener = new SaveExecutorInfo
+ sc.addSparkListener(listener)
+
+ val rdd1 = sc.parallelize(1 to 100, 4)
+ val rdd2 = rdd1.map(_.toString)
+ rdd2.setName("Target RDD")
+ rdd2.count()
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(listener.addedExecutorInfo.size == 2)
+ assert(listener.addedExecutorInfo("0").totalCores == 1)
+ assert(listener.addedExecutorInfo("1").totalCores == 1)
+ }
+
+ private class SaveExecutorInfo extends SparkListener {
+ val addedExecutorInfo = mutable.Map[String, ExecutorInfo]()
+
+ override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+ addedExecutorInfo(executor.executorId) = executor.executorInfo
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index e60e70afd3218..b25f2a10d71a1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -18,17 +18,20 @@
package org.apache.spark.scheduler.mesos
import org.scalatest.FunSuite
-import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext}
-import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
+ TaskDescription, WorkerOffer, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.Protos._
-import org.scalatest.mock.EasyMockSugar
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
import org.apache.mesos.Protos.Value.Scalar
import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
import java.util.Collections
import java.util
+import org.scalatest.mock.EasyMockSugar
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -52,11 +55,16 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+ val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
+ listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+ EasyMock.replay(listenerBus)
+
val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
+ EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
EasyMock.replay(sc)
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 63c2559c5c5f5..5ba94ff67d395 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
import java.util.Properties
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.shuffle.MetadataFetchFailedException
import scala.collection.Map
@@ -69,6 +70,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
+ val executorAdded = SparkListenerExecutorAdded("exec1",
+ new ExecutorInfo("Hostee.awesome.com", 11))
+ val executorRemoved = SparkListenerExecutorRemoved("exec2")
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -85,6 +89,8 @@ class JsonProtocolSuite extends FunSuite {
testEvent(unpersistRdd, unpersistRDDJsonString)
testEvent(applicationStart, applicationStartJsonString)
testEvent(applicationEnd, applicationEndJsonString)
+ testEvent(executorAdded, executorAddedJsonString)
+ testEvent(executorRemoved, executorRemovedJsonString)
}
test("Dependent Classes") {
@@ -94,6 +100,7 @@ class JsonProtocolSuite extends FunSuite {
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
+ testExecutorInfo(new ExecutorInfo("host", 43))
// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -303,6 +310,10 @@ class JsonProtocolSuite extends FunSuite {
assert(blockId === newBlockId)
}
+ private def testExecutorInfo(info: ExecutorInfo) {
+ val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
+ assertEquals(info, newInfo)
+ }
/** -------------------------------- *
| Util methods for comparing events |
@@ -335,6 +346,11 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(e1.jobResult, e2.jobResult)
case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) =>
assertEquals(e1.environmentDetails, e2.environmentDetails)
+ case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
+ assert(e1.executorId == e2.executorId)
+ assertEquals(e1.executorInfo, e2.executorInfo)
+ case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
+ assert(e1.executorId == e2.executorId)
case (e1, e2) =>
assert(e1 === e2)
case _ => fail("Events don't match in types!")
@@ -387,6 +403,11 @@ class JsonProtocolSuite extends FunSuite {
assert(info1.accumulables === info2.accumulables)
}
+ private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
+ assert(info1.executorHost == info2.executorHost)
+ assert(info1.totalCores == info2.totalCores)
+ }
+
private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
assert(metrics1.hostname === metrics2.hostname)
assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
@@ -1407,4 +1428,24 @@ class JsonProtocolSuite extends FunSuite {
| "Timestamp": 42
|}
"""
+
+ private val executorAddedJsonString =
+ """
+ |{
+ | "Event": "SparkListenerExecutorAdded",
+ | "Executor ID": "exec1",
+ | "Executor Info": {
+ | "Host": "Hostee.awesome.com",
+ | "Total Cores": 11
+ | }
+ |}
+ """
+
+ private val executorRemovedJsonString =
+ """
+ |{
+ | "Event": "SparkListenerExecutorRemoved",
+ | "Executor ID": "exec2"
+ |}
+ """
}