Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ object FunctionRegistry {
// Otherwise, find a constructor method that matches the number of arguments, and use that.
val params = Seq.fill(expressions.size)(classOf[Expression])
val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
val validParametersCount = constructors.map(_.getParameterCount).distinct.sorted
val validParametersCount = constructors
.filter(_.getParameterTypes.forall(_ == classOf[Expression]))
.map(_.getParameterCount).distinct.sorted
val expectedNumberOfParameters = if (validParametersCount.length == 1) {
validParametersCount.head.toString
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,10 @@ case class JsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
timeZoneId: Option[String],
forceNullableSchema: Boolean)
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {

val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)

// The JSON input data might be missing certain fields. We force the nullability
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
// can generate incorrect files if values are missing in columns declared as non-nullable.
Expand All @@ -532,14 +531,21 @@ case class JsonToStructs(
schema = JsonExprUtils.validateSchemaLiteral(schema),
options = Map.empty[String, String],
child = child,
timeZoneId = None)
timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

def this(child: Expression, schema: Expression, options: Expression) =
this(
schema = JsonExprUtils.validateSchemaLiteral(schema),
options = JsonExprUtils.convertToMapData(options),
child = child,
timeZoneId = None)
timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

// Used in `org.apache.spark.sql.functions`
def this(schema: DataType, options: Map[String, String], child: Expression) =
this(schema, options, child, timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
case _: StructType | ArrayType(_: StructType, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
InternalRow(1)
)
}
Expand All @@ -401,13 +401,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
null
)

// Other modes should still return `null`.
checkEvaluation(
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId, true),
null
)
}
Expand All @@ -416,70 +416,70 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val input = """[{"a": 1}, {"a": 2}]"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: InternalRow(2) :: Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=object, schema=array, output=array of single row") {
val input = """{"a": 1}"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty array, schema=array, output=empty array") {
val input = "[ ]"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty object, schema=array, output=array of single row with null") {
val input = "{ }"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(null) :: Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=array of single object, schema=struct, output=single row") {
val input = """[{"a": 1}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(1)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=array, schema=struct, output=null") {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty array, schema=struct, output=null") {
val input = """[]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty object, schema=struct, output=single row with null") {
val input = """{ }"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(null)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId, true),
null
)
}

test("SPARK-20549: from_json bad UTF-8") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId, true),
null)
}

Expand All @@ -491,14 +491,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
c.set(2016, 0, 1, 0, 0, 0)
c.set(Calendar.MILLISECOND, 123)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId, true),
InternalRow(c.getTimeInMillis * 1000L)
)
// The result doesn't change because the json string includes timezone string ("Z" here),
// which means the string represents the timestamp string in the timezone regardless of
// the timeZoneId parameter.
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST"), true),
InternalRow(c.getTimeInMillis * 1000L)
)

Expand All @@ -512,7 +512,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
schema,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
Literal(jsonData2),
Option(tz.getID)),
Option(tz.getID),
true),
InternalRow(c.getTimeInMillis * 1000L)
)
checkEvaluation(
Expand All @@ -521,7 +522,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
Literal(jsonData2),
gmtId),
gmtId,
true),
InternalRow(c.getTimeInMillis * 1000L)
)
}
Expand All @@ -530,7 +532,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId, true),
null
)
}
Expand Down Expand Up @@ -685,27 +687,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with

test("from_json missing fields") {
for (forceJsonNullableSchema <- Seq(false, true)) {
withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString) {
val input =
"""{
| "a": 1,
| "c": "foo"
|}
|""".stripMargin
val jsonSchema = new StructType()
.add("a", LongType, nullable = false)
.add("b", StringType, nullable = false)
.add("c", StringType, nullable = false)
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
checkEvaluation(
JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId),
output
)
val schema = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
.dataType
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
assert(schemaToCompare == schema)
}
val input =
"""{
| "a": 1,
| "c": "foo"
|}
|""".stripMargin
val jsonSchema = new StructType()
.add("a", LongType, nullable = false)
.add("b", StringType, nullable = false)
.add("c", StringType, nullable = false)
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
val expr = JsonToStructs(
jsonSchema, Map.empty, Literal.create(input, StringType), gmtId, forceJsonNullableSchema)
checkEvaluation(expr, output)
val schema = expr.dataType
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
assert(schemaToCompare == schema)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3179,7 +3179,7 @@ object functions {
* @since 2.2.0
*/
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
JsonToStructs(schema, options, e.expr)
new JsonToStructs(schema, options, e.expr)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ select to_json()
struct<>
-- !query 12 output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function to_json. Expected: one of 1, 2 and 3; Found: 0; line 1 pos 7
Invalid number of arguments for function to_json. Expected: one of 1 and 2; Found: 0; line 1 pos 7


-- !query 13
Expand Down Expand Up @@ -225,7 +225,7 @@ select from_json()
struct<>
-- !query 21 output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function from_json. Expected: one of 2, 3 and 4; Found: 0; line 1 pos 7
Invalid number of arguments for function from_json. Expected: one of 2 and 3; Found: 0; line 1 pos 7


-- !query 22
Expand Down