Update code
beliefer committed Aug 8, 2022
commit c509e85444e9c8813e36b18da3726b80c886ece7
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala (130 changes: 70 additions & 60 deletions)
@@ -873,13 +873,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
checkAnswer(df1, Seq(Row(1, 19000.00)))

-    val df2 = sql(
-      """
-        |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee
-        |GROUP BY dept
-        |ORDER BY my_dept
-        |LIMIT 1
-        |""".stripMargin)
+    val df2 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT".as("my_dept"), $"SALARY")
+      .groupBy("my_dept").sum("SALARY")
+      .orderBy("my_dept")
+      .limit(1)
checkSortRemoved(df2)
checkLimitRemoved(df2)
checkPushedInfo(df2,
@@ -922,13 +921,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
checkAnswer(df4, Seq(Row(1, false, 9000.00)))

-    val df5 = sql(
-      """
-        |SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM h2.test.employee
-        |GROUP BY dept, my_manager
-        |ORDER BY my_dept, my_manager
-        |LIMIT 1
-        |""".stripMargin)
+    val df5 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+      .groupBy("my_dept", "my_manager").sum("SALARY")
+      .orderBy("my_dept", "my_manager")
+      .limit(1)
checkSortRemoved(df5)
checkLimitRemoved(df5)
checkPushedInfo(df5,
@@ -958,13 +956,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))

-    val df7 = sql(
-      """
-        |SELECT dept, SUM(SALARY) FROM h2.test.employee
-        |GROUP BY dept
-        |ORDER BY SUM(SALARY)
-        |LIMIT 1
-        |""".stripMargin)
+    val df7 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"SALARY")
+      .groupBy("dept").agg(sum("SALARY"))
+      .orderBy(sum("SALARY"))
Review comment (Contributor): can we add some comments to explain why top n can't be pushed here?

Reply (@beliefer, Contributor/Author, Aug 2, 2022): I added the comments into V2ScanRelationPushDown directly.

Review comment (Contributor): We can translate agg expressions now, why this test still can't trigger top-n pushdown?
+      .limit(1)
checkSortRemoved(df7, false)
checkLimitRemoved(df7, false)
checkPushedInfo(df7,
@@ -973,13 +970,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedFilters: []")
checkAnswer(df7, Seq(Row(6, 12000.00)))
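(Note on the review thread above: a minimal standalone sketch, not part of this diff, of the distinction being discussed. It assumes the same h2.test.employee catalog table this suite registers, plus spark.implicits._ and org.apache.spark.sql.functions.sum; the variable names are illustrative. Sorting on a grouping column can be translated into the pushed-down scan as a top-N, while sorting on an aggregate output such as SUM(SALARY) is, at this point in the change, still evaluated by Spark.)

    // Sort key is a grouping column: df1 earlier in the suite shows this plan
    // with "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1".
    val topNPushed = spark.read
      .table("h2.test.employee")
      .select($"DEPT", $"SALARY")
      .groupBy("dept").agg(sum("SALARY"))
      .orderBy("dept")
      .limit(1)

    // Sort key is an aggregate output: the sort and limit stay in Spark,
    // which is why df7 above asserts checkSortRemoved(df7, false).
    val topNNotPushed = spark.read
      .table("h2.test.employee")
      .select($"DEPT", $"SALARY")
      .groupBy("dept").agg(sum("SALARY"))
      .orderBy(sum("SALARY"))
      .limit(1)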

-    val df8 = sql(
-      """
-        |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee
-        |GROUP BY dept
-        |ORDER BY total
-        |LIMIT 1
-        |""".stripMargin)
+    val df8 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"SALARY")
+      .groupBy("dept").agg(sum("SALARY").as("total"))
+      .orderBy("total")
+      .limit(1)
checkSortRemoved(df8, false)
checkLimitRemoved(df8, false)
checkPushedInfo(df8,
@@ -1006,14 +1002,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2")
checkAnswer(df1, Seq(Row(2, 22000.00)))

-    val df2 = sql(
-      """
-        |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee
-        |GROUP BY my_dept
-        |ORDER BY my_dept
-        |LIMIT 1
-        |OFFSET 1
-        |""".stripMargin)
+    val df2 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT".as("my_dept"), $"SALARY")
+      .groupBy("my_dept").sum("SALARY")
+      .orderBy("my_dept")
+      .offset(1)
+      .limit(1)
checkSortRemoved(df2)
checkLimitRemoved(df2)
checkPushedInfo(df2,
@@ -1045,14 +1040,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"ASC NULLS FIRST] LIMIT 2")
checkAnswer(df3, Seq(Row(9000, 9000.00)))

-    val df4 = sql(
-      """
-        |SELECT dept AS my_dept, is_manager, SUM(SALARY) FROM h2.test.employee
-        |GROUP BY my_dept, is_manager
-        |ORDER BY my_dept, is_manager
-        |LIMIT 1
-        |OFFSET 1
-        |""".stripMargin)
+    val df4 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT".as("my_dept"), $"IS_MANAGER", $"SALARY")
+      .groupBy("my_dept", "is_manager").sum("SALARY")
+      .orderBy("my_dept", "is_manager")
+      .offset(1)
+      .limit(1)
checkSortRemoved(df4)
checkLimitRemoved(df4)
checkPushedInfo(df4,
@@ -1063,14 +1057,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2")
checkAnswer(df4, Seq(Row(1, true, 10000.00)))

-    val df5 = sql(
-      """
-        |SELECT dept, SUM(SALARY) FROM h2.test.employee
-        |GROUP BY dept
-        |ORDER BY SUM(SALARY)
-        |LIMIT 1
-        |OFFSET 1
-        |""".stripMargin)
+    val df5 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"SALARY")
+      .groupBy("dept").agg(sum("SALARY"))
+      .orderBy(sum("SALARY"))
+      .offset(1)
+      .limit(1)
checkSortRemoved(df5, false)
checkLimitRemoved(df5, false)
checkPushedInfo(df5,
@@ -1079,21 +1072,38 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedFilters: []")
checkAnswer(df5, Seq(Row(1, 19000.00)))

-    val df6 = sql(
-      """
-        |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee
-        |GROUP BY dept
-        |ORDER BY total
-        |LIMIT 1
-        |OFFSET 1
-        |""".stripMargin)
+    val df6 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"SALARY")
+      .groupBy("dept").agg(sum("SALARY").as("total"))
+      .orderBy("total")
+      .offset(1)
+      .limit(1)
checkSortRemoved(df6, false)
checkLimitRemoved(df6, false)
checkPushedInfo(df6,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []")
checkAnswer(df6, Seq(Row(1, 19000.00)))

+    val df7 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"IS_MANAGER", $"SALARY")
+      .groupBy("dept", "is_manager").sum("SALARY")
+      .orderBy(when($"is_manager", $"dept").otherwise(0))
+      .offset(1)
+      .limit(1)
+    checkSortRemoved(df7)
+    checkLimitRemoved(df7)
+    checkPushedInfo(df7,
+      "PushedAggregates: [SUM(SALARY)]",
+      "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+      "PushedFilters: []",
+      "PushedOffset: OFFSET 1",
+      "PushedTopN: " +
+        "ORDER BY [CASE WHEN IS_MANAGER = true THEN DEPT ELSE 0 END ASC NULLS FIRST] LIMIT 2")
+    checkAnswer(df7, Seq(Row(1, false, 9000.00)))
}

test("scan with filter push-down") {