4 changes: 2 additions & 2 deletions python/pyspark/sql/tests.py
@@ -5239,8 +5239,8 @@ def test_complex_groupby(self):
expected2 = df.groupby().agg(sum(df.v))

# groupby one column and one sql expression
result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
Contributor:
why not just orderBy(df.id)? and why was this not failing before this fix?

Member Author:
Simply put, the data ordering between result3 and expected3 is different now.

Previous query plans for the two queries:

== Physical Plan ==
!AggregateInPandas [id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40], [sum(v#8)], [id#0L, (v#8 % 2.0)#40 AS (v % 2)#22, sum(v)#21 AS sum(v)#23]
+- *(2) Sort [id#0L ASC NULLS FIRST, (v#8 % 2.0) AS (v#8 % 2.0)#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40, 200)
      +- Generate explode(vs#4), [id#0L], false, [v#8]
         +- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
            +- *(1) Range (0, 10, step=1, splits=8)
== Physical Plan ==
*(3) HashAggregate(keys=[id#0L, (v#8 % 2.0)#36], functions=[sum(v#8)], output=[id#0L, (v % 2)#31, sum(v)#32])
+- Exchange hashpartitioning(id#0L, (v#8 % 2.0)#36, 200)
   +- *(2) HashAggregate(keys=[id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#36], functions=[partial_sum(v#8)], output=[id#0L, (v#8 % 2.0)#36, sum#38])
      +- Generate explode(vs#4), [id#0L], false, [v#8]
         +- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
            +- *(1) Range (0, 10, step=1, splits=8)

Both have an Exchange hashpartitioning, which produced the same data distribution previously. Notice that the Sort doesn't change the data ordering, because 200 partitions make the distribution sparse.

Current query plans:

!AggregateInPandas [id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#453], [sum(v#396)], [id#388L, (v#396 % 2.0)#453 AS (v % 2)#438, sum(v)#437 AS sum(v)#439]
+- *(2) Sort [id#388L ASC NULLS FIRST, (v#396 % 2.0) AS (v#396 % 2.0)#453 ASC NULLS FIRST], false, 0
   +- Generate explode(vs#392), [id#388L], false, [v#396]
      +- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
         +- *(1) Range (0, 10, step=1, splits=4)
== Physical Plan ==
*(2) HashAggregate(keys=[id#388L, (v#396 % 2.0)#454], functions=[sum(v#396)], output=[id#388L, (v % 2)#447, sum(v)#448])
+- *(2) HashAggregate(keys=[id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#454], functions=[partial_sum(v#396)], output=[id#388L, (v#396 % 2.0)#454, sum#456])
   +- Generate explode(vs#392), [id#388L], false, [v#396]
      +- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
         +- *(1) Range (0, 10, step=1, splits=4)

The Exchange is not there anymore. The two plans still have the same data distribution, but now the Sort changes the data ordering.
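
A rough Scala sketch of the same idea (spark-shell style, not from the PR; df stands in for the pyspark test's DataFrame with columns id and v): grouped output carries no ordering guarantee, so the comparison has to impose an explicit orderBy.

import spark.implicits._
import org.apache.spark.sql.functions.sum

// Whether the planner inserts an Exchange can change the row order of a
// group-by result. Sorting on the grouping keys makes two logically-equal
// results comparable row by row.
val grouped = df.groupBy($"id", $"v" % 2).agg(sum($"v"))
grouped.orderBy($"id", $"v" % 2).show()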

Contributor:

Thanks for your detailed explanation. Anyway, can we just use orderBy(df.id) instead of orderBy(df.id, df.v % 2)?

Member Author:

They are already ordered by df.id. This is the partial data:

Expected:
    id  (v % 2)  sum(v)
0    0      0.0   120.0
1    0      1.0   125.0
2    1      1.0   125.0
3    1      0.0   130.0
4    2      0.0   130.0
5    2      1.0   135.0
Result:
    id  (v % 2)  sum(v)
0    0      0.0   120.0
1    0      1.0   125.0
2    1      0.0   130.0
3    1      1.0   125.0
4    2      0.0   130.0
5    2      1.0   135.0

Contributor:

oh I see now, sorry, thanks.

expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)).orderBy(df.id, df.v % 2)

# groupby one python UDF
result4 = df.groupby(plus_one(df.id)).agg(sum_udf(df.v))
@@ -345,6 +345,20 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)

override val output: Seq[Attribute] = range.output

override def outputOrdering: Seq[SortOrder] = range.outputOrdering
Contributor:

since we are here, shall we also implement outputPartitioning?

Member Author:

ok.


  override def outputPartitioning: Partitioning = {
    if (numElements > 0) {
      if (numSlices == 1) {
        SinglePartition
      } else {
        RangePartitioning(outputOrdering, numSlices)
      }
    } else {
      UnknownPartitioning(0)
    }
  }

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
def computeChiSquareTest(): Double = {
val n = 10000
// Trigger a sort
val data = spark.range(0, n, 1, 1).sort('id.desc)
// Range has range partitioning in its output now. To have a range shuffle, we
// need to run a repartition first.
val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
Contributor:

sorry, I am just curious, why is sort('id.desc) not causing a shuffle? Shouldn't it be ordered by 'id.asc without the sort?

Member Author:

This test requires a range shuffle. Previously, Range had unknown output partitioning/ordering, so a range shuffle was inserted before the sort.

Now Range has ordered output, so the planner doesn't insert the shuffle we need here.
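
A quick spark-shell sketch of this (n is the same 10000 the test uses):

import org.apache.spark.sql.functions.col

val n = 10000
// The single-partition Range already satisfies the sort's requirement, so no
// exchange is planned; the explicit repartition forces the range shuffle back in.
spark.range(0, n, 1, 1).sort(col("id").desc).explain()
spark.range(0, n, 1, 1).repartition(10).sort(col("id").desc).explain()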

Contributor:

I'm also confused here: the Range output ordering is 'id.asc, which doesn't match 'id.desc, so how can we avoid the shuffle?

Member Author (@viirya, May 17, 2018):

Because range reports it is just one partition now?

Contributor:

then can we change the code to spark.range(0, n, 1, 10)?

Member Author:

This test uses SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION to change the sample size per partition and check the chi-square value. It samples just 1 point per partition, so the chi-square value is expected to be high.

If we change the range from 1 partition to 10, the chi-square value will change too. Should we do this?

Contributor:

Hmm, isn't spark.range(0, n, 1, 10) almost the same as spark.range(0, n, 1, 1).repartition(10)?

Member Author:

This is a good point.

This is the query plan and partition sizes for spark.range(0, n, 1, 1).repartition(10).sort('id.desc), when SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION is set to 1:

== Physical Plan ==
*(2) Sort [id#15L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#15L DESC NULLS LAST, 4)
   +- Exchange RoundRobinPartitioning(10)
      +- *(1) Range (0, 10000, step=1, splits=1)

1666, 3766, 2003, 2565

And for spark.range(0, n, 1, 10).sort('id.desc):

== Physical Plan ==
*(2) Sort [id#13L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#13L DESC NULLS LAST, 4)
   +- *(1) Range (0, 10000, step=1, splits=10)

(2835, 2469, 2362, 2334)

Because repartition shuffles data with RoundRobinPartitioning, I guess it makes the sampling worse for the range exchange. Without the repartition, Range's output is already range-partitioned, so the sampling yields better range boundaries.
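
For reference, a sketch of how per-partition counts like the ones above can be computed; spark_partition_id() is the DataFrame-API counterpart of the SPARK_PARTITION_ID() expression used in the test:

import org.apache.spark.sql.functions.{col, spark_partition_id}

val n = 10000
val data = spark.range(0, n, 1, 1).repartition(10).sort(col("id").desc)
// Count the rows that land in each output partition after the range shuffle + sort.
data.groupBy(spark_partition_id().as("pid")).count().orderBy("pid").show()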

Contributor:

I see, so the 100 and 300 in this test are coupled with the physical execution. I feel the right way to test this is, instead of hardcoding 100 and 300, to have a and b and check whether b > 3 * a or something.
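
One possible shape of that, as a hedged sketch (variable names are illustrative, not from the patch; withSQLConf comes from the shared SQL test utilities):

// Compare the two chi-square statistics directly instead of hardcoding the
// 100 / 300 thresholds, so the test is less coupled to the physical plan.
val a = computeChiSquareTest()  // default sampling
var b = 0.0
withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
  b = computeChiSquareTest()
}
assert(b > 3 * a)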

Member Author:

With spark.range(0, n, 1, 10).sort('id.desc), there is no 3x linear relation between a and b. As shown above, that distribution is also even, and the chi-square value is also under 100.

Here we need to redistribute the data to make sampling difficult. Previously, a repartition was added automatically before the sort. Now Range has correct output partitioning info, so the repartition must be added manually.

.selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

// Compute histogram for the number of records per partition post sort
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{execution, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range, Repartition, Sort}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -621,6 +621,31 @@ class PlannerSuite extends SharedSQLContext {
requiredOrdering = Seq(orderingA, orderingB),
shouldHaveSort = true)
}

test("SPARK-24242: RangeExec should have correct output ordering and partitioning") {
val df = spark.range(10)
val rangeExec = df.queryExecution.executedPlan.collect {
case r: RangeExec => r
}
val range = df.queryExecution.optimizedPlan.collect {
case r: Range => r
}
assert(rangeExec.head.outputOrdering == range.head.outputOrdering)
assert(rangeExec.head.outputPartitioning ==
RangePartitioning(rangeExec.head.outputOrdering, df.rdd.getNumPartitions))

val rangeInOnePartition = spark.range(1, 10, 1, 1)
val rangeExecInOnePartition = rangeInOnePartition.queryExecution.executedPlan.collect {
case r: RangeExec => r
}
assert(rangeExecInOnePartition.head.outputPartitioning == SinglePartition)
Contributor:

should we also add a test case for the 0 partition case?

Member Author:

Ok.


    val rangeInZeroPartition = spark.range(-10, -9, -20, 1)
    val rangeExecInZeroPartition = rangeInZeroPartition.queryExecution.executedPlan.collect {
      case r: RangeExec => r
    }
    assert(rangeExecInZeroPartition.head.outputPartitioning == UnknownPartitioning(0))
  }
}

// Used for unit-testing EnsureRequirements
@@ -55,7 +55,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
p.asInstanceOf[WholeStageCodegenExec].child.collect {
Contributor:

same here, can we change the groupBy instead of the test?

Member Author:

ok.

case h: HashAggregateExec => h
}.nonEmpty).isDefined)
assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
}

@@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext {

test("debugCodegen") {
val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
Contributor (@cloud-fan, May 17, 2018):

Can we change it to groupBy('id * 2)? We should try our best to keep what the test is testing, and keep the shuffle in this query.

Member Author:

Ok.
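
A hedged sketch of what that suggestion might look like (not the change in this commit, which instead updates the assertions below to expect a single subtree):

import testImplicits._  // for the 'id symbol syntax used below

// Grouping by 'id * 2 no longer lines up with Range's reported output
// partitioning, so the planner keeps the exchange and the plan still splits
// into two whole-stage-codegen subtrees.
val res = codegenString(spark.range(10).groupBy('id * 2).count().queryExecution.executedPlan)
assert(res.contains("Subtree 1 / 2"))
assert(res.contains("Subtree 2 / 2"))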

assert(res.contains("Subtree 1 / 2"))
assert(res.contains("Subtree 2 / 2"))
assert(res.contains("Subtree 1 / 1"))
assert(res.contains("Object[]"))
}

test("debugCodegenStringSeq") {
val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
assert(res.length == 2)
assert(res.length == 1)
Contributor:

ditto

assert(res.forall{ case (subtree, code) =>
subtree.contains("Range") && code.contains("Object[]")})
}