Remove analyzed stuff.
viirya committed Dec 5, 2017
commit 54182bf3ea6db2815146f9e158bbb08609400b93
@@ -667,9 +667,9 @@ class Analyzer(
* Generate a new logical plan for the right child with different expression IDs
* for all conflicting attributes.
*/
-private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = {
+private def dedupRight (left: LogicalPlan, originalRight: LogicalPlan): LogicalPlan = {
// Remove analysis barrier if any.
-val right = EliminateBarriers(oriRight)
+val right = EliminateBarriers(originalRight)
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
s"between $left and $right")
@@ -712,7 +712,7 @@ class Analyzer(
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
-oriRight
+originalRight
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
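
A note on why `dedupRight` matters here: a self-join reuses one logical plan on both sides, so both sides initially share the same expression IDs, and the right side must get fresh IDs before the join condition can resolve. A minimal sketch that exercises this path (local session; all names below are illustrative, not from the PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("dedup-demo").getOrCreate()
import spark.implicits._

// Both join sides come from the same plan, so they start with identical
// attribute IDs for "id"; dedupRight re-keys the right side so the
// join condition can resolve unambiguously.
val df = spark.range(3).toDF("id")
val joined = df.as("l").join(df.as("r"), $"l.id" === $"r.id")
joined.explain(true) // the analyzed plan shows fresh expression IDs on the right
```
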
@@ -1081,8 +1081,8 @@ class Analyzer(
case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
case sa @ Sort(_, _, child: Aggregate) => sa

-case s @ Sort(order, _, oriChild) if !s.resolved && oriChild.resolved =>
-val child = EliminateBarriers(oriChild)
+case s @ Sort(order, _, originalChild) if !s.resolved && originalChild.resolved =>
+val child = EliminateBarriers(originalChild)
try {
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1103,8 +1103,8 @@
case ae: AnalysisException => s
}

-case f @ Filter(cond, oriChild) if !f.resolved && oriChild.resolved =>
-val child = EliminateBarriers(oriChild)
+case f @ Filter(cond, originalChild) if !f.resolved && originalChild.resolved =>
+val child = EliminateBarriers(originalChild)
try {
val newCond = resolveExpressionRecursively(cond, child)
val requiredAttrs = newCond.references.filter(_.resolved)
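The `Sort` and `Filter` cases above resolve expressions against attributes that the child's projection has dropped, now reaching through any `AnalysisBarrier` first. The user-visible behavior, as a sketch (assumes the `spark` session from the earlier sketch):

```scala
// "b" is absent from the SELECT list, yet ORDER BY b still resolves: the
// analyzer adds b to the child plan and projects it away after the sort.
spark.range(6).selectExpr("id AS a", "id % 2 AS b").createOrReplaceTempView("t")
spark.sql("SELECT a FROM t ORDER BY b").show()
```
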
@@ -78,8 +78,6 @@ trait CheckAnalysis extends PredicateHelper {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
-case p if p.analyzed => // Skip already analyzed sub-plans
-
case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")

@@ -353,8 +351,6 @@
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
case _ =>
}
-
-plan.foreach(_.setAnalyzed())
}

/**
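Dropping the `p.analyzed` short-circuit and the `setAnalyzed()` marking means `checkAnalysis` always walks the full tree; the checks themselves are unchanged. For instance, the unresolved-relation failure a few lines up surfaces like this (sketch reusing `spark`; the table name is deliberately nonexistent):

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM no_such_table")
} catch {
  // foreachUp reports the first failure found bottom-up, here the missing relation
  case e: AnalysisException => println(e.getMessage) // "Table or view not found: no_such_table"
}
```
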
@@ -281,8 +281,6 @@ object TypeCoercion {
object WidenSetOperationTypes extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case p if p.analyzed => p
-
case s @ SetOperation(left, right) if s.childrenResolved &&
left.output.length == right.output.length && !s.resolved =>
val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
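Here too, only the re-entry guard is removed; `WidenSetOperationTypes` itself is untouched. What the rule does, as a sketch (assumes `spark` as above):

```scala
// An INT column unioned with a DOUBLE column: the rule inserts casts so both
// sides widen to DOUBLE before the Union resolves.
val ints    = spark.range(2).selectExpr("CAST(id AS INT) AS c")
val doubles = spark.range(2).selectExpr("CAST(id AS DOUBLE) AS c")
ints.union(doubles).printSchema() // c: double
```
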
@@ -33,20 +33,6 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {

-private var _analyzed: Boolean = false
-
-/**
- * Marks this plan as already analyzed. This should only be called by [[CheckAnalysis]].
- */
-private[catalyst] def setAnalyzed(): Unit = { _analyzed = true }
-
-/**
- * Returns true if this node and its children have already been gone through analysis and
- * verification. Note that this is only an optimization used to avoid analyzing trees that
- * have already been analyzed, and can be reset by transformations.
- */
-def analyzed: Boolean = _analyzed
-
/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming == true)

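The deleted scaladoc itself names the weakness of this design: the flag "can be reset by transformations". A plain-Scala illustration of that fragility (hypothetical `Node`, not Spark API):

```scala
// A var that is not a constructor parameter does not survive case-class copy(),
// which is exactly how Catalyst rewrites build new tree nodes.
case class Node(name: String) { var analyzed: Boolean = false }

val n = Node("scan")
n.analyzed = true
println(n.copy(name = "scan2").analyzed) // false: the mark was silently lost
```
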
@@ -885,7 +885,6 @@ case class Deduplicate(
/** A logical plan for setting a barrier of analysis */
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
[Inline review on AnalysisBarrier]
Member: Put the PR descriptions to the comment of this class?
Member Author: Ok.

override def output: Seq[Attribute] = child.output
-override def analyzed: Boolean = true
override def isStreaming: Boolean = child.isStreaming
override def doCanonicalize(): LogicalPlan = child.canonicalized
}
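
`EliminateBarriers`, called throughout the Analyzer changes above, is the counterpart rule that strips these wrappers. A sketch of its likely shape, inferred from its call sites in this diff rather than copied from the PR:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Strip every AnalysisBarrier wrapper, exposing the subtree beneath.
object EliminateBarriers extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case AnalysisBarrier(child) => child
  }
}
```

Wrapping a resolved subtree in `AnalysisBarrier(plan)` and later applying this rule recovers the original plan unchanged.
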
@@ -51,15 +51,15 @@ import org.apache.spark.util.Utils
* (either batch or streaming) or to write out data using an external library.
*
* From an end user's perspective a DataSource description can be created explicitly using
-[[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL. Additionally, this class is
+[[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL. Additionally, this class is
* used when resolving a description from a metastore to a concrete implementation.
*
* Many of the arguments to this class are optional, though depending on the specific API being used
* these optional arguments might be filled in during resolution using either inference or external
-metadata. For example, when reading a partitioned table from a file system, partition columns
+metadata. For example, when reading a partitioned table from a file system, partition columns
* will be inferred from the directory layout even if they are not specified.
*
-@param paths A list of file system paths that hold data. These will be globbed before and
+@param paths A list of file system paths that hold data. These will be globbed before and
* qualified. This option only works when reading from a [[FileFormat]].
* @param userSpecifiedSchema An optional specification of the schema of the data. When present
* we skip attempting to infer the schema.
@@ -470,7 +470,7 @@ case class DataSource(
}.head
}
// For partitioned relation r, r.schema's column ordering can be different from the column
-// ordering of data.logicalPlan (partition columns are all moved after data column). This
+// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
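The class doc above says partition columns are inferred from the directory layout even when unspecified. A short sketch of that round trip (assumes `spark` as above; the output path is illustrative):

```scala
// partitionBy encodes "part" in the directory names (part=0/, part=1/); on
// read, the column is recovered from the layout, not from the Parquet files.
val data = spark.range(4).selectExpr("id", "CAST(id % 2 AS INT) AS part")
data.write.mode("overwrite").partitionBy("part").parquet("/tmp/partitioned_demo")
spark.read.parquet("/tmp/partitioned_demo").printSchema() // id, plus inferred part
```
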