
Commit 3bdcae2

Move WritableConverter implicits to object WritableConverter
1 parent 9b73188 commit 3bdcae2
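
The motivation for the move: Scala's implicit search includes the implicit scope of the type being looked up, and that scope covers the object that accompanies the type. Implicits defined in object WritableConverter are therefore found wherever a WritableConverter[T] (or a function producing one) is required, with no `import org.apache.spark.SparkContext._`. A minimal, self-contained sketch of the mechanism, with illustrative names only (not Spark code):

    object CompanionImplicitDemo {
      // The implicit defined in the companion object is found automatically
      // whenever a value of the companion's type is implicitly required.
      class Converter[T](val convert: String => T)

      object Converter {
        implicit val intConverter: Converter[Int] = new Converter(_.toInt)
      }

      def parse[T](s: String)(implicit c: Converter[T]): T = c.convert(s)

      def main(args: Array[String]): Unit =
        println(parse[Int]("42"))  // compiles with no import: object Converter is in the implicit scope
    }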

4 files changed: 96 additions & 11 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 54 additions & 8 deletions
@@ -1510,32 +1510,40 @@ object SparkContext extends Logging {
     new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
   }
 
-  implicit def intWritableConverter(): WritableConverter[Int] =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def intWritableConverter(): WritableConverter[Int] =
     simpleWritableConverter[Int, IntWritable](_.get)
 
-  implicit def longWritableConverter(): WritableConverter[Long] =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def longWritableConverter(): WritableConverter[Long] =
     simpleWritableConverter[Long, LongWritable](_.get)
 
-  implicit def doubleWritableConverter(): WritableConverter[Double] =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def doubleWritableConverter(): WritableConverter[Double] =
     simpleWritableConverter[Double, DoubleWritable](_.get)
 
-  implicit def floatWritableConverter(): WritableConverter[Float] =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def floatWritableConverter(): WritableConverter[Float] =
     simpleWritableConverter[Float, FloatWritable](_.get)
 
-  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def booleanWritableConverter(): WritableConverter[Boolean] =
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
-  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
     simpleWritableConverter[Array[Byte], BytesWritable](bw =>
       // getBytes returns an array that is longer than the data to be returned
      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
     )
   }
 
-  implicit def stringWritableConverter(): WritableConverter[String] =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def stringWritableConverter(): WritableConverter[String] =
     simpleWritableConverter[String, Text](_.toString)
 
-  implicit def writableWritableConverter[T <: Writable]() =
+  @deprecated("An API for backward compatibility", "1.2.0")
+  def writableWritableConverter[T <: Writable]() =
     new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
 
   /**
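
The SparkContext versions survive as plain, non-implicit methods carrying Scala's @deprecated annotation, which takes a message and the release in which the deprecation starts and makes the compiler warn at every call site, so existing explicit callers keep compiling. A tiny illustration of that behaviour, using hypothetical names that are not from this diff:

    object Legacy {
      @deprecated("Use newApi instead", "1.2.0")
      def oldApi(): Int = newApi()

      def newApi(): Int = 42
    }

    // Legacy.oldApi() still compiles, but the compiler reports:
    //   method oldApi in object Legacy is deprecated: Use newApi instead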
@@ -1760,3 +1768,41 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassTag[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
+
+object WritableConverter {
+
+  // Helper objects for converting common types to Writable
+  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+      : WritableConverter[T] = {
+    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes returns an array that is longer than the data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
+  }
+
+  implicit def stringWritableConverter(): WritableConverter[String] =
+    simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+}
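
With the implicits now defined next to the type, a call such as sc.sequenceFile[Int, String](path) should resolve its converters from the implicit scope of WritableConverter rather than from an import of SparkContext members. A caller-side sketch, assuming a live SparkContext named sc (the path is the placeholder used by the new tests below):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // No `import org.apache.spark.SparkContext._` is required for the converters.
    def loadPairs(sc: SparkContext): RDD[(Int, String)] =
      sc.sequenceFile[Int, String]("/a/test/path")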

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -32,13 +32,12 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
+import org.apache.spark.rdd.{OrderedRDDFunctions, RDD, rddToPairRDDFunctions}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
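
rddToPairRDDFunctions is the implicit conversion that gives an RDD of key/value pairs the PairRDDFunctions operations (reduceByKey, join, and so on); per this change it is imported from org.apache.spark.rdd rather than from object SparkContext. A small usage sketch built on the import shown in the diff, for illustration only:

    import org.apache.spark.rdd.{RDD, rddToPairRDDFunctions}

    // reduceByKey is not defined on RDD itself; the implicit conversion to
    // PairRDDFunctions supplies it for RDD[(K, V)].
    def wordCounts(lines: RDD[String]): RDD[(String, Int)] =
      lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)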

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext._
+import org.apache.spark.AccumulatorParam._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
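
Narrowing `import org.apache.spark.SparkContext._` to `import org.apache.spark.AccumulatorParam._` suggests the only implicits JavaSparkContext still needs from the old wildcard are the AccumulatorParam instances behind the accumulator factory methods. A hedged sketch of that dependency, assuming an implicit AccumulatorParam[Int] is provided by the new import, as the change implies:

    import org.apache.spark.SparkContext
    import org.apache.spark.AccumulatorParam._  // accumulator param implicits, e.g. for Int and Double

    def makeCounter(sc: SparkContext) =
      sc.accumulator(0)  // requires an implicit AccumulatorParam[Int]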

core/src/test/scala/org/apache/spark/ImplicitSuite.scala

Lines changed: 40 additions & 0 deletions
@@ -85,4 +85,44 @@ class ImplicitSuite {
     val sc = mockSparkContext
     sc.accumulator(123F)
   }
+
+  def testIntWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Int, Int]("/a/test/path")
+  }
+
+  def testLongWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Long, Long]("/a/test/path")
+  }
+
+  def testDoubleWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Double, Double]("/a/test/path")
+  }
+
+  def testFloatWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Float, Float]("/a/test/path")
+  }
+
+  def testBooleanWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Boolean, Boolean]("/a/test/path")
+  }
+
+  def testBytesWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Array[Byte], Array[Byte]]("/a/test/path")
+  }
+
+  def testStringWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[String, String]("/a/test/path")
+  }
+
+  def testWritableWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]("/a/test/path")
+  }
 }
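
These added test methods are never executed; ImplicitSuite only needs to compile, which is enough to prove that each WritableConverter implicit is found without `import org.apache.spark.SparkContext._`. The calls lean on a mockSparkContext helper defined earlier in the file, outside this diff; presumably it is something along the lines of:

    // Compile-only helper: the returned SparkContext is never dereferenced,
    // so a null reference is sufficient.
    def mockSparkContext[T]: org.apache.spark.SparkContext = null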
