-
Notifications
You must be signed in to change notification settings - Fork 29k
Spark 939 allow user jars to take precedence over spark jars #217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
e1d9f71
648b559
8d2241e
16aecd1
792d961
7ef4628
22d83cb
47046ff
dc4fe44
691ee00
9e2d236
8a67302
4919bf9
d90d217
a343350
241b03d
bb8d179
125ea7f
a0ef85a
7752594
aa95083
d4ae848
7a7bf5f
261aaee
858aba2
9f68f10
204b199
f0b7114
644719f
7546549
8f89965
1955232
cf0cac9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,4 +39,4 @@ work | |
| .*\.q | ||
| golden | ||
| test.out/* | ||
| .*iml | ||
| .*iml | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,20 @@ object TestUtils { | |
| createJar(files, jarFile) | ||
| } | ||
|
|
||
| /** | ||
| * Create a jar that defines classes with the given names. | ||
| * | ||
| * Note: if this is used during class loader tests, class names should be unique | ||
| * in order to avoid interference between tests. | ||
| */ | ||
| def createJarWithClassesAndValue(classNames: Seq[String], value: Integer): URL = { | ||
| val tempDir = Files.createTempDir() | ||
| val files = for (name <- classNames) yield createCompiledClassWithValue(name, value, tempDir) | ||
| val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis())) | ||
| createJar(files, jarFile) | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Create a jar file that contains this set of files. All files will be located at the root | ||
| * of the jar. | ||
|
|
@@ -95,4 +109,22 @@ object TestUtils { | |
| result.renameTo(out) | ||
| out | ||
| } | ||
|
|
||
| /** Creates a compiled class with the given name. Class file will be placed in destDir. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is very redundant with |
||
| def createCompiledClassWithValue(className: String, value: Integer, destDir: File): File = { | ||
| val compiler = ToolProvider.getSystemJavaCompiler | ||
| val sourceFile = new JavaSourceFromString(className, | ||
| "public class " + className + " { @Override public String toString() { return \"" + value + "\";}}") | ||
|
|
||
| // Calling this outputs a class file in pwd. It's easier to just rename the file than | ||
| // build a custom FileManager that controls the output location. | ||
| compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call() | ||
|
|
||
| val fileName = className + ".class" | ||
| val result = new File(fileName) | ||
| if (!result.exists()) throw new Exception("Compiled file not found: " + fileName) | ||
| val out = new File(destDir, fileName) | ||
| result.renameTo(out) | ||
| out | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -291,15 +291,19 @@ private[spark] class Executor( | |
| * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes | ||
| * created by the interpreter to the search path | ||
| */ | ||
| private def createClassLoader(): ExecutorURLClassLoader = { | ||
| val loader = Thread.currentThread().getContextClassLoader | ||
| private def createClassLoader(): MutableURLClassLoader = { | ||
| val loader = this.getClass.getClassLoader | ||
|
|
||
| // For each of the jars in the jarSet, add them to the class loader. | ||
| // We assume each of the files has already been fetched. | ||
| val urls = currentJars.keySet.map { uri => | ||
| new File(uri.split("/").last).toURI.toURL | ||
| }.toArray | ||
| new ExecutorURLClassLoader(urls, loader) | ||
| val userClassPathFirst = conf.getBoolean("spark.classpath.userClassPathFirst", false) | ||
| userClassPathFirst match { | ||
| case true => new ChildExecutorURLClassLoader(urls, loader) | ||
| case false => new ExecutorURLClassLoader(urls, loader) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -310,11 +314,14 @@ private[spark] class Executor( | |
| val classUri = conf.get("spark.repl.class.uri", null) | ||
| if (classUri != null) { | ||
| logInfo("Using REPL class URI: " + classUri) | ||
| val userClassPathFirst: java.lang.Boolean = | ||
| conf.getBoolean("spark.classpath.userClassPathFirst", false) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Needs docs (note to self) |
||
| try { | ||
| val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader") | ||
| val klass = Class.forName("org.apache.spark.repl.FlexibleExecutorClassLoader") | ||
| .asInstanceOf[Class[_ <: ClassLoader]] | ||
| val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader]) | ||
| constructor.newInstance(classUri, parent) | ||
| val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader], | ||
| classOf[Boolean]) | ||
| constructor.newInstance(classUri, parent, userClassPathFirst) | ||
| } catch { | ||
| case _: ClassNotFoundException => | ||
| logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.util | ||
|
|
||
| /** | ||
| * A class loader which makes findClass accessible to the child | ||
| */ | ||
| private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) { | ||
|
|
||
| override def findClass(name: String) = { | ||
| super.findClass(name) | ||
| } | ||
|
|
||
| override def loadClass(name: String): Class[_] = { | ||
| super.loadClass(name) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.executor | ||
|
|
||
| import java.io.File | ||
| import java.net.URLClassLoader | ||
|
|
||
| import org.scalatest.FunSuite | ||
|
|
||
| import org.apache.spark.TestUtils | ||
|
|
||
| class ExecutorURLClassLoaderSuite extends FunSuite { | ||
|
|
||
| val childClassNames = List("FakeClass1", "FakeClass2") | ||
| val parentClassNames = List("FakeClass1", "FakeClass2", "FakeClass3") | ||
| val urls = List(TestUtils.createJarWithClassesAndValue(childClassNames, 1)).toArray | ||
| val urls2 = List(TestUtils.createJarWithClassesAndValue(parentClassNames, 2)).toArray | ||
|
|
||
| test("child first") { | ||
| val parentLoader = new URLClassLoader(urls2, null) | ||
| val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader) | ||
| val fakeClass = classLoader.loadClass("FakeClass2").newInstance() | ||
| val fakeClassVersion = fakeClass.toString | ||
| assert(fakeClassVersion === "1") | ||
| } | ||
|
|
||
| test("parent first") { | ||
| val parentLoader = new URLClassLoader(urls2, null) | ||
| val classLoader = new ExecutorURLClassLoader(urls, parentLoader) | ||
| val fakeClass = classLoader.loadClass("FakeClass1").newInstance() | ||
| val fakeClassVersion = fakeClass.toString | ||
| assert(fakeClassVersion === "2") | ||
| } | ||
|
|
||
| test("child first can fall back") { | ||
| val parentLoader = new URLClassLoader(urls2, null) | ||
| val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader) | ||
| val fakeClass = classLoader.loadClass("FakeClass3").newInstance() | ||
| val fakeClassVersion = fakeClass.toString | ||
| assert(fakeClassVersion === "2") | ||
| } | ||
|
|
||
| test("child first can fail") { | ||
| val parentLoader = new URLClassLoader(urls2, null) | ||
| val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader) | ||
| intercept[java.lang.ClassNotFoundException] { | ||
| classLoader.loadClass("FakeClassDoesNotExist").newInstance() | ||
| } | ||
| } | ||
|
|
||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,21 +26,28 @@ import org.apache.hadoop.fs.{FileSystem, Path} | |
|
|
||
| import org.apache.spark.SparkEnv | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| import org.apache.spark.util.ParentClassLoader | ||
|
|
||
| import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._ | ||
| import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ | ||
|
|
||
|
|
||
| /** | ||
| * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI, | ||
| * used to load classes defined by the interpreter when the REPL is used | ||
| */ | ||
| class ExecutorClassLoader(classUri: String, parent: ClassLoader) | ||
| extends ClassLoader(parent) { | ||
| extends FlexibleExecutorClassLoader(classUri, parent, false) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we just rename this to |
||
| } | ||
| /** | ||
| * Allows the user to specify if user class path should be first | ||
| */ | ||
| class FlexibleExecutorClassLoader(classUri: String, parent: ClassLoader, | ||
| userClassPathFirst: Boolean) extends ClassLoader { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This indent should be four spaces to differentiate from the function body. |
||
| val uri = new URI(classUri) | ||
| val directory = uri.getPath | ||
|
|
||
| val parentLoader = new ParentClassLoader(parent) | ||
|
|
||
| // Hadoop FileSystem object for our URI, if it isn't using HTTP | ||
| var fileSystem: FileSystem = { | ||
| if (uri.getScheme() == "http") { | ||
|
|
@@ -49,8 +56,27 @@ extends ClassLoader(parent) { | |
| FileSystem.get(uri, new Configuration()) | ||
| } | ||
| } | ||
|
|
||
| override def findClass(name: String): Class[_] = { | ||
| userClassPathFirst match { | ||
| case true => findClassLocally(name).getOrElse(parentLoader.loadClass(name)) | ||
| case false => { | ||
| try { | ||
| parentLoader.loadClass(name) | ||
| } catch { | ||
| case e: ClassNotFoundException => { | ||
| val classOption = findClassLocally(name) | ||
| classOption match { | ||
| case None => throw new ClassNotFoundException(name, e) | ||
| case Some(a) => a | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| def findClassLocally(name: String): Option[Class[_]] = { | ||
| try { | ||
| val pathInDirectory = name.replace('.', '/') + ".class" | ||
| val inputStream = { | ||
|
|
@@ -68,9 +94,9 @@ extends ClassLoader(parent) { | |
| } | ||
| val bytes = readAndTransformClass(name, inputStream) | ||
| inputStream.close() | ||
| return defineClass(name, bytes, 0, bytes.length) | ||
| Some(defineClass(name, bytes, 0, bytes.length)) | ||
| } catch { | ||
| case e: Exception => throw new ClassNotFoundException(name, e) | ||
| case e: Exception => None | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd also just consolidate these by having `createJarWithClasses` call this and pass an unused value.