Commit a5bcc76

Merge remote-tracking branch 'upstream/master' into renameTempTablesToTempViews

2 parents: 595e502 + 4728640

574 files changed (+19356, -4696 lines)


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -169,6 +169,7 @@ exportMethods("arrange",
               "transform",
               "union",
               "unionAll",
+              "unionByName",
               "unique",
               "unpersist",
               "where",

R/pkg/R/DataFrame.R

Lines changed: 63 additions & 15 deletions
@@ -986,10 +986,10 @@ setMethod("unique",
 #' @param x A SparkDataFrame
 #' @param withReplacement Sampling with replacement or not
 #' @param fraction The (rough) sample target fraction
-#' @param seed Randomness seed value
+#' @param seed Randomness seed value. Default is a random seed.
 #'
 #' @family SparkDataFrame functions
-#' @aliases sample,SparkDataFrame,logical,numeric-method
+#' @aliases sample,SparkDataFrame-method
 #' @rdname sample
 #' @name sample
 #' @export
@@ -998,33 +998,47 @@ setMethod("unique",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' collect(sample(df, fraction = 0.5))
 #' collect(sample(df, FALSE, 0.5))
-#' collect(sample(df, TRUE, 0.5))
+#' collect(sample(df, TRUE, 0.5, seed = 3))
 #'}
 #' @note sample since 1.4.0
 setMethod("sample",
-          signature(x = "SparkDataFrame", withReplacement = "logical",
-                    fraction = "numeric"),
-          function(x, withReplacement, fraction, seed) {
-            if (fraction < 0.0) stop(cat("Negative fraction value:", fraction))
+          signature(x = "SparkDataFrame"),
+          function(x, withReplacement = FALSE, fraction, seed) {
+            if (!is.numeric(fraction)) {
+              stop(paste("fraction must be numeric; however, got", class(fraction)))
+            }
+            if (!is.logical(withReplacement)) {
+              stop(paste("withReplacement must be logical; however, got", class(withReplacement)))
+            }
+
             if (!missing(seed)) {
+              if (is.null(seed)) {
+                stop("seed must not be NULL or NA; however, got NULL")
+              }
+              if (is.na(seed)) {
+                stop("seed must not be NULL or NA; however, got NA")
+              }
+
               # TODO : Figure out how to send integer as java.lang.Long to JVM so
               # we can send seed as an argument through callJMethod
-              sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction, as.integer(seed))
+              sdf <- handledCallJMethod(x@sdf, "sample", as.logical(withReplacement),
+                                        as.numeric(fraction), as.integer(seed))
             } else {
-              sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
+              sdf <- handledCallJMethod(x@sdf, "sample",
+                                        as.logical(withReplacement), as.numeric(fraction))
             }
             dataFrame(sdf)
           })

 #' @rdname sample
-#' @aliases sample_frac,SparkDataFrame,logical,numeric-method
+#' @aliases sample_frac,SparkDataFrame-method
 #' @name sample_frac
 #' @note sample_frac since 1.4.0
 setMethod("sample_frac",
-          signature(x = "SparkDataFrame", withReplacement = "logical",
-                    fraction = "numeric"),
-          function(x, withReplacement, fraction, seed) {
+          signature(x = "SparkDataFrame"),
+          function(x, withReplacement = FALSE, fraction, seed) {
            sample(x, withReplacement, fraction, seed)
          })

@@ -2683,7 +2697,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
 #' @rdname union
 #' @name union
 #' @aliases union,SparkDataFrame,SparkDataFrame-method
-#' @seealso \link{rbind}
+#' @seealso \link{rbind} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
@@ -2714,6 +2728,40 @@ setMethod("unionAll",
             union(x, y)
           })

+#' Return a new SparkDataFrame containing the union of rows, matched by column names
+#'
+#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
+#' and another SparkDataFrame. This is different from \code{union} function, and both
+#' \code{UNION ALL} and \code{UNION DISTINCT} in SQL as column positions are not taken
+#' into account. Input SparkDataFrames can have different data types in the schema.
+#'
+#' Note: This does not remove duplicate rows across the two SparkDataFrames.
+#' This function resolves columns by name (not by position).
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the union.
+#' @family SparkDataFrame functions
+#' @rdname unionByName
+#' @name unionByName
+#' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
+#' @seealso \link{rbind} \link{union}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
+#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
+#' head(unionByName(df1, df2))
+#' }
+#' @note unionByName since 2.3.0
+setMethod("unionByName",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            unioned <- callJMethod(x@sdf, "unionByName", y@sdf)
+            dataFrame(unioned)
+          })
+
 #' Union two or more SparkDataFrames
 #'
 #' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
@@ -2730,7 +2778,7 @@ setMethod("unionAll",
 #' @aliases rbind,SparkDataFrame-method
 #' @rdname rbind
 #' @name rbind
-#' @seealso \link{union}
+#' @seealso \link{union} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
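
The net effect of these DataFrame.R changes: sample() now dispatches on the SparkDataFrame alone, with withReplacement defaulting to FALSE, and unionByName() aligns columns by name rather than position. A minimal usage sketch, assuming a running SparkR session (data and object names are illustrative, following the roxygen examples above):

library(SparkR)
sparkR.session()

df <- createDataFrame(mtcars)

# fraction can now be passed by name alone; withReplacement defaults to FALSE
half <- sample(df, fraction = 0.5, seed = 3)

# unionByName() resolves columns by name, so differing column orders line up
df1 <- select(df, "carb", "am", "gear")
df2 <- select(df, "am", "gear", "carb")
head(unionByName(df1, df2))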

R/pkg/R/functions.R

Lines changed: 19 additions & 7 deletions
@@ -176,7 +176,8 @@ NULL
 #'
 #' @param x Column to compute on. Note the difference in the following methods:
 #'          \itemize{
-#'          \item \code{to_json}: it is the column containing the struct or array of the structs.
+#'          \item \code{to_json}: it is the column containing the struct, array of the structs,
+#'              the map or array of maps.
 #'          \item \code{from_json}: it is the column containing the JSON string.
 #'          }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
@@ -1700,8 +1701,9 @@ setMethod("to_date",
           })

 #' @details
-#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType}
-#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered.
+#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
+#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' Resolving the Column can fail if an unsupported type is encountered.
 #'
 #' @rdname column_collection_functions
 #' @aliases to_json to_json,Column-method
@@ -1715,6 +1717,14 @@ setMethod("to_date",
 #'
 #' # Converts an array of structs into a JSON array
 #' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a JSON object
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts an array of maps into a JSON array
+#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),
@@ -2216,8 +2226,9 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
           })

 #' @details
-#' \code{from_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day in UTC,
-#' returns another timestamp that corresponds to the same time of day in the given timezone.
+#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
+#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1'
+#' would yield '2017-07-14 03:40:00.0'.
 #'
 #' @rdname column_datetime_diff_functions
 #'
@@ -2276,8 +2287,9 @@ setMethod("next_day", signature(y = "Column", x = "character"),
           })

 #' @details
-#' \code{to_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day
-#' in the given timezone, returns another timestamp that corresponds to the same time of day in UTC.
+#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
+#' time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1'
+#' would yield '2017-07-14 01:40:00.0'.
 #'
 #' @rdname column_datetime_diff_functions
 #' @aliases to_utc_timestamp to_utc_timestamp,Column,character-method
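
A sketch of the newly documented behavior, mirroring the roxygen examples above and the outputs asserted in the tests below (assumes a running SparkR session; results shown as comments):

# to_json() now also accepts maps and arrays of maps
df <- sql("SELECT map('name', 'Bob') as people")
collect(select(df, alias(to_json(df$people), "json")))    # {"name":"Bob"}

df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
collect(select(df, alias(to_json(df$people), "json")))    # [{"name":"Bob"},{"name":"Alice"}]

# The clarified timezone semantics, using the values from the docs above
ts <- createDataFrame(data.frame(t = "2017-07-14 02:40:00.0"))
collect(select(ts, from_utc_timestamp(ts$t, "GMT+1")))    # 2017-07-14 03:40:00
collect(select(ts, to_utc_timestamp(ts$t, "GMT+1")))      # 2017-07-14 01:40:00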

R/pkg/R/generics.R

Lines changed: 6 additions & 2 deletions
@@ -645,7 +645,7 @@ setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
 #' @rdname sample
 #' @export
 setGeneric("sample",
-           function(x, withReplacement, fraction, seed) {
+           function(x, withReplacement = FALSE, fraction, seed) {
             standardGeneric("sample")
           })

@@ -656,7 +656,7 @@ setGeneric("rollup", function(x, ...) { standardGeneric("rollup") })
 #' @rdname sample
 #' @export
 setGeneric("sample_frac",
-           function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
+           function(x, withReplacement = FALSE, fraction, seed) { standardGeneric("sample_frac") })

 #' @rdname sampleBy
 #' @export
@@ -769,6 +769,10 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
 #' @export
 setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

+#' @rdname unionByName
+#' @export
+setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
+
 #' @rdname unpersist
 #' @export
 setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
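
Why the default moves onto the generic: in S4, a method whose formals mirror the generic inherits the generic's defaults, so every sample() method sees withReplacement = FALSE when callers omit it. A self-contained sketch with a hypothetical generic (sampleSketch is illustrative, not part of SparkR):

library(methods)

# Declare the default once on the generic...
setGeneric("sampleSketch", function(x, withReplacement = FALSE, fraction, seed) {
  standardGeneric("sampleSketch")
})

# ...and a method with matching formals picks it up.
setMethod("sampleSketch", signature(x = "numeric"),
          function(x, withReplacement = FALSE, fraction, seed) {
            base::sample(x, size = ceiling(length(x) * fraction),
                         replace = withReplacement)
          })

sampleSketch(1:10, fraction = 0.5)   # withReplacement falls back to FALSE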

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 32 additions & 3 deletions
@@ -1116,6 +1116,20 @@ test_that("sample on a DataFrame", {
   sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
   expect_true(count(sampled3) < 3)

+  # Different arguments
+  df <- createDataFrame(as.list(seq(10)))
+  expect_equal(count(sample(df, fraction = 0.5, seed = 3)), 4)
+  expect_equal(count(sample(df, withReplacement = TRUE, fraction = 0.5, seed = 3)), 2)
+  expect_equal(count(sample(df, fraction = 1.0)), 10)
+  expect_equal(count(sample(df, fraction = 1L)), 10)
+  expect_equal(count(sample(df, FALSE, fraction = 1.0)), 10)
+
+  expect_error(sample(df, fraction = "a"), "fraction must be numeric")
+  expect_error(sample(df, "a", fraction = 0.1), "however, got character")
+  expect_error(sample(df, fraction = 1, seed = NA), "seed must not be NULL or NA; however, got NA")
+  expect_error(sample(df, fraction = -1.0),
+               "illegal argument - requirement failed: Sampling fraction \\(-1.0\\)")
+
   # nolint start
   # Test base::sample is working
   #expect_equal(length(sample(1:12)), 12)
@@ -1491,6 +1505,14 @@ test_that("column functions", {
   j <- collect(select(df, alias(to_json(df$people), "json")))
   expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")

+  df <- sql("SELECT map('name', 'Bob') as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "{\"name\":\"Bob\"}")
+
+  df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
   df <- read.json(mapTypeJsonPath)
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
@@ -2255,7 +2277,7 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })

-test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
+test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataFrame", {
   df <- read.json(jsonPath)

   lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -2271,6 +2293,13 @@ test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
   expect_equal(first(unioned)$name, "Michael")
   expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)

+  df1 <- select(df2, "age", "name")
+  unioned1 <- arrange(unionByName(df1, df), df1$age)
+  expect_is(unioned1, "SparkDataFrame")
+  expect_equal(count(unioned1), 6)
+  # Here, we test if 'Michael' in df is correctly mapped to the same name.
+  expect_equal(first(unioned1)$name, "Michael")
+
   unioned2 <- arrange(rbind(unioned, df, df2), df$age)
   expect_is(unioned2, "SparkDataFrame")
   expect_equal(count(unioned2), 12)
@@ -2509,14 +2538,14 @@ test_that("describe() and summary() on a DataFrame", {

   stats2 <- summary(df)
   expect_equal(collect(stats2)[5, "summary"], "25%")
-  expect_equal(collect(stats2)[5, "age"], "30.0")
+  expect_equal(collect(stats2)[5, "age"], "30")

   stats3 <- summary(df, "min", "max", "55.1%")

   expect_equal(collect(stats3)[1, "summary"], "min")
   expect_equal(collect(stats3)[2, "summary"], "max")
   expect_equal(collect(stats3)[3, "summary"], "55.1%")
-  expect_equal(collect(stats3)[3, "age"], "30.0")
+  expect_equal(collect(stats3)[3, "age"], "30")

   # SPARK-16425: SparkR summary() fails on column of type logical
   df <- withColumn(df, "boolean", df$age == 30)

R/pkg/tests/run-all.R

Lines changed: 2 additions & 0 deletions
@@ -43,6 +43,8 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
   test_package("SparkR")

   if (identical(Sys.getenv("NOT_CRAN"), "true")) {
+    # set random seed for predictable results. mostly for base's sample() in tree and classification
+    set.seed(42)
     # for testthat 1.0.2 later, change reporter from "summary" to default_reporter()
     testthat:::run_tests("SparkR",
                          file.path(sparkRDir, "pkg", "tests", "fulltests"),
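
Why seeding up front helps: base::sample() draws from R's global RNG stream, so a single set.seed() at the start of the run makes every subsequent draw in the suite deterministic. A minimal illustration:

set.seed(42)
first_draw <- sample(10, 3)
set.seed(42)
second_draw <- sample(10, 3)
identical(first_draw, second_draw)   # TRUE on every run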

appveyor.yml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ only_commits:
     - sql/core/src/main/scala/org/apache/spark/sql/api/r/
     - core/src/main/scala/org/apache/spark/api/r/
     - mllib/src/main/scala/org/apache/spark/ml/r/
+    - core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

 cache:
   - C:\Users\appveyor\.m2

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -187,7 +187,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
-        <version>3.0.0</version>
+        <version>3.1.0</version>
         <executions>
           <execution>
             <id>dist</id>

bin/load-spark-env.cmd

Lines changed: 11 additions & 11 deletions
@@ -35,21 +35,21 @@ if [%SPARK_ENV_LOADED%] == [] (

 rem Setting SPARK_SCALA_VERSION if not already set.

-rem set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
-rem set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"
+set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
+set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"

 if [%SPARK_SCALA_VERSION%] == [] (

-  rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
-  rem   echo "Presence of build for multiple Scala versions detected."
-  rem   echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
-  rem   exit 1
-  rem )
-  rem if exist %ASSEMBLY_DIR2% (
+  if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
+    echo "Presence of build for multiple Scala versions detected."
+    echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
+    exit 1
+  )
+  if exist %ASSEMBLY_DIR2% (
     set SPARK_SCALA_VERSION=2.11
-  rem ) else (
-  rem   set SPARK_SCALA_VERSION=2.12
-  rem )
+  ) else (
+    set SPARK_SCALA_VERSION=2.12
+  )
 )
 exit /b 0

bin/load-spark-env.sh

Lines changed: 11 additions & 11 deletions
@@ -46,18 +46,18 @@ fi

 if [ -z "$SPARK_SCALA_VERSION" ]; then

-  #ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
-  #ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"
+  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
+  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"

-  #if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
-  #  echo -e "Presence of build for multiple Scala versions detected." 1>&2
-  #  echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
-  #  exit 1
-  #fi
+  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
+    echo -e "Presence of build for multiple Scala versions detected." 1>&2
+    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2
+    exit 1
+  fi

-  #if [ -d "$ASSEMBLY_DIR2" ]; then
+  if [ -d "$ASSEMBLY_DIR2" ]; then
     export SPARK_SCALA_VERSION="2.11"
-  #else
-  #  export SPARK_SCALA_VERSION="2.12"
-  #fi
+  else
+    export SPARK_SCALA_VERSION="2.12"
+  fi
 fi
