diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 783cf47df169..73acfedd8bcc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
 import java.net.{URI, URL}
+import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
 import java.util.{ServiceLoader, UUID}
@@ -383,43 +384,55 @@ private[spark] class SparkSubmit extends Logging {
       }.orNull
 
       if (isKubernetesClusterModeDriver) {
-        // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
-        // Executors will get the jars from the Spark file server.
-        // Explicitly download the related files here
-        args.jars = localJars
-        val filesLocalFiles = Option(args.files).map {
-          downloadFileList(_, targetDir, sparkConf, hadoopConf)
-        }.orNull
-        val archiveLocalFiles = Option(args.archives).map { uris =>
+        // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
+        // in cluster mode, the archives should be available in the driver's current working
+        // directory too.
+        // SPARK-33782: download all the files, jars, archives and pyFiles to the driver's
+        // current working directory.
+        def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false):
+            String = {
           val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
-          val localArchives = downloadFileList(
+          val localResources = downloadFileList(
             resolvedUris.map(
               UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
             targetDir, sparkConf, hadoopConf)
-
-          // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
-          // in cluster mode, the archives should be available in the driver's current working
-          // directory too.
-          Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
-            case (localArchive, resolvedUri) =>
-              val source = new File(localArchive.getPath)
+          Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
+            case (localResource, resolvedUri) =>
+              val source = new File(localResource.getPath)
               val dest = new File(
                 ".",
                 if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
               logInfo(
-                s"Unpacking an archive $resolvedUri " +
+                s"Copying $resolvedUri " +
                   s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
               Utils.deleteRecursively(dest)
-              Utils.unpack(source, dest)
-
+              if (isArchive) {
+                Utils.unpack(source, dest)
+              } else {
+                Files.copy(source.toPath, dest.toPath)
+              }
               // Keep the URIs of local files with the given fragments.
               UriBuilder.fromUri(
-                localArchive).fragment(resolvedUri.getFragment).build().toString
+                localResource).fragment(resolvedUri.getFragment).build().toString
           }.mkString(",")
+        }
+
+        val filesLocalFiles = Option(args.files).map {
+          downloadResourcesToCurrentDirectory(_)
+        }.orNull
+        val jarsLocalJars = Option(args.jars).map {
+          downloadResourcesToCurrentDirectory(_)
+        }.orNull
+        val archiveLocalFiles = Option(args.archives).map {
+          downloadResourcesToCurrentDirectory(_, true)
+        }.orNull
+        val pyLocalFiles = Option(args.pyFiles).map {
+          downloadResourcesToCurrentDirectory(_)
         }.orNull
         args.files = filesLocalFiles
         args.archives = archiveLocalFiles
-        args.pyFiles = localPyFiles
+        args.pyFiles = pyLocalFiles
+        args.jars = jarsLocalJars
       }
     }
 
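To make the new placement rule easier to follow, here is a minimal standalone sketch of what downloadResourcesToCurrentDirectory does with each downloaded resource, using plain java.net.URI where SparkSubmit uses Utils.resolveURI and a stub comment where it calls Utils.unpack. The object and method names are illustrative, not part of SparkSubmit:

import java.io.File
import java.net.URI
import java.nio.file.Files

object ResourcePlacementSketch {
  // Destination name in the current working directory: a URI fragment
  // ("hdfs:///data/conf.tgz#myconf" -> "myconf") wins; otherwise the
  // downloaded file keeps its own name.
  def destinationFor(downloaded: File, original: URI): File =
    new File(".", Option(original.getFragment).getOrElse(downloaded.getName))

  // Archives are unpacked into the destination; every other resource is
  // copied byte-for-byte.
  def place(downloaded: File, original: URI, isArchive: Boolean): File = {
    val dest = destinationFor(downloaded, original)
    if (isArchive) () // SparkSubmit delegates to Utils.unpack(downloaded, dest) here
    else Files.copy(downloaded.toPath, dest.toPath)
    dest
  }
}

With --files hdfs:///ns/app.conf#renamed.conf, for example, the driver ends up with ./renamed.conf; without a fragment the downloaded name app.conf is kept. The fragment is also re-attached to the returned URI string, which is why the diff keeps the UriBuilder.fromUri(...).fragment(...) call.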
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6bd3a49576ae..76311d0ab183 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -486,6 +486,41 @@ class SparkSubmitSuite
     conf.get("spark.kubernetes.driver.container.image") should be ("bar")
   }
 
+  test("SPARK-33782: handles k8s files download to current directory") {
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--proxy-user", "test.user",
+      "--master", "k8s://host:port",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.namespace=spark",
+      "--conf", "spark.kubernetes.driver.container.image=bar",
+      "--conf", "spark.kubernetes.submitInDriver=true",
+      "--files", "src/test/resources/test_metrics_config.properties",
+      "--py-files", "src/test/resources/test_metrics_system.properties",
+      "--archives", "src/test/resources/log4j2.properties",
+      "--jars", "src/test/resources/TestUDTF.jar",
+      "/home/thejar.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
+    conf.get("spark.master") should be ("k8s://https://host:port")
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.driver.memory") should be ("4g")
+    conf.get("spark.kubernetes.namespace") should be ("spark")
+    conf.get("spark.kubernetes.driver.container.image") should be ("bar")
+
+    Files.exists(Paths.get("test_metrics_config.properties")) should be (true)
+    Files.exists(Paths.get("test_metrics_system.properties")) should be (true)
+    Files.exists(Paths.get("log4j2.properties")) should be (true)
+    Files.exists(Paths.get("TestUDTF.jar")) should be (true)
+    Files.delete(Paths.get("test_metrics_config.properties"))
+    Files.delete(Paths.get("test_metrics_system.properties"))
+    Files.delete(Paths.get("log4j2.properties"))
+    Files.delete(Paths.get("TestUDTF.jar"))
+  }
+
   /**
    * Helper function for testing main class resolution on remote JAR files.
    *
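One caveat worth noting about the new test: the Files.delete calls run only when every assertion before them passes, so a single failing Files.exists check leaves the already-copied files behind in the test's working directory. Below is a sketch of a leak-proof variant of the existence checks and cleanup, assumed to sit in the same test body; expectedDownloads is an illustrative name, not part of SparkSubmitSuite:

import java.nio.file.{Files, Paths}

val expectedDownloads = Seq(
  "test_metrics_config.properties",
  "test_metrics_system.properties",
  "log4j2.properties",
  "TestUDTF.jar")

try {
  expectedDownloads.foreach { name =>
    assert(Files.exists(Paths.get(name)), s"$name was not copied to the working directory")
  }
} finally {
  // Remove whatever did get created, even when an assertion above failed.
  expectedDownloads.foreach(name => Files.deleteIfExists(Paths.get(name)))
}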