
Commit 0d841e2

Merge remote-tracking branch 'upstream/master' into filestreamsource-option
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
2 parents: 2771d71 + 095862a

212 files changed: +5157 -2198 lines


.gitignore

Lines changed: 1 addition & 0 deletions

@@ -23,6 +23,7 @@
 /lib/
 R-unit-tests.log
 R/unit-tests.out
+R/cran-check.out
 build/*.jar
 build/apache-maven*
 build/scala*

.travis.yml

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

LICENSE

Lines changed: 1 addition & 1 deletion

@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
 (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
 (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.1 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/)
 (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
 (BSD licence) sbt and sbt-launch-lib.bash
 (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@ exportMethods("glm",
               "summary",
               "spark.kmeans",
               "fitted",
+              "spark.mlp",
               "spark.naiveBayes",
               "spark.survreg",
               "spark.lda",

R/pkg/R/DataFrame.R

Lines changed: 2 additions & 2 deletions

@@ -212,9 +212,9 @@ setMethod("showDF",

 #' show
 #'
-#' Print the SparkDataFrame column names and types
+#' Print class and type information of a Spark object.
 #'
-#' @param object a SparkDataFrame.
+#' @param object a Spark object. Can be a SparkDataFrame, Column, GroupedData, WindowSpec.
 #'
 #' @family SparkDataFrame functions
 #' @rdname show
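Since `show` is now documented for several Spark object types, a minimal usage sketch may help; the calls below are illustrative assumptions based on the broadened @param text, not part of this diff:

df <- createDataFrame(mtcars)
show(df)                        # SparkDataFrame: class, column names and types
show(df$mpg)                    # Column
show(groupBy(df, "am"))         # GroupedData
show(windowPartitionBy("am"))   # WindowSpec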

R/pkg/R/WindowSpec.R

Lines changed: 12 additions & 0 deletions

@@ -203,6 +203,18 @@ setMethod("rangeBetween",
 #' @aliases over,Column,WindowSpec-method
 #' @family colum_func
 #' @export
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#'
+#' # Partition by am (transmission) and order by hp (horsepower)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#'
+#' # Rank on hp within each partition
+#' out <- select(df, over(rank(), ws), df$hp, df$am)
+#'
+#' # Lead mpg values by 1 row on the partition-and-ordered table
+#' out <- select(df, over(lead(df$mpg), ws), df$mpg, df$hp, df$am)
+#' }
 #' @note over since 2.0.0
 setMethod("over",
           signature(x = "Column", window = "WindowSpec"),

R/pkg/R/functions.R

Lines changed: 60 additions & 18 deletions

@@ -3121,9 +3121,9 @@ setMethod("ifelse",
 #' @aliases cume_dist,missing-method
 #' @export
 #' @examples \dontrun{
-#' df <- createDataFrame(iris)
-#' ws <- orderBy(windowPartitionBy("Species"), "Sepal_Length")
-#' out <- select(df, over(cume_dist(), ws), df$Sepal_Length, df$Species)
+#' df <- createDataFrame(mtcars)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#' out <- select(df, over(cume_dist(), ws), df$hp, df$am)
 #' }
 #' @note cume_dist since 1.6.0
 setMethod("cume_dist",
@@ -3148,7 +3148,11 @@ setMethod("cume_dist",
 #' @family window_funcs
 #' @aliases dense_rank,missing-method
 #' @export
-#' @examples \dontrun{dense_rank()}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#' out <- select(df, over(dense_rank(), ws), df$hp, df$am)
+#' }
 #' @note dense_rank since 1.6.0
 setMethod("dense_rank",
           signature("missing"),
@@ -3168,18 +3172,26 @@ setMethod("dense_rank",
 #' @param x the column as a character string or a Column to compute on.
 #' @param offset the number of rows back from the current row from which to obtain a value.
 #'               If not specified, the default is 1.
-#' @param defaultValue default to use when the offset row does not exist.
+#' @param defaultValue (optional) default to use when the offset row does not exist.
 #' @param ... further arguments to be passed to or from other methods.
 #' @rdname lag
 #' @name lag
 #' @aliases lag,characterOrColumn-method
 #' @family window_funcs
 #' @export
-#' @examples \dontrun{lag(df$c)}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#'
+#' # Partition by am (transmission) and order by hp (horsepower)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#'
+#' # Lag mpg values by 1 row on the partition-and-ordered table
+#' out <- select(df, over(lag(df$mpg), ws), df$mpg, df$hp, df$am)
+#' }
 #' @note lag since 1.6.0
 setMethod("lag",
           signature(x = "characterOrColumn"),
-          function(x, offset, defaultValue = NULL) {
+          function(x, offset = 1, defaultValue = NULL) {
             col <- if (class(x) == "Column") {
               x@jc
             } else {
@@ -3194,25 +3206,35 @@ setMethod("lag",
 #' lead
 #'
 #' Window function: returns the value that is \code{offset} rows after the current row, and
-#' NULL if there is less than \code{offset} rows after the current row. For example,
-#' an \code{offset} of one will return the next row at any given point in the window partition.
+#' \code{defaultValue} if there is less than \code{offset} rows after the current row.
+#' For example, an \code{offset} of one will return the next row at any given point
+#' in the window partition.
 #'
 #' This is equivalent to the \code{LEAD} function in SQL.
 #'
-#' @param x Column to compute on
-#' @param offset Number of rows to offset
-#' @param defaultValue (Optional) default value to use
+#' @param x the column as a character string or a Column to compute on.
+#' @param offset the number of rows after the current row from which to obtain a value.
+#'               If not specified, the default is 1.
+#' @param defaultValue (optional) default to use when the offset row does not exist.
 #'
 #' @rdname lead
 #' @name lead
 #' @family window_funcs
 #' @aliases lead,characterOrColumn,numeric-method
 #' @export
-#' @examples \dontrun{lead(df$c)}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#'
+#' # Partition by am (transmission) and order by hp (horsepower)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#'
+#' # Lead mpg values by 1 row on the partition-and-ordered table
+#' out <- select(df, over(lead(df$mpg), ws), df$mpg, df$hp, df$am)
+#' }
 #' @note lead since 1.6.0
 setMethod("lead",
           signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
-          function(x, offset, defaultValue = NULL) {
+          function(x, offset = 1, defaultValue = NULL) {
             col <- if (class(x) == "Column") {
               x@jc
             } else {
@@ -3239,7 +3261,15 @@ setMethod("lead",
 #' @aliases ntile,numeric-method
 #' @family window_funcs
 #' @export
-#' @examples \dontrun{ntile(1)}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#'
+#' # Partition by am (transmission) and order by hp (horsepower)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#'
+#' # Get ntile group id (1-4) for hp
+#' out <- select(df, over(ntile(4), ws), df$hp, df$am)
+#' }
 #' @note ntile since 1.6.0
 setMethod("ntile",
           signature(x = "numeric"),
@@ -3263,7 +3293,11 @@ setMethod("ntile",
 #' @family window_funcs
 #' @aliases percent_rank,missing-method
 #' @export
-#' @examples \dontrun{percent_rank()}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#' out <- select(df, over(percent_rank(), ws), df$hp, df$am)
+#' }
 #' @note percent_rank since 1.6.0
 setMethod("percent_rank",
           signature("missing"),
@@ -3288,7 +3322,11 @@ setMethod("percent_rank",
 #' @family window_funcs
 #' @aliases rank,missing-method
 #' @export
-#' @examples \dontrun{rank()}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#' out <- select(df, over(rank(), ws), df$hp, df$am)
+#' }
 #' @note rank since 1.6.0
 setMethod("rank",
           signature(x = "missing"),
@@ -3321,7 +3359,11 @@ setMethod("rank",
 #' @aliases row_number,missing-method
 #' @family window_funcs
 #' @export
-#' @examples \dontrun{row_number()}
+#' @examples \dontrun{
+#' df <- createDataFrame(mtcars)
+#' ws <- orderBy(windowPartitionBy("am"), "hp")
+#' out <- select(df, over(row_number(), ws), df$hp, df$am)
+#' }
 #' @note row_number since 1.6.0
 setMethod("row_number",
           signature("missing"),

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions

@@ -1330,6 +1330,10 @@ setGeneric("spark.kmeans", function(data, formula, ...) { standardGeneric("spark
 #' @export
setGeneric("fitted")

+#' @rdname spark.mlp
+#' @export
+setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })
+
 #' @rdname spark.naiveBayes
 #' @export
 setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("spark.naiveBayes") })

R/pkg/R/install.R

Lines changed: 44 additions & 20 deletions

@@ -70,9 +70,9 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                           localDir = NULL, overwrite = FALSE) {
   version <- paste0("spark-", packageVersion("SparkR"))
   hadoopVersion <- tolower(hadoopVersion)
-  hadoopVersionName <- hadoop_version_name(hadoopVersion)
+  hadoopVersionName <- hadoopVersionName(hadoopVersion)
   packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
-  localDir <- ifelse(is.null(localDir), spark_cache_path(),
+  localDir <- ifelse(is.null(localDir), sparkCachePath(),
                      normalizePath(localDir, mustWork = FALSE))

   if (is.na(file.info(localDir)$isdir)) {
@@ -88,12 +88,14 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,

   # can use dir.exists(packageLocalDir) under R 3.2.0 or later
   if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
-    fmt <- "Spark %s for Hadoop %s is found, and SPARK_HOME set to %s"
+    fmt <- "%s for Hadoop %s found, with SPARK_HOME set to %s"
     msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                    packageLocalDir)
     message(msg)
     Sys.setenv(SPARK_HOME = packageLocalDir)
     return(invisible(packageLocalDir))
+  } else {
+    message("Spark not found in the cache directory. Installation will start.")
   }

   packageLocalPath <- paste0(packageLocalDir, ".tgz")
@@ -102,7 +104,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   if (tarExists && !overwrite) {
     message("tar file found.")
   } else {
-    robust_download_tar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
   }

   message(sprintf("Installing to %s", localDir))
@@ -116,33 +118,37 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   invisible(packageLocalDir)
 }

-robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
     msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
     message(msg)
-    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
-    if (success) return()
+    if (success) {
+      return()
+    } else {
+      message(paste0("Unable to download from mirrorUrl: ", mirrorUrl))
+    }
   } else {
-    message("Mirror site not provided.")
+    message("MirrorUrl not provided.")
   }

   # step 2: use url suggested from apache website
-  message("Looking for site suggested from apache website...")
-  mirrorUrl <- get_preferred_mirror(version, packageName)
+  message("Looking for preferred site from apache website...")
+  mirrorUrl <- getPreferredMirror(version, packageName)
   if (!is.null(mirrorUrl)) {
-    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
     if (success) return()
   } else {
-    message("Unable to find suggested mirror site.")
+    message("Unable to find preferred mirror site.")
   }

   # step 3: use backup option
   message("To use backup site...")
-  mirrorUrl <- default_mirror_url()
-  success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+  mirrorUrl <- defaultMirrorUrl()
+  success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                packageName, packageLocalPath)
   if (success) {
     return(packageLocalPath)
@@ -155,7 +161,7 @@ robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
   }
 }

-get_preferred_mirror <- function(version, packageName) {
+getPreferredMirror <- function(version, packageName) {
   jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                     file.path("spark", version, packageName),
                     ".tgz&as_json=1")
@@ -175,10 +181,10 @@ get_preferred_mirror <- function(version, packageName) {
   mirrorPreferred
 }

-direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   packageRemotePath <- paste0(
     file.path(mirrorUrl, version, packageName), ".tgz")
-  fmt <- paste("Downloading Spark %s for Hadoop %s from:\n- %s")
+  fmt <- "Downloading %s for Hadoop %s from:\n- %s"
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
@@ -192,11 +198,11 @@ direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
   !isFail
 }

-default_mirror_url <- function() {
+defaultMirrorUrl <- function() {
   "http://www-us.apache.org/dist/spark"
 }

-hadoop_version_name <- function(hadoopVersion) {
+hadoopVersionName <- function(hadoopVersion) {
   if (hadoopVersion == "without") {
     "without-hadoop"
   } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
@@ -208,7 +214,7 @@ hadoop_version_name <- function(hadoopVersion) {

 # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
 # adapt to Spark context
-spark_cache_path <- function() {
+sparkCachePath <- function() {
   if (.Platform$OS.type == "windows") {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
     if (is.na(winAppPath)) {
@@ -231,3 +237,21 @@ spark_cache_path <- function() {
   }
   normalizePath(path, mustWork = FALSE)
 }
+
+
+installInstruction <- function(mode) {
+  if (mode == "remote") {
+    paste0("Connecting to a remote Spark master. ",
+           "Please make sure Spark package is also installed in this machine.\n",
+           "- If there is one, set the path in sparkHome parameter or ",
+           "environment variable SPARK_HOME.\n",
+           "- If not, you may run install.spark function to do the job. ",
+           "Please make sure the Spark and the Hadoop versions ",
+           "match the versions on the cluster. ",
+           "SparkR package is compatible with Spark ", packageVersion("SparkR"), ".",
+           "If you need further help, ",
+           "contact the administrators of the cluster.")
+  } else {
+    stop(paste0("No instruction found for ", mode, " mode."))
+  }
+}
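For context, a minimal sketch of driving the installer with the defaults visible in the hunk headers (`hadoopVersion = "2.7"`, cache directory resolved by `sparkCachePath`); the mirror URL shown is an illustrative assumption:

library(SparkR)
# default: Hadoop 2.7 build, cached under sparkCachePath()
install.spark()
# or pin a mirror, which robustDownloadTar() tries first
install.spark(mirrorUrl = "http://archive.apache.org/dist/spark", overwrite = TRUE)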
