47 changes: 26 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minSplits).map(pair => pair._2.toString)
minPartitions).map(pair => pair._2.toString)
}
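
For context, a minimal usage sketch of the renamed parameter from user code; the app name, master, path, and partition count are illustrative and not part of this change (later sketches reuse this `sc`):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("minPartitions-example").setMaster("local[2]")
val sc = new SparkContext(conf)

// minPartitions is a lower-bound hint for the number of input partitions.
val lines = sc.textFile("hdfs:///tmp/input.txt", minPartitions = 8)
println(lines.count())
```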

/**
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*
* @param minSplits A suggestion value of the minimal splitting number for input data.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, String)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@
classOf[String],
classOf[String],
updateConf,
minSplits)
minPartitions)
}
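
A similar sketch for the wholeTextFiles overload, reusing the `sc` above; the directory path is made up:

```scala
// Each element is (filePath, fileContent); minPartitions is only a suggestion.
val files = sc.wholeTextFiles("hdfs:///data/small-json-files", minPartitions = 4)
val sizes = files.map { case (file, content) => (file, content.length) }
sizes.take(5).foreach { case (file, len) => println(s"$file -> $len bytes") }
```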

/**
@@ -481,7 +482,7 @@
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}
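
For reference, a sketch of calling hadoopRDD through the old Hadoop API after the rename; the JobConf setup and input path are illustrative only:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs:///data/events")  // hypothetical path

// minPartitions is forwarded to InputFormat.getSplits as the split-count hint.
val records = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], minPartitions = 16)
records.map(_._2.toString).take(3).foreach(println)
```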

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,15 +525,15 @@
inputFormatClass,
keyClass,
valueClass,
minSplits)
minPartitions)
}

/**
* Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minSplits: Int)
(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
minSplits)
minPartitions)
}
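
The class-tag shorthand from the Scaladoc example above, spelled out; the path and partition count are placeholders:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Key, value, and InputFormat classes are derived from the type parameters.
val file = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/logs", 8)
// Copy out of the reused Writable before caching, as the note above advises.
val cachedLines = file.map { case (_, text) => text.toString }.cache()
```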

/**
@@ -565,7 +566,7 @@
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits)
hadoopFile[K, V, F](path, defaultMinPartitions)

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
minPartitions: Int
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}
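
And the sequenceFile variant with explicit key and value classes; the file path is hypothetical:

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Explicit key/value classes plus the renamed minPartitions hint.
val pairs = sc.sequenceFile("hdfs:///data/counts.seq", classOf[Text], classOf[IntWritable], 4)
val asInts = pairs.map { case (k, v) => (k.toString, v.get) }
```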

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)

/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@
* a `map` function.
*/
def sequenceFile[K, V]
(path: String, minSplits: Int = defaultMinSplits)
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
@@ -674,7 +675,7 @@
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
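
The WritableConverter-based overload, assuming the standard implicits from the SparkContext companion are imported; the path is the same hypothetical file as above:

```scala
import org.apache.spark.SparkContext._  // brings the WritableConverter implicits into scope

// Keys and values are converted from Writables to native Scala types automatically.
val nativePairs = sc.sequenceFile[String, Int]("hdfs:///data/counts.seq", minPartitions = 4)
```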

@@ -688,9 +689,9 @@
*/
def objectFile[T: ClassTag](
path: String,
minSplits: Int = defaultMinSplits
minPartitions: Int = defaultMinPartitions
): RDD[T] = {
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}
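
A round-trip sketch for objectFile; the output directory is made up:

```scala
// saveAsObjectFile writes a SequenceFile of serialized batches; objectFile reads it back.
val nums = sc.parallelize(1 to 1000)
nums.saveAsObjectFile("hdfs:///tmp/nums-obj")
val restored = sc.objectFile[Int]("hdfs:///tmp/nums-obj", minPartitions = 4)
println(restored.count())  // 1000
```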

@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
def defaultParallelism: Int = taskScheduler.defaultParallelism

/** Default min number of partitions for Hadoop RDDs when not given by user */
@deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)

/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
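
Existing callers of the old name keep compiling, with a deprecation warning; a migration sketch:

```scala
// Before (now deprecated, still returns the same value):
val oldHint = sc.defaultMinSplits
// After:
val newHint = sc.defaultMinPartitions
assert(oldHint == newHint)  // both are math.min(defaultParallelism, 2)
```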

private val nextShuffleId = new AtomicInteger(0)

private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: java.lang.Integer = sc.defaultParallelism

/** Default min number of partitions for Hadoop RDDs when not given by user */
/**
* Default min number of partitions for Hadoop RDDs when not given by user.
* @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
* {@link #defaultMinPartitions()} instead
*/
@Deprecated
def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits

/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions

/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
@@ -153,7 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
def textFile(path: String, minPartitions: Int): JavaRDD[String] =
sc.textFile(path, minPartitions)

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
@@ -180,17 +189,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*
* @param minSplits A suggestion value of the minimal splitting number for input data.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))
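
The Java wrapper simply delegates to the Scala method; a quick sketch, exercised from Scala for brevity and with a made-up path:

```scala
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(sc)  // wrap the existing SparkContext
val javaFiles = jsc.wholeTextFiles("hdfs:///data/small-json-files", 4)
println(javaFiles.count())
```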

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* @see `wholeTextFiles(path: String, minSplits: Int)`.
* @see `wholeTextFiles(path: String, minPartitions: Int)`.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
@@ -205,11 +214,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions))
}

/** Get an RDD for a Hadoop SequenceFile.
@@ -233,9 +242,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
def objectFile[T](path: String, minPartitions: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
sc.objectFile(path, minSplits)(ctag)
sc.objectFile(path, minPartitions)(ctag)
}

/**
@@ -265,11 +274,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
}

/**
@@ -304,11 +313,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -48,14 +48,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
}

/**
* Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
* Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
*/
def setMaxSplitSize(context: JobContext, minSplits: Int) {
def setMaxSplitSize(context: JobContext, minPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum
val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
}
}
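
A worked example of the split-size computation above, with made-up sizes: the maximum combined split size is the total input length divided by the requested minimum number of partitions.

```scala
// Mirrors the formula in setMaxSplitSize (illustrative numbers only).
val totalLen = 1L * 1024 * 1024 * 1024  // 1 GiB of small files in total
val minPartitions = 8
val maxSplitSize = math.ceil(totalLen * 1.0 /
  (if (minPartitions == 0) 1 else minPartitions)).toLong
println(maxSplitSize)  // 134217728, i.e. 128 MiB per combined split
```
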
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -87,7 +87,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
* @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
*/
@DeveloperApi
class HadoopRDD[K, V](
@@ -97,7 +97,7 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {

def this(
@@ -106,7 +106,7 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int) = {
minPartitions: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -115,7 +115,7 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
minSplits)
minPartitions)
}

protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +169,7 @@ class HadoopRDD[K, V](
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
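
Because minPartitions is only passed to InputFormat.getSplits as a hint, the resulting partition count can exceed the request; a quick check with an illustrative path:

```scala
val logs = sc.textFile("hdfs:///data/logs", minPartitions = 4)
// For a multi-block input, Hadoop may return more than 4 splits.
println(logs.partitions.length)  // >= 4 in the typical case
```
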
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -158,7 +158,7 @@ private[spark] class WholeTextFileRDD(
keyClass: Class[String],
valueClass: Class[String],
@transient conf: Configuration,
minSplits: Int)
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
@@ -169,7 +169,7 @@ private[spark] class WholeTextFileRDD(
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMaxSplitSize(jobContext, minSplits)
inputFormat.setMaxSplitSize(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {