29 commits
82978d7  Set barrier to prevent re-analysis of analyzed plan. (viirya, Apr 26, 2017)
24905e3  Use a logical node to set analysis barrier. (viirya, Apr 27, 2017)
e15b001  Add test for analysis barrier. (viirya, Apr 30, 2017)
a076d83  Let AnalysisBarrier as LeafNode. (viirya, May 3, 2017)
b29ded3  Remove resolveOperators path. (viirya, May 5, 2017)
8c8fe1e  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 5, 2017)
a855182  Solving merging issue. (viirya, May 5, 2017)
4ff9610  Do not change exposed logicalPlan. (viirya, May 5, 2017)
d0a94f4  Fix test. (viirya, May 6, 2017)
02e11f9  Address comments. (viirya, May 9, 2017)
17f1a02  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 9, 2017)
4629959  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 10, 2017)
c313e35  Correctly set isStreaming for barrier. (viirya, May 10, 2017)
7e9dfac  Address comments. (viirya, May 11, 2017)
fba3690  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 17, 2017)
f63ea0b  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 17, 2017)
b9d03cd  Fix test. (viirya, May 17, 2017)
6a7204c  Address comments. (viirya, May 19, 2017)
3437ae0  Wrap AnalysisBarrier on df.logicalPlan. (viirya, May 22, 2017)
555fa8e  Fix test. (viirya, May 23, 2017)
505aba6  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 23, 2017)
f3e4208  fix test. (viirya, May 24, 2017)
c0bee01  Avoid overriding find in AnalysisBarrier. (viirya, May 24, 2017)
1c1cc9d  Fix test. (viirya, May 24, 2017)
eb0598e  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 24, 2017)
cba784b  fix test. (viirya, May 24, 2017)
b478e55  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 25, 2017)
8314cc3  Create a new field in Dataset for the plan with barrier. (viirya, May 25, 2017)
6add9ec  Address comments. (viirya, May 26, 2017)
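Taken together, these commits describe the approach: wrap an already-analyzed plan in a dedicated AnalysisBarrier node so the Analyzer does not descend into it and re-analyze it, and have Dataset keep a barrier-wrapped copy of its logical plan (commits 3437ae0 and 8314cc3). The snippet below is only a minimal illustration of what such a barrier node could look like; it is a sketch under assumptions, not the code from this PR, and the overrides shown are just the ones the commit messages hint at.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Sketch only: a barrier that hides an analyzed subtree from analyzer rules.
// Modeled as a LeafNode so that transform-based rules do not recurse into
// `child`, which is what prevents the wrapped plan from being re-analyzed.
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
  // Expose the wrapped plan's schema so operators above the barrier resolve.
  override def output: Seq[Attribute] = child.output
  // Propagate the streaming flag of the wrapped plan (cf. commit c313e35).
  override def isStreaming: Boolean = child.isStreaming
}
```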
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya committed May 5, 2017
commit 8c8fe1e20609a373f164e8b2252a970e4e468eb3
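This page shows commit 8c8fe1e, a merge of upstream/master into the SPARK-20392 branch. Every hunk in the condensed diff below makes the same mechanical change to the Analyzer and related rules: the infix call `plan transformUp { ... }` becomes the explicit `plan.transformUp { ... }`. As background, here is a minimal, self-contained example of a rule written with that explicit call; the rule itself is invented for illustration and is not part of this diff.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.BooleanType

// Illustrative rule: remove filters whose condition is the literal `true`.
// `transformUp` applies the partial function bottom-up over the plan tree,
// the same traversal used by the rules touched in the hunks below.
object RemoveTrueFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Filter(Literal(true, BooleanType), child) => child
  }
}
```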
@@ -174,7 +174,7 @@ class Analyzer(
* Analyze cte definitions and substitute child plan with analyzed cte definitions.
*/
object CTESubstitution extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case With(child, relations) =>
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
case (resolved, (name, relation)) =>
@@ -202,7 +202,7 @@ class Analyzer(
* Substitute child plan with WindowSpecDefinitions.
*/
object WindowsSubstitution extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
@@ -244,7 +244,7 @@ class Analyzer(
private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)

- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case Aggregate(groups, aggs, child) if child.resolved && hasUnresolvedAlias(aggs) =>
Aggregate(groups, assignAliases(aggs), child)

@@ -616,7 +616,7 @@ class Analyzer(
case _ => plan
}

- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
case v: View =>
@@ -790,7 +790,7 @@ class Analyzer(
}
}

- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p: LogicalPlan if !p.childrenResolved => p

// If the projection list contains Stars, expand it.
@@ -964,7 +964,7 @@ class Analyzer(
* have no effect on the results.
*/
object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p if !p.childrenResolved => p
// Replace the index with the related attribute for ORDER BY,
// which is a 1-base position of the projection list.
@@ -1030,7 +1030,7 @@ class Analyzer(
* The HAVING clause could also used a grouping columns that is not presented in the SELECT.
*/
object ResolveMissingReferences extends Rule[LogicalPlan] {
viirya (Member, Author) commented:

The Filter case of this ResolveMissingReferences rule doesn't seem to make much sense to me. I've opened #17874 for it. Here I'd make the necessary changes to pass the tests.

- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa
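For context on the comment above: ResolveMissingReferences is the rule that lets an ORDER BY (and, per its scaladoc, a HAVING condition, which is a Filter over an Aggregate) reference attributes that the operator below does not keep in its output; the rule temporarily adds them to the child and projects them away again afterwards. A rough illustration, assuming an active SparkSession and invented table and column names:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

// Illustration only: the ORDER BY references column `b`, which the SELECT
// list drops. ResolveMissingReferences adds `b` below the Sort so it can
// resolve, then projects it away above the Sort.
Seq((1, 30), (2, 10), (3, 20)).toDF("a", "b").createOrReplaceTempView("t")
spark.sql("SELECT a FROM t ORDER BY b").show()
```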

@@ -1154,7 +1154,7 @@ class Analyzer(
* Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
*/
object ResolveFunctions extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case q: LogicalPlan =>
q transformExpressions {
case u if !u.childrenResolved => u // Skip until children are resolved.
@@ -1493,7 +1493,7 @@ class Analyzer(
/**
* Resolve and rewrite all subqueries in an operator tree..
*/
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
// In case of HAVING (a filter after an aggregate) we use both the aggregate and
// its child for resolution.
case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
@@ -1508,7 +1508,7 @@ class Analyzer(
* Turns projections that contain aggregate expressions into aggregations.
*/
object GlobalAggregates extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case Project(projectList, child) if containsAggregates(projectList) =>
Aggregate(Nil, projectList, child)
}
@@ -1534,7 +1534,7 @@ class Analyzer(
* underlying aggregate operator and then projected away after the original operator.
*/
object ResolveAggregateFunctions extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case filter @ Filter(havingCondition,
aggregate @ Aggregate(grouping, originalAggExprs, child))
if aggregate.resolved =>
@@ -1706,7 +1706,7 @@ class Analyzer(
}
}

- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
val nestedGenerator = projectList.find(hasNestedGenerator).get
throw new AnalysisException("Generators are not supported when it's nested in " +
@@ -1764,7 +1764,7 @@ class Analyzer(
* that wrap the [[Generator]].
*/
object ResolveGenerate extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case g: Generate if !g.child.resolved || !g.generator.resolved => g
case g: Generate if !g.resolved =>
g.copy(generatorOutput = makeGeneratorOutput(g.generator, g.generatorOutput.map(_.name)))
@@ -2081,7 +2081,7 @@ class Analyzer(
* put them into an inner Project and finally project them away at the outer Project.
*/
object PullOutNondeterministic extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p if !p.resolved => p // Skip unresolved nodes.
case p: Project => p
case f: Filter => f
@@ -2126,7 +2126,7 @@ class Analyzer(
* and we should return null if the input is null.
*/
object HandleNullInputsForUDF extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p if !p.resolved => p // Skip unresolved nodes.

case p => p transformExpressionsUp {
@@ -2191,7 +2191,7 @@ class Analyzer(
* Then apply a Project on a normal Join to eliminate natural or using join.
*/
object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
if left.resolved && right.resolved && j.duplicateResolved =>
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
@@ -2256,7 +2256,7 @@ class Analyzer(
* to the given input attributes.
*/
object ResolveDeserializer extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p if !p.childrenResolved => p
case p if p.resolved => p

@@ -2342,7 +2342,7 @@ class Analyzer(
* constructed is an inner class.
*/
object ResolveNewInstance extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p if !p.childrenResolved => p
case p if p.resolved => p

@@ -2376,7 +2376,7 @@ class Analyzer(
"type of the field in the target object")
}

- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p if !p.childrenResolved => p
case p if p.resolved => p

@@ -2430,7 +2430,7 @@ object CleanupAliases extends Rule[LogicalPlan] {
case other => trimAliases(other)
}

- override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case Project(projectList, child) =>
val cleanedProjectList =
projectList.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
@@ -2505,7 +2505,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
* @return the logical plan that will generate the time windows using the Expand operator, with
* the Filter operator for correctness and Project for usability.
*/
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val windowExpressions =
You are viewing a condensed version of this merge commit.