Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address comments and fix bug.
  • Loading branch information
viirya committed Jul 12, 2018
commit b99d0c7ed2fb9932a3bec661b65598c77a8ad5f3
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,8 @@ class Analyzer(
case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
case sa @ Sort(_, _, child: Aggregate) => sa

case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
case s @ Sort(order, _, child)
if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
val ordering = newOrder.map(_.asInstanceOf[SortOrder])
if (child.output == newChild.output) {
Expand All @@ -1136,7 +1137,7 @@ class Analyzer(
Project(child.output, newSort)
}

case f @ Filter(cond, child) if !f.resolved && child.resolved =>
case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
if (child.output == newChild.output) {
f.copy(condition = newCond.head)
Expand All @@ -1149,8 +1150,9 @@ class Analyzer(

private def resolveExprsAndAddMissingAttrs(
exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = {
// An expression is possibly resolved but not in the output of `plan`.
if (exprs.forall(e => e.resolved && e.references.subsetOf(plan.outputSet))) {
// All given expressions are resolved, no need to continue anymore.
// All given expressions are resolved and in the plan's output, no need to continue anymore.
(exprs, plan)
} else {
plan match {
Expand All @@ -1163,8 +1165,8 @@ class Analyzer(
case p: Project =>
val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
val missingAttrs = AttributeSet(newExprs) --
AttributeSet(maybeResolvedExprs.filter(_.references.subsetOf(p.outputSet)))
// The resolved attributes might not come from `p.child`. Need to filter it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can this happen? if the resolved attributes do not exist in child, then the plan is invalid, isn't it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least, this case was resolved in ResolveMissingReferences in spark-v2.2.

val missingAttrs = (AttributeSet(newExprs).intersect(p.child.outputSet)) -- p.outputSet
(newExprs, Project(p.projectList ++ missingAttrs, newChild))

case a @ Aggregate(groupExprs, aggExprs, child) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ abstract class LogicalPlan
* [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
* should return `false`).
*/
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved &&
missingInput.isEmpty
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved

override protected def statePrefix = if (!resolved) "'" else super.statePrefix

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
}.nonEmpty
)

!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions &&
missingInput.isEmpty
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
}

override def validConstraints: Set[Expression] =
Expand Down