Fix
maropu committed Jul 7, 2018
commit 2d11e5fdadb006aaa9b9bc7cfa6a6b39e4ca8f6c
CSVFileFormat.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
@@ -132,30 +131,20 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
)
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- val columnPruning = sparkSession.sessionState.conf.csvColumnPruning

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
val parser = new UnivocityParser(
StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
parsedOptions)
- val inputRows = CSVDataSource(parsedOptions).readFile(
+ CSVDataSource(parsedOptions).readFile(
conf,
file,
parser,
requiredSchema,
dataSchema,
caseSensitive)

- if (columnPruning) {
-   inputRows
- } else {
-   val inputAttrs = dataSchema.toAttributes
-   val outputAttrs = requiredSchema.map(dataSchema.indexOf).map(inputAttrs)
-   val outputProjection = GenerateUnsafeProjection.generate(outputAttrs, inputAttrs)
-   inputRows.map(outputProjection)
- }
}
}
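The block removed above is the approach this commit retires: with column pruning disabled, CSVFileFormat used to parse full-width rows and then reorder them into the required columns with a generated unsafe projection, while the commit moves that reordering into UnivocityParser itself. A minimal, Spark-free sketch of the two shapes, using plain Scala collections in place of Catalyst rows (all names below are illustrative, not Spark APIs):

```scala
object ProjectionSketch {
  // Columns as they appear in the file, and the columns the query needs (illustrative).
  val dataSchema     = Seq("a", "b", "c", "d")
  val requiredSchema = Seq("c", "a")

  // Old shape: build the full-width row first, then project it down afterwards.
  def parseThenProject(tokens: Array[String]): Seq[String] = {
    val fullRow   = tokens.toSeq                                  // one slot per dataSchema column
    val outputIdx = requiredSchema.map(c => dataSchema.indexOf(c)) // e.g. Seq(2, 0)
    outputIdx.map(fullRow)                                        // extra pass over every row
  }

  // New shape: convert only the required columns while parsing.
  private val tokenIndexArr = requiredSchema.map(c => dataSchema.indexOf(c)).toArray
  def convertRequiredOnly(tokens: Array[String]): Seq[String] =
    tokenIndexArr.map(i => tokens(i)).toSeq

  def main(args: Array[String]): Unit = {
    val tokens = Array("a1", "b1", "c1", "d1")
    assert(parseThenProject(tokens) == Seq("c1", "a1"))
    assert(convertRequiredOnly(tokens) == Seq("c1", "a1"))
  }
}
```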

UnivocityParser.scala
@@ -38,7 +38,7 @@ class UnivocityParser(
requiredSchema: StructType,
Review comment (Member): Could you add the parameter descriptions of dataSchema and requiredSchema above class UnivocityParser?

val options: CSVOptions) extends Logging {
require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
"requiredSchema should be the subset of schema.")
"requiredSchema should be the subset of dataSchema.")
Review comment (gatorsmile, Member, Jul 9, 2018): Nit: generally, we should consider printing out the schemas.

Reply (maropu, author): ok
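Following up on the nit above, one possible way to surface both schemas in the failure message (a sketch of the idea inside the UnivocityParser constructor, not necessarily the wording the author adopted; catalogString is Spark's compact schema rendering):

```scala
require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
  s"requiredSchema (${requiredSchema.catalogString}) should be the subset " +
    s"of dataSchema (${dataSchema.catalogString}).")
```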


def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)

@@ -53,9 +53,10 @@ class UnivocityParser(
}
new CsvParser(parserSetting)
}
- private val schema = if (options.columnPruning) requiredSchema else dataSchema

- private val row = new GenericInternalRow(schema.length)
+ private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
Review comment (Member): Add a comment like:
// When column pruning is enabled, the parser only parses the required columns based on their positions in the data schema.

Reply (maropu, author): ok


+ private val row = new GenericInternalRow(requiredSchema.length)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
@@ -82,7 +83,12 @@
//
// output row - ["A", 2]
private val valueConverters: Array[ValueConverter] = {
- schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+ requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
}

+ // If `columnPruning` disabled, this index is used to reorder parsed tokens
+ private lazy val tokenIndexArr: Array[Int] = {
Review comment (Member): It seems the code here and in the line https://github.com/apache/spark/pull/21657/files#diff-d19881aceddcaa5c60620fdcda99b4c4R51 above is the same. Could you fold the lines?

Reply (maropu, author): ok

+ requiredSchema.map(f => dataSchema.indexOf(f)).toArray
Review comment (Member): I would apply this small optimization: java.lang.Integer.valueOf(dataSchema.indexOf(f))

Reply (maropu, author): ok

+ }
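Concretely, tokenIndexArr maps each required field to its ordinal in the data schema, so tokens parsed in file order can be picked out by position. A small self-contained sketch with hypothetical schemas, using plain field-name lists in place of StructType:

```scala
object TokenIndexSketch {
  def main(args: Array[String]): Unit = {
    val dataSchema     = Seq("a", "b", "c", "d")   // columns as they appear in the file
    val requiredSchema = Seq("c", "a")             // columns the query asks for

    // Same shape as requiredSchema.map(f => dataSchema.indexOf(f)).toArray in the diff.
    val tokenIndexArr: Array[Int] = requiredSchema.map(f => dataSchema.indexOf(f)).toArray
    assert(tokenIndexArr.sameElements(Array(2, 0)))

    // With pruning off, tokens come back in file (dataSchema) order ...
    val tokens = Array("a1", "b1", "c1", "d1")
    // ... so the i-th required value is tokens(tokenIndexArr(i)).
    assert(tokens(tokenIndexArr(0)) == "c1" && tokens(tokenIndexArr(1)) == "a1")
  }
}
```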

/**
@@ -183,7 +189,7 @@ class UnivocityParser(
}
}

- private val doParse = if (schema.nonEmpty) {
+ private val doParse = if (requiredSchema.nonEmpty) {
(input: String) => convert(tokenizer.parseLine(input))
} else {
// If `columnPruning` enabled and partition attributes scanned only,
@@ -197,15 +203,21 @@
*/
def parse(input: String): InternalRow = doParse(input)

+ private val getToken = if (options.columnPruning) {
+   (tokens: Array[String], index: Int) => tokens(index)
+ } else {
+   (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
Review comment (Member): The lazy on tokenIndexArr means that the internal laziness flag will be checked for each token. I would remove lazy from tokenIndexArr.

Reply (maropu, author): ok

+ }
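Read together with tokenIndexArr, the getToken pair above is the core of the change: with pruning on, the tokenizer is configured to return only the required columns, so tokens(index) is enough; with pruning off, the tokenizer returns every column in file order and the index has to go through tokenIndexArr. The lazy remark in the thread above concerns the per-access initialization check a lazy val carries, which adds up inside a per-token loop, whereas a plain val is initialized once. A hedged, Spark-free sketch of the two access paths (illustrative names only):

```scala
object GetTokenSketch {
  val dataSchema     = Seq("a", "b", "c", "d")
  val requiredSchema = Seq("c", "a")
  // A plain val (not lazy), per the review comment: no initialization check per call.
  val tokenIndexArr: Array[Int] = requiredSchema.map(f => dataSchema.indexOf(f)).toArray

  // Mirrors the shape of the added code: pick the accessor once, up front.
  def makeGetToken(columnPruning: Boolean): (Array[String], Int) => String =
    if (columnPruning) (tokens, index) => tokens(index)
    else (tokens, index) => tokens(tokenIndexArr(index))

  def main(args: Array[String]): Unit = {
    val pruned   = makeGetToken(columnPruning = true)
    val unpruned = makeGetToken(columnPruning = false)
    // Pruning on: tokenizer output already holds just the required columns.
    assert(pruned(Array("c1", "a1"), 0) == "c1")
    // Pruning off: tokenizer output is full-width, in file order.
    assert(unpruned(Array("a1", "b1", "c1", "d1"), 0) == "c1")
  }
}
```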

private def convert(tokens: Array[String]): InternalRow = {
- if (tokens.length != schema.length) {
+ if (tokens.length != parsedSchema.length) {
Review comment (Member): If possible, could you add a test case that satisfies tokens.length != parsedSchema.length?

Reply (maropu, author): will do

// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens, by adding extra null tokens in
// the tail if the number is smaller, or by dropping extra tokens if the number is larger.
- val checkedTokens = if (schema.length > tokens.length) {
-   tokens ++ new Array[String](schema.length - tokens.length)
+ val checkedTokens = if (parsedSchema.length > tokens.length) {
+   tokens ++ new Array[String](parsedSchema.length - tokens.length)
} else {
- tokens.take(schema.length)
+ tokens.take(parsedSchema.length)
}
def getPartialResult(): Option[InternalRow] = {
try {
@@ -223,8 +235,8 @@
} else {
try {
var i = 0
- while (i < schema.length) {
-   row(i) = valueConverters(i).apply(tokens(i))
+ while (i < requiredSchema.length) {
Review comment (Member): Add the comment like:
// When the length of the returned tokens is identical to the length of the parsed schema, we just need to convert the tokens that correspond to the required columns.

Reply (maropu, author): ok

+ row(i) = valueConverters(i).apply(getToken(tokens, i))
i += 1
}
row
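Putting the convert path together: the token array is first normalized to parsedSchema.length (padded with nulls or truncated when the record is malformed), and then only the required columns are converted, each one fetched through getToken. A compact, Spark-free sketch of that control flow; identity on strings stands in for the typed value converters, and the sketch skips the malformed-record reporting (BadRecordException) that the real code also performs:

```scala
object ConvertSketch {
  val dataSchema     = Seq("a", "b", "c", "d")
  val requiredSchema = Seq("c", "a")
  val columnPruning  = false

  val parsedSchema  = if (columnPruning) requiredSchema else dataSchema
  val tokenIndexArr = requiredSchema.map(f => dataSchema.indexOf(f)).toArray
  val getToken: (Array[String], Int) => String =
    if (columnPruning) (tokens, i) => tokens(i) else (tokens, i) => tokens(tokenIndexArr(i))

  // Stand-in for the converted row; the real code fills a GenericInternalRow
  // via typed valueConverters built from requiredSchema.
  def convert(tokens: Array[String]): Array[String] = {
    // Normalize rows with a wrong token count: pad with nulls or drop extras.
    val checked =
      if (tokens.length == parsedSchema.length) tokens
      else if (parsedSchema.length > tokens.length)
        tokens ++ new Array[String](parsedSchema.length - tokens.length)
      else tokens.take(parsedSchema.length)

    val row = new Array[String](requiredSchema.length)
    var i = 0
    while (i < requiredSchema.length) {
      row(i) = getToken(checked, i)   // real code: valueConverters(i).apply(getToken(...))
      i += 1
    }
    row
  }

  def main(args: Array[String]): Unit = {
    assert(convert(Array("a1", "b1", "c1", "d1")).sameElements(Array("c1", "a1")))
    // Too few tokens: missing trailing columns become null after padding.
    assert(convert(Array("a1", "b1", "c1")).sameElements(Array("c1", "a1")))
    // Too many tokens: the extras are dropped.
    assert(convert(Array("a1", "b1", "c1", "d1", "e1")).sameElements(Array("c1", "a1")))
  }
}
```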