Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactoring to avoid multiple calls of pollAndReportStatus method
  • Loading branch information
akshatb1 committed May 22, 2020
commit 45c9817fe3857b93703eef76c540671e2bc0762e
96 changes: 47 additions & 49 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.log4j.Logger
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
Expand Down Expand Up @@ -67,6 +68,7 @@ private class ClientEndpoint(
false)
private val REPORT_DRIVER_STATUS_INTERVAL = 10000
private var submittedDriverID = ""
private var driverStatusReported = false


private def getProperty(key: String, conf: SparkConf): Option[String] = {
Expand Down Expand Up @@ -112,14 +114,15 @@ private class ClientEndpoint(
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))

forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
MonitorDriverStatus()
}, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)

case "kill" =>
val driverId = driverArgs.driverId
submittedDriverID = driverId
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
logInfo("... waiting before polling master for driver state")
forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
monitorDriverStatus()
}, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
}

/**
Expand All @@ -134,50 +137,61 @@ private class ClientEndpoint(
}(forwardMessageExecutionContext)
}
}
private def MonitorDriverStatus(): Unit = {

private def monitorDriverStatus(): Unit = {
if (submittedDriverID != "") {
asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID))
}
}

/**
* Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors
* the application until it finishes, fails or is killed.
* Processes and reports the driver status then exit the JVM if the
* waitAppCompletion is set to false, else reports the driver status
* if debug logs are enabled.
*/
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
def reportDriverStatus(found: Boolean, state: Option[DriverState],
workerId: Option[String],
workerHostPort: Option[String],
exception: Option[Exception]): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
def reportDriverStatus(found: Boolean, state: Option[DriverState],
workerId: Option[String],
workerHostPort: Option[String],
exception: Option[Exception]): Unit = {
def reportDriverStatus(
found: Boolean,
state: Option[DriverState],
workerId: Option[String],
workerHostPort: Option[String],
exception: Option[Exception]): Unit = {

if (found) {
// Using driverStatusReported to avoid writing following
// logs again when waitAppCompletion is set to true
if (!driverStatusReported) {
driverStatusReported = true
logInfo(s"State of $submittedDriverID is ${state.get}")
// Worker node, if present
(workerId, workerHostPort, state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
}
// Exception, if present
statusResponse.exception match {
exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
if (!waitAppCompletion) {
logInfo(s"spark-submit not configured to wait for completion, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
logInfo(s"spark-submit is configured to wait for completion, " +
s"continue monitoring driver status.")
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
if (!waitAppCompletion) {
logInfo(s"spark-submit not configured to wait for completion, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
s"continue monitoring driver status.")
}
}
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
System.exit(-1)
}
}
Expand All @@ -188,36 +202,20 @@ private class ClientEndpoint(
if (success) {
activeMasterEndpoint = master
submittedDriverID = driverId.get
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(master, driverId, success, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DriverStatusResponse(found, state, _, _, _) =>
if (found) {
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
s"continue monitoring driver status.")
}
} else {
System.exit(-1)
}
case DriverStatusResponse(found, state, workerId, workerHostPort, exception) =>
reportDriverStatus(found, state, workerId, workerHostPort, exception)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
Expand Down