Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
rename QueryPlan.collectInPlanAndSubqueries to collectWithSubqueries
  • Loading branch information
cloud-fan committed Apr 1, 2020
commit b3e3484c9a8acf9db5c577db78afbefc1ca645be
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
}

/**
* Returns a sequence containing the result of applying a partial function to all elements in this
* A variant of `collect`. This method not only apply the given function to all elements in this
* plan, also considering all the plans in its (nested) subqueries
*/
def collectInPlanAndSubqueries[B](f: PartialFunction[PlanType, B]): Seq[B] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

This internal API was added in Spark 3.0 by #25433

def collectWithSubqueries[B](f: PartialFunction[PlanType, B]): Seq[B] =
(this +: subqueriesAll).flatMap(_.collect(f))

override def innerChildren: Seq[QueryPlan[_]] = subqueries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class QueryPlanSuite extends SparkFunSuite {

val countRelationsInPlan = plan.collect({ case _: UnresolvedRelation => 1 }).sum
val countRelationsInPlanAndSubqueries =
plan.collectInPlanAndSubqueries({ case _: UnresolvedRelation => 1 }).sum
plan.collectWithSubqueries({ case _: UnresolvedRelation => 1 }).sum

assert(countRelationsInPlan == 2)
assert(countRelationsInPlanAndSubqueries == 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object CollectMetricsExec {
* Recursively collect all collected metrics from a query tree.
*/
def collect(plan: SparkPlan): Map[String, Row] = {
val metrics = plan.collectInPlanAndSubqueries {
val metrics = plan.collectWithSubqueries {
case collector: CollectMetricsExec => collector.name -> collector.collectedMetrics
}
metrics.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,7 @@ abstract class DynamicPartitionPruningSuiteBase

val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
plan.collectInPlanAndSubqueries({ case _: SubqueryBroadcastExec => 1 }).sum
plan.collectWithSubqueries({ case _: SubqueryBroadcastExec => 1 }).sum

assert(countSubqueryBroadcasts == 2)
}
Expand Down