
Commit 1e95df3

Merge remote-tracking branch 'upstream/master'
2 parents 52bdf48 + bc1babd commit 1e95df3

168 files changed: +2536 additions, -1645 deletions


R/README.md

Lines changed: 10 additions & 0 deletions
@@ -1,6 +1,16 @@
 # R on Spark
 
 SparkR is an R package that provides a light-weight frontend to use Spark from R.
+### Installing sparkR
+
+Libraries of sparkR need to be created in `$SPARK_HOME/R/lib`. This can be done by running the script `$SPARK_HOME/R/install-dev.sh`.
+By default the above script uses the system wide installation of R. However, this can be changed to any user installed location of R by setting the environment variable `R_HOME` the full path of the base directory where R is installed, before running install-dev.sh script.
+Example:
+```
+# where /home/username/R is where R is installed and /home/username/R/bin contains the files R and RScript
+export R_HOME=/home/username/R
+./install-dev.sh
+```
 
 ### SparkR development
 
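As an aside (not part of this commit), a minimal sketch of loading the freshly built package from `$SPARK_HOME/R/lib` once `install-dev.sh` has run; the Spark path below is a placeholder:

```
# Hypothetical local path; adjust SPARK_HOME to the actual Spark checkout
Sys.setenv(SPARK_HOME = "/path/to/spark")
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))

sc <- sparkR.init(master = "local[2]")  # local SparkR context
sqlContext <- sparkRSQL.init(sc)        # SQLContext used by the DataFrame examples below
```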
R/install-dev.sh

Lines changed: 9 additions & 2 deletions
@@ -35,12 +35,19 @@ LIB_DIR="$FWDIR/lib"
 mkdir -p $LIB_DIR
 
 pushd $FWDIR > /dev/null
+if [ ! -z "$R_HOME" ]
+then
+  R_SCRIPT_PATH="$R_HOME/bin"
+else
+  R_SCRIPT_PATH="$(dirname $(which R))"
+fi
+echo "USING R_HOME = $R_HOME"
 
 # Generate Rd files if devtools is installed
-Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'
+"$R_SCRIPT_PATH/"Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'
 
 # Install SparkR to $LIB_DIR
-R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
+"$R_SCRIPT_PATH/"R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
 # Zip the SparkR package so that it can be distributed to worker nodes on YARN
 cd $LIB_DIR
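Not part of the commit, but a quick way (from any R prompt) to check which R installation the revised script would pick up, using only base R calls:

```
# Illustrative only: inspect the values the revised install-dev.sh keys off
Sys.getenv("R_HOME")   # the override the script checks first ("" when unset)
Sys.which("R")         # the R binary on PATH, i.e. the script's fallback
R.home("bin")          # bin directory of the running R; contains both R and Rscript
```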

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ exportMethods("arrange",
               "describe",
               "dim",
               "distinct",
+              "drop",
               "dropDuplicates",
               "dropna",
               "dtypes",

R/pkg/R/DataFrame.R

Lines changed: 73 additions & 27 deletions
@@ -1192,23 +1192,10 @@ setMethod("$", signature(x = "DataFrame"),
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
             stopifnot(class(value) == "Column" || is.null(value))
-            cols <- columns(x)
-            if (name %in% cols) {
-              if (is.null(value)) {
-                cols <- Filter(function(c) { c != name }, cols)
-              }
-              cols <- lapply(cols, function(c) {
-                if (c == name) {
-                  alias(value, name)
-                } else {
-                  col(c)
-                }
-              })
-              nx <- select(x, cols)
+
+            if (is.null(value)) {
+              nx <- drop(x, name)
             } else {
-              if (is.null(value)) {
-                return(x)
-              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf
@@ -1386,12 +1373,13 @@ setMethod("selectExpr",
 
 #' WithColumn
 #'
-#' Return a new DataFrame with the specified column added.
+#' Return a new DataFrame by adding a column or replacing the existing column
+#' that has the same name.
 #'
 #' @param x A DataFrame
-#' @param colName A string containing the name of the new column.
+#' @param colName A column name.
 #' @param col A Column expression.
-#' @return A DataFrame with the new column added.
+#' @return A DataFrame with the new column added or the existing column replaced.
 #' @family DataFrame functions
 #' @rdname withColumn
 #' @name withColumn
@@ -1404,12 +1392,16 @@ setMethod("selectExpr",
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
 #' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' # Replace an existing column
+#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
 #' }
 setMethod("withColumn",
           signature(x = "DataFrame", colName = "character", col = "Column"),
           function(x, colName, col) {
-            select(x, x$"*", alias(col, colName))
+            sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
+            dataFrame(sdf)
           })
+
 #' Mutate
 #'
 #' Return a new DataFrame with the specified columns added.
@@ -2005,7 +1997,13 @@ setMethod("write.df",
           signature(df = "DataFrame", path = "character"),
           function(df, path, source = NULL, mode = "error", ...){
             if (is.null(source)) {
-              sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+              if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+              } else if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRHivesc", envir = .sparkREnv)
+              } else {
+                stop("sparkRHive or sparkRSQL context has to be specified")
+              }
               source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                                     "org.apache.spark.sql.parquet")
             }
@@ -2063,13 +2061,18 @@ setMethod("saveDF",
 #' saveAsTable(df, "myfile")
 #' }
 setMethod("saveAsTable",
-          signature(df = "DataFrame", tableName = "character", source = "character",
-                    mode = "character"),
+          signature(df = "DataFrame", tableName = "character"),
           function(df, tableName, source = NULL, mode="error", ...){
             if (is.null(source)) {
-              sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
-              source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
-                                    "org.apache.spark.sql.parquet")
+              if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+              } else if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRHivesc", envir = .sparkREnv)
+              } else {
+                stop("sparkRHive or sparkRSQL context has to be specified")
+              }
+              source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
+                                    "org.apache.spark.sql.parquet")
             }
             jmode <- convertToJSaveMode(mode)
             options <- varargsToEnv(...)
@@ -2401,4 +2404,47 @@ setMethod("str",
               cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
             }
           }
-        })
+        })
+
+#' drop
+#'
+#' Returns a new DataFrame with columns dropped.
+#' This is a no-op if schema doesn't contain column name(s).
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param cols A character vector of column names or a Column.
+#' @return A DataFrame
+#'
+#' @family DataFrame functions
+#' @rdname drop
+#' @name drop
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlCtx, path)
+#' drop(df, "col1")
+#' drop(df, c("col1", "col2"))
+#' drop(df, df$col1)
+#' }
+setMethod("drop",
+          signature(x = "DataFrame"),
+          function(x, col) {
+            stopifnot(class(col) == "character" || class(col) == "Column")
+
+            if (class(col) == "Column") {
+              sdf <- callJMethod(x@sdf, "drop", col@jc)
+            } else {
+              sdf <- callJMethod(x@sdf, "drop", as.list(col))
+            }
+            dataFrame(sdf)
+          })
+
+# Expose base::drop
+setMethod("drop",
+          signature(x = "ANY"),
+          function(x) {
+            base::drop(x)
+          })
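Pulling the pieces above together, a minimal usage sketch (not part of the commit) of the new `drop` and `withColumn` behavior, assuming the `sc`/`sqlContext` session from the roxygen examples and a hypothetical `people.json` with `name` and `age` columns:

```
# Assumes sc <- sparkR.init(); sqlContext <- sparkRSQL.init(sc); the path is a placeholder
df <- read.json(sqlContext, "path/to/people.json")

df2 <- withColumn(df, "age", df$age + 2)   # now replaces the existing "age" column in place
df3 <- drop(df2, "age")                    # drop a single column by name
df4 <- drop(df2, c("name", "age"))         # or several names at once
df5 <- drop(df2, df2$age)                  # or drop by Column reference

df2$age <- NULL                            # `$<-` with NULL is now routed through drop()
columns(df2)                               # "name"

drop(1:3 %*% 2:4)                          # base::drop still dispatches for non-DataFrame objects
```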

R/pkg/R/generics.R

Lines changed: 14 additions & 2 deletions
@@ -428,6 +428,10 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname drop
+#' @export
+setGeneric("drop", function(x, ...) { standardGeneric("drop") })
+
 #' @rdname dropduplicates
 #' @export
 setGeneric("dropDuplicates",
@@ -535,7 +539,7 @@ setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("samp
 
 #' @rdname saveAsTable
 #' @export
-setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
+setGeneric("saveAsTable", function(df, tableName, source = NULL, mode = "error", ...) {
   standardGeneric("saveAsTable")
 })
 
@@ -548,7 +552,15 @@ setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })
 
 #' @rdname write.df
 #' @export
-setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })
+setGeneric("write.df", function(df, path, source = NULL, mode = "error", ...) {
+  standardGeneric("write.df")
+})
+
+#' @rdname write.df
+#' @export
+setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) {
+  standardGeneric("saveDF")
+})
 
 #' @rdname write.json
 #' @export
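For illustration only (not part of the commit), calls that the relaxed generic signatures above are intended to allow, with `source` and `mode` left at their new defaults; table and path names are placeholders:

```
# source defaults to spark.sql.sources.default (parquet) and mode to "error"
write.df(df, path = "people.parquet")
saveAsTable(df, "people")                                   # on a Hive-enabled session
saveAsTable(df, "people_json", "json", mode = "overwrite")  # explicit source and mode still work
```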

R/pkg/inst/tests/testthat/test_context.R

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
   maskedBySparkR <- masked[funcSparkROrEmpty]
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
-                     "summary", "transform")
+                     "summary", "transform", "drop")
   expect_equal(length(maskedBySparkR), length(namesOfMasked))
   expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
   # above are those reported as masked when `library(SparkR)`

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 41 additions & 7 deletions
@@ -824,11 +824,6 @@ test_that("select operators", {
   df$age2 <- df$age * 2
   expect_equal(columns(df), c("name", "age", "age2"))
   expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
-
-  df$age2 <- NULL
-  expect_equal(columns(df), c("name", "age"))
-  df$age3 <- NULL
-  expect_equal(columns(df), c("name", "age"))
 })
 
 test_that("select with column", {
@@ -854,6 +849,27 @@ test_that("select with column", {
                "To select multiple columns, use a character vector or list for col")
 })
 
+test_that("drop column", {
+  df <- select(read.json(sqlContext, jsonPath), "name", "age")
+  df1 <- drop(df, "name")
+  expect_equal(columns(df1), c("age"))
+
+  df$age2 <- df$age
+  df1 <- drop(df, c("name", "age"))
+  expect_equal(columns(df1), c("age2"))
+
+  df1 <- drop(df, df$age)
+  expect_equal(columns(df1), c("name", "age2"))
+
+  df$age2 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+  df$age3 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+
+  # Test to make sure base::drop is not masked
+  expect_equal(drop(1:3 %*% 2:4), 20)
+})
+
 test_that("subsetting", {
   # read.json returns columns in random order
   df <- select(read.json(sqlContext, jsonPath), "name", "age")
@@ -937,8 +953,21 @@ test_that("test HiveContext", {
   df3 <- sql(hiveCtx, "select * from json2")
   expect_is(df3, "DataFrame")
   expect_equal(count(df3), 3)
-
   unlink(jsonPath2)
+
+  hivetestDataPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+  df4 <- sql(hiveCtx, "select * from hivetestbl")
+  expect_is(df4, "DataFrame")
+  expect_equal(count(df4), 3)
+  unlink(hivetestDataPath)
+
+  parquetDataPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  invisible(saveAsTable(df, "parquetest", "parquet", mode="overwrite", path=parquetDataPath))
+  df5 <- sql(hiveCtx, "select * from parquetest")
+  expect_is(df5, "DataFrame")
+  expect_equal(count(df5), 3)
+  unlink(parquetDataPath)
 })
 
 test_that("column operators", {
@@ -1462,6 +1491,11 @@ test_that("withColumn() and withColumnRenamed()", {
   expect_equal(columns(newDF)[3], "newAge")
   expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
 
+  # Replace existing column
+  newDF <- withColumn(df, "age", df$age + 2)
+  expect_equal(length(columns(newDF)), 2)
+  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
+
   newDF2 <- withColumnRenamed(df, "age", "newerAge")
   expect_equal(length(columns(newDF2)), 2)
   expect_equal(columns(newDF2)[1], "newerAge")
@@ -1812,7 +1846,7 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
   expect_equal(coltypes(x), "map<string,string>")
 
   df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age")
-  expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))
+  expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
 
   df1 <- select(df, cast(df$age, "integer"))
   coltypes(df) <- c("character", "integer")
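A side note, not part of the commit: the last hunk reflects that `age * 1.21` now comes back as `decimal(24,2)` rather than `double`. A sketch of casting it back when a plain double is wanted, reusing the test's `jsonPath` fixture:

```
# Illustrative only; jsonPath refers to the test fixture used above
df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age")
dtypes(df)                                            # age reported as "decimal(24,2)"
df2 <- withColumn(df, "age", cast(df$age, "double"))  # replace the column with a double cast
dtypes(df2)                                           # age is "double" again
```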

core/pom.xml

Lines changed: 4 additions & 13 deletions
@@ -185,19 +185,6 @@
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
     </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-remote_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -224,6 +211,10 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 3 additions & 3 deletions
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
-   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
-   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
+   * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
    * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
    * longer in scope.
    */
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
    * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
    * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
-   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
    * resolved.
    */
   private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
