Merged
Changes from all commits (22 commits)
23c8846
[STREAMING][MINOR] Fix typo in function name of StateImpl
jerryshao Dec 15, 2015
80d2617
Update branch-1.6 for 1.6.0 release
marmbrus Dec 15, 2015
00a39d9
Preparing Spark release v1.6.0-rc3
pwendell Dec 15, 2015
08aa3b4
Preparing development version 1.6.0-SNAPSHOT
pwendell Dec 15, 2015
9e4ac56
[SPARK-12056][CORE] Part 2 Create a TaskAttemptContext only after cal…
tedyu Dec 16, 2015
2c324d3
[SPARK-12351][MESOS] Add documentation about submitting Spark with me…
tnachen Dec 16, 2015
8e9a600
[SPARK-9886][CORE] Fix to use ShutdownHookManager in
naveenminchu Dec 16, 2015
93095eb
[SPARK-12062][CORE] Change Master to asyc rebuild UI when application…
BryanCutler Dec 16, 2015
fb08f7b
[SPARK-10477][SQL] using DSL in ColumnPruningSuite to improve readabi…
cloud-fan Dec 16, 2015
135a5ee
removed some maven-jar-plugin
markhamstra Dec 16, 2015
9a6494a
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 16, 2015
a2d584e
[SPARK-12324][MLLIB][DOC] Fixes the sidebar in the ML documentation
thunterdb Dec 16, 2015
ac0e2ea
[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR
yanboliang Dec 16, 2015
16edd93
[SPARK-12215][ML][DOC] User guide section for KMeans in spark.ml
yu-iskw Dec 16, 2015
f815127
[SPARK-12318][SPARKR] Save mode in SparkR should be error by default
zjffdu Dec 16, 2015
e5b8571
[SPARK-12345][MESOS] Filter SPARK_HOME when submitting Spark jobs wit…
tnachen Dec 16, 2015
e1adf6d
[SPARK-6518][MLLIB][EXAMPLE][DOC] Add example code and user guide for…
yu-iskw Dec 16, 2015
168c89e
Preparing Spark release v1.6.0-rc3
pwendell Dec 16, 2015
aee88eb
Preparing development version 1.6.0-SNAPSHOT
pwendell Dec 16, 2015
dffa610
[SPARK-11608][MLLIB][DOC] Added migration guide for MLlib 1.6
jkbradley Dec 16, 2015
04e868b
[SPARK-12364][ML][SPARKR] Add ML example for SparkR
yanboliang Dec 16, 2015
d020431
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 16, 2015
34,172 changes: 34,172 additions & 0 deletions CHANGES.txt

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion NOTICE
@@ -606,4 +606,11 @@ Vis.js uses and redistributes the following third-party libraries:

- keycharm
https://github.com/AlexDM0/keycharm
The MIT License
The MIT License

===============================================================================

The CSS style for the navigation sidebar of the documentation was originally
submitted by Óscar Nájera for the scikit-learn project. The scikit-learn project
is distributed under the 3-Clause BSD license.
===============================================================================
4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
@@ -92,7 +92,9 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"write.df")
"write.df",
"write.json",
"write.parquet")

exportClasses("Column")

61 changes: 50 additions & 11 deletions R/pkg/R/DataFrame.R
@@ -596,30 +596,69 @@
RDD(jrdd, serializedMode = "string")
})

#' saveAsParquetFile
#' write.json
#'
#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out
#' with this method can be read back in as a DataFrame using read.json().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname write.json
#' @name write.json
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' write.json(df, "/tmp/sparkr-tmp/")
#'}
setMethod("write.json",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "json", path))
})
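
The new method is a thin wrapper over the JVM DataFrameWriter: callJMethod(x@sdf, "write") fetches the writer, and its json() method performs the save. A minimal round-trip sketch, assuming a SparkR 1.6 session with sc and sqlContext initialized as in the @examples above (paths are hypothetical):

# Sketch: write a DataFrame as line-delimited JSON, then read it back with read.json().
df <- read.json(sqlContext, "examples/src/main/resources/people.json")  # hypothetical input file
outPath <- file.path(tempdir(), "people-json-out")
write.json(df, outPath)                  # writes a directory of part files, one JSON object per line
df2 <- read.json(sqlContext, outPath)    # output of write.json is readable by read.json
stopifnot(count(df2) == count(df))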

#' write.parquet
#'
#' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
#' with this method can be read back in as a DataFrame using parquetFile().
#' with this method can be read back in as a DataFrame using read.parquet().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname saveAsParquetFile
#' @name saveAsParquetFile
#' @rdname write.parquet
#' @name write.parquet
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
#' write.parquet(df, "/tmp/sparkr-tmp1/")
#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
setMethod("write.parquet",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "parquet", path))
})

#' @rdname write.parquet
#' @name saveAsParquetFile
#' @export
setMethod("saveAsParquetFile",
signature(x = "DataFrame", path = "character"),
function(x, path) {
invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
.Deprecated("write.parquet")
write.parquet(x, path)
})
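
saveAsParquetFile survives only as a deprecated alias: .Deprecated("write.parquet") raises a deprecation warning, and the call is forwarded to write.parquet. A short sketch of both call paths (hypothetical paths):

write.parquet(df, file.path(tempdir(), "people-parquet"))                         # preferred API, no warning
suppressWarnings(saveAsParquetFile(df, file.path(tempdir(), "people-parquet2")))  # old name still works but warns to use write.parquet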

#' Distinct
@@ -1886,7 +1925,7 @@ setMethod("except",
#' @param df A SparkSQL DataFrame
#' @param path A name for the table
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#'
#' @family DataFrame functions
#' @rdname write.df
@@ -1903,7 +1942,7 @@ setMethod("except",
#' }
setMethod("write.df",
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
function(df, path, source = NULL, mode = "error", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
@@ -1928,7 +1967,7 @@ setMethod("write.df",
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
function(df, path, source = NULL, mode = "error", ...){
write.df(df, path, source, mode, ...)
})
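
Note the behavioral change from SPARK-12318: the default save mode for write.df and saveDF (and for saveAsTable below) moves from "append" to "error", so writing to a path that already exists now fails unless a mode is passed explicitly. A hedged sketch of the new behavior (hypothetical path):

outPath <- file.path(tempdir(), "people-out")
write.df(df, outPath, "json")                      # first write succeeds, the path did not exist yet
# write.df(df, outPath, "json")                    # would now fail: path exists and mode defaults to "error"
write.df(df, outPath, "json", mode = "overwrite")  # replace the existing output
write.df(df, outPath, "json", mode = "append")     # or ask for the pre-1.6 appending behavior explicitly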

@@ -1951,7 +1990,7 @@ setMethod("saveDF",
#' @param df A SparkSQL DataFrame
#' @param tableName A name for the table
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#'
#' @family DataFrame functions
#' @rdname saveAsTable
@@ -1968,7 +2007,7 @@ setMethod("saveDF",
setMethod("saveAsTable",
signature(df = "DataFrame", tableName = "character", source = "character",
mode = "character"),
function(df, tableName, source = NULL, mode="append", ...){
function(df, tableName, source = NULL, mode="error", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
16 changes: 12 additions & 4 deletions R/pkg/R/generics.R
@@ -519,10 +519,6 @@ setGeneric("sample_frac",
#' @export
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })

#' @rdname saveAsParquetFile
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname saveAsTable
#' @export
setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
@@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
#' @export
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })

#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })

#' @rdname write.parquet
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
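These generics.R changes only declare the S4 generics (and fold saveAsParquetFile under the write.parquet rdname); the implementations live in DataFrame.R via setMethod. A toy illustration of the setGeneric/setMethod dispatch pattern, using made-up names rather than SparkR classes:

setGeneric("describeIt", function(x, path) { standardGeneric("describeIt") })  # hypothetical generic
setClass("ToyFrame", slots = c(rows = "list"))                                 # hypothetical class
setMethod("describeIt", signature(x = "ToyFrame", path = "character"),
          function(x, path) {
            # dispatched on the (ToyFrame, character) signature
            writeLines(sprintf("ToyFrame with %d rows", length(x@rows)), path)
          })
describeIt(new("ToyFrame", rows = list(1, 2, 3)), tempfile(fileext = ".txt"))
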
104 changes: 59 additions & 45 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})

test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
test_that("read/write json files", {
# Test read.df
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

# Test read.df with a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "DataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Test loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "DataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))

# Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
# read.json()/jsonFile() works with multiple input paths

# Test write.df
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
write.df(df, jsonPath2, "json", mode="overwrite")
jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))

# Test write.json
jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
write.json(df, jsonPath3)

# Test read.json()/jsonFile() works with multiple input paths
jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "DataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "DataFrame")
expect_equal(count(jsonDF2), 6)

unlink(jsonPath2)
unlink(jsonPath3)
})

test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")

unlink(jsonPath2)
unlink(parquetPath2)
})

test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
expect_equal(count(df2), 3)
})

test_that("read.df() from json file", {
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

# Check if we can apply a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "DataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Run the same with loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "DataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
})

test_that("write.df() as parquet file", {
df <- read.df(sqlContext, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "DataFrame")
expect_equal(count(df2), 3)
})

test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 3)

unlink(jsonPath2)
})

test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))

unlink(jsonPath2)
unlink(jsonPath3)
})

test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {

# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)

unlink(jsonPath2)
})

test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})

test_that("write.df() on DataFrame and works with read.parquet", {
df <- read.json(sqlContext, jsonPath)
test_that("read/write Parquet files", {
df <- read.df(sqlContext, jsonPath, "json")
# Test write.df and read.df
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, parquetPath)
expect_is(parquetDF, "DataFrame")
expect_equal(count(df), count(parquetDF))
})
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "DataFrame")
expect_equal(count(df2), 3)

test_that("read.parquet()/parquetFile() works with multiple input paths", {
df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
# Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
write.parquet(df, parquetPath2)
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
suppressWarnings(saveAsParquetFile(df, parquetPath3))
parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "DataFrame")
expect_equal(count(parquetDF), count(df) * 2)
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "DataFrame")
expect_equal(count(parquetDF2), count(df) * 2)

# Test if varargs works with variables
saveMode <- "overwrite"
mergeSchema <- "true"
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)

unlink(parquetPath2)
unlink(parquetPath3)
unlink(parquetPath4)
})

test_that("describe() and summarize() on a DataFrame", {