Changes from 1 commit (29 commits in this pull request):
82978d7  Set barrier to prevent re-analysis of analyzed plan.  (viirya, Apr 26, 2017)
24905e3  Use a logical node to set analysis barrier.  (viirya, Apr 27, 2017)
e15b001  Add test for analysis barrier.  (viirya, Apr 30, 2017)
a076d83  Let AnalysisBarrier as LeafNode.  (viirya, May 3, 2017)
b29ded3  Remove resolveOperators path.  (viirya, May 5, 2017)
8c8fe1e  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 5, 2017)
a855182  Solving merging issue.  (viirya, May 5, 2017)
4ff9610  Do not change exposed logicalPlan.  (viirya, May 5, 2017)
d0a94f4  Fix test.  (viirya, May 6, 2017)
02e11f9  Address comments.  (viirya, May 9, 2017)
17f1a02  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 9, 2017)
4629959  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 10, 2017)
c313e35  Correctly set isStreaming for barrier.  (viirya, May 10, 2017)
7e9dfac  Address comments.  (viirya, May 11, 2017)
fba3690  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 17, 2017)
f63ea0b  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 17, 2017)
b9d03cd  Fix test.  (viirya, May 17, 2017)
6a7204c  Address comments.  (viirya, May 19, 2017)
3437ae0  Wrap AnalysisBarrier on df.logicalPlan.  (viirya, May 22, 2017)
555fa8e  Fix test.  (viirya, May 23, 2017)
505aba6  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 23, 2017)
f3e4208  fix test.  (viirya, May 24, 2017)
c0bee01  Avoid overriding find in AnalysisBarrier.  (viirya, May 24, 2017)
1c1cc9d  Fix test.  (viirya, May 24, 2017)
eb0598e  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 24, 2017)
cba784b  fix test.  (viirya, May 24, 2017)
b478e55  Merge remote-tracking branch 'upstream/master' into SPARK-20392  (viirya, May 25, 2017)
8314cc3  Create a new field in Dataset for the plan with barrier.  (viirya, May 25, 2017)
6add9ec  Address comments.  (viirya, May 26, 2017)
Wrap AnalysisBarrier on df.logicalPlan.
viirya committed May 22, 2017
commit 3437ae01a2db9575f49f1ed56e3f0d8990b32243
@@ -919,4 +919,11 @@ case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
override def output: Seq[Attribute] = child.output
override def analyzed: Boolean = true
override def isStreaming: Boolean = child.isStreaming
Member:
Adding override lazy val canonicalized: LogicalPlan = child.canonicalized?

Member Author (viirya):
It should be fine to use the default canonicalized.

override lazy val canonicalized: LogicalPlan = child.canonicalized

override def find(f: LogicalPlan => Boolean): Option[LogicalPlan] = if (f(this)) {
Member Author (@viirya, May 22, 2017):
@cloud-fan @gatorsmile One consequence of wrapping AnalysisBarrier around df.logicalPlan is that we have to override find so that CacheManager can correctly locate the cached plan that refers to a given plan.

Member Author (viirya):
This is because the barrier is a LeafNode.

Contributor:
Can we use plan.canonicalized as the cache key in CacheManager?

Member Author (viirya):
Yeah, I think that should work and would avoid overriding find.

Member Author (viirya):
Btw, it also seems like a more reasonable key for CacheManager.

Member Author (viirya):
I've tried using plan.canonicalized as the cache key, but it breaks the lookup of cached plans. Fixing that would change the current lookup logic, so I'd rather just override find for AnalysisBarrier.

Contributor (@cloud-fan, May 24, 2017):
If we can't do that, I think we should handle AnalysisBarrier specially in CacheManager. Overriding find is too hacky and too far removed from CacheManager.

Member Author (viirya):
OK. Agreed.

Some(this)
} else {
child.find(f)
}
}
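A rough sketch of the "handle AnalysisBarrier specially in CacheManager" idea discussed above, for illustration only. The helper name stripBarrier and the simplified lookup body are assumptions rather than code from this pull request; they lean on CacheManager's existing cachedData list and QueryPlan.sameResult:

// Hypothetical sketch (not part of this diff): unwrap the barrier before the
// sameResult comparison, so AnalysisBarrier itself does not need to override find.
private def stripBarrier(plan: LogicalPlan): LogicalPlan = plan match {
  case AnalysisBarrier(child) => child
  case other => other
}

def lookupCachedData(plan: LogicalPlan): Option[CachedData] =
  cachedData.find(cd => stripBarrier(plan).sameResult(cd.plan))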
95 changes: 47 additions & 48 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -177,19 +177,18 @@ class Dataset[T] private[sql](
@transient private[sql] val logicalPlan: LogicalPlan = {
// For various commands (like DDL) and queries with side effects, we force query execution
// to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
val analyzed = queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
case _ =>
queryExecution.analyzed
}
// Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
AnalysisBarrier(analyzed)
}

// Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
@transient private val planWithBarrier: LogicalPlan = AnalysisBarrier(logicalPlan)

/**
* Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the
* passed in encoder to [[ExpressionEncoder]] explicitly, and mark it implicit so that we can use
@@ -416,7 +415,7 @@ class Dataset[T] private[sql](
*/
@Experimental
@InterfaceStability.Evolving
def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)
def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan)

/**
* Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
@@ -619,7 +618,7 @@ class Dataset[T] private[sql](
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, planWithBarrier))
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
}

/**
@@ -793,7 +792,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def join(right: Dataset[_]): DataFrame = withPlan {
Join(planWithBarrier, right.planWithBarrier, joinType = Inner, None)
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}

/**
@@ -871,7 +870,7 @@ class Dataset[T] private[sql](
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sparkSession.sessionState.executePlan(
Join(planWithBarrier, right.planWithBarrier, joinType = JoinType(joinType), None))
Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
.analyzed.asInstanceOf[Join]

withPlan {
@@ -932,7 +931,7 @@ class Dataset[T] private[sql](
// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
val plan = withPlan(
Join(planWithBarrier, right.planWithBarrier, JoinType(joinType), Some(joinExprs.expr)))
Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
.queryExecution.analyzed.asInstanceOf[Join]

// If auto self join alias is disabled, return the plan.
@@ -941,8 +940,8 @@ class Dataset[T] private[sql](
}

// If left/right have no output set intersection, return the plan.
val lanalyzed = withPlan(this.planWithBarrier).queryExecution.analyzed
val ranalyzed = withPlan(right.planWithBarrier).queryExecution.analyzed
val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
return withPlan(plan)
}
@@ -974,7 +973,7 @@ class Dataset[T] private[sql](
* @since 2.1.0
*/
def crossJoin(right: Dataset[_]): DataFrame = withPlan {
Join(planWithBarrier, right.planWithBarrier, joinType = Cross, None)
Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
}

/**
@@ -1006,8 +1005,8 @@ class Dataset[T] private[sql](
// etc.
val joined = sparkSession.sessionState.executePlan(
Join(
this.planWithBarrier,
other.planWithBarrier,
this.logicalPlan,
other.logicalPlan,
JoinType(joinType),
Some(condition.expr))).analyzed.asInstanceOf[Join]

@@ -1177,7 +1176,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
Hint(name, parameters, planWithBarrier)
Hint(name, parameters, logicalPlan)
}

/**
@@ -1203,7 +1202,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
SubqueryAlias(alias, planWithBarrier)
SubqueryAlias(alias, logicalPlan)
}

/**
@@ -1241,7 +1240,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
Project(cols.map(_.named), planWithBarrier)
Project(cols.map(_.named), logicalPlan)
}

/**
@@ -1296,8 +1295,8 @@ class Dataset[T] private[sql](
@InterfaceStability.Evolving
def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
implicit val encoder = c1.encoder
val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
planWithBarrier)
val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil,
logicalPlan)

if (encoder.flat) {
new Dataset[U1](sparkSession, project, encoder)
@@ -1315,8 +1314,8 @@ class Dataset[T] private[sql](
protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
val encoders = columns.map(_.encoder)
val namedColumns =
columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
columns.map(_.withInputType(exprEnc, logicalPlan.output).named)
val execution = new QueryExecution(sparkSession, Project(namedColumns, logicalPlan))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
}

@@ -1392,7 +1391,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def filter(condition: Column): Dataset[T] = withTypedPlan {
Filter(condition.expr, planWithBarrier)
Filter(condition.expr, logicalPlan)
}

/**
@@ -1569,7 +1568,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
val inputPlan = planWithBarrier
val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
val executed = sparkSession.sessionState.executePlan(withGroupingKey)

@@ -1715,7 +1714,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def limit(n: Int): Dataset[T] = withTypedPlan {
Limit(Literal(n), planWithBarrier)
Limit(Literal(n), logicalPlan)
}

/**
@@ -1744,7 +1743,8 @@ class Dataset[T] private[sql](
def union(other: Dataset[T]): Dataset[T] = withSetOperator {
// This breaks caching, but it's usually ok because it addresses a very specific use case:
// using union to union many files or partitions.
CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier)
CombineUnions(Union(EliminateBarriers(logicalPlan), EliminateBarriers(other.logicalPlan)))
.mapChildren(AnalysisBarrier)
Member Author (@viirya, May 22, 2017):
This looks a bit ugly, but without this the barrier would prevent CombineUnions from working normally.

Member Author (viirya):
Alternatively:

CombineUnions(Union(logicalPlan.asInstanceOf[AnalysisBarrier].child, other.logicalPlan.asInstanceOf[AnalysisBarrier].child))

}
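For context, EliminateBarriers in the union body above presumably just unwraps AnalysisBarrier nodes so that rules like CombineUnions can see the real Union children. Its definition is not part of this commit's diff; a minimal sketch of such a rule, assuming the AnalysisBarrier node shown earlier and catalyst's Rule/transformDown machinery, could look like:

// Illustrative sketch only; the actual rule in the patch may differ.
// Assumes org.apache.spark.sql.catalyst.rules.Rule and the AnalysisBarrier case class above.
object EliminateBarriers extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case AnalysisBarrier(child) => child
  }
}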

/**
@@ -1758,7 +1758,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
Intersect(planWithBarrier, other.planWithBarrier)
Intersect(logicalPlan, other.logicalPlan)
}

/**
@@ -1772,7 +1772,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def except(other: Dataset[T]): Dataset[T] = withSetOperator {
Except(planWithBarrier, other.planWithBarrier)
Except(logicalPlan, other.logicalPlan)
}

/**
@@ -1793,7 +1793,7 @@ class Dataset[T] private[sql](
s"Fraction must be nonnegative, but got ${fraction}")

withTypedPlan {
Sample(0.0, fraction, withReplacement, seed, planWithBarrier)()
Sample(0.0, fraction, withReplacement, seed, logicalPlan)()
}
}

@@ -1835,15 +1835,15 @@ class Dataset[T] private[sql](
// overlapping splits. To prevent this, we explicitly sort each input partition to make the
// ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
// from the sort order.
val sortOrder = planWithBarrier.output
val sortOrder = logicalPlan.output
.filter(attr => RowOrdering.isOrderable(attr.dataType))
.map(SortOrder(_, Ascending))
val plan = if (sortOrder.nonEmpty) {
Sort(sortOrder, global = false, planWithBarrier)
Sort(sortOrder, global = false, logicalPlan)
} else {
// SPARK-12662: If sort order is empty, we materialize the dataset to guarantee determinism
cache()
planWithBarrier
logicalPlan
}
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
@@ -1927,7 +1927,7 @@ class Dataset[T] private[sql](

withPlan {
Generate(generator, join = true, outer = false,
qualifier = None, generatorOutput = Nil, planWithBarrier)
qualifier = None, generatorOutput = Nil, logicalPlan)
}
}

@@ -1968,7 +1968,7 @@ class Dataset[T] private[sql](

withPlan {
Generate(generator, join = true, outer = false,
qualifier = None, generatorOutput = Nil, planWithBarrier)
qualifier = None, generatorOutput = Nil, logicalPlan)
}
}

@@ -2131,7 +2131,7 @@ class Dataset[T] private[sql](
}
cols
}
Deduplicate(groupCols, planWithBarrier, isStreaming)
Deduplicate(groupCols, logicalPlan, isStreaming)
}

/**
@@ -2280,7 +2280,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def filter(func: T => Boolean): Dataset[T] = {
withTypedPlan(TypedFilter(func, planWithBarrier))
withTypedPlan(TypedFilter(func, logicalPlan))
}

/**
@@ -2294,7 +2294,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def filter(func: FilterFunction[T]): Dataset[T] = {
withTypedPlan(TypedFilter(func, planWithBarrier))
withTypedPlan(TypedFilter(func, logicalPlan))
}

/**
@@ -2308,7 +2308,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {
MapElements[T, U](func, planWithBarrier)
MapElements[T, U](func, logicalPlan)
}

/**
@@ -2323,7 +2323,7 @@ class Dataset[T] private[sql](
@InterfaceStability.Evolving
def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
implicit val uEnc = encoder
withTypedPlan(MapElements[T, U](func, planWithBarrier))
withTypedPlan(MapElements[T, U](func, logicalPlan))
}

/**
@@ -2339,7 +2339,7 @@ class Dataset[T] private[sql](
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, planWithBarrier),
MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}

@@ -2370,7 +2370,7 @@ class Dataset[T] private[sql](
val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
Dataset.ofRows(
sparkSession,
MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan))
}

/**
@@ -2525,7 +2525,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = true, planWithBarrier)
Repartition(numPartitions, shuffle = true, logicalPlan)
}

/**
@@ -2539,7 +2539,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
RepartitionByExpression(partitionExprs.map(_.expr), planWithBarrier, numPartitions)
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
}

/**
@@ -2555,8 +2555,7 @@ class Dataset[T] private[sql](
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
RepartitionByExpression(
partitionExprs.map(_.expr), planWithBarrier,
sparkSession.sessionState.conf.numShufflePartitions)
partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
}

/**
@@ -2577,7 +2576,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = false, planWithBarrier)
Repartition(numPartitions, shuffle = false, logicalPlan)
}

/**
@@ -2666,7 +2665,7 @@ class Dataset[T] private[sql](
*/
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
val deserialized = CatalystSerde.deserialize[T](planWithBarrier)
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
@@ -2765,7 +2764,7 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
child = planWithBarrier,
child = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
@@ -2936,7 +2935,7 @@ class Dataset[T] private[sql](
}
}
withTypedPlan {
Sort(sortOrder, global = global, planWithBarrier)
Sort(sortOrder, global = global, logicalPlan)
}
}
