[SPARK-22614] Dataset API: repartitionByRange(...) #19828
Conversation
ok to test
```scala
   */
  @scala.annotation.varargs
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
    partitionExprs.find(_.expr.isInstanceOf[SortOrder]).foreach { sortOrder =>
```
Use collect or filter? That way we can show all offending columns.
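A hedged sketch of that suggestion, assuming it runs inside the `repartition(numPartitions, partitionExprs)` method body shown above (the exact error wording is illustrative, not the merged code):

```scala
// Sketch of the reviewer's suggestion: gather *all* SortOrder columns with
// filter, so the error message can name every offending expression at once
// instead of only the first one that find(...) would return.
val sortOrders = partitionExprs.filter(_.expr.isInstanceOf[SortOrder])
require(sortOrders.isEmpty,
  s"Invalid partitionExprs specified: $sortOrders\n" +
    s"For range partitioning use repartitionByRange(...) instead.")
```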
```scala
    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])

    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
      s"""${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type
```
Do you want this to be a multiline message? It makes sense to put the sort-order and non-sort-order expressions on new lines.
```scala
    // RepartitionByExpression's constructor verifies that either all expressions are
    // of type SortOrder, in which case we're doing RangePartitioning, or none of them are,
    // in which case we're doing HashPartitioning.
    val partitioning = if (expressions.forall(_.isInstanceOf[SortOrder])) {
```
We have discussed this before, but to me it makes slightly more sense to add this logic to the RepartitionByExpression plan.
Oh, and it also makes it easier to unit test this code :)
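To illustrate the reviewer's point, a hedged sketch of what such a plan-level unit test could look like; `testRelation` and the exact `RepartitionByExpression` constructor signature are illustrative assumptions, not the merged code:

```scala
// Hypothetical plan-level test sketch, assuming the partitioning decision
// lives inside the RepartitionByExpression logical plan. Requires a Spark
// Catalyst test setup with a `testRelation` in scope.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning}

// All expressions are SortOrder => range partitioning.
val sortPlan = RepartitionByExpression(Seq('a.asc), testRelation, numPartitions = 10)
assert(sortPlan.partitioning.isInstanceOf[RangePartitioning])

// No expression is SortOrder => hash partitioning.
val hashPlan = RepartitionByExpression(Seq('a), testRelation, numPartitions = 10)
assert(hashPlan.partitioning.isInstanceOf[HashPartitioning])
```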
```scala
    // .repartitionByRange() assumes .asc by default if no explicit sort order is specified
    checkAnswer(
      data2d.toDF("a", "b").repartitionByRange(data1d.size, $"a".desc, $"b")
```
`data1d.size`?
add to whitelist
@adrian-ionescu this looks pretty good. I left a few small comments.
```scala
    }
  }

  test("repartitionByRange") {
```
Move it to DataFrameSuite?
```scala
   * @since 2.3.0
   */
  @scala.annotation.varargs
  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
```
Open a JIRA for adding the corresponding API in PySpark?
Good call! Raised SPARK-22624.
```scala
      col.expr match {
        case expr: SortOrder =>
          expr
        case expr: Expression =>
```
What happens if we have a `SortOrder` that is not in the root node of `expr`?
```scala
data1d.toDF("val").repartitionByRange(data1d.size, $"val".desc + 1)
  .select(spark_partition_id().as("id"), $"val").show()
```

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning((val#236 DESC NULLS LAST + 1) ASC NULLS FIRST, 10)
+- LocalTableScan [val#236]
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
```
This is a more generic problem, right? I think a similar error gets thrown if you do something like this: `spark.range(10).select($"id".asc + 1).show()`. Let's fix that in a different ticket.
```
java.lang.UnsupportedOperationException: Cannot evaluate expression: input[0, bigint, false] ASC NULLS FIRST
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:259)
	at org.apache.spark.sql.catalyst.expressions.SortOrder.doGenCode(SortOrder.scala:60)
```
Yeah. It also does not work.
The error is slightly different because the project is whole stage code generated.
```scala
    require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")

    val partitioning: Partitioning = {
      val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
```
Still the same question: what happens when the `SortOrder` is not at the root node?
It's going to follow the `HashPartitioning` path and eventually lead to a "Cannot evaluate expression" exception, just like it would presently do if you tried running `df.repartition($"col".asc + 1)` or `df.sort($"col".asc + 1)`.
Test build #84223 has finished for PR 19828 at commit

Test build #84222 has finished for PR 19828 at commit

Test build #84225 has finished for PR 19828 at commit
LGTM. As @hvanhovell suggested, we can fix it in a follow-up PR. Thanks!
Test build #84247 has finished for PR 19828 at commit
Does not seem to be related... can we retrigger the tests?
jenkins retest this please
Test build #84252 has finished for PR 19828 at commit

Test build #84258 has finished for PR 19828 at commit
```scala
        s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
          "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
          "means `HashPartitioning`. In this case we have:" +
          s""""
```
" * 4?
nice catch :)
This still exists after we revert the previous changes.
```scala
  /**
   * Returns a new Dataset partitioned by the given partitioning expressions into
   * `numPartitions`. The resulting Dataset is range partitioned.
```
Could you update this to describe the latest change?
Test build #84296 has finished for PR 19828 at commit
```scala
  }

  /**
   * Returns a new Dataset that is hash partitioned by the given expressions into `numPartitions`.
```
hash -> range
Test build #84308 has finished for PR 19828 at commit
This reverts commit 60ec0e3
Test build #84338 has finished for PR 19828 at commit
```scala
        s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
          "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
          "means `HashPartitioning`. In this case we have:" +
          s""""
```
This still exists after we revert the previous changes.
```scala
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

    require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
```
Just for safety, also keep this change?
That would change the current behavior of `.repartition(numPartitions, Seq.empty: _*)` and I'd like to avoid that.
In fact, I've just raised a separate ticket about the latter: SPARK-22665.
Test build #84347 has finished for PR 19828 at commit
LGTM. Thanks! Merged to master.
What changes were proposed in this pull request?

This PR introduces a way to explicitly range-partition a Dataset. So far, only round-robin and hash partitioning were possible via `df.repartition(...)`, but sometimes range partitioning might be desirable: e.g. when writing to disk, for better compression without the cost of a global sort.

The current implementation piggybacks on the existing `RepartitionByExpression` logical plan and simply adds the following logic: if its expressions are of type `SortOrder`, it will do `RangePartitioning`; otherwise, `HashPartitioning`. This was by far the least intrusive solution I could come up with.

How was this patch tested?

Unit tests for the `RepartitionByExpression` changes, a test to ensure we're not changing the behavior of the existing `.repartition()`, and a few end-to-end tests in `DataFrameSuite`.
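As an illustration of the new API, a hedged usage sketch assuming Spark 2.3+ with a `SparkSession` in scope as `spark` (the data and partition counts are made up for the example):

```scala
import org.apache.spark.sql.functions.spark_partition_id
import spark.implicits._

val df = (1 to 100).toDF("val")

// Range-partition into 4 partitions. With no explicit sort order,
// ascending order is assumed for each partitioning column.
val ranged = df.repartitionByRange(4, $"val")

// Rows with nearby values land in the same partition, which can help
// compression when writing to disk without paying for a global sort.
ranged.select(spark_partition_id().as("pid"), $"val").show()
```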