Commit 1c17773

fixed explicit types in core package
1 parent 087eedc commit 1c17773

File tree

16 files changed (+52, -34 lines changed)


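Background note (not stated in the diff itself): the usual motivation for a change like this is that a public or implicit member without an explicit result type exposes whatever type the compiler infers, so an unrelated edit to the body can silently change the API. A minimal illustrative sketch with hypothetical names:

// Illustrative only, not from this commit.
object InferenceExample {
  // Inferred result type: Int. If the literal later becomes 10L, the public
  // signature silently changes to Long.
  def threshold = 10

  // Explicit result type: the signature is fixed regardless of the body.
  def thresholdExplicit: Int = 10
}
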
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 17 additions & 11 deletions
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -209,7 +210,7 @@ class SparkContext(
   ui.start()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration = {
+  val hadoopConfiguration: Configuration = {
     val env = SparkEnv.get
     val hadoopConf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
@@ -610,7 +611,7 @@ class SparkContext(
   * standard mutable collections. So you can use this with mutable Map, Set, etc.
   */
  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
-     (initialValue: R) = {
+     (initialValue: R): Accumulable[R, T] = {
    val param = new GrowableAccumulableParam[R,T]
    new Accumulable(initialValue, param)
  }
@@ -620,7 +621,7 @@ class SparkContext(
   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
   * The variable will be sent to each cluster only once.
   */
- def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
+ def broadcast[T](value: T): Broadcast[T] = env.broadcastManager.newBroadcast[T](value, isLocal)
 
  /**
   * Add a file to be downloaded with this Spark job on every node.
@@ -1072,7 +1073,7 @@ object SparkContext extends Logging {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
@@ -1109,23 +1110,28 @@ object SparkContext extends Logging {
   }
 
   // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
+  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T):
+    WritableConverter[T] = {
     val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
     new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
   }
 
-  implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
 
-  implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
 
-  implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
 
-  implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
 
-  implicit def booleanWritableConverter() =
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
-  implicit def bytesWritableConverter() = {
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
     simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
   }

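Usage note on the broadcast change above: with the annotation, the public signature reads broadcast[T](value: T): Broadcast[T]. A minimal local-mode sketch; the object and variable names below are illustrative, not from the commit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("broadcast-sketch"))
    // The explicit return type makes it clear this is a Broadcast[Map[Int, String]].
    val lookup: Broadcast[Map[Int, String]] = sc.broadcast(Map(1 -> "one", 2 -> "two"))
    // Executors read the broadcast value through .value inside the closure.
    val names = sc.parallelize(Seq(1, 2, 3)).map(i => lookup.value.getOrElse(i, "?")).collect()
    println(names.mkString(", "))
    sc.stop()
  }
}
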
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 11 additions & 4 deletions
@@ -391,19 +391,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
-  def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
+  def saveAsTextFile(path: String) {
+    rdd.saveAsTextFile(path)
+  }
 
 
   /**
    * Save this RDD as a compressed text file, using string representations of elements.
    */
-  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
     rdd.saveAsTextFile(path, codec)
+  }
 
   /**
    * Save this RDD as a SequenceFile of serialized objects.
    */
-  def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
+  def saveAsObjectFile(path: String) {
+    rdd.saveAsObjectFile(path)
+  }
 
   /**
    * Creates tuples of the elements in this RDD by applying `f`.
@@ -420,7 +425,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * executed on this RDD. It is strongly recommended that this RDD is persisted in
    * memory, otherwise saving it on a file will require recomputation.
    */
-  def checkpoint() = rdd.checkpoint()
+  def checkpoint() {
+    rdd.checkpoint()
+  }
 
   /**
    * Return whether this RDD has been checkpointed or not

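Side note on the Java API change above: the methods were rewritten from "def f(...) = expr" to procedure syntax, which pins the result type to Unit instead of leaving it to inference. An illustrative sketch with hypothetical names:

// Illustrative only, not from this commit.
class Saver {
  private def doWork(): Int = 42

  // Result type is inferred from the body (Int here), even though callers
  // only care about the side effect.
  def saveInferred() = doWork()

  // Procedure syntax: the result type is always Unit.
  def saveExplicit() {
    doWork()
  }
}
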
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -461,7 +461,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     sc.setCheckpointDir(dir)
   }
 
-  def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
+  def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir)
 
   protected def checkpointFile[T](path: String): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 1 addition & 1 deletion
@@ -112,5 +112,5 @@ private[spark] class ClientArguments(args: Array[String]) {
 }
 
 object ClientArguments {
-  def isValidJarUrl(s: String) = s.matches("(.+):(.+)jar")
+  def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
 }

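A quick illustration of the regex behind isValidJarUrl; the object name below is hypothetical:

object JarUrlSketch {
  def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")

  def main(args: Array[String]): Unit = {
    println(isValidJarUrl("file:/tmp/app.jar"))  // true: has a scheme and ends in "jar"
    println(isValidJarUrl("/tmp/app.jar"))       // false: no "scheme:" prefix
  }
}
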
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
  * Contains util methods to interact with Hadoop from Spark.
  */
 class SparkHadoopUtil {
-  val conf = newConfiguration()
+  val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala

Lines changed: 3 additions & 2 deletions
@@ -20,16 +20,17 @@ package org.apache.spark.deploy.master
 import scala.collection.JavaConversions._
 
 import akka.serialization.Serialization
-import org.apache.zookeeper.CreateMode
 
+import org.apache.curator.framework.CuratorFramework
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.zookeeper.CreateMode
 
 class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   extends PersistenceEngine
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
-  val zk = SparkCuratorUtil.newClient(conf)
+  val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
   SparkCuratorUtil.mkdir(zk, WORKING_DIR)

core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry,
     case None => CONSOLE_DEFAULT_PERIOD
   }
 
-  val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+  val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
   }

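For reference on the pollUnit annotation above: TimeUnit.valueOf expects the upper-case enum constant name, which is why the configured string is upper-cased first. A small sketch; the object name is illustrative:

import java.util.concurrent.TimeUnit

object PollUnitSketch {
  def main(args: Array[String]): Unit = {
    val configured = "seconds"  // e.g. a unit string read from the sink's Properties
    val unit: TimeUnit = TimeUnit.valueOf(configured.toUpperCase)
    println(unit)               // SECONDS
  }
}
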
core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry,
     case None => CSV_DEFAULT_PERIOD
   }
 
-  val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+  val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
   }

core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala

Lines changed: 2 additions & 2 deletions
@@ -39,7 +39,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
   val GRAPHITE_KEY_UNIT = "unit"
   val GRAPHITE_KEY_PREFIX = "prefix"
 
-  def propertyToOption(prop: String) = Option(property.getProperty(prop))
+  def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
 
   if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
     throw new Exception("Graphite sink requires 'host' property.")
@@ -57,7 +57,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
     case None => GRAPHITE_DEFAULT_PERIOD
   }
 
-  val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+  val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
   }

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     array
   }
 
-  override val partitioner = Some(part)
+  override val partitioner: Some[Partitioner] = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     val sparkConf = SparkEnv.get.conf

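One detail on the CoGroupedRDD change: annotating the override as Some[Partitioner] rather than Option[Partitioner] is legal because Scala allows an override to narrow the result type. An illustrative sketch with hypothetical names:

// Illustrative only, not from this commit.
abstract class Base {
  def partitioner: Option[String]
}

class Derived(part: String) extends Base {
  // A val may implement a def, and Some[String] is a subtype of Option[String].
  override val partitioner: Some[String] = Some(part)
}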