Commit 5a5a868

Revert "[SPARK-24244][SQL] Passing only required columns to the CSV parser"
This reverts commit 8086acc.
1 parent df12506 commit 5a5a868
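
For context, the commit being reverted here (SPARK-24244) taught the CSV reader to hand only the queried columns to UnivocityParser; with this revert, every column in the declared schema is converted again even when the query projects just a few of them. A minimal sketch of the kind of read that is affected, assuming an active SparkSession named `spark` and a made-up file path:

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)

val df = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("/tmp/items.csv")  // hypothetical input file

// Only `id` is requested. With SPARK-24244 the parser skipped converting `name`;
// after this revert it converts both columns before the projection is applied.
df.select("id").show()
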

File tree

6 files changed: +18 -104 lines


docs/sql-programming-guide.md

Lines changed: 0 additions & 1 deletion

@@ -1825,7 +1825,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
-  - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
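
The removed note's example can be reproduced in a few lines. A minimal sketch, assuming an active SparkSession `spark` and a hypothetical file holding the header "id,name" followed by the single short row "1234":

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)

val ids = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .csv("/tmp/short_row.csv")  // hypothetical file containing "id,name" and "1234"
  .select("id")

// With column pruning (Spark 2.4 before this revert): one row containing 1234.
// With the pre-2.4 behavior restored by this commit: the short row is treated as
// malformed and dropped, so the result is empty.
ids.show()
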
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 7 deletions

@@ -1307,13 +1307,6 @@ object SQLConf {
   object Replaced {
     val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
   }
-
-  val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled")
-    .internal()
-    .doc("If it is set to true, column names of the requested schema are passed to CSV parser. " +
-      "Other column values can be ignored during parsing even if they are malformed.")
-    .booleanConf
-    .createWithDefault(true)
 }
 
 /**
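
The removed migration note pointed users at this flag as a per-session escape hatch. A sketch of that usage, only meaningful on builds that still contain SPARK-24244; once this revert lands the key has no effect:

// Restore the pre-2.4 malformed-row handling while the conf still exists.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
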

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 0 additions & 3 deletions

@@ -25,7 +25,6 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
@@ -81,8 +80,6 @@ class CSVOptions(
     }
   }
 
-  private[csv] val columnPruning = SQLConf.get.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
-
   val delimiter = CSVUtils.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
   val parseMode: ParseMode =

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 11 additions & 15 deletions

@@ -34,28 +34,20 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParser(
-    dataSchema: StructType,
+    schema: StructType,
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
-  require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
+  require(requiredSchema.toSet.subsetOf(schema.toSet),
     "requiredSchema should be the subset of schema.")
 
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
 
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
-  private val tokenizer = {
-    val parserSetting = options.asParserSettings
-    if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-      val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
-      parserSetting.selectIndexes(tokenIndexArr: _*)
-    }
-    new CsvParser(parserSetting)
-  }
-  private val schema = if (options.columnPruning) requiredSchema else dataSchema
+  private val tokenizer = new CsvParser(options.asParserSettings)
 
-  private val row = new GenericInternalRow(schema.length)
+  private val row = new GenericInternalRow(requiredSchema.length)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -81,8 +73,11 @@
   // Each input token is placed in each output row's position by mapping these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
+  private val valueConverters: Array[ValueConverter] =
     schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+
+  private val tokenIndexArr: Array[Int] = {
+    requiredSchema.map(f => schema.indexOf(f)).toArray
   }
 
   /**
@@ -215,8 +210,9 @@
     } else {
       try {
         var i = 0
-        while (i < schema.length) {
-          row(i) = valueConverters(i).apply(tokens(i))
+        while (i < requiredSchema.length) {
+          val from = tokenIndexArr(i)
+          row(i) = valueConverters(from).apply(tokens(from))
           i += 1
         }
         row
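
The two tokenizer setups above differ only in where column selection happens: the removed code asked univocity to return just the selected indexes, while the restored code tokenizes every column and picks the required ones via `tokenIndexArr`. A standalone sketch of that difference using the univocity parser directly, outside Spark; the sample line and indexes are made up:

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val line = "1,Alice,42.0"

// Approach removed by this revert: univocity returns only the selected columns.
val prunedSettings = new CsvParserSettings()
prunedSettings.selectIndexes(java.lang.Integer.valueOf(0), java.lang.Integer.valueOf(2))
val prunedTokens = new CsvParser(prunedSettings).parseLine(line)
// prunedTokens: Array("1", "42.0")

// Approach restored by this revert: tokenize every column, then look the required
// ones up by index, as the while loop over tokenIndexArr above does.
val allTokens = new CsvParser(new CsvParserSettings()).parseLine(line)
val tokenIndexArr = Array(0, 2)
val requiredTokens = tokenIndexArr.map(i => allTokens(i))
// requiredTokens: Array("1", "42.0")
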

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala

Lines changed: 0 additions & 42 deletions

@@ -74,49 +74,7 @@ object CSVBenchmarks {
     }
   }
 
-  def multiColumnsBenchmark(rowsNum: Int): Unit = {
-    val colsNum = 1000
-    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
-
-    withTempPath { path =>
-      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
-      val schema = StructType(fields)
-      val values = (0 until colsNum).map(i => i.toString).mkString(",")
-      val columnNames = schema.fieldNames
-
-      spark.range(rowsNum)
-        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
-        .write.option("header", true)
-        .csv(path.getAbsolutePath)
-
-      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
-
-      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
-        ds.select("*").filter((row: Row) => true).count()
-      }
-      val cols100 = columnNames.take(100).map(Column(_))
-      benchmark.addCase(s"Select 100 columns", 3) { _ =>
-        ds.select(cols100: _*).filter((row: Row) => true).count()
-      }
-      benchmark.addCase(s"Select one column", 3) { _ =>
-        ds.select($"col1").filter((row: Row) => true).count()
-      }
-
-      /*
-      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
-
-      Wide rows with 1000 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      ------------------------------------------------------------------------------------------
-      Select 1000 columns                  76910 / 78065          0.0       76909.8       1.0X
-      Select 100 columns                   28625 / 32884          0.0       28625.1       2.7X
-      Select one column                    22498 / 22669          0.0       22497.8       3.4X
-      */
-      benchmark.run()
-    }
-  }
-
   def main(args: Array[String]): Unit = {
     quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
-    multiColumnsBenchmark(rowsNum = 1000 * 1000)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 7 additions & 36 deletions

@@ -260,16 +260,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   }
 
   test("test for DROPMALFORMED parsing mode") {
-    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
-      Seq(false, true).foreach { multiLine =>
-        val cars = spark.read
-          .format("csv")
-          .option("multiLine", multiLine)
-          .options(Map("header" -> "true", "mode" -> "dropmalformed"))
-          .load(testFile(carsFile))
+    Seq(false, true).foreach { multiLine =>
+      val cars = spark.read
+        .format("csv")
+        .option("multiLine", multiLine)
+        .options(Map("header" -> "true", "mode" -> "dropmalformed"))
+        .load(testFile(carsFile))
 
-        assert(cars.select("year").collect().size === 2)
-      }
+      assert(cars.select("year").collect().size === 2)
     }
   }
 
@@ -1370,31 +1368,4 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(computed, expected)
     }
   }
-
-  test("SPARK-24244: Select a subset of all columns") {
-    withTempPath { path =>
-      import collection.JavaConverters._
-      val schema = new StructType()
-        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
-        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
-        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
-        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
-        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
-
-      val odf = spark.createDataFrame(List(
-        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
-        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
-      ).asJava, schema)
-      odf.write.csv(path.getCanonicalPath)
-      val idf = spark.read
-        .schema(schema)
-        .csv(path.getCanonicalPath)
-        .select('f15, 'f10, 'f5)
-
-      checkAnswer(
-        idf,
-        List(Row(15, 10, 5), Row(-15, -10, -5))
-      )
-    }
-  }
 }
