Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Update code
  • Loading branch information
beliefer committed Aug 9, 2022
commit 293590bf2b403c3f40abeb680b86742b41cf3771
246 changes: 113 additions & 133 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -860,214 +860,194 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
}

test("scan with aggregate push-down and top N push-down") {
test("scan with aggregate push-down, top N push-down and offset push-down") {
val df1 = spark.read
.table("h2.test.employee")
.groupBy("DEPT").sum("SALARY")
.orderBy("DEPT")
.limit(1)
checkSortRemoved(df1)
checkLimitRemoved(df1)
checkPushedInfo(df1,

val paging1 = df1.offset(1).limit(1)
checkSortRemoved(paging1)
checkLimitRemoved(paging1)
checkPushedInfo(paging1,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2")
checkAnswer(paging1, Seq(Row(2, 22000.00)))

val topN1 = df1.limit(1)
checkSortRemoved(topN1)
checkLimitRemoved(topN1)
checkPushedInfo(topN1,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
checkAnswer(df1, Seq(Row(1, 19000.00)))
checkAnswer(topN1, Seq(Row(1, 19000.00)))

val df2 = spark.read
.table("h2.test.employee")
.select($"DEPT".cast("string").as("my_dept"), $"SALARY")
.groupBy("my_dept").sum("SALARY")
.orderBy("my_dept")
.limit(1)
checkSortRemoved(df2)
checkLimitRemoved(df2)
checkPushedInfo(df2,

val paging2 = df2.offset(1).limit(1)
checkSortRemoved(paging2)
checkLimitRemoved(paging2)
checkPushedInfo(paging2,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [CAST(DEPT AS string)]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2")
checkAnswer(paging2, Seq(Row("2", 22000.00)))

val topN2 = df2.limit(1)
checkSortRemoved(topN2)
checkLimitRemoved(topN2)
checkPushedInfo(topN2,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [CAST(DEPT AS string)]",
"PushedFilters: []",
"PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 1")
checkAnswer(df2, Seq(Row("1", 19000.00)))
checkAnswer(topN2, Seq(Row("1", 19000.00)))

val df3 = spark.read
.table("h2.test.employee")
.groupBy("dept").sum("SALARY")
.orderBy($"dept".cast("string"))
.limit(1)
checkSortRemoved(df3)
checkLimitRemoved(df3)
checkPushedInfo(df3,

val paging3 = df3.offset(1).limit(1)
checkSortRemoved(paging3)
checkLimitRemoved(paging3)
checkPushedInfo(paging3,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2")
checkAnswer(paging3, Seq(Row(2, 22000.00)))

val topN3 = df3.limit(1)
checkSortRemoved(topN3)
checkLimitRemoved(topN3)
checkPushedInfo(topN3,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 1")
checkAnswer(df3, Seq(Row(1, 19000.00)))
checkAnswer(topN3, Seq(Row(1, 19000.00)))

val df4 = spark.read
.table("h2.test.employee")
.groupBy("DEPT", "IS_MANAGER").sum("SALARY")
.orderBy("DEPT", "IS_MANAGER")
.limit(1)
checkSortRemoved(df4)
checkLimitRemoved(df4)
checkPushedInfo(df4,

val paging4 = df4.offset(1).limit(1)
checkSortRemoved(paging4)
checkLimitRemoved(paging4)
checkPushedInfo(paging4,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT, IS_MANAGER]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2")
checkAnswer(paging4, Seq(Row(1, true, 10000.00)))

val topN4 = df4.limit(1)
checkSortRemoved(topN4)
checkLimitRemoved(topN4)
checkPushedInfo(topN4,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT, IS_MANAGER]",
"PushedFilters: []",
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
checkAnswer(df4, Seq(Row(1, false, 9000.00)))
checkAnswer(topN4, Seq(Row(1, false, 9000.00)))

val df5 = spark.read
.table("h2.test.employee")
.select($"SALARY", $"IS_MANAGER", $"DEPT".cast("string").as("my_dept"))
.groupBy("my_dept", "IS_MANAGER").sum("SALARY")
.orderBy("my_dept", "IS_MANAGER")
.limit(1)
checkSortRemoved(df5)
checkLimitRemoved(df5)
checkPushedInfo(df5,

val paging5 = df5.offset(1).limit(1)
checkSortRemoved(paging5)
checkLimitRemoved(paging5)
checkPushedInfo(paging5,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: " +
"ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2")
checkAnswer(paging5, Seq(Row("1", true, 10000.00)))

val topN5 = df5.limit(1)
checkSortRemoved(topN5)
checkLimitRemoved(topN5)
checkPushedInfo(topN5,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]",
"PushedFilters: []",
"PushedTopN: " +
"ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
checkAnswer(df5, Seq(Row("1", false, 9000.00)))
checkAnswer(topN5, Seq(Row("1", false, 9000.00)))

val df6 = spark.read
.table("h2.test.employee")
.select($"DEPT", $"SALARY")
.groupBy("dept").agg(sum("SALARY"))
.orderBy(sum("SALARY"))
.limit(1)
checkSortRemoved(df6)
checkLimitRemoved(df6)
checkPushedInfo(df6,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1")
checkAnswer(df6, Seq(Row(6, 12000.00)))

val df7 = spark.read
.table("h2.test.employee")
.select($"DEPT", $"SALARY")
.groupBy("dept").agg(sum("SALARY").as("total"))
.orderBy("total")
.limit(1)
checkSortRemoved(df7)
checkLimitRemoved(df7)
checkPushedInfo(df7,
val paging6 = df6.offset(1).limit(1)
checkSortRemoved(paging6)
checkLimitRemoved(paging6)
checkPushedInfo(paging6,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1")
checkAnswer(df7, Seq(Row(6, 12000.00)))
}

test("scan with aggregate push-down and paging push-down") {
val df1 = spark.read
.table("h2.test.employee")
.groupBy("DEPT").sum("SALARY")
.orderBy("DEPT")
.offset(1)
.limit(1)
checkSortRemoved(df1)
checkLimitRemoved(df1)
checkPushedInfo(df1,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2")
checkAnswer(df1, Seq(Row(2, 22000.00)))

val df2 = spark.read
.table("h2.test.employee")
.select($"DEPT".cast("string").as("my_dept"), $"SALARY")
.groupBy("my_dept").sum("SALARY")
.orderBy("my_dept")
.offset(1)
.limit(1)
checkSortRemoved(df2)
checkLimitRemoved(df2)
checkPushedInfo(df2,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [CAST(DEPT AS string)]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2")
checkAnswer(df2, Seq(Row("2", 22000.00)))

val df3 = spark.read
.table("h2.test.employee")
.select($"DEPT".cast("string").as("my_dept"), $"IS_MANAGER", $"SALARY")
.groupBy("my_dept", "is_manager").sum("SALARY")
.orderBy("my_dept", "is_manager")
.offset(1)
.limit(1)
checkSortRemoved(df3)
checkLimitRemoved(df3)
checkPushedInfo(df3,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: " +
"ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2")
checkAnswer(df3, Seq(Row("1", true, 10000.00)))
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2")
checkAnswer(paging6, Seq(Row(1, 19000.00)))

val df4 = spark.read
.table("h2.test.employee")
.select($"DEPT", $"SALARY")
.groupBy("dept").agg(sum("SALARY"))
.orderBy(sum("SALARY"))
.offset(1)
.limit(1)
checkSortRemoved(df4)
checkLimitRemoved(df4)
checkPushedInfo(df4,
val topN6 = df6.limit(1)
checkSortRemoved(topN6)
checkLimitRemoved(topN6)
checkPushedInfo(topN6,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2")
checkAnswer(df4, Seq(Row(1, 19000.00)))
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1")
checkAnswer(topN6, Seq(Row(6, 12000.00)))

val df5 = spark.read
val df7 = spark.read
.table("h2.test.employee")
.select($"DEPT", $"SALARY")
.groupBy("dept").agg(sum("SALARY").as("total"))
.orderBy("total")
.offset(1)
.limit(1)
checkSortRemoved(df5)
checkLimitRemoved(df5)
checkPushedInfo(df5,

val paging7 = df7.offset(1).limit(1)
checkSortRemoved(paging7)
checkLimitRemoved(paging7)
checkPushedInfo(paging7,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2")
checkAnswer(df5, Seq(Row(1, 19000.00)))
checkAnswer(paging7, Seq(Row(1, 19000.00)))

val df6 = spark.read
.table("h2.test.employee")
.select($"DEPT", $"IS_MANAGER", $"SALARY")
.groupBy("dept", "is_manager").sum("SALARY")
.orderBy(when($"is_manager", $"dept").otherwise(0), $"dept")
.offset(1)
.limit(1)
checkSortRemoved(df6)
checkLimitRemoved(df6)
checkPushedInfo(df6,
val topN7 = df7.limit(1)
checkSortRemoved(topN7)
checkLimitRemoved(topN7)
checkPushedInfo(topN7,
"PushedAggregates: [SUM(SALARY)]",
"PushedGroupByExpressions: [DEPT, IS_MANAGER]",
"PushedGroupByExpressions: [DEPT]",
"PushedFilters: []",
"PushedOffset: OFFSET 1",
"PushedTopN: ORDER BY [CASE WHEN IS_MANAGER = true THEN DEPT ELSE 0 END ASC NULLS FIRST, " +
"DEPT ASC NULLS FIRST] LIMIT 2")
checkAnswer(df6, Seq(Row(2, false, 12000.00)))
"PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1")
checkAnswer(topN7, Seq(Row(6, 12000.00)))
}

test("scan with filter push-down") {
Expand Down