-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24706][SQL] ByteType and ShortType support pushdown to parquet #21682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #92503 has finished for PR 21682 at commit
|
|
retest this please |
|
@HyukjinKwon btw, why we don't support pushdown for these types? Any historical reason? |
| private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { | ||
| case BooleanType => | ||
| (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) | ||
| case ByteType => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us update the comment in the line 34.
Note: For now timestamp SQL type is not supported for pushing down, because the corresponding
Parquet type can be Long or INT96. There is no context about Parquet type here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably INT64 or INT96 (which is long vs binary) might be a bit better while we are here.
|
@maropu We accidentally dropped the pushdown of these data types when we refactored the file formats. You can check the change history and find the PR. |
|
Test build #92519 has finished for PR 21682 at commit
|
| private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { | ||
| case BooleanType => | ||
| (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) | ||
| case ByteType => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for doubly sure, is it an official way to handle short and byte? Can you double check and leave the description about the API specification (or related code bit or reference) @wangyum? Parquet one is pretty core and we should better be clear on this (so that we could also probably judge if we need a configuration or not). @rdblue too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually, both byte and short would be stored as integers in Parquet. Because Parquet uses bit packing, it doesn't matter if you store them as ints (or even longs) because they'll get packed into the same space.
The important thing is to match the Parquet file's type when pushing a filter. Since Spark stores ByteType and ShortType in Parquet as INT32, this is correct.
|
@gatorsmile aha, thanks. |
|
re: #21682 (comment) - I couldn't recall related things about it. Thanks @gatorsmile. So, is it a regression? shall we update the migration guide to note that the performance effect? - like if your workload are heavily dependent on short and byte in Parquet, avoid upgrading to xx version. This optimization was restored in yy+ version. Please consider using yy+ version. |
|
It is a regression that was introduced in Spark 1.2. Almost 4 years ago. https://issues.apache.org/jira/browse/SPARK-4453 Thus, I think we do not need to document it. The pushdown can be treated as new features. : ) |
|
Wow .. so it was 4 years ago .. okay. |
| case ByteType => | ||
| (n: String, v: Any) => FilterApi.eq( | ||
| intColumn(n), | ||
| Option(v).map(b => b.asInstanceOf[java.lang.Byte].toInt.asInstanceOf[Integer]).orNull) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can use Byte instead of java.lang.Byte here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
Also, there's no need to use Option.map because the value cannot be null. That's why the IntegerType case just casts the value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makeEq.lift(nameToType(name)).map(_(name, null)) makes value to null.
scala> null.asInstanceOf[Short].toInt.asInstanceOf[Integer]
res49: Integer = 0
scala> null.asInstanceOf[java.lang.Short].toInt.asInstanceOf[Integer]
java.lang.NullPointerException
at scala.Predef$.Short2short(Predef.scala:360)
... 51 elidedThat's why I use Option.map here.
| case ShortType => | ||
| (n: String, v: Any) => FilterApi.eq( | ||
| intColumn(n), | ||
| Option(v).map(b => b.asInstanceOf[java.lang.Short].toInt.asInstanceOf[Integer]).orNull) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can use Short instead of java.lang.Short here.
| (n: String, v: Any) => FilterApi.eq( | ||
| intColumn(n), | ||
| Option(v).map(b => b.asInstanceOf[java.lang.Byte].toInt.asInstanceOf[Integer]).orNull) | ||
| case ShortType => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we actually fold the cases into like:
case ByteType | ShortType | IntegerType =>
... b.asInstanceOf[Number].intValue()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about like this:
makeEqandmakeNotEq
case ByteType | ShortType =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case IntegerType =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])makeLt,makeLtEq,makeGtandmakeGtEq:
case ByteType | ShortType =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case IntegerType =>
(n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])|
+1 I agree with some of the minor refactoring suggestions, but overall this looks correct to me. |
case ShortType => -> case ByteType | ShortType =>
| case ByteType | ShortType => | ||
| (n: String, v: Any) => FilterApi.eq( | ||
| intColumn(n), | ||
| Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value may be null.
| case ByteType | ShortType => | ||
| (n: String, v: Any) => FilterApi.lt( | ||
| intColumn(n), | ||
| v.asInstanceOf[Number].intValue.asInstanceOf[Integer]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value cannot be null.
|
Test build #92589 has finished for PR 21682 at commit
|
|
ping @wangyum |
# Conflicts: # sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
|
Test build #92781 has finished for PR 21682 at commit
|
| case ParquetByteType | ParquetShortType | ParquetIntegerType => | ||
| (n: String, v: Any) => FilterApi.eq( | ||
| intColumn(n), | ||
| Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when v can be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makeEq and makeNotEq may be null:
Lines 241 to 242 in 021145f
| case sources.IsNull(name) if canMakeFilterOn(name) => | |
| makeEq.lift(nameToType(name)).map(_(name, null)) |
|
thanks, merging to master! |
What changes were proposed in this pull request?
ByteTypeandShortTypesupport pushdown to parquet data source.Benchmark result.
How was this patch tested?
unit tests