From 45bf62c8a01de4456b0f8c9f1e97af289343db8d Mon Sep 17 00:00:00 2001
From: Min Zhou
Date: Mon, 29 Jun 2015 12:29:12 -0700
Subject: [PATCH 1/4] SPARK-8675 Executors created by LocalBackend won't get the same classpath as other executor backends

---
 .../spark/scheduler/local/LocalBackend.scala | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3078a1b10be8..faa3c012b170 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.local
 
+import java.net.URL
 import java.nio.ByteBuffer
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
@@ -40,6 +41,7 @@ private case class StopExecutor()
  */
 private[spark] class LocalEndpoint(
     override val rpcEnv: RpcEnv,
+    userClassPath: Seq[URL],
     scheduler: TaskSchedulerImpl,
     executorBackend: LocalBackend,
     private val totalCores: Int)
@@ -51,7 +53,7 @@ private[spark] class LocalEndpoint(
   private val localExecutorHostname = "localhost"
 
   private val executor = new Executor(
-    localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
+    localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
 
   override def receive: PartialFunction[Any, Unit] = {
     case ReviveOffers =>
@@ -97,10 +99,21 @@ private[spark] class LocalBackend(
   private val appId = "local-" + System.currentTimeMillis
   var localEndpoint: RpcEndpointRef = null
+  private val userClassPath = getUserClasspath(conf)
+
+  /**
+   * Returns a list of URLs representing the user classpath.
+   *
+   * @param conf Spark configuration.
+   */
+  def getUserClasspath(conf: SparkConf): Seq[URL] = {
+    val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
+    userClassPathStr.map(_.split(",")).toSeq.flatten.map(new URL(_))
+  }
 
   override def start() {
     localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
-      "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
+      "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
   }
 
   override def stop() {

From 84ad2cd1cddee1f0a054e58124d5cf139ed36b5d Mon Sep 17 00:00:00 2001
From: Min Zhou
Date: Thu, 9 Jul 2015 13:07:09 -0700
Subject: [PATCH 2/4] Use the system-specific path separator instead of ','

---
 .../scala/org/apache/spark/scheduler/local/LocalBackend.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index faa3c012b170..8b1cfc5aee7b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.local
 
+import java.io.File
 import java.net.URL
 import java.nio.ByteBuffer
 
@@ -108,7 +109,7 @@ private[spark] class LocalBackend(
    */
   def getUserClasspath(conf: SparkConf): Seq[URL] = {
     val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
-    userClassPathStr.map(_.split(",")).toSeq.flatten.map(new URL(_))
+    userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new URL(_))
   }
 
   override def start() {

From d215b7fcc418fad50b985ad78a7553a4a1005dca Mon Sep 17 00:00:00 2001
From: Min Zhou
Date: Thu, 9 Jul 2015 13:10:10 -0700
Subject: [PATCH 3/4] Follow Spark's standard Scala style to keep the automated style checks happy

---
 .../scala/org/apache/spark/scheduler/local/LocalBackend.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 8b1cfc5aee7b..17ece3840db5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -114,7 +114,8 @@ private[spark] class LocalBackend(
 
   override def start() {
     localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
-      "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
+      "LocalBackendEndpoint",
+      new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
   }
 
   override def stop() {

From 365838f4498b6cd9c1964b3090372322a10b7331 Mon Sep 17 00:00:00 2001
From: Min Zhou
Date: Thu, 9 Jul 2015 15:56:48 -0700
Subject: [PATCH 4/4] Fix java.net.MalformedURLException: add a default scheme and support relative paths

---
 .../scala/org/apache/spark/scheduler/local/LocalBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 17ece3840db5..776e5d330e3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -109,7 +109,7 @@ private[spark] class LocalBackend(
    */
   def getUserClasspath(conf: SparkConf): Seq[URL] = {
     val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
-    userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new URL(_))
+    userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
  }
 
   override def start() {
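
A minimal standalone sketch of why PATCH 4/4 switches from new URL(_) to new File(_).toURI.toURL: a bare filesystem path carries no URL scheme, so java.net.URL rejects it with MalformedURLException, while File.toURI adds the file: scheme and resolves relative entries against the working directory. The ClasspathUrlDemo object and the example jar paths below are illustrative assumptions, not part of the patches.

  import java.io.File
  import java.net.{MalformedURLException, URL}

  object ClasspathUrlDemo {
    def main(args: Array[String]): Unit = {
      // Hypothetical classpath entries; plain filesystem paths have no URL scheme.
      val entries = Seq("/opt/libs/my-udfs.jar", "relative/dir/extra.jar")

      // Pre-patch behavior: new URL(_) requires a scheme such as "file:",
      // so a bare path fails with java.net.MalformedURLException.
      entries.foreach { path =>
        try {
          new URL(path)
        } catch {
          case e: MalformedURLException =>
            println(s"new URL($path) failed: ${e.getMessage}")
        }
      }

      // Post-patch behavior: going through File/URI adds the "file:" scheme
      // and resolves relative paths against the current working directory.
      entries.foreach { path =>
        println(new File(path).toURI.toURL)
      }
    }
  }

Splitting on File.pathSeparator (PATCH 2/4) matches how classpath strings are written on each platform: ':' on Unix-like systems and ';' on Windows.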