Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
Expand Down Expand Up @@ -321,8 +320,7 @@ class Analyzer(catalog: Catalog,
if !s.resolved && p.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
val resolved = unresolved.flatMap(child.resolve(_, resolver))
val requiredAttributes =
AttributeSet(resolved.flatMap(_.collect { case a: Attribute => a }))
val requiredAttributes = AttributeSet(resolved)

val missingInProject = requiredAttributes -- p.output
if (missingInProject.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -152,6 +153,18 @@ case class Sort(
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output = child.output

override def resolveChildren(name: String, resolver: Resolver) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels kind of hacky to me, as it's mixing the particulars of analysis for SQL into the logical plan. Could we instead just make resolve not do partial resolution, where we can resolve the base attribute but not the GetFields that are on top? I think this change is the root cause of the regression.

val input = child match {
case Project(list, c) => list.filter {
case Alias(g: GetField, _) => false
case Alias(g: GetItem, _) => false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just filter out everything except AttributeReference? I don't know all the corner cases of ORDER BY, and I feel this approach is safer.

case _ => true
}.map(_.toAttribute)
case _ => child.flatMap(_.output)
}
resolve(name, input, resolver)
}
}

case class Aggregate(
Expand Down
19 changes: 14 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1053,10 +1053,19 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
test("SPARK-6145: ORDER BY test for nested fields") {
jsonRDD(sparkContext.makeRDD(
"""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil)).registerTempTable("nestedOrder")
// These should be successfully analyzed
sql("SELECT 1 FROM nestedOrder ORDER BY a.b").queryExecution.analyzed
sql("SELECT a.b FROM nestedOrder ORDER BY a.b").queryExecution.analyzed
sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a").queryExecution.analyzed
sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d").queryExecution.analyzed

checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
checkAnswer(sql("SELECT a.b FROM nestedOrder ORDER BY a.b"), Row(1))
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a"), Row(1))
checkAnswer(sql("SELECT a.a.a FROM nestedOrder ORDER BY a.a.a"), Row(1))
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d"), Row(1))
checkAnswer(sql("SELECT c[0].d FROM nestedOrder ORDER BY c[0].d"), Row(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you try SELECT d FROM (SELECT c[0].d FROM nestedOrder ORDER BY c[0].d) as tmp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore the above comment

}

test("SPARK-6145: special cases") {
  // Single JSON record: `a` is a struct holding an array `b`, `b` is an array of
  // structs holding `a`, and `c0` is a struct holding `a`.
  val records = """{"a": {"b": [1]}, "b": [{"a": 1}], "c0": {"a": 1}}""" :: Nil
  jsonRDD(sparkContext.makeRDD(records)).registerTempTable("t")

  // Each projection is sorted by the nested field c0.a and is expected to
  // produce the single row (1).
  val queries = Seq(
    "SELECT a.b[0] FROM t ORDER BY c0.a",
    "SELECT b[0].a FROM t ORDER BY c0.a")
  queries.foreach { query =>
    checkAnswer(sql(query), Row(1))
  }
}
}