2 changes: 1 addition & 1 deletion R/pkg/.lintr
@@ -1,2 +1,2 @@
-linters: with_defaults(line_length_linter(100), camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE), commented_code_linter = NULL)
+linters: with_defaults(line_length_linter(100), camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE))
 exclusions: list("inst/profile/general.R" = 1, "inst/profile/shell.R")
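Why this .lintr change: dropping `commented_code_linter = NULL` from the defaults re-enables lintr's commented-code check. The `# nolint start` / `# nolint end` markers added throughout RDD.R and deserialize.R below then suppress that check only around roxygen example blocks, whose inline result comments (e.g. `# list(1, 2)`) would otherwise be flagged as commented-out code. A minimal sketch of the pattern, using a hypothetical example block:

```r
# nolint start
#'\dontrun{
#' rdd <- parallelize(sc, 1:3)
#' collect(rdd) # list(1, 2, 3)  -- this result comment would trip the linter
#'}
# nolint end
```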
40 changes: 38 additions & 2 deletions R/pkg/R/RDD.R
@@ -180,7 +180,7 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
 }
 # Save the serialization flag after we create a RRDD
 rdd@env$serializedMode <- serializedMode
-rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
+rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD")
 rdd@env$jrdd_val
 })

@@ -225,7 +225,7 @@ setMethod("cache",
 #'
 #' Persist this RDD with the specified storage level. For details of the
 #' supported storage levels, refer to
-#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#'\url{http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence}.
 #'
 #' @param x The RDD to persist
 #' @param newLevel The new storage level to be assigned
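For reference, the persist call documented in the hunk above takes the storage level as a string; a minimal sketch, assuming a running SparkR context (the level name shown is one of the documented options):

```r
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10, 2L)
# Keep partitions in memory, spilling to disk when they do not fit.
persist(rdd, "MEMORY_AND_DISK")
```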
@@ -382,11 +382,13 @@ setMethod("collectPartition",
 #' \code{collectAsMap} returns a named list as a map that contains all of the elements
 #' in a key-value pair RDD.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
 #' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
 #'}
+# nolint end
 #' @rdname collect-methods
 #' @aliases collectAsMap,RDD-method
 #' @noRd
@@ -442,11 +444,13 @@ setMethod("length",
 #' @return list of (value, count) pairs, where count is number of each unique
 #' value in rdd.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, c(1,2,3,2,1))
 #' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
 #'}
+# nolint end
 #' @rdname countByValue
 #' @aliases countByValue,RDD-method
 #' @noRd
@@ -597,11 +601,13 @@ setMethod("mapPartitionsWithIndex",
 #' @param x The RDD to be filtered.
 #' @param f A unary predicate function.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10)
 #' unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
 #'}
+# nolint end
 #' @rdname filterRDD
 #' @aliases filterRDD,RDD,function-method
 #' @noRd
@@ -756,11 +762,13 @@ setMethod("foreachPartition",
 #' @param x The RDD to take elements from
 #' @param num Number of elements to take
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10)
 #' take(rdd, 2L) # list(1, 2)
 #'}
+# nolint end
 #' @rdname take
 #' @aliases take,RDD,numeric-method
 #' @noRd
@@ -824,11 +832,13 @@ setMethod("first",
 #' @param x The RDD to remove duplicates from.
 #' @param numPartitions Number of partitions to create.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, c(1,2,2,3,3,3))
 #' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
 #'}
+# nolint end
 #' @rdname distinct
 #' @aliases distinct,RDD-method
 #' @noRd
@@ -974,11 +984,13 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
 #' @param x The RDD.
 #' @param func The function to be applied.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list(1, 2, 3))
 #' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
 #'}
+# nolint end
 #' @rdname keyBy
 #' @aliases keyBy,RDD
 #' @noRd
@@ -1113,11 +1125,13 @@ setMethod("saveAsTextFile",
 #' @param numPartitions Number of partitions to create.
 #' @return An RDD where all elements are sorted.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list(3, 2, 1))
 #' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
 #'}
+# nolint end
 #' @rdname sortBy
 #' @aliases sortBy,RDD,RDD-method
 #' @noRd
@@ -1188,11 +1202,13 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
 #' @param num Number of elements to return.
 #' @return The first N elements from the RDD in ascending order.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
 #' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
 #'}
+# nolint end
 #' @rdname takeOrdered
 #' @aliases takeOrdered,RDD,RDD-method
 #' @noRd
Expand All @@ -1209,11 +1225,13 @@ setMethod("takeOrdered",
#' @return The top N elements from the RDD.
#' @rdname top
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#'}
# nolint end
#' @aliases top,RDD,RDD-method
#' @noRd
setMethod("top",
@@ -1261,6 +1279,7 @@ setMethod("fold",
 #' @rdname aggregateRDD
 #' @seealso reduce
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list(1, 2, 3, 4))
Expand All @@ -1269,6 +1288,7 @@ setMethod("fold",
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#'}
# nolint end
#' @aliases aggregateRDD,RDD,RDD-method
#' @noRd
setMethod("aggregateRDD",
@@ -1367,12 +1387,14 @@ setMethod("setName",
 #' @return An RDD with zipped items.
 #' @seealso zipWithIndex
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
 #' collect(zipWithUniqueId(rdd))
 #' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
 #'}
+# nolint end
 #' @rdname zipWithUniqueId
 #' @aliases zipWithUniqueId,RDD
 #' @noRd
@@ -1408,12 +1430,14 @@ setMethod("zipWithUniqueId",
 #' @return An RDD with zipped items.
 #' @seealso zipWithUniqueId
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
 #' collect(zipWithIndex(rdd))
 #' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
 #'}
+# nolint end
 #' @rdname zipWithIndex
 #' @aliases zipWithIndex,RDD
 #' @noRd
@@ -1454,12 +1478,14 @@ setMethod("zipWithIndex",
 #' @return An RDD created by coalescing all elements within
 #'   each partition into a list.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, as.list(1:4), 2L)
 #' collect(glom(rdd))
 #' # list(list(1, 2), list(3, 4))
 #'}
+# nolint end
 #' @rdname glom
 #' @aliases glom,RDD
 #' @noRd
@@ -1519,13 +1545,15 @@ setMethod("unionRDD",
 #' @param other Another RDD to be zipped.
 #' @return An RDD zipped from the two RDDs.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd1 <- parallelize(sc, 0:4)
 #' rdd2 <- parallelize(sc, 1000:1004)
 #' collect(zipRDD(rdd1, rdd2))
 #' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
 #'}
+# nolint end
 #' @rdname zipRDD
 #' @aliases zipRDD,RDD
 #' @noRd
@@ -1557,12 +1585,14 @@ setMethod("zipRDD",
 #' @param other An RDD.
 #' @return A new RDD which is the Cartesian product of these two RDDs.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:2)
 #' sortByKey(cartesian(rdd, rdd))
 #' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
 #'}
+# nolint end
 #' @rdname cartesian
 #' @aliases cartesian,RDD,RDD-method
 #' @noRd
Expand All @@ -1587,13 +1617,15 @@ setMethod("cartesian",
#' @param numPartitions Number of the partitions in the result RDD.
#' @return An RDD with the elements from this that are not in other.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
#' rdd2 <- parallelize(sc, list(2, 4))
#' collect(subtract(rdd1, rdd2))
#' # list(1, 1, 3)
#'}
# nolint end
#' @rdname subtract
#' @aliases subtract,RDD
#' @noRd
Expand All @@ -1619,13 +1651,15 @@ setMethod("subtract",
#' @param numPartitions The number of partitions in the result RDD.
#' @return An RDD which is the intersection of these two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
#' # list(1, 2, 3)
#'}
# nolint end
#' @rdname intersection
#' @aliases intersection,RDD
#' @noRd
@@ -1653,6 +1687,7 @@ setMethod("intersection",
 #' Assumes that all the RDDs have the *same number of partitions*, but
 #' does *not* require them to have the same number of elements in each partition.
 #' @examples
+# nolint start
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
Expand All @@ -1662,6 +1697,7 @@ setMethod("intersection",
#' func = function(x, y, z) { list(list(x, y, z))} ))
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#'}
# nolint end
#' @rdname zipRDD
#' @aliases zipPartitions,RDD
#' @noRd
3 changes: 3 additions & 0 deletions R/pkg/R/deserialize.R
@@ -17,6 +17,7 @@
 
 # Utility functions to deserialize objects from Java.
 
+# nolint start
 # Type mapping from Java to R
 #
 # void -> NULL
Expand All @@ -32,6 +33,8 @@
#
# Array[T] -> list()
# Object -> jobj
#
# nolint end

readObject <- function(con) {
# Read type first
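For context on the mapping table above: each serialized value arrives with a type tag, and readObject reads that tag first before dispatching to a typed reader. Below is a self-contained sketch of the scheme, not SparkR's actual implementation; the one-character tags, big-endian payloads, and function name are illustrative assumptions.

```r
# Hypothetical reader: a one-character type tag, then the payload.
readTaggedValue <- function(con) {
  type <- readChar(con, nchars = 1)  # read the type tag first
  switch(type,
         "n" = NULL,                                                  # void   -> NULL
         "i" = readBin(con, what = integer(), n = 1, endian = "big"), # Int    -> integer
         "d" = readBin(con, what = double(), n = 1, endian = "big"),  # Double -> double
         stop("Unsupported type tag: ", type))
}

# Round trip against an in-memory connection:
buf <- rawConnection(raw(0), "r+")
writeChar("i", buf, eos = NULL)     # tag
writeBin(42L, buf, endian = "big")  # payload
seek(buf, 0)
readTaggedValue(buf)                # 42L
close(buf)
```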