Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Used fake class tag syntax
  • Loading branch information
ScrapCodes committed Mar 3, 2014
commit 80a13e8b9a2d49a1de5dee263102ac180a9b7077
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T

/** Return a new DStream by applying a function to all elements of this DStream. */
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

Expand All @@ -160,7 +160,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

Expand Down Expand Up @@ -284,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
implicit val cm: ClassTag[U] = fakeClassTag

def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
Expand All @@ -296,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
implicit val cm: ClassTag[U] = fakeClassTag

def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
Expand All @@ -309,10 +309,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
implicit val cmk: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
implicit val cmk: ClassTag[K2] = fakeClassTag
implicit val cmv: ClassTag[V2] = fakeClassTag

def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
Expand All @@ -324,10 +323,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
implicit val cmk: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
implicit val cmk: ClassTag[K2] = fakeClassTag
implicit val cmv: ClassTag[V2] = fakeClassTag

def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
Expand All @@ -341,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
): JavaDStream[W] = {
implicit val cmu: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
implicit val cmv: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cmu: ClassTag[U] = fakeClassTag
implicit val cmv: ClassTag[W] = fakeClassTag

def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
Expand All @@ -358,12 +355,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
implicit val cmu: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
implicit val cmk2: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv2: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
implicit val cmu: ClassTag[U] = fakeClassTag
implicit val cmk2: ClassTag[K2] = fakeClassTag
implicit val cmv2: ClassTag[V2] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
Expand All @@ -377,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
): JavaDStream[W] = {
implicit val cmk2: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv2: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
implicit val cmw: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cmk2: ClassTag[K2] = fakeClassTag
implicit val cmv2: ClassTag[V2] = fakeClassTag
implicit val cmw: ClassTag[W] = fakeClassTag

def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
Expand All @@ -396,14 +388,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
): JavaPairDStream[K3, V3] = {
implicit val cmk2: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv2: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
implicit val cmk3: ClassTag[K3] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
implicit val cmv3: ClassTag[V3] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
implicit val cmk2: ClassTag[K2] = fakeClassTag
implicit val cmv2: ClassTag[V2] = fakeClassTag
implicit val cmk3: ClassTag[K3] = fakeClassTag
implicit val cmv3: ClassTag[V3] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@

package org.apache.spark.streaming.api.java

import java.util.{List => JList}
import java.lang.{Long => JLong}
import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.Partitioner
import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.api.java.{JavaUtils, JavaPairRDD}
import org.apache.spark.Partitioner
import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.storage.StorageLevel
import com.google.common.base.Optional
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

/**
Expand Down Expand Up @@ -169,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
implicit val cm: ClassTag[C] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}

Expand All @@ -185,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner: Partitioner,
mapSideCombine: Boolean
): JavaPairDStream[K, C] = {
implicit val cm: ClassTag[C] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
}

Expand Down Expand Up @@ -454,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
implicit val cm: ClassTag[S] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}

Expand All @@ -472,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
implicit val cm: ClassTag[S] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}

Expand All @@ -491,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
implicit val cm: ClassTag[S] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}

Expand All @@ -502,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* 'this' DStream without changing the key.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
implicit val cm: ClassTag[U] = fakeClassTag
dstream.mapValues(f)
}

Expand All @@ -525,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}

Expand All @@ -538,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, numPartitions)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
Expand All @@ -552,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
Expand All @@ -563,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream)
}

Expand All @@ -573,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, numPartitions)
}

Expand All @@ -586,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, partitioner)
}

Expand All @@ -597,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
Expand All @@ -612,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
Expand All @@ -626,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
Expand All @@ -653,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
Expand All @@ -668,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
Expand Down Expand Up @@ -749,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
new JavaDStream[(K, V)](dstream)
}

override val classTag: ClassTag[(K, V)] =
implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
override val classTag: ClassTag[(K, V)] = fakeClassTag
}

object JavaPairDStream {
Expand All @@ -759,10 +741,8 @@ object JavaPairDStream {
}

def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmk: ClassTag[K] = fakeClassTag
implicit val cmv: ClassTag[V] = fakeClassTag
new JavaPairDStream[K, V](dstream.dstream)
}

Expand Down