2 changes: 2 additions & 0 deletions bin/spark-class
@@ -71,6 +71,8 @@ case "$1" in
   'org.apache.spark.executor.MesosExecutorBackend')
     OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
     OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
+    export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
+    export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
     ;;

   # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
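Note on the two exports added above: sbin/spark-executor (deleted at the bottom of this diff) used to be the Mesos executor entry point, and bin/spark-class now fills that role, so the PySpark sources and the bundled py4j zip must be prepended to PYTHONPATH here for PySpark-on-Mesos to keep working. A minimal Scala sketch of how the two prepends compose; the $FWDIR value is hypothetical and this is not Spark code:

  // Each export prepends, so the zip added second ends up first on the path.
  val fwdir = "/opt/spark"                                  // stand-in for $FWDIR
  val inherited = sys.env.getOrElse("PYTHONPATH", "")
  val afterFirst = s"$fwdir/python:$inherited"
  val afterSecond = s"$fwdir/python/lib/py4j-0.8.2.1-src.zip:$afterFirst"
  // afterSecond == "/opt/spark/python/lib/py4j-0.8.2.1-src.zip:/opt/spark/python:..."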
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,6 +30,7 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
   ExecutorInfo => MesosExecutorInfo, _}

+import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler._
@@ -123,14 +124,15 @@ private[spark] class MesosSchedulerBackend(
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
     val uri = sc.conf.get("spark.executor.uri", null)
+    val executorBackendName = classOf[MesosExecutorBackend].getName
     if (uri == null) {
-      val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath
-      command.setValue("%s %s".format(prefixEnv, executorPath))
+      val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
+      command.setValue(s"$prefixEnv $executorPath $executorBackendName")
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv))
+      command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     val cpus = Resource.newBuilder()
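A worked example of the basename/glob trick in the else-branch above, using the same URI as the test below (a sketch, not Spark code):

  val uri = "hdfs:///test-app-1.0.0.tgz"
  val basename = uri.split('/').last.split('.').head   // "test-app-1"
  // Mesos downloads and extracts the tgz into the executor's sandbox; assuming
  // the extracted directory starts with the basename (e.g. "test-app-1.0.0"),
  // "cd test-app-1*" enters it without knowing the exact version suffix.
  val value = s"cd ${basename}*; ./bin/spark-class org.apache.spark.executor.MesosExecutorBackend"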
core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.scheduler.mesos

+import org.apache.spark.executor.MesosExecutorBackend
 import org.scalatest.FunSuite
 import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
@@ -37,6 +38,37 @@ import scala.collection.mutable.ArrayBuffer

 class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {

+  test("check spark-class location correctly") {
+    val conf = new SparkConf
+    conf.set("spark.mesos.executor.home", "/mesos-home")
+
+    val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
+    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    EasyMock.replay(listenerBus)
+
+    val sc = EasyMock.createMock(classOf[SparkContext])
+    EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
+    EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
+    EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
+    EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
+    EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
+    EasyMock.replay(sc)
+    val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+    EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
+    EasyMock.replay(taskScheduler)
+
+    val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    // uri is null.
+    val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+    assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+
+    // uri exists.
+    conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
+    val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+    assert(executorInfo1.getCommand.getValue === s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+  }
+
   test("mesos resource offers result in launching tasks") {
     def createOffer(id: Int, mem: Int, cpu: Int) = {
       val builder = Offer.newBuilder()
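Two whitespace details in the expected command strings are easy to misread: prefixEnv is empty in this test (its conf configures no library-path prefix), and the backend interpolates it with a space on each side. That produces the leading space in the first expectation and the double space after the semicolon in the second. A sketch of the same interpolation:

  val prefixEnv = ""   // nothing configured in this test
  val name = "org.apache.spark.executor.MesosExecutorBackend"
  s"$prefixEnv /mesos-home/bin/spark-class $name"
  // => " /mesos-home/bin/spark-class org.apache.spark.executor.MesosExecutorBackend"
  s"cd test-app-1*; $prefixEnv ./bin/spark-class $name"
  // => "cd test-app-1*;  ./bin/spark-class org.apache.spark.executor.MesosExecutorBackend"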
26 changes: 0 additions & 26 deletions sbin/spark-executor

This file was deleted.