resolve the comments
JkSelf committed May 11, 2021
commit 4ccd4b8b1d2604f05a4564303d0b8aa65ea5dfa3
@@ -54,8 +54,7 @@ case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[S

     if (canReuseExchange) {
       exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
-      val newAdaptivePlan = AdaptiveSparkPlanExec(
-        exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
+      val newAdaptivePlan = adaptivePlan.copy(inputPlan = exchange)

       val broadcastValues = SubqueryBroadcastExec(
         name, index, buildKeys, newAdaptivePlan)
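The change above swaps an explicit `AdaptiveSparkPlanExec(...)` constructor call (with a bare positional `true`) for case-class `copy`, which replaces only the named field and carries every other field over unchanged. A minimal, self-contained sketch of that pattern, using a hypothetical simplified case class (the real `AdaptiveSparkPlanExec` has different and more numerous fields):

```scala
// Hypothetical, simplified stand-in for AdaptiveSparkPlanExec; field
// names are illustrative only.
case class AdaptivePlan(
    inputPlan: String,
    context: String,
    preprocessingRules: Seq[String],
    isSubquery: Boolean)

object CopyDemo {
  def main(args: Array[String]): Unit = {
    val adaptivePlan = AdaptivePlan("scan", "ctx", Seq("ruleA"), isSubquery = true)
    // Analogue of `adaptivePlan.copy(inputPlan = exchange)`: only the
    // named field changes; no other argument can be dropped or reordered.
    val newAdaptivePlan = adaptivePlan.copy(inputPlan = "exchange")
    assert(newAdaptivePlan.context == adaptivePlan.context)
    assert(newAdaptivePlan.isSubquery) // flag preserved without restating it
    println(newAdaptivePlan.inputPlan)
  }
}
```

This is why reviewers often prefer `copy` here: adding a field to the case class later cannot silently misalign a positional argument list.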
@@ -1475,14 +1475,14 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

-  test("SPARK-34637: test DPP side broadcast query stage is created firstly") {
+  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
-        """ WITH view1 as (
+        """ WITH v as (
           | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
           | )
           |
-          | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
+          | SELECT * FROM v v1 join v v2 WHERE v1.store_id = v2.store_id
         """.stripMargin)

// A possible resulting query plan:
@@ -1492,9 +1492,10 @@ abstract class DynamicPartitionPruningSuiteBase
       //      +- Exchange
       //         +- HashAggregate
       //            +- Filter
-      //               +- FileScan
-      //                     Dynamicpruning Subquery
-      //                        +- AdaptiveSparkPlan
+      //               +- FileScan [PartitionFilters: [isnotnull(store_id#3367),
+      //                    dynamicpruningexpression(store_id#3367 IN dynamicpruning#3385)]]
+      //                     +- SubqueryBroadcast dynamicpruning#3385
+      //                        +- AdaptiveSparkPlan
       //                           +- BroadcastQueryStage
Contributor: there is no other place to reuse this broadcast, right?

Contributor Author: This broadcast is only reused on the build side.

Contributor (@tgravescs, Apr 28, 2021): How is the broadcast before the FileScan? What is being broadcast?

Contributor Author: This broadcast is in the DPP subquery of the FileScan. It broadcasts the results of the build side and then prunes the dataset.
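The mechanism described above can be sketched in plain Scala. This is a hypothetical, self-contained simulation of the dynamic-partition-pruning idea only (the names `broadcastKeys`, `partitions`, and `prunedScan` are illustrative, not Spark's API): the build side's surviving join keys act as the "broadcast" value, and the fact table scans only the partitions whose key appears in that set.

```scala
object DppSketch {
  // Simulated build-side result: the store_ids that survived the
  // dimension-side filter (what SubqueryBroadcast would deliver).
  val broadcastKeys: Set[Int] = Set(3, 7)

  // Simulated fact-table partitions, keyed by the partition column.
  val partitions: Map[Int, Seq[String]] = Map(
    3 -> Seq("row-a"),
    5 -> Seq("row-b"),
    7 -> Seq("row-c"))

  // "Dynamic pruning": scan only partitions whose key is in the
  // broadcast set; partition 5 is never read.
  def prunedScan(): Seq[String] =
    partitions.collect { case (k, rows) if broadcastKeys(k) => rows }
      .flatten.toSeq

  def main(args: Array[String]): Unit = {
    assert(prunedScan().toSet == Set("row-a", "row-c"))
    println(prunedScan())
  }
}
```

In the actual plan, this pruning predicate is the `dynamicpruningexpression(... IN dynamicpruning#...)` shown in the FileScan's PartitionFilters, not a separate child operator.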

Contributor: Can you make it clearer that this is the subquery in the FileScan node, not the child of it?
// +- BroadcastExchange
//