Skip to content

Commit ca63916

Browse files
liancheng
authored and rxin committed
[SPARK-17213][SQL] Disable Parquet filter push-down for string and binary columns due to PARQUET-686
This PR targets both master and branch-2.1. ## What changes were proposed in this pull request? Due to PARQUET-686, Parquet doesn't do string comparison correctly while doing filter push-down for string columns. This PR disables filter push-down for both string and binary columns to work around this issue. Binary columns are also affected because some Parquet data models (like Hive) may store string columns as a plain Parquet `binary` instead of a `binary (UTF8)`. ## How was this patch tested? New test case added in `ParquetFilterSuite`. Author: Cheng Lian <lian@databricks.com> Closes apache#16106 from liancheng/spark-17213-bad-string-ppd.
1 parent c82f16c commit ca63916

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ private[parquet] object ParquetFilters {
4040
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
4141
case DoubleType =>
4242
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
43+
44+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
45+
/*
4346
// Binary.fromString and Binary.fromByteArray don't accept null values
4447
case StringType =>
4548
(n: String, v: Any) => FilterApi.eq(
@@ -49,6 +52,7 @@ private[parquet] object ParquetFilters {
4952
(n: String, v: Any) => FilterApi.eq(
5053
binaryColumn(n),
5154
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
55+
*/
5256
}
5357

5458
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -62,6 +66,9 @@ private[parquet] object ParquetFilters {
6266
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
6367
case DoubleType =>
6468
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
69+
70+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
71+
/*
6572
case StringType =>
6673
(n: String, v: Any) => FilterApi.notEq(
6774
binaryColumn(n),
@@ -70,6 +77,7 @@ private[parquet] object ParquetFilters {
7077
(n: String, v: Any) => FilterApi.notEq(
7178
binaryColumn(n),
7279
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
80+
*/
7381
}
7482

7583
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -81,13 +89,17 @@ private[parquet] object ParquetFilters {
8189
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
8290
case DoubleType =>
8391
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
92+
93+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
94+
/*
8495
case StringType =>
8596
(n: String, v: Any) =>
8697
FilterApi.lt(binaryColumn(n),
8798
Binary.fromString(v.asInstanceOf[String]))
8899
case BinaryType =>
89100
(n: String, v: Any) =>
90101
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
102+
*/
91103
}
92104

93105
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -99,13 +111,17 @@ private[parquet] object ParquetFilters {
99111
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
100112
case DoubleType =>
101113
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
114+
115+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
116+
/*
102117
case StringType =>
103118
(n: String, v: Any) =>
104119
FilterApi.ltEq(binaryColumn(n),
105120
Binary.fromString(v.asInstanceOf[String]))
106121
case BinaryType =>
107122
(n: String, v: Any) =>
108123
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
124+
*/
109125
}
110126

111127
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -117,13 +133,17 @@ private[parquet] object ParquetFilters {
117133
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
118134
case DoubleType =>
119135
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
136+
137+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
138+
/*
120139
case StringType =>
121140
(n: String, v: Any) =>
122141
FilterApi.gt(binaryColumn(n),
123142
Binary.fromString(v.asInstanceOf[String]))
124143
case BinaryType =>
125144
(n: String, v: Any) =>
126145
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
146+
*/
127147
}
128148

129149
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -135,13 +155,17 @@ private[parquet] object ParquetFilters {
135155
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
136156
case DoubleType =>
137157
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
158+
159+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
160+
/*
138161
case StringType =>
139162
(n: String, v: Any) =>
140163
FilterApi.gtEq(binaryColumn(n),
141164
Binary.fromString(v.asInstanceOf[String]))
142165
case BinaryType =>
143166
(n: String, v: Any) =>
144167
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
168+
*/
145169
}
146170

147171
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
4747
* data type is nullable.
4848
*/
4949
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
50-
5150
private def checkFilterPredicate(
5251
df: DataFrame,
5352
predicate: Predicate,
@@ -230,7 +229,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
230229
}
231230
}
232231

233-
test("filter pushdown - string") {
232+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
233+
ignore("filter pushdown - string") {
234234
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
235235
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
236236
checkFilterPredicate(
@@ -258,7 +258,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
258258
}
259259
}
260260

261-
test("filter pushdown - binary") {
261+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
262+
ignore("filter pushdown - binary") {
262263
implicit class IntToBinary(int: Int) {
263264
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
264265
}
@@ -558,4 +559,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
558559
}
559560
}
560561
}
562+
563+
test("SPARK-17213: Broken Parquet filter push-down for string columns") {
564+
withTempPath { dir =>
565+
import testImplicits._
566+
567+
val path = dir.getCanonicalPath
568+
// scalastyle:off nonascii
569+
Seq("a", "é").toDF("name").write.parquet(path)
570+
// scalastyle:on nonascii
571+
572+
assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
573+
assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
574+
575+
// scalastyle:off nonascii
576+
assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
577+
assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
578+
// scalastyle:on nonascii
579+
}
580+
}
561581
}

0 commit comments

Comments
 (0)