@maropu maropu commented Jun 28, 2018

What changes were proposed in this pull request?

This PR modifies the code to project the required columns from CSV-parsed data when column pruning is disabled.
In the current master, the exception below occurs if spark.sql.csv.parser.columnPruning.enabled is false. This is because the required schema and the schema of CSV-parsed rows differ from each other:

./bin/spark-shell --conf spark.sql.csv.parser.columnPruning.enabled=false
scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:48:46 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:41)
        ...
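The failure mode can be sketched outside Spark (a minimal standalone illustration; the `dataSchema`/`requiredSchema`/`tokens` values below are hypothetical stand-ins, not the actual internals):

```scala
// With pruning disabled, the parser tokenizes every column in the data
// schema, while the downstream reader expects values laid out per the
// required schema. Projecting tokens by index restores the expected layout.
val dataSchema     = Seq("c0", "c1")  // full schema of each parsed CSV record
val requiredSchema = Seq("c1")        // columns the query actually reads

val tokens = Array("0", "7")          // one parsed record: c0 = "0", c1 = "7"

// Map each required column to its position in the data schema, then project.
val tokenIndexArr = requiredSchema.map(dataSchema.indexOf).toArray
val projected     = tokenIndexArr.map(i => tokens(i))

assert(projected.sameElements(Array("7")))
```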

How was this patch tested?

Added tests in CSVSuite.


maropu commented Jun 28, 2018

@HyukjinKwon @MaxGekk


SparkQA commented Jun 28, 2018

Test build #92418 has finished for PR 21657 at commit ecd3c80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk MaxGekk left a comment


sgtm

Member

Can you use something other than avg? I would avoid equality checks on floating-point operands; you can easily get 4.49999999999 instead of 4.5.
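The concern above is the usual floating-point pitfall; a quick sketch of why exact equality on an average is fragile and a tolerance check is safer:

```scala
// Summing 0.1 ten times does not give exactly 1.0 in binary floating point,
// so an average computed from such sums can miss an "exact" expected value.
val sum = Seq.fill(10)(0.1).sum

assert(sum != 1.0)                   // exact equality fails
assert(math.abs(sum - 1.0) < 1e-9)   // tolerance-based comparison passes
```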

Member Author

ok

Member

It seems the bug is not related to the column pruning feature, and most likely it was present in previous versions. Should the fix be ported to the 2.3 branch?

Member Author

IIUC, before the column pruning PR, UnivocityParser always parsed the required columns only, so we didn't need to project them in CSVFileFormat. cc: @HyukjinKwon

private val row = new GenericInternalRow(requiredSchema.length)

Member

I think we'd better just merge this into master since we already added some changes related to the column pruning stuff. Let me double-check it before merging it in.

Member

The string exprs and selectExpr can be replaced by .select(sum('p), avg('c0))

Member Author

I think both are ok. Do we have a preferred one?

Member

I think both are fine~

Member

"true" -> true ?

Member Author

ok

Member

Just in case: does the fix also work if the required schema is empty?



SparkQA commented Jun 29, 2018

Test build #92445 has finished for PR 21657 at commit 4f192a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


MaxGekk commented Jun 29, 2018

Actually, the issue happened because I removed the mapping: 64fad0b#diff-d19881aceddcaa5c60620fdcda99b4c4L79

I would propose reverting it, and removing all those "expensive" (compared to looking up tokenIndexArr) maps and the projection: https://github.com/apache/spark/pull/21657/files#diff-a549ac2e19ee7486911e2e6403444d9dR156


maropu commented Jul 2, 2018

@MaxGekk Do you mean we remove the option for column pruning in csv?


MaxGekk commented Jul 2, 2018

Do you mean we remove the option for column pruning in csv?

I mean reverting the index mapping, tokenIndexArr. In that case, your changes in buildReader are not needed.

If spark.sql.csv.parser.columnPruning.enabled is set to false, tokenIndexArr will map each data index to the corresponding required index. In the default case (spark.sql.csv.parser.columnPruning.enabled is true), the array is actually not needed and can be replaced by the identity function.
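A rough standalone sketch of that proposal (a hypothetical helper, not the PR's actual code): tokenIndexArr maps each required column to its token position when pruning is off, and degenerates to the identity when pruning is on:

```scala
// When pruning is enabled, the parser already emits only the required
// columns, so the mapping is the identity; when disabled, it maps each
// required column to its index in the full data schema.
def tokenIndexArr(
    dataSchema: Seq[String],
    requiredSchema: Seq[String],
    columnPruning: Boolean): Array[Int] = {
  if (columnPruning) requiredSchema.indices.toArray
  else requiredSchema.map(dataSchema.indexOf).toArray
}

val data     = Seq("a", "b", "c")
val required = Seq("c", "a")

assert(tokenIndexArr(data, required, columnPruning = false).sameElements(Array(2, 0)))
assert(tokenIndexArr(data, required, columnPruning = true).sameElements(Array(0, 1)))
```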


maropu commented Jul 2, 2018

I'm not sure, though: is the tokenIndexArr implementation always faster than the unsafe projection?


@MaxGekk MaxGekk left a comment


Taking into account that spark.sql.csv.parser.columnPruning.enabled is set to true by default, I think the performance of the other path doesn't matter much. Both solutions are ok for me.


maropu commented Jul 2, 2018

@HyukjinKwon WDYT?


maropu commented Jul 3, 2018

@HyukjinKwon kindly ping

@HyukjinKwon

Eh .. actually can we revive 64fad0b#diff-d19881aceddcaa5c60620fdcda99b4c4L79 ? This sounds safer to me.


maropu commented Jul 3, 2018

ok, will revive the code here


SparkQA commented Jul 4, 2018

Test build #92590 has finished for PR 21657 at commit fc2108e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Jul 4, 2018

@HyukjinKwon @MaxGekk plz check?

Member

It seems the code here and in the line https://github.com/apache/spark/pull/21657/files#diff-d19881aceddcaa5c60620fdcda99b4c4R51 above are the same. Could you fold the lines?

Member Author

ok

Member

I would apply this small optimization: java.lang.Integer.valueOf(dataSchema.indexOf(f))

Member Author

ok

Member

The lazy on tokenIndexArr means that the internal laziness flag will be checked for each token. I would remove lazy from tokenIndexArr.
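For context on this review comment, a small sketch of Scala's lazy val semantics: the initializer runs once, but every access goes through a guard that checks whether initialization has happened yet:

```scala
// lazy defers initialization to first access; the trade-off is a
// per-access check of the initialized flag.
var initCount = 0
lazy val tokenIdx: Array[Int] = { initCount += 1; Array(0, 1, 2) }

assert(initCount == 0)   // nothing computed yet
tokenIdx; tokenIdx       // access twice
assert(initCount == 1)   // initialized exactly once
```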

Member Author

ok


SparkQA commented Jul 5, 2018

Test build #92629 has finished for PR 21657 at commit af620cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Could you convert it to an Array explicitly? I have checked the type of tokenIndexArr; it is actually scala.collection.immutable.$colon$colon, a linked list.

Member Author

Any side-effect?

Member

You have O(n) instead of O(1) for getting a value from the collection by an index.
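Illustrating the point (a minimal sketch with made-up field names): map on a Seq typically yields a List, whose positional apply walks the cons cells, while toArray gives constant-time indexing:

```scala
// Seq#map here builds a List (cons cells), so list(i) is O(i);
// the explicit toArray gives O(1) positional access instead.
val fields = Seq("name", "age", "city", "zip")

val asSeq: Seq[Int]   = fields.map(_.length)           // still a List under the hood
val asArr: Array[Int] = fields.map(_.length).toArray

assert(asSeq.isInstanceOf[List[_]])   // linked list: indexed access is O(n)
assert(asArr(3) == 3)                 // array: indexed access is O(1)
```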

Member Author

ah, I see. I'll recheck. Thanks!

Member Author

ok, fixed.


maropu commented Jul 6, 2018

cc: @HyukjinKwon


SparkQA commented Jul 7, 2018

Test build #92703 has finished for PR 21657 at commit d5921f0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Jul 7, 2018

retest this please


SparkQA commented Jul 7, 2018

Test build #92704 has finished for PR 21657 at commit d5921f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Jul 7, 2018

retest this please


SparkQA commented Jul 7, 2018

Test build #92706 has finished for PR 21657 at commit d5921f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


HyukjinKwon commented Jul 9, 2018

@MaxGekk, does this look good to you in general?


MaxGekk commented Jul 9, 2018

@HyukjinKwon yes


// This index is used to reorder parsed tokens
private val tokenIndexArr =
requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
Member

Just in case, we can optimize memory usage here. The array is used only under the options.columnPruning flag; we can create an empty array (or null) if options.columnPruning is set to false.

@maropu maropu Jul 11, 2018

This array is used in both cases: line 56 ( options.columnPruning=true) and line 208 (options.columnPruning=false)?

Member

ah, I see

val options: CSVOptions) extends Logging {
require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
"requiredSchema should be the subset of schema.")
"requiredSchema should be the subset of dataSchema.")
@gatorsmile gatorsmile Jul 9, 2018

Nit: generally, we should consider printing out the schemas.

Member Author

ok


SparkQA commented Jul 11, 2018

Test build #92834 has finished for PR 21657 at commit dd5bb59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Jul 11, 2018

@HyukjinKwon ping


MaxGekk commented Jul 11, 2018

@HyukjinKwon @gatorsmile Would you mind merging the PR?

@HyukjinKwon

Please let me do one pass within a few days.

@gatorsmile

ok to test

@gatorsmile

retest this please

@gatorsmile gatorsmile left a comment


Thanks for finding and fixing this issue!

LGTM except a few comments.


class UnivocityParser(
dataSchema: StructType,
requiredSchema: StructType,
Member

Could you add the parameter descriptions of dataSchema and requiredSchema above class UnivocityParser?

val dir = path.getAbsolutePath
spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.partitionBy("p")
.option("header", "true").csv(dir)
var df = spark.read.option("header", true).csv(dir).selectExpr("sum(p)", "count(c0)")
Member

Normally, we do not use var for DataFrame even in test cases


val tokenizer = {
val parserSetting = options.asParserSettings
if (options.columnPruning && requiredSchema.length < dataSchema.length) {
Member

Can be simplified to

// When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV parser select a sequence of fields for reading by their positions.
if (parsedSchema.length < dataSchema.length)

private val schema = if (options.columnPruning) requiredSchema else dataSchema

private val row = new GenericInternalRow(schema.length)
private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
Member

Add a comment like

// When column pruning is enabled, the parser only parses the required columns based on their positions in the data schema.

Member Author

ok

var i = 0
while (i < schema.length) {
row(i) = valueConverters(i).apply(tokens(i))
while (i < requiredSchema.length) {
Member

Add the comment like

// When the length of the returned tokens is identical to the length of the parsed schema, we just need to convert the tokens that correspond to the required columns.

Member Author

ok


private def convert(tokens: Array[String]): InternalRow = {
if (tokens.length != schema.length) {
if (tokens.length != parsedSchema.length) {
Member

If possible, could you add a test case that satisfies tokens.length != parsedSchema.length?

Member Author

will do


SparkQA commented Jul 15, 2018

Test build #93010 has finished for PR 21657 at commit dd5bb59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon HyukjinKwon left a comment


LGTM too


SparkQA commented Jul 15, 2018

Test build #93015 has finished for PR 21657 at commit 81b3971.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile

Thanks! Merged to master

@asfgit asfgit closed this in d463533 Jul 16, 2018