Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ class Analyzer(

private def resolveExprsAndAddMissingAttrs(
exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = {
if (exprs.forall(_.resolved)) {
if (exprs.forall(e => e.resolved && e.references.subsetOf(plan.outputSet))) {
// All given expressions are resolved, no need to continue anymore.
(exprs, plan)
} else {
Expand All @@ -1163,7 +1163,8 @@ class Analyzer(
case p: Project =>
val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
val missingAttrs = AttributeSet(newExprs) --
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also fix in Aggregate case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might miss something, but how about val missingAttrs = AttributeSet(newExprs) -- p.outputSet?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Aggregate, I've tested it. Seems ResolveAggregateFunctions already covers it.

Copy link
Member Author

@viirya viirya Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think using p.outputSet is simpler. Will update later.

AttributeSet(maybeResolvedExprs.filter(_.references.subsetOf(p.outputSet)))
(newExprs, Project(p.projectList ++ missingAttrs, newChild))

case a @ Aggregate(groupExprs, aggExprs, child) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ abstract class LogicalPlan
* [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
* should return `false`).
*/
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved &&
missingInput.isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missingInput is special, mostly we can't resolve it. I think that's why we didn't consider it in the resolved at the first place.

We can update the if condition in ResolveMissingReferences to take missingInput into consideration.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I found that this change causes one test failure.


override protected def statePrefix = if (!resolved) "'" else super.statePrefix

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
}.nonEmpty
)

!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions &&
missingInput.isEmpty
}

override def validConstraints: Set[Expression] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,16 @@ class LogicalPlanSuite extends SparkFunSuite {
OneRowRelation())
assert(result.sameResult(expected))
}

test("Logical plan with missing inputs should be unresolved") {
// Normally we won't add a missing resolved reference into a logical plan,
// but a valid query like `df.select(df("name")).filter(df("id") === 0)` can make a query
// like this.
val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
val plan = Project(Stream(AttributeReference("b", IntegerType, nullable = true)()), relation)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Stream?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No special reason. Just following above test case.

assert(plan.expressions.forall(_.resolved))
assert(plan.childrenResolved)
assert(plan.missingInput.nonEmpty)
assert(!plan.resolved)
}
}
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2329,4 +2329,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
}

test("SPARK-24781: Using a reference from Dataset in Filter/Sort might not work") {
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
val filter1 = df.select(df("name")).filter(df("id") === 0)
val filter2 = df.select(col("name")).filter(col("id") === 0)
checkAnswer(filter1, filter2.collect())

val sort1 = df.select(df("name")).orderBy(df("id"))
val sort2 = df.select(col("name")).orderBy(col("id"))
checkAnswer(sort1, sort2.collect())
}
}