
Commit d463533

maropu authored and gatorsmile committed
[SPARK-24676][SQL] Project required data from CSV parsed data when column pruning disabled
## What changes were proposed in this pull request?

This PR modifies the code to project required data from CSV parsed data when column pruning is disabled. In the current master, the exception below occurs if `spark.sql.csv.parser.columnPruning.enabled` is false. This is because the required schema and the schema of the CSV parsed data differ from each other:

```
./bin/spark-shell --conf spark.sql.csv.parser.columnPruning.enabled=false

scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:48:46 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:41)
	...
```

## How was this patch tested?

Added tests in `CSVSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #21657 from maropu/SPARK-24676.
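For intuition about the fix, here is a minimal, self-contained Scala sketch — illustrative only, not Spark code; only the name `tokenIndexArr` is borrowed from the patch — of the index mapping that pulls required columns out of tokens parsed in data-schema order when pruning is disabled:

```scala
// Illustrative sketch, not Spark code: with column pruning disabled, the CSV
// parser returns tokens in dataSchema order, so every required column has to
// be fetched through its index in the data schema.
object ProjectRequiredColumnsSketch {
  def main(args: Array[String]): Unit = {
    val dataSchema = Seq("c0", "c1", "c2")     // column order of the parsed data
    val requiredSchema = Seq("c2", "c0")       // column order the query expects
    // Same idea as the patch's tokenIndexArr: required position -> data position.
    val tokenIndexArr = requiredSchema.map(c => dataSchema.indexOf(c)).toArray

    val tokens = Array("7", "8", "1")          // one parsed line, in dataSchema order
    val row = tokenIndexArr.map(i => tokens(i))
    println(row.mkString(", "))                // prints: 1, 7
  }
}
```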
1 parent bcf7121 · commit d463533

2 files changed: +70 −13 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 41 additions & 13 deletions
```diff
@@ -33,29 +33,49 @@ import org.apache.spark.sql.execution.datasources.FailureSafeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+
+/**
+ * Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
+ *
+ * @param dataSchema The CSV data schema that is specified by the user, or inferred from underlying
+ *                   data files.
+ * @param requiredSchema The schema of the data that should be output for each row. This should be a
+ *                       subset of the columns in dataSchema.
+ * @param options Configuration options for a CSV parser.
+ */
 class UnivocityParser(
     dataSchema: StructType,
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
-    "requiredSchema should be the subset of schema.")
+    s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
+      s"dataSchema (${dataSchema.catalogString}).")
 
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
 
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
+  // This index is used to reorder parsed tokens
+  private val tokenIndexArr =
+    requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
+
+  // When column pruning is enabled, the parser only parses the required columns based on
+  // their positions in the data schema.
+  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
+
   val tokenizer = {
     val parserSetting = options.asParserSettings
-    if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-      val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
+    // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
+    // parser select a sequence of fields for reading by their positions.
+    // if (options.columnPruning && requiredSchema.length < dataSchema.length) {
+    if (parsedSchema.length < dataSchema.length) {
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
     new CsvParser(parserSetting)
   }
-  private val schema = if (options.columnPruning) requiredSchema else dataSchema
 
-  private val row = new GenericInternalRow(schema.length)
+  private val row = new GenericInternalRow(requiredSchema.length)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -82,7 +102,7 @@ class UnivocityParser(
   //
   //   output row - ["A", 2]
   private val valueConverters: Array[ValueConverter] = {
-    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
   }
 
   /**
@@ -183,7 +203,7 @@ class UnivocityParser(
     }
   }
 
-  private val doParse = if (schema.nonEmpty) {
+  private val doParse = if (requiredSchema.nonEmpty) {
     (input: String) => convert(tokenizer.parseLine(input))
   } else {
     // If `columnPruning` enabled and partition attributes scanned only,
@@ -197,15 +217,21 @@
    */
   def parse(input: String): InternalRow = doParse(input)
 
+  private val getToken = if (options.columnPruning) {
+    (tokens: Array[String], index: Int) => tokens(index)
+  } else {
+    (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
+  }
+
   private def convert(tokens: Array[String]): InternalRow = {
-    if (tokens.length != schema.length) {
+    if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens, by adding extra null tokens in
       // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (schema.length > tokens.length) {
-        tokens ++ new Array[String](schema.length - tokens.length)
+      val checkedTokens = if (parsedSchema.length > tokens.length) {
+        tokens ++ new Array[String](parsedSchema.length - tokens.length)
       } else {
-        tokens.take(schema.length)
+        tokens.take(parsedSchema.length)
       }
       def getPartialResult(): Option[InternalRow] = {
         try {
@@ -222,9 +248,11 @@
         new RuntimeException("Malformed CSV record"))
     } else {
       try {
+        // When the length of the returned tokens is identical to the length of the parsed schema,
+        // we just need to convert the tokens that correspond to the required columns.
         var i = 0
-        while (i < schema.length) {
-          row(i) = valueConverters(i).apply(tokens(i))
+        while (i < requiredSchema.length) {
+          row(i) = valueConverters(i).apply(getToken(tokens, i))
          i += 1
        }
        row
```
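The other subtle piece of the hunks above is the malformed-record path, which now keys off `parsedSchema` rather than the removed `schema` value. The following standalone Scala model of that normalization is an assumption-level sketch, not the actual parser code:

```scala
// Simplified model of the checkedTokens logic in the hunk above (not the
// actual Spark code): normalize a token array to the parsed schema's length
// so conversion always sees exactly parsedSchema.length tokens.
object CheckTokensSketch {
  def checkTokens(tokens: Array[String], parsedSchemaLen: Int): Array[String] =
    if (parsedSchemaLen > tokens.length) {
      // Too few tokens: pad the tail with nulls (a new Array[String] is null-filled).
      tokens ++ new Array[String](parsedSchemaLen - tokens.length)
    } else {
      // Too many tokens: drop the extras.
      tokens.take(parsedSchemaLen)
    }

  def main(args: Array[String]): Unit = {
    println(checkTokens(Array("1", "2"), 3).mkString(", "))  // prints: 1, 2, null
    println(checkTokens(Array("1", "2"), 1).mkString(", "))  // prints: 1
  }
}
```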

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 29 additions & 0 deletions
```diff
@@ -1579,4 +1579,33 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       }
     }
   }
+
+  test("SPARK-24676 project required data from parsed data when columnPruning disabled") {
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.partitionBy("p")
+          .option("header", "true").csv(dir)
+        val df1 = spark.read.option("header", true).csv(dir).selectExpr("sum(p)", "count(c0)")
+        checkAnswer(df1, Row(5, 10))
+
+        // empty required column case
+        val df2 = spark.read.option("header", true).csv(dir).selectExpr("sum(p)")
+        checkAnswer(df2, Row(5))
+      }
+
+      // the case where tokens length != parsedSchema length
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        Seq("1,2").toDF().write.text(dir)
+        // more tokens
+        val df1 = spark.read.schema("c0 int").format("csv").option("mode", "permissive").load(dir)
+        checkAnswer(df1, Row(1))
+        // less tokens
+        val df2 = spark.read.schema("c0 int, c1 int, c2 int").format("csv")
+          .option("mode", "permissive").load(dir)
+        checkAnswer(df2, Row(1, 2, null))
+      }
+    }
+  }
 }
```
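As a quick sanity check outside the suite, the repro from the description should now succeed with pruning disabled. A hypothetical spark-shell session (the config key is taken from the PR description; the expected value follows from the test above, where sum(p) over ids 0..9 is 5):

```scala
// Hypothetical spark-shell session: re-run the original repro with pruning off.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
val dir = "/tmp/spark-csv/csv"
spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
spark.read.csv(dir).selectExpr("sum(p)").collect()  // expected: Array([5]), no ClassCastException
```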
