Conversation

@JoshRosen
Contributor

What changes were proposed in this pull request?

The behavior of SparkContext.addFile() changed slightly with the introduction of the Netty-RPC-based file server, which was introduced in Spark 1.6 (where it was disabled by default) and became the default / only file server in Spark 2.0.0.

Prior to 2.0, calling SparkContext.addFile() with files that have the same name and identical contents would succeed. This behavior was never explicitly documented but Spark has behaved this way since very early 1.x versions.

In 2.0 (or 1.6 with the Netty file server enabled), the second addFile() call will fail with a requirement error because NettyStreamManager tries to guard against duplicate file registration.

This problem also affects addJar() in a more subtle way: the fileServer.addJar() call will also fail with an exception but that exception is logged and ignored; I believe that the problematic exception-catching path was mistakenly copied from some old code which was only relevant to very old versions of Spark and YARN mode.

I believe that this change of behavior was unintentional, so this patch weakens the require check so that adding the same filename at the same path will succeed.

At file download time, Spark tasks will fail with exceptions if an executor already has a local copy of a file and that file's contents do not match the contents of the file being downloaded / added. As a result, it's important that we prevent files with the same name and different contents from being served, because allowing that can effectively brick an executor by preventing it from successfully launching any new tasks. Before this patch's change, this was prevented by forbidding addFile() from being called twice on files with the same name.

Because Spark does not defensively copy local files that are passed to addFile, it is vulnerable to files' contents changing, so I think it's okay to rely on an implicit assumption that these files are intended to be immutable: if they are mutable, this can lead to either explicit task failures or implicit incorrectness (in case new executors silently get newer copies of the file while old executors continue to use an older version).

To guard against this, I have decided to only update the file addition timestamps on the first call to addFile(); duplicate calls will succeed but will not update the timestamp. This behavior is fine as long as we assume files are immutable, which seems reasonable given the behaviors described above.
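The first-add-wins semantics described above can be sketched in miniature. This is a hypothetical Python model, not Spark's Scala implementation; `FileRegistry` and its fields are invented for illustration:

```python
class FileRegistry:
    """Toy model (not Spark's actual code) of the addFile() semantics
    described above: duplicate adds of an identical file succeed, only
    the first add records a timestamp, and registering different
    contents under the same name is rejected."""

    def __init__(self):
        self._added_files = {}   # name -> timestamp of first add
        self._served = {}        # name -> contents held by the file server
        self._clock = 0          # stand-in for wall-clock timestamps

    def add_file(self, name, contents):
        if name in self._served and self._served[name] != contents:
            # Serving two payloads under one name would brick executors
            # at fetch time, so reject it up front.
            raise ValueError(f"{name} already registered with different contents")
        self._served[name] = contents
        self._clock += 1
        # setdefault mirrors ConcurrentHashMap.putIfAbsent: a duplicate
        # add returns the original timestamp and stores nothing new.
        return self._added_files.setdefault(name, self._clock)

registry = FileRegistry()
t1 = registry.add_file("data.txt", b"hello")
t2 = registry.add_file("data.txt", b"hello")  # duplicate add succeeds
assert t1 == t2 == 1                          # timestamp not updated
```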

As part of this change, I also improved the thread-safety of the addedJars and addedFiles maps; this is important because these maps may be concurrently read by a task launching thread and written by a driver thread in case the user's driver code is multi-threaded.

How was this patch tested?

I added regression tests in SparkContextSuite.

@rxin
Contributor

rxin commented Jul 29, 2016

cc @vanzin

@SparkQA

SparkQA commented Jul 29, 2016

Test build #62994 has finished for PR 14396 at commit 0d7dd0d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


```scala
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
  logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
  postEnvironmentUpdate()
```
Contributor

Is putIfAbsent correct here? It won't update the timestamp when you call addFile for the same file again.

Contributor

Nevermind, I see you're actually changing the behavior from 1.x instead of restoring it.
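For readers unfamiliar with the semantics: putIfAbsent inserts only when the key is absent, which is why the timestamp never changes on a duplicate add. A rough Python analogue using dict.setdefault (not Spark's code):

```python
# dict.setdefault behaves like ConcurrentHashMap.putIfAbsent: when the
# key already exists, the stored value is returned and the new value
# is discarded, so re-adding a file never bumps its timestamp.
added_files = {}
first = added_files.setdefault("data.txt", 1)   # key absent: stores 1
second = added_files.setdefault("data.txt", 2)  # key present: 2 discarded
assert (first, second) == (1, 1)
assert added_files["data.txt"] == 1
```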

Contributor Author

Yeah, this is actually intentional. I experimented with implementing the 1.x behavior and started writing some tests to verify that newer versions of files took precedence over old ones but then discovered that Spark executors will crash with exceptions if they've downloaded a file with a given name and the new file's contents don't match the old file. Given this behavior it seems that updating the timestamp will work only if the new file has the same contents as the old file (in which case it doesn't matter what we do with the timestamp) or if all executors are replaced before running any further tasks (which seems like an obscure use-case that we don't want to optimize for).

@vanzin
Contributor

vanzin commented Jul 29, 2016

The change LGTM.

I vaguely remember some people relying on the behavior that the new contents are uploaded when calling these methods a second time. e.g., to upload a new jar with updated code (and yes, I realize that is super sketchy to begin with and probably won't work as they expect). Maybe making the behavior clear in the scaladoc would help?

```scala
addedFiles(key) = timestamp

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
```
Contributor Author

I believe that this line is unnecessary now that runLocally was removed a few releases ago. However, I suppose that we might need it in order to handle the somewhat obscure corner-case where a user's reduce function / closure executes on the driver and accesses the SparkFiles object.

Contributor Author

Yep, turns out this is needed in the Python tests so I added it back.

@JoshRosen
Contributor Author

I'll have to see about the JAR case, but I think that the case of re-uploading a file behaves in an especially confusing way if the contents are different: the Utils.fetchFile() call on the driver would cause the driver's addFile() call to throw an exception, but this would happen after the addedFiles(key) = timestamp assignment, meaning that executors will already have been bricked.

@JoshRosen
Contributor Author

A few other notes:

  • It's important that we keep the timestamps because they're necessary for cross-worker / cross-executor JAR + file download caching to work correctly. This is a relatively obscure feature which was added a few releases ago in order to improve scalability when running large clusters with many executors per worker. Theoretically different applications will generally have different timestamps for the same JAR so we could just as well use app ids for this but I'd rather not make such an invasive change now.
  • If we assume that addFile() will never be called with files that have the same name but different contents (so as not to brick executors), then allowing files to be re-downloaded so that the content comparison can be performed on executors will lead to performance problems if addFile() is repeatedly called with the same argument: executors will repeatedly download the file even though its contents are identical.
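The timestamp-dependent cache naming in the first bullet can be illustrated with a toy sketch. `cached_file_name` is a hypothetical stand-in; the real naming logic lives inside Spark's Utils.fetchFile and may differ in detail:

```python
import hashlib

def cached_file_name(url: str, timestamp: int) -> str:
    """Hypothetical helper: cache entries incorporate the add-time
    timestamp, so re-adding a file with a new timestamp misses the
    cross-executor cache and triggers a fresh download."""
    url_hash = hashlib.md5(url.encode()).hexdigest()
    return f"{url_hash}{timestamp}_cache"

# Same file, new timestamp -> different cache name -> fresh download.
assert cached_file_name("file:/tmp/foo.txt", 1) != cached_file_name("file:/tmp/foo.txt", 2)
# Same timestamp -> cache hit, no re-download.
assert cached_file_name("file:/tmp/foo.txt", 1) == cached_file_name("file:/tmp/foo.txt", 1)
```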

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 30, 2016

Test build #63024 has finished for PR 14396 at commit 9aa32b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2016

Test build #63083 has finished for PR 14396 at commit 9aa32b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Aug 1, 2016

@JoshRosen LGTM for the current patch. However, I have a question about the following description in your PR:

At file download time, Spark tasks will fail with exceptions if an executor already has a local copy of a file and that file's contents do not match the contents of the file being downloaded / added.

Utils.fetchFile will skip the downloading step if a local copy exists, so it won't check the content. I think the file content check is just to guard against hash-code conflicts in the cached file names.

@JoshRosen
Contributor Author

JoshRosen commented Aug 2, 2016

@zsxwing, I meant to describe what happens on executors in the following scenario:

  • addFile(foo) is called for the first time at timestamp = 1
  • A task runs on an executor and downloads the copy of the file added at timestamp = 1.
    • By default, the file fetch cache is enabled and filenames in that cache incorporate timestamps. Thus, this file will be downloaded to a file named <hashcode of foo's URL>$timestamp_cache.
  • addFile(foo) is called a second time at timestamp = 2 and the same file is passed to it.
  • A task runs on an executor and discovers that the added file's timestamp (2) is newer than the timestamp of the file that it has already downloaded (1), so it tries to fetch files again:
    • Because the file with the newer timestamp is not present in the fetch file cache, a new copy of the file will be downloaded.

If the fetch file cache is disabled, on the other hand, then we directly call doFetchFile which, in turn, will call downloadFile(), which downloads the file to a temporary file before considering whether to overwrite an existing file.

In either case, it looks like re-adding a file with a new timestamp will trigger downloads on the executors and those downloads will be unnecessary if the file's contents are unchanged.
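The executor-side decision described in this scenario can be sketched as follows. This is a simplified model, not Spark's actual code; `needs_refetch` is an invented name:

```python
def needs_refetch(local_timestamps: dict, name: str, driver_timestamp: int) -> bool:
    """Sketch of the executor-side decision described above: fetch a
    file again only when the driver's timestamp for it is newer than
    the one recorded for the local copy (files never seen locally are
    always fetched)."""
    return local_timestamps.get(name, -1) < driver_timestamp

local = {"foo": 1}
assert not needs_refetch(local, "foo", 1)  # same timestamp: keep local copy
assert needs_refetch(local, "foo", 2)      # newer timestamp: re-download
assert needs_refetch(local, "bar", 1)      # never fetched: download
```

This is why re-adding a file with a bumped timestamp forces downloads even when the contents are unchanged, and why the patch keeps the original timestamp on duplicate adds.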

@zsxwing
Member

zsxwing commented Aug 2, 2016

@JoshRosen Thanks for clarifying. LGTM!

@JoshRosen
Contributor Author

Great! I'm going to merge this now. At some point we should clarify the user-facing documentation for addFile(), etc., but let's do that in a followup.

asfgit pushed a commit that referenced this pull request Aug 2, 2016
… with the same file

Author: Josh Rosen <[email protected]>

Closes #14396 from JoshRosen/SPARK-16787.

(cherry picked from commit e9fc0b6)
Signed-off-by: Josh Rosen <[email protected]>
@asfgit asfgit closed this in e9fc0b6 Aug 2, 2016
@JoshRosen JoshRosen deleted the SPARK-16787 branch August 2, 2016 19:05
@JoshRosen
Contributor Author

Also merged into branch-2.0 for inclusion in 2.0.1.

@NYcleaner

Hi, the addFile(path) function in PySpark only takes one parameter, path, so if I have many files I have to call addFile many times. Can you make the function the same as in the Scala API?

@NYcleaner

Thank you

@JoshRosen
Contributor Author

Hi @NYcleaner,

This pull request isn't the right venue for making PySpark API feature requests. Instead, please file a ticket at https://issues.apache.org/jira/browse/SPARK.

@NYcleaner

Oh, sorry. I'm a newcomer; thank you for your patience and help.

@josephguan

Hi @JoshRosen

I'm investigating SPARK-19417 (spark.files.overwrite is ignored). But according to this PR, it seems it is designed to ignore spark.files.overwrite, as we assume files are immutable and cannot be overwritten, right?

So, shall we update the documentation and remove the code relating to the 'spark.files.overwrite' property?
