-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-20392][SQL] Set barrier to prevent re-entering a tree #17770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
82978d7
24905e3
e15b001
a076d83
b29ded3
8c8fe1e
a855182
4ff9610
d0a94f4
02e11f9
17f1a02
4629959
c313e35
7e9dfac
fba3690
f63ea0b
b9d03cd
6a7204c
3437ae0
555fa8e
505aba6
f3e4208
c0bee01
1c1cc9d
eb0598e
cba784b
b478e55
8314cc3
6add9ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1032,6 +1032,7 @@ class Analyzer( | |
| object ResolveMissingReferences extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { | ||
| // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions | ||
| case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa | ||
| case sa @ Sort(_, _, child: Aggregate) => sa | ||
|
|
||
| case s @ Sort(order, _, child) if child.resolved => | ||
|
||
|
|
@@ -1107,6 +1108,8 @@ class Analyzer( | |
| throw new AnalysisException(s"Can't add $missingAttrs to $d") | ||
| case u: UnaryNode => | ||
| u.withNewChildren(addMissingAttr(u.child, missingAttrs) :: Nil) | ||
| case AnalysisBarrier(subPlan) => | ||
| AnalysisBarrier(addMissingAttr(subPlan, missingAttrs)) | ||
| case other => | ||
| throw new AnalysisException(s"Can't add $missingAttrs to $other") | ||
| } | ||
|
|
@@ -1125,6 +1128,7 @@ class Analyzer( | |
| plan match { | ||
| case u: UnaryNode if !u.isInstanceOf[SubqueryAlias] => | ||
| resolveExpressionRecursively(resolved, u.child) | ||
| case AnalysisBarrier(subPlan) => resolveExpressionRecursively(resolved, subPlan) | ||
| case other => resolved | ||
| } | ||
| } | ||
|
|
@@ -1535,6 +1539,8 @@ class Analyzer( | |
| */ | ||
| object ResolveAggregateFunctions extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { | ||
| case filter @ Filter(havingCondition, AnalysisBarrier(aggregate: Aggregate)) => | ||
| apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier(_)) | ||
|
||
| case filter @ Filter(havingCondition, | ||
| aggregate @ Aggregate(grouping, originalAggExprs, child)) | ||
| if aggregate.resolved => | ||
|
|
@@ -1594,6 +1600,8 @@ class Analyzer( | |
| case ae: AnalysisException => filter | ||
| } | ||
|
|
||
| case sort @ Sort(sortOrder, global, AnalysisBarrier(aggregate: Aggregate)) => | ||
| apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier(_)) | ||
|
||
| case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved => | ||
|
|
||
| // Try resolving the ordering as though it is in the aggregate clause. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -189,7 +189,7 @@ class Dataset[T] private[sql]( | |
| } | ||
|
|
||
| // Wrap analyzed logical plan with an analysis barrier so we won't traverse/resolve it again. | ||
|
||
| @transient private val planBarrier: AnalysisBarrier = AnalysisBarrier(logicalPlan) | ||
| @transient private val planBarrier: LogicalPlan = AnalysisBarrier(logicalPlan) | ||
|
||
|
|
||
| /** | ||
| * Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the | ||
|
|
@@ -1743,7 +1743,7 @@ class Dataset[T] private[sql]( | |
| def union(other: Dataset[T]): Dataset[T] = withSetOperator { | ||
| // This breaks caching, but it's usually ok because it addresses a very specific use case: | ||
| // using union to union many files or partitions. | ||
| CombineUnions(Union(planBarrier, other.planBarrier)) | ||
| CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier(_)) | ||
|
||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -241,7 +241,7 @@ class PlannerSuite extends SharedSQLContext { | |
| test("collapse adjacent repartitions") { | ||
| val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5) | ||
| def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length | ||
| assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3) | ||
| assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unnecessary change?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, I see. |
||
| assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2) | ||
| doubleRepartitioned.queryExecution.optimizedPlan match { | ||
| case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) => | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `Filter` case of this `ResolveMissingReferences` rule doesn't make much sense to me. I've opened #17874 for it. Here I'd make only the changes necessary to pass the tests.