Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler.local

import java.io.File
import java.net.URL
import java.nio.ByteBuffer

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
Expand All @@ -40,6 +42,7 @@ private case class StopExecutor()
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
private val totalCores: Int)
Expand All @@ -51,7 +54,7 @@ private[spark] class LocalEndpoint(
private val localExecutorHostname = "localhost"

private val executor = new Executor(
localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
Expand Down Expand Up @@ -97,10 +100,22 @@ private[spark] class LocalBackend(

private val appId = "local-" + System.currentTimeMillis
var localEndpoint: RpcEndpointRef = null
private val userClassPath = getUserClasspath(conf)

/**
* Returns a list of URLs representing the user classpath.
*
* @param conf Spark configuration.
*/
def getUserClasspath(conf: SparkConf): Seq[URL] = {
val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
}

override def start() {
localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
"LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
"LocalBackendEndpoint",
new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
}

override def stop() {
Expand Down