@@ -353,38 +353,42 @@ class ParquetFileFormat
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)

// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
.createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}

val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
val filePath = fileSplit.getPath
Member:
@wangyum, when it's easy to reduce the diff, please reduce it to make it easier to track. I think the diff would have been considerably smaller if you had just moved these three lines up.

Member Author:
Do I need to create a follow-up PR?

Member (@HyukjinKwon, Jun 14, 2019):
Nope, the commit and blame history is already overwritten. Just wanted to leave a note for next time.

Member Author:
OK. Thank you.


val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
fileSplit.getPath,
filePath,
fileSplit.getStart,
fileSplit.getStart + fileSplit.getLength,
fileSplit.getLength,
fileSplit.getLocations,
null)

val sharedConf = broadcastedHadoopConf.value.value

// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
.getFileMetaData.getSchema
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
.createFilter(parquetSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}

// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
def isCreatedByParquetMr(): Boolean = {
val footer = ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS)
val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
}
val convertTz =
@@ -19,187 +19,200 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.sql.Date

import scala.collection.JavaConverters.asScalaBufferConverter

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.PrimitiveComparator
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {

private case class ParquetSchemaType(
originalType: OriginalType,
primitiveTypeName: PrimitiveTypeName,
decimalMetadata: DecimalMetadata)

private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
private val ParquetLongType = ParquetSchemaType(null, INT64, null)
private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
case ParquetLongType =>
(n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
case ParquetFloatType =>
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
case ParquetStringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
case ParquetBinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if pushDownDate =>
case ParquetDateType if pushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
case ParquetLongType =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
case ParquetFloatType =>
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

case StringType =>
case ParquetStringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
case ParquetBinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if pushDownDate =>
case ParquetDateType if pushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
case LongType =>
case ParquetLongType =>
(n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
case ParquetFloatType =>
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])

case StringType =>
case ParquetStringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
Member Author (@wangyum, Jul 4, 2018):
`v` can't be null here, so the `Option(v).map` wrapping was removed, the same as for `ParquetStringType`.
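For reference, a minimal before/after sketch of the simplification this comment describes; the helper names `ltDateOld`/`ltDateNew` are hypothetical and only illustrate the two forms:

```scala
import java.sql.Date
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.intColumn
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Before: defensive null handling via Option, even though the comparison
// builders are never handed a null value here.
def ltDateOld(n: String, v: Any) = FilterApi.lt(
  intColumn(n),
  Option(v).map(d => DateTimeUtils.fromJavaDate(d.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

// After: cast the value directly, mirroring the ParquetStringType case.
def ltDateNew(n: String, v: Any) = FilterApi.lt(
  intColumn(n),
  DateTimeUtils.fromJavaDate(v.asInstanceOf[Date]).asInstanceOf[Integer])
```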

}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Integer])
case ParquetLongType =>
(n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
case ParquetFloatType =>
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

case StringType =>
case ParquetStringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Integer])
case ParquetLongType =>
(n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
case ParquetFloatType =>
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])

case StringType =>
case ParquetStringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.gt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
case LongType =>
private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
Contributor:
We have a problem here: we map Spark byte/short to Parquet int, so `v` here can be a `java.lang.Byte` and we may get a cast exception. We should do `v.asInstanceOf[java.lang.Number].intValue`. @wangyum can you fix it and add a test? Thanks!
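A minimal sketch of the fix suggested above (a hypothetical follow-up, not part of this diff): widen any boxed numeric value through `java.lang.Number` before building the predicate, so a pushed-down `ByteType`/`ShortType` value no longer triggers a `ClassCastException`.

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// Hypothetical helper illustrating the suggested change: v may arrive as a
// java.lang.Byte, Short, or Integer, since Spark stores byte/short columns
// as Parquet INT32.
def gtEqInt(n: String, v: Any): FilterPredicate =
  FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Number].intValue.asInstanceOf[Integer])

// A ByteType value pushed down from Spark would fail with a direct
// v.asInstanceOf[Integer]; going through Number widens it safely.
val pred = gtEqInt("b", java.lang.Byte.valueOf(1.toByte))
```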

case ParquetLongType =>
(n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case FloatType =>
case ParquetFloatType =>
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

case StringType =>
case ParquetStringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}

/**
* Returns a map from name of the column to the data type, if predicate push down applies.
*/
private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
case m: MessageType =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
f.getName -> ParquetSchemaType(
f.getOriginalType, f.getPrimitiveTypeName, f.getDecimalMetadata)
}.toMap
case _ => Map.empty[String, ParquetSchemaType]
}

/**
* Converts data sources filters to Parquet filter predicates.
*/
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToType = getFieldMap(schema)

// Parquet does not allow dots in the column name because dots are used as a column path
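For context, a rough usage sketch of the new `MessageType`-based entry point, mirroring the `ParquetFileFormat` change earlier in this diff; the configuration, path, and filter values are hypothetical, and since `ParquetFilters` is `private[parquet]` this would only compile inside that package:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.sources

// Hypothetical inputs.
val conf = new Configuration()
val filePath = new Path("/tmp/part-00000.parquet")
val filters: Seq[sources.Filter] = Seq(sources.GreaterThan("a", 1), sources.IsNotNull("a"))

// Read the file's physical Parquet schema from its footer ...
val parquetSchema = ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
  .getFileMetaData.getSchema

// ... and build the pushed-down predicate against that schema rather than
// against the Catalyst StructType.
val pushed: Option[FilterPredicate] = filters
  .flatMap(new ParquetFilters(pushDownDate = true, pushDownStartWith = true)
    .createFilter(parquetSchema, _))
  .reduceOption(FilterApi.and)
```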
@@ -199,7 +199,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter

/**
* A filter that evaluates to `true` iff the attribute evaluates to
* a string that starts with `value`.
* a string that ends with `value`.
*
* @since 1.3.1
*/
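A quick illustration of the corrected description, with a hypothetical attribute and value:

```scala
import org.apache.spark.sql.sources.StringEndsWith

// Evaluates to true for a row whose `path` attribute is e.g. "part-0001.csv",
// and to false for "part-0001.json".
val filter = StringEndsWith("path", ".csv")
```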