fix some code style
root authored and root committed Nov 7, 2016
commit c5b3a3d2c81ebaf86ed08682c4d30de0120a2850
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPl
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.IntegerType

/*
/**
* This rule rewrites an aggregate query with distinct aggregations into an expanded double
* aggregation in which the regular aggregation expressions and every distinct clause is aggregated
* in a separate group. The results are then combined in a second aggregate.
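To make the rewrite concrete, here is a minimal sketch (assuming a local SparkSession; the table and column names are made up) of the kind of query this rule targets. The optimized plan printed by explain(true) shows the Expand node and the double aggregation described above.

import org.apache.spark.sql.SparkSession

object DistinctRewriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("distinct-rewrite-demo").getOrCreate()
    import spark.implicits._

    Seq(("a", 1, 10), ("a", 2, 20), ("b", 2, 30))
      .toDF("key", "cat", "value")
      .createOrReplaceTempView("t")

    // Two different DISTINCT aggregates plus a regular aggregate: the optimized
    // plan contains an Expand node and two Aggregate nodes (the "double aggregation").
    spark.sql(
      """SELECT key, COUNT(DISTINCT cat), COUNT(DISTINCT value), SUM(value)
        |FROM t GROUP BY key""".stripMargin)
      .explain(true)

    spark.stop()
  }
}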
@@ -125,6 +125,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
// we must expand at least one of the children (here we take the first child),
// otherwise we will get the wrong result; for example,
// count(distinct 1) would effectively become count(1) after the rewrite.
// Generally, the distinct aggregateFunction should not run the
// foldable TypeCheck on the first child.
e.aggregateFunction.children.take(1).toSet
Contributor
This is a good catch. It would be great if we could get rid of this by constant folding (not needed in this PR). Another way of getting rid of this would be to create a separate processing group for these distincts.

}
}
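A hedged sketch of the intent behind take(1) above (the helper name and surrounding structure are illustrative, not the actual rule code): when every child of a DISTINCT aggregate is foldable, e.g. count(DISTINCT 1), at least one child must still go into the distinct group. Over a non-empty table, count(DISTINCT 1) returns 1 while count(1) returns the row count, so dropping all foldable children would change the result.

import org.apache.spark.sql.catalyst.expressions.Expression

object DistinctChildrenSketch {
  // Illustrative helper only; the real rule builds this set inline.
  def distinctGroupChildren(children: Seq[Expression]): Set[Expression] = {
    val unfoldable = children.filter(!_.foldable)
    if (unfoldable.nonEmpty) {
      unfoldable.toSet          // expand only the non-constant children
    } else {
      children.take(1).toSet    // all-foldable case, e.g. count(distinct 1)
    }
  }
}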
@@ -144,8 +146,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {

// Functions used to modify aggregate functions and their inputs.
def evalWithinGroup(id: Literal, e: Expression) = If(EqualTo(gid, id), e, nullify(e))
def patchAggregateFunctionChildren(af: AggregateFunction)(
attrs: Expression => Option[Expression]): AggregateFunction = {
def patchAggregateFunctionChildren(
af: AggregateFunction)(
attrs: Expression => Option[Expression]): AggregateFunction = {
val newChildren = af.children.map(c => attrs(c).getOrElse(c))
af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
}
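As a usage note, a self-contained sketch of how this helper behaves (the attribute names are invented for illustration): each child of the aggregate function is mapped through attrs, and the original child is kept whenever attrs returns None.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Sum}
import org.apache.spark.sql.types.IntegerType

object PatchChildrenSketch {
  // Same shape as the helper in the diff: replace each child via `attrs`,
  // falling back to the original child when no replacement is provided.
  def patchAggregateFunctionChildren(
      af: AggregateFunction)(
      attrs: Expression => Option[Expression]): AggregateFunction = {
    val newChildren = af.children.map(c => attrs(c).getOrElse(c))
    af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
  }

  def main(args: Array[String]): Unit = {
    val value = AttributeReference("value", IntegerType)()           // made-up input column
    val patched = AttributeReference("value_patched", IntegerType)()
    val rewritten = patchAggregateFunctionChildren(Sum(value)) { c =>
      if (c == value) Some(patched) else None
    }
    println(rewritten) // the Sum now references value_patched instead of value
  }
}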
@@ -251,8 +254,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {

// Construct the second aggregate
val transformations: Map[Expression, Expression] =
(distinctAggOperatorMap.flatMap(_._2) ++
regularAggOperatorMap.map(e => (e._1, e._3))).toMap
(distinctAggOperatorMap.flatMap(_._2) ++
regularAggOperatorMap.map(e => (e._1, e._3))).toMap

val patchedAggExpressions = a.aggregateExpressions.map { e =>
e.transformDown {
@@ -275,9 +278,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
private def nullify(e: Expression) = Literal.create(null, e.dataType)

private def expressionAttributePair(e: Expression) =
// We are creating a new reference here instead of reusing the attribute in case of a
// NamedExpression. This is done to prevent collisions between distinct and regular aggregate
// children; in that case, attribute reuse causes the input of the regular aggregate to be bound
// to the (nulled out) input of the distinct aggregate.
// We are creating a new reference here instead of reusing the attribute in case of a
// NamedExpression. This is done to prevent collisions between distinct and regular aggregate
// children; in that case, attribute reuse causes the input of the regular aggregate to be bound
// to the (nulled out) input of the distinct aggregate.
e -> AttributeReference(e.sql, e.dataType, nullable = true)()
}