diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index f568a931ae1fe..49ddf337f1177 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -202,8 +202,9 @@ NULL #' \itemize{ #' \item \code{from_json}: a structType object to use as the schema to use #' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is -#' also supported for the schema. -#' \item \code{from_csv}: a DDL-formatted string +#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or +#' the DDL-formatted string literal can also be accepted. +#' \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv} #' } #' @param ... additional argument(s). #' \itemize{ @@ -2254,6 +2255,8 @@ setMethod("date_format", signature(y = "Column", x = "character"), column(jc) }) +setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column")) + #' @details #' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType} #' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set @@ -2261,7 +2264,7 @@ setMethod("date_format", signature(y = "Column", x = "character"), #' #' @rdname column_collection_functions #' @param as.json.array indicating if input string is JSON array of objects or a single object. -#' @aliases from_json from_json,Column,characterOrstructType-method +#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method #' @examples #' #' \dontrun{ @@ -2269,25 +2272,37 @@ setMethod("date_format", signature(y = "Column", x = "character"), #' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy')) #' schema <- structType(structField("date", "string")) #' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy'))) - #' df2 <- sql("SELECT named_struct('name', 'Bob') as people") #' df2 <- mutate(df2, people_json = to_json(df2$people)) #' schema <- structType(structField("name", "string")) #' head(select(df2, from_json(df2$people_json, schema))) -#' head(select(df2, from_json(df2$people_json, "name STRING")))} +#' head(select(df2, from_json(df2$people_json, "name STRING"))) +#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))} #' @note from_json since 2.2.0 -setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"), +setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"), function(x, schema, as.json.array = FALSE, ...) { if (is.character(schema)) { - schema <- structType(schema) + jschema <- structType(schema)$jobj + } else if (class(schema) == "structType") { + jschema <- schema$jobj + } else { + jschema <- schema@jc } if (as.json.array) { - jschema <- callJStatic("org.apache.spark.sql.types.DataTypes", - "createArrayType", - schema$jobj) - } else { - jschema <- schema$jobj + # This case is R-specifically different. Unlike Scala and Python side, + # R side has 'as.json.array' option to indicate if the schema should be + # treated as struct or element type of array in order to make it more + # R-friendly. + if (class(schema) == "Column") { + jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "createArrayType", + jschema) + } else { + jschema <- callJStatic("org.apache.spark.sql.types.DataTypes", + "createArrayType", + jschema) + } } options <- varargsToStrEnv(...) jc <- callJStatic("org.apache.spark.sql.functions", @@ -2328,22 +2343,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"), #' If the string is unparseable, the Column will contain the value NA. #' #' @rdname column_collection_functions -#' @aliases from_csv from_csv,Column,character-method +#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method #' @examples #' #' \dontrun{ -#' df <- sql("SELECT 'Amsterdam,2018' as csv") +#' csv <- "Amsterdam,2018" +#' df <- sql(paste0("SELECT '", csv, "' as csv")) #' schema <- "city STRING, year INT" -#' head(select(df, from_csv(df$csv, schema)))} +#' head(select(df, from_csv(df$csv, schema))) +#' head(select(df, from_csv(df$csv, structType(schema)))) +#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))} #' @note from_csv since 3.0.0 -setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"), +setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"), function(x, schema, ...) { - if (class(schema) == "Column") { - jschema <- schema@jc - } else if (is.character(schema)) { + if (class(schema) == "structType") { + schema <- callJMethod(schema$jobj, "toDDL") + } + + if (is.character(schema)) { jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema) } else { - stop("schema argument should be a column or character") + jschema <- schema@jc } options <- varargsToStrEnv(...) jc <- callJStatic("org.apache.spark.sql.functions", diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 0d5118c127f2b..a1805f57b1dcf 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1626,6 +1626,12 @@ test_that("column functions", { expect_equal(c[[1]][[1]]$a, 1) c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv"))) expect_equal(c[[1]][[1]]$a, 1) + c <- collect(select(df, alias(from_csv(df$col, structType("a INT")), "csv"))) + expect_equal(c[[1]][[1]]$a, 1) + c <- collect(select(df, alias(from_csv(df$col, schema_of_csv("1")), "csv"))) + expect_equal(c[[1]][[1]]$`_c0`, 1) + c <- collect(select(df, alias(from_csv(df$col, schema_of_csv(lit("1"))), "csv"))) + expect_equal(c[[1]][[1]]$`_c0`, 1) df <- as.DataFrame(list(list("col" = "1"))) c <- collect(select(df, schema_of_csv("Amsterdam,2018"))) @@ -1651,7 +1657,9 @@ test_that("column functions", { expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}") df <- as.DataFrame(j) schemas <- list(structType(structField("age", "integer"), structField("height", "double")), - "age INT, height DOUBLE") + "age INT, height DOUBLE", + schema_of_json("{\"age\":16,\"height\":176.5}"), + schema_of_json(lit("{\"age\":16,\"height\":176.5}"))) for (schema in schemas) { s <- collect(select(df, alias(from_json(df$json, schema), "structcol"))) expect_equal(ncol(s), 1) @@ -1691,7 +1699,11 @@ test_that("column functions", { # check if array type in string is correctly supported. jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]" df <- as.DataFrame(list(list("people" = jsonArr))) - for (schema in list(structType(structField("name", "string")), "name STRING")) { + schemas <- list(structType(structField("name", "string")), + "name STRING", + schema_of_json("{\"name\":\"Alice\"}"), + schema_of_json(lit("{\"name\":\"Bob\"}"))) + for (schema in schemas) { arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol"))) expect_equal(ncol(arr), 1) expect_equal(nrow(arr), 1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index becb05cf72aba..95ad76fd1d2ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -30,7 +30,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -225,4 +225,8 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + def createArrayType(column: Column): ArrayType = { + new ArrayType(ExprUtils.evalTypeExpr(column.expr), true) + } }