diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 9b70eecd2e83..2fe563a42b89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
@@ -52,6 +53,17 @@ case class DataSourceV2ScanExec(
     case _ => false
   }
 
+  override def doCanonicalize(): DataSourceV2ScanExec = {
+    DataSourceV2ScanExec(
+      output.map(QueryPlan.normalizeExprId(_, output)),
+      source,
+      options,
+      QueryPlan.normalizePredicates(
+        pushedFilters,
+        AttributeSeq(pushedFilters.flatMap(_.references).distinct)),
+      reader)
+  }
+
   override def hashCode(): Int = {
     Seq(output, source, options).hashCode()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index ef0a8bdfd2fc..92e0ac93db56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -393,6 +393,29 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization") {
+    def getV2ScanExec(query: DataFrame): DataSourceV2ScanExec = {
+      query.queryExecution.executedPlan.collect {
+        case d: DataSourceV2ScanExec => d
+      }.head
+    }
+
+    val df1 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
+    val q1 = df1.select('i).filter('i > 6)
+    val df2 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
+    val q2 = df2.select('i).filter('i > 6)
+    val scan1 = getV2ScanExec(q1)
+    val scan2 = getV2ScanExec(q2)
+    assert(scan1.sameResult(scan2))
+    assert(scan1.doCanonicalize().equals(scan2.doCanonicalize()))
+
+    val q3 = df2.select('i).filter('i > 5)
+    val scan3 = getV2ScanExec(q3)
+    assert(!scan1.sameResult(scan3))
+    assert(!scan1.doCanonicalize().equals(scan3.doCanonicalize()))
+  }
+
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
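For context, QueryPlan.normalizeExprId rewrites each attribute's per-instance ExprId to its ordinal position in the supplied attribute sequence, which is why two scans of the same source that differ only in ExprIds canonicalize to equal plans. The snippet below is a minimal, self-contained sketch of that idea only; Attr, Scan, and CanonicalizationSketch are illustrative stand-ins, not Spark classes.

// Illustrative stand-ins for AttributeReference and DataSourceV2ScanExec (not Spark classes).
case class Attr(name: String, exprId: Long)
case class Scan(output: Seq[Attr], pushedFilters: Seq[String]) {
  // Mirror the idea of QueryPlan.normalizeExprId: replace each per-instance exprId
  // with the attribute's ordinal position in this plan's output.
  def canonicalize: Scan =
    copy(output = output.zipWithIndex.map { case (a, i) => a.copy(exprId = i.toLong) })
}

object CanonicalizationSketch extends App {
  val scan1 = Scan(Seq(Attr("i", exprId = 7L)), Seq("i > 6"))
  val scan2 = Scan(Seq(Attr("i", exprId = 42L)), Seq("i > 6"))  // same column, fresh ExprId
  assert(scan1 != scan2)                                        // raw plans differ only by ExprId
  assert(scan1.canonicalize == scan2.canonicalize)              // canonical forms compare equal

  val scan3 = Scan(Seq(Attr("i", exprId = 7L)), Seq("i > 5"))   // different pushed filter
  assert(scan1.canonicalize != scan3.canonicalize)              // still distinguishable
}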