From 6aec10f37fe3ae9187b137edf2ea8791732af3d3 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 14 Apr 2018 16:12:33 +0200
Subject: [PATCH 1/2] Adding a test for case-insensitive handling of JSON fields

---
 .../datasources/json/JsonSuite.scala          | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 70aee561ff0f..34535c3aac94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2162,4 +2162,26 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(ds.schema == new StructType().add("f1", LongType))
   }
+
+  test("Respect spark.sql.caseSensitive while parsing JSON") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val data =
+          """{"field": "a"}
+            |{"Field": "b"}
+            |{"FIELD": "c"}""".stripMargin
+        Seq(data).toDF().repartition(1).write.text(path)
+        val readback = spark.read.json(path)
+
+        checkAnswer(readback, Seq(Row("a"), Row("b"), Row("c")))
+
+        val expectedSchema = new StructType().add("field", StringType)
+        assert(readback.schema == expectedSchema)
+
+        val readbackWithSchema = spark.read.schema(expectedSchema).json(path)
+        checkAnswer(readbackWithSchema, Seq(Row("a"), Row("b"), Row("c")))
+      }
+    }
+  }
 }
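A quick spark-shell sketch of the behavior the new test pins down (illustrative
only, not part of the patch series; it assumes both patches are applied and
spark.sql.caseSensitive is false — the expected outputs are inferred from the
test assertions above, not captured from a run):

    val ds = Seq("""{"field": "a"}""", """{"Field": "b"}""", """{"FIELD": "c"}""").toDS()
    val df = spark.read.json(ds)
    df.printSchema()  // expected: one merged column, root |-- field: string
    df.show()         // expected: rows "a", "b", "c" under the single "field" column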
From 863ace7afdcdc3b51aab1fa6320b16559112c02c Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 14 Apr 2018 19:58:22 +0200
Subject: [PATCH 2/2] Making the JSON reader respect the caseSensitive parameter

---
 .../expressions/jsonExpressions.scala         |   5 +-
 .../sql/catalyst/json/JacksonParser.scala     |  15 +-
 .../apache/spark/sql/DataFrameReader.scala    |   5 +-
 .../datasources/json/JsonDataSource.scala     |  13 +-
 .../datasources/json/JsonFileFormat.scala     |   3 +-
 .../datasources/json/JsonInferSchema.scala    |  16 +-
 .../datasources/json/JsonSuite.scala          | 416 +++++++++---------
 7 files changed, 251 insertions(+), 222 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index fdd672c416a0..03edf919d95d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -524,6 +524,8 @@ case class JsonToStructs(
   // can generate incorrect files if values are missing in columns declared as non-nullable.
   val nullableSchema = if (forceNullableSchema) schema.asNullable else schema
 
+  val caseSensitive = SQLConf.get.getConf(SQLConf.CASE_SENSITIVE)
+
   override def nullable: Boolean = true
 
   // Used in `FunctionRegistry`
@@ -567,7 +569,8 @@ case class JsonToStructs(
   lazy val parser =
     new JacksonParser(
       rowSchema,
-      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
+      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get),
+      caseSensitive)
 
   override def dataType: DataType = nullableSchema
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7f6956994f31..66010ee24c70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
  */
 class JacksonParser(
     schema: StructType,
-    val options: JSONOptions) extends Logging {
+    val options: JSONOptions,
+    caseSensitive: Boolean) extends Logging {
 
   import JacksonUtils._
   import com.fasterxml.jackson.core.JsonToken._
@@ -281,6 +282,14 @@ class JacksonParser(
       s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).")
   }
 
+  private def getCurrentName(parser: JsonParser): String = {
+    if (caseSensitive) {
+      parser.getCurrentName
+    } else {
+      parser.getCurrentName.toLowerCase
+    }
+  }
+
   /**
    * Parse an object from the token stream into a new Row representing the schema.
    * Fields in the json that are not defined in the requested schema will be dropped.
@@ -291,7 +300,7 @@ class JacksonParser(
       fieldConverters: Array[ValueConverter]): InternalRow = {
     val row = new GenericInternalRow(schema.length)
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      schema.getFieldIndex(parser.getCurrentName) match {
+      schema.getFieldIndex(getCurrentName(parser)) match {
         case Some(index) =>
           row.update(index, fieldConverters(index).apply(parser))
 
@@ -312,7 +321,7 @@ class JacksonParser(
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      keys += UTF8String.fromString(parser.getCurrentName)
+      keys += UTF8String.fromString(getCurrentName(parser))
       values += fieldConverter.apply(parser)
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ae3ba1690f69..ef2c8f92db8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -419,9 +419,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       extraOptions.toMap,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
 
     val schema = userSpecifiedSchema.getOrElse {
-      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
+      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions, caseSensitive)
     }
 
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
@@ -430,7 +431,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     val createParser = CreateJacksonParser.string _
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(actualSchema, parsedOptions)
+      val rawParser = new JacksonParser(actualSchema, parsedOptions, caseSensitive)
       val parser = new FailureSafeParser[String](
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 5769c09c9a1d..f243409fcf7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -94,13 +94,16 @@ object TextInputJsonDataSource extends JsonDataSource {
       parsedOptions: JSONOptions): StructType = {
     val json: Dataset[String] = createBaseDataset(
       sparkSession, inputPaths, parsedOptions.lineSeparator)
-    inferFromDataset(json, parsedOptions)
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    inferFromDataset(json, parsedOptions, caseSensitive)
   }
 
-  def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
+  def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions,
+      caseSensitive: Boolean): StructType = {
     val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
     val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
-    JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
+    JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String, caseSensitive)
   }
 
   private def createBaseDataset(
@@ -153,7 +156,9 @@ object MultiLineJsonDataSource extends JsonDataSource {
       parsedOptions: JSONOptions): StructType = {
     val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
     val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
-    JsonInferSchema.infer(sampled, parsedOptions, createParser)
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    JsonInferSchema.infer(sampled, parsedOptions, createParser, caseSensitive)
   }
 
   private def createBaseRdd(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 0862c746fffa..406e388837fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -126,9 +126,10 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
           "df.filter($\"_corrupt_record\".isNotNull).count()."
       )
     }
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
 
     (file: PartitionedFile) => {
-      val parser = new JacksonParser(actualSchema, parsedOptions)
+      val parser = new JacksonParser(actualSchema, parsedOptions, caseSensitive)
       JsonDataSource(parsedOptions).readFile(
         broadcastedHadoopConf.value.value,
         file,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index a270a6451d5d..c3dcafb6e487 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -41,7 +41,8 @@ private[sql] object JsonInferSchema {
   def infer[T](
       json: RDD[T],
       configOptions: JSONOptions,
-      createParser: (JsonFactory, T) => JsonParser): StructType = {
+      createParser: (JsonFactory, T) => JsonParser,
+      caseSensitive: Boolean): StructType = {
     val parseMode = configOptions.parseMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
 
@@ -53,7 +54,7 @@ private[sql] object JsonInferSchema {
       try {
         Utils.tryWithResource(createParser(factory, row)) { parser =>
           parser.nextToken()
-          Some(inferField(parser, configOptions))
+          Some(inferField(parser, configOptions, caseSensitive))
         }
       } catch {
         case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match {
@@ -98,14 +99,15 @@ private[sql] object JsonInferSchema {
   /**
    * Infer the type of a json document from the parser's token stream
    */
-  private def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
+  private def inferField(parser: JsonParser, configOptions: JSONOptions,
+      caseSensitive: Boolean): DataType = {
     import com.fasterxml.jackson.core.JsonToken._
     parser.getCurrentToken match {
       case null | VALUE_NULL => NullType
 
       case FIELD_NAME =>
         parser.nextToken()
-        inferField(parser, configOptions)
+        inferField(parser, configOptions, caseSensitive)
 
       case VALUE_STRING if parser.getTextLength < 1 =>
         // Zero length strings and nulls have special handling to deal
@@ -121,8 +123,8 @@ private[sql] object JsonInferSchema {
         val builder = Array.newBuilder[StructField]
         while (nextUntil(parser, END_OBJECT)) {
           builder += StructField(
-            parser.getCurrentName,
-            inferField(parser, configOptions),
+            if (caseSensitive) parser.getCurrentName else parser.getCurrentName.toLowerCase,
+            inferField(parser, configOptions, caseSensitive),
             nullable = true)
         }
         val fields: Array[StructField] = builder.result()
@@ -137,7 +139,7 @@ private[sql] object JsonInferSchema {
         var elementType: DataType = NullType
         while (nextUntil(parser, END_ARRAY)) {
           elementType = compatibleType(
-            elementType, inferField(parser, configOptions))
+            elementType, inferField(parser, configOptions, caseSensitive))
         }
 
         ArrayType(elementType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 34535c3aac94..6982c57589ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
     val dummySchema = StructType(Seq.empty)
-    val parser = new JacksonParser(dummySchema, dummyOption)
+    val parser = new JacksonParser(dummySchema, dummyOption, caseSensitive = true)
 
     Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
       jsonParser.nextToken()
@@ -238,8 +238,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val jsonDF = spark.read.json(jsonNullStruct)
     val expectedSchema = StructType(
       StructField("headers", StructType(
-        StructField("Charset", StringType, true) ::
-        StructField("Host", StringType, true) :: Nil)
+        StructField("charset", StringType, true) ::
+        StructField("host", StringType, true) :: Nil)
         , true) ::
       StructField("ip", StringType, true) ::
       StructField("nullstr", StringType, true):: Nil)
@@ -257,7 +257,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val jsonDF = spark.read.json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
-      StructField("bigInteger", DecimalType(20, 0), true) ::
+      StructField("biginteger", DecimalType(20, 0), true) ::
       StructField("boolean", BooleanType, true) ::
       StructField("double", DoubleType, true) ::
       StructField("integer", LongType, true) ::
@@ -282,102 +282,104 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Complex field and type inferring") {
-    val jsonDF = spark.read.json(complexFieldAndType1)
-
-    val expectedSchema = StructType(
-      StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
-      StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), true), true) ::
-      StructField("arrayOfBigInteger", ArrayType(DecimalType(21, 0), true), true) ::
-      StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) ::
-      StructField("arrayOfDouble", ArrayType(DoubleType, true), true) ::
-      StructField("arrayOfInteger", ArrayType(LongType, true), true) ::
-      StructField("arrayOfLong", ArrayType(LongType, true), true) ::
-      StructField("arrayOfNull", ArrayType(StringType, true), true) ::
-      StructField("arrayOfString", ArrayType(StringType, true), true) ::
-      StructField("arrayOfStruct", ArrayType(
-        StructType(
-          StructField("field1", BooleanType, true) ::
-          StructField("field2", StringType, true) ::
-          StructField("field3", StringType, true) :: Nil), true), true) ::
-      StructField("struct", StructType(
-        StructField("field1", BooleanType, true) ::
-        StructField("field2", DecimalType(20, 0), true) :: Nil), true) ::
-      StructField("structWithArrayFields", StructType(
-        StructField("field1", ArrayType(LongType, true), true) ::
-        StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil)
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val jsonDF = spark.read.json(complexFieldAndType1)
+
+      val expectedSchema = StructType(
+        StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
+        StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), true), true) ::
+        StructField("arrayOfBigInteger", ArrayType(DecimalType(21, 0), true), true) ::
+        StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) ::
+        StructField("arrayOfDouble", ArrayType(DoubleType, true), true) ::
+        StructField("arrayOfInteger", ArrayType(LongType, true), true) ::
+        StructField("arrayOfLong", ArrayType(LongType, true), true) ::
+        StructField("arrayOfNull", ArrayType(StringType, true), true) ::
+        StructField("arrayOfString", ArrayType(StringType, true), true) ::
+        StructField("arrayOfStruct", ArrayType(
+          StructType(
+            StructField("field1", BooleanType, true) ::
+            StructField("field2", StringType, true) ::
StructField("field3", StringType, true) :: Nil), true), true) :: + StructField("struct", StructType( + StructField("field1", BooleanType, true) :: + StructField("field2", DecimalType(20, 0), true) :: Nil), true) :: + StructField("structWithArrayFields", StructType( + StructField("field1", ArrayType(LongType, true), true) :: + StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil) + + assert(expectedSchema === jsonDF.schema) - assert(expectedSchema === jsonDF.schema) - - jsonDF.createOrReplaceTempView("jsonTable") + jsonDF.createOrReplaceTempView("jsonTable") - // Access elements of a primitive array. - checkAnswer( - sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"), - Row("str1", "str2", null) - ) + // Access elements of a primitive array. + checkAnswer( + sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"), + Row("str1", "str2", null) + ) - // Access an array of null values. - checkAnswer( - sql("select arrayOfNull from jsonTable"), - Row(Seq(null, null, null, null)) - ) + // Access an array of null values. + checkAnswer( + sql("select arrayOfNull from jsonTable"), + Row(Seq(null, null, null, null)) + ) - // Access elements of a BigInteger array (we use DecimalType internally). - checkAnswer( - sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"), - Row(new java.math.BigDecimal("922337203685477580700"), - new java.math.BigDecimal("-922337203685477580800"), null) - ) + // Access elements of a BigInteger array (we use DecimalType internally). + checkAnswer( + sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"), + Row(new java.math.BigDecimal("922337203685477580700"), + new java.math.BigDecimal("-922337203685477580800"), null) + ) - // Access elements of an array of arrays. - checkAnswer( - sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"), - Row(Seq("1", "2", "3"), Seq("str1", "str2")) - ) + // Access elements of an array of arrays. + checkAnswer( + sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"), + Row(Seq("1", "2", "3"), Seq("str1", "str2")) + ) - // Access elements of an array of arrays. - checkAnswer( - sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"), - Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) - ) + // Access elements of an array of arrays. + checkAnswer( + sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"), + Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) + ) - // Access elements of an array inside a filed with the type of ArrayType(ArrayType). - checkAnswer( - sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"), - Row("str2", 2.1) - ) + // Access elements of an array inside a filed with the type of ArrayType(ArrayType). + checkAnswer( + sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"), + Row("str2", 2.1) + ) - // Access elements of an array of structs. - checkAnswer( - sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " + - "from jsonTable"), - Row( - Row(true, "str1", null), - Row(false, null, null), - Row(null, null, null), - null) - ) + // Access elements of an array of structs. + checkAnswer( + sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " + + "from jsonTable"), + Row( + Row(true, "str1", null), + Row(false, null, null), + Row(null, null, null), + null) + ) - // Access a struct and fields inside of it. 
-    checkAnswer(
-      sql("select struct, struct.field1, struct.field2 from jsonTable"),
-      Row(
-        Row(true, new java.math.BigDecimal("92233720368547758070")),
-        true,
-        new java.math.BigDecimal("92233720368547758070")) :: Nil
-    )
+      // Access a struct and fields inside of it.
+      checkAnswer(
+        sql("select struct, struct.field1, struct.field2 from jsonTable"),
+        Row(
+          Row(true, new java.math.BigDecimal("92233720368547758070")),
+          true,
+          new java.math.BigDecimal("92233720368547758070")) :: Nil
+      )
 
-    // Access an array field of a struct.
-    checkAnswer(
-      sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
-      Row(Seq(4, 5, 6), Seq("str1", "str2"))
-    )
+      // Access an array field of a struct.
+      checkAnswer(
+        sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
+        Row(Seq(4, 5, 6), Seq("str1", "str2"))
+      )
 
-    // Access elements of an array field of a struct.
-    checkAnswer(
-      sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
-      Row(5, null)
-    )
+      // Access elements of an array field of a struct.
+      checkAnswer(
+        sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
+        Row(5, null)
+      )
+    }
   }
 
   test("GetField operation on complex data type") {
@@ -597,7 +599,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val jsonDF = spark.read.json(path)
 
     val expectedSchema = StructType(
-      StructField("bigInteger", DecimalType(20, 0), true) ::
+      StructField("biginteger", DecimalType(20, 0), true) ::
       StructField("boolean", BooleanType, true) ::
       StructField("double", DoubleType, true) ::
      StructField("integer", LongType, true) ::
@@ -629,7 +631,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val jsonDF = spark.read.option("primitivesAsString", "true").json(path)
 
     val expectedSchema = StructType(
-      StructField("bigInteger", StringType, true) ::
+      StructField("biginteger", StringType, true) ::
       StructField("boolean", StringType, true) ::
       StructField("double", StringType, true) ::
       StructField("integer", StringType, true) ::
 
   test("Loading a JSON dataset primitivesAsString returns complex fields as strings") {
-    val jsonDF = spark.read.option("primitivesAsString", "true").json(complexFieldAndType1)
-
-    val expectedSchema = StructType(
-      StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
-      StructField("arrayOfArray2", ArrayType(ArrayType(StringType, true), true), true) ::
-      StructField("arrayOfBigInteger", ArrayType(StringType, true), true) ::
-      StructField("arrayOfBoolean", ArrayType(StringType, true), true) ::
-      StructField("arrayOfDouble", ArrayType(StringType, true), true) ::
-      StructField("arrayOfInteger", ArrayType(StringType, true), true) ::
-      StructField("arrayOfLong", ArrayType(StringType, true), true) ::
-      StructField("arrayOfNull", ArrayType(StringType, true), true) ::
-      StructField("arrayOfString", ArrayType(StringType, true), true) ::
-      StructField("arrayOfStruct", ArrayType(
-        StructType(
-          StructField("field1", StringType, true) ::
-          StructField("field2", StringType, true) ::
-          StructField("field3", StringType, true) :: Nil), true), true) ::
-      StructField("struct", StructType(
-        StructField("field1", StringType, true) ::
-        StructField("field2", StringType, true) :: Nil), true) ::
-      StructField("structWithArrayFields", StructType(
-        StructField("field1", ArrayType(StringType, true), true) ::
-        StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil)
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val jsonDF = spark.read.option("primitivesAsString", "true").json(complexFieldAndType1)
+
+      val expectedSchema = StructType(
+        StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
+        StructField("arrayOfArray2", ArrayType(ArrayType(StringType, true), true), true) ::
+        StructField("arrayOfBigInteger", ArrayType(StringType, true), true) ::
+        StructField("arrayOfBoolean", ArrayType(StringType, true), true) ::
+        StructField("arrayOfDouble", ArrayType(StringType, true), true) ::
+        StructField("arrayOfInteger", ArrayType(StringType, true), true) ::
+        StructField("arrayOfLong", ArrayType(StringType, true), true) ::
+        StructField("arrayOfNull", ArrayType(StringType, true), true) ::
+        StructField("arrayOfString", ArrayType(StringType, true), true) ::
+        StructField("arrayOfStruct", ArrayType(
+          StructType(
+            StructField("field1", StringType, true) ::
+            StructField("field2", StringType, true) ::
+            StructField("field3", StringType, true) :: Nil), true), true) ::
+        StructField("struct", StructType(
+          StructField("field1", StringType, true) ::
+          StructField("field2", StringType, true) :: Nil), true) ::
+        StructField("structWithArrayFields", StructType(
+          StructField("field1", ArrayType(StringType, true), true) ::
+          StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil)
+
+      assert(expectedSchema === jsonDF.schema)
 
-    assert(expectedSchema === jsonDF.schema)
-
-    jsonDF.createOrReplaceTempView("jsonTable")
+      jsonDF.createOrReplaceTempView("jsonTable")
 
-    // Access elements of a primitive array.
-    checkAnswer(
-      sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"),
-      Row("str1", "str2", null)
-    )
+      // Access elements of a primitive array.
+      checkAnswer(
+        sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"),
+        Row("str1", "str2", null)
+      )
 
-    // Access an array of null values.
-    checkAnswer(
-      sql("select arrayOfNull from jsonTable"),
-      Row(Seq(null, null, null, null))
-    )
+      // Access an array of null values.
+      checkAnswer(
+        sql("select arrayOfNull from jsonTable"),
+        Row(Seq(null, null, null, null))
+      )
 
-    // Access elements of a BigInteger array (we use DecimalType internally).
-    checkAnswer(
-      sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"),
-      Row("922337203685477580700", "-922337203685477580800", null)
-    )
+      // Access elements of a BigInteger array (we use DecimalType internally).
+      checkAnswer(
+        sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"),
+        Row("922337203685477580700", "-922337203685477580800", null)
+      )
 
-    // Access elements of an array of arrays.
-    checkAnswer(
-      sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
-      Row(Seq("1", "2", "3"), Seq("str1", "str2"))
-    )
+      // Access elements of an array of arrays.
+      checkAnswer(
+        sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
+        Row(Seq("1", "2", "3"), Seq("str1", "str2"))
+      )
 
-    // Access elements of an array of arrays.
-    checkAnswer(
-      sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
-      Row(Seq("1", "2", "3"), Seq("1.1", "2.1", "3.1"))
-    )
+      // Access elements of an array of arrays.
+      checkAnswer(
+        sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
+        Row(Seq("1", "2", "3"), Seq("1.1", "2.1", "3.1"))
+      )
 
-    // Access elements of an array inside a filed with the type of ArrayType(ArrayType).
-    checkAnswer(
-      sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
-      Row("str2", "2.1")
-    )
+      // Access elements of an array inside a field with the type of ArrayType(ArrayType).
+      checkAnswer(
+        sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
+        Row("str2", "2.1")
+      )
 
-    // Access elements of an array of structs.
-    checkAnswer(
-      sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " +
-        "from jsonTable"),
-      Row(
-        Row("true", "str1", null),
-        Row("false", null, null),
-        Row(null, null, null),
-        null)
-    )
+      // Access elements of an array of structs.
+      checkAnswer(
+        sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " +
+          "from jsonTable"),
+        Row(
+          Row("true", "str1", null),
+          Row("false", null, null),
+          Row(null, null, null),
+          null)
+      )
 
-    // Access a struct and fields inside of it.
-    checkAnswer(
-      sql("select struct, struct.field1, struct.field2 from jsonTable"),
-      Row(
-        Row("true", "92233720368547758070"),
-        "true",
-        "92233720368547758070") :: Nil
-    )
+      // Access a struct and fields inside of it.
+      checkAnswer(
+        sql("select struct, struct.field1, struct.field2 from jsonTable"),
+        Row(
+          Row("true", "92233720368547758070"),
+          "true",
+          "92233720368547758070") :: Nil
+      )
 
-    // Access an array field of a struct.
-    checkAnswer(
-      sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
-      Row(Seq("4", "5", "6"), Seq("str1", "str2"))
-    )
+      // Access an array field of a struct.
+      checkAnswer(
+        sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
+        Row(Seq("4", "5", "6"), Seq("str1", "str2"))
+      )
 
-    // Access elements of an array field of a struct.
-    checkAnswer(
-      sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
-      Row("5", null)
-    )
+      // Access elements of an array field of a struct.
+      checkAnswer(
+        sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
+        Row("5", null)
+      )
+    }
   }
 
   test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
     val jsonDF = spark.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
-      StructField("bigInteger", DecimalType(20, 0), true) ::
+      StructField("biginteger", DecimalType(20, 0), true) ::
       StructField("boolean", BooleanType, true) ::
       StructField("double", DecimalType(17, -292), true) ::
       StructField("integer", LongType, true) ::
@@ -879,7 +883,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     val schema = StructType(
-      StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
+      StructField("biginteger", DecimalType.SYSTEM_DEFAULT, true) ::
       StructField("boolean", BooleanType, true) ::
       StructField("double", DoubleType, true) ::
       StructField("integer", IntegerType, true) ::
@@ -1179,34 +1183,36 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-4068: nulls in arrays") {
-    val jsonDF = spark.read.json(nullsInArrays)
-    jsonDF.createOrReplaceTempView("jsonTable")
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val jsonDF = spark.read.json(nullsInArrays)
+      jsonDF.createOrReplaceTempView("jsonTable")
 
-    val schema = StructType(
-      StructField("field1",
-        ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) ::
-      StructField("field2",
-        ArrayType(ArrayType(
-          StructType(StructField("Test", LongType, true) :: Nil), true), true), true) ::
-      StructField("field3",
-        ArrayType(ArrayType(
-          StructType(StructField("Test", StringType, true) :: Nil), true), true), true) ::
-      StructField("field4",
-        ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil)
+      val schema = StructType(
+        StructField("field1",
+          ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) ::
+        StructField("field2",
+          ArrayType(ArrayType(
+            StructType(StructField("Test", LongType, true) :: Nil), true), true), true) ::
+        StructField("field3",
+          ArrayType(ArrayType(
+            StructType(StructField("Test", StringType, true) :: Nil), true), true), true) ::
+        StructField("field4",
+          ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil)
 
-    assert(schema === jsonDF.schema)
+      assert(schema === jsonDF.schema)
 
-    checkAnswer(
-      sql(
-        """
-          |SELECT field1, field2, field3, field4
-          |FROM jsonTable
-        """.stripMargin),
-      Row(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
-        Row(null, Seq(null, Seq(Row(1))), null, null) ::
-        Row(null, null, Seq(Seq(null), Seq(Row("2"))), null) ::
-        Row(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
-    )
+      checkAnswer(
+        sql(
+          """
+            |SELECT field1, field2, field3, field4
+            |FROM jsonTable
+          """.stripMargin),
+        Row(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
+          Row(null, Seq(null, Seq(Row(1))), null, null) ::
+          Row(null, null, Seq(Seq(null), Seq(Row("2"))), null) ::
+          Row(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
+      )
+    }
   }
 
   test("SPARK-4228 DataFrame to JSON") {
@@ -1372,7 +1378,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val emptySchema = JsonInferSchema.infer(
       empty.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
-      CreateJacksonParser.string)
+      CreateJacksonParser.string,
+      caseSensitive = true)
 
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1399,7 +1406,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val emptySchema = JsonInferSchema.infer(
       emptyRecords.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
-      CreateJacksonParser.string)
+      CreateJacksonParser.string,
+      caseSensitive = true)
 
     assert(StructType(Seq()) === emptySchema)
   }
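
For the JsonToStructs (from_json) path changed in this patch, a minimal
spark-shell sketch of the intended effect (illustrative only, not part of the
patch; the expected results follow from the lower-casing added in
JacksonParser.getCurrentName, assuming spark.sql.caseSensitive is false and a
lower-case user-specified schema):

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._

    val schema = new StructType().add("field", StringType)
    val df = Seq("""{"FIELD": "c"}""").toDF("json")
      .select(from_json($"json", schema).as("parsed"))
    // With the patch, the upper-case key "FIELD" is lower-cased during parsing
    // and matches the schema field, so this is expected to show [c] rather
    // than [null].
    df.show()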