Address review comments
HyukjinKwon committed Nov 25, 2020
commit 15d8ed51d99e57403aae3272d9975b96e7735ee2
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -426,8 +426,7 @@ class SparkContext(config: SparkConf) extends Logging {
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
- _archives = _conf.getOption(ARCHIVES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
- .toSeq.flatten
+ _archives = _conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten

_eventLogDir =
if (isEventLogEnabled) {
@@ -1638,10 +1637,11 @@ class SparkContext(config: SparkConf) extends Logging {
val uriToDownload = UriBuilder.fromUri(new URI(key)).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
logInfo(s"Unpacking an archive $path")
val dest = new File(
SparkFiles.getRootDirectory(),
if (uri.getFragment != null) uri.getFragment else source.getName)
+ logInfo(
+ s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
postEnvironmentUpdate()
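Note on the destination naming in the hunk above: the optional URI fragment (the part after `#`) chooses the directory the archive is unpacked into under the SparkFiles root, falling back to the archive's file name. A minimal sketch of that rule, using a hypothetical archive URI:

```scala
import java.io.File
import java.net.URI

// Hypothetical archive URI; "#deps" asks for the archive to be unpacked
// into a directory named "deps" rather than "libs.tar.gz".
val uri = new URI("hdfs://namenode:8020/apps/libs.tar.gz#deps")

// Mirrors the dest computation shown above: fragment if present, otherwise the file name.
val dirName =
  if (uri.getFragment != null) uri.getFragment
  else new File(uri.getPath).getName
// dirName == "deps"; without the fragment it would be "libs.tar.gz"
```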
@@ -513,7 +513,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor. File paths of these files
| in executors can be accessed via SparkFiles.get(fileName).
- | --archives ARCHIVES Comma separated list of archives to be extracted into the
+ | --archives ARCHIVES Comma-separated list of archives to be extracted into the
| working directory of each executor.
|
| --conf, -c PROP=VALUE Arbitrary Spark configuration property.
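The help text above names SparkFiles.get as the way task code resolves files shipped with --files; archives shipped with --archives are extracted under the same root directory. A small sketch, with an illustrative file name:

```scala
import org.apache.spark.SparkFiles

// Inside task code running on an executor: resolve the local path of a file
// that was distributed with --files (the file name is illustrative).
val localPath: String = SparkFiles.get("plugin.conf")

// Archives passed with --archives are extracted under the same root directory,
// into a folder named by their '#' fragment or by the archive file name.
val filesRoot: String = SparkFiles.getRootDirectory()
```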
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -233,7 +233,7 @@ private[spark] class Executor(

// To allow users to distribute plugins and their required files
// specified by --jars, --files and --archives on application submission, those
- // jars/files/archives should be downloaded and added to the class loader vi
+ // jars/files/archives should be downloaded and added to the class loader via
// updateDependencies. This should be done before plugin initialization below
// because executors search plugins from the class loader and initialize them.
private val Seq(initialUserJars, initialUserFiles, initialUserArchives) =
@@ -930,11 +930,12 @@ private[spark] class Executor(
val sourceURI = new URI(name)
val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
- env.securityManager, hadoopConf, timestamp, useCache = false, shouldUntar = false)
- logInfo(s"Unpacking an archive $name")
+ env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
val dest = new File(
SparkFiles.getRootDirectory(),
if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
+ logInfo(
+ s"Unpacking an archive $name from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
currentArchives(name) = timestamp
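The comment near the top of this file explains why dependencies are fetched via updateDependencies before plugins are initialized. A hedged sketch of that scenario on the submission side; the plugin class and all paths are illustrative, not real artifacts:

```scala
import org.apache.spark.SparkConf

// Illustrative plugin class and dependency paths; executors fetch these
// before the plugin is initialized, so the plugin can rely on them being present.
val conf = new SparkConf()
  .setAppName("plugin-example")
  .set("spark.plugins", "com.example.MetricsPlugin")           // plugin loaded on driver/executors
  .set("spark.jars", "hdfs:///apps/metrics-plugin.jar")        // jar containing the plugin class
  .set("spark.files", "hdfs:///apps/metrics.properties")       // file the plugin reads
  .set("spark.archives", "hdfs:///apps/metrics-data.tgz#data") // archive unpacked as ./data
```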
@@ -1805,7 +1805,7 @@ package object config {

private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
.version("3.1.0")
.doc("Comma separated list of archives to be extracted into the working directory of each " +
.doc("Comma-separated list of archives to be extracted into the working directory of each " +
"executor. .jar, .tar.gz, .tgz and .zip are supported. You can specify the directory " +
"name to unpack via adding '#' after the file name to unpack, for example, " +
"'file.zip#directory'. This configuration is experimental.")
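To make the `#` renaming in the doc string concrete: with spark.archives set to `file.zip#directory`, executor-side code finds the unpacked contents under the SparkFiles root in a folder named `directory`. A sketch with illustrative names:

```scala
import java.io.File
import org.apache.spark.SparkFiles

// Executor-side view when spark.archives contains "file.zip#directory":
// the zip is unpacked into a directory named "directory" (not "file.zip").
val unpacked = new File(SparkFiles.getRootDirectory(), "directory")
val entries = Option(unpacked.listFiles()).map(_.map(_.getName).toSeq).getOrElse(Seq.empty)
```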
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -487,6 +487,10 @@ private[spark] object Utils extends Logging {
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
+ *
+ * If `shouldUntar` is true, it untars the given url if it is a tar.gz or tgz into `targetDir`.
+ * This is a legacy behavior, and users should better use `spark.archives` configuration or
+ * `SparkContext.addArchive`
*/
def fetchFile(
url: String,
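The added doc comment points users at the spark.archives configuration or SparkContext.addArchive instead of the legacy shouldUntar path. A minimal sketch of the addArchive route; the archive path and fragment are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Driver-side: distribute an archive and have it unpacked on every executor,
// into a directory named "data" (path and fragment are illustrative).
val sc = new SparkContext(new SparkConf().setAppName("addArchive-sketch").setMaster("local[*]"))
sc.addArchive("/tmp/reference-data.tar.gz#data")
```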
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -788,7 +788,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.archives</code></td>
<td></td>
<td>
- Comma separated list of archives to be extracted into the working directory of each executor.
+ Comma-separated list of archives to be extracted into the working directory of each executor.
.jar, .tar.gz, .tgz and .zip are supported. You can specify the directory name to unpack via
adding <code>#</code> after the file name to unpack, for example, <code>file.zip#directory</code>.
This configuration is experimental.
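Finally, a sketch of the comma-separated form this entry describes, with two hypothetical archives: the first is renamed on unpack via `#`, the second is unpacked under its own file name:

```scala
import org.apache.spark.SparkConf

// Illustrative archives: the first unpacks into a directory named "environment",
// the second (no '#') into a directory named after the file, "models.zip".
val conf = new SparkConf()
  .set("spark.archives", "hdfs:///deps/env.tar.gz#environment,hdfs:///deps/models.zip")
```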