Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Modify comment and test cases.
  • Loading branch information
viirya committed Dec 6, 2017
commit b7747c4a9b2b6869c38685bcea8f41b7a5294101
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,19 @@ case class Deduplicate(
override def output: Seq[Attribute] = child.output
}

/** A logical plan for setting a barrier of analysis */
/**
* A logical plan for setting a barrier of analysis.
*
* The SQL Analyzer goes through a whole query plan even most part of it is analyzed. This
* increases the time spent on query analysis for long pipelines in ML, especially.
*
* This logical plan wraps an analyzed logical plan to prevent it from analysis again. The barrier
* is applied to the analyzed logical plan in Dataset. It won't change the output of wrapped
* logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset
* will be put on the barrier, so only the new nodes created will be analyzed.
*
* This analysis barrier will be removed at the end of analysis stage.
*/
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put the PR descriptions to the comment of this class?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

override def output: Seq[Attribute] = child.output
override def isStreaming: Boolean = child.isStreaming
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.IntegerType

/**
* This suite is used to test [[LogicalPlan]]'s `transformUp` plus analysis barrier and make sure
* it can correctly skip sub-trees that have already been marked as analyzed.
* This suite is used to test [[LogicalPlan]]'s `transformUp/transformDown` plus analysis barrier
* and make sure it can correctly skip sub-trees that have already been analyzed.
*/
class LogicalPlanSuite extends SparkFunSuite {
private var invocationCount = 0
Expand All @@ -42,6 +42,10 @@ class LogicalPlanSuite extends SparkFunSuite {
plan transformUp function

assert(invocationCount === 1)

invocationCount = 0
plan transformDown function
assert(invocationCount === 1)
}

test("transformUp runs on operators recursively") {
Expand All @@ -50,6 +54,10 @@ class LogicalPlanSuite extends SparkFunSuite {
plan transformUp function

assert(invocationCount === 2)

invocationCount = 0
plan transformDown function
assert(invocationCount === 2)
}

test("transformUp skips all ready resolved plans wrapped in analysis barrier") {
Expand All @@ -58,6 +66,10 @@ class LogicalPlanSuite extends SparkFunSuite {
plan transformUp function

assert(invocationCount === 0)

invocationCount = 0
plan transformDown function
assert(invocationCount === 0)
}

test("transformUp skips partially resolved plans wrapped in analysis barrier") {
Expand All @@ -67,6 +79,10 @@ class LogicalPlanSuite extends SparkFunSuite {
plan2 transformUp function

assert(invocationCount === 1)

invocationCount = 0
plan2 transformDown function
assert(invocationCount === 1)
}

test("isStreaming") {
Expand Down