Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,13 @@ abstract class RDD[T: ClassTag](
}

/**
* Return an RDD created by piping elements to a forked external process.
* Return an RDD created by piping elements to a forked external process. The resulting RDD
* is computed by executing the given process once per partition. All elements
* of each input partition are written to the process's stdin as lines of input separated
* by a newline. The resulting partition consists of the process's stdout output, with
* each line of stdout resulting in one element of the output partition. A process is invoked
* even for empty partitions.
*
* The print behavior can be customized by providing two functions.
*
* @param command command to run in forked process.
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}

test("pipe with empty partition") {
  // Guard on command availability, consistent with the other pipe tests in this
  // suite (see the "pipe with env variable" test), so the test is skipped rather
  // than failing on systems without `wc`.
  if (testCommandAvailable("wc")) {
    // 2 elements spread over 8 partitions leaves 6 partitions empty; pipe() must
    // still invoke the process once per partition, including the empty ones.
    val data = sc.parallelize(Seq("foo", "bing"), 8)
    val piped = data.pipe("wc -c")
    // `wc -c` emits exactly one line per invocation => one output element per partition.
    assert(piped.count() == 8)
    // "foo" + newline => 4 bytes, "bing" + newline => 5 bytes, empty partitions => 0 bytes.
    val charCounts = piped.map(_.trim.toInt).collect().toSet
    assert(Set(0, 4, 5) == charCounts)
  }
}

test("pipe with env variable") {
if (testCommandAvailable("printenv")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
Expand Down