Skip to content

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Jun 13, 2018

What changes were proposed in this pull request?

Support Decimal type push down to the parquet data sources.
The Decimal comparator used is: BINARY_AS_SIGNED_INTEGER_COMPARATOR.

How was this patch tested?

unit tests and manual tests.

manual tests:

spark.range(10000000).selectExpr("id", "cast(id as decimal(9)) as d1", "cast(id as decimal(9, 2)) as d2", "cast(id as decimal(18)) as d3", "cast(id as decimal(18, 4)) as d4", "cast(id as decimal(38)) as d5", "cast(id as decimal(38, 18)) as d6").coalesce(1).write.option("parquet.block.size", 1048576).parquet("/tmp/spark/parquet/decimal")
val df = spark.read.parquet("/tmp/spark/parquet/decimal/")
spark.sql("set spark.sql.parquet.filterPushdown.decimal=true")
// Only read about 1 MB data
df.filter("d2 = 10000").show
// Only read about 1 MB data
df.filter("d4 = 10000").show
spark.sql("set spark.sql.parquet.filterPushdown.decimal=false")
// Read 174.3 MB data
df.filter("d2 = 10000").show
// Read 174.3 MB data
df.filter("d4 = 10000").show

@SparkQA
Copy link

SparkQA commented Jun 13, 2018

Test build #91769 has finished for PR 21556 at commit 9832661.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case decimal: DecimalType if DecimalType.is32BitDecimalType(decimal) =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(_.asInstanceOf[java.math.BigDecimal].unscaledValue().intValue()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REF:

val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong

@SparkQA
Copy link

SparkQA commented Jun 15, 2018

Test build #91881 has finished for PR 21556 at commit 51d8540.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 15, 2018

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jun 15, 2018

Test build #91894 has finished for PR 21556 at commit 51d8540.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 15, 2018

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jun 15, 2018

Test build #91909 has finished for PR 21556 at commit 51d8540.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum changed the title [SPARK-24549][SQL] 32BitDecimalType and 64BitDecimalType support push down [SPARK-24549][SQL] Support Decimal type push down to the parquet data sources Jun 28, 2018
@SparkQA
Copy link

SparkQA commented Jun 28, 2018

Test build #92413 has finished for PR 21556 at commit 0b5d0e7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 28, 2018

Jenkins, retest this please.

@maropu
Copy link
Member

maropu commented Jun 28, 2018

Can you benchmark code and results (on your env) in FilterPushdownBenchmark for this type?

@SparkQA
Copy link

SparkQA commented Jun 28, 2018

Test build #92414 has finished for PR 21556 at commit 0b5d0e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.booleanConf
.createWithDefault(true)

val PARQUET_READ_LEGACY_FORMAT = buildConf("spark.sql.parquet.readLegacyFormat")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property doesn't mention pushdown, but the description says it is only valid for push-down. Can you make the property name more clear?

Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
.asInstanceOf[java.lang.Long]).orNull)
case decimal: DecimalType
if pushDownDecimal && ((DecimalType.is32BitDecimalType(decimal) && readLegacyFormat)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments here to explain what differs when readLegacyFormat is true.

if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && !readLegacyFormat) =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to validate the scale of the decimal, or is scale adjusted in the analyzer?

test("filter pushdown - decimal") {
Seq(true, false).foreach { legacyFormat =>
withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyFormat.toString) {
Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 32BitDecimalType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is providing a column name, it would be better to use something more readable than _1.

}
}

test("incompatible parquet file format will throw exeception") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can detect the case where the data is written with the legacy format, then why do we need a property to read with the legacy format? Why not do the right thing without a property?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu
Copy link
Member

maropu commented Jul 2, 2018

@wangyum Thanks for the benchmarks!
@dongjoon-hyun In the benchmarks above, the results of ORC except for the case decimal(9, 2) have worse performance values as compared to the Parquet ones. Is this expected?

wangyum added 2 commits July 4, 2018 23:17
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DecimalType contains variable: decimalMetadata. It seems difficult to make a constants like before.

Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
.asInstanceOf[java.lang.Long]).orNull)
// Legacy DecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal) if pushDownDecimal &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The binary used for the legacy type and for fixed-length storage should be the same, so I don't understand why there are two different conversion methods. Also, because this is using the Parquet schema now, there's no need to base the length of this binary on what older versions of Spark did -- in other words, if the underlying Parquet type is fixed, then just convert the decimal to that size fixed without worrying about legacy types.

