Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressed another major round of comments
  • Loading branch information
liyinan926 committed Dec 5, 2017
commit 347ed697e3f17546af83a8cd8a9578527cf65384
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Kubernetes only
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.kubernetes.namespace"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var submissionToRequestStatusFor: String = null
var useRest: Boolean = true // used internally

// Kubernetes only
var kubernetesNamespace: String = null

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
Expand Down Expand Up @@ -202,10 +199,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

kubernetesNamespace = Option(kubernetesNamespace)
.orElse(sparkProperties.get("spark.kubernetes.namespace"))
.orNull

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
Expand Down Expand Up @@ -461,9 +454,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KEYTAB =>
keytab = value

case KUBERNETES_NAMESPACE =>
kubernetesNamespace = value

case HELP =>
printUsageAndExit(0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should also add documentation for this config here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val EVENT_LOG_COMPRESS =
ConfigBuilder("spark.eventLog.compress")
.booleanConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ class SparkSubmitSuite
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--kubernetes-namespace", "foo",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.namespace=spark",
"--conf", "spark.kubernetes.driver.docker.image=bar",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also test the arg "--kubernetes-namespace"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--kubernetes-namespace has been removed in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"/home/thejar.jar",
"arg1")
Expand All @@ -410,7 +410,7 @@ class SparkSubmitSuite
classpath should have length (0)
conf.get("spark.executor.memory") should be ("5g")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check spark.master too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("foo")
conf.get("spark.kubernetes.namespace") should be ("spark")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
}

Expand Down
9 changes: 9 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ of the most common options to set are:
or in your default properties file.
</td>
</tr>
<tr>
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
This tends to grow with the container size (typically 6-10%).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should mention that not all cluster managers support this option, since this is now in the common configuration doc. Same below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

</td>
</tr>
<tr>
<td><code>spark.executor.memory</code></td>
<td>1g</td>
Expand Down
9 changes: 1 addition & 8 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,11 @@ To use a custom metrics.properties for the application master and executors, upd
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make this configuration backward compatible: users should still be able to use spark.yarn.driver.memoryOverhead (with a warning log), like other deprecated configurations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an example PR for deprecating a config property that I can follow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look for configsWithAlternatives in SparkConf.scala.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, done.

<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.am.memoryOverhead</code></td>
<td>AM memory * 0.10, with minimum of 384 </td>
<td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
Same as <code>spark.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,10 @@ private[spark] object Config extends Logging {

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.doc("Specify the hard cpu limit for each executor pod")
.stringConf
.createOptional

val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " +
"driver submission server. This is memory that accounts for things like VM overheads, " +
"interned strings, other native overheads, etc. This tends to grow with the driver's " +
"memory size (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional

// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
Expand Down Expand Up @@ -150,6 +141,7 @@ private[spark] object Config extends Logging {
ConfigBuilder("spark.kubernetes.report.interval")
.doc("Interval between reports of the current app status in cluster mode.")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

private[spark] val JARS_DOWNLOAD_LOCATION =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: private[spark] is redundant in this object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private[spark] object Client extends SparkApplication {
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The master URL has been checked for validity already in SparkSubmit.
val master = sparkConf.get("spark.master")
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter(_ => waitForAppCompletion)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
kubernetesAppId, loggingInterval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.spark.deploy.k8s.submit

import java.util.UUID

import com.google.common.primitives.Longs

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
Expand Down Expand Up @@ -43,8 +47,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
// label values are considerably restrictive, e.g. must be no longer than 63 characters in
// length. So we generate a separate identifier for the app ID itself, and bookkeeping that
// requires finding "all pods for this application" should use the kubernetesAppId.
private val kubernetesResourceNamePrefix =
s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val kubernetesResourceNamePrefix = {
val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime))
s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
}

private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)
Expand Down Expand Up @@ -91,7 +98,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
}
mayBeResource
} else {
Option.empty
None
}

