Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
  • Loading branch information
cloud-fan committed Jun 16, 2015
commit 1fc41a2eead35ac03f207c166cb29b52e29c4a6c
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,37 @@ class Analyzer(
}
j.copy(right = newRight)

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
case s @ Sort(ordering, global, child) =>
var changed = false
val newOrdering = ordering.map { order =>
// Resolve SortOrder in one round, or fail and return the origin one.
try {
val newOrder = order transformUp {
case u @ UnresolvedAttribute(nameParts) =>
child.resolve(nameParts, resolver).getOrElse(u)
case UnresolvedExtractValue(child, fieldName) if child.resolved =>
ExtractValue(child, fieldName, resolver)
}
if (!newOrder.fastEquals(order)) {
changed = true
}
newOrder.asInstanceOf[SortOrder]
} catch {
case a: AnalysisException => order
}
}

if (changed) {
Sort(newOrdering, global, child)
} else {
s
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two suggestions here:

  • Can we share the code with the block below? and only add a try/catch around it?
  • I think we can probably avoid the changed optimization. The rule executor and transform already do checks to avoid churn when the plan does not change. Either way, I think its better to keep rules simple even if there is a small performance penalty.


case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
q transformExpressionsUp {
q transformExpressionsUp {
case u @ UnresolvedAttribute(nameParts) if nameParts.length == 1 &&
resolver(nameParts(0), VirtualColumn.groupingIdName) &&
q.isInstanceOf[GroupingAnalytics] =>
Expand Down Expand Up @@ -424,6 +452,7 @@ class Analyzer(
Sort(withAggsRemoved, global,
Aggregate(grouping, aggs ++ missing, child)))
} else {
logDebug(s"Failed to find $missing in ${a.output.mkString(", ")}")
s // Nothing we can do here. Return original plan.
}
}
Expand All @@ -437,31 +466,31 @@ class Analyzer(
ordering: Seq[SortOrder],
child: LogicalPlan,
grandchild: LogicalPlan): (Seq[SortOrder], Seq[Attribute]) = {
// Find any attributes that remain unresolved in the sort.
val unresolved: Seq[Seq[String]] =
ordering.flatMap(_.collect { case UnresolvedAttribute(nameParts) => nameParts })

// Create a map from name, to resolved attributes, when the desired name can be found
// prior to the projection.
val resolved: Map[Seq[String], NamedExpression] =
unresolved.flatMap(u => grandchild.resolve(u, resolver).map(a => u -> a)).toMap
// Store `SortOrder`s we resolved based on grandchild
val resolved = scala.collection.mutable.ListBuffer.empty[Expression]

val resolvedOrdering = ordering.map { order =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this just be a map without the mutable list buffer? again, i'm not really worried about performance here.

val newOrder = order transformUp {
case u @ UnresolvedAttribute(nameParts) =>
grandchild.resolve(nameParts, resolver).getOrElse(u)
case UnresolvedExtractValue(child, fieldName) if child.resolved =>
ExtractValue(child, fieldName, resolver)
}
if (!newOrder.fastEquals(order) && newOrder.resolved) {
resolved += newOrder
}
newOrder.asInstanceOf[SortOrder]
}

// Construct a set that contains all of the attributes that we need to evaluate the
// ordering.
val requiredAttributes = AttributeSet(resolved.values)
val requiredAttributes = AttributeSet(resolved)

// Figure out which ones are missing from the projection, so that we can add them and
// remove them after the sort.
val missingInProject = requiredAttributes -- child.output

// Now that we have all the attributes we need, reconstruct a resolved ordering.
// It is important to do it here, instead of waiting for the standard resolved as adding
// attributes to the project below can actually introduce ambiquity that was not present
// before.
val resolvedOrdering = ordering.map(_ transform {
case u @ UnresolvedAttribute(name) => resolved.getOrElse(name, u)
}).asInstanceOf[Seq[SortOrder]]

(resolvedOrdering, missingInProject.toSeq)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ trait CheckAnalysis {
case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
if (operator.childrenResolved) {
a match {
case UnresolvedAttribute(nameParts) =>
// Throw errors for specific problems with get field.
operator.resolveChildren(nameParts, resolver, throwErrors = true)
}
}

val from = operator.inputSet.map(_.name).mkString(", ")
a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def resolveChildren(
nameParts: Seq[String],
resolver: Resolver,
throwErrors: Boolean = false): Option[NamedExpression] =
resolve(nameParts, children.flatMap(_.output), resolver, throwErrors)
resolver: Resolver): Option[NamedExpression] =
resolve(nameParts, children.flatMap(_.output), resolver)

/**
* Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
Expand All @@ -122,9 +121,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def resolve(
nameParts: Seq[String],
resolver: Resolver,
throwErrors: Boolean = false): Option[NamedExpression] =
resolve(nameParts, output, resolver, throwErrors)
resolver: Resolver): Option[NamedExpression] =
resolve(nameParts, output, resolver)

/**
* Given an attribute name, split it to name parts by dot, but
Expand Down Expand Up @@ -219,8 +217,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
protected def resolve(
nameParts: Seq[String],
input: Seq[Attribute],
resolver: Resolver,
throwErrors: Boolean): Option[NamedExpression] = {
resolver: Resolver): Option[NamedExpression] = {

// A sequence of possible candidate matches.
// Each candidate is a tuple. The first element is a resolved attribute, followed by a list
Expand Down Expand Up @@ -254,19 +251,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

// One match, but we also need to extract the requested nested field.
case Seq((a, nestedFields)) =>
try {
// The foldLeft adds GetFields for every remaining parts of the identifier,
// and aliases it with the last part of the identifier.
// For example, consider "a.b.c", where "a" is resolved to an existing attribute.
// Then this will add GetField("c", GetField("b", a)), and alias
// the final expression as "c".
val fieldExprs = nestedFields.foldLeft(a: Expression)((expr, fieldName) =>
ExtractValue(expr, Literal(fieldName), resolver))
val aliasName = nestedFields.last
Some(Alias(fieldExprs, aliasName)())
} catch {
case a: AnalysisException if !throwErrors => None
}
// The foldLeft adds ExtractValues for every remaining parts of the identifier,
// and aliases it with the last part of the identifier.
// For example, consider "a.b.c", where "a" is resolved to an existing attribute.
// Then this will add ExtractValue("c", ExtractValue("b", a)), and alias
// the final expression as "c".
val fieldExprs = nestedFields.foldLeft(a: Expression)(ExtractValue(_, _, resolver))
val aliasName = nestedFields.last
Some(Alias(fieldExprs, aliasName)())

// No matches.
case Seq() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
* @param rule the function use to transform this nodes children
*/
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
val afterRuleOnChildren = transformChildrenUp(rule);
val afterRuleOnChildren = transformChildrenUp(rule)
if (this fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(this, identity[BaseType])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1430,4 +1430,10 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
checkAnswer(sql("select i <=> b from t"), sql("select r2 from t"))
}
}

test("SPARK-7067: order by queries for complex ExtractValue chain") {
jsonRDD(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).registerTempTable("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
}