This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Changes from 1 commit
Commits
37 commits
999ec13
[SPARK-22570][SQL] Avoid to create a lot of global variables by using…
kiszk Nov 30, 2017
6ac57fd
[SPARK-21417][SQL] Infer join conditions using propagated constraints
Nov 30, 2017
bcceab6
[SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user …
wangyum Nov 30, 2017
f5f8e84
[SPARK-22614] Dataset API: repartitionByRange(...)
adrian-ionescu Nov 30, 2017
7e5f669
[SPARK-22428][DOC] Add spark application garbage collector configurat…
gaborgsomogyi Dec 1, 2017
7da1f57
[SPARK-22373] Bump Janino dependency version to fix thread safety issue…
Victsm Dec 1, 2017
dc36542
[SPARK-22653] executorAddress registered in CoarseGrainedSchedulerBac…
tgravescs Dec 1, 2017
16adaf6
[SPARK-22601][SQL] Data load is getting displayed successful on provi…
sujith71955 Dec 1, 2017
9d06a9e
[SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in c…
mpetruska Dec 1, 2017
ee10ca7
[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
zsxwing Dec 1, 2017
aa4cf2b
[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients fo…
HyukjinKwon Dec 2, 2017
d2cf95a
[SPARK-22634][BUILD] Update Bouncy Castle to 1.58
srowen Dec 2, 2017
f23dddf
[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based o…
dongjoon-hyun Dec 3, 2017
2c16267
[SPARK-22669][SQL] Avoid unnecessary function calls in code generation
mgaido91 Dec 3, 2017
dff440f
[SPARK-22626][SQL] deals with wrong Hive's statistics (zero rowCount)
wangyum Dec 3, 2017
4131ad0
[SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior changes in mig…
wangyum Dec 4, 2017
3927bb9
[SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions
mgaido91 Dec 4, 2017
f81401e
[SPARK-22162] Executors and the driver should use consistent JobIDs i…
Dec 4, 2017
e1dd03e
[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.
Dec 4, 2017
dcaac45
Spark on Kubernetes - basic submission client
liyinan926 Nov 10, 2017
27c67ff
Addressed first round of review comments
liyinan926 Nov 27, 2017
6d597d0
Made Client implement the SparkApplication trait
liyinan926 Nov 28, 2017
5b9fa39
Addressed the second round of comments
liyinan926 Nov 28, 2017
5ccadb5
Added missing step for supporting local:// dependencies and addressed…
liyinan926 Nov 30, 2017
12f2797
Fixed Scala style check errors
liyinan926 Nov 30, 2017
c35fe48
Addressed another round of comments
liyinan926 Dec 4, 2017
faa2849
Rebased on master and added a constant val for the Client class
liyinan926 Dec 4, 2017
347ed69
Addressed another major round of comments
liyinan926 Dec 5, 2017
0e8ca01
Addressed one more round of comments
liyinan926 Dec 5, 2017
3a0b8e3
Removed mentioning of kubernetes-namespace
liyinan926 Dec 6, 2017
83d0b9c
Fixed a couple of bugs found during manual tests
liyinan926 Dec 7, 2017
44c40b1
Guard against client mode in SparkContext
liyinan926 Dec 8, 2017
67bc847
Added libc6-compat into the base docker image
liyinan926 Dec 8, 2017
7d2b303
Addressed latest comments
liyinan926 Dec 8, 2017
caf2206
Addressed docs comments
liyinan926 Dec 9, 2017
2e7810b
Fixed a comment
liyinan926 Dec 11, 2017
cbcd30e
Addressed latest comments
liyinan926 Dec 11, 2017
Addressed another round of comments
liyinan926 committed Dec 4, 2017
commit c35fe4893e062ac2cf6d738a7d0430b8b98566aa
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -261,7 +261,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}

@@ -296,6 +296,10 @@ object SparkSubmit extends CommandLineUtils with Logging {
}
}

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
}

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
@@ -304,12 +308,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
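For readers skimming the SparkSubmit.scala hunks above, here is a small, self-contained sketch (detached from SparkSubmit, with simplified names, so treat it as illustrative only) of how a k8s:// master is recognized by the prefix match and then resolved before the deploy-mode checks run:

```scala
// Illustrative sketch only; the real SparkSubmit logic has more cases and exits the JVM on error.
object K8sMasterHandlingSketch {

  // Prefix dispatch, mirroring the match on args.master shown above.
  def isKubernetes(master: String): Boolean = master.startsWith("k8s")

  // Same idea as Utils.checkAndGetK8sMasterUrl in a later hunk: strip the k8s://
  // prefix and default the scheme to https when none is given.
  def resolveK8sMaster(raw: String): String = {
    require(raw.startsWith("k8s://"), "Kubernetes master URL must start with k8s://.")
    val rest = raw.stripPrefix("k8s://")
    if (rest.startsWith("https://") || rest.startsWith("http://")) rest
    else s"https://$rest"
  }

  def main(args: Array[String]): Unit = {
    val master = "k8s://kubernetes.example.com:6443" // hypothetical value
    if (isKubernetes(master)) {
      println(resolveK8sMaster(master)) // https://kubernetes.example.com:6443
    }
  }
}
```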
@@ -301,9 +301,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://")
&& !master.startsWith("mesos://")
&& !master.startsWith("k8s://")) {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
}
@@ -313,9 +311,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://")
&& !master.startsWith("mesos://")
&& !master.startsWith("k8s://")) {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging {
}
}

/**
* Check the validity of the given Kubernetes master URL and return the resolved URL.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
require(rawMasterURL.startsWith("k8s://"),
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
if (masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else if (masterWithoutK8sPrefix.startsWith("http://")) {
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
masterWithoutK8sPrefix
} else {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
resolvedURL
}
}
}

private[util] object CallerContext extends Logging {
11 changes: 11 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1146,6 +1146,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
}

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps == "https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp == "http://host:port")

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
}
}
}

private class SimpleExtension
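The new test above exercises the explicit https and http forms plus a malformed URL, but not the scheme-defaulting path of checkAndGetK8sMasterUrl. A hypothetical extra assertion (not part of this commit) for that path could look like:

```scala
// Hypothetical addition to the test above: a master URL without a scheme defaults to https.
val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://host:port")
assert(k8sMasterURLWithoutScheme == "https://host:port")
```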
@@ -180,19 +180,4 @@ private[spark] object Config extends Logging {
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

def getK8sMasterUrl(rawMasterString: String): String = {
require(rawMasterString.startsWith("k8s://"),
"Master URL should start with k8s:// in Kubernetes mode.")
val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
if (masterWithoutK8sPrefix.startsWith("http://")
|| masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL")
resolvedURL
}
}
}
@@ -20,7 +20,6 @@ import java.util.{Collections, UUID}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
@@ -150,7 +149,7 @@ private[spark] class Client(
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
}
} catch {
Reviewer:
Don't think we want to catch Throwable here - look into NonFatal.

Member Author:
Changed to NonFatal(e).

case NonFatal(e) =>
case e: Throwable =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
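For context on the review exchange above, a minimal self-contained sketch (with illustrative names, not taken from the diff) of why matching with scala.util.control.NonFatal is preferred over catching Throwable in a cleanup-and-rethrow block like this one:

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Run `body`, performing `cleanup` only for non-fatal failures before rethrowing.
  // NonFatal does not match fatal errors such as OutOfMemoryError or InterruptedException,
  // so those propagate immediately instead of triggering (and possibly hiding) cleanup work.
  def withCleanup[T](cleanup: () => Unit)(body: => T): T =
    try body catch {
      case NonFatal(e) =>
        cleanup()
        throw e
    }

  def main(args: Array[String]): Unit =
    try {
      withCleanup(() => println("deleting driver pod")) {
        throw new RuntimeException("submission failed")
      }
    } catch {
      case NonFatal(e) => println(s"rethrown: ${e.getMessage}")
    }
}
```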
@@ -198,7 +197,8 @@ private[spark] object Client extends SparkApplication {
val launchTime = System.currentTimeMillis()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
val master = getK8sMasterUrl(sparkConf.get("spark.master"))
// The master URL has been checked for validity already in SparkSubmit.
val master = sparkConf.get("spark.master")
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter(_ => waitForAppCompletion)

val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
@@ -101,7 +101,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(
}

private def formatPodState(pod: Pod): String = {
// TODO include specific container state
val details = Seq[(String, String)](
// pod metadata
("pod name", pod.getMetadata.getName),
@@ -84,7 +84,7 @@ private[spark] class DriverServiceBootstrapStep(
val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
.set(DRIVER_HOST_KEY, driverHostname)
.set("spark.driver.port", driverPort.toString)
.set(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
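As a quick illustration of the driver hostname pattern assembled in the hunk above (the service name and namespace here are made up):

```scala
object DriverHostnameSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values only; the real names come from the created driver service
    // metadata and the KUBERNETES_NAMESPACE config read in the hunk above.
    val serviceName = "spark-pi-driver-svc" // hypothetical
    val namespace = "default"
    val driverHostname = s"$serviceName.$namespace.svc.cluster.local"
    println(driverHostname) // spark-pi-driver-svc.default.svc.cluster.local
  }
}
```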
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec

private[spark] class DependencyResolutionStepSuite extends SparkFunSuite {
class DependencyResolutionStepSuite extends SparkFunSuite {

private val SPARK_JARS = Seq(
"hdfs://localhost:9000/apps/jars/jar1.jar",
@@ -17,9 +17,12 @@

FROM spark-base

# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-driver:latest -f dockerfiles/spark-base/Dockerfile .

COPY examples /opt/spark/examples

@@ -17,9 +17,12 @@

FROM spark-base

# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile .
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-executor:latest -f dockerfiles/spark-base/Dockerfile .

COPY examples /opt/spark/examples

@@ -28,4 +31,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
@@ -17,11 +17,15 @@

FROM openjdk:8-alpine

# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .

RUN apk upgrade --no-cache && \
RUN set -ex && \
apk upgrade --no-cache && \
apk add --no-cache bash tini && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/work-dir \
@@ -17,7 +17,7 @@
#

# echo commands to the terminal output
set -x
set -ex

# Check whether there is a passwd entry for the container UID
myuid=$(id -u)