Commit 59993a8

MaxGekk authored and Jackey Lee committed
[SPARK-26122][SQL] Support encoding for multiLine in CSV datasource
## What changes were proposed in this pull request?

In the PR, I propose to pass the CSV option `encoding`/`charset` to the `uniVocity` parser to allow parsing CSV files in different encodings when `multiLine` is enabled. The value of the option is passed to the `beginParsing` method of `CsvParser`.

## How was this patch tested?

Added a new test to `CSVSuite` that covers different encodings with the header enabled and disabled.

Closes apache#23091 from MaxGekk/csv-miltiline-encoding.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
1 parent 407c30d
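For context, here is a minimal end-user sketch of the behavior this patch enables; the path is illustrative and `spark` is assumed to be an active `SparkSession`:

```scala
// With this change, the `encoding` option is honored together with `multiLine`.
val df = spark.read
  .option("multiLine", true)
  .option("encoding", "CP1251") // any charset name the JVM supports
  .option("header", true)
  .csv("/path/to/data.csv") // illustrative path
df.show()
```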

3 files changed: +32 −7 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 7 additions & 5 deletions
@@ -271,11 +271,12 @@ private[sql] object UnivocityParser {
   def tokenizeStream(
       inputStream: InputStream,
       shouldDropHeader: Boolean,
-      tokenizer: CsvParser): Iterator[Array[String]] = {
+      tokenizer: CsvParser,
+      encoding: String): Iterator[Array[String]] = {
     val handleHeader: () => Unit =
       () => if (shouldDropHeader) tokenizer.parseNext

-    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
+    convertStream(inputStream, tokenizer, handleHeader, encoding)(tokens => tokens)
   }

   /**
@@ -297,17 +298,18 @@
     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)

-    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
+    convertStream(inputStream, tokenizer, handleHeader, parser.options.charset) { tokens =>
       safeParser.parse(tokens)
     }.flatten
   }

   private def convertStream[T](
       inputStream: InputStream,
       tokenizer: CsvParser,
-      handleHeader: () => Unit)(
+      handleHeader: () => Unit,
+      encoding: String)(
       convert: Array[String] => T) = new Iterator[T] {
-    tokenizer.beginParsing(inputStream)
+    tokenizer.beginParsing(inputStream, encoding)

     // We can handle header here since here the stream is open.
     handleHeader()
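The `encoding` argument ends up in uniVocity's `beginParsing(InputStream, String)` overload, which decodes the stream itself rather than assuming the platform default charset. A standalone sketch of that overload (assumes `univocity-parsers` on the classpath; the sample data is illustrative):

```scala
import java.io.ByteArrayInputStream

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object BeginParsingDemo {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    // Detect \n vs \r\n instead of relying on the system line separator.
    settings.setLineSeparatorDetectionEnabled(true)
    val parser = new CsvParser(settings)

    val bytes = "a,b\n1,2\n".getBytes("UTF-16BE")
    // The second argument names the charset used to decode the raw bytes.
    parser.beginParsing(new ByteArrayInputStream(bytes), "UTF-16BE")

    var row = parser.parseNext() // returns null once the input is exhausted
    while (row != null) {
      println(row.mkString("|"))
      row = parser.parseNext()
    }
  }
}
```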

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

Lines changed: 4 additions & 2 deletions
@@ -192,7 +192,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
       UnivocityParser.tokenizeStream(
         CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
         shouldDropHeader = false,
-        new CsvParser(parsedOptions.asParserSettings))
+        new CsvParser(parsedOptions.asParserSettings),
+        encoding = parsedOptions.charset)
     }.take(1).headOption match {
       case Some(firstRow) =>
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -203,7 +204,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
           lines.getConfiguration,
           new Path(lines.getPath())),
         parsedOptions.headerFlag,
-        new CsvParser(parsedOptions.asParserSettings))
+        new CsvParser(parsedOptions.asParserSettings),
+        encoding = parsedOptions.charset)
     }
     val sampled = CSVUtils.sample(tokenRDD, parsedOptions)
     CSVInferSchema.infer(sampled, header, parsedOptions)
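Both hunks pass `parsedOptions.charset` explicitly. On the read path, `charset` is an alias of `encoding` and UTF-8 is the default; a sketch of that resolution (illustrative, not the verbatim `CSVOptions` source):

```scala
import java.nio.charset.StandardCharsets

// `encoding` wins if both keys are present; UTF-8 is the fallback.
def resolveCharset(parameters: Map[String, String]): String =
  parameters.getOrElse("encoding",
    parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
```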

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -1859,4 +1859,25 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(df, Row(null, csv))
     }
   }
+
+  test("encoding in multiLine mode") {
+    val df = spark.range(3).toDF()
+    Seq("UTF-8", "ISO-8859-1", "CP1251", "US-ASCII", "UTF-16BE", "UTF-32LE").foreach { encoding =>
+      Seq(true, false).foreach { header =>
+        withTempPath { path =>
+          df.write
+            .option("encoding", encoding)
+            .option("header", header)
+            .csv(path.getCanonicalPath)
+          val readback = spark.read
+            .option("multiLine", true)
+            .option("encoding", encoding)
+            .option("inferSchema", true)
+            .option("header", header)
+            .csv(path.getCanonicalPath)
+          checkAnswer(readback, df)
+        }
+      }
+    }
+  }
 }
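The test crosses six charsets with the header enabled and disabled. The failure mode it guards against: in multiLine mode the whole file is fed to the parser as a raw byte stream, so without an explicit charset the bytes would be decoded with the platform default. A minimal JVM-level illustration of that mismatch (standalone, not Spark code; the sample text is illustrative):

```scala
import java.nio.charset.Charset

object CharsetMismatchDemo {
  def main(args: Array[String]): Unit = {
    val bytes = "ключ,значение".getBytes("CP1251")

    // Garbled unless the platform default happens to be CP1251.
    println(new String(bytes, Charset.defaultCharset()))
    // Correct: decode with the charset the bytes were written in.
    println(new String(bytes, "CP1251"))
  }
}
```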
