@@ -33,29 +33,49 @@ import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String


/**
* Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
*
* @param dataSchema The CSV data schema that is specified by the user, or inferred from underlying
* data files.
* @param requiredSchema The schema of the data that should be output for each row. This should be a
* subset of the columns in dataSchema.
* @param options Configuration options for a CSV parser.
*/
class UnivocityParser(
dataSchema: StructType,
requiredSchema: StructType,
Member: Could you add the parameter descriptions of dataSchema and requiredSchema above class UnivocityParser?

val options: CSVOptions) extends Logging {
require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
"requiredSchema should be the subset of schema.")
s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
s"dataSchema (${dataSchema.catalogString}).")

def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)

// A `ValueConverter` is responsible for converting the given value to a desired type.
private type ValueConverter = String => Any

// This index is used to reorder parsed tokens
private val tokenIndexArr =
Member: Could you convert it to an Array explicitly? I have checked the type of tokenIndexArr; it is actually scala.collection.immutable.$colon$colon, i.e. a List.

Member Author: Any side-effect?

Member: You get O(n) instead of O(1) when fetching a value from the collection by index.

Member Author: ah, I see. I'll recheck. Thanks!

Member Author: ok, fixed.

requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
Member: Just in case, we could do a memory optimization here. The array is used only under the options.columnPruning flag, so we could create an empty array (or null) when options.columnPruning is set to false.

Member Author (@maropu, Jul 11, 2018): This array is used in both cases: line 56 (options.columnPruning=true) and line 208 (options.columnPruning=false)?

Member: ah, I see
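For illustration, here is a minimal standalone sketch of what tokenIndexArr holds and why materializing it as an Array matters (hypothetical column names; it only assumes Spark's StructType on the classpath). StructType.map builds a Seq backed by a List here, so indexing into it costs O(n) per token, while .toArray gives O(1) lookups:

import org.apache.spark.sql.types._

object TokenIndexArrSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schemas: the file has columns (a, b, c); the query only needs (c, a).
    val dataSchema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("c", DoubleType)))
    val requiredSchema = StructType(Seq(
      StructField("c", DoubleType),
      StructField("a", IntegerType)))

    // Without .toArray the result is a Seq (a linked List under the hood),
    // so seq(i) walks the list: O(n) per lookup.
    val asSeq = requiredSchema.map(f => dataSchema.indexOf(f))
    // With .toArray (as in the fixed line above) every lookup is O(1).
    val tokenIndexArr =
      requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray

    println(asSeq.getClass.getName)       // typically scala.collection.immutable.$colon$colon
    println(tokenIndexArr.mkString(", ")) // 2, 0
  }
}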


// When column pruning is enabled, the parser only parses the required columns based on
// their positions in the data schema.
private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema

val tokenizer = {
val parserSetting = options.asParserSettings
if (options.columnPruning && requiredSchema.length < dataSchema.length) {
val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
// When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
// parser select a sequence of fields for reading by their positions.
// if (options.columnPruning && requiredSchema.length < dataSchema.length) {
if (parsedSchema.length < dataSchema.length) {
parserSetting.selectIndexes(tokenIndexArr: _*)
}
new CsvParser(parserSetting)
}
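As a side note on the tokenizer block above, here is a small sketch of Univocity's field selection (it assumes the univocity-parsers dependency that Spark already bundles; the input line and indexes are made up). With the default settings, selectIndexes makes parseLine return only the selected columns, which is why the column-pruning path can then read tokens positionally:

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object SelectIndexesSketch {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    // Ask the parser to materialize only the 1st and 3rd physical columns,
    // mirroring parserSetting.selectIndexes(tokenIndexArr: _*) above.
    val tokenIndexArr = Array(Integer.valueOf(0), Integer.valueOf(2))
    settings.selectIndexes(tokenIndexArr: _*)

    val parser = new CsvParser(settings)
    val tokens = parser.parseLine("1,unused,3.0")
    // Only the selected columns come back.
    println(tokens.mkString(","))  // 1,3.0
  }
}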
private val schema = if (options.columnPruning) requiredSchema else dataSchema

private val row = new GenericInternalRow(schema.length)
private val row = new GenericInternalRow(requiredSchema.length)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
@@ -82,7 +102,7 @@ class UnivocityParser(
//
// output row - ["A", 2]
private val valueConverters: Array[ValueConverter] = {
schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
}

/**
@@ -183,7 +203,7 @@
}
}

private val doParse = if (schema.nonEmpty) {
private val doParse = if (requiredSchema.nonEmpty) {
(input: String) => convert(tokenizer.parseLine(input))
} else {
// If `columnPruning` enabled and partition attributes scanned only,
@@ -197,15 +217,21 @@
*/
def parse(input: String): InternalRow = doParse(input)

private val getToken = if (options.columnPruning) {
(tokens: Array[String], index: Int) => tokens(index)
} else {
(tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
Member: Making tokenIndexArr a lazy val means the internal laziness flag would be checked for every token. I would remove lazy from tokenIndexArr.

Member Author: ok

}
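A tiny sketch of the point about lazy above (made-up names): a lazy val keeps a generated initialization guard that is re-evaluated on every access, which is the per-token cost the reviewer wanted to avoid:

object LazyAccessSketch {
  // "init" is printed only once, but the generated "already initialized?" guard
  // is still evaluated on every access to lazyIndexes.
  lazy val lazyIndexes: Array[Int] = { println("init"); Array(2, 0) }
  // A plain val (as the parser now uses) has no per-access guard.
  val strictIndexes: Array[Int] = Array(2, 0)

  def main(args: Array[String]): Unit = {
    (0 until 3).foreach(_ => lazyIndexes(0))
    (0 until 3).foreach(_ => strictIndexes(0))
  }
}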

private def convert(tokens: Array[String]): InternalRow = {
if (tokens.length != schema.length) {
if (tokens.length != parsedSchema.length) {
Member: If possible, could you add a test case that satisfies tokens.length != parsedSchema.length?

Member Author: will do

// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens, by adding extra null tokens in
// the tail if the number is smaller, or by dropping extra tokens if the number is larger.
val checkedTokens = if (schema.length > tokens.length) {
tokens ++ new Array[String](schema.length - tokens.length)
val checkedTokens = if (parsedSchema.length > tokens.length) {
tokens ++ new Array[String](parsedSchema.length - tokens.length)
} else {
tokens.take(schema.length)
tokens.take(parsedSchema.length)
}
def getPartialResult(): Option[InternalRow] = {
try {
@@ -222,9 +248,11 @@
new RuntimeException("Malformed CSV record"))
} else {
try {
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to convert the tokens that correspond to the required columns.
var i = 0
while (i < schema.length) {
row(i) = valueConverters(i).apply(tokens(i))
while (i < requiredSchema.length) {
row(i) = valueConverters(i).apply(getToken(tokens, i))
i += 1
}
row
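To make the interplay of getToken and convert above concrete, here is a simplified standalone sketch (plain Strings instead of InternalRow; hypothetical schemas). When column pruning is on, the tokenizer already returns only the required columns, so tokens are read by position; when it is off, the full row is parsed and tokenIndexArr remaps each required column; rows whose token count differs from parsedSchema are padded with nulls or truncated first:

object ConvertSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schemas: data file columns are (a, b, c); the query needs (c, a).
    val dataSchema     = Seq("a", "b", "c")
    val requiredSchema = Seq("c", "a")
    val tokenIndexArr  = requiredSchema.map(c => dataSchema.indexOf(c)).toArray

    def convert(tokens: Array[String], columnPruning: Boolean): Seq[String] = {
      val parsedSchema = if (columnPruning) requiredSchema else dataSchema
      // Pad with nulls or drop extra tokens so the length matches the parsed schema.
      val checked =
        if (tokens.length < parsedSchema.length) {
          tokens ++ new Array[String](parsedSchema.length - tokens.length)
        } else {
          tokens.take(parsedSchema.length)
        }
      val getToken: Int => String =
        if (columnPruning) {
          i => checked(i)                  // Univocity already returned only (c, a)
        } else {
          i => checked(tokenIndexArr(i))   // remap into the full (a, b, c) row
        }
      requiredSchema.indices.map(getToken)
    }

    // Pruning on: the tokenizer hands back only the required columns.
    println(convert(Array("3.0", "1"), columnPruning = true).mkString(","))       // 3.0,1
    // Pruning off: the tokenizer hands back the full row.
    println(convert(Array("1", "x", "3.0"), columnPruning = false).mkString(",")) // 3.0,1
  }
}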
@@ -1579,4 +1579,33 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
}
}
}

test("SPARK-24676 project required data from parsed data when columnPruning disabled") {
withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
withTempPath { path =>
val dir = path.getAbsolutePath
spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.partitionBy("p")
Member: Just in case: if the required schema is empty, does the fix work too?

.option("header", "true").csv(dir)
Member: "true" -> true ?

Member Author: ok

val df1 = spark.read.option("header", true).csv(dir).selectExpr("sum(p)", "count(c0)")
checkAnswer(df1, Row(5, 10))

// empty required column case
val df2 = spark.read.option("header", true).csv(dir).selectExpr("sum(p)")
checkAnswer(df2, Row(5))
}

// the case where tokens length != parsedSchema length
withTempPath { path =>
val dir = path.getAbsolutePath
Seq("1,2").toDF().write.text(dir)
// more tokens
val df1 = spark.read.schema("c0 int").format("csv").option("mode", "permissive").load(dir)
checkAnswer(df1, Row(1))
// less tokens
val df2 = spark.read.schema("c0 int, c1 int, c2 int").format("csv")
.option("mode", "permissive").load(dir)
checkAnswer(df2, Row(1, 2, null))
}
}
}
}
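Finally, for completeness, a hedged standalone sketch of the scenario the new test exercises, outside the test harness. The conf key is assumed to be the one behind SQLConf.CSV_PARSER_COLUMN_PRUNING ("spark.sql.csv.parser.columnPruning.enabled"); paths, app name, and data are made up:

import org.apache.spark.sql.SparkSession

object CsvColumnPruningRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-24676 repro").getOrCreate()

    // Assumed key of SQLConf.CSV_PARSER_COLUMN_PRUNING.
    spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")

    // Write a small partitioned CSV dataset, like the test above.
    val dir = java.nio.file.Files.createTempDirectory("csv-pruning-").toString + "/data"
    spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1")
      .write.partitionBy("p").option("header", true).csv(dir)

    // Only a subset of the parsed columns is required here; before the fix this could
    // return wrong results when column pruning was disabled.
    spark.read.option("header", true).csv(dir)
      .selectExpr("sum(p)", "count(c0)")
      .show()  // expected: 5, 10

    spark.stop()
  }
}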