5 changes: 5 additions & 0 deletions R/install-dev.bat
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
popd
8 changes: 6 additions & 2 deletions R/install-dev.sh
@@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

pushd $FWDIR
pushd $FWDIR > /dev/null

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

popd
# Zip the SparkR package so that it can be distributed to worker nodes on YARN
cd $LIB_DIR
jar cfM "$LIB_DIR/sparkr.zip" SparkR

popd > /dev/null
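
A note on why the zip is built from inside `$LIB_DIR`: the archive's top-level entry is then `SparkR/`, so once YARN unpacks it under the `sparkr` link (see the SparkSubmit and RUtils changes below), executors find the worker scripts at the path the launcher constructs. A minimal sketch of that expectation, using only names that appear in this diff:

```scala
import java.io.File

// Sketch only: the layout an executor sees after YARN localizes sparkr.zip under
// the "sparkr" link. Because the zip is created from inside R/lib, its top-level
// entry is SparkR/, so the worker script ends up at sparkr/SparkR/worker/worker.R.
val rLibDir = new File("sparkr").getAbsolutePath
val workerScript = new File(rLibDir, "SparkR/worker/worker.R")
require(workerScript.exists(), s"SparkR worker script not found at $workerScript")
```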
1 change: 0 additions & 1 deletion R/pkg/DESCRIPTION
@@ -32,4 +32,3 @@ Collate:
'serialize.R'
'sparkR.R'
'utils.R'
'zzz.R'
2 changes: 0 additions & 2 deletions R/pkg/R/RDD.R
@@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
@@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
1 change: 0 additions & 1 deletion R/pkg/R/pairRDD.R
@@ -215,7 +215,6 @@ setMethod("partitionBy",
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))

10 changes: 0 additions & 10 deletions R/pkg/R/sparkR.R
@@ -17,10 +17,6 @@

.sparkREnv <- new.env()

sparkR.onLoad <- function(libname, pkgname) {
.sparkREnv$libname <- libname
}

# Utility function that returns TRUE if we have an active connection to the
# backend and FALSE otherwise
connExists <- function(env) {
@@ -80,7 +76,6 @@ sparkR.stop <- function() {
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @export
#' @examples
@@ -101,7 +96,6 @@ sparkR.init <- function(
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
sparkRLibDir = "",
sparkPackages = "") {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
@@ -170,10 +164,6 @@ sparkR.init <- function(
sparkHome <- normalizePath(sparkHome)
}

if (nchar(sparkRLibDir) != 0) {
.sparkREnv$libname <- sparkRLibDir
}

sparkEnvirMap <- new.env()
for (varname in names(sparkEnvir)) {
sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
20 changes: 0 additions & 20 deletions R/pkg/R/zzz.R

This file was deleted.

4 changes: 2 additions & 2 deletions R/pkg/inst/profile/general.R
@@ -16,7 +16,7 @@
#

.First <- function() {
home <- Sys.getenv("SPARK_HOME")
.libPaths(c(file.path(home, "R", "lib"), .libPaths()))
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
.libPaths(c(packageDir, .libPaths()))
Sys.setenv(NOAWT=1)
}
21 changes: 9 additions & 12 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -39,7 +39,6 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
deserializer: String,
serializer: String,
packageNames: Array[Byte],
rLibDir: String,
broadcastVars: Array[Broadcast[Object]])
extends RDD[U](parent) with Logging {
protected var dataStream: DataInputStream = _
@@ -60,7 +59,7 @@

// The stdout/stderr is shared by multiple tasks, because we use one daemon
// to launch child processes as workers.
val errThread = RRDD.createRWorker(rLibDir, listenPort)
val errThread = RRDD.createRWorker(listenPort)

// We use two sockets to separate input and output, then it's easy to manage
// the lifecycle of them to avoid deadlock.
@@ -235,11 +234,10 @@ private class PairwiseRRDD[T: ClassTag](
hashFunc: Array[Byte],
deserializer: String,
packageNames: Array[Byte],
rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, (Int, Array[Byte])](
parent, numPartitions, hashFunc, deserializer,
SerializationFormats.BYTE, packageNames, rLibDir,
SerializationFormats.BYTE, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {

override protected def readData(length: Int): (Int, Array[Byte]) = {
@@ -266,10 +264,9 @@ private class RRDD[T: ClassTag](
deserializer: String,
serializer: String,
packageNames: Array[Byte],
rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, Array[Byte]](
parent, -1, func, deserializer, serializer, packageNames, rLibDir,
parent, -1, func, deserializer, serializer, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {

override protected def readData(length: Int): Array[Byte] = {
@@ -293,10 +290,9 @@ private class StringRRDD[T: ClassTag](
func: Array[Byte],
deserializer: String,
packageNames: Array[Byte],
rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, String](
parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, rLibDir,
parent, -1, func, deserializer, SerializationFormats.STRING, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {

override protected def readData(length: Int): String = {
@@ -392,9 +388,10 @@ private[r] object RRDD {
thread
}

private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
private def createRProcess(port: Int, script: String): BufferedStreamThread = {
val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
val rOptions = "--vanilla"
val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = rLibDir + "/SparkR/worker/" + script
val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
// Unset the R_TESTS environment variable for workers.
@@ -413,15 +410,15 @@
/**
* ProcessBuilder used to launch worker R processes.
*/
def createRWorker(rLibDir: String, port: Int): BufferedStreamThread = {
def createRWorker(port: Int): BufferedStreamThread = {
val useDaemon = SparkEnv.get.conf.getBoolean("spark.sparkr.use.daemon", true)
if (!Utils.isWindows && useDaemon) {
synchronized {
if (daemonChannel == null) {
// we expect one connection
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort
errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
errThread = createRProcess(daemonPort, "daemon.R")
// the socket used to send out the input of task
serverSocket.setSoTimeout(10000)
val sock = serverSocket.accept()
Expand All @@ -443,7 +440,7 @@ private[r] object RRDD {
errThread
}
} else {
createRProcess(rLibDir, port, "worker.R")
createRProcess(port, "worker.R")
}
}

65 changes: 65 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.r

import java.io.File

import org.apache.spark.{SparkEnv, SparkException}

private[spark] object RUtils {
/**
* Get the SparkR package path in the local spark distribution.
*/
def localSparkRPackagePath: Option[String] = {
val sparkHome = sys.env.get("SPARK_HOME")
sparkHome.map(
Seq(_, "R", "lib").mkString(File.separator)
)
}

/**
* Get the SparkR package path in various deployment modes.
* This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
* and environment variable `SPARK_HOME` are set.
*/
def sparkRPackagePath(isDriver: Boolean): String = {
val (master, deployMode) =
if (isDriver) {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
} else {
val sparkConf = SparkEnv.get.conf
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
}

val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
val isYarnClient = master.contains("yarn") && deployMode == "client"

// In YARN mode, the SparkR package is distributed as an archive symbolically
// linked to the "sparkr" file in the current directory. Note that this does not apply
// to the driver in client mode because it is run outside of the cluster.
if (isYarnCluster || (isYarnClient && !isDriver)) {
new File("sparkr").getAbsolutePath
} else {
// Otherwise, assume the package is local
// TODO: support this for Mesos
localSparkRPackagePath.getOrElse {
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
}
}
}
}
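
For orientation, and not as part of the patch itself, this is how the two call sites introduced elsewhere in this change use the helper: `RRunner` resolves the driver-side path, while `RRDD.createRProcess` resolves the executor-side path.

```scala
// Driver side (RRunner): locally this resolves to <SPARK_HOME>/R/lib and is exported
// to the R process as SPARKR_PACKAGE_DIR for the .First() hook in general.R.
val driverPkgDir = RUtils.sparkRPackagePath(isDriver = true)

// Executor side (RRDD.createRProcess): in YARN mode this resolves to the "sparkr"
// link in the container's working directory, where the shipped archive was unpacked.
val executorPkgDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = executorPkgDir + "/SparkR/worker/worker.R"
```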
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._

import org.apache.hadoop.fs.Path

import org.apache.spark.api.r.RBackend
import org.apache.spark.api.r.{RBackend, RUtils}
import org.apache.spark.util.RedirectThread

/**
@@ -71,9 +71,10 @@
val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
val env = builder.environment()
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
val sparkHome = System.getenv("SPARK_HOME")
val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
env.put("SPARKR_PACKAGE_DIR", rPackageDir)
env.put("R_PROFILE_USER",
Seq(sparkHome, "R", "lib", "SparkR", "profile", "general.R").mkString(File.separator))
Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()

27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,6 +37,7 @@ import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
import org.apache.spark.api.r.RUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -79,6 +80,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"

private val CLASS_NOT_FOUND_EXIT_STATUS = 101

@@ -262,6 +264,12 @@
}
}

// Update args.deployMode if it is null. It will be passed down as a Spark property later.
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER

@@ -347,6 +355,23 @@
}
}

// In YARN mode for an R app, add the SparkR package archive to archives
// that can be distributed with the job
if (args.isR && clusterManager == YARN) {
val rPackagePath = RUtils.localSparkRPackagePath
if (rPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
}
val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)

// Assign a symbolic link name "sparkr" to the shipped package.
args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
}
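
As a minimal illustration of the `#sparkr` fragment (the local path below is hypothetical): the fragment names the unpacked archive in each container's working directory, which is the same `sparkr` directory `RUtils.sparkRPackagePath` returns on executors.

```scala
import java.io.File
import org.apache.spark.util.Utils  // the same helper used in the added code above

// Hypothetical local path; only the "#sparkr" suffix matters here.
val rPackageFile = new File("/opt/spark/R/lib/sparkr.zip")
val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
val archiveArg = localURI.toString + "#sparkr"  // e.g. file:/opt/spark/R/lib/sparkr.zip#sparkr

// On a YARN executor, the unpacked archive is then addressable relative to the CWD:
val sparkRPkgDir = new File("sparkr").getAbsolutePath
```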
Review comment (Contributor):
GitHub won't let me comment down there, but at L388 of this file we should add the deploy mode to a system property:

OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
  sysProp = "spark.submit.deployMode")

This is needed for RUtils.sparkRPackagePath, so we don't try to parse the deploy mode from spark.master.


// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
@@ -375,6 +400,8 @@

// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
@@ -246,7 +246,7 @@ class SparkSubmitSuite
mainClass should be ("org.apache.spark.deploy.Client")
}
classpath should have size 0
sysProps should have size 8
sysProps should have size 9
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps.keys should contain ("spark.master")
sysProps.keys should contain ("spark.app.name")
Expand All @@ -255,6 +255,7 @@ class SparkSubmitSuite
sysProps.keys should contain ("spark.driver.cores")
sysProps.keys should contain ("spark.driver.supervise")
sysProps.keys should contain ("spark.shuffle.spill")
sysProps.keys should contain ("spark.submit.deployMode")
sysProps("spark.shuffle.spill") should be ("false")
}

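The extra expected property above is the new `spark.submit.deployMode` assignment; a brief sketch (with illustrative values only) of why publishing it matters for `RUtils.sparkRPackagePath`:

```scala
// Illustrative values: SparkSubmit now publishes the deploy mode as a Spark property,
// so driver-side code can read it directly instead of inferring it from spark.master.
sys.props("spark.master") = "yarn-client"
sys.props("spark.submit.deployMode") = "client"

val (master, deployMode) =
  (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
assert(!isYarnCluster)  // a yarn-client driver still uses the local SPARK_HOME/R/lib package
```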
1 change: 1 addition & 0 deletions make-distribution.sh
@@ -219,6 +219,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
mkdir -p "$DISTDIR"/R/lib
cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
fi

# Download and copy in tachyon, if requested