[SPARK-24242][SQL] RangeExec should have correct outputOrdering and outputPartitioning #21291
Conversation
cc @cloud-fan @kiszk

Test build #90455 has finished for PR 21291 at commit
      shouldHaveSort = true)
  }

  test("RangeExec should have correct output ordering") {
nit: start with SPARK-24242: ...
Ok.
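For reference, a minimal sketch of what the renamed test could look like (illustrative only; the PR's actual test sits next to the `shouldHaveSort` helper shown above and may assert more):

```scala
import org.apache.spark.sql.execution.RangeExec

test("SPARK-24242: RangeExec should have correct output ordering") {
  val plan = spark.range(1, 100, 1, 10).queryExecution.executedPlan
  // Find the physical Range node inside the (possibly whole-stage-codegen'd) plan.
  val range = plan.collectFirst { case r: RangeExec => r }.get
  // The reported ordering should be on the single `id` output attribute.
  assert(range.outputOrdering.map(_.child) == range.output)
}
```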
LGTM except one minor comment
  override val output: Seq[Attribute] = range.output

  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
since we are here, shall we also implement outputPartitioning?
ok.
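For context, a sketch of where the forwarded ordering comes from (an illustrative helper mirroring what the logical Range node computes; names here are not the PR's code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Descending, SortOrder}

// Rows produced by Range come out ordered by `id`: ascending for a positive
// step, descending for a negative one. RangeExec can therefore simply forward
// range.outputOrdering instead of recomputing anything.
def rangeOutputOrdering(output: Seq[Attribute], step: Long): Seq[SortOrder] = {
  val direction = if (step > 0) Ascending else Descending
  output.map(a => SortOrder(a, direction))
}
```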
Test build #90460 has finished for PR 21291 at commit
LGTM, +1 for adding outputPartitioning
  override def outputPartitioning: Partitioning = {
    if (numSlices == 1) {
      SinglePartition
Which one is better? SinglePartition or RangePartitioning(outputOrdering, 1)?
SinglePartition is better
Test build #90484 has finished for PR 21291 at commit
retest this please.

HyukjinKwon left a comment:
lgtm
Test build #90487 has finished for PR 21291 at commit
retest this please.
Changed; I will update related tests.
      0
    } else {
      collected.head.getLong(0)
    }
spark.range(-10, -9, -20, 1).select("id").count in DataFrameRangeSuite causes an exception here: plan.executeCollect().head pulls from an empty iterator by calling next.
I think it is caused by returning SinglePartition when there is no data (and therefore no partition). So I think we should fix it there and not here.
Right, that makes sense. Thanks.
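A sketch of the direction agreed on here (parameter names are illustrative; the merged code may differ in detail): only claim a concrete partitioning when the range actually produces rows.

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}

// An empty range has no partitions, so reporting SinglePartition would let the
// planner skip exchanges that the single-partition guarantee is supposed to
// make unnecessary -- which is exactly what broke the empty-range count above.
def rangeOutputPartitioning(
    numElements: BigInt,
    numSlices: Int,
    ordering: Seq[SortOrder]): Partitioning = {
  if (numElements > 0) {
    if (numSlices == 1) SinglePartition
    else RangePartitioning(ordering, numSlices)
  } else {
    UnknownPartitioning(0)
  }
}
```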
Test build #90495 has finished for PR 21291 at commit

Test build #90496 has finished for PR 21291 at commit
retest this please.

Test build #90497 has finished for PR 21291 at commit

retest this please.

Test build #90499 has finished for PR 21291 at commit

Test build #90503 has finished for PR 21291 at commit

retest this please.

Test build #90514 has finished for PR 21291 at commit

retest this please.

Test build #90610 has finished for PR 21291 at commit

retest this please.

Test build #90646 has finished for PR 21291 at commit

retest this please.

Test build #90661 has finished for PR 21291 at commit

retest this please.

Test build #90666 has finished for PR 21291 at commit

Test build #90678 has finished for PR 21291 at commit
  # groupby one column and one sql expression
- result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
  expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+ result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
why not just orderBy(df.id)? and why was this not failing before this fix?
Simply put, the data ordering of result3 and expected3 differs now.
Previous query plans for the two queries:
== Physical Plan ==
!AggregateInPandas [id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40], [sum(v#8)], [id#0L, (v#8 % 2.0)#40 AS (v % 2)#22, sum(v)#21 AS sum(v)#23]
+- *(2) Sort [id#0L ASC NULLS FIRST, (v#8 % 2.0) AS (v#8 % 2.0)#40 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40, 200)
+- Generate explode(vs#4), [id#0L], false, [v#8]
+- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
+- *(1) Range (0, 10, step=1, splits=8)
== Physical Plan ==
*(3) HashAggregate(keys=[id#0L, (v#8 % 2.0)#36], functions=[sum(v#8)], output=[id#0L, (v % 2)#31, sum(v)#32])
+- Exchange hashpartitioning(id#0L, (v#8 % 2.0)#36, 200)
+- *(2) HashAggregate(keys=[id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#36], functions=[partial_sum(v#8)], output=[id#0L, (v#8 % 2.0)#36, sum#38])
+- Generate explode(vs#4), [id#0L], false, [v#8]
+- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
+- *(1) Range (0, 10, step=1, splits=8)
Both have an Exchange hashpartitioning, which previously produced the same data distribution. Note that the Sort doesn't change the observed ordering, because spreading the rows over 200 partitions makes the distribution sparse.
Current query plan:
== Physical Plan ==
!AggregateInPandas [id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#453], [sum(v#396)], [id#388L, (v#396 % 2.0)#453 AS (v % 2)#438, sum(v)#437 AS sum(v)#439]
+- *(2) Sort [id#388L ASC NULLS FIRST, (v#396 % 2.0) AS (v#396 % 2.0)#453 ASC NULLS FIRST], false, 0
+- Generate explode(vs#392), [id#388L], false, [v#396]
+- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
+- *(1) Range (0, 10, step=1, splits=4)
== Physical Plan ==
*(2) HashAggregate(keys=[id#388L, (v#396 % 2.0)#454], functions=[sum(v#396)], output=[id#388L, (v % 2)#447, sum(v)#448])
+- *(2) HashAggregate(keys=[id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#454], functions=[partial_sum(v#396)], output=[id#388L, (v#396 % 2.0)#454, sum#456])
+- Generate explode(vs#392), [id#388L], false, [v#396]
+- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
+- *(1) Range (0, 10, step=1, splits=4)
The Exchange is not there anymore, so both queries still get the same data distribution, but now the Sort changes the observed data ordering.
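A small Scala sketch of how the missing exchange can be observed (illustrative query, not the PR's test): once RangeExec reports RangePartitioning on id, a grouping whose keys include id no longer needs a shuffle in this Spark version.

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Group a plain range by its own `id` column; the aggregate's distribution
// requirement is already satisfied by the range's reported partitioning.
val plan = spark.range(0, 10, 1, 4).groupBy("id").count()
  .queryExecution.executedPlan
assert(plan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
```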
thanks for your detailed explanation. Anyway, can we just use orderBy(df.id) instead of orderBy(df.id, df.v % 2)?
They are already ordered by df.id. This is the partial data:
Expected:
   id  (v % 2)  sum(v)
0   0      0.0   120.0
1   0      1.0   125.0
2   1      1.0   125.0
3   1      0.0   130.0
4   2      0.0   130.0
5   2      1.0   135.0

Result:
   id  (v % 2)  sum(v)
0   0      0.0   120.0
1   0      1.0   125.0
2   1      0.0   130.0
3   1      1.0   125.0
4   2      0.0   130.0
5   2      1.0   135.0
oh I see now, sorry, thanks.
  }

  test("debugCodegen") {
    val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
can we change to groupBy('id * 2)? We should try our best to keep what the test covers, and keep the shuffle in this query.
Ok.
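A sketch of the suggested adjustment (the exact assertions in DebuggingSuite may differ): grouping on a derived expression is not covered by the range's partitioning, so the shuffle, and with it the second codegen subtree, stays.

```scala
import org.apache.spark.sql.functions.col

test("debugCodegen") {
  // groupBy(col("id") * 2) keeps the exchange, so the query still compiles
  // into two whole-stage codegen subtrees.
  val res = codegenString(
    spark.range(10).groupBy(col("id") * 2).count().queryExecution.executedPlan)
  assert(res.contains("Subtree 1 / 2"))
  assert(res.contains("Subtree 2 / 2"))
}
```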
| test("debugCodegenStringSeq") { | ||
| val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan) | ||
| assert(res.length == 2) | ||
| assert(res.length == 1) |
ditto
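And the analogous sketch for the Seq variant, which lets the original length-2 assertion stay:

```scala
import org.apache.spark.sql.functions.col

test("debugCodegenStringSeq") {
  val res = codegenStringSeq(
    spark.range(10).groupBy(col("id") * 2).count().queryExecution.executedPlan)
  // One codegen subtree below the exchange and one above it.
  assert(res.length == 2)
}
```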
LGTM

Thanks @mgaido91

Test build #90722 has finished for PR 21291 at commit
  assert(plan.find(p =>
    p.isInstanceOf[WholeStageCodegenExec] &&
-     p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+     p.asInstanceOf[WholeStageCodegenExec].child.collect {
same here, can we change the groupBy instead of the test?
ok.
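A sketch of keeping the original assertion by changing the grouping key instead (query shape assumed from the surrounding diff; the final test may differ):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.functions.col

// Grouping by a derived column keeps the shuffle, so the aggregate is still
// the direct child of a WholeStageCodegenExec and the assertion can stay.
val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") * 2)
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
  p.isInstanceOf[WholeStageCodegenExec] &&
    p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
assert(df.collect() === Array(Row(0, 1), Row(2, 1), Row(4, 1)))
```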
LGTM

Test build #90772 has finished for PR 21291 at commit

retest this please

Test build #90780 has finished for PR 21291 at commit

Merged to master.
Thanks @cloud-fan @mgaido91 @kiszk @HyukjinKwon |
What changes were proposed in this pull request?

The logical Range node has recently gained an outputOrdering, which is used to eliminate redundant Sort nodes during optimization. However, this outputOrdering does not propagate to the physical RangeExec node. This patch propagates it, and also adds the correct outputPartitioning to the RangeExec node.

How was this patch tested?

Added test.
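As a rough illustration of the effect (a sketch, not the PR's test code): after this patch, the physical RangeExec node reports both properties, so the planner can avoid redundant sorts and shuffles on a range's id column.

```scala
import org.apache.spark.sql.execution.RangeExec

val executed = spark.range(0, 100, 1, 4).queryExecution.executedPlan
val rangeExec = executed.collectFirst { case r: RangeExec => r }.get

// Ascending order on `id` (descending for a negative step).
println(rangeExec.outputOrdering)
// RangePartitioning on `id` with 4 partitions; SinglePartition when numSlices == 1.
println(rangeExec.outputPartitioning)
```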