Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-32708] Query optimization fails to reuse exchange with DataSou…
…rceV2
  • Loading branch information
mingjialiu committed Sep 11, 2020
commit 98483c82151634760a82fc860ac6de26d4b024ba
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,29 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization ") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have an end-to-end test, how about a low-level UT? Create two DataSourceV2ScanExec instances and check scan1.sameResult(scan2).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I think this test case creates two DataSourceV2ScanExec and do the check. It looks ok to me.

def getV2ScanExecs(query: DataFrame): Seq[DataSourceV2ScanExec] = {
query.queryExecution.executedPlan.collect {
case d: DataSourceV2ScanExec => d
}
}

val df1 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
val q1 = df1.select(($"i" + 1).as("k"), ($"i" - 1).as("j")).filter('i > 5)
val df2 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
val q2 = df2.select(($"i" + 1).as("k"), ($"i" - 1).as("j")).filter('i > 5)

val scans1 = getV2ScanExecs(q1.join(q2, "j"))
assert(scans1(0).sameResult(scans1(1)))
assert(scans1(0).doCanonicalize().equals(scans1(1).doCanonicalize()))

val q3 = df2.select(($"i" + 1).as("k"), ($"i" - 1).as("j")).filter('i > 6)
val scans2 = getV2ScanExecs(q1.join(q3, "j"))
assert(!scans2(0).sameResult(scans2(1)))
assert(!scans2(0).doCanonicalize().equals(scans2(1).doCanonicalize()))
}

}

class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
Expand Down