1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -81,6 +81,7 @@ exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
"coalesce",
"collect",
"colnames",
"colnames<-",
46 changes: 43 additions & 3 deletions R/pkg/R/DataFrame.R
@@ -678,14 +678,53 @@ setMethod("storageLevel",
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})

#' Coalesce
#'
#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
Contributor: If there are more partitions, then there will be a shuffle, right? Might be useful to add that.

Member Author: Actually, no: coalesce is set to min(prev partitions, numPartitions) according to CoalescedRDD, so the partition count will stay unchanged in that case.

Contributor: Oh well, I guess that's worth mentioning then?

#' the current partitions. If a larger number of partitions is requested, it will stay at the
#' current number of partitions.
#'
#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
#' this may result in your computation taking place on fewer nodes than
#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
#' call \code{repartition}. This will add a shuffle step, but means the
#' current upstream partitions will be executed in parallel (per whatever
#' the current partitioning is).
#'
#' @param numPartitions the number of partitions to use.
#'
#' @family SparkDataFrame functions
#' @rdname coalesce
#' @name coalesce
#' @aliases coalesce,SparkDataFrame-method
#' @seealso \link{repartition}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- coalesce(df, 1L)
#'}
#' @note coalesce(SparkDataFrame) since 2.1.1
Contributor: 2.2.0? Or will this be ported back to 2.1.1 too?

setMethod("coalesce",
signature(x = "SparkDataFrame"),
function(x, numPartitions) {
stopifnot(is.numeric(numPartitions))
Contributor: Shall we enforce the input param as Integer?

Member Author (felixcheung, Jan 30, 2017): It's being coerced into an integer. The reason we don't want this parameter to be an integer is to allow calls like coalesce(df, 3), in which 3 is a numeric by default (vs. 3L, which is an integer). IMO, forcing the user to call with 3L is a bit too much.
sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
dataFrame(sdf)
})
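A minimal SparkR sketch of the semantics discussed in the threads above, assuming a local session on a build that includes this patch (cars is just a stand-in data set): numPartitions may be passed as a plain numeric, and coalescing to more partitions than currently exist is a no-op.

# Sketch only: numeric coercion and the min(current, requested) cap.
library(SparkR)
sparkR.session(master = "local[4]")

df <- as.DataFrame(cars, numPartitions = 5)
getNumPartitions(df)                   # 5

# A plain numeric works; numToInt() coerces 3 to 3L under the hood.
getNumPartitions(coalesce(df, 3))      # 3

# Requesting more partitions than exist leaves the count unchanged,
# per min(current partitions, numPartitions) in CoalescedRDD.
getNumPartitions(coalesce(df, 10))     # still 5

# Growing the partition count requires repartition, which adds a shuffle.
getNumPartitions(repartition(df, 10))  # 10

sparkR.session.stop()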

#' Repartition
#'
#' The following options for repartition are possible:
#' \itemize{
-#' \item{1.} {Return a new SparkDataFrame partitioned by
+#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#' \item{2.} {Return a new SparkDataFrame hash partitioned by
#' the given columns into \code{numPartitions}.}
-#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#' \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
#' @seealso \link{coalesce}
#' @export
#' @examples
#'\dontrun{
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-coalesce(x, numPartitions, TRUE)
+coalesceRDD(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
@@ -1049,7 +1049,7 @@
#' @rdname coalesce
#' @aliases coalesce,RDD
#' @noRd
setMethod("coalesce",
setMethod("coalesceRDD",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
26 changes: 24 additions & 2 deletions R/pkg/R/functions.R
@@ -286,6 +286,28 @@ setMethod("ceil",
column(jc)
})

#' Returns the first column that is not NA
#'
#' Returns the first column that is not NA, or NA if all inputs are.
#'
#' @rdname coalesce
#' @name coalesce
#' @family normal_funcs
#' @export
#' @aliases coalesce,Column-method
#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
#' @note coalesce(Column) since 2.1.1
setMethod("coalesce",
signature(x = "Column"),
function(x, ...) {
jcols <- lapply(list(x, ...), function (x) {
stopifnot(class(x) == "Column")
x@jc
})
jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
column(jc)
})
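A small usage sketch for the Column variant above, again assuming a build that includes this patch: per row, coalesce returns the first of its argument Columns that is not NA.

# Sketch only: row-wise first-non-NA across columns a, b, c.
library(SparkR)
sparkR.session(master = "local[2]")

df <- createDataFrame(data.frame(a = c(NA, 2, NA),
                                 b = c(1, NA, NA),
                                 c = c(9, 9, 9)))

# Row 1 -> 1 (a is NA, b is not); row 2 -> 2; row 3 -> 9 (only c is set).
head(select(df, coalesce(df$a, df$b, df$c)))

sparkR.session.stop()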

#' Though Scala functions have a "col" function, we don't expose it in SparkR
#' because we don't want to conflict with the "col" function in the R base
#' package, and we also have a "column" function exported which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
#' Returns a Column based on the given column name
#'
#' Returns a Column based on the given column name.
-#
+#'
#' @param x Character column name.
#'
#' @rdname column
#' @name column
#' @family normal_funcs
#' @export
#' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
#' @note column since 1.6.0
setMethod("column",
signature(x = "character"),
9 changes: 8 additions & 1 deletion R/pkg/R/generics.R
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
# @rdname coalesce
# @seealso repartition
# @export
setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })

# @rdname checkpoint-methods
# @export
@@ -406,6 +406,13 @@ setGeneric("attach")
#' @export
setGeneric("cache", function(x) { standardGeneric("cache") })

#' @rdname coalesce
#' @param x a Column or a SparkDataFrame.
#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
#' provided.
#' @export
setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })

#' @rdname collect
#' @export
setGeneric("collect", function(x, ...) { standardGeneric("collect") })
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_rdd.R
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
expect_true(count >= 0 && count <= 4)

# coalesce
-r3 <- coalesce(rdd, 1)
+r3 <- coalesceRDD(rdd, 1)
expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
32 changes: 27 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
df <- read.json(jsonPath)
dfRDD <- toRDD(df)
-saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)

expect_is(objectIn, "RDD")
@@ -1236,7 +1236,7 @@ test_that("column functions", {
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
-c19 <- spark_partition_id()
+c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")

# Test if base::is.nan() is exposed
@@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", {
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)

# repartition by column and number of partitions
-actual <- repartition(df, 3L, col = df$"a")
+actual <- repartition(df, 3, col = df$"a")

-# since we cannot access the number of partitions from dataframe, checking
-# that at least the dimensions are identical
+# Checking that at least the dimensions are identical
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 3L)

# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 13L)

expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)

# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
@@ -2515,6 +2518,25 @@
expect_equal(nrow(df1), 2)
})

test_that("coalesce, repartition, numPartitions", {
df <- as.DataFrame(cars, numPartitions = 5)
expect_equal(getNumPartitions(df), 5)
expect_equal(getNumPartitions(coalesce(df, 3)), 3)
expect_equal(getNumPartitions(coalesce(df, 6)), 5)

df1 <- coalesce(df, 3)
expect_equal(getNumPartitions(df1), 3)
expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
expect_equal(getNumPartitions(coalesce(df1, 2)), 2)

df2 <- repartition(df1, 10)
expect_equal(getNumPartitions(df2), 10)
expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
})

test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -423,7 +423,8 @@ abstract class RDD[T: ClassTag](
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
-* new partitions will claim 10 of the current partitions.
+* new partitions will claim 10 of the current partitions. If a larger number
+* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
10 changes: 9 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -515,7 +515,15 @@ def coalesce(self, numPartitions):
Similar to coalesce defined on an :class:`RDD`, this operation results in a
narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
there will not be a shuffle, instead each of the 100 new partitions will
-claim 10 of the current partitions.
+claim 10 of the current partitions. If a larger number of partitions is requested,
+it will stay at the current number of partitions.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can call repartition(). This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).

>>> df.coalesce(1).rdd.getNumPartitions()
1
10 changes: 9 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2432,7 +2432,15 @@ class Dataset[T] private[sql](
* Returns a new Dataset that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
-* the 100 new partitions will claim 10 of the current partitions.
+* the 100 new partitions will claim 10 of the current partitions. If a larger number of
+* partitions is requested, it will stay at the current number of partitions.
Contributor: So we seem to have left out the warning from RDD about drastic coalesces in the Dataset coalesce. Since we are updating the docstrings now anyway, would it maybe make sense to include that warning here as well? (Looking at the implementation of CoalesceExec, it seems like it would still apply, unless I'm missing something.)

*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can call repartition. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @group typedrel
* @since 1.6.0
@@ -541,7 +541,15 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
* Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
-* the 100 new partitions will claim 10 of the current partitions.
+* the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions
+* is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can use ShuffleExchange. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*/
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
Expand Down