Rename minPartitions to maxPartitions.
watermen committed May 30, 2015
commit cd64289088d5db585ecca5394605f6ac89cbbbeb
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -825,11 +825,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
* @param maxPartitions A suggested maximum number of partitions for the input data.
*/
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
maxPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
Contributor
it's strange that this still defaults to defaultMinPartitions. Does that need to be fixed as well?

Contributor Author
Because I don't know what the best default value is. Would 1 be OK?
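
For context on the default being discussed, a minimal sketch of how the fallback value behaves (this mirrors how I recall SparkContext deriving defaultMinPartitions; verify against the actual source, and the helper below is purely illustrative):

```scala
// Sketch only: SparkContext's defaultMinPartitions is, as I recall,
// math.min(defaultParallelism, 2). Modeled here as a standalone function.
def defaultMinPartitions(defaultParallelism: Int): Int = math.min(defaultParallelism, 2)

// e.g. on a cluster with defaultParallelism = 16 the fallback is still 2,
// which becomes a surprisingly low cap once the parameter means "max" partitions.
println(defaultMinPartitions(16)) // 2
```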

assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
@@ -842,7 +842,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[String],
classOf[String],
updateConf,
minPartitions).setName(path)
maxPartitions).setName(path)
}
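
For reference, a minimal usage sketch of the renamed API; the path, app name, and partition hint below are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical usage: read a directory of many small text files as
// (filename, content) pairs, hinting at most 8 partitions (the second
// argument is the renamed maxPartitions parameter).
val sc = new SparkContext(new SparkConf().setAppName("wholeTextFilesExample").setMaster("local[*]"))
val files: RDD[(String, String)] = sc.wholeTextFiles("hdfs://namenode/data/small-files", 8)
println(files.keys.count())
sc.stop()
```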


@@ -871,14 +871,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
* @param maxPartitions A suggested maximum number of partitions for the input data.
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
maxPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
@@ -891,7 +891,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[String],
classOf[PortableDataStream],
updateConf,
minPartitions).setName(path)
maxPartitions).setName(path)
}
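
Similarly, a hedged usage sketch for binaryFiles; again the path and partition hint are made up:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

// Hypothetical usage: read binary files as (filename, PortableDataStream)
// pairs, hinting at most 4 partitions.
val sc = new SparkContext(new SparkConf().setAppName("binaryFilesExample").setMaster("local[*]"))
val streams: RDD[(String, PortableDataStream)] = sc.binaryFiles("hdfs://namenode/data/blobs", 4)
// PortableDataStream defers opening the stream until it is read on an executor.
val sizes: RDD[(String, Int)] = streams.mapValues(_.toArray().length)
sizes.collect().foreach { case (name, len) => println(s"$name: $len bytes") }
sc.stop()
```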

/**
@@ -43,7 +43,7 @@ private[spark] abstract class StreamFileInputFormat[T]
* Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
* which is set through setMaxSplitSize
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
def setMaxPartitions(context: JobContext, maxPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
@@ -51,13 +51,13 @@ private[spark] class WholeTextFileInputFormat
* Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
* which is set through setMaxSplitSize
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
def setMaxPartitions(context: JobContext, maxPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
(if (maxPartitions == 0) 1 else maxPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
}
}
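
The split-size arithmetic above can be restated as a standalone sketch (not part of the diff) to show why the hint acts as an upper bound on the number of splits:

```scala
// With totalLen bytes of input and a maxPartitions hint, each combined split is
// capped at ceil(totalLen / maxPartitions) bytes, so CombineFileInputFormat
// produces on the order of maxPartitions splits at most.
def maxSplitSize(totalLen: Long, maxPartitions: Int): Long =
  Math.ceil(totalLen * 1.0 / (if (maxPartitions == 0) 1 else maxPartitions)).toLong

// Example: 1 GiB of small files with a hint of 8 partitions caps splits at 128 MiB.
println(maxSplitSize(1L << 30, 8)) // 134217728
```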
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -29,7 +29,7 @@ private[spark] class BinaryFileRDD[T](
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
maxPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
@@ -40,7 +40,7 @@ private[spark] class BinaryFileRDD[T](
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
inputFormat.setMaxPartitions(jobContext, maxPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -252,7 +252,7 @@ private[spark] class WholeTextFileRDD(
keyClass: Class[String],
valueClass: Class[String],
@transient conf: Configuration,
minPartitions: Int)
maxPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
@@ -263,7 +263,7 @@ private[spark] class WholeTextFileRDD(
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
inputFormat.setMaxPartitions(jobContext, maxPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {