Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e1d9f71
One loader workers.
holdenk Mar 21, 2014
648b559
Both class loaders compile. Now for testing
holdenk Mar 21, 2014
8d2241e
Adda FakeClass for testing ClassLoader precedence options
holdenk Mar 21, 2014
16aecd1
Doesn't quite work
holdenk Mar 21, 2014
792d961
Almost works
holdenk Mar 22, 2014
7ef4628
Clean up
holdenk Mar 23, 2014
22d83cb
Add a test suite for the executor url class loader suite
holdenk Mar 23, 2014
47046ff
Remove bad import'
holdenk Mar 23, 2014
dc4fe44
Does not depend on being in my home directory
holdenk Mar 23, 2014
691ee00
It works ish
holdenk Mar 23, 2014
9e2d236
It works comrade
holdenk Mar 23, 2014
8a67302
Test are good
holdenk Mar 23, 2014
4919bf9
Fix parent calling class loader issue
holdenk Mar 24, 2014
d90d217
Fix fall back with custom class loader and add a test for it
holdenk Mar 24, 2014
a343350
Update rat-excludes and remove a useless file
holdenk Mar 25, 2014
241b03d
fix my rat excludes
holdenk Mar 25, 2014
bb8d179
Fix issues with the repl class loader
holdenk Mar 25, 2014
125ea7f
Easier to just remove those files, we don't need them
holdenk Mar 25, 2014
a0ef85a
Fix style issues
holdenk Mar 25, 2014
7752594
re-add https comment
holdenk Mar 25, 2014
aa95083
CR feedback
holdenk Mar 25, 2014
d4ae848
rewrite component into scala
holdenk Mar 25, 2014
7a7bf5f
CR feedback
holdenk Mar 25, 2014
261aaee
simplify executorurlclassloader a bit
holdenk Mar 25, 2014
858aba2
Remove a bunch of test junk
holdenk Apr 8, 2014
9f68f10
Start rewriting the ExecutorURLClassLoaderSuite to not use the hard c…
holdenk Apr 8, 2014
204b199
Fix the generated classes
holdenk Apr 8, 2014
f0b7114
Fix the core/src/test/scala/org/apache/spark/executor/ExecutorURLClas…
holdenk Apr 8, 2014
644719f
User the class generator for the repl class loader tests too
holdenk Apr 8, 2014
7546549
CR feedback, merge some of the testutils methods down, rename the cla…
holdenk Apr 8, 2014
8f89965
Fix tests for new class name
holdenk Apr 8, 2014
1955232
Fix long line in TestUtils
holdenk Apr 8, 2014
cf0cac9
Fix the executorclassloader
holdenk Apr 8, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ work
.*\.q
golden
test.out/*
.*iml
.*iml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ object TestUtils {
* Note: if this is used during class loader tests, class names should be unique
* in order to avoid interference between tests.
*/
def createJarWithClasses(classNames: Seq[String]): URL = {
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class will need to be made package private if it goes into main instead of test. I can do this when merging, just putting a reminder here.

val tempDir = Files.createTempDir()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir)
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
}


/**
* Create a jar file that contains this set of files. All files will be located at the root
* of the jar.
Expand Down Expand Up @@ -80,9 +81,11 @@ object TestUtils {
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(className: String, destDir: File): File = {
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val sourceFile = new JavaSourceFromString(className, s"public class $className {}")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + " { @Override public String toString() { " +
"return \"" + value + "\";}}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
Expand Down
17 changes: 12 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,19 @@ private[spark] class Executor(
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
* created by the interpreter to the search path
*/
private def createClassLoader(): ExecutorURLClassLoader = {
val loader = Thread.currentThread().getContextClassLoader
private def createClassLoader(): MutableURLClassLoader = {
val loader = this.getClass.getClassLoader

// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
val urls = currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
}.toArray
new ExecutorURLClassLoader(urls, loader)
val userClassPathFirst = conf.getBoolean("spark.classpath.userClassPathFirst", false)
userClassPathFirst match {
case true => new ChildExecutorURLClassLoader(urls, loader)
case false => new ExecutorURLClassLoader(urls, loader)
}
}

/**
Expand All @@ -310,11 +314,14 @@ private[spark] class Executor(
val classUri = conf.get("spark.repl.class.uri", null)
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
val userClassPathFirst: java.lang.Boolean =
conf.getBoolean("spark.classpath.userClassPathFirst", false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs docs (note to self)

try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader])
constructor.newInstance(classUri, parent)
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
classOf[Boolean])
constructor.newInstance(classUri, parent, userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,56 @@ package org.apache.spark.executor

import java.net.{URLClassLoader, URL}

import org.apache.spark.util.ParentClassLoader

/**
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
* We also make changes so user classes can come before the default classes.
*/

