@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
   }
 
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+        notifyDriver = false)
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -148,12 +149,25 @@
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+  protected def exitExecutor(code: Int,
+                             reason: String,
+                             throwable: Throwable = null,
+                             notifyDriver: Boolean = true) = {
+    val message = "Executor self-exiting due to : " + reason
     if (throwable != null) {
-      logError(reason, throwable)
+      logError(message, throwable)
     } else {
-      logError(reason)
+      logError(message)
     }
+
+    if (notifyDriver && driver.nonEmpty) {
+      driver.get.ask[Boolean](
+        RemoveExecutor(executorId, new ExecutorLossReason(reason))
+      ).onFailure { case e =>
+        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
+      }(ThreadUtils.sameThread)
+    }
+
     System.exit(code)
   }
 }
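For context, the notification added above is a plain ask/reply exchange: before calling System.exit, the executor asks the driver with a RemoveExecutor message carrying an ExecutorLossReason, and the driver is expected to record that reason (so it can be surfaced later, e.g. in the UI) and acknowledge with true, which is why only the failure of the ask is handled. The following is a simplified, self-contained sketch of that shape, not Spark's actual driver endpoint; RemoveExecutorSketch, DriverEndpointStub, and receiveAndReply are hypothetical stand-ins for illustration only.

// Simplified, self-contained sketch (not Spark code) of the ask/reply shape
// behind the RemoveExecutor notification above. DriverEndpointStub records the
// loss reason so it can be reported later and replies `true`.
import scala.collection.mutable

object RemoveExecutorSketch {
  final case class RemoveExecutor(executorId: String, reason: String)

  class DriverEndpointStub {
    private val lossReasons = mutable.Map.empty[String, String]

    // Mirrors the ask/reply shape: the executor's ask[Boolean] gets `true` back.
    def receiveAndReply(msg: Any): Boolean = msg match {
      case RemoveExecutor(execId, reason) =>
        lossReasons(execId) = reason // kept so the exit reason can be surfaced
        true
      case _ =>
        false
    }

    def reasonFor(execId: String): Option[String] = lossReasons.get(execId)
  }

  def main(args: Array[String]): Unit = {
    val driver = new DriverEndpointStub
    driver.receiveAndReply(RemoveExecutor("exec-7", "Driver disassociated"))
    println(driver.reasonFor("exec-7")) // Some(Driver disassociated)
  }
}

Note also that the two call sites changed above pass notifyDriver = false: when registration with the driver failed or the driver has disassociated, there is no point trying to notify it.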
@@ -199,6 +199,9 @@ private[spark] class BlockManager(
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
case NonFatal(e) =>
throw new SparkException("Unable to register with external shuffle server due to : " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not. I wanted to make the exception for this case better as the one which gets shown in the UI with this PR is not definitive.

e.getMessage, e)
}
}
}
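In the same spirit as the author's reply above (make the message that eventually reaches the driver and UI state what actually failed), the hunk follows a retry-then-wrap pattern: transient failures are retried a bounded number of times, and an unexpected non-fatal failure is rethrown wrapped in an exception with a descriptive message and the original cause attached. Below is a minimal, self-contained sketch of that pattern under assumed names; RegistrationFailed and registerOnce are illustrative stand-ins, not Spark APIs.

// Self-contained sketch of the retry-then-wrap pattern used in the hunk above.
// RegistrationFailed and registerOnce are hypothetical; only the shape matters.
import scala.util.control.NonFatal

object RetryThenWrapSketch {
  final class RegistrationFailed(msg: String, cause: Throwable) extends Exception(msg, cause)

  def registerWithRetries(registerOnce: () => Unit,
                          maxAttempts: Int = 3,
                          sleepSecs: Int = 5): Unit = {
    for (i <- 1 to maxAttempts) {
      try {
        registerOnce()
        return // success: stop retrying
      } catch {
        case e: Exception if i < maxAttempts =>
          // Transient failure: log and retry after a pause.
          Console.err.println(s"Registration failed, retrying ${maxAttempts - i} more time(s): ${e.getMessage}")
          Thread.sleep(sleepSecs * 1000L)
        case NonFatal(e) =>
          // Final or unexpected failure: wrap with a descriptive message and keep the cause.
          throw new RegistrationFailed(
            "Unable to register with external shuffle server due to : " + e.getMessage, e)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    try registerWithRetries(() => throw new RuntimeException("connection refused"), maxAttempts = 1)
    catch { case e: RegistrationFailed => println(e.getMessage) }
  }
}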