Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add mapPartitionsWithIndex for RDDBarrier
  • Loading branch information
ConeyLiu committed Oct 17, 2019
commit c8e6ded9bfbb1e1e1dfc677242ce39834766abcb
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,27 @@ class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
)
}

/**
* :: Experimental ::
* Returns a new RDD by applying a function to each partition of the wrapped RDD,
* where tasks are launched together in a barrier stage.
* The interface is the same as [[org.apache.spark.rdd.RDD#mapPartitionsWithIndex]].
* Please see the API doc there.
* @see [[org.apache.spark.BarrierTaskContext]]
*/
@Experimental
@Since("2.4.0")
def mapPartitionsWithIndex[S: ClassTag](
f: (Int, Iterator[T]) => Iterator[S],
preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope {
val cleanedF = rdd.sparkContext.clean(f)
new MapPartitionsRDD(
rdd,
(_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning,
isFromBarrier = true
)
}

// TODO: [SPARK-25247] add extra conf to RDDBarrier, e.g., timeout.
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ class RDDBarrierSuite extends SparkFunSuite with SharedSparkContext {
assert(rdd2.isBarrier())
}

test("RDDBarrier mapPartitionsWithIndex") {
val rdd = sc.parallelize(1 to 10, 4)
assert(rdd.isBarrier() === false)

val rdd2 = rdd.barrier().mapPartitionsWithIndex((_, iter) => iter)
assert(rdd2.isBarrier())
}

test("create an RDDBarrier in the middle of a chain of RDDs") {
val rdd = sc.parallelize(1 to 10, 4).map(x => x * 2)
val rdd2 = rdd.barrier().mapPartitions(iter => iter).map(x => (x, x + 1))
Expand Down
13 changes: 13 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2535,6 +2535,19 @@ def func(s, iterator):
return f(iterator)
return PipelinedRDD(self.rdd, func, preservesPartitioning, isFromBarrier=True)

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
.. note:: Experimental

Returns a new RDD by applying a function to each partition of the wrapped RDD,
where tasks are launched together in a barrier stage.
The interface is the same as :func:`RDD.mapPartitionsWithIndex`.
Please see the API doc there.

.. versionadded:: 2.4.0
"""
return PipelinedRDD(self.rdd, f, preservesPartitioning, isFromBarrier=True)


class PipelinedRDD(RDD):

Expand Down