Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ exportMethods("arrange",
"dropna",
"dtypes",
"except",
"exceptAll",
"explain",
"fillna",
"filter",
Expand All @@ -131,6 +132,7 @@ exportMethods("arrange",
"hint",
"insertInto",
"intersect",
"intersectAll",
"isLocal",
"isStreaming",
"join",
Expand Down
60 changes: 60 additions & 0 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,35 @@ setMethod("intersect",
dataFrame(intersected)
})

#' intersectAll
#'
#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
#' and another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the intersect all operation.
#' @family SparkDataFrame functions
#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
#' @rdname intersectAll
#' @name intersectAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectAllDF <- intersectAll(df1, df2)
#' }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here

#' @note intersectAll since 2.4.0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please put 2.4.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setMethod("intersectAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Invoke "intersectAll" on the sdf slot of x with the sdf slot of y
            # as its argument, then wrap the returned handle with dataFrame().
            dataFrame(callJMethod(x@sdf, "intersectAll", y@sdf))
          })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add extra empty line after code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
Expand Down Expand Up @@ -2876,6 +2905,37 @@ setMethod("except",
dataFrame(excepted)
})

#' exceptAll
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except all operation.
#' @family SparkDataFrame functions
#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
#' @rdname exceptAll
#' @name exceptAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptAllDF <- exceptAll(df1, df2)
#' }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug in except; there should only be one @rdname for each.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixcheung Thanks. Did you want the original function except fixed as part of this?

#' @note exceptAll since 2.4.0
setMethod("exceptAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Invoke "exceptAll" on the sdf slot of x with the sdf slot of y
            # as its argument, then wrap the returned handle with dataFrame().
            dataFrame(callJMethod(x@sdf, "exceptAll", y@sdf))
          })

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove one of the two empty lines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixcheung Sure.


#' Save the contents of SparkDataFrame to a data source.
#'
#' The data source is specified by the \code{source} and a set of options (...).
Expand Down
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except
setGeneric(
  name = "except",
  def = function(x, y) {
    # Hand off to whichever method is registered for the supplied arguments.
    standardGeneric("except")
  }
)

#' @rdname exceptAll
setGeneric(
  name = "exceptAll",
  def = function(x, y) {
    # Hand off to whichever method is registered for the supplied arguments.
    standardGeneric("exceptAll")
  }
)

#' @rdname nafunctions
setGeneric(
  name = "fillna",
  def = function(x, value, cols = NULL) {
    # Hand off to whichever method is registered for the supplied arguments.
    standardGeneric("fillna")
  }
)

Expand All @@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
#' @rdname intersect
setGeneric(
  name = "intersect",
  def = function(x, y) {
    # Hand off to whichever method is registered for the supplied arguments.
    standardGeneric("intersect")
  }
)

#' @rdname intersectAll
setGeneric(
  name = "intersectAll",
  def = function(x, y) {
    # Hand off to whichever method is registered for the supplied arguments.
    standardGeneric("intersectAll")
  }
)

#' @rdname isLocal
setGeneric(
  name = "isLocal",
  def = function(x) {
    # Hand off to whichever method is registered for the supplied argument.
    standardGeneric("isLocal")
  }
)

Expand Down
26 changes: 26 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -2482,6 +2482,32 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
unlink(jsonPath2)
})

test_that("intersectAll() and exceptAll()", {
df1 <- createDataFrame(
list(list("a", 1),
list("a", 1),
list("a", 1),
list("a", 1),
list("b", 3),
list("c", 4)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

list(list("a", 1), list("a", 1), list("a", 1),
     list("a", 1), list("b", 3), list("c", 4)),

schema = c("a", "b"))
df2 <- createDataFrame(
list(list("a", 1), list("a", 1), list("b", 3)),
schema = c("a", "b"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))

intersect_all_expected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
stringsAsFactors = FALSE)
except_all_expected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
stringsAsFactors = FALSE)
intersect_all_df <- arrange(intersectAll(df1, df2), df1$a)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly, the naming rule is intersectAllDf or intersect.all.df (see #17590 (comment))

expect_is(intersect_all_df, "SparkDataFrame")
except_all_df <- arrange(exceptAll(df1, df2), df1$a)
expect_is(except_all_df, "SparkDataFrame")
intersect_all_actual <- collect(intersect_all_df)
expect_identical(intersect_all_actual, intersect_all_expected)
except_all_actual <- collect(except_all_df)
expect_identical(except_all_actual, except_all_expected)
})

test_that("withColumn() and withColumnRenamed()", {
df <- read.json(jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
Expand Down