Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
Expand Down Expand Up @@ -397,19 +398,31 @@ class Analyzer(
}
case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
if !s.resolved && a.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
// A small hack to create an object that will allow us to resolve any references that
// refer to named expressions that are present in the grouping expressions.
val groupingRelation = LocalRelation(
grouping.collect { case ne: NamedExpression => ne.toAttribute }
)

val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, a, groupingRelation)
// Find sort attributes that are projected away so we can temporarily add them back in.
val (resolvedOrdering, unresolved) = resolveAndFindMissing(ordering, a, groupingRelation)

// Find aggregate expressions and evaluate them early, since they can't be evaluated in a
// Sort.
val (withAggsRemoved, aliasedAggregateList) = resolvedOrdering.map {
case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty =>
val aliased = Alias(aggOrdering.child, "_aggOrdering")()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is more than one aggregate in the ORDER BY clause, will they all get the same alias name, `_aggOrdering`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They will be, but that does not matter. The names are only for readability.
All work in execution is done by expression id.
On Jun 14, 2015 6:24 PM, "Yadong Qi" [email protected] wrote:

In
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
#6816 (comment):

     // A small hack to create an object that will allow us to resolve any references that
     // refer to named expressions that are present in the grouping expressions.
     val groupingRelation = LocalRelation(
       grouping.collect { case ne: NamedExpression => ne.toAttribute }
     )
  •    val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, a, groupingRelation)
    
  •    // Find sort attributes that are projected away so we can temporarily add them back in.
    
  •    val (resolvedOrdering, unresolved) = resolveAndFindMissing(ordering, a, groupingRelation)
    
  •    // Find aggregate expressions and evaluate them early, since they can't be evaluated in a
    
  •    // Sort.
    
  •    val (withAggsRemoved, aliasedAggregateList) = resolvedOrdering.map {
    
  •      case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty =>
    
  •        val aliased = Alias(aggOrdering.child, "_aggOrdering")()
    

If there is more than one aggregate in the ORDER BY clause, will they all get
the same alias name, `_aggOrdering`?


Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/6816/files#r32387465.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For something like `SELECT a, sum(b) FROM orderByData GROUP BY a ORDER BY sum(b)`, we don't need to add an extra projection. Should we handle this case?

(aggOrdering.copy(child = aliased.toAttribute), aliased :: Nil)

case other => (other, Nil)
}.unzip

val missing = unresolved ++ aliasedAggregateList.flatten

if (missing.nonEmpty) {
// Add missing grouping exprs and then project them away after the sort.
Project(a.output,
Sort(resolvedOrdering, global,
Sort(withAggsRemoved, global,
Aggregate(grouping, aggs ++ missing, child)))
} else {
s // Nothing we can do here. Return original plan.
Expand Down
45 changes: 45 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,51 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
}

// SPARK-6583: an ORDER BY clause may reference an aggregate expression, whether or not that
// aggregate also appears in the SELECT list.
test("SPARK-6583 order by aggregated function") {
  // Scope the temp table to this test, as the neighbouring tests do, so that "orderByData"
  // does not stay registered for the tests that run afterwards.
  withTempTable("orderByData") {
    // Per-group sums of b: "1" -> 7, "2" -> 15, "3" -> 11, "4" -> 3.
    Seq("1" -> 3, "1" -> 4, "2" -> 7, "2" -> 8, "3" -> 5, "3" -> 6, "4" -> 1, "4" -> 2)
      .toDF("a", "b").registerTempTable("orderByData")

    // The ordering aggregate is not part of the SELECT list.
    checkAnswer(
      sql(
        """
          |SELECT a
          |FROM orderByData
          |GROUP BY a
          |ORDER BY sum(b)
        """.stripMargin),
      Row("4") :: Row("1") :: Row("3") :: Row("2") :: Nil)

    // The ordering aggregate is the only selected column.
    checkAnswer(
      sql(
        """
          |SELECT sum(b)
          |FROM orderByData
          |GROUP BY a
          |ORDER BY sum(b)
        """.stripMargin),
      Row(3) :: Row(7) :: Row(11) :: Row(15) :: Nil)

    // The ordering aggregate is selected alongside the grouping column.
    checkAnswer(
      sql(
        """
          |SELECT a, sum(b)
          |FROM orderByData
          |GROUP BY a
          |ORDER BY sum(b)
        """.stripMargin),
      Row("4", 3) :: Row("1", 7) :: Row("3", 11) :: Row("2", 15) :: Nil)

    // The ordering expression wraps the aggregate in further arithmetic.
    checkAnswer(
      sql(
        """
          |SELECT a, sum(b)
          |FROM orderByData
          |GROUP BY a
          |ORDER BY sum(b) + 1
        """.stripMargin),
      Row("4", 3) :: Row("1", 7) :: Row("3", 11) :: Row("2", 15) :: Nil)
  }
}

test("SPARK-7952: fix the equality check between boolean and numeric types") {
withTempTable("t") {
// numeric field i, boolean field j, result of i = j, result of i <=> j
Expand Down