Skip to content

Commit 0c7b430

Browse files
turboFei authored and dongjoon-hyun committed
[SPARK-43540][K8S][CORE] Add working directory into classpath on the driver in K8S cluster mode
### What changes were proposed in this pull request? Adding the working directory into the classpath on the driver in K8S cluster mode. ### Why are the changes needed? After #37417, the spark.files and spark.jars are placed in the working directory. However, it seems that the Spark context classloader cannot access them because they are not in the classpath by default. This PR adds the current working directory into the classpath, so that the spark.files and spark.jars placed in the working directory can be accessed by the classloader — for example, the `hive-site.xml` uploaded via `spark.files`. ### Does this PR introduce _any_ user-facing change? Yes, users no longer need to add the working directory into the Spark classpath manually. ### How was this patch tested? UT. Closes #41201 from turboFei/work_dir_classpath. Authored-by: fwang12 <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 2057eb7 commit 0c7b430

File tree

3 files changed

+22
-5
lines changed

3 files changed

+22
-5
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,9 @@ private[spark] class SparkSubmit extends Logging {
414414
// directory too.
415415
// SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current
416416
// working directory
417+
// SPARK-43540: add current working directory into driver classpath
418+
val workingDirectory = "."
419+
childClasspath += workingDirectory
417420
def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false):
418421
String = {
419422
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
@@ -423,13 +426,12 @@ private[spark] class SparkSubmit extends Logging {
423426
targetDir, sparkConf, hadoopConf)
424427
Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
425428
case (localResources, resolvedUri) =>
426-
val source = new File(localResources.getPath)
429+
val source = new File(localResources.getPath).getCanonicalFile
427430
val dest = new File(
428-
".",
431+
workingDirectory,
429432
if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
430-
logInfo(
431-
s"Files $resolvedUri " +
432-
s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
433+
.getCanonicalFile
434+
logInfo(s"Files $resolvedUri from $source to $dest")
433435
Utils.deleteRecursively(dest)
434436
if (isArchive) {
435437
Utils.unpack(source, dest)

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,18 @@ class SparkSubmitSuite
16181618
conf.get(k) should be (v)
16191619
}
16201620
}
1621+
1622+
test("SPARK-43540: Add working directory into classpath on the driver in K8S cluster mode") {
1623+
val clArgs = Seq(
1624+
"--deploy-mode", "client",
1625+
"--master", "k8s://host:port",
1626+
"--class", "org.SomeClass",
1627+
"--conf", "spark.kubernetes.submitInDriver=true",
1628+
"/home/thejar.jar")
1629+
val appArgs = new SparkSubmitArguments(clArgs)
1630+
val (_, classpath, _, _) = submit.prepareSubmitEnvironment(appArgs)
1631+
assert(classpath.contains("."))
1632+
}
16211633
}
16221634

16231635
object JarCreationTest extends Logging {

resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ elif ! [ -z ${SPARK_HOME+x} ]; then
7575
SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH";
7676
fi
7777

78+
# SPARK-43540: add current working directory into executor classpath
79+
SPARK_CLASSPATH="$SPARK_CLASSPATH:$PWD"
80+
7881
case "$1" in
7982
driver)
8083
shift 1

0 commit comments

Comments (0)