Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-22465][Core][WIP] Changes after code review
  • Loading branch information
sujithjay committed Dec 23, 2017
commit 961e3848cea1dc1b6568c1612eef7bedba4270d5
19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ object Partitioner {
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty
&& isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get

val hasMaxPartitioner = if(hasPartitioner.nonEmpty){
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}

if(isEligiblePartitioner(hasMaxPartitioner, rdds)) {
hasMaxPartitioner.get.partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
Expand All @@ -77,9 +83,12 @@ object Partitioner {
* less than and within a single order of magnitude of the max number of upstream partitions;
* otherwise, returns false
*/
private def isEligiblePartitioner(
    hasMaxPartitioner: Option[RDD[_]],
    rdds: Seq[RDD[_]]): Boolean = {
  // `exists` returns false for None without evaluating the body, which both
  // replaces the non-idiomatic early `return` and avoids the unsafe `.get`.
  // It also keeps the maxPartitions scan lazy: it only runs when a candidate
  // partitioner actually exists.
  hasMaxPartitioner.exists { candidate =>
    val maxPartitions = rdds.map(_.partitions.length).max
    // Eligible when the candidate's partition count is within one order of
    // magnitude of the largest upstream partition count.
    log10(maxPartitions) - log10(candidate.getNumPartitions) < 1
  }
}
}

Expand Down