Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ NULL
#' \itemize{
#' \item \code{from_json}: a structType object to use as the schema to use
#' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
#' the DDL-formatted string literal can also be accepted.
#' \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
#' }
#' @param ... additional argument(s).
#' \itemize{
Expand Down Expand Up @@ -2254,40 +2255,54 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably try to pull all the setClassUnion in one place. (to avoid conflict or duplication)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I agree.. Would you mind if I do this separately? I roughly checked by grep and looks:

./pkg/R/DataFrame.R:setClassUnion("characterOrstructType", c("character", "structType"))
./pkg/R/DataFrame.R:setClassUnion("numericOrcharacter", c("numeric", "character"))
./pkg/R/DataFrame.R:setClassUnion("characterOrColumn", c("character", "Column"))
./pkg/R/DataFrame.R:setClassUnion("numericOrColumn", c("numeric", "Column"))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


#' @details
#' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType}
#' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set
#' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
#' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy'))
#' schema <- structType(structField("date", "string"))
#' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy')))

#' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#' schema <- structType(structField("name", "string"))
#' head(select(df2, from_json(df2$people_json, schema)))
#' head(select(df2, from_json(df2$people_json, "name STRING")))}
#' head(select(df2, from_json(df2$people_json, "name STRING")))
#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, as.json.array = FALSE, ...) {
if (is.character(schema)) {
schema <- structType(schema)
jschema <- structType(schema)$jobj
} else if (class(schema) == "structType") {
jschema <- schema$jobj
} else {
jschema <- schema@jc
}

if (as.json.array) {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
schema$jobj)
} else {
jschema <- schema$jobj
# This case is R-specifically different. Unlike Scala and Python side,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if as.json.array is TRUE but schema is also set?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, the provided schema is wrapped by Array. The test cases are ...

here https://github.com/apache/spark/pull/23184/files#diff-d4011863c8b176830365b2f224a84bf2R1707

# R side has 'as.json.array' option to indicate if the schema should be
# treated as struct or element type of array in order to make it more
# R-friendly.
if (class(schema) == "Column") {
jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"createArrayType",
jschema)
} else {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
jschema)
}
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
Expand Down Expand Up @@ -2328,22 +2343,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"),
#' If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @aliases from_csv from_csv,Column,character-method
#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df <- sql("SELECT 'Amsterdam,2018' as csv")
#' csv <- "Amsterdam,2018"
#' df <- sql(paste0("SELECT '", csv, "' as csv"))
#' schema <- "city STRING, year INT"
#' head(select(df, from_csv(df$csv, schema)))}
#' head(select(df, from_csv(df$csv, schema)))
#' head(select(df, from_csv(df$csv, structType(schema))))
#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))}
#' @note from_csv since 3.0.0
setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, ...) {
if (class(schema) == "Column") {
jschema <- schema@jc
} else if (is.character(schema)) {
if (class(schema) == "structType") {
schema <- callJMethod(schema$jobj, "toDDL")
}

if (is.character(schema)) {
jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, why in the case for from_json, if schema is character is structType(schema)$jobj
where for from_csv, is callJStatic("org.apache.spark.sql.functions", "lit", schema)
?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yea, that looks a bit confusing. It's similar reason. Fortunately, from_json has StructType with Java Map. So we can directly call it from R side.

def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =

} else {
stop("schema argument should be a column or character")
jschema <- schema@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
Expand Down
16 changes: 14 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,12 @@ test_that("column functions", {
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, structType("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, schema_of_csv("1")), "csv")))
expect_equal(c[[1]][[1]]$`_c0`, 1)
c <- collect(select(df, alias(from_csv(df$col, schema_of_csv(lit("1"))), "csv")))
expect_equal(c[[1]][[1]]$`_c0`, 1)

df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
Expand All @@ -1651,7 +1657,9 @@ test_that("column functions", {
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
"age INT, height DOUBLE")
"age INT, height DOUBLE",
schema_of_json("{\"age\":16,\"height\":176.5}"),
schema_of_json(lit("{\"age\":16,\"height\":176.5}")))
for (schema in schemas) {
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
Expand Down Expand Up @@ -1691,7 +1699,11 @@ test_that("column functions", {
# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
df <- as.DataFrame(list(list("people" = jsonArr)))
for (schema in list(structType(structField("name", "string")), "name STRING")) {
schemas <- list(structType(structField("name", "string")),
"name STRING",
schema_of_json("{\"name\":\"Alice\"}"),
schema_of_json(lit("{\"name\":\"Bob\"}")))
for (schema in schemas) {
arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
expect_equal(ncol(arr), 1)
expect_equal(nrow(arr), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
Expand Down Expand Up @@ -225,4 +225,8 @@ private[sql] object SQLUtils extends Logging {
}
sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
}

def createArrayType(column: Column): ArrayType = {
new ArrayType(ExprUtils.evalTypeExpr(column.expr), true)
}
}