I think this should pass in the fixed array's length and convert the BigDecimal value to that length array for all cases. That works no matter what the file contains.

intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this uses the file schema, I think it should validate that the file uses the same scale as the value passed in. That's a cheap sanity check to ensure correctness.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems invalidate value already filtered by:

protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't validate the value against the decimal scale from the file, which is what I'm suggesting. The decimal scale must match exactly and this is a good place to check because this has the file information. If the scale doesn't match, then the schema used to read this file is incorrect, which would cause data corruption.

In my opinion, it is better to add a check if it is cheap instead of debating whether or not some other part of the code covers the case. If this were happening per record then I would opt for a different strategy, but because this is at the file level it is a good idea to add it here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I will do it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add check method to canMakeFilterOn and add a test case:

    val decimal = new JBigDecimal(10).setScale(scale)
    assert(decimal.scale() === scale)
    assertResult(Some(lt(intColumn("cdecimal1"), 1000: Integer))) {
      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal))
    }

    val decimal1 = new JBigDecimal(10).setScale(scale + 1)
    assert(decimal1.scale() === scale + 1)

    assertResult(None) {
      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal1))
    }

@SparkQA
Copy link

SparkQA commented Jul 4, 2018

Test build #92619 has finished for PR 21556 at commit f160648.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private case class ParquetSchemaType(
originalType: OriginalType,
primitiveTypeName: PrimitiveTypeName,
decimalMetadata: DecimalMetadata)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need DecimalMetadata.

@SparkQA
Copy link

SparkQA commented Jul 5, 2018

Test build #92641 has finished for PR 21556 at commit c7308ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 11, 2018

Test build #92843 has finished for PR 21556 at commit 16528f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case _ => false
}

// Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this issue reference correct? The PR says this is for SPARK-24549.

(n: String, v: Any) =>
FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])

case ParquetSchemaType(DECIMAL, INT32, 0, _) if pushDownDecimal =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why match 0 instead of _?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, the length is always 0, I replaced it to _.

case ParquetStringType => value.isInstanceOf[String]
case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
case ParquetDateType => value.isInstanceOf[Date]
case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the decimal cases be collapsed to a single case on ParquetSchemaType(DECIMAL, _, _, decimalMetadata)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No.
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried not using | and ignoring the physical type with _?

case ParquetDoubleType => value.isInstanceOf[JDouble]
case ParquetStringType => value.isInstanceOf[String]
case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
case ParquetDateType => value.isInstanceOf[Date]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there no support for timestamp?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally it is not supported. Do we need to support it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR that adds Decimal support. We should consider it in the future, though.

System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
decimalBuffer
}
Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
Copy link
Contributor

@rdblue rdblue Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This byte array is not reused, it is allocated each time this function runs. This should use the fromConstantByteArray variant. That tells Parquet that it isn't necessary to make defensive copies of the bytes.

Native ORC Vectorized 3981 / 4049 4.0 253.1 1.0X
Native ORC Vectorized (Pushdown) 702 / 735 22.4 44.6 5.4X
Parquet Vectorized 4407 / 4852 3.6 280.2 1.0X
Parquet Vectorized (Pushdown) 1602 / 1634 9.8 101.8 2.8X
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on why this is slower than the other tests with decimal(18, 2) and decimal(38, 2)? This seems very strange to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is that the data is more dense, so we need to read more values in the row group that contains the one we're looking for?

Copy link
Member Author

@wangyum wangyum Jul 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because 1024 * 1024 * 15 is out of decimal(9, 2) range. so no stats for that column. I will update benchmark.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. That's less than 2^24, so it should fit in an int. It should also fit in 8 base-ten digits so decimal(9,2) should work. And last, if the values don't fit in an int, I'm not sure how we would be able to store them in the first place, regardless of how stats are handled.

Did you verify that there are no stats for the file produced here? If that's the case, it would make sense with these numbers. I think we just need to look for a different reason why stats are missing.

Copy link
Member Author

