Revert "handle empty partition-by expr list as RoundRobin"
This reverts commit 66b192d
adrian-ionescu committed Nov 30, 2017
commit f6cd38873e06380faf4dfb771b3839c68278996c
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.RandomSampler
@@ -839,6 +839,8 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

+  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
+
   val partitioning: Partitioning = {
     val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
Review comment (Member):

Still the same question: what happens when the SortOrder is not at the root node?

@adrian-ionescu (Contributor, Author), Nov 28, 2017:

It's going to follow the HashPartitioning path and eventually lead to a "Cannot evaluate expression" exception, just like it presently does if you try running df.repartition($"col".asc + 1) or df.sort($"col".asc + 1).
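The failure mode described in this reply can be sketched as follows (a hedged repro, assuming an active SparkSession named `spark` with implicits imported; the exact exception text is taken from the reply above and may vary across versions):

// Hypothetical repro of the failure mode described in the reply above.
// Assumes an active SparkSession named `spark`.
import spark.implicits._

val df = Seq(1, 2, 3).toDF("col")

// $"col".asc wraps a SortOrder, but `+ 1` buries it under an Add, so the
// root-level isInstanceOf[SortOrder] check fails and the expression takes
// the HashPartitioning path, where the unevaluable SortOrder eventually
// surfaces as a "Cannot evaluate expression" error.
df.repartition($"col".asc + 1).collect()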


@@ -853,13 +855,8 @@ case class RepartitionByExpression(

     if (sortOrder.nonEmpty) {
       RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
-    } else if (nonSortOrder.nonEmpty) {
-      HashPartitioning(nonSortOrder, numPartitions)
     } else {
-      // TODO: Use this to replace Repartition(shuffle = true)
-      // That way we can rename Repartition(shuffle = false) to Coalesce and
-      // RepartitionByExpression to Repartition
-      RoundRobinPartitioning(numPartitions)
+      HashPartitioning(nonSortOrder, numPartitions)
     }
   }
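For reference, here is the partitioning selection as it reads after this revert, assembled from the two hunks above (a reconstruction, not a verbatim copy of the file; validation of mixed SortOrder/non-SortOrder lists sits in lines elided between the hunks):

// Post-revert logic of RepartitionByExpression, reconstructed from the hunks above.
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")

val partitioning: Partitioning = {
  // A SortOrder nested below the root (e.g. $"col".asc + 1) is NOT picked up here.
  val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
  if (sortOrder.nonEmpty) {
    // Every expression is a root-level SortOrder: range partitioning.
    RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
  } else {
    // Otherwise hash partitioning; the require above rules out an empty list here.
    HashPartitioning(nonSortOrder, numPartitions)
  }
}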
@@ -536,6 +536,9 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     intercept[IllegalArgumentException] {
       checkPartitioning(numPartitions = -1, exprs = Literal(20))
     }
+    intercept[IllegalArgumentException] {
+      checkPartitioning(numPartitions = 10, exprs = Seq.empty: _*)
+    }
     intercept[IllegalArgumentException] {
       checkPartitioning(numPartitions = 10, exprs = SortOrder('a.attr, Ascending), 'b.attr)
     }
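The new intercept pins down the require added above: the logical node now rejects an empty expression list at construction time. A minimal plan-level sketch; the catalyst DSL import, LocalRelation child, and constructor shape are assumptions drawn from common catalyst test style of this era, not part of the diff:

// Plan-level sketch; LocalRelation and the DSL import are assumptions.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RepartitionByExpression}

val child = LocalRelation('a.int)  // any child plan works; the require fires first

// Throws IllegalArgumentException:
//   "At least one partition-by expression must be specified."
RepartitionByExpression(Seq.empty, child, numPartitions = 10)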
11 changes: 0 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -356,10 +356,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       testData.select('key).repartition(10).select('key),
       testData.select('key).collect().toSeq)
-
-    checkAnswer(
-      testData.select('key).repartition(10).select(spark_partition_id(), 'key),
-      testData.select('key).repartition(10, Seq.empty: _*).select(spark_partition_id(), 'key))
   }

   test("repartition with SortOrder") {
@@ -399,13 +395,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         .select(spark_partition_id().as("id"), $"a", $"b"),
       data2d.toDF("a", "b").repartitionByRange(data2d.size, $"a".desc, $"b".asc)
         .select(spark_partition_id().as("id"), $"a", $"b"))
-
-    // .repartitionByRange() with no partition-by expressions behaves the same as .repartition()
-    checkAnswer(
-      data2d.toDF("a", "b").repartitionByRange(data2d.size, Seq.empty: _*)
-        .select(spark_partition_id().as("id"), $"a", $"b"),
-      data2d.toDF("a", "b").repartition(data2d.size)
-        .select(spark_partition_id().as("id"), $"a", $"b"))
   }

   test("coalesce") {
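Taken together with the new require, the removal of these two tests changes the user-facing contract: an empty partition-by list is no longer silently treated as round-robin. A hedged summary sketch, assuming a SparkSession named `spark`:

// Behavior after this revert (sketch; assumes SparkSession `spark`).
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("key", "value")

df.repartition(10)                        // OK: round-robin via Repartition(shuffle = true)
df.repartition(10, $"key")                // OK: HashPartitioning on 'key'
df.repartition(10, Seq.empty: _*)         // throws IllegalArgumentException
df.repartitionByRange(10, Seq.empty: _*)  // throws IllegalArgumentException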