
Commit e31c8ff

CodingCat authored and rxin committed
SPARK-1483: Rename minSplits to minPartitions in public APIs

https://issues.apache.org/jira/browse/SPARK-1483

From the original JIRA: "The parameter name is part of the public API in Scala and Python, since you can pass named parameters to a method, so we should name it to this more descriptive term. Everywhere else we refer to 'splits' as partitions." - @mateiz

Author: CodingCat <zhunansjtu@gmail.com>

Closes #430 from CodingCat/SPARK-1483 and squashes the following commits:

4b60541 [CodingCat] deprecate defaultMinSplits
ba2c663 [CodingCat] Rename minSplits to minPartitions in public APIs
1 parent 7863ecc commit e31c8ff
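Because Scala callers can pass this argument by name, the parameter name itself is part of the public API, and the rename is the visible change for user code. A minimal sketch of the before/after call site, assuming a hypothetical input path and partition count:

import org.apache.spark.{SparkConf, SparkContext}

object MinPartitionsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("min-partitions-example").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Before this commit, a caller passing the argument by name wrote:
    //   sc.textFile("hdfs:///data/input.txt", minSplits = 8)
    // After it, the named argument is minPartitions; positional calls are unaffected.
    val lines = sc.textFile("hdfs:///data/input.txt", minPartitions = 8)  // hypothetical path and count

    println(lines.partitions.length)  // at least 8 partitions requested; Hadoop may create more
    sc.stop()
  }
}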

8 files changed: +70 −55 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 21 deletions
@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
- def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-     minSplits).map(pair => pair._2.toString)
+     minPartitions).map(pair => pair._2.toString)
  }

  /**
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
   *
   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
   *
-  * @param minSplits A suggestion value of the minimal splitting number for input data.
+  * @param minPartitions A suggestion value of the minimal splitting number for input data.
   */
- def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+ def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+ RDD[(String, String)] = {
    val job = new NewHadoopJob(hadoopConfiguration)
    NewFileInputFormat.addInputPath(job, new Path(path))
    val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging {
      classOf[String],
      classOf[String],
      updateConf,
-     minSplits)
+     minPartitions)
  }

  /**
@@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * @param inputFormatClass Class of the InputFormat
   * @param keyClass Class of the keys
   * @param valueClass Class of the values
-  * @param minSplits Minimum number of Hadoop Splits to generate.
+  * @param minPartitions Minimum number of Hadoop Splits to generate.
   *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging {
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int = defaultMinSplits
+     minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
    // Add necessary security credentials to the JobConf before broadcasting it.
    SparkHadoopUtil.get.addCredentials(conf)
-   new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+   new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
  }

  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int = defaultMinSplits
+     minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,15 +525,15 @@ class SparkContext(config: SparkConf) extends Logging {
      inputFormatClass,
      keyClass,
      valueClass,
-     minSplits)
+     minPartitions)
  }

  /**
   * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
   * values and the InputFormat so that users don't need to pass them directly. Instead, callers
   * can just write, for example,
   * {{{
-  * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+  * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
   * }}}
   *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging {
   * a `map` function.
   */
  def hadoopFile[K, V, F <: InputFormat[K, V]]
-     (path: String, minSplits: Int)
+     (path: String, minPartitions: Int)
      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
    hadoopFile(path,
      fm.runtimeClass.asInstanceOf[Class[F]],
      km.runtimeClass.asInstanceOf[Class[K]],
      vm.runtimeClass.asInstanceOf[Class[V]],
-     minSplits)
+     minPartitions)
  }

  /**
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
   */
  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-   hadoopFile[K, V, F](path, defaultMinSplits)
+   hadoopFile[K, V, F](path, defaultMinPartitions)

  /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
  def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int
+     minPartitions: Int
    ): RDD[(K, V)] = {
    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-   hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+   hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
  }

  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * */
  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
    ): RDD[(K, V)] =
-   sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+   sequenceFile(path, keyClass, valueClass, defaultMinPartitions)

  /**
   * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * a `map` function.
   */
  def sequenceFile[K, V]
-     (path: String, minSplits: Int = defaultMinSplits)
+     (path: String, minPartitions: Int = defaultMinPartitions)
      (implicit km: ClassTag[K], vm: ClassTag[V],
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
      : RDD[(K, V)] = {
@@ -674,7 +675,7 @@ class SparkContext(config: SparkConf) extends Logging {
    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
    val writables = hadoopFile(path, format,
      kc.writableClass(km).asInstanceOf[Class[Writable]],
-     vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+     vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
  }

@@ -688,9 +689,9 @@ class SparkContext(config: SparkConf) extends Logging {
   */
  def objectFile[T: ClassTag](
      path: String,
-     minSplits: Int = defaultMinSplits
+     minPartitions: Int = defaultMinPartitions
    ): RDD[T] = {
-   sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+   sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
      .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
  }

@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
  def defaultParallelism: Int = taskScheduler.defaultParallelism

  /** Default min number of partitions for Hadoop RDDs when not given by user */
+ @deprecated("use defaultMinPartitions", "1.0.0")
  def defaultMinSplits: Int = math.min(defaultParallelism, 2)

+ /** Default min number of partitions for Hadoop RDDs when not given by user */
+ def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
  private val nextShuffleId = new AtomicInteger(0)

  private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
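The old accessor is kept alongside the new one, so existing Scala code keeps compiling and only sees a deprecation warning when built with -deprecation. A small stand-alone sketch of the same pattern, using a hypothetical Defaults object in place of SparkContext:

object Defaults {
  private val defaultParallelism = 8  // stand-in for taskScheduler.defaultParallelism

  @deprecated("use defaultMinPartitions", "1.0.0")
  def defaultMinSplits: Int = math.min(defaultParallelism, 2)

  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
}

object Caller {
  // With scalac -deprecation this line produces a warning along the lines of
  // "method defaultMinSplits is deprecated (since 1.0.0): use defaultMinPartitions".
  val old: Int = Defaults.defaultMinSplits
  val current: Int = Defaults.defaultMinPartitions  // preferred going forward
}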

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 23 additions & 14 deletions
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
  def defaultParallelism: java.lang.Integer = sc.defaultParallelism

- /** Default min number of partitions for Hadoop RDDs when not given by user */
+ /**
+  * Default min number of partitions for Hadoop RDDs when not given by user.
+  * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
+  *             {@link #defaultMinPartitions()} instead
+  */
+ @Deprecated
  def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits

+ /** Default min number of partitions for Hadoop RDDs when not given by user */
+ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
  /** Distribute a local Scala collection to form an RDD. */
  def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
    implicit val ctag: ClassTag[T] = fakeClassTag
@@ -153,7 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
- def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
+ def textFile(path: String, minPartitions: Int): JavaRDD[String] =
+   sc.textFile(path, minPartitions)

  /**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
@@ -180,17 +189,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   *
   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
   *
-  * @param minSplits A suggestion value of the minimal splitting number for input data.
+  * @param minPartitions A suggestion value of the minimal splitting number for input data.
   */
- def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
-   new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+ def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
+   new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))

  /**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
   * key-value pair, where the key is the path of each file, the value is the content of each file.
   *
-  * @see `wholeTextFiles(path: String, minSplits: Int)`.
+  * @see `wholeTextFiles(path: String, minPartitions: Int)`.
   */
  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
    new JavaPairRDD(sc.wholeTextFiles(path))
@@ -205,11 +214,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
  def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int
+     minPartitions: Int
    ): JavaPairRDD[K, V] = {
    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-   new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
+   new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions))
  }

  /** Get an RDD for a Hadoop SequenceFile.
@@ -233,9 +242,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   * slow if you use the default serializer (Java serialization), though the nice thing about it is
   * that there's very little effort required to save arbitrary objects.
   */
- def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
+ def objectFile[T](path: String, minPartitions: Int): JavaRDD[T] = {
    implicit val ctag: ClassTag[T] = fakeClassTag
-   sc.objectFile(path, minSplits)(ctag)
+   sc.objectFile(path, minPartitions)(ctag)
  }

  /**
@@ -265,11 +274,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
      inputFormatClass: Class[F],
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int
+     minPartitions: Int
    ): JavaPairRDD[K, V] = {
    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-   new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
+   new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
  }

  /**
@@ -304,11 +313,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
      inputFormatClass: Class[F],
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int
+     minPartitions: Int
    ): JavaPairRDD[K, V] = {
    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-   new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
+   new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
  }

  /** Get an RDD for a Hadoop file with an arbitrary InputFormat

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 4 additions & 3 deletions
@@ -48,14 +48,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
  }

  /**
-  * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+  * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
   */
- def setMaxSplitSize(context: JobContext, minSplits: Int) {
+ def setMaxSplitSize(context: JobContext, minPartitions: Int) {
    val files = listStatus(context)
    val totalLen = files.map { file =>
      if (file.isDir) 0L else file.getLen
    }.sum
-   val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+   val maxSplitSize = Math.ceil(totalLen * 1.0 /
+     (if (minPartitions == 0) 1 else minPartitions)).toLong
    super.setMaxSplitSize(maxSplitSize)
  }
}
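The reformatted expression computes the same cap as before: the combined split size is the total length of all input files divided by the requested minimum number of partitions, with a guard for zero. A stand-alone sketch of that arithmetic with hypothetical sizes:

object MaxSplitSizeSketch {
  // Mirrors the formula in setMaxSplitSize: cap each combined split at totalLen / minPartitions.
  def maxSplitSize(totalLen: Long, minPartitions: Int): Long =
    Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong

  def main(args: Array[String]): Unit = {
    // Hypothetical: 1 GiB of small files with minPartitions = 4 caps splits at 256 MiB,
    // so CombineFileInputFormat yields at least roughly four partitions.
    println(maxSplitSize(1L << 30, 4))  // 268435456
    // minPartitions == 0 falls back to a single split covering all input.
    println(maxSplitSize(1L << 30, 0))  // 1073741824
  }
}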

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 5 additions & 5 deletions
@@ -87,7 +87,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 * @param inputFormatClass Storage format of the data to be read.
 * @param keyClass Class of the key associated with the inputFormatClass.
 * @param valueClass Class of the value associated with the inputFormatClass.
- * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
 */
@DeveloperApi
class HadoopRDD[K, V](
@@ -97,7 +97,7 @@ class HadoopRDD[K, V](
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
-   minSplits: Int)
+   minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

  def this(
@@ -106,7 +106,7 @@ class HadoopRDD[K, V](
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
-     minSplits: Int) = {
+     minPartitions: Int) = {
    this(
      sc,
      sc.broadcast(new SerializableWritable(conf))
@@ -115,7 +115,7 @@ class HadoopRDD[K, V](
      inputFormatClass,
      keyClass,
      valueClass,
-     minSplits)
+     minPartitions)
  }

  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +169,7 @@ class HadoopRDD[K, V](
    if (inputFormat.isInstanceOf[Configurable]) {
      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
    }
-   val inputSplits = inputFormat.getSplits(jobConf, minSplits)
+   val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
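On the old Hadoop API path the renamed value is just the numSplits hint handed to InputFormat.getSplits; the input format is still free to return more splits (for example, one per HDFS block). A hedged sketch of that call against the mapred API, with a hypothetical local path:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

object GetSplitsSketch {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, new Path("/tmp/example-input"))  // hypothetical path
    val inputFormat = new TextInputFormat
    inputFormat.configure(jobConf)  // TextInputFormat needs its JobConf before computing splits
    // The second argument is the minPartitions hint that HadoopRDD.getPartitions now forwards.
    val splits = inputFormat.getSplits(jobConf, 4)
    println(s"requested at least 4 splits, got ${splits.length}")
  }
}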

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -158,7 +158,7 @@ private[spark] class WholeTextFileRDD(
    keyClass: Class[String],
    valueClass: Class[String],
    @transient conf: Configuration,
-   minSplits: Int)
+   minPartitions: Int)
  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

  override def getPartitions: Array[Partition] = {
@@ -169,7 +169,7 @@ private[spark] class WholeTextFileRDD(
      case _ =>
    }
    val jobContext = newJobContext(conf, jobId)
-   inputFormat.setMaxSplitSize(jobContext, minSplits)
+   inputFormat.setMaxSplitSize(jobContext, minPartitions)
    val rawSplits = inputFormat.getSplits(jobContext).toArray
    val result = new Array[Partition](rawSplits.size)
    for (i <- 0 until rawSplits.size) {
