73 changes: 47 additions & 26 deletions yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.deploy.yarn.{YarnAppProgress, ClientArguments, YarnResourceCapacity}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -36,15 +37,15 @@ import org.apache.spark.deploy.SparkHadoopUtil
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
*/
private[spark] class Client(
val args: ClientArguments,
toArgs: YarnResourceCapacity => ClientArguments,
val hadoopConf: Configuration,
val sparkConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {

def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
def this(to: YarnResourceCapacity => ClientArguments, spConf: SparkConf) =
this(to, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
def this(toa: YarnResourceCapacity => ClientArguments) = this(toa, new SparkConf())

val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)

@@ -55,48 +56,56 @@ private[spark] class Client(
* ------------------------------------------------------------------------------------- */

/** Submit an application running our ApplicationMaster to the ResourceManager. */
override def submitApplication(): ApplicationId = {
init(yarnConf)
start()

logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(getYarnClusterMetrics.getNumNodeManagers))

// Get a new application from our RM
val newAppResponse = getNewApplication()
override def submitApplication(): (ApplicationId, ClientArguments) = {
val newAppResponse = createYarnApplication()
val args = toArgs(getClusterResourceCapacity(newAppResponse))
val appId = newAppResponse.getApplicationId()

notifyAppInit(appId)

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
verifyClusterResources(args, newAppResponse)

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(appId, containerContext)
val containerContext = createContainerLaunchContext(args, newAppResponse)
val appContext = createApplicationSubmissionContext(args, appId, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
submitApplication(appContext)
appId
(appId, args)
}


def createYarnApplication(): GetNewApplicationResponse = {
init(yarnConf)
start()

logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(getYarnClusterMetrics.getNumNodeManagers))

super.getNewApplication()
}

/**
* Set up a context for launching our ApplicationMaster container.
* In the Yarn alpha API, the memory requirements of this container must be set in
* the ContainerLaunchContext instead of the ApplicationSubmissionContext.
*/
override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
val containerContext = super.createContainerLaunchContext(newAppResponse)
override def createContainerLaunchContext(args: ClientArguments,
newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
val containerContext = super.createContainerLaunchContext(args, newAppResponse)
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
capability.setMemory(args.amMemory + args.amMemoryOverhead)
containerContext.setResource(capability)
containerContext
}

/** Set up the context for submitting our ApplicationMaster. */
def createApplicationSubmissionContext(
appId: ApplicationId,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
def createApplicationSubmissionContext(args: ClientArguments,
appId: ApplicationId,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName(args.appName)
@@ -123,6 +132,17 @@
*/
override def getClientToken(report: ApplicationReport): String =
Option(report.getClientToken).map(_.toString).getOrElse("")


override protected def getAppProgress(report: ApplicationReport): YarnAppProgress = {
val appUsageReport = report.getApplicationResourceUsageReport
YarnAppProgress(report.getApplicationId,
report.getTrackingUrl,
getResourceUsage(appUsageReport))
}

}

object Client {
@@ -138,8 +158,9 @@ object Client {
val sparkConf = new SparkConf

try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
def toArgs(capacity: YarnResourceCapacity) = new ClientArguments(argStrings, sparkConf)
new Client(toArgs, sparkConf).run()
} catch {
case e: Exception =>
Console.err.println(e.getMessage)
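The entry point above now hands `Client` a factory instead of pre-built arguments, so `ClientArguments` can be constructed after the cluster's maximum container capacity is known. A minimal sketch of a caller, assuming only the constructors visible in this diff (`SubmitExample` itself is hypothetical):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnResourceCapacity}

// Hypothetical caller: defers ClientArguments construction until the
// ResourceManager has reported the cluster's maximum container capacity.
object SubmitExample {
  def main(argStrings: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    // The factory receives the YarnResourceCapacity derived from the
    // GetNewApplicationResponse; a real caller could adjust memory
    // settings here before the arguments are frozen.
    def toArgs(capacity: YarnResourceCapacity): ClientArguments =
      new ClientArguments(argStrings, sparkConf)
    new Client(toArgs, sparkConf).run() // submits the AM and monitors to completion
  }
}
```

This mirrors the `main` in `object Client` above; in a real deployment the point of the indirection is for `toArgs` to actually inspect `capacity`.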
153 changes: 136 additions & 17 deletions yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,9 +18,11 @@
package org.apache.spark.deploy.yarn

import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.util.Date

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Try, Success, Failure}

import com.google.common.base.Objects
@@ -47,35 +49,49 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
private[spark] trait ClientBase extends Logging {
import ClientBase._

protected val args: ClientArguments
protected val hadoopConf: Configuration
protected val sparkConf: SparkConf
protected val yarnConf: YarnConfiguration
protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
protected val amMemoryOverhead = args.amMemoryOverhead // MB
protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()

private val listeners = ListBuffer[YarnApplicationListener]()


protected def getClusterResourceCapacity(newAppResponse: GetNewApplicationResponse)
: YarnResourceCapacity = {
val maxCapacity = newAppResponse.getMaximumResourceCapability
val (mem, vCores) = (maxCapacity.getMemory, maxCapacity.getVirtualCores)
val appResource = new YarnAppResource(mem, vCores)
val memoryOverhead = YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN
new YarnResourceCapacity(appResource, memoryOverhead, memoryOverhead)
}

/**
* Fail fast if we have requested more resources per container than is available in the cluster.
*/
protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
protected def verifyClusterResources(args: ClientArguments,
newAppResponse: GetNewApplicationResponse): Unit = {
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = args.executorMemory + executorMemoryOverhead
val executorMem = args.executorMemory + args.executorMemoryOverhead
if (executorMem > maxMem) {
throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
s"+${args.executorMemoryOverhead } MB) is above the max threshold ($maxMem MB) of this cluster!")
}
val amMem = args.amMemory + amMemoryOverhead
val amMem = args.amMemory + args.amMemoryOverhead
if (amMem > maxMem) {
throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
s"+${args.amMemoryOverhead} MB) is above the max threshold ($maxMem MB) of this cluster!")
}
logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
amMem,
amMemoryOverhead))
args.amMemoryOverhead))

// We could add checks to make sure the entire cluster has enough resources but that involves
// getting all the node reports and computing ourselves.
@@ -134,7 +150,8 @@ private[spark] trait ClientBase extends Logging {
* This is used for setting up a container launch context for our ApplicationMaster.
* Exposed for testing.
*/
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
def prepareLocalResources(args: ClientArguments,
appStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing resources for our AM container")
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
@@ -230,7 +247,7 @@
/**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
private def setupLaunchEnv(args: ClientArguments, stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
@@ -297,14 +314,15 @@
* Set up a ContainerLaunchContext to launch our ApplicationMaster container.
* This sets up the launch environment, java options, and the command for launching the AM.
*/
protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
protected def createContainerLaunchContext(args: ClientArguments,
newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")

val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
val launchEnv = setupLaunchEnv(appStagingDir)
val localResources = prepareLocalResources(args, appStagingDir)
val launchEnv = setupLaunchEnv(args, appStagingDir)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
@@ -428,6 +446,8 @@
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true): YarnApplicationState = {
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
val initReport = getApplicationReport(appId)
notifyAppStart(initReport)
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
@@ -464,6 +484,19 @@
}
}

state match {
case YarnApplicationState.FINISHED =>
notifyAppFinished(report)
case YarnApplicationState.FAILED =>
notifyAppFailed(report)
case YarnApplicationState.KILLED =>
notifyAppKilled(report)
case _ =>
notifyAppProgress(report) // RUNNING and transitional states report progress
}

if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
@@ -485,14 +518,14 @@
* Submit an application to the ResourceManager and monitor its state.
* This continues until the application has exited for any reason.
*/
def run(): Unit = monitorApplication(submitApplication())
def run(): Unit = monitorApplication(submitApplication()._1)

/* --------------------------------------------------------------------------------------- *
| Methods that cannot be implemented here due to API differences across hadoop versions |
* --------------------------------------------------------------------------------------- */

/** Submit an application running our ApplicationMaster to the ResourceManager. */
def submitApplication(): ApplicationId
def submitApplication(): (ApplicationId, ClientArguments)

/** Set up security tokens for launching our ApplicationMaster container. */
protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit
@@ -505,6 +538,92 @@
* If no security is enabled, the token returned by the report is null.
*/
protected def getClientToken(report: ApplicationReport): String


def addApplicationListener(listener: YarnApplicationListener): Unit = { listeners += listener }


def killApplication(appId: ApplicationId): Unit


def getApplicationInfo(report: ApplicationReport): YarnAppInfo = {
YarnAppInfo(report.getApplicationId,
report.getUser,
report.getQueue,
report.getName,
report.getHost,
report.getRpcPort,
report.getYarnApplicationState.name(),
report.getDiagnostics,
report.getTrackingUrl,
report.getStartTime)
}

protected def notifyAppInit(appId: ApplicationId) {
import ExecutionContext.Implicits.global
for (l <- listeners) {
Future { l.onApplicationInit(new Date().getTime, appId) }
}
}


private def notifyAppStart(report: ApplicationReport) {
import ExecutionContext.Implicits.global
val appInfo: YarnAppInfo = getApplicationInfo(report)
for (l <- listeners) {
Future { l.onApplicationStart(appInfo.startTime, appInfo) }
}
}


private def notifyAppFailed(report: ApplicationReport) {
import ExecutionContext.Implicits.global
val appProgress: YarnAppProgress = getAppProgress(report)
for (l <- listeners) {
Future { l.onApplicationFailed(new Date().getTime, appProgress) }
}
}

private def notifyAppKilled(report: ApplicationReport) {
import ExecutionContext.Implicits.global
val appProgress: YarnAppProgress = getAppProgress(report)
for (l <- listeners) {
Future { l.onApplicationKilled(new Date().getTime, appProgress) }
}
}

private def notifyAppProgress(report: ApplicationReport) {
import ExecutionContext.Implicits.global
val appProgress: YarnAppProgress = getAppProgress(report)
for (l <- listeners) {
Future { l.onApplicationProgress(new Date().getTime, appProgress) }
}
}

private def notifyAppFinished(report: ApplicationReport) {
import ExecutionContext.Implicits.global
val appProgress: YarnAppProgress = getAppProgress(report)
for (l <- listeners) {
Future { l.onApplicationEnd(new Date().getTime, appProgress) }
}
}


protected def getResourceUsage(report: ApplicationResourceUsageReport): YarnResourceUsage = {

def getYarnAppResource(res: Resource) = YarnAppResource(res.getMemory, res.getVirtualCores)

YarnResourceUsage(report.getNumUsedContainers,
report.getNumReservedContainers,
getYarnAppResource(report.getUsedResources),
getYarnAppResource(report.getReservedResources),
getYarnAppResource(report.getNeededResources))
}

protected def getAppProgress(report: ApplicationReport): YarnAppProgress

}

private[spark] object ClientBase extends Logging {
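`ClientBase` now fans application lifecycle events out to registered `YarnApplicationListener`s, each callback fired in a `Future` on the global execution context. The listener trait itself is not part of this excerpt; the sketch below infers one callback per `notify*` call site above, so the method names and signatures are assumptions:

```scala
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.deploy.yarn.{YarnAppInfo, YarnAppProgress, YarnApplicationListener}

// Assumed listener shape, inferred from the notify* call sites in ClientBase;
// the real trait is defined elsewhere in this change.
class LoggingListener extends YarnApplicationListener {
  override def onApplicationInit(time: Long, appId: ApplicationId): Unit =
    println(s"[$time] submitted request for application $appId")
  override def onApplicationStart(time: Long, info: YarnAppInfo): Unit =
    println(s"[$time] ${info.name} started for ${info.user} in queue ${info.queue}")
  override def onApplicationProgress(time: Long, progress: YarnAppProgress): Unit =
    println(s"[$time] progress: $progress")
  override def onApplicationEnd(time: Long, progress: YarnAppProgress): Unit =
    println(s"[$time] finished: $progress")
  override def onApplicationFailed(time: Long, progress: YarnAppProgress): Unit =
    println(s"[$time] FAILED: $progress")
  override def onApplicationKilled(time: Long, progress: YarnAppProgress): Unit =
    println(s"[$time] killed: $progress")
}
```

Registration would be `client.addApplicationListener(new LoggingListener)` before `run()`. Since the callbacks run on `ExecutionContext.Implicits.global`, implementations should be thread-safe and non-blocking.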
@@ -0,0 +1,14 @@
package org.apache.spark.deploy.yarn

import org.apache.hadoop.yarn.api.records.ApplicationId

case class YarnAppInfo(appId: ApplicationId,
user: String,
queue: String,
name: String,
masterHost: String,
masterRpcPort: Int,
state: String,
diagnostics: String,
trackingUrl: String,
startTime: Long)
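The remaining value types this change constructs but does not show (`YarnAppResource`, `YarnResourceCapacity`, `YarnResourceUsage`, `YarnAppProgress`) can be reconstructed from their call sites. One plausible shape, with field names guessed from those constructions rather than taken from the actual definitions:

```scala
package org.apache.spark.deploy.yarn

import org.apache.hadoop.yarn.api.records.ApplicationId

// Field names below are guesses from the call sites in this diff.
case class YarnAppResource(memory: Int, vCores: Int)

// Built in getClusterResourceCapacity from getMaximumResourceCapability,
// with both overheads seeded from YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN.
case class YarnResourceCapacity(
  maxResource: YarnAppResource,
  amMemoryOverhead: Int,       // MB
  executorMemoryOverhead: Int) // MB

// Built in getResourceUsage from an ApplicationResourceUsageReport.
case class YarnResourceUsage(
  numUsedContainers: Int,
  numReservedContainers: Int,
  used: YarnAppResource,
  reserved: YarnAppResource,
  needed: YarnAppResource)

// Built in getAppProgress from an ApplicationReport.
case class YarnAppProgress(
  appId: ApplicationId,
  trackingUrl: String,
  usage: YarnResourceUsage)
```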