Skip to content

Commit a3b8420

Browse files
olakycloud-fan
authored andcommitted
[SPARK-48431][SQL] Do not forward predicates on collated columns to file readers
### What changes were proposed in this pull request? [SPARK-47657](https://issues.apache.org/jira/browse/SPARK-47657) allows to push filters on collated columns to file sources that support it. If such filters are pushed to file sources, those file sources must not push those filters to the actual file readers (i.e. parquet or csv readers), because there is no guarantee that those support collations. In this PR we are widening filters on collations to be AlwaysTrue when we translate filters for file sources. ### Why are the changes needed? Without this, no file source can implement filter pushdown ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests. No component tests are possible because there is no file source with filter pushdown yet. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46760 from olaky/filter-translation-for-collations. Authored-by: Ole Sasse <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent cf47293 commit a3b8420

File tree

2 files changed

+78
-8
lines changed

2 files changed

+78
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
5454
import org.apache.spark.sql.sources._
5555
import org.apache.spark.sql.types._
5656
import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
57-
import org.apache.spark.sql.util.CaseInsensitiveStringMap
57+
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
5858
import org.apache.spark.unsafe.types.UTF8String
5959

6060
/**
@@ -595,6 +595,16 @@ object DataSourceStrategy
595595
translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
596596
nestedPredicatePushdownEnabled: Boolean)
597597
: Option[Filter] = {
598+
599+
def translateAndRecordLeafNodeFilter(filter: Expression): Option[Filter] = {
600+
val translatedFilter =
601+
translateLeafNodeFilter(filter, PushableColumn(nestedPredicatePushdownEnabled))
602+
if (translatedFilter.isDefined && translatedFilterToExpr.isDefined) {
603+
translatedFilterToExpr.get(translatedFilter.get) = predicate
604+
}
605+
translatedFilter
606+
}
607+
598608
predicate match {
599609
case expressions.And(left, right) =>
600610
// See SPARK-12218 for detailed discussion
@@ -621,16 +631,25 @@ object DataSourceStrategy
621631
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
622632
} yield sources.Or(leftFilter, rightFilter)
623633

634+
case notNull @ expressions.IsNotNull(_: AttributeReference) =>
635+
// Not null filters on attribute references can always be pushed, also for collated columns.
636+
translateAndRecordLeafNodeFilter(notNull)
637+
638+
case isNull @ expressions.IsNull(_: AttributeReference) =>
639+
// Is null filters on attribute references can always be pushed, also for collated columns.
640+
translateAndRecordLeafNodeFilter(isNull)
641+
642+
case p if p.references.exists(ref => SchemaUtils.hasNonUTF8BinaryCollation(ref.dataType)) =>
643+
// The filter cannot be pushed and we widen it to be AlwaysTrue(). This is only valid if
644+
// the result of the filter is not negated by a Not expression it is wrapped in.
645+
translateAndRecordLeafNodeFilter(Literal.TrueLiteral)
646+
624647
case expressions.Not(child) =>
625648
translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
626649
.map(sources.Not)
627650

628651
case other =>
629-
val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
630-
if (filter.isDefined && translatedFilterToExpr.isDefined) {
631-
translatedFilterToExpr.get(filter.get) = predicate
632-
}
633-
filter
652+
translateAndRecordLeafNodeFilter(other)
634653
}
635654
}
636655

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,10 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
327327

328328
test("SPARK-41636: selectFilters returns predicates in deterministic order") {
329329

330-
val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
331-
EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6))
330+
val idColAttribute = AttributeReference("id", IntegerType)()
331+
val predicates = Seq(EqualTo(idColAttribute, 1), EqualTo(idColAttribute, 2),
332+
EqualTo(idColAttribute, 3), EqualTo(idColAttribute, 4), EqualTo(idColAttribute, 5),
333+
EqualTo(idColAttribute, 6))
332334

333335
val (unhandledPredicates, pushedFilters, handledFilters) =
334336
DataSourceStrategy.selectFilters(FakeRelation(), predicates)
@@ -338,4 +340,53 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
338340
})
339341
assert(handledFilters.isEmpty)
340342
}
343+
344+
test("SPARK-48431: Push filters on columns with UTF8_BINARY collation") {
345+
val colAttr = $"col".string("UTF8_BINARY")
346+
testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.EqualTo("col", "value")))
347+
testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))),
348+
Some(sources.Not(sources.EqualTo("col", "value"))))
349+
testTranslateFilter(LessThan(colAttr, Literal("value")),
350+
Some(sources.LessThan("col", "value")))
351+
testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.LessThan("col", "value")))
352+
testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")),
353+
Some(sources.LessThanOrEqual("col", "value")))
354+
testTranslateFilter(GreaterThan(colAttr, Literal("value")),
355+
Some(sources.GreaterThan("col", "value")))
356+
testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")),
357+
Some(sources.GreaterThanOrEqual("col", "value")))
358+
testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
359+
}
360+
361+
for (collation <- Seq("UTF8_BINARY_LCASE", "UNICODE")) {
362+
test(s"SPARK-48431: Filter pushdown on columns with $collation collation") {
363+
val colAttr = $"col".string(collation)
364+
365+
// No pushdown for all comparison based filters.
366+
testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.AlwaysTrue))
367+
testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
368+
testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
369+
testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))
370+
testTranslateFilter(GreaterThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
371+
testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))
372+
373+
// Allow pushdown of Is(Not)Null filter.
374+
testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
375+
testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col")))
376+
377+
// Top level filter splitting at And and Or.
378+
testTranslateFilter(And(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
379+
Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
380+
testTranslateFilter(Or(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
381+
Some(sources.Or(sources.AlwaysTrue, sources.IsNotNull("col"))))
382+
383+
// Different cases involving Not.
384+
testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), Some(sources.AlwaysTrue))
385+
testTranslateFilter(And(Not(EqualTo(colAttr, Literal("value"))), IsNotNull(colAttr)),
386+
Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
387+
// This filter would work, but we want to keep the translation logic simple.
388+
testTranslateFilter(And(EqualTo(colAttr, Literal("value")), Not(IsNotNull(colAttr))),
389+
Some(sources.And(sources.AlwaysTrue, sources.AlwaysTrue)))
390+
}
391+
}
341392
}

0 commit comments

Comments
 (0)