@wangyum wangyum Jul 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a test:

// decimal(9, 2) max values is 9999999.99
// 1024 * 1024 * 15 =          15728640
val path = "/tmp/spark/parquet"
spark.range(1024 * 1024 * 15).selectExpr("cast((id) as decimal(9, 2)) as id").orderBy("id").write.mode("overwrite").parquet(path)

The generated parquet metadata:

$ java -jar ./parquet-tools/target/parquet-tools-1.10.1-SNAPSHOT.jar meta  /tmp/spark/parquet
file:        file:/tmp/spark/parquet/part-00000-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet 
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:5728640 TS:36 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:38/36/0.95 VC:5728640 ENC:PLAIN,BIT_PACKED,RLE ST:[no stats for this column]
file:        file:/tmp/spark/parquet/part-00001-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet 
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:651016 TS:2604209 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:2604325/2604209/1.00 VC:651016 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 0.00, max: 651015.00, num_nulls: 0]
file:        file:/tmp/spark/parquet/part-00002-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet 
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:3231146 TS:12925219 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:12925864/12925219/1.00 VC:3231146 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 651016.00, max: 3882161.00, num_nulls: 0]
file:        file:/tmp/spark/parquet/part-00003-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet 
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:2887956 TS:11552408 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:11552986/11552408/1.00 VC:2887956 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 3882162.00, max: 6770117.00, num_nulls: 0]
file:        file:/tmp/spark/parquet/part-00004-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet 
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:3229882 TS:12920163 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:12920808/12920163/1.00 VC:3229882 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 6770118.00, max: 9999999.00, num_nulls: 0]

As you can see file:/tmp/spark/parquet/part-00000-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet have not generated stats for that column.

scala> spark.read.parquet(path).filter("id is null").count
res0: Long = 5728640 

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I see. The tenths and hundredths are always 0, which makes the precision-8 numbers actually precision-10. It is still odd that this is causing Parquet to have no stats, but I'm happy with the fix. Thanks for explaining.

Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
}

private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since makeEq is called for EqualsNullSafe and valueCanMakeFilterOn allows null values through, I think these could be null, like the String case. I think this should use the Option pattern from String for all values, unless I'm missing some reason why these will never be null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ParquetBooleanType, ParquetLongType, ParquetFloatType and ParquetDoubleType do not need Option. Here is a example:

scala> import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.Binary

scala> Option(null).map(s => Binary.fromString(s.asInstanceOf[String])).orNull
res7: org.apache.parquet.io.api.Binary = null

scala> Binary.fromString(null.asInstanceOf[String])
java.lang.NullPointerException
  at org.apache.parquet.io.api.Binary$FromStringBinary.encodeUTF8(Binary.java:224)
  at org.apache.parquet.io.api.Binary$FromStringBinary.<init>(Binary.java:214)
  at org.apache.parquet.io.api.Binary.fromString(Binary.java:554)
  ... 52 elided

scala> null.asInstanceOf[java.lang.Long]
res9: Long = null

scala> null.asInstanceOf[java.lang.Boolean]
res10: Boolean = null

scala> Option(null).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull
res11: Integer = null

scala> null.asInstanceOf[Number].intValue.asInstanceOf[Integer]
java.lang.NullPointerException
  ... 52 elided

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Thanks!

