sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -863,7 +863,9 @@ object SQLConf {
.doc("The threshold of set size for InSet predicate when pruning partitions through Hive " +
"Metastore. When the set size exceeds the threshold, we rewrite the InSet predicate " +
"to be greater than or equal to the minimum value in set and less than or equal to the " +
"maximum value in set. Larger values may cause Hive Metastore stack overflow.")
"maximum value in set. Larger values may cause Hive Metastore stack overflow. But for " +
"InSet inside Not with values exceeding the threshold, we won't push it to Hive Metastore."
)
.version("3.1.0")
.internal()
.intConf
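
For intuition, a minimal sketch of the range rewrite this doc describes (plain Scala; the helper name rewriteLargeInSet and the integer values are illustrative assumptions, not the shim's actual code):

def rewriteLargeInSet(name: String, values: Seq[Int]): String =
  // Over the threshold, `name IN (v1, ..., vn)` collapses to a range check;
  // the real shim sorts the typed values and formats them per data type.
  s"($name >= ${values.min} and $name <= ${values.max})"

// rewriteLargeInSet("p", Seq(3, 1, 2)) == "(p >= 1 and p <= 3)"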
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
values.map(value => s"$name = $value").mkString("(", " or ", ")")
}

def convertNotInToAnd(name: String, values: Seq[String]): String = {
values.map(value => s"$name != $value").mkString("(", " and ", ")")
Member:

More than 10,000 values will cause a Hive Metastore stack overflow.

Contributor Author:

How about stopping the push if the number of values exceeds the threshold? In that case it cannot be converted to >= and <=.

}
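
// Editor's note (illustrative, derived directly from the two definitions above):
//   convertInToOr("p", Seq("1", "2"))     == "(p = 1 or p = 2)"
//   convertNotInToAnd("p", Seq("1", "2")) == "(p != 1 and p != 2)"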

def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
case Literal(null, _) => true
case _ => false
}

val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold

@@ -763,10 +772,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}

def convert(expr: Expression): Option[String] = expr match {
case Not(InSet(_, values)) if values.size > inSetThreshold =>
None

case Not(In(_, list)) if hasNullLiteral(list) => None
case Not(InSet(_, list)) if list.contains(null) => None

case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
if useAdvanced =>
Some(convertInToOr(name, values))

case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
Contributor:

Not(a IN (null, 2)): if a = 1, the final result is null.
a != 2: if a = 1, the final result is true.

I think this rewrite is incorrect. We need to make sure the values of IN are all not null.

Contributor Author (@ulysses-you, Mar 10, 2021):

My bad, I missed this. For the Not(InSet) case a null value cannot be skipped safely; it would have to be converted to and(xx != null).

if useAdvanced =>
Some(convertNotInToAnd(name, values))
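
The thread above is the crux: under SQL's three-valued logic, a NULL in the IN-list makes the negation NULL rather than TRUE. A self-contained sketch of that semantics (plain Scala with Option[Boolean] modelling SQL's nullable boolean; names are illustrative, this is not Catalyst code):

object ThreeValuedLogic {
  type SqlBool = Option[Boolean] // None models SQL NULL

  def sqlEq(a: Option[Int], b: Option[Int]): SqlBool =
    for (x <- a; y <- b) yield x == y // NULL if either side is NULL

  def sqlNot(v: SqlBool): SqlBool = v.map(!_) // NOT NULL is NULL

  def sqlOr(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  def main(args: Array[String]): Unit = {
    val a = Some(1)
    // NOT (a IN (NULL, 2)) == NOT (a = NULL OR a = 2) == NOT NULL == NULL
    println(sqlNot(sqlOr(sqlEq(a, None), sqlEq(a, Some(2))))) // None
    // The naive rewrite "a != 2" alone would wrongly keep the row:
    println(sqlNot(sqlEq(a, Some(2)))) // Some(true)
  }
}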

case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
val dataType = child.dataType
// Skipping null here is safe; see ExtractableLiterals for more details.
@@ -779,10 +798,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
if useAdvanced && child.dataType == DateType =>
Some(convertInToOr(name, values))

case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
Some(convertNotInToAnd(name, values))

case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
if useAdvanced =>
Some(convertInToOr(name, values))

case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
if useAdvanced =>
Some(convertNotInToAnd(name, values))

case op @ SpecialBinaryComparison(
ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name ${op.symbol} $value")
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -108,6 +108,47 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
(a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil,
"datecol != 2019-01-01")

filterTest("not-in, string filter",
(Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil,
"""(strcol != "a" and strcol != "b")""")

filterTest("not-in, string filter with null",
(Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"), Literal(null))))) :: Nil,
"")

filterTest("not-in, date filter",
(Not(In(a("datecol", DateType),
Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")))))) :: Nil,
"""(datecol != 2021-01-01 and datecol != 2021-01-02)""")

filterTest("not-in, date filter with null",
(Not(In(a("datecol", DateType),
Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")),
Literal(null))))) :: Nil,
"")

filterTest("not-inset, string filter",
(Not(InSet(a("strcol", StringType), Set(Literal("a").eval(), Literal("b").eval())))) :: Nil,
"""(strcol != "a" and strcol != "b")""")

filterTest("not-inset, string filter with null",
(Not(InSet(a("strcol", StringType),
Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval())))) :: Nil,
"")

filterTest("not-inset, date filter",
(Not(InSet(a("datecol", DateType),
Set(Literal(Date.valueOf("2020-01-01")).eval(),
Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil,
"""(datecol != 2020-01-01 and datecol != 2020-01-02)""")

filterTest("not-inset, date filter with null",
(Not(InSet(a("datecol", DateType),
Set(Literal(Date.valueOf("2020-01-01")).eval(),
Literal(Date.valueOf("2020-01-02")).eval(),
Literal(null).eval())))) :: Nil,
"")

// Applying the predicate `x IN (NULL)` should return an empty set, but since this optimization
// will be applied by Catalyst, this filter converter does not need to account for this.
filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE",
@@ -187,6 +228,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
}
}

test("Don't push not inset if it's values exceeds the threshold") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") {
val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3)))
val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone)
assert(converted.isEmpty)
}
}
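
A companion check (a sketch reusing this suite's helpers — shim, testTable, a, conf; the rendered range string in the comment is inferred from the threshold conf's documentation, not asserted):

test("sketch: InSet over threshold is range-rewritten, Not(InSet) is dropped") {
  withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") {
    // Plain InSet over the threshold is still pushed, rewritten to a range
    // filter along the lines of "(p >= 1 and p <= 3)"...
    val inSet = InSet(a("p", IntegerType), Set(1, 2, 3))
    assert(shim.convertFilters(testTable, Seq(inSet), conf.sessionLocalTimeZone).nonEmpty)

    // ...while Not(InSet) over the threshold is dropped entirely, as the
    // test above asserts.
    val notInSet = Not(InSet(a("p", IntegerType), Set(1, 2, 3)))
    assert(shim.convertFilters(testTable, Seq(notInSet), conf.sessionLocalTimeZone).isEmpty)
  }
}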

test("SPARK-34538: Skip InSet null value during push filter to Hive metastore") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") {
val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2))
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -418,6 +418,76 @@ class HivePartitionFilteringSuite(version: String)
dateStrValue)
}

test("getPartitionsByFilter: not in/inset string type") {
def check(condition: Expression, result: Seq[String]): Unit = {
testMetastorePartitionFiltering(
condition,
dsValue,
hValue,
result,
dateValue,
dateStrValue
)
}

check(
Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))),
Seq("ba", "bb")
)
check(
Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab"), Literal(null)))),
chunkValue
Contributor (@cloud-fan, Mar 11, 2021):

Yeah, this test can detect the correctness bug in the null handling.

)

check(
Not(InSet(attr("chunk"), Set(Literal("aa").eval(), Literal("ab").eval()))),
Seq("ba", "bb")
)
check(
Not(InSet(attr("chunk"), Set("aa", "ab", null))),
chunkValue
)
}
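
Note the expected results in the two null cases above: the full chunkValue set. Because the converter refuses to push a NOT IN/INSET containing a null, getPartitionsByFilter falls back to returning every partition, and Spark evaluates the predicate itself afterwards (the same reasoning gives dateValue in the date-type test below).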

test("getPartitionsByFilter: not in/inset date type") {
def check(condition: Expression, result: Seq[String]): Unit = {
testMetastorePartitionFiltering(
condition,
dsValue,
hValue,
chunkValue,
result,
dateStrValue
)
}

check(
Not(In(attr("d"),
Seq(Literal(Date.valueOf("2019-01-01")),
Literal(Date.valueOf("2019-01-02"))))),
Seq("2019-01-03")
)
check(
Not(In(attr("d"),
Seq(Literal(Date.valueOf("2019-01-01")),
Literal(Date.valueOf("2019-01-02")), Literal(null)))),
dateValue
)

check(
Not(InSet(attr("d"),
Set(Literal(Date.valueOf("2019-01-01")).eval(),
Literal(Date.valueOf("2019-01-02")).eval()))),
Seq("2019-01-03")
)
check(
Not(InSet(attr("d"),
Set(Literal(Date.valueOf("2019-01-01")).eval(),
Literal(Date.valueOf("2019-01-02")).eval(), null))),
dateValue
)
}

test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") {
testMetastorePartitionFiltering(
attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"),