@@ -839,8 +839,6 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
-  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")

Member:
Just for safety, also keep this change?

adrian-ionescu (Contributor, Author) on Nov 30, 2017:
That would change the current behavior of .repartition(numPartitions, Seq.empty: _*) and I'd like to avoid that.

In fact, I've just raised a separate ticket about the latter: SPARK-22665
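
A minimal sketch of the call in question (df stands for any Dataset; exactly how this call partitions the data today is what SPARK-22665 questions):

    // Accepted today: the varargs expand to zero partition expressions.
    // Reinstating the removed require(partitionExpressions.nonEmpty, ...)
    // would make this throw an IllegalArgumentException instead.
    df.repartition(10, Seq.empty: _*)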


   val partitioning: Partitioning = {
     val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])

Member:
Still the same question: what happens when the SortOrder is not at the root node?

adrian-ionescu (Contributor, Author) on Nov 28, 2017:
It's going to follow the HashPartitioning path and eventually lead to a "Cannot evaluate expression" exception, just like it presently does if you try running df.repartition($"col".asc + 1) or df.sort($"col".asc + 1).
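
A short sketch of the failure mode described above (df is a hypothetical DataFrame with a column named col):

    // The SortOrder is buried under an Add, so it misses the all-SortOrder
    // special case, takes the HashPartitioning path, and fails at evaluation
    // with a "Cannot evaluate expression" error, exactly as it does today:
    df.repartition(10, $"col".asc + 1)
    df.sort($"col".asc + 1)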


@@ -536,9 +536,6 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     intercept[IllegalArgumentException] {
       checkPartitioning(numPartitions = -1, exprs = Literal(20))
     }
-    intercept[IllegalArgumentException] {
-      checkPartitioning(numPartitions = 10, exprs = Seq.empty: _*)
-    }
     intercept[IllegalArgumentException] {
       checkPartitioning(numPartitions = 10, exprs = SortOrder('a.attr, Ascending), 'b.attr)
     }
33 changes: 20 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2732,16 +2732,18 @@ class Dataset[T] private[sql](
   * @since 2.0.0
   */
  @scala.annotation.varargs
-  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
     // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
-    // However, we don't want to complicate the semantics of this API method. Instead, let's
-    // give users a friendly error message, pointing them to the new method.
+    // However, we don't want to complicate the semantics of this API method.
+    // Instead, let's give users a friendly error message, pointing them to the new method.
     val sortOrders = partitionExprs.filter(_.expr.isInstanceOf[SortOrder])
     if (sortOrders.nonEmpty) throw new IllegalArgumentException(
       s"""Invalid partitionExprs specified: $sortOrders
          |For range partitioning use repartitionByRange(...) instead.
        """.stripMargin)
-    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
+    withTypedPlan {
+      RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
+    }
   }

@@ -2763,27 +2765,32 @@ class Dataset[T] private[sql](
  /**
   * Returns a new Dataset partitioned by the given partitioning expressions into
   * `numPartitions`. The resulting Dataset is range partitioned.

Member:
Could you update this to describe the latest change?

   *
+  * At least one partition-by expression must be specified.
+  * When no explicit sort order is specified, "ascending nulls first" is assumed.
   *
   * @group typedrel
   * @since 2.3.0
   */
  @scala.annotation.varargs
-  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
-      col.expr match {
-        case expr: SortOrder =>
-          expr
-        case expr: Expression =>
-          SortOrder(expr, Ascending)
-      }
-    }
-    RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
-  }
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+    require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
+    val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match {
+      case expr: SortOrder => expr
+      case expr: Expression => SortOrder(expr, Ascending)
+    })
+    withTypedPlan {
+      RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
+    }
+  }

  /**
   * Returns a new Dataset partitioned by the given partitioning expressions, using
   * `spark.sql.shuffle.partitions` as number of partitions.
   * The resulting Dataset is range partitioned.
   *
+  * At least one partition-by expression must be specified.
+  * When no explicit sort order is specified, "ascending nulls first" is assumed.
   *
   * @group typedrel
   * @since 2.3.0
   */
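
For reference, a minimal usage sketch of the API as it stands after this diff (spark, df, and the column name ts are hypothetical):

    import spark.implicits._

    // With no explicit sort order, "ascending nulls first" is assumed,
    // so these two calls produce the same range partitioning:
    val a = df.repartitionByRange(8, $"ts")
    val b = df.repartitionByRange(8, $"ts".asc)

    // At least one partition-by expression is now required:
    // df.repartitionByRange(8)        // throws IllegalArgumentException

    // Plain repartition() rejects SortOrder arguments and points users here:
    // df.repartition(8, $"ts".desc)   // throws IllegalArgumentException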
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -389,12 +389,30 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         .select(spark_partition_id().as("id"), $"val"),
       data1d.map(i => Row(i, data1d.size - 1 - i)))

+    checkAnswer(
+      data1d.toDF("val").repartitionByRange(data1d.size, lit(42))
+        .select(spark_partition_id().as("id"), $"val"),
+      data1d.map(i => Row(0, i)))
+
+    checkAnswer(
+      data1d.toDF("val").repartitionByRange(data1d.size, lit(null), $"val".asc, rand())
+        .select(spark_partition_id().as("id"), $"val"),
+      data1d.map(i => Row(i, i)))
+
+    // .repartitionByRange() assumes .asc by default if no explicit sort order is specified
+    checkAnswer(
+      data2d.toDF("a", "b").repartitionByRange(data2d.size, $"a".desc, $"b")
+        .select(spark_partition_id().as("id"), $"a", $"b"),
+      data2d.toDF("a", "b").repartitionByRange(data2d.size, $"a".desc, $"b".asc)
+        .select(spark_partition_id().as("id"), $"a", $"b"))
+
+    // at least one partition-by expression must be specified
+    intercept[IllegalArgumentException] {
+      data1d.toDF("val").repartitionByRange(data1d.size)
+    }
+    intercept[IllegalArgumentException] {
+      data1d.toDF("val").repartitionByRange(data1d.size, Seq.empty: _*)
+    }
}

test("coalesce") {