makeEq.lift(nameToType(name)).map(_(name, value))
case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
makeNotEq.lift(nameToType(name)).map(_(name, value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since makeNotEq is also used for EqualNullSafe, I think it should handle null values as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I handled null values at valueCanMakeFilterOn:

def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
  value == null || (nameToType(name) match {
    case ParquetBooleanType => value.isInstanceOf[JBoolean]
    case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
    case ParquetLongType => value.isInstanceOf[JLong]
    case ParquetFloatType => value.isInstanceOf[JFloat]
    case ParquetDoubleType => value.isInstanceOf[JDouble]
    case ParquetStringType => value.isInstanceOf[String]
    case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
    case ParquetDateType => value.isInstanceOf[Date]
    case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
      isDecimalMatched(value, decimalMeta)
    case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
      isDecimalMatched(value, decimalMeta)
    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
      isDecimalMatched(value, decimalMeta)
    case _ => false
  })
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but that returns true for all null values.

@SparkQA
Copy link

SparkQA commented Jul 12, 2018

Test build #92936 has finished for PR 21556 at commit 33d1f18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 12, 2018

Test build #92938 has finished for PR 21556 at commit f73eab2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Copy link
Contributor

rdblue commented Jul 12, 2018

@wangyum, can you explain what was happening with the decimal(9,2) benchmark more clearly? I asked additional questions, but the thread is on a line that changed so it's collapsed by default.

Also, valueCanMakeFilterOn returns true for all null values, so I think we still have a problem there. Conversion from EqualNullSafe needs to support null filter values.

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Jul 13, 2018

@rdblue, so basically you mean it looks both equality comparison and nullsafe equality comparison are identically pushed down and looks it should be distinguished; otherwise, there could be a potential problem? If so, yup. I agree with it.

I think we won't have actually a chance to push down equality comparison or nullsafe equality comparison with actual null value by the optimizer. However, sure, I think we shouldn't relay on it. I think actually we should disallow one of both nullsafe equality comparison or equality comparison with null in ParquetFilters.

Thing is, I remember I checked the inside of Parquet's equality comparison API itself is actually nullsafe a long ago like few years ago - this of course should be double checked.

Since this PR doesn't change the existing behaviour on this and looks needing some more investigation (e.g., checking if it is still (or it has been) true what I remembered and checked about Parquet's equality comparison), probably, it might be okay to leave it as is here and proceed separately.

case _ => false
}

// Decimal type must make sure that filter value's scale matched the file.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we leave this comment around the decimal cases below or around isDecimalMatched?


// Decimal type must make sure that filter value's scale matched the file.
// If doesn't matched, which would cause data corruption.
// Other types must make sure that filter value's type matched the file.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say like .. Parquet's type in the given file should be matched to the value's type in the pushed filter in order to push down the filter to Parquet.

@rdblue
Copy link
Contributor

rdblue commented Jul 13, 2018

@HyukjinKwon, even if the values are null, the makeEq function only casts null to Java Integer so the handling is still safe. It just looks odd that null.asInstanceOf[JInt] is safe. Thanks to @wangyum for explaining it. Even if the null-safe equality predicate contains a null value, this should be safe.

And, passing null in an equals predicate is supported by Parquet.

@rdblue
Copy link
Contributor

rdblue commented Jul 13, 2018

+1, I think this looks ready to go.

@wangyum
Copy link
Member Author

wangyum commented Jul 14, 2018

@SparkQA
Copy link

SparkQA commented Jul 14, 2018

Test build #92996 has finished for PR 21556 at commit e713698.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

@rdblue, ah, I misunderstood then. thanks for clarifying it.

@rdblue
Copy link
Contributor

rdblue commented Jul 14, 2018

I misunderstood how it was safe as well. It was Yuming's clarification that helped.

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@SparkQA
Copy link

SparkQA commented Jul 15, 2018

Test build #93014 has finished for PR 21556 at commit e31c201.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class JavaSummarizerExample
  • class SerializableConfiguration(@transient var value: Configuration)
  • class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
  • case class SchemaType(dataType: DataType, nullable: Boolean)
  • implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T])
  • implicit class AvroDataFrameReader(reader: DataFrameReader)
  • class KMeansModel (@Since(\"1.0.0\") val clusterCenters: Array[Vector],
  • trait ComplexTypeMergingExpression extends Expression
  • case class Size(child: Expression) extends UnaryExpression with ExpectsInputTypes
  • abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike

@wangyum
Copy link
Member Author

wangyum commented Jul 15, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Jul 15, 2018

Test build #93017 has finished for PR 21556 at commit e31c201.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class JavaSummarizerExample
  • class SerializableConfiguration(@transient var value: Configuration)
  • class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
  • case class SchemaType(dataType: DataType, nullable: Boolean)
  • implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T])
  • implicit class AvroDataFrameReader(reader: DataFrameReader)
  • class KMeansModel (@Since(\"1.0.0\") val clusterCenters: Array[Vector],
  • trait ComplexTypeMergingExpression extends Expression
  • case class Size(child: Expression) extends UnaryExpression with ExpectsInputTypes
  • abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 9549a28 Jul 16, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants