Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add the new test to test DPP side broadcast query stage is created fi…
…rstly
  • Loading branch information
JkSelf committed Apr 16, 2021
commit c7b3735e12e04b68abd33adbace0653a09a7231a
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import org.apache.spark.util.ThreadUtils
*
* @param index the index of the join key in the list of keys from the build side
* @param buildKeys the join keys from the build side of the join used
* @param child the BroadcastExchange from the build side of the join
* @param child the BroadcastExchange or the AdaptiveSparkPlan with BroadcastQueryStageExec
* from the build side of the join
*/
case class SubqueryBroadcastExec(
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,15 @@ abstract class DynamicPartitionPruningSuiteBase
}.isDefined
assert(hasReuse, s"$s\nshould have been reused in\n$plan")
case a: AdaptiveSparkPlanExec =>
val hasReuse = collect(a) {
case r: ReusedExchangeExec => r
}.nonEmpty
val broadcastQueryStage = collectFirst(a) {
case b: BroadcastQueryStageExec => b
}
val broadcastPlan = broadcastQueryStage.get.broadcast
val hasReuse = find(plan) {
case ReusedExchangeExec(_, e) => e eq broadcastPlan
case b: BroadcastExchangeLike => b eq broadcastPlan
case _ => false
}.isDefined
assert(hasReuse, s"$s\nshould have been reused in\n$plan")
case _ =>
fail(s"Invalid child node found in\n$s")
Expand Down Expand Up @@ -1468,6 +1474,37 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
}

test("SPARK-34637: test DPP side broadcast query stage is created firstly") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: SPARK-34637: DPP ... remove the test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH view1 as (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

view1 -> v?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

| SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
| )
|
| SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
""".stripMargin)

// A possible resulting query plan:
// BroadcastHashJoin
// +- HashAggregate
// +- ShuffleQueryStage
// +- Exchange
// +- HashAggregate
// +- Filter
// +- FileScan
// +- SubqueryBroadcast
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subquery has different symbols in the tree string format. please try to explain some plans locally and update this comment.

// +- AdaptiveSparkPlan
// +- BroadcastQueryStage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no other place to reuse this broadcast, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This broadcast only be reused in the build side.

Copy link
Contributor

@tgravescs tgravescs Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is broadcast before the FileScan? what is being broadcast

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This broadcast is in the DPP subquery of the FileScan. It will broadcast the results of the build side and then prune the dataset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make it more clear that this is the subquery in the file scan node, not the child of it?

// +- BroadcastExchange
//
// +- BroadcastQueryStage
// +- ReusedExchange

checkPartitionPruningPredicate(df, false, true)
checkAnswer(df, Row(15, 15) :: Nil)
}
}
}

class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
Expand Down