
Conversation

@wangyum
Member

@wangyum wangyum commented Jul 2, 2018

What changes were proposed in this pull request?

Replace the DataFrame schema with the Parquet file schema when creating ParquetFilters.
This makes it easy to implement Decimal and Timestamp push-down, for example:

// DecimalType: 32BitDecimalType
case ParquetSchemaType(DECIMAL, INT32, decimal)
  if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
      .asInstanceOf[Integer]).orNull)
// DecimalType: 64BitDecimalType
case ParquetSchemaType(DECIMAL, INT64, decimal)
  if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
      .asInstanceOf[java.lang.Long]).orNull)
// DecimalType: LegacyParquetFormat 32BitDecimalType & 64BitDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
  if pushDownDecimal && decimal.getPrecision <= Decimal.MAX_LONG_DIGITS =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    Option(v).map(d => decimalToBinaryUsingUnscaledLong(decimal.getPrecision,
      d.asInstanceOf[JBigDecimal])).orNull)
// DecimalType: ByteArrayDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
  if pushDownDecimal && decimal.getPrecision > Decimal.MAX_LONG_DIGITS =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    Option(v).map(d => decimalToBinaryUsingUnscaledBytes(decimal.getPrecision,
      d.asInstanceOf[JBigDecimal])).orNull)
// INT96 doesn't support pushdown
case ParquetSchemaType(TIMESTAMP_MICROS, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
      .asInstanceOf[java.lang.Long]).orNull)
case ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[java.lang.Long]).orNull)
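
(Not part of this patch, just for reviewers: a minimal sketch of how the new paths could be exercised end to end once the decimal/timestamp cases above are wired in. The paths, column names, and the outputTimestampType setting below are illustrative.)

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// INT96 timestamps are not pushed down (see the comment above), so write TIMESTAMP_MICROS.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

Seq((BigDecimal("123.45"), Timestamp.valueOf("2018-07-02 00:00:00")))
  .toDF("d", "t")
  .write.mode("overwrite").parquet("/tmp/pushdown_demo")

val df = spark.read.parquet("/tmp/pushdown_demo")
  .filter($"d" === BigDecimal("123.45") && $"t" < Timestamp.valueOf("2018-07-03 00:00:00"))

// With pushdown in place, the scan node should list the predicates under PushedFilters.
df.explain()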

How was this patch tested?

unit tests

@SparkQA

SparkQA commented Jul 2, 2018

Test build #92523 has finished for PR 21696 at commit 10d408f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ParquetSchemaType(

@SparkQA

SparkQA commented Jul 2, 2018

Test build #92530 has finished for PR 21696 at commit 07dd372.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 2, 2018

Test build #92529 has finished for PR 21696 at commit e7b4ec9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Jul 2, 2018

cc @gatorsmile @cloud-fan

case BooleanType =>
private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
// BooleanType
case ParquetSchemaType(null, BOOLEAN, null) =>
Member Author

Mapping type reference:

private def convertField(field: StructField, repetition: Type.Repetition): Type = {
  ParquetSchemaConverter.checkFieldName(field.name)
  field.dataType match {
    // ===================
    // Simple atomic types
    // ===================
    case BooleanType =>
      Types.primitive(BOOLEAN, repetition).named(field.name)
    case ByteType =>
      Types.primitive(INT32, repetition).as(INT_8).named(field.name)
    case ShortType =>
      Types.primitive(INT32, repetition).as(INT_16).named(field.name)
    case IntegerType =>
      Types.primitive(INT32, repetition).named(field.name)
    case LongType =>
      Types.primitive(INT64, repetition).named(field.name)
    case FloatType =>
      Types.primitive(FLOAT, repetition).named(field.name)
    case DoubleType =>
      Types.primitive(DOUBLE, repetition).named(field.name)
    case StringType =>
      Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
    case DateType =>
      Types.primitive(INT32, repetition).as(DATE).named(field.name)
    // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or
    // TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the
    // behavior same as before.
    //
    // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
    // timestamp in Impala for some historical reasons. It's not recommended to be used for any
    // other types and will probably be deprecated in some future version of parquet-format spec.
    // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
    // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
    //
    // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
    // from Spark 1.5.0, we resort to a timestamp type with microsecond precision so that we can
    // store a timestamp into a `Long`. This design decision is subject to change though, for
    // example, we may resort to nanosecond precision in the future.
    case TimestampType =>
      outputTimestampType match {
        case SQLConf.ParquetOutputTimestampType.INT96 =>
          Types.primitive(INT96, repetition).named(field.name)
        case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
          Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)
        case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
          Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
      }
    case BinaryType =>
      Types.primitive(BINARY, repetition).named(field.name)
    // ======================
    // Decimals (legacy mode)
    // ======================
    // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
    // always store decimals in fixed-length byte arrays. To keep compatibility with these older
    // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
    // by `DECIMAL`.
    case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
      Types
        .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
        .as(DECIMAL)
        .precision(precision)
        .scale(scale)
        .length(ParquetSchemaConverter.minBytesForPrecision(precision))
        .named(field.name)
    // ========================
    // Decimals (standard mode)
    // ========================
    // Uses INT32 for 1 <= precision <= 9
    case DecimalType.Fixed(precision, scale)
        if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat =>
      Types
        .primitive(INT32, repetition)
        .as(DECIMAL)
        .precision(precision)
        .scale(scale)
        .named(field.name)
    // Uses INT64 for 1 <= precision <= 18
    case DecimalType.Fixed(precision, scale)
        if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat =>
      Types
        .primitive(INT64, repetition)
        .as(DECIMAL)
        .precision(precision)
        .scale(scale)
        .named(field.name)
    // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
    case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
      Types
        .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
        .as(DECIMAL)
        .precision(precision)
        .scale(scale)
        .length(ParquetSchemaConverter.minBytesForPrecision(precision))
        .named(field.name)
    // ===================================
    // ArrayType and MapType (legacy mode)
    // ===================================
    // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
    // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
    // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
    // field name "array" is borrowed from parquet-avro.
    case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
      // <list-repetition> group <name> (LIST) {
      //   optional group bag {
      //     repeated <element-type> array;
      //   }
      // }
      // This should not use `listOfElements` here because this new method checks if the
      // element name is `element` in the `GroupType` and throws an exception if not.
      // As mentioned above, Spark prior to 1.4.x writes `ArrayType` as `LIST` but with
      // `array` as its element name as below. Therefore, we build manually
      // the correct group type here via the builder. (See SPARK-16777)
      Types
        .buildGroup(repetition).as(LIST)
        .addField(Types
          .buildGroup(REPEATED)
          // "array" is the name chosen by parquet-hive (1.7.0 and prior version)
          .addField(convertField(StructField("array", elementType, nullable)))
          .named("bag"))
        .named(field.name)
    // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
    // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
    // covered by the backwards-compatibility rules implemented in `isElementType()`.
    case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
      // <list-repetition> group <name> (LIST) {
      //   repeated <element-type> element;
      // }
      // Here too, we should not use `listOfElements`. (See SPARK-16777)
      Types
        .buildGroup(repetition).as(LIST)
        // "array" is the name chosen by parquet-avro (1.7.0 and prior version)
        .addField(convertField(StructField("array", elementType, nullable), REPEATED))
        .named(field.name)
    // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
    // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
    case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
      // <map-repetition> group <name> (MAP) {
      //   repeated group map (MAP_KEY_VALUE) {
      //     required <key-type> key;
      //     <value-repetition> <value-type> value;
      //   }
      // }
      ConversionPatterns.mapType(
        repetition,
        field.name,
        convertField(StructField("key", keyType, nullable = false)),
        convertField(StructField("value", valueType, valueContainsNull)))
    // =====================================
    // ArrayType and MapType (standard mode)
    // =====================================
    case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
      // <list-repetition> group <name> (LIST) {
      //   repeated group list {
      //     <element-repetition> <element-type> element;
      //   }
      // }
      Types
        .buildGroup(repetition).as(LIST)
        .addField(
          Types.repeatedGroup()
            .addField(convertField(StructField("element", elementType, containsNull)))
            .named("list"))
        .named(field.name)
    case MapType(keyType, valueType, valueContainsNull) =>
      // <map-repetition> group <name> (MAP) {
      //   repeated group key_value {
      //     required <key-type> key;
      //     <value-repetition> <value-type> value;
      //   }
      // }
      Types
        .buildGroup(repetition).as(MAP)
        .addField(
          Types
            .repeatedGroup()
            .addField(convertField(StructField("key", keyType, nullable = false)))
            .addField(convertField(StructField("value", valueType, valueContainsNull)))
            .named("key_value"))
        .named(field.name)
    // ===========
    // Other types
    // ===========
    case StructType(fields) =>
      fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
        builder.addField(convertField(field))
      }.named(field.name)
    case udt: UserDefinedType[_] =>
      convertField(field.copy(dataType = udt.sqlType))
    case _ =>
      throw new AnalysisException(s"Unsupported data type $field.dataType")
  }
}
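
(Added for context, not from the PR: the partial functions in this change match on the file's physical types rather than Catalyst types, so ParquetFilters needs something like a name-to-ParquetSchemaType lookup built from the footer. The case-class shape below is inferred from the patterns in the description; the helper name and field names are assumptions, not the merged code.)

import scala.collection.JavaConverters._
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

// Inferred from the `ParquetSchemaType(originalType, primitiveTypeName, decimalMetadata)`
// patterns used in the PR description above.
case class ParquetSchemaType(
    originalType: OriginalType,
    primitiveTypeName: PrimitiveTypeName,
    decimalMetadata: DecimalMetadata)

// Illustrative helper: index the file schema's top-level primitive fields by name so that
// a pushed column can be matched against the physical type actually written to the file.
def nameToParquetType(fileSchema: MessageType): Map[String, ParquetSchemaType] =
  fileSchema.getFields.asScala.collect {
    case f if f.isPrimitive =>
      val p = f.asPrimitiveType()
      f.getName -> ParquetSchemaType(p.getOriginalType, p.getPrimitiveTypeName, p.getDecimalMetadata)
  }.toMap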

val sharedConf = broadcastedHadoopConf.value.value

val fileMetaData =
ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS).getFileMetaData
Contributor

Will we read the footer again in the Parquet reader?

Member

Yes, I think so. It should be avoided. isCreatedByParquetMr was intentionally made a function to avoid it by short-circuiting.

Contributor

@rdblue rdblue Jul 3, 2018

We always read the footer and adjust the filters to match. In our version we use the following to handle situations where the column names have changed case or where Hive has returned a different case, which is very similar to this. I don't think this refactor alone is enough to justify the additional overhead, but we should consider whether Spark should be handling column case sensitivity differently.

First, here's our code, which is very similar to what's done here:

      // Try to push down filters when filter push-down is enabled.
      val pushed = if (pushdownEnabled) {
        // read the file schema to create Parquet filters that match case
        val fileReader = ParquetFileReader.open(conf, fileSplit.getPath)
        val fileSchema = try {
          new ParquetToSparkSchemaConverter(conf).convert(
            ParquetReadSupport.clipParquetSchema(
              fileReader.getFileMetaData.getSchema, requiredSchema))
        } finally {
          fileReader.close()
        }

        filters
            // Collects all converted Parquet filter predicates. Notice that not all predicates can
            // be converted (`ParquetFilters.createFilter` returns an `Option`). That's why a
            // `flatMap` is used here.
            .flatMap(ParquetFilters.createFilter(fileSchema, _))
            .reduceOption(FilterApi.and)
      } else {
        None
      }

Filters must match the file's names exactly, so it is best practice to always make the pushed filter match the file schema. The only case where you wouldn't need to do this is when you support neither case-insensitive column names nor column renames.

If columns can be renamed within a Parquet file then you need to push filters for the names used in the file's schema. I don't think that's a problem here because I don't know of a way to rename columns in a Spark Parquet table. (Iceberg uses field IDs to do this.)

For case-insensitive column names, I don't think there is currently support in Spark, at least for Parquet files. But, the SQL engine does allow you to customize whether column names are case sensitive or not using the resolver.

So the decision here comes down to whether we think case-insensitive naming should be supported when reading Parquet files (or whether we are okay adding the overhead just for the pushdown refactor). I think it should be, since the SQL analyzer allows you to resolve column names case-insensitively. In addition, the support for writing Parquet to a location makes it easy to create a table with multiple writes that are completely independent of one another -- there is no validation that one write matches an overall table schema. That makes it easy to write two partitions with schemas that differ in column name case, which Spark thinks is fine if you're using the right resolver options.
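
(Illustration added here, not rdblue's code: a minimal sketch of what case-insensitive resolution against the file schema could look like, assuming the behavior is gated on spark.sql.caseSensitive; the helper name is hypothetical.)

import scala.collection.JavaConverters._
import org.apache.parquet.schema.MessageType

// Resolve a Spark column name to the exact spelling used in the Parquet file, so that the
// pushed predicate references the file's name. Returning None means "don't push this one":
// either the column is missing or two file columns differ only by case (ambiguous).
def toFileFieldName(sparkName: String, fileSchema: MessageType, caseSensitive: Boolean): Option[String] = {
  val fileNames = fileSchema.getFields.asScala.map(_.getName)
  if (caseSensitive) {
    fileNames.find(_ == sparkName)
  } else {
    fileNames.filter(_.equalsIgnoreCase(sparkName)) match {
      case Seq(single) => Some(single)
      case _ => None
    }
  }
}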

@cloud-fan
Contributor

Makes sense to me, since we need the physical schema information to push down decimal and timestamp. Also cc @rdblue @michal-databricks

@HyukjinKwon
Member

Physical schema information shouldn't usually be referred to in a higher layer, though; cc @liancheng. It's the kind of thing we should avoid. I get that we need this, but I wonder if this is the only way to get it done.

@cloud-fan
Contributor

You can't get the physical schema information in a higher layer, as it may vary across files. The table schema can evolve (add/drop columns).

@HyukjinKwon
Member

HyukjinKwon commented Jul 3, 2018

Yea, I get why it was done this way, and wonder if we can avoid it. It sounds more like a band-aid fix, mainly driven by decimal. FWIW, in the case of timestamps as INT96 (deprecated in Parquet), this is legacy behavior and something we should remove in the long term.

@HyukjinKwon
Member

Don't block on me. Just wanted to be doubly sure this is the only way. I am fine with it.

(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
case IntegerType =>
// IntegerType
case ParquetSchemaType(null, INT32, null) =>
Contributor

Before, it was a valid assumption that the value's type matched the DataType. Now that this is the file's type, that might not be the case. For example, byte and short are stored as INT32. This should cast to Number and then convert to the file's type.

I would also do this for INT64 columns, in case the schema has evolved and a column that was INT32 is now INT64. The converters (used to materialize records) don't currently support this, but it would be reasonable for them to support it eventually.

Contributor

The safest way is to look at both the file's type and Spark's type, and deal with any mismatch. We can do that later since it's an existing problem. Currently Spark tries its best to guarantee that the types match (except for missing/extra columns). The only case I can think of that may break the assumption is when the Parquet files have conflicting schemas and Spark reads them using a user-specified schema (so that schema inference can be skipped) that doesn't match all of the Parquet files.
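
(Sketch added for illustration, not from the PR: the widening rdblue and cloud-fan describe above could look roughly like this, converting the incoming literal via java.lang.Number instead of assuming it is already boxed as the file's exact Java type. Helper names are made up.)

// ByteType/ShortType values arrive boxed as java.lang.Byte/Short while the file column is
// INT32; going through Number also covers an INT32 column that later evolved to INT64.
def toIntValue(v: Any): Integer =
  Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull

def toLongValue(v: Any): java.lang.Long =
  Option(v).map(_.asInstanceOf[Number].longValue.asInstanceOf[java.lang.Long]).orNull

// e.g. FilterApi.gtEq(intColumn(n), toIntValue(v)) instead of v.asInstanceOf[Integer]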

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
// BooleanType
Contributor

The other partial functions don't have these comments. Is that on purpose? Maybe these should be constants instead to make the code more readable and consistent?

case ParquetSchemaType(UTF8, BINARY, null) =>
(n: String, v: Any) => FilterApi.lt(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
Contributor

Why did this introduce Option to handle null passed as v?

Contributor

Same question. It's different from the original code.

Member Author

Oh, sorry. I copied Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull from makeEq.

Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
Member Author

@wangyum wangyum Jul 4, 2018

v can't be null here, so I removed Option(v).map. Same for ParquetStringType.
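
(Side note added for readers, not from the thread: the reason makeEq keeps the Option(v) wrapper is that Parquet's FilterApi treats a null literal in eq/notEq as an IS NULL / IS NOT NULL test, while the ordering predicates never receive null. A tiny illustration, with an arbitrary column name:)

import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn
import org.apache.parquet.io.api.Binary

// "name IS NULL": eq with a null literal, which is why makeEq must tolerate v == null
val isNull = FilterApi.eq(binaryColumn("name"), null.asInstanceOf[Binary])

// "name < 'spark'": lt/gt/ltEq/gtEq never see a null value, so no Option wrapper is needed
val lessThan = FilterApi.lt(binaryColumn("name"), Binary.fromString("spark"))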

@SparkQA

SparkQA commented Jul 4, 2018

Test build #92595 has finished for PR 21696 at commit 9b489ec.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Jul 4, 2018

Jenkins, retest this please

@SparkQA

SparkQA commented Jul 4, 2018

Test build #92600 has finished for PR 21696 at commit 9b489ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2018

Test build #92599 has finished for PR 21696 at commit 9b489ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2018

Test build #92603 has finished for PR 21696 at commit 9e1aa9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 021145f Jul 4, 2018
@rdblue
Contributor

rdblue commented Jul 4, 2018

Thanks, @wangyum! I think this refactor was a good idea.

case LongType =>
private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
Contributor

We have a problem here: we map Spark byte/short to Parquet int, so v here can be a java.lang.Byte and we may get a cast exception. We should do v.asInstanceOf[java.lang.Number].intValue. @wangyum can you fix it and add a test? Thanks!
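
(A rough sketch of the kind of regression test being asked for, assuming it lives in a suite with Spark's usual test helpers such as withTempPath and checkAnswer; the test name and exact expectations are illustrative.)

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.lit

test("filter pushdown - tinyint/smallint stored as Parquet INT32") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    spark.range(10)
      .selectExpr("cast(id as byte) as b", "cast(id as short) as s")
      .write.parquet(path)

    val df = spark.read.parquet(path)
    // The pushed values arrive as java.lang.Byte / java.lang.Short rather than Integer,
    // which is the ClassCastException scenario described above.
    checkAnswer(
      df.filter(df("b") > lit(5.toByte) && df("s") > lit(5.toShort)),
      (6 until 10).map(i => Row(i.toByte, i.toShort)))
  }
}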


fjh100456 pushed a commit to fjh100456/spark that referenced this pull request Aug 31, 2018
…t schema are in different letter cases

## What changes were proposed in this pull request?

Since apache#21696, Spark uses the Parquet schema instead of the Hive metastore schema to do pushdown. That change avoids wrong records being returned when the Hive metastore schema and the Parquet schema are in different letter cases. This PR adds a test case for it.

More details:
https://issues.apache.org/jira/browse/SPARK-25206

## How was this patch tested?

unit tests

Closes apache#22267 from wangyum/SPARK-24716-TESTS.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
val filePath = fileSplit.getPath
Member

@wangyum, when it's easy to reduce the diff, please do so to make it easier to track. I think the diff would have been considerably smaller if you had just moved these three lines up.

Member Author

Do I need to create a follow-up PR?

Member

@HyukjinKwon HyukjinKwon Jun 14, 2019

nope .. the commit and blame history is already overwritten. Just wanted to leave a note for next time.

Member Author

OK. Thank you
