Update comments
bersprockets committed Oct 11, 2022
commit 27dcffe22f4f18003fdd8e493f677f5324a11b55
@@ -214,11 +214,14 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

def rewrite(aOrig: Aggregate): Aggregate = {
- // make children of distinct aggregations the same if they are different
- // only because of superficial reasons, e.g.:
- // "1 + col1" vs "col1 + 1", both become "1 + col1"
+ // Make children of distinct aggregations the same if they are only
+ // different due to superficial reasons, e.g.:
+ // "1 + col1" vs "col1 + 1", both should become "1 + col1"
  // or
- // "col1" vs "Col1", both become "col1"
+ // "col1" vs "Col1", both should become "col1"
+ // This could potentially reduce the number of distinct
+ // aggregate groups, and therefore reduce the number of
+ // projections in Expand (or eliminate the need for Expand)
val a = reduceDistinctAggregateGroups(aOrig)
Contributor

Shall we just canonicalize the function inputs when grouping by them? E.g., change `e.aggregateFunction.children.filter(!_.foldable).toSet` to `ExpressionSet(e.aggregateFunction.children.filter(!_.foldable))`.
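
To illustrate the difference between the two grouping keys, here is a minimal sketch on a toy expression model. `Expr`, `Col`, `Lit`, `Add`, and `canonical` are hypothetical stand-ins, not Catalyst classes, and the foldable filter is left out. A plain `toSet` key compares children structurally, while a key built from canonicalized children treats superficially different children as equal, which is roughly the effect `ExpressionSet` is meant to give.

```scala
// Toy expression model; hypothetical stand-ins for Catalyst expressions.
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

object CanonicalGrouping {
  // Hypothetical canonical form: lower-case column names and put the
  // operands of the commutative Add into a deterministic order.
  def canonical(e: Expr): Expr = e match {
    case Col(n) => Col(n.toLowerCase)
    case Lit(v) => Lit(v)
    case Add(l, r) =>
      val ops = Seq(canonical(l), canonical(r)).sortBy(_.toString)
      Add(ops(0), ops(1))
  }

  // Children of two distinct aggregates that differ only superficially.
  val children1: Seq[Expr] = Seq(Add(Lit(1), Col("col1")))
  val children2: Seq[Expr] = Seq(Add(Col("Col1"), Lit(1)))

  // Keyed by plain structural sets: the keys differ, so two groups.
  val plainGroups: Set[Set[Expr]] = Set(children1.toSet, children2.toSet)

  // Keyed by canonicalized sets: the keys collapse into a single group.
  val canonicalGroups: Set[Set[Expr]] =
    Set(children1.map(canonical).toSet, children2.map(canonical).toSet)
}
```

In this sketch `plainGroups` holds two keys and `canonicalGroups` holds one, which is the reduction in distinct aggregate groups the code comment describes.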

Contributor Author

Thanks! I am working on it, just working through some small complications.

Contributor Author

I made the change to use ExpressionSet and also commented on some of the issues.

I still prefer 'sanitizing' each original function child to use the first semantically equivalent child, in essence creating a new set of "original" children, as it bypasses some complexities (in particular the one where we may lose some of the original children as keys when we group by ExpressionSet).
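
The 'sanitizing' idea could look roughly like the sketch below. It uses a toy model rather than the actual Spark code: `Node`, `Attr`, `Const`, `Plus`, `canonical`, and `sanitize` are all hypothetical names. Each child is replaced by the first child seen with the same canonical form, so the group keys stay real "original" children and none of them is lost.

```scala
// Toy expression model; hypothetical, not Catalyst's classes.
sealed trait Node
case class Attr(name: String) extends Node
case class Const(value: Int) extends Node
case class Plus(left: Node, right: Node) extends Node

object SanitizeChildren {
  // Hypothetical canonical form, used only to test semantic equality.
  def canonical(n: Node): Node = n match {
    case Attr(name) => Attr(name.toLowerCase)
    case Const(v) => Const(v)
    case Plus(l, r) =>
      val ops = Seq(canonical(l), canonical(r)).sortBy(_.toString)
      Plus(ops(0), ops(1))
  }

  // Replace every child with the first child seen that has the same
  // canonical form. Equivalent children become structurally identical,
  // yet each one is still an expression taken from the original plan.
  def sanitize(groups: Seq[Seq[Node]]): Seq[Seq[Node]] = {
    val firstSeen = scala.collection.mutable.LinkedHashMap.empty[Node, Node]
    groups.map(_.map(c => firstSeen.getOrElseUpdate(canonical(c), c)))
  }

  // Children of two distinct aggregates that differ only superficially.
  val original: Seq[Seq[Node]] = Seq(
    Seq(Plus(Const(1), Attr("col1"))),
    Seq(Plus(Attr("Col1"), Const(1))))

  val sanitized: Seq[Seq[Node]] = sanitize(original)

  // After sanitizing, a plain structural toSet is enough for grouping.
  val distinctGroups: Set[Set[Node]] = sanitized.map(_.toSet).toSet
}
```

Here both groups end up keyed by the first spelling, `Plus(Const(1), Attr("col1"))`, so later lookups by an original child still find a matching key.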


val aggExpressions = collectAggregateExprs(a)
@@ -408,6 +411,10 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}
Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
} else {
+ // It's possible we avoided rewriting the plan to use Expand only because
+ // reduceDistinctAggregateGroups reduced the number of distinct aggregate groups
+ // from > 1 to 1. To prevent SparkStrategies from complaining during sanity check,
+ // we use the potentially patched Aggregate returned by reduceDistinctAggregateGroups.
a
}
}