From fdecb9a65f411d8ba6642e22df33e02f1050d2d6 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Fri, 30 Nov 2018 11:12:00 +0800
Subject: [PATCH 1/3] from_[csv|json] should accept schema_of_[csv|json] in R API

---
 R/pkg/R/functions.R                           | 54 ++++++++++++-------
 R/pkg/tests/fulltests/test_sparkSQL.R         | 16 +++++-
 .../org/apache/spark/sql/api/r/SQLUtils.scala |  8 ++-
 3 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index f568a931ae1fe..3659ed4e8f0de 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -202,8 +202,9 @@ NULL
 #' \itemize{
 #'   \item \code{from_json}: a structType object to use as the schema to use
 #'       when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
-#'       also supported for the schema.
-#'   \item \code{from_csv}: a DDL-formatted string
+#'       also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
+#'       a string literal can also be accepted.
+#'   \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
 #' }
 #' @param ... additional argument(s).
 #' \itemize{
@@ -2254,6 +2255,8 @@ setMethod("date_format", signature(y = "Column", x = "character"),
             column(jc)
           })
 
+setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column"))
+
 #' @details
 #' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType}
 #' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set
@@ -2261,7 +2264,7 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'
 #' @rdname column_collection_functions
 #' @param as.json.array indicating if input string is JSON array of objects or a single object.
-#' @aliases from_json from_json,Column,characterOrstructType-method
+#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method
 #' @examples
 #'
 #' \dontrun{
@@ -2269,25 +2272,31 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy'))
 #' schema <- structType(structField("date", "string"))
 #' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy')))
-
 #' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #' schema <- structType(structField("name", "string"))
 #' head(select(df2, from_json(df2$people_json, schema)))
-#' head(select(df2, from_json(df2$people_json, "name STRING")))}
+#' head(select(df2, from_json(df2$people_json, "name STRING")))
+#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))}
 #' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
+setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
           function(x, schema, as.json.array = FALSE, ...) {
             if (is.character(schema)) {
-              schema <- structType(schema)
+              jschema <- structType(schema)$jobj
+            } else if (class(schema) == "structType") {
+              jschema <- schema$jobj
+            } else {
+              jschema <- schema@jc
             }
             if (as.json.array) {
-              jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
-                                     "createArrayType",
-                                     schema$jobj)
-            } else {
-              jschema <- schema$jobj
+              # This case is specific to R. Unlike the Scala and Python sides, the
+              # R side has an 'as.json.array' option to indicate whether the schema
+              # should be treated as a struct or as the element type of an array,
+              # to be more R-friendly.
+              jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                                     "createArrayType",
+                                     jschema)
             }
             options <- varargsToStrEnv(...)
             jc <- callJStatic("org.apache.spark.sql.functions",
@@ -2328,22 +2337,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"),
 #' If the string is unparseable, the Column will contain the value NA.
 #'
 #' @rdname column_collection_functions
-#' @aliases from_csv from_csv,Column,character-method
+#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method
 #' @examples
 #'
 #' \dontrun{
-#' df <- sql("SELECT 'Amsterdam,2018' as csv")
+#' csv <- "Amsterdam,2018"
+#' df <- sql(paste0("SELECT '", csv, "' as csv"))
 #' schema <- "city STRING, year INT"
-#' head(select(df, from_csv(df$csv, schema)))}
+#' head(select(df, from_csv(df$csv, schema)))
+#' head(select(df, from_csv(df$csv, structType(schema))))
+#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))}
 #' @note from_csv since 3.0.0
-setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
+setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
           function(x, schema, ...) {
-            if (class(schema) == "Column") {
-              jschema <- schema@jc
-            } else if (is.character(schema)) {
+            if (class(schema) == "structType") {
+              schema <- callJMethod(schema$job, "toDDL")
+            }
+
+            if (is.character(schema)) {
               jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
             } else {
-              stop("schema argument should be a column or character")
+              jschema <- schema@jc
             }
             options <- varargsToStrEnv(...)
             jc <- callJStatic("org.apache.spark.sql.functions",
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 0d5118c127f2b..a1805f57b1dcf 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1626,6 +1626,12 @@ test_that("column functions", {
   expect_equal(c[[1]][[1]]$a, 1)
   c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
   expect_equal(c[[1]][[1]]$a, 1)
+  c <- collect(select(df, alias(from_csv(df$col, structType("a INT")), "csv")))
+  expect_equal(c[[1]][[1]]$a, 1)
+  c <- collect(select(df, alias(from_csv(df$col, schema_of_csv("1")), "csv")))
+  expect_equal(c[[1]][[1]]$`_c0`, 1)
+  c <- collect(select(df, alias(from_csv(df$col, schema_of_csv(lit("1"))), "csv")))
+  expect_equal(c[[1]][[1]]$`_c0`, 1)
 
   df <- as.DataFrame(list(list("col" = "1")))
   c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
@@ -1651,7 +1657,9 @@ test_that("column functions", {
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
   df <- as.DataFrame(j)
   schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
-                  "age INT, height DOUBLE")
+                  "age INT, height DOUBLE",
+                  schema_of_json("{\"age\":16,\"height\":176.5}"),
+                  schema_of_json(lit("{\"age\":16,\"height\":176.5}")))
   for (schema in schemas) {
     s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
     expect_equal(ncol(s), 1)
@@ -1691,7 +1699,11 @@ test_that("column functions", {
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
   df <- as.DataFrame(list(list("people" = jsonArr)))
-  for (schema in list(structType(structField("name", "string")), "name STRING")) {
+  schemas <- list(structType(structField("name", "string")),
+                  "name STRING",
+                  schema_of_json("{\"name\":\"Alice\"}"),
+                  schema_of_json(lit("{\"name\":\"Bob\"}")))
+  for (schema in schemas) {
     arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
     expect_equal(ncol(arr), 1)
     expect_equal(nrow(arr), 1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index becb05cf72aba..f4b8354598e4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging {
     }
     sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  def createArrayType(elementType: DataType): ArrayType = DataTypes.createArrayType(elementType)
+
+  def createArrayType(elementType: Column): ArrayType = {
+    new ArrayType(ExprUtils.evalTypeExpr(elementType.expr), true)
+  }
 }

From c731ad181cc0b69f263b8334d1e1498240121b0c Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Sat, 1 Dec 2018 17:53:32 +0800
Subject: [PATCH 2/3] Address comments

---
 R/pkg/R/functions.R                                | 14 ++++++++++----
 .../org/apache/spark/sql/api/r/SQLUtils.scala      |  6 ++----
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 3659ed4e8f0de..c050bdab830d1 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -203,7 +203,7 @@ NULL
 #'   \item \code{from_json}: a structType object to use as the schema to use
 #'       when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
 #'       also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
-#'       a string literal can also be accepted.
+#'       the DDL-formatted string literal can also be accepted.
 #'   \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
 #' }
 #' @param ... additional argument(s).
@@ -2294,9 +2294,15 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOr
               # R side has an 'as.json.array' option to indicate whether the schema
               # should be treated as a struct or as the element type of an array,
               # to be more R-friendly.
-              jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-                                     "createArrayType",
-                                     jschema)
+              if (class(schema) == "Column") {
+                jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                                       "createArrayType",
+                                       jschema)
+              } else {
+                jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
+                                       "createArrayType",
+                                       jschema)
+              }
             }
             options <- varargsToStrEnv(...)
             jc <- callJStatic("org.apache.spark.sql.functions",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index f4b8354598e4f..95ad76fd1d2ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -226,9 +226,7 @@ private[sql] object SQLUtils extends Logging {
     sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
 
-  def createArrayType(elementType: DataType): ArrayType = DataTypes.createArrayType(elementType)
-
-  def createArrayType(elementType: Column): ArrayType = {
-    new ArrayType(ExprUtils.evalTypeExpr(elementType.expr), true)
+  def createArrayType(column: Column): ArrayType = {
+    new ArrayType(ExprUtils.evalTypeExpr(column.expr), true)
   }
 }

From 66e6290e83131cd51ca398a6e8418e65573defab Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Fri, 28 Dec 2018 18:25:53 +0800
Subject: [PATCH 3/3] Fix typo

---
 R/pkg/R/functions.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index c050bdab830d1..49ddf337f1177 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2357,7 +2357,7 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
           function(x, schema, ...) {
             if (class(schema) == "structType") {
-              schema <- callJMethod(schema$job, "toDDL")
+              schema <- callJMethod(schema$jobj, "toDDL")
             }
 
             if (is.character(schema)) {
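
Note for reviewers: a minimal end-to-end sketch of the call forms this series
enables, assuming a SparkR session built from this branch. The data and column
names below are illustrative, reusing the examples from the roxygen docs above.

    library(SparkR)
    sparkR.session()

    # from_csv now accepts a DDL-formatted string, a structType, or schema_of_csv().
    df <- sql("SELECT 'Amsterdam,2018' as csv")
    schema <- "city STRING, year INT"
    head(select(df, from_csv(df$csv, schema)))
    head(select(df, from_csv(df$csv, structType(schema))))
    head(select(df, from_csv(df$csv, schema_of_csv("Amsterdam,2018"))))

    # from_json now also accepts schema_of_json(), which infers the schema
    # from a sample record instead of requiring it to be spelled out.
    df2 <- sql("SELECT named_struct('name', 'Bob') as people")
    df2 <- mutate(df2, people_json = to_json(df2$people))
    head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))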