Add support for running pipe tasks in separate directories
tgravescs committed Mar 12, 2014
commit 1ab49ca90b7cae82efa26e018d9d285c948bf25c
9 changes: 4 additions & 5 deletions core/pom.xml
@@ -183,14 +183,13 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
Contributor:
Is this change related to the PR?

Contributor Author:
Yes, the diff is just a bit messy. I made commons-io available outside of the test scope since I use FileUtils.delete, and moved it up a couple of lines. derby didn't change.

<scope>test</scope>
</dependency>
<dependency>
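The exchange above is the reason commons-io loses its test scope: the new cleanup path in PipedRDD calls FileUtils at runtime. Below is a minimal standalone sketch, not part of the patch, of why plain java.io.File.delete is not enough once a per-task directory contains symlinks and subdirectories; the tasks/example-task path is purely illustrative.

import java.io.File
import org.apache.commons.io.FileUtils

object CleanupSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical scratch directory standing in for a ./tasks/<uuid> working dir.
    val taskDir = new File("tasks/example-task")
    new File(taskDir, "data").mkdirs()

    // java.io.File.delete() refuses to remove a non-empty directory.
    println(taskDir.delete())     // prints false: the directory still has children

    // FileUtils.deleteQuietly removes the whole tree and swallows any IOException.
    FileUtils.deleteQuietly(taskDir)
    println(taskDir.exists())     // prints false: the tree is gone
  }
}
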
62 changes: 59 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,6 +17,8 @@

package org.apache.spark.rdd

import java.io.File
import java.io.FilenameFilter
import java.io.PrintWriter
import java.util.StringTokenizer

@@ -26,7 +28,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag

import org.apache.commons.io.FileUtils
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.util.Utils


/**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
command: Seq[String],
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit)
printRDDElement: (T, String => Unit) => Unit,
separateWorkingDir: Boolean)
extends RDD[String](prev) {

// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
separateWorkingDir)


override def getPartitions: Array[Partition] = firstParent[T].partitions

/**
* A FilenameFilter that accepts anything that isn't equal to the name passed in.
* @param filterName name of the file or directory to leave out
*/
class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = {
!name.equals(filterName)
}
}

override def compute(split: Partition, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
}

// When the separateWorkingDir option is turned on, each task is run in a
// separate working directory. This should resolve file access conflict issues.
val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
var workInTaskDirectory = false
logDebug("taskDirectory = " + taskDirectory)
if (separateWorkingDir) {
val currentDir = new File(".")
logDebug("currentDir = " + currentDir)
val taskDirFile = new File(taskDirectory)
taskDirFile.mkdirs()

try {
val tasksDirFilter = new NotEqualsFileNameFilter("tasks")

// Need to add symlinks to jars, files, and directories. On Yarn we could have
// directories and other files not known to the SparkContext that were added via the
// Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
// are creating here.
for (file <- currentDir.list(tasksDirFilter)) {
val fileWithDir = new File(currentDir, file)
Utils.symlink(new File(fileWithDir.getAbsolutePath()),
new File(taskDirectory + "/" + fileWithDir.getName()))
}
pb.directory(taskDirFile)
workInTaskDirectory = true
} catch {
case e: Exception => logError("Unable to set up task working directory: " + e.getMessage +
" (" + taskDirectory + ")")
}
}

val proc = pb.start()
val env = SparkEnv.get

@@ -112,6 +161,13 @@
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}

// Clean up the task working directory if one was used
if (workInTaskDirectory) {
FileUtils.deleteQuietly(new File(taskDirectory))
logDebug("Removed task working directory " + taskDirectory)
}

false
}
}
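Pulling the new compute() logic out of the diff, the per-task lifecycle is: create ./tasks/<uuid>, symlink everything from the current directory except the tasks/ tree itself, run the child process with that directory as its working directory, then delete the tree. Here is a standalone sketch of that lifecycle under the same assumption the patch makes, namely a POSIX `ln` on the PATH; the object and method names are illustrative and not part of the patch.

import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils

import scala.sys.process._

object TaskDirSketch {
  /** Run `command` inside a fresh ./tasks/<uuid> directory that mirrors the
    * current directory through symlinks, then remove the directory. */
  def runInScratchDir(command: Seq[String]): Int = {
    val currentDir = new File(".")
    val taskDir = new File("./tasks/" + UUID.randomUUID.toString)
    taskDir.mkdirs()
    try {
      // Symlink everything except the tasks/ tree itself, as PipedRDD.compute does.
      for (name <- currentDir.list() if name != "tasks") {
        val src = new File(currentDir, name).getAbsolutePath
        Seq("ln", "-sf", src, new File(taskDir, name).getPath).!
      }
      // Run the child process with the scratch directory as its working directory.
      Process(command, taskDir).!
    } finally {
      // Cleanup mirrors the end of compute(): remove the whole tree quietly.
      FileUtils.deleteQuietly(taskDir)
    }
  }
}

A call like runInScratchDir(Seq("pwd")) would print a path ending in tasks/<uuid>, which is what the new test below checks for.
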
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -478,16 +478,19 @@ abstract class RDD[T: ClassTag](
* instead of constructing a huge String to concat all the elements:
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2){f(e)}
* @param separateWorkingDir Use separate working directories for each task.
* @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false): RDD[String] = {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null)
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir)
}

/**
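For reference, a short usage sketch of the extended pipe() signature, assuming an existing SparkContext named sc and a POSIX shell providing the cat and pwd commands:

val nums = sc.parallelize(1 to 4, 2)

// Output is unchanged; only the subprocess working directory differs.
val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
piped.collect()   // Array("1", "2", "3", "4")

// Each task's subprocess sees its own tasks/<uuid> directory as its cwd.
nums.pipe(Seq("pwd"), separateWorkingDir = true).collect().foreach(println)
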
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SortedSet
import scala.io.Source
import scala.reflect.ClassTag

@@ -895,4 +896,24 @@
}
count
}

/**
* Creates a symlink. Note that JDK 1.7 has Files.createSymbolicLink, but it is not used here
* in order to keep JDK 1.6 support. Does not work on Windows or any other platform without 'ln'.
* @param src absolute path to the source
* @param dst relative path for the destination
*/
def symlink(src: File, dst: File) {
if (!src.isAbsolute()) {
throw new IOException("Source must be absolute")
}
if (dst.isAbsolute()) {
throw new IOException("Destination must be relative")
}
import scala.sys.process._
("ln -sf " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
(logInfo(line)))
}


}
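The helper shells out to `ln` because the codebase still targets JDK 1.6; as its comment notes, JDK 1.7's Files.createSymbolicLink would otherwise cover this. A hedged sketch of that newer-JDK equivalent follows. It is not what the patch uses, and it skips the absolute/relative checks the helper enforces.

import java.io.File
import java.nio.file.Files

// JDK 1.7+ alternative to shelling out to `ln -sf`.
def symlinkNio(src: File, dst: File): Unit = {
  Files.deleteIfExists(dst.toPath)                 // emulate the -f (force) flag
  Files.createSymbolicLink(dst.toPath, src.toPath) // link path first, then target
}
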
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -123,6 +123,28 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
}

test("basic pipe with separate working directory") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)

val c = piped.collect()
assert(c.size === 4)
assert(c(0) === "1")
assert(c(1) === "2")
assert(c(2) === "3")
assert(c(3) === "4")
val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
val collectPwd = pipedPwd.collect()
println("collect pwd is: " + collectPwd(0))
assert(collectPwd(0).contains("tasks/"))
} else {
assert(true)
}
}

test("test pipe exports map_input_file") {
testExportInputFile("map_input_file")
}
4 changes: 2 additions & 2 deletions project/SparkBuild.scala
@@ -238,12 +238,12 @@ object SparkBuild extends Build {
"org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"commons-io" % "commons-io" % "2.4",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
"org.mockito" % "mockito-all" % "1.8.5" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),