Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-32708] Query optimization fails to reuse exchange with DataSou…
…rceV2
  • Loading branch information
mingjialiu committed Sep 11, 2020
commit 8b864e7921061958c43006335196898e6f3c4be8
Original file line number Diff line number Diff line change
Expand Up @@ -395,25 +395,25 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}

test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization ") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have an end-to-end test, how about a low-level UT? Create two DataSourceV2ScanExec instances and check scan1.sameResult(scan2).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I think this test case creates two DataSourceV2ScanExec and do the check. It looks ok to me.

def getV2ScanExecs(query: DataFrame): Seq[DataSourceV2ScanExec] = {
def getV2ScanExec(query: DataFrame): DataSourceV2ScanExec = {
query.queryExecution.executedPlan.collect {
case d: DataSourceV2ScanExec => d
}
}.head
}

val df1 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
val q1 = df1.select(($"i" + 1).as("k"), ($"i" - 1).as("j")).filter('i > 5)
val q1 = df1.select('i).filter('i > 6)
val df2 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
val q2 = df2.select(($"i" + 1).as("k"), ($"i" - 1).as("j")).filter('i > 5)

val scans1 = getV2ScanExecs(q1.join(q2, "j"))
assert(scans1(0).sameResult(scans1(1)))
assert(scans1(0).doCanonicalize().equals(scans1(1).doCanonicalize()))

val q3 = df2.select(($"i" + 1).as("k"), ($"i" - 1).as("j")).filter('i > 6)
val scans2 = getV2ScanExecs(q1.join(q3, "j"))
assert(!scans2(0).sameResult(scans2(1)))
assert(!scans2(0).doCanonicalize().equals(scans2(1).doCanonicalize()))
val q2 = df2.select('i).filter('i > 6)
val scan1 = getV2ScanExec(q1)
val scan2 = getV2ScanExec(q2)
assert(scan1.sameResult(scan2))
assert(scan1.doCanonicalize().equals(scan2.doCanonicalize()))

val q3 = df2.select('i).filter('i > 5)
val scan3 = getV2ScanExec(q3)
assert(!scan1.sameResult(scan3))
assert(!scan1.doCanonicalize().equals(scan3.doCanonicalize()))
}

}
Expand Down