Skip to content
Next Next commit
SPARK-8675 Executors created by LocalBackend won't get the same class…
…path as other executor backends
  • Loading branch information
coderplay committed Jun 29, 2015
commit 45bf62c8a01de4456b0f8c9f1e97af289343db8d
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.scheduler.local

import java.net.URL
import java.nio.ByteBuffer

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
Expand All @@ -40,6 +41,7 @@ private case class StopExecutor()
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
private val totalCores: Int)
Expand All @@ -51,7 +53,7 @@ private[spark] class LocalEndpoint(
private val localExecutorHostname = "localhost"

private val executor = new Executor(
localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
Expand Down Expand Up @@ -97,10 +99,21 @@ private[spark] class LocalBackend(

private val appId = "local-" + System.currentTimeMillis
var localEndpoint: RpcEndpointRef = null
private val userClassPath = getUserClasspath(conf)

/**
* Returns a list of URLs representing the user classpath.
*
* @param conf Spark configuration.
*/
def getUserClasspath(conf: SparkConf): Seq[URL] = {
val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
userClassPathStr.map(_.split(",")).toSeq.flatten.map(new URL(_))
}

override def start() {
localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
"LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
"LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
}

override def stop() {
Expand Down