Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Properly clean up after errors in ExecutorClassLoader
  • Loading branch information
JoshRosen committed Mar 9, 2015
commit e2d70a35b3598fb44fb2fe85d79fa8d456b7ef5f
62 changes: 44 additions & 18 deletions repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package org.apache.spark.repl

import java.io.{ByteArrayOutputStream, InputStream, FileNotFoundException}
import java.net.{URI, URL, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}
import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}

import org.apache.hadoop.fs.{FileSystem, Path}

Expand Down Expand Up @@ -71,37 +70,64 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
}
}

private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {
val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
newuri.toURL
} else {
new URL(classUri + "/" + urlEncode(pathInDirectory))
}
val connection: HttpURLConnection = Utils.setupSecureURLConnection(url.openConnection(),
SparkEnv.get.securityManager).asInstanceOf[HttpURLConnection]
if (connection.getResponseCode != 200) {
connection.disconnect()
throw new ClassNotFoundException(s"Class file not found at URL $url")
} else {
connection.getInputStream
}
}

private def getClassFileInputStreamFromFileSystem(pathInDirectory: String): InputStream = {
val path = new Path(directory, pathInDirectory)
if (fileSystem.exists(path)) {
fileSystem.open(path)
} else {
throw new ClassNotFoundException(s"Class file not found at path $path")
}
}

def findClassLocally(name: String): Option[Class[_]] = {
val pathInDirectory = name.replace('.', '/') + ".class"
var inputStream: InputStream = null
try {
val pathInDirectory = name.replace('.', '/') + ".class"
val inputStream = {
inputStream = {
if (fileSystem != null) {
fileSystem.open(new Path(directory, pathInDirectory))
getClassFileInputStreamFromFileSystem(pathInDirectory)
} else {
val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
newuri.toURL
} else {
new URL(classUri + "/" + urlEncode(pathInDirectory))
}

Utils.setupSecureURLConnection(url.openConnection(), SparkEnv.get.securityManager)
.getInputStream
getClassFileInputStreamFromHttpServer(pathInDirectory)
}
}
val bytes = readAndTransformClass(name, inputStream)
inputStream.close()
Some(defineClass(name, bytes, 0, bytes.length))
} catch {
case e: FileNotFoundException =>
case e: ClassNotFoundException =>
// We did not find the class
logDebug(s"Did not load class $name from REPL class server at $uri", e)
None
case e: Exception =>
// Something bad happened while checking if the class exists
logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
None
} finally {
if (inputStream != null) {
try {
inputStream.close()
} catch {
case e: Exception =>
logError("Exception while closing inputStream", e)
}
}
}
}

Expand Down