Merged
[SPARK-10500][SPARKR] sparkr.zip cannot be created if /R/lib is unwritable

Backport apache#9390 and apache#9744 to branch-1.5.

Author: Sun Rui <[email protected]>
Author: Shivaram Venkataraman <[email protected]>

Closes apache#10372 from sun-rui/SPARK-10500-branch-1.5.
Sun Rui authored and shivaram committed Dec 18, 2015
commit d2f71c27c0f43475a69791693f1684e106e9b475
6 changes: 6 additions & 0 deletions R/install-dev.bat
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
+
4 changes: 4 additions & 0 deletions R/install-dev.sh
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
popd > /dev/null
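Note: with both install scripts zipping the package at build time, `spark-submit` no longer needs write access to `$SPARK_HOME/R/lib` when an R application is submitted; it simply picks up the prebuilt `sparkr.zip`. The `cfM` flags ask `jar` for a plain zip with no `META-INF/MANIFEST.MF`, so nothing but the SparkR library itself lands in the archive. A minimal Scala sketch of an equivalent archiver (the `zipDir` helper and the paths are illustrative, not part of this patch):

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

// Recursively zip a directory, keeping entry names relative to its parent,
// so zipping R/lib/SparkR yields entries like "SparkR/DESCRIPTION".
def zipDir(root: File, out: File): Unit = {
  val zos = new ZipOutputStream(new FileOutputStream(out))
  def add(f: File, prefix: String): Unit = {
    if (f.isDirectory) {
      f.listFiles().foreach(child => add(child, prefix + f.getName + "/"))
    } else {
      zos.putNextEntry(new ZipEntry(prefix + f.getName))
      val in = new FileInputStream(f)
      val buf = new Array[Byte](8192)
      Iterator.continually(in.read(buf)).takeWhile(_ != -1)
        .foreach(n => zos.write(buf, 0, n))
      in.close()
      zos.closeEntry()
    }
  }
  add(root, "")
  zos.close()
}

// e.g. zipDir(new File("R/lib/SparkR"), new File("R/lib/sparkr.zip"))
```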
14 changes: 13 additions & 1 deletion R/pkg/R/sparkR.R
@@ -49,6 +49,12 @@ sparkR.stop <- function() {
    }
  }

+  # Remove the R package lib path from .libPaths()
+  if (exists(".libPath", envir = env)) {
+    libPath <- get(".libPath", envir = env)
+    .libPaths(.libPaths()[.libPaths() != libPath])
+  }
+
  if (exists(".backendLaunched", envir = env)) {
    callJStatic("SparkRHandler", "stopBackend")
  }
@@ -149,14 +155,20 @@ sparkR.init <- function(
    f <- file(path, open="rb")
    backendPort <- readInt(f)
    monitorPort <- readInt(f)
+   rLibPath <- readString(f)
    close(f)
    file.remove(path)
    if (length(backendPort) == 0 || backendPort == 0 ||
-       length(monitorPort) == 0 || monitorPort == 0) {
+       length(monitorPort) == 0 || monitorPort == 0 ||
+       length(rLibPath) != 1) {
      stop("JVM failed to launch")
    }
    assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
    assign(".backendLaunched", 1, envir = .sparkREnv)
+   if (rLibPath != "") {
+     assign(".libPath", rLibPath, envir = .sparkREnv)
+     .libPaths(c(rLibPath, .libPaths()))
+   }
  }

  .sparkREnv$backendPort <- backendPort
3 changes: 2 additions & 1 deletion R/pkg/inst/profile/general.R
@@ -17,6 +17,7 @@

.First <- function() {
  packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
- .libPaths(c(packageDir, .libPaths()))
+ dirs <- strsplit(packageDir, ",")[[1]]
+ .libPaths(c(dirs, .libPaths()))
  Sys.setenv(NOAWT=1)
}
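`SPARKR_PACKAGE_DIR` can now hold several directories joined by commas: the SparkR lib directory first, then the directory of R packages built from Spark Packages, if any. The profile splits the value and prepends every entry to `.libPaths()`; `daemon.R` and `worker.R` below apply the same split to `SPARKR_RLIBDIR`. A small Scala sketch of the round trip (the values are assumed examples):

```scala
// Round trip of the comma-joined path list (example values, not real paths).
val rPackageDirs = Seq("/opt/spark/R/lib", "/tmp/spark-1234/rpkg")
val joined = rPackageDirs.mkString(",")   // JVM side: rPackageDir.mkString(",")
val decoded = joined.split(",").toSeq     // R side: strsplit(packageDir, ",")[[1]]
assert(decoded == rPackageDirs)           // works only if paths contain no commas
```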
5 changes: 3 additions & 2 deletions R/pkg/inst/worker/daemon.R
@@ -18,10 +18,11 @@
# Worker daemon

rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
-script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
+dirs <- strsplit(rLibDir, ",")[[1]]
+script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")

# preload SparkR package, speedup worker
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
3 changes: 2 additions & 1 deletion R/pkg/inst/worker/worker.R
@@ -35,10 +35,11 @@ bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()

rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
# SparkR namespace
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -113,6 +113,7 @@ private[spark] object RBackend extends Logging {
      val dos = new DataOutputStream(new FileOutputStream(f))
      dos.writeInt(boundPort)
      dos.writeInt(listenPort)
+     SerDe.writeString(dos, RUtils.rPackages.getOrElse(""))
      dos.close()
      f.renameTo(new File(path))
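The connection file the backend writes for `sparkR.init()` now carries a third field, the extra R library path, after the two ports; the new `readString(f)` call in `sparkR.R` above consumes it. A hypothetical standalone reader, assuming `SerDe.writeString` emits an int byte length followed by the UTF-8 bytes with a trailing NUL (as Spark's R SerDe does):

```scala
import java.io.{DataInputStream, FileInputStream}

// Sketch: parse the handshake file (two ints, then one SerDe-encoded string).
def readHandshake(path: String): (Int, Int, String) = {
  val in = new DataInputStream(new FileInputStream(path))
  try {
    val backendPort = in.readInt()
    val monitorPort = in.readInt()
    val len = in.readInt()                  // byte length of the string payload
    val bytes = new Array[Byte](len)
    in.readFully(bytes)
    // Drop the NUL terminator the writer appends for R's benefit.
    val rLibPath = new String(bytes, "UTF-8").stripSuffix("\u0000")
    (backendPort, monitorPort, rLibPath)
  } finally {
    in.close()
  }
}
```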
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -394,14 +394,14 @@ private[r] object RRDD {
    val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
    val rOptions = "--vanilla"
    val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
-   val rExecScript = rLibDir + "/SparkR/worker/" + script
+   val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
    val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
    // Unset the R_TESTS environment variable for workers.
    // This is set by R CMD check as startup.Rs
    // (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
    // and confuses worker script which tries to load a non-existent file
    pb.environment().put("R_TESTS", "")
-   pb.environment().put("SPARKR_RLIBDIR", rLibDir)
+   pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
    pb.environment().put("SPARKR_WORKER_PORT", port.toString)
    pb.redirectErrorStream(true) // redirect stderr into stdout
    val proc = pb.start()
37 changes: 30 additions & 7 deletions core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -24,6 +24,10 @@ import scala.collection.JavaConversions._
import org.apache.spark.{SparkEnv, SparkException}

private[spark] object RUtils {
+ // Local path where R binary packages built from R source code contained in the spark
+ // packages specified with "--packages" or "--jars" command line option reside.
+ var rPackages: Option[String] = None
+
  /**
   * Get the SparkR package path in the local spark distribution.
   */
@@ -35,11 +39,15 @@ private[spark] object RUtils {
  }

  /**
-  * Get the SparkR package path in various deployment modes.
+  * Get the list of paths for R packages in various deployment modes, of which the first
+  * path is for the SparkR package itself. The second path is for R packages built as
+  * part of Spark Packages, if any exist. Spark Packages can be provided through the
+  * "--packages" or "--jars" command line options.
+  *
   * This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
   * and environment variable `SPARK_HOME` are set.
   */
- def sparkRPackagePath(isDriver: Boolean): String = {
+ def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
    val (master, deployMode) =
      if (isDriver) {
        (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
@@ -52,15 +60,30 @@
    val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"

    // In YARN mode, the SparkR package is distributed as an archive symbolically
-   // linked to the "sparkr" file in the current directory. Note that this does not apply
-   // to the driver in client mode because it is run outside of the cluster.
+   // linked to the "sparkr" file in the current directory and additional R packages
+   // are distributed as an archive symbolically linked to the "rpkg" file in the
+   // current directory.
+   //
+   // Note that this does not apply to the driver in client mode because it is run
+   // outside of the cluster.
    if (isYarnCluster || (isYarnClient && !isDriver)) {
-     new File("sparkr").getAbsolutePath
+     val sparkRPkgPath = new File("sparkr").getAbsolutePath
+     val rPkgPath = new File("rpkg")
+     if (rPkgPath.exists()) {
+       Seq(sparkRPkgPath, rPkgPath.getAbsolutePath)
+     } else {
+       Seq(sparkRPkgPath)
+     }
    } else {
      // Otherwise, assume the package is local
      // TODO: support this for Mesos
-     localSparkRPackagePath.getOrElse {
-       throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+     val sparkRPkgPath = localSparkRPackagePath.getOrElse {
+       throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+     }
+     if (!rPackages.isEmpty) {
+       Seq(sparkRPkgPath, rPackages.get)
+     } else {
+       Seq(sparkRPkgPath)
+     }
    }
  }
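The signature change from `String` to `Seq[String]` sets a small contract for callers: the SparkR installation directory is always the head of the list, and an optional second entry points at the R packages built from Spark Packages. `RRDD.scala` above and `RRunner.scala` below rely on exactly this. An illustrative consumer (the paths are assumed):

```scala
// Example values as they might look inside a YARN container.
val rLibDir: Seq[String] = Seq("/container/sparkr", "/container/rpkg")

// The head is always the SparkR install dir, e.g. for locating the worker script:
val workerScript = rLibDir(0) + "/SparkR/worker/worker.R"

// The whole list is flattened into a single comma-separated env var:
val sparkrRLibDir = rLibDir.mkString(",")   // exported as SPARKR_RLIBDIR
```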
26 changes: 19 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -100,20 +100,29 @@ private[deploy] object RPackageUtils extends Logging {
   * Runs the standard R package installation code to build the R package from source.
   * Multiple runs don't cause problems.
   */
- private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
+ private def rPackageBuilder(
+     dir: File,
+     printStream: PrintStream,
+     verbose: Boolean,
+     libDir: String): Boolean = {
    // this code should be always running on the driver.
-   val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
-     throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
    val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
-   val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
+   val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg)
    if (verbose) {
      print(s"Building R package with the command: $installCmd", printStream)
    }
    try {
      val builder = new ProcessBuilder(installCmd)
      builder.redirectErrorStream(true)
+
+     // Put the SparkR package directory into R library search paths in case this R package
+     // may depend on SparkR.
      val env = builder.environment()
      env.clear()
+     val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
+     env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
+     env.put("R_PROFILE_USER",
+       Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
+
      val process = builder.start()
      new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
      process.waitFor() == 0
@@ -170,8 +179,11 @@
      if (checkManifestForR(jar)) {
        print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
        val rSource = extractRFolder(jar, printStream, verbose)
+       if (RUtils.rPackages.isEmpty) {
+         RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
+       }
        try {
-         if (!rPackageBuilder(rSource, printStream, verbose)) {
+         if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
            print(s"ERROR: Failed to build R package in $file.", printStream)
            print(RJarDoc, printStream)
          }
@@ -206,7 +218,7 @@
    }
  }

- /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
+ /** Zips all the R libraries built for distribution to the cluster. */
  private[deploy] def zipRLibraries(dir: File, name: String): File = {
    val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
    // create a zip file from scratch, do not append to existing file.
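`rPackageBuilder` now installs every package into one shared temp directory (`RUtils.rPackages`) and, via `R_PROFILE_USER` pointing at SparkR's `general.R` profile, makes SparkR visible to the child R process, so a Spark Package's R code can depend on SparkR during `R CMD INSTALL`. A sketch of launching such an install under those assumptions (the command and all paths are examples, not the patch's code):

```scala
import java.io.File
import scala.collection.JavaConverters._

// Run `R CMD INSTALL` with SparkR reachable through the R startup profile.
val rPackageDir = Seq("/opt/spark/R/lib")        // assumed SparkR lib location
val installCmd = Seq("R", "CMD", "INSTALL", "--library=/tmp/rlibs", "/tmp/pkg/R/pkg")
val pb = new ProcessBuilder(installCmd.asJava)
val env = pb.environment()
env.clear()                                      // start the child from a clean env
env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
env.put("R_PROFILE_USER",
  Seq(rPackageDir.head, "SparkR", "profile", "general.R").mkString(File.separator))
val exitCode = pb.inheritIO().start().waitFor()  // 0 on success
```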
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -81,9 +81,10 @@ object RRunner {
    val env = builder.environment()
    env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
    val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
-   env.put("SPARKR_PACKAGE_DIR", rPackageDir)
+   // Put the R package directories into an env variable of comma-separated paths
+   env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
    env.put("R_PROFILE_USER",
-     Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
+     Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
    val process = builder.start()
43 changes: 34 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -83,6 +83,7 @@ object SparkSubmit {
  private val PYSPARK_SHELL = "pyspark-shell"
  private val SPARKR_SHELL = "sparkr-shell"
  private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
+ private val R_PACKAGE_ARCHIVE = "rpkg.zip"

  private val CLASS_NOT_FOUND_EXIT_STATUS = 101

@@ -362,22 +363,46 @@
      }
    }

-   // In YARN mode for an R app, add the SparkR package archive to archives
-   // that can be distributed with the job
+   // In YARN mode for an R app, add the SparkR package archive and the R package
+   // archive containing all of the built R libraries to archives so that they can
+   // be distributed with the job
    if (args.isR && clusterManager == YARN) {
-     val rPackagePath = RUtils.localSparkRPackagePath
-     if (rPackagePath.isEmpty) {
+     val sparkRPackagePath = RUtils.localSparkRPackagePath
+     if (sparkRPackagePath.isEmpty) {
        printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
      }
-     val rPackageFile =
-       RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
-     if (!rPackageFile.exists()) {
+     val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+     if (!sparkRPackageFile.exists()) {
        printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
      }
-     val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+     val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
+
+     // Distribute the SparkR package.
      // Assigns a symbol link name "sparkr" to the shipped package.
-     args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+     args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
+
+     // Distribute the R package archive containing all the built R packages.
+     if (!RUtils.rPackages.isEmpty) {
+       val rPackageFile =
+         RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
+       if (!rPackageFile.exists()) {
+         printErrorAndExit("Failed to zip all the built R packages.")
+       }
+
+       val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
+       // Assigns a symbol link name "rpkg" to the shipped package.
+       args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
+     }
    }

+   // TODO: Support distributing R packages with standalone cluster
+   if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
+     printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
+   }
+
+   // TODO: Support SparkR with mesos cluster
+   if (args.isR && clusterManager == MESOS) {
+     printErrorAndExit("SparkR is not supported for Mesos cluster.")
+   }

    // If we're running a R app, set the main class to our specific R runner
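The `#name` fragment appended to each archive URI is the link name under which YARN localizes the archive in every container's working directory; that is what lets `RUtils.sparkRPackagePath` on the executors find the packages as `new File("sparkr")` and `new File("rpkg")`. A short sketch of the convention (the file path is an assumed example, with `File.toURI` standing in for `Utils.resolveURI`):

```scala
import java.io.File

val rPackageFile = new File("/tmp/spark-rpkg/rpkg.zip")   // assumed path
val rPackageURI = rPackageFile.toURI.toString             // file:/tmp/spark-rpkg/rpkg.zip
val archiveSpec = rPackageURI + "#rpkg"                   // YARN unpacks it as ./rpkg
// Executors later resolve new File("rpkg") against the container working dir.
```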
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark._
+import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -366,10 +367,9 @@
      }
    }

-   test("correctly builds R packages included in a jar with --packages") {
-     // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
-     // It's hard to write the test in SparkR (because we can't create the repository dynamically)
-     /*
+   // TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds.
+   // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log
+   ignore("correctly builds R packages included in a jar with --packages") {
      assume(RUtils.isRInstalled, "R isn't installed on this machine.")
      val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
      val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -387,7 +387,6 @@
        rScriptDir)
      runSparkSubmit(args)
    }
-   */
  }

  test("resolves command line argument paths correctly") {
1 change: 1 addition & 0 deletions make-distribution.sh
@@ -220,6 +220,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
  mkdir -p "$DISTDIR"/R/lib
  cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
+ cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
fi

# Download and copy in tachyon, if requested