rename RDD functions
felixcheung committed Aug 12, 2016
commit fd53bcba1ada906273b38e0d081e9329341cd09a
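This commit renames SparkR's internal RDD methods by suffixing them with "RDD" (cache becomes cacheRDD, collect becomes collectRDD, count becomes countRDD, and so on), presumably so these private generics no longer share names with the public SparkDataFrame methods. A minimal usage sketch of the renamed API — hypothetical, since the RDD methods are non-exported and assumed reachable only via SparkR::: :

# Hedged sketch: parallelize and the *RDD methods are internal (non-exported),
# so ::: access is assumed; sparkR.init() is the session entry point of this era.
sc <- sparkR.init()
rdd <- SparkR:::parallelize(sc, 1:10, 2L)
SparkR:::cacheRDD(rdd)          # previously cache(rdd)
SparkR:::countRDD(rdd)          # previously count(rdd); returns 10
SparkR:::takeRDD(rdd, 2L)       # previously take(rdd, 2L); list(1, 2)
SparkR:::collectRDD(rdd)        # previously collect(rdd)
SparkR:::unpersistRDD(rdd)      # previously unpersist(rdd)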
44 changes: 22 additions & 22 deletions R/pkg/R/RDD.R
@@ -67,7 +67,7 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
.Object
})

setMethod("show", "RDD",
setMethod("showRDD", "RDD",
function(object) {
cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep = ""))
})
@@ -215,7 +215,7 @@ setValidity("RDD",
#' @rdname cache-methods
#' @aliases cache,RDD-method
#' @noRd
setMethod("cache",
setMethod("cacheRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "cache")
@@ -235,12 +235,12 @@ setMethod("cache",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
-#' persist(rdd, "MEMORY_AND_DISK")
+#' persistRDD(rdd, "MEMORY_AND_DISK")
#'}
#' @rdname persist
#' @aliases persist,RDD-method
#' @noRd
setMethod("persist",
setMethod("persistRDD",
signature(x = "RDD", newLevel = "character"),
function(x, newLevel = "MEMORY_ONLY") {
callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel))
@@ -259,12 +259,12 @@ setMethod("persist",
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' cache(rdd) # rdd@@env$isCached == TRUE
-#' unpersist(rdd) # rdd@@env$isCached == FALSE
+#' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
#'}
#' @rdname unpersist-methods
#' @aliases unpersist,RDD-method
#' @noRd
setMethod("unpersist",
setMethod("unpersistRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "unpersist")
@@ -351,7 +351,7 @@ setMethod("numPartitions",
#' @rdname collect-methods
#' @aliases collect,RDD-method
#' @noRd
setMethod("collect",
setMethod("collectRDD",
signature(x = "RDD"),
function(x, flatten = TRUE) {
# Assumes a pairwise RDD is backed by a JavaPairRDD.
@@ -411,13 +411,13 @@ setMethod("collectAsMap",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
-#' count(rdd) # 10
+#' countRDD(rdd) # 10
#' length(rdd) # Same as count
#'}
#' @rdname count
#' @aliases count,RDD-method
#' @noRd
setMethod("count",
setMethod("countRDD",
signature(x = "RDD"),
function(x) {
countPartition <- function(part) {
@@ -431,10 +431,10 @@ setMethod("count",
#' Return the number of elements in the RDD
#' @rdname count
#' @noRd
setMethod("length",
setMethod("lengthRDD",
signature(x = "RDD"),
function(x) {
-count(x)
+countRDD(x)
})

#' Return the count of each unique value in this RDD as a list of
@@ -768,13 +768,13 @@ setMethod("foreachPartition",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
-#' take(rdd, 2L) # list(1, 2)
+#' takeRDD(rdd, 2L) # list(1, 2)
#'}
# nolint end
#' @rdname take
#' @aliases take,RDD,numeric-method
#' @noRd
setMethod("take",
setMethod("takeRDD",
signature(x = "RDD", num = "numeric"),
function(x, num) {
resList <- list()
@@ -817,13 +817,13 @@ setMethod("take",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
-#' first(rdd)
+#' firstRDD(rdd)
#' }
#' @noRd
setMethod("first",
setMethod("firstRDD",
signature(x = "RDD"),
function(x) {
-take(x, 1)[[1]]
+takeRDD(x, 1)[[1]]
})

#' Removes the duplicates from RDD.
@@ -838,13 +838,13 @@ setMethod("first",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
-#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
+#' sort(unlist(collect(distinctRDD(rdd)))) # c(1, 2, 3)
#'}
# nolint end
#' @rdname distinct
#' @aliases distinct,RDD-method
#' @noRd
setMethod("distinct",
setMethod("distinctRDD",
signature(x = "RDD"),
function(x, numPartitions = SparkR:::getNumPartitions(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
@@ -942,7 +942,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
fraction <- 0.0
total <- 0
multiplier <- 3.0
-initialCount <- count(x)
+initialCount <- countRDD(x)
maxSelected <- 0
MAXINT <- .Machine$integer.max

@@ -1019,12 +1019,12 @@ setMethod("keyBy",
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
#' getNumPartitions(rdd) # 4
-#' getNumPartitions(repartition(rdd, 2L)) # 2
+#' getNumPartitions(repartitionRDD(rdd, 2L)) # 2
#'}
#' @rdname repartition
#' @aliases repartition,RDD
#' @noRd
setMethod("repartition",
setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
@@ -1064,7 +1064,7 @@ setMethod("coalesce",
})
}
shuffled <- lapplyPartitionsWithIndex(x, func)
-repartitioned <- partitionBy(shuffled, numPartitions)
+repartitioned <- partitionByRDD(shuffled, numPartitions)
values(repartitioned)
} else {
jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
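Note that internal call sites inside RDD.R are updated along with the definitions: lengthRDD delegates to countRDD, firstRDD to takeRDD, and coalesce's shuffle path to partitionByRDD. A condensed sketch of that delegation, abridged from the hunks above:

# Abridged from the diff: length is just a count, first is take-one-and-unwrap.
setMethod("lengthRDD", signature(x = "RDD"), function(x) {
  countRDD(x)
})
setMethod("firstRDD", signature(x = "RDD"), function(x) {
  takeRDD(x, 1)[[1]]
})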
2 changes: 1 addition & 1 deletion R/pkg/R/SQLContext.R
@@ -218,7 +218,7 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
}

if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
-row <- first(rdd)
+row <- firstRDD(rdd)
names <- if (is.null(schema)) {
names(row)
} else {
18 changes: 9 additions & 9 deletions R/pkg/R/pairRDD.R
@@ -198,13 +198,13 @@ setMethod("flatMapValues",
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
-#' parts <- partitionBy(rdd, 2L)
+#' parts <- partitionByRDD(rdd, 2L)
#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
#'}
#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
#' @noRd
setMethod("partitionBy",
setMethod("partitionByRDD",
signature(x = "RDD"),
function(x, numPartitions, partitionFunc = hashCode) {
stopifnot(is.numeric(numPartitions))
@@ -270,7 +270,7 @@ setMethod("partitionBy",
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
-shuffled <- partitionBy(x, numPartitions)
+shuffled <- partitionByRDD(x, numPartitions)
groupVals <- function(part) {
vals <- new.env()
keys <- new.env()
@@ -342,7 +342,7 @@ setMethod("reduceByKey",
convertEnvsToList(keys, vals)
}
locallyReduced <- lapplyPartition(x, reduceVals)
-shuffled <- partitionBy(locallyReduced, numToInt(numPartitions))
+shuffled <- partitionByRDD(locallyReduced, numToInt(numPartitions))
lapplyPartition(shuffled, reduceVals)
})

@@ -453,7 +453,7 @@ setMethod("combineByKey",
convertEnvsToList(keys, combiners)
}
locallyCombined <- lapplyPartition(x, combineLocally)
-shuffled <- partitionBy(locallyCombined, numToInt(numPartitions))
+shuffled <- partitionByRDD(locallyCombined, numToInt(numPartitions))
mergeAfterShuffle <- function(part) {
combiners <- new.env()
keys <- new.env()
@@ -563,13 +563,13 @@ setMethod("foldByKey",
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3))
+#' joinRDD(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3))
#'}
# nolint end
#' @rdname join-methods
#' @aliases join,RDD,RDD-method
#' @noRd
setMethod("join",
setMethod("joinRDD",
signature(x = "RDD", y = "RDD"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
@@ -784,7 +784,7 @@ setMethod("sortByKey",
rangeBounds <- list()

if (numPartitions > 1) {
-rddSize <- count(x)
+rddSize <- countRDD(x)
# constant from Spark's RangePartitioner
maxSampleSize <- numPartitions * 20
fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)
@@ -822,7 +822,7 @@ setMethod("sortByKey",
sortKeyValueList(part, decreasing = !ascending)
}

-newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
+newRDD <- partitionByRDD(x, numPartitions, rangePartitionFunc)
lapplyPartition(newRDD, partitionFunc)
})

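The pairwise helpers in pairRDD.R follow the same pattern: partitionBy and join become partitionByRDD and joinRDD, and the shuffle-based operators (groupByKey, reduceByKey, combineByKey, sortByKey) now route through partitionByRDD internally. A hedged usage sketch, again assuming ::: access to the non-exported API:

# Sketch only: these helpers are internal, so SparkR::: access is assumed.
pairs <- SparkR:::parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)))
parts <- SparkR:::partitionByRDD(pairs, 2L)   # previously partitionBy(pairs, 2L)
rdd1 <- SparkR:::parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- SparkR:::parallelize(sc, list(list(1, 2), list(1, 3)))
joined <- SparkR:::joinRDD(rdd1, rdd2, 2L)    # previously join(rdd1, rdd2, 2L)
SparkR:::collectRDD(joined)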
8 changes: 4 additions & 4 deletions R/pkg/inst/tests/testthat/test_binaryFile.R
@@ -31,7 +31,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
rdd <- textFile(sc, fileName1, 1)
saveAsObjectFile(rdd, fileName2)
rdd <- objectFile(sc, fileName2)
-expect_equal(collect(rdd), as.list(mockFile))
+expect_equal(collectRDD(rdd), as.list(mockFile))

unlink(fileName1)
unlink(fileName2, recursive = TRUE)
@@ -44,7 +44,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
rdd <- parallelize(sc, l, 1)
saveAsObjectFile(rdd, fileName)
rdd <- objectFile(sc, fileName)
-expect_equal(collect(rdd), l)
+expect_equal(collectRDD(rdd), l)

unlink(fileName, recursive = TRUE)
})
@@ -64,7 +64,7 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
saveAsObjectFile(counts, fileName2)
counts <- objectFile(sc, fileName2)

-output <- collect(counts)
+output <- collectRDD(counts)
expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
list("is", 2))
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
@@ -83,7 +83,7 @@ test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
saveAsObjectFile(rdd2, fileName2)

rdd <- objectFile(sc, c(fileName1, fileName2))
-expect_equal(count(rdd), 2)
+expect_equal(countRDD(rdd), 2)

unlink(fileName1, recursive = TRUE)
unlink(fileName2, recursive = TRUE)
18 changes: 9 additions & 9 deletions R/pkg/inst/tests/testthat/test_binary_function.R
@@ -29,21 +29,21 @@ rdd <- parallelize(sc, nums, 2L)
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("union on two RDDs", {
-actual <- collect(unionRDD(rdd, rdd))
+actual <- collectRDD(unionRDD(rdd, rdd))
expect_equal(actual, as.list(rep(nums, 2)))

fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName)

text.rdd <- textFile(sc, fileName)
union.rdd <- unionRDD(rdd, text.rdd)
-actual <- collect(union.rdd)
+actual <- collectRDD(union.rdd)
expect_equal(actual, c(as.list(nums), mockFile))
expect_equal(getSerializedMode(union.rdd), "byte")

rdd <- map(text.rdd, function(x) {x})
union.rdd <- unionRDD(rdd, text.rdd)
-actual <- collect(union.rdd)
+actual <- collectRDD(union.rdd)
expect_equal(actual, as.list(c(mockFile, mockFile)))
expect_equal(getSerializedMode(union.rdd), "byte")

@@ -54,14 +54,14 @@ test_that("cogroup on two RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
-actual <- collect(cogroup.rdd)
+actual <- collectRDD(cogroup.rdd)
expect_equal(actual,
list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))

rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
-actual <- collect(cogroup.rdd)
+actual <- collectRDD(cogroup.rdd)

expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
expect_equal(sortKeyValueList(actual),
@@ -72,7 +72,7 @@ test_that("zipPartitions() on RDDs", {
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
-actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
+actual <- collectRDD(zipPartitions(rdd1, rdd2, rdd3,
func = function(x, y, z) { list(list(x, y, z))} ))
expect_equal(actual,
list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6))))
@@ -82,19 +82,19 @@ test_that("zipPartitions() on RDDs", {
writeLines(mockFile, fileName)

rdd <- textFile(sc, fileName, 1)
-actual <- collect(zipPartitions(rdd, rdd,
+actual <- collectRDD(zipPartitions(rdd, rdd,
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
expected <- list(paste(mockFile, mockFile, sep = "\n"))
expect_equal(actual, expected)

rdd1 <- parallelize(sc, 0:1, 1)
-actual <- collect(zipPartitions(rdd1, rdd,
+actual <- collectRDD(zipPartitions(rdd1, rdd,
func = function(x, y) { list(x + nchar(y)) }))
expected <- list(0:1 + nchar(mockFile))
expect_equal(actual, expected)

rdd <- map(rdd, function(x) { x })
-actual <- collect(zipPartitions(rdd, rdd1,
+actual <- collectRDD(zipPartitions(rdd, rdd1,
func = function(x, y) { list(y + nchar(x)) }))
expect_equal(actual, expected)

4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_broadcast.R
@@ -32,7 +32,7 @@ test_that("using broadcast variable", {
useBroadcast <- function(x) {
sum(SparkR:::value(randomMatBr) * x)
}
-actual <- collect(lapply(rrdd, useBroadcast))
+actual <- collectRDD(lapply(rrdd, useBroadcast))
expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
expect_equal(actual, expected)
})
@@ -43,7 +43,7 @@ test_that("without using broadcast variable", {
useBroadcast <- function(x) {
sum(randomMat * x)
}
-actual <- collect(lapply(rrdd, useBroadcast))
+actual <- collectRDD(lapply(rrdd, useBroadcast))
expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
expect_equal(actual, expected)
})