Merged
[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files

## What changes were proposed in this pull request?

I propose a new option for the JSON datasource which allows specifying the encoding (charset) of input and output files. Here is an example of using the option:

```
spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)
```

If the option is not specified, the charset auto-detection mechanism is used by default.

The option can also be used when saving datasets to JSON files. Currently Spark can save datasets to JSON files in the `UTF-8` charset only. The changes allow saving data in any supported charset. Here is the approximate list of charsets supported by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html . A user can specify the charset of the output JSON via the charset option, e.g. `.option("charset", "UTF-16BE")`. By default the output charset is still `UTF-8` to keep backward compatibility.
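
For illustration, a minimal write-side sketch (the DataFrame and output paths are placeholders; as the changes below show, `charset` is accepted as an alias of `encoding`):

```
// Sketch only: `df` is any existing DataFrame and the paths are placeholders.
df.write
  .option("charset", "UTF-16BE")      // "charset" is an alias of "encoding"
  .json("/tmp/people_utf16be")

// Without the option, the output is still written in UTF-8, as before.
df.write.json("/tmp/people_utf8")
```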

The solution has the following restrictions for per-line mode (`multiline = false`):

- If the charset is different from UTF-8, the `lineSep` option must be specified (see the sketch after this list). The option is required because the Hadoop LineReader cannot detect the line separator correctly. Here is the ticket for solving the issue: https://issues.apache.org/jira/browse/SPARK-23725

- Encodings with a [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) are not supported. For example, the `UTF-16` and `UTF-32` encodings are blacklisted. The problem can be solved by MaxGekk#2
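
A minimal sketch of the per-line case described in the first restriction (`spark`, `schema` and the path are placeholders):

```
// Per-line mode with a non-UTF-8 charset, so lineSep must be given explicitly.
val df = spark.read
  .schema(schema)
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")            // required because the charset is not UTF-8
  .json("/tmp/people_utf16le.json")
```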

## How was this patch tested?

I added the following tests:
- reading a JSON file in the `UTF-16LE` encoding with a BOM in `multiline` mode
- reading a JSON file using charset auto-detection (`UTF-32BE` with a BOM)
- reading a JSON file using a user-specified charset (`UTF-16LE`)
- saving in `UTF-32BE` and reading the result back with the standard library (not with Spark); a rough sketch of this check follows the list
- checking that the default charset is `UTF-8`
- handling a wrong (unsupported) charset
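
A rough sketch of the shape of the save-and-read-back check (the data, paths and assertion are placeholders, not the actual test code):

```
import java.io.File
import java.nio.charset.Charset
import java.nio.file.Files

// Assumes an active SparkSession `spark`; data and paths are placeholders.
import spark.implicits._

// Save in UTF-32BE, then read one part file back with the standard library
// (not with Spark) and check that the bytes decode under the expected charset.
Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")
  .write.option("encoding", "UTF-32BE").json("/tmp/json_utf32be")

val partFile = new File("/tmp/json_utf32be").listFiles()
  .filter(_.getName.endsWith(".json")).head
val text = new String(Files.readAllBytes(partFile.toPath), Charset.forName("UTF-32BE"))
assert(text.contains("Andy"))
```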

Author: Maxim Gekk <[email protected]>

Closes apache#20937 from MaxGekk/json-encoding-line-sep.
MaxGekk authored and HyukjinKwon committed Apr 29, 2018
commit bd14da6fd5a77cc03efff193a84ffccbe892cc13
15 changes: 11 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -176,7 +176,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None):
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
encoding=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -237,6 +238,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
:param encoding: allows to forcibly set one of standard basic or extended encoding for
the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
the encoding of input JSON will be detected automatically
when the multiLine option is set to ``true``.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param samplingRatio: defines fraction of input JSON objects used for schema inferring.
@@ -259,7 +264,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio)
samplingRatio=samplingRatio, encoding=encoding)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -752,7 +757,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)

@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
lineSep=None):
lineSep=None, encoding=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
@@ -776,6 +781,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
:param encoding: specifies encoding (charset) of saved json files. If None is set,
the default UTF-8 charset will be used.
:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``.

@@ -784,7 +791,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
lineSep=lineSep)
lineSep=lineSep, encoding=encoding)
self._jwrite.json(path)

@since(1.4)
7 changes: 7 additions & 0 deletions python/pyspark/sql/tests.py
@@ -685,6 +685,13 @@ def test_multiline_json(self):
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())

def test_encoding_json(self):
people_array = self.spark.read\
.json("python/test_support/sql/people_array_utf16le.json",
multiLine=True, encoding="UTF-16LE")
expected = [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
self.assertEqual(people_array.collect(), expected)

def test_linesep_json(self):
df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",")
expected = [Row(_corrupt_record=None, name=u'Michael'),
Binary file added python/test_support/sql/people_array_utf16le.json
@@ -18,10 +18,14 @@
package org.apache.spark.sql.catalyst.json

import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.channels.Channels
import java.nio.charset.Charset

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text
import sun.nio.cs.StreamDecoder

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

private[sql] object CreateJacksonParser extends Serializable {
@@ -43,7 +47,48 @@ private[sql] object CreateJacksonParser extends Serializable {
jsonFactory.createParser(record.getBytes, 0, record.getLength)
}

def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
jsonFactory.createParser(record)
// Jackson parsers can be ranked according to their performance:
// 1. Array based with actual encoding UTF-8 in the array. This is the fastest parser
// but it doesn't allow to set encoding explicitly. Actual encoding is detected automatically
// by checking leading bytes of the array.
// 2. InputStream based with actual encoding UTF-8 in the stream. Encoding is detected
// automatically by analyzing first bytes of the input stream.
// 3. Reader based parser. This is the slowest parser used here but it allows to create
// a reader with specific encoding.
// The method creates a reader for an array with given encoding and sets size of internal
// decoding buffer according to size of input array.
private def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
val bais = new ByteArrayInputStream(in, 0, length)
val byteChannel = Channels.newChannel(bais)
val decodingBufferSize = Math.min(length, 8192)
val decoder = Charset.forName(enc).newDecoder()

StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
}

def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
jsonFactory.createParser(sd)
}

def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
jsonFactory.createParser(is)
}

def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
jsonFactory.createParser(new InputStreamReader(is, enc))
}

def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
val ba = row.getBinary(0)

jsonFactory.createParser(ba, 0, ba.length)
}

def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
val binary = row.getBinary(0)
val sd = getStreamDecoder(enc, binary, binary.length)

jsonFactory.createParser(sd)
}
}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.json

import java.nio.charset.StandardCharsets
import java.nio.charset.{Charset, StandardCharsets}
import java.util.{Locale, TimeZone}

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -86,14 +86,43 @@ private[sql] class JSONOptions(

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
* A string between two consecutive JSON records.
*/
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
sep
}
// Note that the option 'lineSep' uses a different default value in read and write.
val lineSeparatorInRead: Option[Array[Byte]] =
lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
// Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.

/**
* Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
* If the encoding is not specified (None), it will be detected automatically
* when the multiLine option is set to `true`.
*/
val encoding: Option[String] = parameters.get("encoding")
.orElse(parameters.get("charset")).map { enc =>
// The following encodings are not supported in per-line mode (multiline is false)
// because they cause some problems in reading files with BOM which is supposed to
// present in the files with such encodings. After splitting input files by lines,
// only the first lines will have the BOM which leads to impossibility for reading
// the rest lines. Besides of that, the lineSep option must have the BOM in such
// encodings which can never present between lines.
val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
val isBlacklisted = blacklist.contains(Charset.forName(enc))
require(multiLine || !isBlacklisted,
s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
| ${blacklist.mkString(", ")}""".stripMargin)

val isLineSepRequired = !(multiLine == false &&
Charset.forName(enc) != StandardCharsets.UTF_8 && lineSeparator.isEmpty)
require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")

enc
}

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(encoding.getOrElse("UTF-8"))
}
val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

/** Sets config options on a Jackson [[JsonFactory]]. */
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.json

import java.io.ByteArrayOutputStream
import java.io.{ByteArrayOutputStream, CharConversionException}

import scala.collection.mutable.ArrayBuffer
import scala.util.Try
@@ -361,6 +361,14 @@ class JacksonParser(
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
}
}
}
@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* <li>`encoding` (by default it is not set): allows to forcibly set one of standard basic
* or extended encoding for the JSON files. For example UTF-16BE, UTF-32LE. If the encoding
* is not specified and `multiLine` is set to `true`, it will be detected automatically.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* <li>`samplingRatio` (default is 1.0): defines fraction of input JSON objects used
@@ -518,8 +518,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should
* be used for writing.</li>
* <li>`encoding` (by default it is not set): specifies encoding (charset) of saved json
* files. If it is not set, the UTF-8 charset will be used. </li>
* <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.</li>
* </ul>
*
* @since 1.4.0
@@ -589,8 +590,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
* <li>`lineSep` (default `\n`): defines the line separator that should
* be used for writing.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.</li>
* </ul>
*
* @since 1.6.0
@@ -31,11 +31,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.TaskContext
import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOptions}
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -92,26 +92,30 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: Dataset[String] = createBaseDataset(
sparkSession, inputPaths, parsedOptions.lineSeparator)
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)

inferFromDataset(json, parsedOptions)
}

def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
val rowParser = parsedOptions.encoding.map { enc =>
CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
}.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))

JsonInferSchema.infer(rdd, parsedOptions, rowParser)
}

private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
lineSeparator: Option[String]): Dataset[String] = {
val textOptions = lineSeparator.map { lineSep =>
Map(TextOptions.LINE_SEPARATOR -> lineSep)
}.getOrElse(Map.empty[String, String])

parsedOptions: JSONOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
val textOptions = Map.empty[String, String] ++
parsedOptions.encoding.map("encoding" -> _) ++
parsedOptions.lineSeparator.map("lineSep" -> _)

sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
@@ -129,8 +133,12 @@ object TextInputJsonDataSource extends JsonDataSource {
schema: StructType): Iterator[InternalRow] = {
val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
val textParser = parser.options.encoding
.map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
.getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))

val safeParser = new FailureSafeParser[Text](
input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
input => parser.parse(input, textParser, textToUTF8String),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
@@ -153,7 +161,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
JsonInferSchema.infer(sampled, parsedOptions, createParser)
val parser = parsedOptions.encoding
.map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
.getOrElse(createParser(_: JsonFactory, _: PortableDataStream))

JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
}

private def createBaseRdd(
@@ -175,11 +187,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
.values
}

private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
val path = new Path(record.getPath())
CreateJacksonParser.inputStream(
jsonFactory,
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
private def dataToInputStream(dataStream: PortableDataStream): InputStream = {
val path = new Path(dataStream.getPath())
CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path)
}

private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = {
CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
}

private def createParser(enc: String, jsonFactory: JsonFactory,
stream: PortableDataStream): JsonParser = {
CreateJacksonParser.inputStream(enc, jsonFactory, dataToInputStream(stream))
}

override def readFile(
Expand All @@ -194,9 +213,12 @@ object MultiLineJsonDataSource extends JsonDataSource {
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
}
}
val streamParser = parser.options.encoding
.map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
.getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))

val safeParser = new FailureSafeParser[InputStream](
input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString),
input => parser.parse[InputStream](input, streamParser, partitionedFileString),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.json

import java.nio.charset.{Charset, StandardCharsets}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -151,7 +153,13 @@ private[json] class JsonOutputWriter(
context: TaskAttemptContext)
extends OutputWriter with Logging {

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
private val encoding = options.encoding match {
case Some(charsetName) => Charset.forName(charsetName)
case None => StandardCharsets.UTF_8
}

private val writer = CodecStreams.createOutputStreamWriter(
context, new Path(path), encoding)

// create the Generator without separator inserted between 2 records
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)