@@ -266,11 +266,21 @@ class ParquetFileFormat

lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -296,9 +306,6 @@ class ParquetFileFormat
None
}

val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)
@@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources
import org.apache.spark.unsafe.types.UTF8String

@@ -48,7 +50,8 @@ class ParquetFilters(
pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int,
caseSensitive: Boolean) {
caseSensitive: Boolean,
datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
// A map which contains parquet field name and data type, if predicate push down applies.
//
// Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -129,14 +132,26 @@ class ParquetFilters(
private val ParquetTimestampMillisType =
ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0)

private def dateToDays(date: Any): Int = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
private def dateToDays(date: Any): Int = {
val gregorianDays = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
}
datetimeRebaseMode match {
case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
case _ => gregorianDays
}
}

private def timestampToMicros(v: Any): JLong = v match {
case i: Instant => DateTimeUtils.instantToMicros(i)
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
private def timestampToMicros(v: Any): JLong = {
val gregorianMicros = v match {
case i: Instant => DateTimeUtils.instantToMicros(i)
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
}
datetimeRebaseMode match {
case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
case _ => gregorianMicros
}
}

private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
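
Note (not part of the patch): a minimal standalone sketch of the rebase that the new dateToDays performs, using the two helpers imported in this file; it assumes Spark's catalyst classes are on the classpath, and the concrete date is only an example.

import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

// Days since the epoch in the proleptic Gregorian calendar that Spark uses internally.
val gregorianDays = DateTimeUtils.localDateToDays(LocalDate.parse("1000-01-01"))
// LEGACY mode shifts the value onto the hybrid Julian calendar that old Parquet writers
// used; for dates this old the two calendars disagree by several days.
val julianDays = rebaseGregorianToJulianDays(gregorianDays)
// CORRECTED and EXCEPTION modes keep the proleptic Gregorian value unchanged.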
@@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory(

lazy val footerFileMetaData =
ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory(
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -51,8 +52,17 @@ case class ParquetScanBuilder(
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetSchema =
new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
// The rebase mode doesn't matter here because the filters are only used to determine
// whether they are convertible.
LegacyBehaviorPolicy.CORRECTED)
parquetFilters.convertibleFilters(this.filters).toArray
Contributor:
We create ParquetFilters here only to check which filters are convertible, and rebase mode doesn't matter.

Shall we pass datetimeRebaseMode as CORRECTED here?

Member (author):
Yep. Let me pass CORRECTED here.

}
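
Note (not part of the patch): a hedged sketch of the point made in the exchange above, reusing the names already in scope in ParquetScanBuilder; the column name "d" and the DateType field are hypothetical. Whether a filter is convertible does not depend on the rebase mode; only the literal embedded in the generated predicate does.

// Two instances that differ only in the rebase mode they are given.
val corrected = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
  pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive,
  LegacyBehaviorPolicy.CORRECTED)
val legacy = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
  pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive,
  LegacyBehaviorPolicy.LEGACY)
val eq = org.apache.spark.sql.sources.EqualTo("d", java.sql.Date.valueOf("1000-01-01"))
// Both either produce a predicate or reject the filter, so convertibleFilters is unaffected...
assert(corrected.createFilter(eq).isDefined == legacy.createFilter(eq).isDefined)
// ...but the day number embedded in the two predicates differs for pre-Gregorian dates,
// which is why the actual read paths pass the rebase mode taken from the file footer.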

@@ -45,7 +45,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedSQLTest
@@ -73,11 +75,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

protected def createParquetFilters(
schema: MessageType,
caseSensitive: Option[Boolean] = None): ParquetFilters =
caseSensitive: Option[Boolean] = None,
datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
): ParquetFilters =
new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
conf.parquetFilterPushDownInFilterThreshold,
caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
datetimeRebaseMode)

override def beforeEach(): Unit = {
super.beforeEach()
Expand Down Expand Up @@ -587,71 +592,75 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
def date: Date = Date.valueOf(s)
}

val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
import testImplicits._

Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
implicit val df: DataFrame = inputDF

def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
}

val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)

checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
implicit val df: DataFrame = inputDF

def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
}

Seq(3, 20).foreach { threshold =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
checkFilterPredicate(
In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
"2018-03-22".date).map(Literal.apply)),
if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
Row(resultFun("2018-03-21"))))
val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)

checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))

Seq(3, 20).foreach { threshold =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
checkFilterPredicate(
In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
"2018-03-22".date).map(Literal.apply)),
if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
Row(resultFun("2018-03-21"))))
}
}
}
}
@@ -661,35 +670,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

test("filter pushdown - timestamp") {
Seq(true, false).foreach { java8Api =>
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED",
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
val millisData = Seq(
"1000-06-14 08:28:53.123",
"1582-06-15 08:28:53.001",
"1900-06-16 08:28:53.0",
"2018-06-17 08:28:53.999")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
testTimestampPushdown(millisData, java8Api)
}

// spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
val microsData = Seq(
"1000-06-14 08:28:53.123456",
"1582-06-15 08:28:53.123456",
"1900-06-16 08:28:53.123456",
"2018-06-17 08:28:53.123456")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
testTimestampPushdown(microsData, java8Api)
}

// spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.INT96.toString) {
// INT96 doesn't support pushdown
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
import testImplicits._
withTempPath { file =>
millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF