Skip to content

Commit c185f3a

Browse files
coderplayAndrew Or
authored andcommitted
[SPARK-8675] Executors created by LocalBackend won't get the same classpath as other executor backends
AFAIK, some spark application always use LocalBackend to do some local initiatives, spark sql is an example. Starting a LocalPoint won't add user classpath into executor. ```java override def start() { localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint( "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores)) } ``` Thus will cause local executor fail with these scenarios, loading hadoop built-in native libraries, loading other user defined native libraries, loading user jars, reading s3 config from a site.xml file, etc Author: Min Zhou <coderplay@gmail.com> Closes #7091 from coderplay/master and squashes the following commits: 365838f [Min Zhou] Fixed java.net.MalformedURLException, add default scheme, support relative path d215b7f [Min Zhou] Follows spark standard scala style, make the auto testing happy 84ad2cd [Min Zhou] Use system specific path separator instead of ',' 01f5d1a [Min Zhou] Merge branch 'master' of https://github.com/apache/spark e528be7 [Min Zhou] Merge branch 'master' of https://github.com/apache/spark 45bf62c [Min Zhou] SPARK-8675 Executors created by LocalBackend won't get the same classpath as other executor backends
1 parent db6d57f commit c185f3a

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.scheduler.local
1919

20+
import java.io.File
21+
import java.net.URL
2022
import java.nio.ByteBuffer
2123

2224
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
@@ -40,6 +42,7 @@ private case class StopExecutor()
4042
*/
4143
private[spark] class LocalEndpoint(
4244
override val rpcEnv: RpcEnv,
45+
userClassPath: Seq[URL],
4346
scheduler: TaskSchedulerImpl,
4447
executorBackend: LocalBackend,
4548
private val totalCores: Int)
@@ -51,7 +54,7 @@ private[spark] class LocalEndpoint(
5154
private val localExecutorHostname = "localhost"
5255

5356
private val executor = new Executor(
54-
localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
57+
localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
5558

5659
override def receive: PartialFunction[Any, Unit] = {
5760
case ReviveOffers =>
@@ -97,10 +100,22 @@ private[spark] class LocalBackend(
97100

98101
private val appId = "local-" + System.currentTimeMillis
99102
var localEndpoint: RpcEndpointRef = null
103+
private val userClassPath = getUserClasspath(conf)
104+
105+
/**
106+
* Returns a list of URLs representing the user classpath.
107+
*
108+
* @param conf Spark configuration.
109+
*/
110+
def getUserClasspath(conf: SparkConf): Seq[URL] = {
111+
val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
112+
userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
113+
}
100114

101115
override def start() {
102116
localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
103-
"LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
117+
"LocalBackendEndpoint",
118+
new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
104119
}
105120

106121
override def stop() {

0 commit comments

Comments
 (0)