private[spark] trait MutableURLClassLoader extends ClassLoader {
def addURL(url: URL)
def getURLs: Array[URL]
}

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends MutableURLClassLoader {

private object userClassLoader extends URLClassLoader(urls, null){
override def addURL(url: URL) {
super.addURL(url)
}
override def findClass(name: String): Class[_] = {
super.findClass(name)
}
}

private val parentClassLoader = new ParentClassLoader(parent)

override def findClass(name: String): Class[_] = {
try {
userClassLoader.findClass(name)
} catch {
case e: ClassNotFoundException => {
parentClassLoader.loadClass(name)
}
}
}

def addURL(url: URL) {
userClassLoader.addURL(url)
}

def getURLs() = {
userClassLoader.getURLs()
}
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends URLClassLoader(urls, parent) {
extends URLClassLoader(urls, parent) with MutableURLClassLoader {

override def addURL(url: URL) {
super.addURL(url)
}
}

32 changes: 32 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

/**
* A class loader which makes findClass accesible to the child
*/
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {

override def findClass(name: String) = {
super.findClass(name)
}

override def loadClass(name: String): Class[_] = {
super.loadClass(name)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import java.io.File
import java.net.URLClassLoader

import org.scalatest.FunSuite

import org.apache.spark.TestUtils

class ExecutorURLClassLoaderSuite extends FunSuite {

val childClassNames = List("FakeClass1", "FakeClass2")
val parentClassNames = List("FakeClass1", "FakeClass2", "FakeClass3")
val urls = List(TestUtils.createJarWithClasses(childClassNames, "1")).toArray
val urls2 = List(TestUtils.createJarWithClasses(parentClassNames, "2")).toArray

test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
}

test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
}

test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
}

test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}
}


}
1 change: 1 addition & 0 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ object SparkBuild extends Build {
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),

resolvers ++= Seq(
// HTTPS is unavailable for Maven Central
"Maven Repository" at "http://repo.maven.apache.org/maven2",
"Apache Repository" at "https://repository.apache.org/content/repositories/releases",
"JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkEnv
import org.apache.spark.util.Utils

import org.apache.spark.util.ParentClassLoader

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._


/**
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used
*/
class ExecutorClassLoader(classUri: String, parent: ClassLoader)
extends ClassLoader(parent) {
* used to load classes defined by the interpreter when the REPL is used.
* Allows the user to specify if user class path should be first
*/
class ExecutorClassLoader(classUri: String, parent: ClassLoader,
userClassPathFirst: Boolean) extends ClassLoader {
val uri = new URI(classUri)
val directory = uri.getPath

val parentLoader = new ParentClassLoader(parent)

// Hadoop FileSystem object for our URI, if it isn't using HTTP
var fileSystem: FileSystem = {
if (uri.getScheme() == "http") {
Expand All @@ -49,8 +51,27 @@ extends ClassLoader(parent) {
FileSystem.get(uri, new Configuration())
}
}

override def findClass(name: String): Class[_] = {
userClassPathFirst match {
case true => findClassLocally(name).getOrElse(parentLoader.loadClass(name))
case false => {
try {
parentLoader.loadClass(name)
} catch {
case e: ClassNotFoundException => {
val classOption = findClassLocally(name)
classOption match {
case None => throw new ClassNotFoundException(name, e)
case Some(a) => a
}
}
}
}
}
}

def findClassLocally(name: String): Option[Class[_]] = {
try {
val pathInDirectory = name.replace('.', '/') + ".class"
val inputStream = {
Expand All @@ -68,9 +89,9 @@ extends ClassLoader(parent) {
}
val bytes = readAndTransformClass(name, inputStream)
inputStream.close()
return defineClass(name, bytes, 0, bytes.length)
Some(defineClass(name, bytes, 0, bytes.length))
} catch {
case e: Exception => throw new ClassNotFoundException(name, e)
case e: Exception => None
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.repl

import java.io.File
import java.net.URLClassLoader

import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite

import com.google.common.io.Files

import org.apache.spark.TestUtils

class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {

val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", "ReplFakeClass3")
val tempDir1 = Files.createTempDir()
val tempDir2 = Files.createTempDir()
val url1 = "file://" + tempDir1
val urls2 = List(tempDir2.toURI.toURL).toArray

override def beforeAll() {
childClassNames.foreach(TestUtils.createCompiledClass(_, tempDir1, "1"))
parentClassNames.foreach(TestUtils.createCompiledClass(_, tempDir2, "2"))
}

test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
}

test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ExecutorClassLoader(url1, parentLoader, false)
val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
}

test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
}

test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
}
}

}