Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
minor improve
  • Loading branch information
cloud-fan committed Jun 16, 2015
commit 3245d28662dbdbeb2c0f36ebe7a3804950a3ecd6
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ class Analyzer(

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
case s @ Sort(ordering, global, child) =>
case s @ Sort(ordering, global, child) if child.resolved && !s.resolved =>
val newOrdering = resolveSortOrders(ordering, child, throws = false)
Sort(newOrdering, global, child)

Expand Down Expand Up @@ -382,8 +382,9 @@ class Analyzer(
private def resolveSortOrders(ordering: Seq[SortOrder], plan: LogicalPlan, throws: Boolean) = {
ordering.map { order =>
// Resolve SortOrder in one round.
// if throws == false, fail and return the origin one.
// else, throw exception.
// If throws == false or the desired attribute doesn't exist
// (like try to resolve `a.b` but `a` doesn't exist), fail and return the origin one.
// Else, throw exception.
try {
val newOrder = order transformUp {
case u @ UnresolvedAttribute(nameParts) =>
Expand All @@ -408,13 +409,13 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case s @ Sort(ordering, global, p @ Project(projectList, child))
if !s.resolved && p.resolved =>
val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, p, child)
val (newOrdering, missing) = resolveAndFindMissing(ordering, p, child)

// If this rule was not a no-op, return the transformed plan, otherwise return the original.
if (missing.nonEmpty) {
// Add missing attributes and then project them away after the sort.
Project(p.output,
Sort(resolvedOrdering, global,
Sort(newOrdering, global,
Project(projectList ++ missing, child)))
} else {
logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}")
Expand All @@ -429,35 +430,34 @@ class Analyzer(
)

// Find sort attributes that are projected away so we can temporarily add them back in.
val (resolvedOrdering, unresolved) = resolveAndFindMissing(ordering, a, groupingRelation)
val (newOrdering, missingAttr) = resolveAndFindMissing(ordering, a, groupingRelation)

// Find aggregate expressions and evaluate them early, since they can't be evaluated in a
// Sort.
val (withAggsRemoved, aliasedAggregateList) = resolvedOrdering.map {
val (withAggsRemoved, aliasedAggregateList) = newOrdering.filter(_.resolved).map {
case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty =>
val aliased = Alias(aggOrdering.child, "_aggOrdering")()
(aggOrdering.copy(child = aliased.toAttribute), aliased :: Nil)
(aggOrdering.copy(child = aliased.toAttribute), Some(aliased))

case other => (other, Nil)
case other => (other, None)
}.unzip

val missing = unresolved ++ aliasedAggregateList.flatten
val missing = missingAttr ++ aliasedAggregateList.flatten

if (missing.nonEmpty) {
// Add missing grouping exprs and then project them away after the sort.
Project(a.output,
Sort(withAggsRemoved, global,
Aggregate(grouping, aggs ++ missing, child)))
} else {
logDebug(s"Failed to find $missing in ${a.output.mkString(", ")}")
s // Nothing we can do here. Return original plan.
}
}

/**
* Given a child and a grandchild that are present beneath a sort operator, returns
* a resolved sort ordering and a list of attributes that are missing from the child
* but are present in the grandchild.
* Given a child and a grandchild that are present beneath a sort operator, try to resolve
* the sort ordering and returns it with a list of attributes that are missing from the
* child but are present in the grandchild.
*/
def resolveAndFindMissing(
ordering: Seq[SortOrder],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
* [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
* should return `false`).
*/
lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved

override protected def statePrefix = if (!resolved) "'" else super.statePrefix

/**
* Returns true if all its children of this query plan have been resolved.
*/
def childrenResolved: Boolean = !children.exists(!_.resolved)
def childrenResolved: Boolean = children.forall(_.resolved)

/**
* Returns true when the given logical plan will return the same results as this logical plan.
*
* Since its likely undecideable to generally determine if two given plans will produce the same
* Since its likely undecidable to generally determine if two given plans will produce the same
* results, it is okay for this function to return false, even if the results are actually
* the same. Such behavior will not affect correctness, only the application of performance
* enhancements like caching. However, it is not acceptable to return true if the results could
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1432,8 +1432,10 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
}

test("SPARK-7067: order by queries for complex ExtractValue chain") {
jsonRDD(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).registerTempTable("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
withTable("t") {
read.json(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).registerTempTable("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
}
}