Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
RewriteNonCorrelatedExists,
ComputeCurrentTime,
GetCurrentDatabaseAndCatalog(catalogManager),
RewriteDistinctAggregates,
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
Expand Down Expand Up @@ -196,6 +195,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
EliminateSorts) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Distinct Aggregate Rewrite", Once,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment saying that this batch must run after "Decimal Optimizations", since "Decimal Optimizations" may change the aggregate's distinct column?

RewriteDistinctAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
EliminateMapObjects,
CombineTypedFilters,
Expand Down
13 changes: 13 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2555,6 +2555,19 @@ class DataFrameSuite extends QueryTest
val df = Seq(0.0 -> -0.0).toDF("pos", "neg")
checkAnswer(df.select($"pos" > $"neg"), Row(false))
}

test("SPARK-32816: aggregating multiple distinct DECIMAL columns") {
withTempPath { path =>
spark.range(0, 100, 1, 1)
.selectExpr("id", "cast(id as decimal(9, 0)) as decimal_col")
.write.mode("overwrite")
.parquet(path.getAbsolutePath)
spark.read.parquet(path.getAbsolutePath).createOrReplaceTempView("test_table")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need to write to Parquet.

Copy link
Contributor

@TJX2014 TJX2014 Sep 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val df = spark.range(0, 50000, 1, 1).selectExpr("id", "cast(id as decimal(9, 0)) as ss_ext_list_price")
df.createOrReplaceTempView("test_table")
sql("select avg(distinct ss_ext_list_price), sum(distinct ss_ext_list_price) from test_table").explain
This seems sufficient to reproduce the issue.

checkAnswer(
sql("select avg(distinct decimal_col), sum(distinct decimal_col) from test_table"),
Row(49.5, 4950))
}
}
}

case class GroupByKey(a: Int, b: Int)