Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
Expand Down Expand Up @@ -257,11 +256,24 @@ class Analyzer(catalog: Catalog,

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
q transformExpressionsUp {
q transformExpressionsUp {
case u @ UnresolvedAttribute(name) if resolver(name, VirtualColumn.groupingIdName) &&
q.isInstanceOf[GroupingAnalytics] =>
// Resolve the virtual column GROUPING__ID for the operator GroupingAnalytics
q.asInstanceOf[GroupingAnalytics].gid
case u @ UnresolvedAttribute(name) if q.isInstanceOf[Sort] =>
val s = q.asInstanceOf[Sort]
val newChild = s.child match {
case Project(list, c) =>
val newList = list.filter {
case Alias(g: GetField, _) => false
case Alias(g: GetItem, _) => false
case _ => true
}
Project(newList, c)
case other => other
}
Sort(s.order, s.global, newChild).resolveChildren(name, resolver).getOrElse(u)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will not work when we have an ORDER BY in a subquery and the outer query block tries to access those filtered-out fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You are creating a fake Sort node in order to resolve the attribute. If the attribute is resolved, use the resolved one to replace the unresolved one.

case u @ UnresolvedAttribute(name) =>
// Leave unchanged if resolution fails. Hopefully will be resolved next round.
val result = q.resolveChildren(name, resolver).getOrElse(u)
Expand Down Expand Up @@ -321,8 +333,7 @@ class Analyzer(catalog: Catalog,
if !s.resolved && p.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
val resolved = unresolved.flatMap(child.resolve(_, resolver))
val requiredAttributes =
AttributeSet(resolved.flatMap(_.collect { case a: Attribute => a }))
val requiredAttributes = AttributeSet(resolved)

val missingInProject = requiredAttributes -- p.output
if (missingInProject.nonEmpty) {
Expand Down
19 changes: 14 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1053,10 +1053,19 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
test("SPARK-6145: ORDER BY test for nested fields") {
jsonRDD(sparkContext.makeRDD(
"""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil)).registerTempTable("nestedOrder")
// These should be successfully analyzed
sql("SELECT 1 FROM nestedOrder ORDER BY a.b").queryExecution.analyzed
sql("SELECT a.b FROM nestedOrder ORDER BY a.b").queryExecution.analyzed
sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a").queryExecution.analyzed
sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d").queryExecution.analyzed

checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
checkAnswer(sql("SELECT a.b FROM nestedOrder ORDER BY a.b"), Row(1))
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a"), Row(1))
checkAnswer(sql("SELECT a.a.a FROM nestedOrder ORDER BY a.a.a"), Row(1))
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d"), Row(1))
checkAnswer(sql("SELECT c[0].d FROM nestedOrder ORDER BY c[0].d"), Row(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you try SELECT d FROM (SELECT c[0].d FROM nestedOrder ORDER BY c[0].d) as tmp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore the above comment

}

// Test for SPARK-6145: the SELECT list reaches a nested value through an array index
// while ORDER BY references a field of a different struct column (c0.a).
test("SPARK-6145: special cases") {
// Register a single-row temp table "t" with a struct holding an array (a.b), an array of
// structs (b), and a plain struct (c0).
jsonRDD(sparkContext.makeRDD(
"""{"a": {"b": [1]}, "b": [{"a": 1}], "c0": {"a": 1}}""" :: Nil)).registerTempTable("t")
// Index into an array nested inside a struct, ordering by c0.a.
checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY c0.a"), Row(1))
// Take a struct field from an array element, ordering by c0.a.
checkAnswer(sql("SELECT b[0].a FROM t ORDER BY c0.a"), Row(1))
}
}