[SPARK-26312][SQL] Replace RDDConversions.rowToRowRdd with RowEncoder to improve its conversion performance #23262
Conversation
…access performance
Test build #99861 has finished for PR 23262 at commit

retest this please

Test build #99866 has finished for PR 23262 at commit

Good catch, LGTM
```diff
     val numColumns = outputTypes.length
     val mutableRow = new GenericInternalRow(numColumns)
-    val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+    val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
```
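The motivation for the `.toArray` change can be sketched in plain Scala (no Spark dependency; the object and value names here are illustrative, not from the PR): indexed access on an immutable `List` traverses cons cells, while an `Array` lookup is constant time, so per-row converter lookups dominate as the column count grows.

```scala
// Plain-Scala sketch (no Spark): why holding converters in a List is slow
// to index. `list(i)` walks i cons cells; `array(i)` is a constant-time load.
object ConverterLookupSketch {
  def main(args: Array[String]): Unit = {
    val numColumns = 2000
    val converters: List[Any => Any] = List.fill(numColumns)(identity[Any] _)
    val asArray: Array[Any => Any] = converters.toArray
    // Same results either way; only the per-lookup cost differs (O(i) vs O(1)).
    var i = 0
    while (i < numColumns) {
      assert(converters(i)("v") == asArray(i)("v"))
      i += 1
    }
  }
}
```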
shall we use ExpressionEncoder here?
It is a good suggestion and has been modified. Would you like to review it again? Thanks.
```diff
     val numColumns = outputTypes.length
     val mutableRow = new GenericInternalRow(numColumns)
-    val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+    val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
```
shall we use RowEncoder here?
It has been modified, and the performance is the same as converting to arrays.
```diff
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
+      outputSchema: StructType): RDD[InternalRow] = {
```
nit: indent
well, seems like this is never used actually... shall we remove it instead if this is the case?
```diff
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
```
Let's remove the whole object. `rowToRowRdd` looks to be used in only one place, and the code here is quite small.
```diff
    */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
+    val converters = RowEncoder(outputSchema)
```
I checked each case. Every case looks fine except one, in
`spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala` (line 289 at fa0d4bf):

```scala
case chr: Char => UTF8String.fromString(chr.toString)
```

Looks like we're going to drop `Char` as `StringType`. I think it's trivial, and rather a mistake that we supported this. I don't feel strongly about documenting it in the migration guide, but if anyone feels so, we'd better do that.
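In isolation, the converter case being dropped did the following. This is a Spark-free sketch: `UTF8String.fromString` is swapped for a plain `String`, and the object name is hypothetical.

```scala
// Sketch of the CatalystTypeConverters case under discussion: a Scala Char
// was stringified (and then wrapped as a UTF8String in Spark; a plain
// String stands in here). RowEncoder no longer keeps this behavior.
object CharToStringSketch {
  def convert(v: Any): Any = v match {
    case chr: Char => chr.toString // the special case being dropped
    case other     => other        // all other values pass through
  }
  def main(args: Array[String]): Unit = {
    assert(convert('x') == "x")
    assert(convert(1) == 1)
  }
}
```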
LGTM otherwise
@HyukjinKwon @mgaido91 Thanks for the review. @cloud-fan @kiszk Would you like to give some suggestions: remove the object

Test build #99899 has finished for PR 23262 at commit

Let's remove it. There is no point in keeping an unused method; the code will remain in the commit history anyway. Also, there's no good reason to keep a few-line method that is called in only one place.

@HyukjinKwon Ok, removed it. Thanks for the review.
```diff
-        execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+        val converters = RowEncoder(StructType.fromAttributes(output))
+        rdd.mapPartitions { iterator =>
+          iterator.map { r =>
```
nit: `iterator.map(converters.toRow)`
Modified, thanks.
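The per-partition shape of the replacement can be sketched as a toy without Spark (the `Row`, `InternalRow`, and `toRow` names here stand in for `RowEncoder`'s API; none of this is the PR's actual code):

```scala
// Toy, Spark-free sketch of the mapPartitions pattern discussed above:
// each partition's iterator is mapped lazily through one conversion function.
object MapPartitionsSketch {
  type Row = Seq[Any]            // stand-in for org.apache.spark.sql.Row
  type InternalRow = Array[Any]  // stand-in for Catalyst's InternalRow
  val toRow: Row => InternalRow = _.toArray // stand-in for converters.toRow
  def main(args: Array[String]): Unit = {
    val partitions: Seq[Iterator[Row]] =
      Seq(Iterator(Seq(1, "a"), Seq(2, "b")))
    // The nit: pass the conversion function directly to map.
    val converted = partitions.map(iterator => iterator.map(toRow))
    val first = converted.head.next()
    assert(first.sameElements(Array(1, "a")))
  }
}
```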
LGTM, can you update the PR title and description?

Test build #99903 has finished for PR 23262 at commit
@cloud-fan Updated, thanks.

retest this please

LGTM

Test build #99911 has finished for PR 23262 at commit

Test build #99916 has finished for PR 23262 at commit

Test build #99910 has finished for PR 23262 at commit

Test build #99913 has finished for PR 23262 at commit
thanks, merging to master!

It doesn't work right now.. right? haha. I tried to merge but failed. Somehow the apache spark repo looks down (?).

yea it didn't work. The PR is not merged. I'll try it later. cc @srowen do you hit the same issue? I already switched to gitbox.

Oops, looks to be working fine for me after switching to gitbox.

Weird. My remote is as below FWIW.

It's been working for me, with the merge script and with GitHub and gitbox.
… to improve its conversion performance

What changes were proposed in this pull request?

`RDDConversions` would get disproportionately slower as the number of columns in the query increased, because the type of `converters` was previously `scala.collection.immutable.::`, a subtype of `List`. This PR removes `RDDConversions` and uses `RowEncoder` to convert the `Row` to an `InternalRow`.

The test of `PrunedScanSuite` for 2000 columns and 20k rows takes 409 seconds before this PR, and 361 seconds after.

How was this patch tested?

Test case of `PrunedScanSuite`.

Closes apache#23262 from eatoncys/toarray.
Authored-by: 10129659 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>