deprecate defaultMinSplits
CodingCat committed Apr 18, 2014
commit 4b60541bfe5d053fd1c27b9f5e68596d25b73527
21 changes: 13 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -427,7 +427,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minPartitions: Int = defaultMinSplits): RDD[String] = {
+  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
       minPartitions).map(pair => pair._2.toString)
   }
@@ -459,7 +459,8 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String, minPartitions: Int = defaultMinSplits): RDD[(String, String)] = {
+  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+    RDD[(String, String)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -493,7 +494,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minPartitions: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minPartitions: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinPartitions)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
       ): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+    sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
 
   /**
   * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minPartitions: Int = defaultMinSplits)
+      (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
      : RDD[(K, V)] = {
@@ -688,7 +689,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def objectFile[T: ClassTag](
       path: String,
-      minPartitions: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[T] = {
     sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
   def defaultParallelism: Int = taskScheduler.defaultParallelism
 
   /** Default min number of partitions for Hadoop RDDs when not given by user */
+  @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
   private val nextShuffleId = new AtomicInteger(0)
 
   private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
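Reviewer note: for Scala callers this is a rename with a compatibility shim. Both accessors return math.min(defaultParallelism, 2), so the defaults of textFile, hadoopFile, sequenceFile and objectFile are numerically unchanged; using the old name now only emits a deprecation warning. A minimal caller-side sketch of the migration; the local master, app name, and input path below are placeholders, not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}

object MinPartitionsMigration {
  def main(args: Array[String]): Unit = {
    // Placeholder master and app name for a quick local run.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("min-partitions-demo"))

    // No change needed when the default is used: textFile now reads defaultMinPartitions.
    val viaDefault = sc.textFile("/tmp/input.txt")

    // Old call sites still compile but now emit a deprecation warning pointing at defaultMinPartitions.
    val viaOldName = sc.textFile("/tmp/input.txt", sc.defaultMinSplits)

    // Preferred spelling after this patch.
    val viaNewName = sc.textFile("/tmp/input.txt", sc.defaultMinPartitions)

    // All three RDDs get the same partition hint: math.min(defaultParallelism, 2).
    println(Seq(viaDefault, viaOldName, viaNewName).map(_.partitions.length).mkString(", "))
    sc.stop()
  }
}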
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: java.lang.Integer = sc.defaultParallelism
 
-  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  /**
+   * Default min number of partitions for Hadoop RDDs when not given by user.
+   * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
+   *             {@link #defaultMinPartitions()} instead
+   */
+  @Deprecated
   def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
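Reviewer note: the Java wrapper mirrors the rename but deliberately uses a different mechanism: java.lang.Deprecated plus a Javadoc @deprecated tag, which is what javac and IDEs surface to Java users, whereas the Scala side above uses Scala's @deprecated annotation with a message and version. A hedged sketch of the two styles side by side; Legacy, legacyValue, legacyValueForJava and renamedValue are made-up names for illustration only:

// Sketch of the two deprecation styles used in this commit (illustrative names).
class Legacy {
  /** Scala-facing API: scalac carries the message and the "since" version. */
  @deprecated("use renamedValue", "1.0.0")
  def legacyValue: Int = renamedValue

  /**
   * Java-facing API: the java.lang.Deprecated annotation is what Java tooling
   * recognizes; the human-readable pointer lives in the @deprecated doc tag.
   * @deprecated Use {@link #renamedValue()} instead.
   */
  @Deprecated
  def legacyValueForJava: java.lang.Integer = renamedValue

  def renamedValue: Int = 2
}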
12 changes: 6 additions & 6 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -57,16 +57,16 @@ object MLUtils {
    * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
    * @param numFeatures number of features, which will be determined from the input data if a
    *                    negative value is given. The default value is -1.
-   * @param minSplits min number of partitions, default: sc.defaultMinSplits
+   * @param minPartitions min number of partitions, default: sc.defaultMinPartitions
    * @return labeled data stored as an RDD[LabeledPoint]
    */
   def loadLibSVMData(
       sc: SparkContext,
       path: String,
       labelParser: LabelParser,
       numFeatures: Int,
-      minSplits: Int): RDD[LabeledPoint] = {
-    val parsed = sc.textFile(path, minSplits)
+      minPartitions: Int): RDD[LabeledPoint] = {
+    val parsed = sc.textFile(path, minPartitions)
       .map(_.trim)
       .filter(!_.isEmpty)
       .map(_.split(' '))
@@ -101,7 +101,7 @@ object MLUtils {
    * with number of features determined automatically and the default number of partitions.
    */
   def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions)
 
   /**
    * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -112,7 +112,7 @@ object MLUtils {
       sc: SparkContext,
       path: String,
       labelParser: LabelParser): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions)
 
   /**
    * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -124,7 +124,7 @@ object MLUtils {
       path: String,
       labelParser: LabelParser,
       numFeatures: Int): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions)
 
   /**
    * :: Experimental ::
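Reviewer note: in MLUtils only the parameter name and the forwarded default change, so callers of these overloads keep their behavior. A small usage sketch, assuming an existing SparkContext and a placeholder LIBSVM path:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

object LibSVMLoadingSketch {
  // The path is a placeholder; any LIBSVM-formatted text file works.
  def load(sc: SparkContext): RDD[LabeledPoint] = {
    // The two-argument overload now forwards sc.defaultMinPartitions
    // (rather than the deprecated sc.defaultMinSplits) as the partition hint.
    MLUtils.loadLibSVMData(sc, "/tmp/sample_libsvm_data.txt")
  }

  // Callers wanting more parallelism can still repartition the result afterwards.
  def loadWith(sc: SparkContext, numPartitions: Int): RDD[LabeledPoint] =
    load(sc).repartition(numPartitions)
}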
@@ -52,7 +52,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
   // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
   // it is smaller than what Spark suggests.
   private val _minSplitsPerRDD = math.max(
-    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits)
+    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
 
   // TODO: set aws s3 credentials.
 
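Reviewer note: the Hive table reader keeps the same arithmetic and only reads the renamed default, so the effective minimum split count is still max(mapred.map.tasks, min(defaultParallelism, 2)). A standalone sketch of that calculation with assumed values (the numbers are illustrative, not taken from the patch):

// Reproduces the _minSplitsPerRDD arithmetic with plain values.
object MinSplitsArithmetic extends App {
  val mapredMapTasks = 1        // assumed "mapred.map.tasks"; the getInt lookup falls back to 1
  val defaultParallelism = 8    // assumed scheduler parallelism

  val defaultMinPartitions = math.min(defaultParallelism, 2)           // = 2
  val minSplitsPerRDD = math.max(mapredMapTasks, defaultMinPartitions) // = 2

  println(s"minSplitsPerRDD = $minSplitsPerRDD")
}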