Add configuration to support JacksonGenerator to keep fields with null values
Jackey Lee committed Oct 12, 2019
commit 0b903446adea16b54f239bd0c2e07c64a2060c0a
@@ -76,6 +76,9 @@ private[sql] class JSONOptions(
// Whether to ignore column of all null values or empty array/struct during schema inference
val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)

// Whether to ignore (drop) null fields when generating JSON
val structIngoreNull = parameters.getOrElse("structIngoreNull", "true").toBoolean
cloud-fan (Contributor), Oct 16, 2019:
Is it specific to struct type columns? If not, how about naming it ignoreNullFields?

Contributor Author:
It works on StructType, including struct fields and struct inner data.

Contributor:
How about top-level columns?

Contributor Author:
Yep, it also works on those.

cloud-fan (Contributor), Oct 16, 2019:
Then shall we pick a better name for this config?

Contributor Author:
OK, ignoreNullFields is much better than structIgnoreNull; I'll change it.


// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

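For context, a minimal usage sketch (not part of this diff) of how the new data source option could be set from the DataFrame writer API, assuming it is forwarded to JSONOptions through JsonFileFormat as in the last hunk of this commit. The option name follows this commit's spelling (structIngoreNull), which the review above asks to rename; the app name and output path are placeholders.

import org.apache.spark.sql.SparkSession

object KeepNullFieldsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("keep-null-fields").master("local[*]").getOrCreate()
    import spark.implicits._

    // One row has a null "name"; by default the JSON writer drops null fields.
    val df = Seq((1, Option.empty[String]), (2, Some("b"))).toDF("id", "name")

    // With the option set to "false", null fields are kept as explicit JSON nulls,
    // e.g. {"id":1,"name":null} instead of {"id":1}.
    df.write
      .option("structIngoreNull", "false")
      .json("/tmp/keep-null-fields")

    spark.stop()
  }
}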
@@ -184,6 +184,9 @@ private[sql] class JacksonGenerator(
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
} else if (!options.structIngoreNull) {
gen.writeFieldName(field.name)
gen.writeNull()
}
i += 1
}
@@ -1153,6 +1153,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val JSON_GENERATOR_STRUCT_IGNORE_NULL =
buildConf("spark.sql.jsonGenerator.struct.ignore.null")
.doc("If false, JacksonGenerator will generate null for null value in StructType.")
.booleanConf
.createWithDefault(true)

val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream sink.")
@@ -2323,6 +2329,8 @@ class SQLConf extends Serializable with Logging {

def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)

def jsonGeneratorStructIngoreNull: Boolean = getConf(SQLConf.JSON_GENERATOR_STRUCT_IGNORE_NULL)

def parallelFileListingInStatsComputation: Boolean =
getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)

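A short sketch of toggling the new conf at the session level, using the key defined above; this is an assumption about typical usage, not part of this diff, and spark is assumed to be an active SparkSession.

// Programmatic runtime config:
spark.conf.set("spark.sql.jsonGenerator.struct.ignore.null", "false")
// Equivalent SQL form:
spark.sql("SET spark.sql.jsonGenerator.struct.ignore.null=false")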
@@ -39,6 +39,18 @@ class JacksonGeneratorSuite extends SparkFunSuite {
assert(writer.toString === """{"a":1}""")
}

test("initial with StructType and write out an empty row with allowStructIncludeNull=true") {
Member:
This looks like a test case for a bug. We need a JIRA issue ID prefix in the test case name.

Contributor Author:
OK.

val dataType = StructType(StructField("a", IntegerType) :: Nil)
val input = InternalRow(null)
val writer = new CharArrayWriter()
val allowNullOption =
new JSONOptions(Map("structIngoreNull" -> "false"), gmtId)
val gen = new JacksonGenerator(dataType, writer, allowNullOption)
gen.write(input)
gen.flush()
assert(writer.toString === """{"a":null}""")
Contributor:
Can we also test a null inner field? e.g. {"a": {"b": null}}

Contributor Author:
Sure, I have added a test for this.

}

test("initial with StructType and write out rows") {
val dataType = StructType(StructField("a", IntegerType) :: Nil)
val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
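A hedged sketch of the nested-field test the reviewer asks for above, in the same style as the existing tests in this suite; the test name and row construction here are illustrative and may differ from the test actually added to the PR.

test("initial with nested StructType and write out a null inner field") {
  val dataType = StructType(
    StructField("a", StructType(StructField("b", IntegerType) :: Nil)) :: Nil)
  val input = InternalRow(InternalRow(null))
  val writer = new CharArrayWriter()
  val allowNullOption = new JSONOptions(Map("structIngoreNull" -> "false"), gmtId)
  val gen = new JacksonGenerator(dataType, writer, allowNullOption)
  gen.write(input)
  gen.flush()
  // Expected under this change: the inner null is written explicitly.
  assert(writer.toString === """{"a":{"b":null}}""")
}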
3 changes: 2 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3170,11 +3170,12 @@ class Dataset[T] private[sql](
def toJSON: Dataset[String] = {
val rowSchema = this.schema
val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
val structIngoreNull = sparkSession.sessionState.conf.jsonGeneratorStructIngoreNull.toString
mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
val gen = new JacksonGenerator(rowSchema, writer,
new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
new JSONOptions(Map("structIngoreNull" -> structIngoreNull), sessionLocalTimeZone))

new Iterator[String] {
override def hasNext: Boolean = iter.hasNext
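A hedged usage sketch of the toJSON path with the conf disabled; it assumes an active SparkSession spark with spark.implicits._ imported, and this commit's behavior.

spark.conf.set("spark.sql.jsonGenerator.struct.ignore.null", "false")
val json = Seq((1, Option.empty[String])).toDF("id", "name").toJSON.collect()
// Expected with the conf set to false: Array("""{"id":1,"name":null}""")
// With the default (true), the null field would be dropped: {"id":1}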
@@ -67,8 +67,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val conf = job.getConfiguration
val optionsFromConf =
Map("structIngoreNull" ->
sparkSession.sessionState.conf.jsonGeneratorStructIngoreNull.toString)
val parsedOptions = new JSONOptions(
options,
optionsFromConf ++ options,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
parsedOptions.compressionCodec.foreach { codec =>
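One consequence of merging optionsFromConf ++ options in this order is that a per-write option overrides the session conf, since keys on the right-hand side of ++ win. A hedged sketch, assuming spark and df as in the earlier examples:

spark.conf.set("spark.sql.jsonGenerator.struct.ignore.null", "true")
df.write
  .option("structIngoreNull", "false") // the explicit write option wins over the conf-derived default
  .json("/tmp/json-with-nulls")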