12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -414,6 +414,9 @@ private[spark] class SparkSubmit extends Logging {
       // directory too.
       // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current
       // working directory
+      // SPARK-43540: add current working directory into driver classpath
+      val workingDirectory = "."
+      childClasspath += workingDirectory
Contributor:
It might affect other modes. A better place would be resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh, which could include the work-dir in the classpath.

Member Author (@turboFei, May 18, 2023):
Here we have already checked for K8s cluster mode:

if (isKubernetesClusterModeDriver) {
 ...
}
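
The guard discussed above can be sketched as follows. This is a simplified illustration, not the actual Spark code: the method names are hypothetical, and it only mirrors the idea that the extra classpath entry is added solely when SparkSubmit detects it is running as the driver inside a K8s pod (master `k8s://`, deploy mode `client`, and `spark.kubernetes.submitInDriver=true`, matching the test added in this PR).

```java
import java.util.ArrayList;
import java.util.List;

public class K8sClasspathGuard {
    // Spark treats this combination as "we are the driver inside a K8s pod".
    static boolean isKubernetesClusterModeDriver(String master, String deployMode,
                                                 boolean submitInDriver) {
        return master.startsWith("k8s://") && deployMode.equals("client") && submitInDriver;
    }

    static List<String> childClasspath(String master, String deployMode,
                                       boolean submitInDriver) {
        List<String> cp = new ArrayList<>();
        if (isKubernetesClusterModeDriver(master, deployMode, submitInDriver)) {
            // SPARK-43540: resources are downloaded into the working directory,
            // so the driver must be able to load classes/resources from there.
            String workingDirectory = ".";
            cp.add(workingDirectory);
        }
        return cp;
    }

    public static void main(String[] args) {
        System.out.println(childClasspath("k8s://host:port", "client", true));
        System.out.println(childClasspath("yarn", "cluster", false));
    }
}
```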

Contributor:
This doesn't add the current working directory to the executor's classpath, right?

I just checked YARN's behavior: YARN adds the CWD to both the driver and the executor classpaths, and it puts the CWD before the localized SPARK_CONF and HADOOP_CONF.

To get similar behavior, I believe it would be easier to leverage entrypoint.sh here when running on K8s.
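
The ordering the reviewer describes can be sketched like this. The directory names are illustrative stand-ins (not YARN's actual variables); the point is only that in the YARN-style layout the container's working directory precedes the localized conf directories and jars.

```java
import java.util.ArrayList;
import java.util.List;

public class YarnLikeOrdering {
    // Illustrative: build a classpath with the container CWD first, then the
    // localized conf dirs (e.g. a localized SPARK_CONF), then the jars.
    static List<String> executorClasspath(String cwd, List<String> confDirs,
                                          List<String> jars) {
        List<String> cp = new ArrayList<>();
        cp.add(cwd);         // CWD first, so files localized there win lookups
        cp.addAll(confDirs); // then the localized configuration directories
        cp.addAll(jars);
        return cp;
    }

    public static void main(String[] args) {
        System.out.println(executorClasspath("/container/cwd",
                List.of("__spark_conf__"), List.of("app.jar")));
    }
}
```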

Member Author:
Will check it.

Member Author (@turboFei, May 22, 2023):
How about now, @advancedxy? I checked the code: for the driver, if we just leverage entrypoint.sh, it is difficult to keep the behavior mentioned above.

So I leverage entrypoint.sh only for the executor.

Contributor:

LGTM.

Member Author:

Thanks.

def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false):
String = {
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
@@ -423,13 +426,12 @@ private[spark] class SparkSubmit extends Logging {
             targetDir, sparkConf, hadoopConf)
           Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
             case (localResources, resolvedUri) =>
-              val source = new File(localResources.getPath)
+              val source = new File(localResources.getPath).getCanonicalFile
               val dest = new File(
-                ".",
+                workingDirectory,
                 if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
-              logInfo(
-                s"Files $resolvedUri " +
-                s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
+                .getCanonicalFile
+              logInfo(s"Files $resolvedUri from $source to $dest")
               Utils.deleteRecursively(dest)
               if (isArchive) {
                 Utils.unpack(source, dest)
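
The switch to `getCanonicalFile` in the hunk above can be illustrated with plain `java.io.File` (the file name `thejar.jar` is just an illustrative placeholder; the file need not exist):

```java
import java.io.File;
import java.io.IOException;

public class CanonicalPathDemo {
    // Builds the destination the way the patched code does: relative to the
    // working directory, then canonicalized so "." segments and symlinks are
    // resolved before the path is used in log messages.
    static File destFor(String workingDirectory, String name) throws IOException {
        return new File(workingDirectory, name).getCanonicalFile();
    }

    public static void main(String[] args) throws IOException {
        File dest = destFor(".", "thejar.jar");
        // Canonical paths are absolute and carry no "." segment, which keeps
        // the single-line logInfo output readable.
        System.out.println(dest);
    }
}
```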
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1618,6 +1618,18 @@ class SparkSubmitSuite
       conf.get(k) should be (v)
     }
   }
+
+  test("SPARK-43540: Add working directory into classpath on the driver in K8S cluster mode") {
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "k8s://host:port",
+      "--class", "org.SomeClass",
+      "--conf", "spark.kubernetes.submitInDriver=true",
+      "/home/thejar.jar")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (_, classpath, _, _) = submit.prepareSubmitEnvironment(appArgs)
+    assert(classpath.contains("."))
+  }
 }

object JarCreationTest extends Logging {
3 changes: 3 additions & 0 deletions resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -75,6 +75,9 @@ elif ! [ -z ${SPARK_HOME+x} ]; then
   SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH";
 fi
 
+# SPARK-43540: add current working directory into executor classpath
+SPARK_CLASSPATH="$SPARK_CLASSPATH:$PWD"
+
 case "$1" in
   driver)
     shift 1
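
The effect of the entrypoint.sh line above can be sketched as a string operation (the example paths are illustrative of a Spark container image, not guaranteed values):

```java
public class ExecutorClasspath {
    // Mirrors SPARK_CLASSPATH="$SPARK_CLASSPATH:$PWD" from entrypoint.sh:
    // the pod's working directory is appended after the existing entries.
    static String withWorkDir(String sparkClasspath, String pwd) {
        return sparkClasspath + ":" + pwd;
    }

    public static void main(String[] args) {
        String cp = withWorkDir("/opt/spark/conf:/opt/spark/jars/*",
                "/opt/spark/work-dir");
        System.out.println(cp);
    }
}
```

Note that this appends the working directory last, whereas the reviewer observed that YARN places the CWD before the localized conf directories; the discussion above accepted this difference for the executor side.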