Skip to content
Closed
Prev Previous commit
Next Next commit
Simplify the logic to use spark.app.id if set otherwise use superclas…
…s. applicationId
  • Loading branch information
suxingfate committed Dec 15, 2018
commit ed2c4caedcafb323dfef5ba01456f60f286fccd5
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,4 @@ private[spark] object KubernetesUtils extends Logging {
def formatTime(time: String): String = {
if (time != null) time else "N/A"
}

def generateAppId(): String = {
s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = KubernetesUtils.generateAppId()
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,39 +64,18 @@ private[spark] class KubernetesClusterSchedulerBackend(

/**
* Get an application ID associated with the job.
* This returns the string value of [[appId]] if set, otherwise
* This returns the string value of spark.app.id if set, otherwise
* the locally-generated ID from the superclass.
* @return The application ID
*/

var appId: Option[String] = None;

override def applicationId(): String = {

appId.map(_.toString).getOrElse {
logInfo("Initializing Application ID.")
bindApplicationId();
appId.get
}
}

def bindApplicationId(): Unit = {
val appIdString = {
val wasSparkSubmittedInClusterMode = conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)

// cluster mode: get appId from driver env
if (wasSparkSubmittedInClusterMode) {
val sparkAppId = conf.getOption("spark.app.id")
sparkAppId.map(_.toString).getOrElse {
logWarning("Application ID is not initialized yet in cluster mode.")
super.applicationId
}
} else {
// client mode: generate new appId
KubernetesUtils.generateAppId()
val appId = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: looks like you can simplify this as conf.getOption("spark.app.id").map(_.toString).getOrElse {super.applicationId}.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, that looks better

val sparkAppId = conf.getOption("spark.app.id")
sparkAppId.map(_.toString).getOrElse {
super.applicationId
}
}
appId = Some(appIdString)
appId
}

override def start(): Unit = {
Expand Down