@@ -342,6 +342,7 @@ class ParquetFileFormat
sparkSession.sessionState.conf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
@dongjoon-hyun (Member) commented on May 3, 2018:

Can we pass pushed instead of declaring a new pushDownDate?
The following can be handled at line 345 here, rather than inside (file: PartitionedFile) => {}:

       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
            .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
            .reduceOption(FilterApi.and)
       } else {
         None
       }

@cloud-fan (Contributor, Author) replied:

no we can't, see #21086

@dongjoon-hyun (Member) replied:

Ah, I see. Thank you, @cloud-fan!


(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -352,7 +353,7 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(requiredSchema, _))
.flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
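The hunk above moves the session-conf read out of the per-file function: sqlConf.parquetFilterPushDownDate is evaluated once on the driver, and the resulting Boolean (pushDownDate) is captured by the (file: PartitionedFile) => {...} closure, so no conf lookup is needed when that closure is later invoked per file on the executors. Below is a minimal, self-contained sketch of this capture pattern; the names (CaptureConfSketch, Filters, buildReader, pushDownDateFromConf) are illustrative and not part of Spark's API.

object CaptureConfSketch {
  // Stand-in for ParquetFilters: behaviour is fixed by a constructor flag,
  // not by a global/thread-local config consulted at call time.
  final class Filters(pushDownDate: Boolean) {
    def describe: String = if (pushDownDate) "date filters pushed" else "date filters skipped"
  }

  // Stand-in for the reader factory: the flag is read once here (driver side)
  // and captured by the returned per-file function.
  def buildReader(pushDownDateFromConf: Boolean): String => String = {
    val pushDownDate = pushDownDateFromConf
    (file: String) => {
      // Runs once per file; only the captured Boolean is used, no conf lookup.
      val filters = new Filters(pushDownDate)
      s"$file -> ${filters.describe}"
    }
  }

  def main(args: Array[String]): Unit =
    println(buildReader(pushDownDateFromConf = true)("part-00000.parquet"))
}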
@@ -25,14 +25,13 @@ import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] object ParquetFilters {
private[parquet] class ParquetFilters(pushDownDate: Boolean) {

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
@@ -59,7 +58,7 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -85,7 +84,7 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -108,7 +107,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -131,7 +130,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -154,7 +153,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.gt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -177,7 +176,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
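To make the object-to-class change above concrete, here is a compact, self-contained sketch of the dispatch pattern it introduces: the DateType branch is now guarded by the constructor flag pushDownDate rather than by a SQLConf.get lookup at createFilter time, and unsupported cases still come back as None. The types and names here (PushDownDateSketch, SimpleParquetFilters, EqFilter) are simplified stand-ins, not Spark's real classes.

object PushDownDateSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object DateType extends DataType

  final case class EqFilter(column: String, value: Any)

  // Mirrors the shape of the change: the flag is fixed at construction time.
  final class SimpleParquetFilters(pushDownDate: Boolean) {
    // Like ParquetFilters.createFilter, returns None when no predicate can be built.
    def createEq(dataType: DataType, column: String, value: Any): Option[EqFilter] =
      dataType match {
        case IntType                  => Some(EqFilter(column, value))
        case DateType if pushDownDate => Some(EqFilter(column, value))
        case _                        => None
      }
  }

  def main(args: Array[String]): Unit = {
    val enabled  = new SimpleParquetFilters(pushDownDate = true)
    val disabled = new SimpleParquetFilters(pushDownDate = false)
    println(enabled.createEq(DateType, "d", 17654))   // Some(EqFilter(d,17654))
    println(disabled.createEq(DateType, "d", 17654))  // None
  }
}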
@@ -55,6 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
*/
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {

private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)

override def beforeEach(): Unit = {
super.beforeEach()
// Note that there are many tests here that require record-level filtering set to be true.
@@ -99,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
assert(selectedFilters.nonEmpty, "No filter is pushed down")

selectedFilters.foreach { pred =>
val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
val maybeFilter = parquetFilters.createFilter(df.schema, pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
maybeFilter.exists(_.getClass === filterClass)
@@ -517,23 +519,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
lt(intColumn("a"), 10: Integer),
gt(doubleColumn("c"), 1.5: java.lang.Double)))
) {
ParquetFilters.createFilter(
parquetFilters.createFilter(
schema,
sources.And(
sources.LessThan("a", 10),
sources.GreaterThan("c", 1.5D)))
}

assertResult(None) {
ParquetFilters.createFilter(
parquetFilters.createFilter(
schema,
sources.And(
sources.LessThan("a", 10),
sources.StringContains("b", "prefix")))
}

assertResult(None) {
ParquetFilters.createFilter(
parquetFilters.createFilter(
schema,
sources.Not(
sources.And(