Merge branch 'master' into issue/no-spark-repl-fork
dragos committed Jul 10, 2015
commit c596c6f26c06eb6f9a8533d37f1333f640f56a88
6 changes: 6 additions & 0 deletions .rat-excludes
@@ -86,4 +86,10 @@ local-1430917381535_2
DESCRIPTION
NAMESPACE
test_support/*
.*Rd
help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr
2 changes: 1 addition & 1 deletion LICENSE
@@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org)
(MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(The MIT License) Mockito (org.mockito:mockito-core:1.9.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
2 changes: 1 addition & 1 deletion R/README.md
@@ -6,7 +6,7 @@ SparkR is an R package that provides a light-weight frontend to use Spark from R

#### Build Spark

Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-PsparkR` profile to build the R package. For example to use the default Hadoop versions you can run
Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
```
build/mvn -DskipTests -Psparkr package
```
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -169,8 +169,8 @@ setMethod("isLocal",
#'}
setMethod("showDF",
signature(x = "DataFrame"),
function(x, numRows = 20) {
s <- callJMethod(x@sdf, "showString", numToInt(numRows))
function(x, numRows = 20, truncate = TRUE) {
s <- callJMethod(x@sdf, "showString", numToInt(numRows), truncate)
cat(s)
})

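The hunk above adds a `truncate` argument to `showDF`, forwarded straight to the JVM `showString`. A minimal usage sketch, assuming a local Spark build with SparkR on the library path (the `faithful` dataset is only an illustration):

```r
library(SparkR)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

df <- createDataFrame(sqlContext, faithful)
showDF(df)                                  # unchanged default: 20 rows, cells truncated
showDF(df, numRows = 5, truncate = FALSE)   # new argument: print full cell contents
sparkR.stop()
```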
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -86,7 +86,9 @@ infer_type <- function(x) {
createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
schema <- names(data)
if (is.null(schema)) {
schema <- names(data)
}
n <- nrow(data)
m <- ncol(data)
# get rid of factor type
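With the guard above, a caller-supplied `schema` is no longer overwritten by `names(data)` when `data` is a data.frame. A sketch of the difference, assuming the usual SparkR setup and that a character vector of column names is accepted as `schema` (as it is for lists of rows):

```r
library(SparkR)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

localDF <- data.frame(x = 1:3, y = c("a", "b", "c"))
df1 <- createDataFrame(sqlContext, localDF)                             # columns: x, y
df2 <- createDataFrame(sqlContext, localDF, schema = c("id", "label"))  # custom names now honored
printSchema(df2)
sparkR.stop()
```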
2 changes: 1 addition & 1 deletion R/pkg/R/client.R
@@ -57,7 +57,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
sparkSubmitBin <- determineSparkSubmitBin()
sparkSubmitBinName <- determineSparkSubmitBin()
if (sparkHome != "") {
sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
} else {
3 changes: 2 additions & 1 deletion R/pkg/R/generics.R
@@ -20,7 +20,8 @@
# @rdname aggregateRDD
# @seealso reduce
# @export
setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
setGeneric("aggregateRDD",
function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })

# @rdname cache-methods
# @export
12 changes: 6 additions & 6 deletions R/pkg/R/pairRDD.R
@@ -560,8 +560,8 @@ setMethod("join",
# Left outer join two RDDs
#
# @description
# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
@@ -597,8 +597,8 @@ setMethod("leftOuterJoin",
# Right outer join two RDDs
#
# @description
# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
@@ -634,8 +634,8 @@ setMethod("rightOuterJoin",
# Full outer join two RDDs
#
# @description
# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
13 changes: 8 additions & 5 deletions R/pkg/R/sparkR.R
@@ -105,11 +105,12 @@ sparkR.init <- function(
sparkPackages = "") {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
cat(paste("Re-using existing Spark Context.",
"Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
return(get(".sparkRjsc", envir = .sparkREnv))
}

sparkMem <- Sys.getenv("SPARK_MEM", "512m")
sparkMem <- Sys.getenv("SPARK_MEM", "1024m")
jars <- suppressWarnings(normalizePath(as.character(sparkJars)))

# Classpath separator is ";" on Windows
@@ -132,7 +133,7 @@ sparkR.init <- function(
sparkHome = sparkHome,
jars = jars,
sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
sparkPackages = sparkPackages)
packages = sparkPackages)
# wait atmost 100 seconds for JVM to launch
wait <- 0.1
for (i in 1:25) {
@@ -180,14 +181,16 @@

sparkExecutorEnvMap <- new.env()
if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
}
for (varname in names(sparkExecutorEnv)) {
sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
}

nonEmptyJars <- Filter(function(x) { x != "" }, jars)
localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
localJarPaths <- sapply(nonEmptyJars,
function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })

# Set the start time to identify jobjs
# Seconds resolution is good enough for this purpose, so use ints
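Two behavioural tweaks sit alongside the reflows in this file: the `SPARK_MEM` fallback rises from 512m to 1024m, and `sparkPackages` is now passed to `launchBackend` under its actual parameter name, `packages`. A hedged sketch of the caller-facing side; the Maven coordinate is illustrative only:

```r
library(SparkR)
# sparkPackages reaches spark-submit via launchBackend(..., packages = ...) after this fix.
sc <- sparkR.init(master = "local[2]",
                  sparkEnvir = list(spark.executor.memory = "1g"),
                  sparkPackages = "com.databricks:spark-csv_2.10:1.0.3")  # example coordinate
sparkR.stop()
```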
31 changes: 18 additions & 13 deletions R/pkg/R/utils.R
@@ -334,18 +334,21 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"MEMORY_ONLY_SER_2",
"OFF_HEAP")) {
match.arg(newLevel)
storageLevelClass <- "org.apache.spark.storage.StorageLevel"
storageLevel <- switch(newLevel,
"DISK_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY_2"),
"MEMORY_AND_DISK" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER"),
"MEMORY_AND_DISK_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER_2"),
"MEMORY_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY"),
"MEMORY_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_2"),
"MEMORY_ONLY_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER"),
"MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"),
"OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
"DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
"MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
"MEMORY_AND_DISK_SER"),
"MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
"MEMORY_AND_DISK_SER_2"),
"MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
"MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
"MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
"MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}

# Utility function for functions where an argument needs to be integer but we want to allow
@@ -545,9 +548,11 @@ mergePartitions <- function(rdd, zip) {
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
# For zip operation, check if corresponding partitions
# of both RDDs have the same number of elements.
if (zip && lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
stop(paste("Can only zip RDDs with same number of elements",
"in each pair of corresponding partitions."))
}

if (lengthOfKeys > 1) {
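The `getStorageLevel` rewrite only hoists the repeated class name into `storageLevelClass` to satisfy the line-length limit; behaviour is unchanged. A small sketch of the helper in use — it is internal (hence `:::`) and needs a running backend:

```r
library(SparkR)
sc <- sparkR.init()
# Resolve a storage-level string to its JVM StorageLevel constant, as persist() does internally.
level <- SparkR:::getStorageLevel("MEMORY_AND_DISK_SER")
sparkR.stop()
```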
Binary file added R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar
32 changes: 32 additions & 0 deletions R/pkg/inst/tests/jarTest.R
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(SparkR)

sc <- sparkR.init()

helloTest <- SparkR:::callJStatic("sparkR.test.hello",
"helloWorld",
"Dave")

basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction",
"addStuff",
2L,
2L)

sparkR.stop()
output <- c(helloTest, basicFunction)
writeLines(output)
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binaryFile.R
@@ -82,7 +82,7 @@ test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
saveAsObjectFile(rdd2, fileName2)

rdd <- objectFile(sc, c(fileName1, fileName2))
expect_true(count(rdd) == 2)
expect_equal(count(rdd), 2)

unlink(fileName1, recursive = TRUE)
unlink(fileName2, recursive = TRUE)
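The recurring `expect_true(a == b)` to `expect_equal(a, b)` substitution in this and the following test files is not purely cosmetic: on failure, testthat then reports the actual and expected values instead of just "isn't true". A plain testthat illustration, outside Spark:

```r
library(testthat)
expect_equal(2, 2)   # passes, like expect_true(2 == 2)
# On failure, expect_equal(count(rdd), 2) would print the actual count,
# whereas expect_true(count(rdd) == 2) only reports that the comparison was FALSE.
```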
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_binary_function.R
@@ -38,13 +38,13 @@ test_that("union on two RDDs", {
union.rdd <- unionRDD(rdd, text.rdd)
actual <- collect(union.rdd)
expect_equal(actual, c(as.list(nums), mockFile))
expect_true(getSerializedMode(union.rdd) == "byte")
expect_equal(getSerializedMode(union.rdd), "byte")

rdd<- map(text.rdd, function(x) {x})
union.rdd <- unionRDD(rdd, text.rdd)
actual <- collect(union.rdd)
expect_equal(actual, as.list(c(mockFile, mockFile)))
expect_true(getSerializedMode(union.rdd) == "byte")
expect_equal(getSerializedMode(union.rdd), "byte")

unlink(fileName)
})
37 changes: 37 additions & 0 deletions R/pkg/inst/tests/test_includeJAR.R
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
context("include an external JAR in SparkContext")

runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
args = c(jarPath, scriptPath),
stdout = TRUE)
tail(res, 2)
}

test_that("sparkJars tag in SparkContext", {
testOutput <- runScript()
helloTest <- testOutput[1]
expect_equal(helloTest, "Hello, Dave")
basicFunction <- testOutput[2]
expect_equal(basicFunction, "4")
})
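Presumably the new test runs with the rest of the SparkR suite; a hedged sketch of driving the installed tests directly from R, assuming `SPARK_HOME` points at a checkout where the SparkR package has been built under `R/lib` (the path below is hypothetical):

```r
library(testthat)
Sys.setenv(SPARK_HOME = "/path/to/spark")   # hypothetical checkout location
sparkRLib <- file.path(Sys.getenv("SPARK_HOME"), "R", "lib")
library(SparkR, lib.loc = sparkRLib)
# Runs every test_*.R under the installed package, including test_includeJAR.R.
test_dir(file.path(sparkRLib, "SparkR", "tests"))
```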
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_parallelize_collect.R
@@ -57,7 +57,7 @@ test_that("parallelize() on simple vectors and lists returns an RDD", {
strListRDD2)

for (rdd in rdds) {
expect_true(inherits(rdd, "RDD"))
expect_is(rdd, "RDD")
expect_true(.hasSlot(rdd, "jrdd")
&& inherits(rdd@jrdd, "jobj")
&& isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
16 changes: 10 additions & 6 deletions R/pkg/inst/tests/test_rdd.R
@@ -33,9 +33,9 @@ test_that("get number of partitions in RDD", {
})

test_that("first on RDD", {
expect_true(first(rdd) == 1)
expect_equal(first(rdd), 1)
newrdd <- lapply(rdd, function(x) x + 1)
expect_true(first(newrdd) == 2)
expect_equal(first(newrdd), 2)
})

test_that("count and length on RDD", {
@@ -669,27 +669,31 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))

rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {