
Conversation

@pralabhkumar
Contributor

What changes were proposed in this pull request?

This PR places `spark.files`, `spark.jars`, and `spark.pyfiles` in the current working directory on the driver in K8s cluster mode.

Why are the changes needed?

This mimics the behaviour of YARN and also helps users access files from the current working directory (PWD). As mentioned in the JIRA, by doing this, users can, for example, leverage PEX to manage Python dependencies in Apache Spark:

```
pex pyspark==3.0.1 pyarrow==0.15.1 pandas==0.25.3 -o myarchive.pex
PYSPARK_PYTHON=./myarchive.pex spark-submit --files myarchive.pex
```
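For illustration, a minimal sketch (hypothetical file name `app.conf`, not part of this PR) of what the change enables in driver code: a file shipped with `--files` lands in the driver's current working directory on K8s and can be opened by relative path, as it already can on YARN.

```scala
import scala.io.Source

// Hypothetical driver-side code: "app.conf" was shipped via
// `spark-submit --files app.conf`; with this change it is placed in the
// driver's current working directory on K8s, so a relative path works.
val source = Source.fromFile("app.conf")
try println(source.mkString) finally source.close()
```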

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested via unit test cases and also ran on a local K8s cluster.

@github-actions github-actions bot added the CORE label Aug 5, 2022
@pralabhkumar
Contributor Author

@HyukjinKwon Please review; the build is failing because of an unrelated error (it passes locally).

@AmplabJenkins

Can one of the admins verify this patch?

Member

Suggested change:

```suggestion
test("SPARK-33782 : handles k8s files download to current directory") {
```

Member

Let's import this at the top.

Member

Hm, why do we try this unpack for jars and files too? I think we should just call `downloadFileList` for them.

@HyukjinKwon HyukjinKwon changed the title [SPARK-33782][K8s][CORE]Place spark.files, spark.jars and spark.files under the current working directory on the driver in K8S cluster mode [SPARK-33782][K8S][CORE]Place spark.files, spark.jars and spark.files under the current working directory on the driver in K8S cluster mode Aug 7, 2022
@HyukjinKwon
Member

cc @dongjoon-hyun @tgravescs @Ngone51 FYI

@tgravescs
Contributor

> Does this PR introduce any user-facing change?
> No

Is this true? If I specify a file to pass along and my code references it, what did it do before? I know normally on YARN I would do something like ./XXXX.

I'm a bit worried here about breaking people if we move the location and they expected it somewhere else.

@pralabhkumar
Contributor Author

pralabhkumar commented Aug 8, 2022

@tgravescs

https://github.com/apache/spark/pull/30735/files#r540935585

Without this change, in the K8s case files are not copied to the current working directory: they are downloaded locally, but not placed in the driver's current working directory (the way it happens in YARN).

@pralabhkumar
Contributor Author

ping @dongjoon-hyun @Ngone51

@pralabhkumar
Contributor Author

ping @dongjoon-hyun @Ngone51. I think this is an important feature to have the same behaviour as YARN. Please review.

Member

Suggested change:

```diff
-if (isArchive) Utils.unpack(source, dest) else Files.copy(source.toPath, dest.toPath)
+if (isArchive) {
+  Utils.unpack(source, dest)
+} else {
+  Files.copy(source.toPath, dest.toPath)
+}
```

Member

`jarsLocalJars` -> `localJars`?

Contributor Author

Since `localJars` is already defined above, I used this variable name (which is similar to `filesLocalFiles`).

Member

Revert unnecessary change?

Member

nit:

Suggested change:

```diff
-test("SPARK-33782 : handles k8s files download to current directory") {
+test("SPARK-33782: handles k8s files download to current directory") {
```

@Ngone51
Member

Ngone51 commented Sep 6, 2022

The change generally looks good to me. It'd be good if @dongjoon-hyun could take a look, since he has more knowledge of K8s.

@pralabhkumar
Contributor Author

Thanks @Ngone51 for reviewing this; I'll make the suggested changes.

@dongjoon-hyun Please review the PR.

@pralabhkumar
Contributor Author

Gentle ping @dongjoon-hyun

@pralabhkumar
Contributor Author

@dongjoon-hyun, please review this; it will help K8s have the same functionality as YARN. Since this is targeted for Spark 3.4 (per the JIRA), IMHO it would be great if you could spend some time on it.

cc @HyukjinKwon (since the JIRA was created by HyukjinKwon)

@pralabhkumar
Contributor Author

@dongjoon-hyun, I have incorporated all the review comments; please take a look.

@pralabhkumar
Contributor Author

@HyukjinKwon @Ngone51 @dongjoon-hyun
Gentle ping to please review this.

@pralabhkumar
Contributor Author

@HyukjinKwon, can you please review this or get it reviewed? This is an important JIRA (opened by you) and also fixes parity with YARN.

@pralabhkumar pralabhkumar requested review from HyukjinKwon and Ngone51 and removed request for HyukjinKwon and Ngone51 October 7, 2022 09:18
@pralabhkumar
Contributor Author

@Yikun can you please review this PR?

@pralabhkumar
Contributor Author

@Dooyoung-Hwang please let me know if this feature is not required, and I can close the PR. However, IMHO this is a parity fix with respect to YARN.

Contributor

@holdenk holdenk left a comment

LGTM, if no one else has any concerns I'll try and merge it this week or next.

@pralabhkumar
Contributor Author

pralabhkumar commented Dec 13, 2022

@holdenk Thanks for the LGTM. A gentle reminder to please merge the PR.

@asfgit asfgit closed this in af8dd41 Dec 13, 2022
@holdenk
Contributor

holdenk commented Dec 13, 2022

Merged, thanks for the reminder.

@pralabhkumar
Contributor Author

Thanks @holdenk for your help.

@holdenk
Contributor

holdenk commented Dec 13, 2022

Thanks for making the PR :D

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
…s under the current working directory on the driver in K8S cluster mode


Closes apache#37417 from pralabhkumar/rk_k8s_local_resource.

Authored-by: pralabhkumar <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
@advancedxy
Contributor

@pralabhkumar thanks for your work. I noticed a similar issue when running Spark applications on K8S; this is a helpful feature.

However, this PR may introduce an inefficiency: files/jars are downloaded twice when running in K8s cluster mode.

```scala
if (deployMode == CLIENT) {
  // jars are downloaded once
  localJars = Option(args.jars).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  // py files are downloaded once
  localPyFiles = Option(args.pyFiles).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  if (isKubernetesClusterModeDriver) {
    def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): String = {
      ...
    }

    val filesLocalFiles = Option(args.files).map {
      downloadResourcesToCurrentDirectory(_)
    }.orNull
    // jars are downloaded again
    val jarsLocalJars = Option(args.jars).map {
      downloadResourcesToCurrentDirectory(_)
    }.orNull
    val archiveLocalFiles = Option(args.archives).map {
      downloadResourcesToCurrentDirectory(_, true)
    }.orNull
    // py files are downloaded again
    val pyLocalFiles = Option(args.pyFiles).map {
      downloadResourcesToCurrentDirectory(_)
    }.orNull
  }
}
```

Would you mind creating a follow-up PR to address this issue? @pralabhkumar
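A minimal sketch, not the actual fix, of one way such a follow-up could avoid the second download. It assumes, as in the excerpt above, that `downloadFileList` has already returned a comma-separated list of local `file:` URIs; the already-fetched files are then just copied into the working directory instead of being downloaded again. The helper name is hypothetical.

```scala
import java.net.URI
import java.nio.file.{Files, Paths, StandardCopyOption}

// Hypothetical helper: reuse files already fetched into targetDir by
// downloadFileList (a comma-separated list of local file: URIs) and
// copy them into the driver's current working directory.
def copyDownloadedToCwd(localUris: String): String = {
  localUris.split(",").filter(_.nonEmpty).map { uriStr =>
    val source = Paths.get(new URI(uriStr))
    val dest = Paths.get(source.getFileName.toString) // relative to CWD
    Files.copy(source, dest, StandardCopyOption.REPLACE_EXISTING)
    dest.toAbsolutePath.toString
  }.mkString(",")
}
```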

Also, there's another catch when running Spark on K8s with --files/--archives: these files/archives are already downloaded here, but because they are still passed along as args.files and args.archives, the SparkContext will copy them (and/or untar them) again when constructing the context. See the relevant code:

```scala
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten
_archives = _conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten
```

And:

```scala
// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(jar => addJar(jar, true))
  if (addedJars.nonEmpty) {
    _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
  }
}
if (files != null) {
  files.foreach(file => addFile(file, false, true))
  if (addedFiles.nonEmpty) {
    _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
  }
}
if (archives != null) {
  archives.foreach(file => addFile(file, false, true, isArchive = true))
  if (addedArchives.nonEmpty) {
    _conf.set("spark.app.initial.archive.urls", addedArchives.keys.toSeq.mkString(","))
  }
}
```
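To make the second copy concrete, here is a minimal sketch (hypothetical path, local master purely for illustration): because the already-materialized file is still listed in spark.files, the SparkContext constructor calls addFile on it again during startup.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical illustration: "file:///opt/spark/work-dir/app.conf" was
// already placed in the driver's CWD by SparkSubmit, but it is still in
// spark.files, so the constructor below re-adds (and re-copies) it.
val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("double-copy-illustration")
  .set("spark.files", "file:///opt/spark/work-dir/app.conf")
val sc = new SparkContext(conf) // addFile runs here for each spark.files entry
sc.stop()
```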

cc @Ngone51 @holdenk

@stijndehaes
Contributor

stijndehaes commented May 23, 2023

Maybe this is not the correct place to ask, but this PR created some issues for our flow. Maybe our flow is not how things were intended to be used, but it goes like this: we submit jobs on Kubernetes by baking the jar into the Docker image, copying it to the workdir location.
The result of this PR is that our jar is removed because of the `Utils.deleteRecursively(dest)`.
A quick fix is to copy the jar somewhere else, but if this sounds like unintended behavior to you, I am willing to open a PR to fix it.
I am, however, not sure what the intended behavior would be. Just removing the `Utils.deleteRecursively(dest)` sounds a bit too simple :)
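For what it's worth, a minimal sketch of one possible guard (not a committed fix, and the helper name is hypothetical): skip the delete-and-copy when the source already is the destination, which would leave a jar baked into the working directory untouched.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical guard (sketch only): clear and copy only when source and
// destination are different files, so a jar pre-baked into the working
// directory is not deleted out from under the copy.
def placeInCwd(source: File, dest: File): Unit = {
  if (source.getCanonicalPath != dest.getCanonicalPath) {
    if (dest.exists()) dest.delete() // stands in for Utils.deleteRecursively
    Files.copy(source.toPath, dest.toPath)
  }
}
```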

dongjoon-hyun pushed a commit that referenced this pull request Jun 7, 2023
…driver in K8S cluster mode

### What changes were proposed in this pull request?

Adding working directory into classpath on the driver in K8S cluster mode.

### Why are the changes needed?

After #37417, the spark.files and spark.jars are placed in the working directory.
But it seems that the SparkContext classloader cannot access them, because they are not on the classpath by default.
This PR adds the current working directory to the classpath, so that the spark.files and spark.jars placed in the working directory can be accessed by the classloader.
For example, the `hive-site.xml` uploaded by `spark.files`.

### Does this PR introduce _any_ user-facing change?

Yes, users no longer need to add the working directory to the Spark classpath manually.

### How was this patch tested?

UT.

Closes #41201 from turboFei/work_dir_classpath.

Authored-by: fwang12 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
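A minimal sketch of what the classpath change makes possible, assuming a `hive-site.xml` shipped via `spark.files` as in the commit message above: with the working directory on the driver classpath, the file is resolvable as a classpath resource.

```scala
// Hypothetical check run on the K8s cluster-mode driver: with the working
// directory on the classpath, a resource placed there by spark.files is
// visible to the context classloader.
val url = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
println(if (url == null) "hive-site.xml not on classpath" else s"found: $url")
```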
czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
…driver in K8S cluster mode
@pm-nuance

@pralabhkumar @HyukjinKwon @holdenk
I'm facing an issue with the new Spark version, which uses Files.copy in SparkSubmit.scala.
The latest change in SparkSubmit.scala is causing a NoSuchFileException.
The mentioned sample.jar is present at the path /opt/spark/work-dir/sample.jar inside the Docker image.
The same code works fine with Spark 3.3.0.

Here is the error message:

```
Files  local:///opt/spark/work-dir/sample.jar from /opt/spark/work-dir/sample.jar to /opt/spark/work-dir/./sample.jar
Exception in thread "main" java.nio.file.NoSuchFileException: /opt/spark/work-dir/sample.jar
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
        at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
        at java.nio.file.Files.copy(Files.java:1274)
        at org.apache.spark.deploy.SparkSubmit.$anonfun$prepareSubmitEnvironment$14(SparkSubmit.scala:437)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
```
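A hedged illustration of this failure mode, not a confirmed diagnosis, though it is consistent with the earlier report about `Utils.deleteRecursively`: when the jar is already baked into /opt/spark/work-dir, which is also the driver's working directory, the destination `./sample.jar` resolves to the source itself, so deleting the destination before the copy removes the source and the subsequent Files.copy fails with NoSuchFileException.

```scala
import java.io.File

// The destination "./sample.jar" (relative to /opt/spark/work-dir) and the
// source are the same file, so a delete-then-copy sequence deletes the source.
val source = new File("/opt/spark/work-dir/sample.jar")
val dest = new File("/opt/spark/work-dir/./sample.jar")
println(source.getCanonicalPath == dest.getCanonicalPath) // true
```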
