Minor changes
pwendell committed Mar 11, 2014
commit b132d7bd61f8ba4bd4994b8fee94099125d8ade7
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -134,7 +134,7 @@ class SparkContext(
// driver. Do this before all other initialization so that any thread pools created for this
// SparkContext uses the class loader.
// Note that this is config-enabled as classloaders can introduce subtle side effects
-private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) {
+private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) {
val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)

Contributor:

Maybe you should set its parent to the current thread's context class loader if one exists. Otherwise users who try to add some class loader before starting SparkContext (e.g. if they're in some server environment) will lose it.
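
A minimal sketch of that suggestion, reusing the SparkURLClassLoader line quoted above (this is not what the commit does):

// Sketch of the suggested change: prefer the calling thread's context class loader
// as the parent when one is installed (e.g. by a servlet container), and only fall
// back to SparkContext's own class loader otherwise.
val parent = Option(Thread.currentThread.getContextClassLoader)
  .getOrElse(this.getClass.getClassLoader)
val loader = new SparkURLClassLoader(Array.empty[URL], parent)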

Contributor Author:

Great catch - this is definitely something that needs to change.

Contributor:

By the way, I'm pretty sure there is almost no way that SparkContexts can work properly inside a server environment using only thread context classloaders, because Spark spins up so many other threads. To make everything work more easily, I believe we should instead have a standard classloader set in SparkEnv or somewhere like that, which can inherit from the context classloader of the thread that started the SparkContext, but which can be used everywhere else that spins up new threads.
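
A rough sketch of that idea, with made-up names (nothing like this exists in the PR): capture the loader once on the thread that starts the SparkContext, and have Spark-spawned threads install it explicitly around user-visible work.

// Hypothetical sketch only -- appClassLoader and runWithAppClassLoader are invented names.
object AppClassLoaderSketch {
  // Captured once, on the thread that creates the SparkContext.
  val appClassLoader: ClassLoader =
    Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // Any thread Spark spins up would wrap user-visible work with this helper.
  def runWithAppClassLoader[T](body: => T): T = {
    val previous = Thread.currentThread.getContextClassLoader
    Thread.currentThread.setContextClassLoader(appClassLoader)
    try body finally Thread.currentThread.setContextClassLoader(previous)
  }
}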

Thread.currentThread.setContextClassLoader(loader)

Contributor:

Will this only work if addJars is called from the thread that created the SparkContext?

Contributor Author:

This will capture a pointer to the classloader in which the SC was created. So addJars can be called from anywhere and it will always augment this class loader.

I think this means that the class will be visible to (a) the thread that created the sc and (b) any threads created by that thread. Though it would be good to verify that the context class loader is passed on to child threads or that they delegate to that of the parent.

This does mean that a thread entirely outside of the SparkContext-creating thread and its children won't have the class loaded. I think that's actually desirable given that you may have a case where multiple SparkContexts are created in the same JVM.
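
A quick standalone check of the inheritance question above (plain JVM behavior, not Spark code): a newly constructed Thread copies the creating thread's context class loader.

import java.net.{URL, URLClassLoader}

object ContextLoaderInheritanceCheck extends App {
  // Install a distinctive loader on the current (parent) thread.
  val marker = new URLClassLoader(Array.empty[URL], Thread.currentThread.getContextClassLoader)
  Thread.currentThread.setContextClassLoader(marker)

  val child = new Thread(new Runnable {
    def run(): Unit =
      // Prints true: the child picked up the parent's context class loader at creation time.
      println(Thread.currentThread.getContextClassLoader eq marker)
  })
  child.start()
  child.join()
}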

Contributor Author:

I'll defer to @velvia on this one though as it's his design.

Contributor:

Ah, ok, I understand now. In that case, to make things simpler, would it possibly make sense not to load the jars into the current thread and only load them for the SparkContext/executors? Classloader stuff can be confusing to deal with, and keeping it as isolated as possible could make things easier for users. This would also line up a little more with how the MR distributed cache works - jars that get added to it don't become accessible to driver code.

Contributor Author:

Hey Sandy - not sure what you mean exactly by "load them for the SparkContext". The SparkContext is just a java object. The scenario we want to handle is like this:

val sc = new SparkContext(...)
sc.addJar("jar-containing-lib-foo")
val x: Seq[Foo] = sc.textFile(...).map(...).collect()

There are two ways "Foo" can be visible to the last line. Either it can be included in the classpath when launching the JVM, or it can be added dynamically to the classloader of the calling thread. Is there another way?
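
For concreteness, a rough illustration of the second option (the jar path and class name are made up, and this bypasses Spark's SparkURLClassLoader entirely):

import java.io.File
import java.net.URLClassLoader

val jarUrl = new File("/path/to/jar-containing-lib-foo.jar").toURI.toURL
val loader = new URLClassLoader(Array(jarUrl), Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
// Classes from the jar, e.g. Foo, are now resolvable from this thread and its children.
val fooClass = loader.loadClass("Foo")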

Contributor:

I had misunderstood how the original mechanism worked. I take this all back.

Some(loader)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -127,7 +127,7 @@ object SparkEnv extends Logging {

val securityManager = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
-securityManager = securityManager, classLoader = classLoader)
+securityManager = securityManager)

// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.driver.port to it.
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -42,9 +42,7 @@ private[spark] object AkkaUtils extends Logging {
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
-conf: SparkConf, securityManager: SecurityManager,
-classLoader: ClassLoader = Thread.currentThread.getContextClassLoader):
-(ActorSystem, Int) = {
+conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {

val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
@@ -104,9 +102,9 @@
""".stripMargin))

val actorSystem = if (indestructible) {
-IndestructibleActorSystem(name, akkaConf, classLoader)
+IndestructibleActorSystem(name, akkaConf)
} else {
-ActorSystem(name, akkaConf, classLoader)
+ActorSystem(name, akkaConf)
}

val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
4 changes: 2 additions & 2 deletions docs/cluster-overview.md
@@ -64,8 +64,8 @@ and `addFile`.
- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
-or shared via NFS, GlusterFS, etc. Note that if `spark.driver.add-dynamic-jars` is set, then the file
-must be visible to the node running the SparkContext as well.
+or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set,
+then the file must be visible to the node running the SparkContext as well.

Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
Over time this can use up a significant amount of space and will need to be cleaned up.
9 changes: 5 additions & 4 deletions docs/configuration.md
@@ -394,12 +394,13 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
-<td>spark.driver.add-dynamic-jars</td>
+<td>spark.driver.loadAddedJars</td>
<td>false</td>
<td>
-If true, the SparkContext uses a class loader to make jars added via `addJar` available to the SparkContext.
-The default behavior is that jars added via `addJar` are only made available to executors, and Spark apps
-must include all its jars in the application CLASSPATH even if `addJar` is used.
+If true, the SparkContext uses a class loader to make jars added via `addJar` available to

Contributor:

Could the second sentence be simplified to "The default behavior is that jars added via addJar must already be on the classpath."?

Contributor Author:

Good call.

+the SparkContext. The default behavior is that jars added via `addJar` are only made
+available to executors, and Spark apps must include all its jars in the driver's
+CLASSPATH even if `addJar` is used.
</td>
</tr>
<tr>
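
For reference, a hedged usage sketch of the flag discussed in this diff (the jar path is made up):

import org.apache.spark.{SparkConf, SparkContext}

// With spark.driver.loadAddedJars enabled, a jar added via addJar is also made
// loadable on the driver, rather than only being shipped to the executors.
val conf = new SparkConf()
  .setAppName("loadAddedJars-example")
  .setMaster("local[2]")
  .set("spark.driver.loadAddedJars", "true")
val sc = new SparkContext(conf)
sc.addJar("/path/to/jar-containing-lib-foo.jar")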