Reorganize 'implicit's to improve the API convenience
zsxwing committed Nov 14, 2014
commit 1eda9e4921617bc71acf2bb502cf3a22ee43c41f
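In short, the commit moves the implicit RDD conversions and AccumulatorParam instances out of object SparkContext and into the org.apache.spark.rdd package object and the AccumulatorParam companion object, so user code no longer needs an extra import to use them. A minimal before/after sketch, not part of the diff; `sc` stands for an already-constructed SparkContext:

// Pre-1.2 code needed this import to pick up the implicits:
// import org.apache.spark.SparkContext._

// After this commit the same calls compile without it:
val sc: org.apache.spark.SparkContext = ???            // assumed to be provided elsewhere
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
val grouped = pairs.groupByKey()                       // via rddToPairRDDFunctions
val total = sc.accumulator(0.0)                        // via DoubleAccumulatorParam
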
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -244,6 +244,29 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
}
}

object AccumulatorParam {

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}
}
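
Because these instances now live in the AccumulatorParam companion object, they are part of the implicit scope of the AccumulatorParam type itself, so sc.accumulator(...) finds them without any import; this is exactly what the accumulator tests in ImplicitSuite (added below) verify at compile time. A small usage sketch, assuming `sc` is a live SparkContext:

val doubleAcc = sc.accumulator(0.0)   // resolves AccumulatorParam.DoubleAccumulatorParam
val intAcc = sc.accumulator(0)        // resolves AccumulatorParam.IntAccumulatorParam
doubleAcc += 1.5                      // uses DoubleAccumulatorParam.addInPlace under the hood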

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
30 changes: 20 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1427,46 +1427,56 @@ object SparkContext extends Logging {

private[spark] val DRIVER_IDENTIFIER = "<driver>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
@deprecated("An API for backforward compatibility", "1.2.0")
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
Contributor

Does this provide binary compatibility for Spark programs compiled against earlier versions of Spark?

Member Author

> Does this provide binary compatibility for Spark programs compiled against earlier versions of Spark?

Yes. I mentioned it in the description of this PR.

Contributor

Do you mind updating the deprecation message to say

"Replaced by implicit objects in AccumulatorParam. This is kept here only for backward binary compatibility."

Do it for all the following.

def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
}
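
Applied to the object above, the deprecation message the reviewer asks for would look roughly like this (a sketch of the requested follow-up, not code in this commit):

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for backward binary compatibility.", "1.2.0")
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
  def addInPlace(t1: Double, t2: Double): Double = t1 + t2
  def zero(initialValue: Double): Double = 0.0
}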

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
@deprecated("An API for backforward compatibility", "1.2.0")
object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
@deprecated("An API for backforward compatibility", "1.2.0")
object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@deprecated("An API for backforward compatibility", "1.2.0")
object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
@deprecated("An API for backforward compatibility", "1.2.0")
Contributor

update these accordingly too

def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
}

implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
@deprecated("An API for backforward compatibility", "1.2.0")
def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
@deprecated("An API for backforward compatibility", "1.2.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
@deprecated("An API for backforward compatibility", "1.2.0")
def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)

implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
@deprecated("An API for backforward compatibility", "1.2.0")
def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)

implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
@deprecated("An API for backforward compatibility", "1.2.0")
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
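
These methods keep their old names and signatures even though they are no longer implicit because the Scala compiler resolves implicit conversions at compile time: a Spark program built against 1.1 already contains explicit calls to them, which keep linking as long as the (now deprecated) methods remain. A rough sketch of what such previously compiled user code amounts to; `rdd` is an assumed RDD[(String, Int)]:

val rdd: org.apache.spark.rdd.RDD[(String, Int)] = ???   // assumed to come from elsewhere
val counts = org.apache.spark.SparkContext
  .rddToPairRDDFunctions(rdd)                            // the call the 1.1 compiler baked in
  .reduceByKey(_ + _)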

// Implicit conversions to common Writable types, for saveAsSequenceFile
29 changes: 28 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/package.scala
@@ -17,7 +17,34 @@

package org.apache.spark

import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.hadoop.io.Writable

/**
* Provides several RDD implementations. See [[org.apache.spark.rdd.RDD]].
*/
package object rdd
package object rdd {

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
Contributor

It would be great to add a comment mentioning the duplicate code in SparkContext, and to note that pre-1.2 these lived in SparkContext and users had to import SparkContext._

(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
}

implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)

implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)

implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))

}
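
With the conversions defined in the rdd package object, they sit in the implicit scope of RDD itself (implicits in a package object are in the implicit scope of the types defined in that package), which is why the ImplicitSuite below compiles without a single SparkContext._ import. A minimal end-to-end sketch under the same assumption; the app name and sample data are made up:

import org.apache.spark.{SparkConf, SparkContext}

object NoImportNeeded {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("no-import-needed").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq(("spark", 2), ("rdd", 1), ("spark", 3)))
    val counts = pairs.reduceByKey(_ + _)   // PairRDDFunctions via the package object implicit
    val sorted = counts.sortByKey()         // OrderedRDDFunctions, same mechanism
    sorted.collect().foreach(println)
    sc.stop()
  }
}
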
73 changes: 73 additions & 0 deletions core/src/test/scala/org/apache/spark/ImplicitSuite.scala
@@ -0,0 +1,73 @@
package org.apache.spark

/**
* A test suite to make sure all `implicit` functions work correctly.
* Please don't `import org.apache.spark.SparkContext._` in this class.
*
* As `implicit` is a compiler feature, we don't need to run this class.
* All we need to do is make the compiler happy.
*/
class ImplicitSuite {


// We only want to test if `implicit` works well with the compiler, so we don't need a real
// SparkContext.
def mockSparkContext[T]: org.apache.spark.SparkContext = null

// We only want to test if `implicit` works well with the compiler, so we don't need a real RDD.
def mockRDD[T]: org.apache.spark.rdd.RDD[T] = null

def testRddToPairRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.groupByKey
Contributor

Can you add parentheses to groupByKey?

}

def testRddToAsyncRDDActions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD
rdd.countAsync()
}

def testRddToSequenceFileRDDFunctions(): Unit = {
// TODO: eliminating `import intToIntWritable` requires refactoring SequenceFileRDDFunctions.
// That would be a breaking change.
import org.apache.spark.SparkContext.intToIntWritable
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToOrderedRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.sortByKey()
}

def testDoubleRDDToDoubleRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[Double] = mockRDD
rdd.stats()
}


def testNumericRDDToDoubleRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD
rdd.stats()
}

def testDoubleAccumulatorParam(): Unit = {
val sc = mockSparkContext
sc.accumulator(123.4)
}

def testIntAccumulatorParam(): Unit = {
val sc = mockSparkContext
sc.accumulator(123)
}

def testLongAccumulatorParam(): Unit = {
val sc = mockSparkContext
sc.accumulator(123L)
}

def testFloatAccumulatorParam(): Unit = {
val sc = mockSparkContext
sc.accumulator(123F)
}
}