Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,14 @@ private[sql] object JsonRDD extends Logging {
def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
case ArrayType(StructType(Nil), containsNull) => {
// The elements of this arrays are structs.
v.asInstanceOf[Seq[Map[String, Any]]].flatMap {
v.asInstanceOf[Seq[Map[String, Any]]].flatMap(Option(_)).flatMap {
element => allKeysWithValueTypes(element)
}.map {
case (k, t) => (s"$key.$k", t)
}
}
case ArrayType(t1, containsNull) =>
v.asInstanceOf[Seq[Any]].flatMap {
v.asInstanceOf[Seq[Any]].flatMap(Option(_)).flatMap {
element => buildKeyPathForInnerStructs(element, t1)
}
case other => Nil
Expand Down
35 changes: 33 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class JsonSuite extends QueryTest {
}

test("Complex field and type inferring") {
val jsonSchemaRDD = jsonRDD(complexFieldAndType)
val jsonSchemaRDD = jsonRDD(complexFieldAndType1)

val expectedSchema = StructType(
StructField("arrayOfArray1", ArrayType(ArrayType(StringType, false), false), true) ::
Expand Down Expand Up @@ -305,7 +305,7 @@ class JsonSuite extends QueryTest {
}

ignore("Complex field and type inferring (Ignored)") {
val jsonSchemaRDD = jsonRDD(complexFieldAndType)
val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
jsonSchemaRDD.registerTempTable("jsonTable")

// Right now, "field1" and "field2" are treated as aliases. We should fix it.
Expand Down Expand Up @@ -707,4 +707,35 @@ class JsonSuite extends QueryTest {

TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
}

test("SPARK-4068: nulls in arrays") {
val jsonSchemaRDD = jsonRDD(nullsInArrays)
jsonSchemaRDD.registerTempTable("jsonTable")

val schema = StructType(
StructField("field1",
ArrayType(ArrayType(ArrayType(ArrayType(StringType, false), false), true), false), true) ::
StructField("field2",
ArrayType(ArrayType(
StructType(StructField("Test", IntegerType, true) :: Nil), false), true), true) ::
StructField("field3",
ArrayType(ArrayType(
StructType(StructField("Test", StringType, true) :: Nil), true), false), true) ::
StructField("field4",
ArrayType(ArrayType(ArrayType(IntegerType, false), true), false), true) :: Nil)

assert(schema === jsonSchemaRDD.schema)

checkAnswer(
sql(
"""
|SELECT field1, field2, field3, field4
|FROM jsonTable
""".stripMargin),
Seq(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
Seq(null, Seq(null, Seq(Seq(1))), null, null) ::
Seq(null, null, Seq(Seq(null), Seq(Seq("2"))), null) ::
Seq(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,6 @@ object TestJsonData {
"null":null
}""" :: Nil)

val complexFieldAndType =
TestSQLContext.sparkContext.parallelize(
"""{"struct":{"field1": true, "field2": 92233720368547758070},
"structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
"arrayOfString":["str1", "str2"],
"arrayOfInteger":[1, 2147483647, -2147483648],
"arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
"arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
"arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
"arrayOfBoolean":[true, false, true],
"arrayOfNull":[null, null, null, null],
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
}""" :: Nil)

val primitiveFieldValueTypeConflict =
TestSQLContext.sparkContext.parallelize(
"""{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
Expand Down Expand Up @@ -83,6 +67,22 @@ object TestJsonData {
"""{"d":{"field":true}}""" ::
"""{"e":"str"}""" :: Nil)

val complexFieldAndType1 =
TestSQLContext.sparkContext.parallelize(
"""{"struct":{"field1": true, "field2": 92233720368547758070},
"structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
"arrayOfString":["str1", "str2"],
"arrayOfInteger":[1, 2147483647, -2147483648],
"arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
"arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
"arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
"arrayOfBoolean":[true, false, true],
"arrayOfNull":[null, null, null, null],
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
}""" :: Nil)

val complexFieldAndType2 =
TestSQLContext.sparkContext.parallelize(
"""{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
Expand Down Expand Up @@ -137,6 +137,13 @@ object TestJsonData {
]]
}""" :: Nil)

val nullsInArrays =
TestSQLContext.sparkContext.parallelize(
"""{"field1":[[null], [[["Test"]]]]}""" ::
"""{"field2":[null, [{"Test":1}]]}""" ::
"""{"field3":[[null], [{"Test":"2"}]]}""" ::
"""{"field4":[[null, [1,2,3]]]}""" :: Nil)

val jsonArray =
TestSQLContext.sparkContext.parallelize(
"""[{"a":"str_a_1"}]""" ::
Expand Down