Commits (29)
82978d7  Set barrier to prevent re-analysis of analyzed plan. (viirya, Apr 26, 2017)
24905e3  Use a logical node to set analysis barrier. (viirya, Apr 27, 2017)
e15b001  Add test for analysis barrier. (viirya, Apr 30, 2017)
a076d83  Let AnalysisBarrier as LeafNode. (viirya, May 3, 2017)
b29ded3  Remove resolveOperators path. (viirya, May 5, 2017)
8c8fe1e  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 5, 2017)
a855182  Solving merging issue. (viirya, May 5, 2017)
4ff9610  Do not change exposed logicalPlan. (viirya, May 5, 2017)
d0a94f4  Fix test. (viirya, May 6, 2017)
02e11f9  Address comments. (viirya, May 9, 2017)
17f1a02  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 9, 2017)
4629959  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 10, 2017)
c313e35  Correctly set isStreaming for barrier. (viirya, May 10, 2017)
7e9dfac  Address comments. (viirya, May 11, 2017)
fba3690  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 17, 2017)
f63ea0b  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 17, 2017)
b9d03cd  Fix test. (viirya, May 17, 2017)
6a7204c  Address comments. (viirya, May 19, 2017)
3437ae0  Wrap AnalysisBarrier on df.logicalPlan. (viirya, May 22, 2017)
555fa8e  Fix test. (viirya, May 23, 2017)
505aba6  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 23, 2017)
f3e4208  fix test. (viirya, May 24, 2017)
c0bee01  Avoid overriding find in AnalysisBarrier. (viirya, May 24, 2017)
1c1cc9d  Fix test. (viirya, May 24, 2017)
eb0598e  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 24, 2017)
cba784b  fix test. (viirya, May 24, 2017)
b478e55  Merge remote-tracking branch 'upstream/master' into SPARK-20392 (viirya, May 25, 2017)
8314cc3  Create a new field in Dataset for the plan with barrier. (viirya, May 25, 2017)
6add9ec  Address comments. (viirya, May 26, 2017)
Changes from commit 6a7204c0fc00dbe2e43d6d65e722b3b13c3b35d0: "Address comments." (viirya committed May 19, 2017)
Analyzer.scala
@@ -167,7 +167,7 @@ class Analyzer(
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
       CleanupAliases,
-      CleanupBarriers)
+      EliminateBarriers)
   )

  /**
@@ -673,7 +673,7 @@ class Analyzer(
   */
  private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = {
    // Remove analysis barrier if any.
-   val right = CleanupBarriers(oriRight)
+   val right = EliminateBarriers(oriRight)
    val conflictingAttributes = left.outputSet.intersect(right.outputSet)
    logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
      s"between $left and $right")
@@ -1050,7 +1050,7 @@ class Analyzer(
      case sa @ Sort(_, _, child: Aggregate) => sa

      case s @ Sort(order, _, orgChild) if !s.resolved && orgChild.resolved =>
-       val child = CleanupBarriers(orgChild)
+       val child = EliminateBarriers(orgChild)
Contributor: add the AnalysisBarrier back at the end?

Member Author (viirya): It is added back below, at line 1102 in addMissingAttr. The logical plans with newly added missing attributes seem to need to be resolved again in some cases; one test failed before because of this issue.

        try {
          val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
          val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1072,7 +1072,7 @@
        }

      case f @ Filter(cond, orgChild) if !f.resolved && orgChild.resolved =>
-       val child = CleanupBarriers(orgChild)
+       val child = EliminateBarriers(orgChild)
Contributor: Actually, do we need to do this? Can resolveExpressionRecursively handle AnalysisBarrier?

Member Author (viirya): Yes, we do need it. In some cases the logical plan with newly added missing attributes needs to be traversed again; at least one test failed previously because of that. So we can't keep the barrier at its original position.

        try {
          val newCond = resolveExpressionRecursively(cond, child)
          val requiredAttrs = newCond.references.filter(_.resolved)
@@ -1553,7 +1553,7 @@ class Analyzer(
  object ResolveAggregateFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case filter @ Filter(havingCondition, AnalysisBarrier(aggregate: Aggregate)) =>
-       apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier(_))
+       apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier)
      case filter @ Filter(havingCondition,
          aggregate @ Aggregate(grouping, originalAggExprs, child))
          if aggregate.resolved =>
@@ -1614,7 +1614,7 @@ class Analyzer(
      }

      case sort @ Sort(sortOrder, global, AnalysisBarrier(aggregate: Aggregate)) =>
-       apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier(_))
+       apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier)
      case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved =>

      // Try resolving the ordering as though it is in the aggregate clause.
@@ -2481,7 +2481,7 @@ object CleanupAliases extends Rule[LogicalPlan] {
 }

 /** Remove the barrier nodes of analysis */
-object CleanupBarriers extends Rule[LogicalPlan] {
+object EliminateBarriers extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case AnalysisBarrier(child) => child
   }
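For reference, a minimal sketch of the barrier node this rule strips, under the design discussed in this PR (making the node a LeafNode, per commit a076d83, and propagating isStreaming, per commit c313e35); this is an illustration, not the PR's exact code:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Sketch only: wraps an already-analyzed plan. Because the node is a LeafNode,
// transformUp/transformDown stop here and analyzer rules never re-visit the
// wrapped sub-tree; the analyzer's Cleanup batch removes it via EliminateBarriers.
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
  override def output: Seq[Attribute] = child.output
  // Propagate streaming-ness so checks above the barrier still see
  // whether the wrapped plan is streaming.
  override def isStreaming: Boolean = child.isStreaming
}
```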
LogicalPlanSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.IntegerType

 /**
  * This suite is used to test [[LogicalPlan]]'s `transformUp` plus analysis barrier and make sure
- * it can correctly skips sub-trees that have already been marked as analyzed.
+ * it can correctly skip sub-trees that have already been marked as analyzed.
  */
 class LogicalPlanSuite extends SparkFunSuite {
   private var invocationCount = 0
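As an illustration, a hedged sketch of the kind of assertion such a suite can make (Project and LocalRelation are assumed to be imported from org.apache.spark.sql.catalyst.plans.logical, and AnalysisBarrier to be in scope as defined in this PR):

```scala
test("transformUp does not descend below an AnalysisBarrier") {
  invocationCount = 0
  // Wrap a trivial Project in a barrier; since the barrier is a leaf,
  // transformUp never reaches the Project, so the rule below never fires.
  val plan = AnalysisBarrier(Project(Nil, LocalRelation(Nil)))
  plan transformUp {
    case p: Project =>
      invocationCount += 1
      p
  }
  assert(invocationCount == 0)
}
```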
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (4 changes: 2 additions & 2 deletions)
@@ -187,7 +187,7 @@ class Dataset[T] private[sql](
    }
  }

-  // Wrap analyzed logical plan with an analysis barrier so we won't traverse/resolve it again.
+  // Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
  @transient private val planWithBarrier: LogicalPlan = AnalysisBarrier(logicalPlan)
Contributor: shall we just add AnalysisBarrier when constructing logicalPlan, instead of adding a new field planWithBarrier?

Member Author (viirya): logicalPlan is necessary for caching. Otherwise we would need to strip the barrier when we want to cache it.

Member: After going through all the usages of Dataset.logicalPlan, I like the idea above (https://github.com/apache/spark/pull/17770/files#r115913503) by @cloud-fan. If caching is the only issue we have, we can override canonicalized to return the child.

Member Author (viirya): CacheManager uses Dataset.logicalPlan as the key to look up identical plans that are already cached. If we always wrapped logicalPlan with a barrier, we would need to strip it when looking up the cache.
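A sketch of the canonicalized idea floated above, assuming canonicalized (a lazy val on QueryPlan in this version) can be overridden this way; this is not code from the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Sketch only: if the barrier canonicalizes to its child's canonical form,
// CacheManager lookups (which compare plans via sameResult over canonicalized
// plans) would match a barrier-wrapped plan against the same plan cached bare.
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
  override def output: Seq[Attribute] = child.output
  override lazy val canonicalized: LogicalPlan = child.canonicalized
}
```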


  /**
@@ -1744,7 +1744,7 @@ class Dataset[T] private[sql](
  def union(other: Dataset[T]): Dataset[T] = withSetOperator {
    // This breaks caching, but it's usually ok because it addresses a very specific use case:
    // using union to union many files or partitions.
-    CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier(_))
+    CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier)
  }
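A side note on the AnalysisBarrier(_) to AnalysisBarrier changes in this commit: a case class companion object extends the matching function type, so it can be passed directly wherever a LogicalPlan => LogicalPlan is expected. A standalone illustration with a hypothetical Wrap class:

```scala
case class Wrap(n: Int)

val xs = Seq(1, 2, 3)
val a = xs.map(Wrap(_)) // placeholder syntax, expands to x => Wrap(x)
val b = xs.map(Wrap)    // equivalent: the companion object is a Function1[Int, Wrap]
assert(a == b)
```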
