Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
*/
def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = {
val srcInfo: Option[(Expression, LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan)
// Drop results whose expression has no references (e.g. a literal)
.filter(_._1.references.nonEmpty)
srcInfo.flatMap {
case (resExp, l: LogicalRelation) =>
l.relation match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,55 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
}

test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
withTable("fact1", "fact2", "dim") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have lots of built-in tables. Could we change the test query to:

SELECT f.store_id,
       f.date_id,
       s.state_province
FROM   (SELECT 4 AS store_id,
               date_id,
               product_id
        FROM   fact_sk
        WHERE  date_id >= 1300
        UNION ALL
        SELECT 5 AS store_id,
               date_id,
               product_id
        FROM   fact_stats
        WHERE  date_id <= 1000) f
       JOIN dim_store s
         ON f.store_id = s.store_id
WHERE  s.country = 'US'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How embarrassing. I wanted to use these built-in tables earlier, but I didn't notice the fact_stats table.

val fact1 = Seq[(Int, String, String)](
(1, "a1", "part1"),
(3, "a3", "part1"),
(5, "a5", "part1")
)
fact1.toDF("joinCol", "otherCol", "partCol")
.write
.partitionBy("partCol")
.format(tableFormat)
.saveAsTable("fact1")

val fact2 = Seq[(Int, String, String)](
(2, "b2", "part1"),
(4, "b4", "part1"),
(6, "b6", "part1")
)
fact2.toDF("joinCol", "otherCol", "partCol")
.write
.partitionBy("partCol")
.format(tableFormat)
.saveAsTable("fact2")

val dim = Seq[(String, Int, Int)](
("type1", 1, 100),
("type2", 2, 200)
)
dim.toDF("type", "joinCol", "score")
.write
.format(tableFormat)
.saveAsTable("dim")

val df = sql(
"""
|SELECT a.type,a.joinCol,a.otherCol,b.score FROM
|(SELECT 'type1' as type,joinCol,otherCol FROM fact1 WHERE partCol='part1'
|UNION ALL
|SELECT 'type2' as type,joinCol,otherCol FROM fact2 WHERE partCol='part1') a
|Join dim b ON a.type=b.type AND a.joinCol=b.joinCol;
|""".stripMargin)

checkPartitionPruningPredicate(df, false, withBroadcast = false)
checkAnswer(df, Row("type1", 1, "a1", 100) :: Row("type2", 2, "b2", 200) :: Nil)
}
}
}
}

abstract class DynamicPartitionPruningDataSourceSuiteBase
Expand Down