@@ -54,7 +54,7 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -595,6 +595,16 @@ object DataSourceStrategy
translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
nestedPredicatePushdownEnabled: Boolean)
: Option[Filter] = {

def translateAndRecordLeafNodeFilter(filter: Expression): Option[Filter] = {
val translatedFilter =
translateLeafNodeFilter(filter, PushableColumn(nestedPredicatePushdownEnabled))
if (translatedFilter.isDefined && translatedFilterToExpr.isDefined) {
translatedFilterToExpr.get(translatedFilter.get) = predicate
}
translatedFilter
}

predicate match {
case expressions.And(left, right) =>
// See SPARK-12218 for detailed discussion
@@ -621,16 +631,25 @@ object DataSourceStrategy
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
} yield sources.Or(leftFilter, rightFilter)

case notNull @ expressions.IsNotNull(_: AttributeReference) =>
// Not null filters on attribute references can always be pushed, also for collated columns.
translateAndRecordLeafNodeFilter(notNull)

case isNull @ expressions.IsNull(_: AttributeReference) =>
// Is null filters on attribute references can always be pushed, also for collated columns.
translateAndRecordLeafNodeFilter(isNull)

case p if p.references.exists(ref => SchemaUtils.hasNonUTF8BinaryCollation(ref.dataType)) =>
Review thread on the line above:

Contributor: should this be case p if !p.isInstanceOf[expressions.IsNotNull] && !p.isInstanceOf[expressions.IsNull] ...

Contributor: hmmm, what if it's Not(Not(IsNotNull(...)))? I feel checking references at this layer is risky. Shall we do it in translateLeafNodeFilter?

Contributor Author (@olaky, May 28, 2024): Just checking for the IsNull/IsNotNull types is not enough. It would not cover IsNotNull(Min(<collated_col>, x)), so it is important that IsNull / IsNotNull is directly wrapped around an attribute reference.
We can actually only do this predicate widening at the top level, particularly because of the Nots. If we for example have Not(EqualTo(<collated_col>, x)), we would end up with Not(AlwaysTrue) if we did the transformation in translateLeafNodeFilter, and that is incorrect.
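To make the Not argument above concrete, here is a minimal, Spark-independent sketch. The ToyFilter algebra and the WideningSketch object are made up for illustration (they only mimic org.apache.spark.sql.sources.Filter, they are not the real API); the sketch evaluates both possible widenings of Not(EqualTo(<collated_col>, x)) against a row the original predicate keeps.

```scala
// Toy stand-ins for the pushed-down filter API. Names only mirror
// org.apache.spark.sql.sources; they are not the real classes.
sealed trait ToyFilter { def eval(row: Map[String, String]): Boolean }

case object ToyAlwaysTrue extends ToyFilter {
  def eval(row: Map[String, String]): Boolean = true
}

case class ToyNot(child: ToyFilter) extends ToyFilter {
  def eval(row: Map[String, String]): Boolean = !child.eval(row)
}

case class ToyEqualTo(col: String, value: String) extends ToyFilter {
  def eval(row: Map[String, String]): Boolean = row.get(col).contains(value)
}

object WideningSketch extends App {
  // A row that the original predicate Not(EqualTo(col, "x")) should keep.
  val row = Map("col" -> "other")

  // Widening the EqualTo leaf (as a leaf-level transformation would do)
  // leaves the surrounding Not in place: the pushed filter becomes
  // Not(AlwaysTrue), which drops every row.
  val widenedAtLeaf: ToyFilter = ToyNot(ToyAlwaysTrue)

  // Widening the whole top-level predicate instead yields AlwaysTrue, a safe
  // over-approximation: the scan may return extra rows, and Spark still
  // evaluates the original, unhandled predicate afterwards.
  val widenedAtTop: ToyFilter = ToyAlwaysTrue

  println(s"leaf-level widening keeps the row: ${widenedAtLeaf.eval(row)}") // false -> incorrect
  println(s"top-level widening keeps the row:  ${widenedAtTop.eval(row)}")  // true -> correct
}
```

This is why the widening case sits at the top level of translateFilterWithMapping, above the Not case, and why the tests below expect Not(EqualTo(colAttr, ...)) on a collated column to translate to AlwaysTrue.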

// The filter cannot be pushed and we widen it to be AlwaysTrue(). This is only valid if
// the result of the filter is not negated by a Not expression it is wrapped in.
translateAndRecordLeafNodeFilter(Literal.TrueLiteral)

case expressions.Not(child) =>
translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
.map(sources.Not)

case other =>
-        val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
-        if (filter.isDefined && translatedFilterToExpr.isDefined) {
-          translatedFilterToExpr.get(filter.get) = predicate
-        }
-        filter
+        translateAndRecordLeafNodeFilter(other)
}
}

@@ -327,8 +327,10 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {

test("SPARK-41636: selectFilters returns predicates in deterministic order") {

-    val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
-      EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6))
+    val idColAttribute = AttributeReference("id", IntegerType)()
+    val predicates = Seq(EqualTo(idColAttribute, 1), EqualTo(idColAttribute, 2),
+      EqualTo(idColAttribute, 3), EqualTo(idColAttribute, 4), EqualTo(idColAttribute, 5),
+      EqualTo(idColAttribute, 6))

val (unhandledPredicates, pushedFilters, handledFilters) =
DataSourceStrategy.selectFilters(FakeRelation(), predicates)
@@ -338,4 +340,53 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
})
assert(handledFilters.isEmpty)
}

test("SPARK-48431: Push filters on columns with UTF8_BINARY collation") {
val colAttr = $"col".string("UTF8_BINARY")
testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.EqualTo("col", "value")))
testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))),
Some(sources.Not(sources.EqualTo("col", "value"))))
testTranslateFilter(LessThan(colAttr, Literal("value")),
Some(sources.LessThan("col", "value")))
testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")),
Some(sources.LessThanOrEqual("col", "value")))
testTranslateFilter(GreaterThan(colAttr, Literal("value")),
Some(sources.GreaterThan("col", "value")))
testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")),
Some(sources.GreaterThanOrEqual("col", "value")))
testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
}

for (collation <- Seq("UTF8_BINARY_LCASE", "UNICODE")) {
test(s"SPARK-48431: Filter pushdown on columns with $collation collation") {
val colAttr = $"col".string(collation)

// No pushdown for all comparison based filters.
testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.AlwaysTrue))
testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))
testTranslateFilter(GreaterThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))

// Allow pushdown of Is(Not)Null filter.
testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col")))

// Top level filter splitting at And and Or.
testTranslateFilter(And(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
testTranslateFilter(Or(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
Some(sources.Or(sources.AlwaysTrue, sources.IsNotNull("col"))))

// Different cases involving Not.
testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), Some(sources.AlwaysTrue))
testTranslateFilter(And(Not(EqualTo(colAttr, Literal("value"))), IsNotNull(colAttr)),
Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
// This filter would work, but we want to keep the translation logic simple.
testTranslateFilter(And(EqualTo(colAttr, Literal("value")), Not(IsNotNull(colAttr))),
Some(sources.And(sources.AlwaysTrue, sources.AlwaysTrue)))
}
}
}