Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Extract distinct aggregate expressions.
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val distincgAggExpressions = aggExpressions.filter(_.isDistinct)
val distinctAggGroups = distincgAggExpressions.groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
Expand All @@ -132,7 +133,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Aggregation strategy can handle queries with a single distinct group.
if (distinctAggGroups.size > 1) {
if (distincgAggExpressions.size > 1) {
// Create the attributes for the grouping id and the group by clause.
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
Expand All @@ -151,7 +152,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Setup unique distinct aggregate children.
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
Copy link
Member

@gatorsmile gatorsmile May 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to this pr though, I dropped .distinct because it does nothing (keySet.flatten is already a set)?

val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair)
val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our MultipleDistinctRewriter should take care this case.
// column sets. Our `RewriteDistinctAggregates` should take care this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
"Spark user mailing list.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,4 +687,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-24369 multiple distinct aggregations having the same argument set") {
val df = sql(
s"""SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move it to SQLQueryTestSuite?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

| FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
""".stripMargin)
checkAnswer(df, Row(1.0, 1.0, 3))
}
}