[SPARK-4857] [CORE] Adds Executor membership events to SparkListener #3711
Changes from all commits: 93d087b, 14fe78d, 1727b38, b1d054a, 946d2c5
New file: `JavaSparkListener.java` (97 lines added)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;

/**
 * Java clients should extend this class instead of implementing
 * SparkListener directly. This prevents Java clients from breaking
 * when new events are added to the SparkListener trait.
 *
 * This is a concrete class rather than an abstract one to enforce
 * that new events get added to both SparkListener and this adapter
 * in lockstep.
 */
public class JavaSparkListener implements SparkListener {

  @Override
  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }

  @Override
  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }

  @Override
  public void onTaskStart(SparkListenerTaskStart taskStart) { }

  @Override
  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }

  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }

  @Override
  public void onJobStart(SparkListenerJobStart jobStart) { }

  @Override
  public void onJobEnd(SparkListenerJobEnd jobEnd) { }

  @Override
  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }

  @Override
  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }

  @Override
  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }

  @Override
  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }

  @Override
  public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }

  @Override
  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }

  @Override
  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }

  @Override
  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

  @Override
  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
}
```
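With this adapter, a Java client overrides only the callbacks it cares about. As a usage sketch (hypothetical client code, not part of this PR; the `executorId()` accessor is an assumption based on the constructor arguments visible later in the diff):

```java
import org.apache.spark.JavaSparkListener;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;

// Hypothetical client listener: only the two new executor callbacks are
// overridden; every other SparkListener event stays a no-op via the adapter.
public class ExecutorEventLogger extends JavaSparkListener {
  @Override
  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
    // executorId() is assumed to be the Scala case-class accessor.
    System.out.println("Executor added: " + executorAdded.executorId());
  }

  @Override
  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
    System.out.println("Executor removed: " + executorRemoved.executorId());
  }
}
```

Registered via `jsc.sc().addSparkListener(new ExecutorEventLogger())` from a `JavaSparkContext`, the class keeps compiling even as future Spark versions add events to the `SparkListener` trait, which is exactly the compatibility problem the adapter exists to solve.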
Changes to `CoarseGrainedSchedulerBackend.scala`:

```diff
@@ -28,7 +28,7 @@ import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
-import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}

@@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
   // Number of executors requested from the cluster manager that have not registered yet
   private var numPendingExecutors = 0

+  private val listenerBus = scheduler.sc.listenerBus
+
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]

@@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
         }
       }
+      listenerBus.post(SparkListenerExecutorAdded(executorId, data))
       makeOffers()
     }

@@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
         totalCoreCount.addAndGet(-executorInfo.totalCores)
         totalRegisteredExecutors.addAndGet(-1)
         scheduler.executorLost(executorId, SlaveLost(reason))
+        listenerBus.post(SparkListenerExecutorRemoved(executorId))
       case None => logError(s"Asked to remove non-existent executor $executorId")
     }
   }
```

Inline review comment on the `listenerBus.post(SparkListenerExecutorRemoved(executorId))` line:

> Any reason why we are doing this here instead of in the `executorLost` method of `DAGScheduler.scala`? (Similarly for the `SparkListenerExecutorAdded` event above.)

Reply (Contributor):

> @ksakellis that seems like a good idea, actually.
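Because the backend now posts `SparkListenerExecutorAdded` when an executor registers and `SparkListenerExecutorRemoved` when one is removed, a listener can pair the two events to track cluster membership over time — the use case behind SPARK-4857. A minimal hypothetical sketch (not part of this PR; again assuming an `executorId()` accessor on both events):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.JavaSparkListener;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;

// Hypothetical listener that maintains the set of currently live executors.
// Listener callbacks may run on a bus thread, so the set is concurrent.
public class LiveExecutorTracker extends JavaSparkListener {
  private final Set<String> liveExecutors =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  @Override
  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
    liveExecutors.add(executorAdded.executorId());      // assumed accessor
  }

  @Override
  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
    liveExecutors.remove(executorRemoved.executorId()); // assumed accessor
  }

  public int liveExecutorCount() {
    return liveExecutors.size();
  }
}
```

This gives an application a live view of executor membership without polling the scheduler backend, which previously was the only way to observe executors coming and going.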
New file: `ExecutorInfo.scala` (45 lines added)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Stores information about an executor to pass from the scheduler to SparkListeners.
 */
@DeveloperApi
class ExecutorInfo(
    val executorHost: String,
    val totalCores: Int
) {

  def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]

  override def equals(other: Any): Boolean = other match {
    case that: ExecutorInfo =>
      (that canEqual this) &&
        executorHost == that.executorHost &&
        totalCores == that.totalCores
    case _ => false
  }

  override def hashCode(): Int = {
    val state = Seq(executorHost, totalCores)
    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
  }
}
```
Review comment:

> Don't we need to do the equivalent in `MesosSchedulerBackend`?
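Since `ExecutorInfo` overrides `equals` and `hashCode` (with `canEqual` guarding against subclass asymmetry), instances behave as value objects and can safely be compared or used as hash keys by listener code. A small illustrative snippet (hypothetical, not part of this PR):

```java
import org.apache.spark.scheduler.cluster.ExecutorInfo;

public class ExecutorInfoDemo {
  public static void main(String[] args) {
    ExecutorInfo a = new ExecutorInfo("host-1", 4);
    ExecutorInfo b = new ExecutorInfo("host-1", 4);

    // Structural equality: same host and core count compare equal,
    // and equal objects hash alike (a 31-based fold over both fields).
    System.out.println(a.equals(b));                              // true
    System.out.println(a.hashCode() == b.hashCode());             // true
    System.out.println(a.equals(new ExecutorInfo("host-2", 4)));  // false
  }
}
```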