val sparkJars = submissionSparkConf.getOption("spark.jars")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support jars/files on distributed file systems or HTTP/HTTPS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our fork, we use a Kubernetes init-container co-located with the driver/executor containers for downloading remote dependencies, e.g., from HTTP/HTTPS endpoints. The init-container will be introduced in a subsequent PR.

Expand All @@ -109,7 +116,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
jarsDownloadPath,
filesDownloadPath))
} else {
Option.empty
None
}

Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,11 @@ private[spark] object KubernetesFileUtils {
* - File URIs with scheme local:// resolve to just the path of the URI.
* - Otherwise, the URIs are returned as-is.
*/
def resolveSubmittedUris(
def resolveFileUris(
fileUris: Iterable[String],
fileDownloadPath: String): Iterable[String] = {
fileUris.map { uri =>
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
case "file" =>
val fileName = new File(fileUri.getPath).getName
s"$fileDownloadPath/$fileName"
case "local" =>
fileUri.getPath
case _ => uri
}
resolveFileUri(uri, fileDownloadPath, false)
}
}

Expand All @@ -52,17 +43,26 @@ private[spark] object KubernetesFileUtils {
*/
def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
fileUris.map { uri =>
resolveFilePath(uri, fileDownloadPath)
resolveFileUri(uri, fileDownloadPath, true)
}
}

private def resolveFilePath(uri: String, fileDownloadPath: String): String = {
private def resolveFileUri(
uri: String,
fileDownloadPath: String,
assumesDownloaded: Boolean): String = {
val fileUri = Utils.resolveURI(uri)
if (Option(fileUri.getScheme).getOrElse("file") == "local") {
fileUri.getPath
} else {
val fileName = new File(fileUri.getPath).getName
s"$fileDownloadPath/$fileName"
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
case "local" =>
fileUri.getPath
case _ =>
if (assumesDownloaded || fileScheme == "file") {
val fileName = new File(fileUri.getPath).getName
s"$fileDownloadPath/$fileName"
} else {
uri
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(

def start(): Unit = {
maybeLoggingInterval.foreach { interval =>
require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}

/**
* Represents the initial setup required for the driver.
Expand All @@ -43,7 +44,7 @@ private[spark] class BaseDriverConfigurationStep(
.getOrElse(s"$kubernetesResourceNamePrefix-driver")

private val driverExtraClasspath = submissionSparkConf.get(
org.apache.spark.internal.config.DRIVER_CLASS_PATH)
DRIVER_CLASS_PATH)

private val driverDockerImage = submissionSparkConf
.get(DRIVER_DOCKER_IMAGE)
Expand All @@ -55,18 +56,17 @@ private[spark] class BaseDriverConfigurationStep(

// Memory settings
private val driverMemoryMiB = submissionSparkConf.get(
org.apache.spark.internal.config.DRIVER_MEMORY)
DRIVER_MEMORY)
private val driverMemoryString = submissionSparkConf.get(
org.apache.spark.internal.config.DRIVER_MEMORY.key,
org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
DRIVER_MEMORY.key,
DRIVER_MEMORY.defaultValueString)
private val memoryOverheadMiB = submissionSparkConf
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another config that could use some re-factoring so that YARN and k8s use the same logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged both into spark.driver.memoryOverhead and used it in both yarn and k8s.

.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configureDriver(
driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
new EnvVarBuilder()
.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
Expand All @@ -83,11 +83,12 @@ private[spark] class BaseDriverConfigurationStep(
" Spark bookkeeping operations.")

val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
.map(env =>
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build())
.build()
}

val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ private[spark] class DependencyResolutionStep(
localFilesDownloadPath: String) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val resolvedSparkJars = KubernetesFileUtils.resolveSubmittedUris(sparkJars, jarsDownloadPath)
val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(
val resolvedSparkJars = KubernetesFileUtils.resolveFileUris(sparkJars, jarsDownloadPath)
val resolvedSparkFiles = KubernetesFileUtils.resolveFileUris(
sparkFiles, localFilesDownloadPath)
val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
if (resolvedSparkJars.nonEmpty) {
Expand Down
Loading