[SPARK-4397][Core] Reorganize 'implicit's to improve the API convenience #3262
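Before this change, the implicit conversions that enable extra RDD operations (for example `reduceByKey` on an `RDD[(K, V)]`) lived in `object SparkContext`, so user code had to `import org.apache.spark.SparkContext._` to compile. This PR moves those implicits into the implicit scope of the types involved (`object RDD`, `object AccumulatorParam`, `object WritableConverter`), where the Scala compiler finds them automatically. A rough sketch of the user-facing effect; this is not compilable on its own and assumes an existing `SparkContext` named `sc` and an input file path that is purely illustrative:

```scala
// Spark 1.1 and earlier: this import is required, or reduceByKey does not
// compile, because the RDD[(K, V)] => PairRDDFunctions conversion lives in
// object SparkContext and is outside the implicit search scope.
// import org.apache.spark.SparkContext._

// Spark 1.2 and later: no import needed. When the compiler looks for an
// implicit conversion from RDD[(String, Int)], it searches object RDD (the
// companion of the type being converted) and finds rddToPairRDDFunctions.
val counts = sc.textFile("input.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
```

Old code that still has the import keeps compiling, since the conversions in `SparkContext` remain as deprecated forwarders.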
Closed
Commits (11):
- 1eda9e4 Reorganize 'implicit's to improve the API convenience (zsxwing)
- 3ac4f07 Add license header (zsxwing)
- 9b73188 Fix the code style issue (zsxwing)
- 3bdcae2 Move WritableConverter implicits to object WritableConverter (zsxwing)
- 185c12f Remove simpleWritableConverter from SparkContext (zsxwing)
- 7266218 Add comments to warn the duplicate codes in SparkContext (zsxwing)
- 34641d4 Move ImplicitSuite to org.apache.sparktest (zsxwing)
- 52353de Remove private[spark] from object WritableConverter (zsxwing)
- 2b5f5a4 Comments for the deprecated functions (zsxwing)
- 9c27aff Move implicit functions to object RDD and forward old functions to ne… (zsxwing)
- fc30314 Update the comments (zsxwing)
```diff
@@ -1427,47 +1427,74 @@ object SparkContext extends Logging {

   private[spark] val DRIVER_IDENTIFIER = "<driver>"

-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+  // The following deprecated objects have already been copied to `object AccumulatorParam` to
+  // make the compiler find them automatically. They are duplicate codes only for backward
+  // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the
+  // following ones.
+
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
   }

-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int) = 0
   }

-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long) = t1 + t2
     def zero(initialValue: Long) = 0L
   }

-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float) = t1 + t2
     def zero(initialValue: Float) = 0f
   }

-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+  // The following deprecated functions have already been moved to `object RDD` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object RDD`.

-  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+  @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
```
Contributor: All these comments are outdated (still refer to package object, but should refer to

Author (Member): Thank you. Fixed it.
```diff
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairRDDFunctions(rdd)
+    RDD.rddToPairRDDFunctions(rdd)
   }

-  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+  @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)

-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
+  @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
       rdd: RDD[(K, V)]) =
-    new SequenceFileRDDFunctions(rdd)
+    RDD.rddToSequenceFileRDDFunctions(rdd)

-  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
+  @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
       rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions[K, V, (K, V)](rdd)
+    RDD.rddToOrderedRDDFunctions(rdd)

-  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
+  @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)

-  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
-    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
+  @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+    RDD.numericRDDToDoubleRDDFunctions(rdd)

   // Implicit conversions to common Writable types, for saveAsSequenceFile
```
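Every function in the hunk above follows the same forwarding pattern: the implicit moves to its new home, and a non-implicit, `@deprecated` copy stays behind that simply delegates. A minimal self-contained sketch of that pattern; the names `OldApi` and `NewApi` are hypothetical, not Spark's:

```scala
object NewApi {
  // The conversion now lives where callers (or the compiler) should find it.
  def intToLabel(n: Int): String = s"value-$n"
}

object OldApi {
  // Kept only so code written against the old location keeps working.
  // It forwards to the new location and emits a deprecation warning
  // at compile time when referenced.
  @deprecated("Replaced by functions in NewApi. This is kept here only for " +
    "backward compatibility.", "1.2.0")
  def intToLabel(n: Int): String = NewApi.intToLabel(n)
}
```

Because the old method body still exists with the same signature, programs compiled against the earlier API also keep linking, which is the binary-compatibility point raised in the review below.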
```diff
@@ -1493,40 +1520,49 @@ object SparkContext extends Logging {
       arr.map(x => anyToWritable(x)).toArray)
   }

-  // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
-    : WritableConverter[T] = {
-    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
-    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
-  }
+  // The following deprecated functions have already been moved to `object WritableConverter` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object WritableConverter`.

-  implicit def intWritableConverter(): WritableConverter[Int] =
-    simpleWritableConverter[Int, IntWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def intWritableConverter(): WritableConverter[Int] =
+    WritableConverter.intWritableConverter()

-  implicit def longWritableConverter(): WritableConverter[Long] =
-    simpleWritableConverter[Long, LongWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def longWritableConverter(): WritableConverter[Long] =
+    WritableConverter.longWritableConverter()

-  implicit def doubleWritableConverter(): WritableConverter[Double] =
-    simpleWritableConverter[Double, DoubleWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def doubleWritableConverter(): WritableConverter[Double] =
+    WritableConverter.doubleWritableConverter()

-  implicit def floatWritableConverter(): WritableConverter[Float] =
-    simpleWritableConverter[Float, FloatWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def floatWritableConverter(): WritableConverter[Float] =
+    WritableConverter.floatWritableConverter()

-  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
-    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def booleanWritableConverter(): WritableConverter[Boolean] =
+    WritableConverter.booleanWritableConverter()

-  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
-      // getBytes method returns array which is longer then data to be returned
-      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
-    )
-  }
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def bytesWritableConverter(): WritableConverter[Array[Byte]] =
+    WritableConverter.bytesWritableConverter()

-  implicit def stringWritableConverter(): WritableConverter[String] =
-    simpleWritableConverter[String, Text](_.toString)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def stringWritableConverter(): WritableConverter[String] =
+    WritableConverter.stringWritableConverter()

-  implicit def writableWritableConverter[T <: Writable]() =
-    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def writableWritableConverter[T <: Writable]() =
+    WritableConverter.writableWritableConverter()

   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
```
```diff
@@ -1750,3 +1786,46 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassTag[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
+
+object WritableConverter {
+
+  // Helper objects for converting common types to Writable
+  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+    : WritableConverter[T] = {
+    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, we still keep the old functions in SparkContext for backward
+  // compatibility and forward to the following functions directly.
+
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
+  }
+
+  implicit def stringWritableConverter(): WritableConverter[String] =
+    simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+}
```
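The reason placing the implicits in `object WritableConverter` removes the need for an import is Scala's implicit scope: when the compiler searches for an implicit value of type `C[T]`, it automatically looks in the companion objects of `C` and `T`. A minimal self-contained illustration of that rule; the `Show` type class here is invented for the example and is not part of Spark:

```scala
trait Show[T] { def show(t: T): String }

object Show {
  // Lives in the companion object of Show, so it is in implicit scope for
  // Show[Int]: no import is needed at the call site.
  implicit val intShow: Show[Int] = new Show[Int] {
    def show(t: Int): String = s"Int($t)"
  }
}

def render[T](t: T)(implicit s: Show[T]): String = s.show(t)

// Compiles without importing anything from Show:
// render(42) returns "Int(42)"
```

This is the same mechanism the PR relies on for `WritableConverter[Int]`, `AccumulatorParam[Double]`, and the `RDD` conversions: move the instance into the companion object, and call sites no longer need `import SparkContext._`.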
Reviewer: Does this provide binary compatibility for Spark programs compiled against earlier versions of Spark?

Author: Yes. I mentioned it in the description of this PR.

Reviewer: Do you mind updating the deprecation message to say "Replaced by implicit objects in AccumulatorParam. This is kept here only for backward binary compatibility."? Do it for all the following.