Commit 52bdf48

Merge remote-tracking branch 'upstream/master'
2 parents: 9422a4f + 753b194

42 files changed: 752 additions, 148 deletions. Only a subset of the 42 changed files is shown below.


R/pkg/NAMESPACE (3 additions, 2 deletions)

@@ -39,6 +39,7 @@ exportMethods("arrange",
               "describe",
               "dim",
               "distinct",
+              "dropDuplicates",
               "dropna",
               "dtypes",
               "except",
@@ -271,15 +272,15 @@ export("as.DataFrame",
        "createExternalTable",
        "dropTempTable",
        "jsonFile",
-       "read.json",
        "loadDF",
        "parquetFile",
        "read.df",
+       "read.json",
        "read.parquet",
        "read.text",
        "sql",
        "str",
-       "table",
+       "tableToDF",
        "tableNames",
        "tables",
        "uncacheTable")

R/pkg/R/DataFrame.R (30 additions, 0 deletions)

@@ -1645,6 +1645,36 @@ setMethod("where",
             filter(x, condition)
           })
 
+#' dropDuplicates
+#'
+#' Returns a new DataFrame with duplicate rows removed, considering only
+#' the subset of columns.
+#'
+#' @param x A DataFrame.
+#' @param colNames A character vector of column names.
+#' @return A DataFrame with duplicate rows removed.
+#' @family DataFrame functions
+#' @rdname dropduplicates
+#' @name dropDuplicates
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' dropDuplicates(df)
+#' dropDuplicates(df, c("col1", "col2"))
+#' }
+setMethod("dropDuplicates",
+          signature(x = "DataFrame"),
+          function(x, colNames = columns(x)) {
+            stopifnot(class(colNames) == "character")
+
+            sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(colNames))
+            dataFrame(sdf)
+          })
+
 #' Join
 #'
 #' Join two DataFrames based on the given join expression.
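
For readers unfamiliar with the new API, here is a minimal usage sketch (not part of the commit); it assumes a running SparkR session with `sc` and `sqlContext` available, as in the roxygen example above:

  # Hypothetical toy data; dropDuplicates() considers all columns by default.
  df <- createDataFrame(sqlContext,
                        list(list(1, 1), list(1, 1), list(1, 2)),
                        schema = c("key", "value"))
  count(dropDuplicates(df))           # 2: fully identical rows are collapsed
  count(dropDuplicates(df, "key"))    # 1: duplicates judged on "key" only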

R/pkg/R/SQLContext.R (4 additions, 3 deletions)

@@ -352,6 +352,8 @@ sql <- function(sqlContext, sqlQuery) {
 #' @param sqlContext SQLContext to use
 #' @param tableName The SparkSQL Table to convert to a DataFrame.
 #' @return DataFrame
+#' @rdname tableToDF
+#' @name tableToDF
 #' @export
 #' @examples
 #'\dontrun{
@@ -360,15 +362,14 @@ sql <- function(sqlContext, sqlQuery) {
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
 #' registerTempTable(df, "table")
-#' new_df <- table(sqlContext, "table")
+#' new_df <- tableToDF(sqlContext, "table")
 #' }
 
-table <- function(sqlContext, tableName) {
+tableToDF <- function(sqlContext, tableName) {
   sdf <- callJMethod(sqlContext, "table", tableName)
   dataFrame(sdf)
 }
 
-
 #' Tables
 #'
 #' Returns a DataFrame containing names of tables in the given database.
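
As the NAMESPACE diff above shows, SparkR no longer exports a function named `table` after this rename, so `base::table` is not masked any more. A brief sketch (not part of the commit), assuming `df` is an existing DataFrame:

  registerTempTable(df, "people")
  people_df <- tableToDF(sqlContext, "people")   # previously: table(sqlContext, "people")
  table(c("a", "b", "a"))                        # base::table now resolves as usual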

R/pkg/R/generics.R (7 additions, 0 deletions)

@@ -428,6 +428,13 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname dropduplicates
+#' @export
+setGeneric("dropDuplicates",
+           function(x, colNames = columns(x)) {
+             standardGeneric("dropDuplicates")
+           })
+
 #' @rdname nafunctions
 #' @export
 setGeneric("dropna",

R/pkg/inst/tests/testthat/test_context.R (24 additions, 0 deletions)

@@ -17,6 +17,30 @@
 
 context("test functions in sparkR.R")
 
+test_that("Check masked functions", {
+  # Check that we are not masking any new function from base, stats, testthat unexpectedly
+  masked <- conflicts(detail = TRUE)$`package:SparkR`
+  expect_true("describe" %in% masked)  # only when running with testthat
+  func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] })
+  funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func)
+  maskedBySparkR <- masked[funcSparkROrEmpty]
+  namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
+                     "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
+                     "summary", "transform")
+  expect_equal(length(maskedBySparkR), length(namesOfMasked))
+  expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
+  # above are those reported as masked when `library(SparkR)`
+  # note that many of these methods are still callable without base:: or stats:: prefix
+  # there should be a test for each of these, except the following, which are currently "broken"
+  funcHasAny <- unlist(lapply(masked, function(x) {
+    any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1])))
+  }))
+  maskedCompletely <- masked[!funcHasAny]
+  namesOfMaskedCompletely <- c("cov", "filter", "sample")
+  expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely))
+  expect_equal(sort(maskedCompletely), sort(namesOfMaskedCompletely))
+})
+
 test_that("repeatedly starting and stopping SparkR", {
   for (i in 1:4) {
     sc <- sparkR.init()

R/pkg/inst/tests/testthat/test_sparkSQL.R (41 additions, 10 deletions)

@@ -335,7 +335,6 @@ writeLines(mockLinesMapType, mapTypeJsonPath)
 test_that("Collect DataFrame with complex types", {
   # ArrayType
   df <- read.json(sqlContext, complexTypeJsonPath)
-
   ldf <- collect(df)
   expect_equal(nrow(ldf), 3)
   expect_equal(ncol(ldf), 3)
@@ -490,19 +489,15 @@ test_that("insertInto() on a registered table", {
   unlink(parquetPath2)
 })
 
-test_that("table() returns a new DataFrame", {
+test_that("tableToDF() returns a new DataFrame", {
   df <- read.json(sqlContext, jsonPath)
   registerTempTable(df, "table1")
-  tabledf <- table(sqlContext, "table1")
+  tabledf <- tableToDF(sqlContext, "table1")
   expect_is(tabledf, "DataFrame")
   expect_equal(count(tabledf), 3)
+  tabledf2 <- tableToDF(sqlContext, "table1")
+  expect_equal(count(tabledf2), 3)
   dropTempTable(sqlContext, "table1")
-
-  # nolint start
-  # Test base::table is working
-  #a <- letters[1:3]
-  #expect_equal(class(table(a, sample(a))), "table")
-  # nolint end
 })
 
 test_that("toRDD() returns an RRDD", {
@@ -734,7 +729,7 @@ test_that("head() and first() return the correct data", {
   expect_equal(ncol(testFirst), 2)
 })
 
-test_that("distinct() and unique on DataFrames", {
+test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
   lines <- c("{\"name\":\"Michael\"}",
              "{\"name\":\"Andy\", \"age\":30}",
             "{\"name\":\"Justin\", \"age\":19}",
@@ -750,6 +745,42 @@ test_that("distinct() and unique on DataFrames", {
   uniques2 <- unique(df)
   expect_is(uniques2, "DataFrame")
   expect_equal(count(uniques2), 3)
+
+  # Test dropDuplicates()
+  df <- createDataFrame(
+    sqlContext,
+    list(
+      list(2, 1, 2), list(1, 1, 1),
+      list(1, 2, 1), list(2, 1, 2),
+      list(2, 2, 2), list(2, 2, 1),
+      list(2, 1, 1), list(1, 1, 2),
+      list(1, 2, 2), list(1, 2, 1)),
+    schema = c("key", "value1", "value2"))
+  result <- collect(dropDuplicates(df))
+  expected <- rbind.data.frame(
+    c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
+    c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
+    c(2, 2, 1), c(2, 2, 2))
+  names(expected) <- c("key", "value1", "value2")
+  expect_equivalent(
+    result[order(result$key, result$value1, result$value2), ],
+    expected)
+
+  result <- collect(dropDuplicates(df, c("key", "value1")))
+  expected <- rbind.data.frame(
+    c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
+  names(expected) <- c("key", "value1", "value2")
+  expect_equivalent(
+    result[order(result$key, result$value1, result$value2), ],
+    expected)
+
+  result <- collect(dropDuplicates(df, "key"))
+  expected <- rbind.data.frame(
+    c(1, 1, 1), c(2, 1, 2))
+  names(expected) <- c("key", "value1", "value2")
+  expect_equivalent(
+    result[order(result$key, result$value1, result$value2), ],
+    expected)
 })
 
 test_that("sample on a DataFrame", {

core/src/main/scala/org/apache/spark/SSLOptions.scala (45 additions, 5 deletions)

@@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
  * @param keyStore a path to the key-store file
  * @param keyStorePassword a password to access the key-store file
  * @param keyPassword a password to access the private key in the key-store
+ * @param keyStoreType the type of the key-store
+ * @param needClientAuth set true if SSL needs client authentication
  * @param trustStore a path to the trust-store file
  * @param trustStorePassword a password to access the trust-store file
+ * @param trustStoreType the type of the trust-store
  * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
  * @param enabledAlgorithms a set of encryption algorithms that may be used
  */
@@ -49,8 +52,11 @@ private[spark] case class SSLOptions(
     keyStore: Option[File] = None,
     keyStorePassword: Option[String] = None,
     keyPassword: Option[String] = None,
+    keyStoreType: Option[String] = None,
+    needClientAuth: Boolean = false,
     trustStore: Option[File] = None,
     trustStorePassword: Option[String] = None,
+    trustStoreType: Option[String] = None,
     protocol: Option[String] = None,
     enabledAlgorithms: Set[String] = Set.empty)
   extends Logging {
@@ -63,12 +69,18 @@ private[spark] case class SSLOptions(
       val sslContextFactory = new SslContextFactory()
 
       keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
-      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
       keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
-      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
       keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+      keyStoreType.foreach(sslContextFactory.setKeyStoreType)
+      if (needClientAuth) {
+        trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+        trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+        trustStoreType.foreach(sslContextFactory.setTrustStoreType)
+      }
       protocol.foreach(sslContextFactory.setProtocol)
-      sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
+      if (supportedAlgorithms.nonEmpty) {
+        sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
+      }
 
       Some(sslContextFactory)
     } else {
@@ -82,6 +94,13 @@ private[spark] case class SSLOptions(
    */
   def createAkkaConfig: Option[Config] = {
     if (enabled) {
+      if (keyStoreType.isDefined) {
+        logWarning("Akka configuration does not support key store type.")
+      }
+      if (trustStoreType.isDefined) {
+        logWarning("Akka configuration does not support trust store type.")
+      }
+
       Some(ConfigFactory.empty()
         .withValue("akka.remote.netty.tcp.security.key-store",
           ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
@@ -110,7 +129,9 @@ private[spark] case class SSLOptions(
    * The supportedAlgorithms set is a subset of the enabledAlgorithms that
    * are supported by the current Java security provider for this protocol.
    */
-  private val supportedAlgorithms: Set[String] = {
+  private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
+    Set()
+  } else {
     var context: SSLContext = null
     try {
       context = SSLContext.getInstance(protocol.orNull)
@@ -133,7 +154,11 @@ private[spark] case class SSLOptions(
        logDebug(s"Discarding unsupported cipher $cipher")
     }
 
-    enabledAlgorithms & providerAlgorithms
+    val supported = enabledAlgorithms & providerAlgorithms
+    require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"),
+      "SSLContext does not support any of the enabled algorithms: " +
+        enabledAlgorithms.mkString(","))
+    supported
   }
 
   /** Returns a string representation of this SSLOptions with all the passwords masked. */
@@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging {
    * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
    * $ - `[ns].keyStorePassword` - a password to the key-store file
    * $ - `[ns].keyPassword` - a password to the private key
+   * $ - `[ns].keyStoreType` - the type of the key-store
+   * $ - `[ns].needClientAuth` - whether SSL needs client authentication
    * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
    *                         directory
    * $ - `[ns].trustStorePassword` - a password to the trust-store file
+   * $ - `[ns].trustStoreType` - the type of the trust-store
    * $ - `[ns].protocol` - a protocol name supported by a particular Java version
    * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
    *
@@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging {
     val keyPassword = conf.getOption(s"$ns.keyPassword")
       .orElse(defaults.flatMap(_.keyPassword))
 
+    val keyStoreType = conf.getOption(s"$ns.keyStoreType")
+      .orElse(defaults.flatMap(_.keyStoreType))
+
+    val needClientAuth =
+      conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))
+
     val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
       .orElse(defaults.flatMap(_.trustStore))
 
     val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
       .orElse(defaults.flatMap(_.trustStorePassword))
 
+    val trustStoreType = conf.getOption(s"$ns.trustStoreType")
+      .orElse(defaults.flatMap(_.trustStoreType))
+
     val protocol = conf.getOption(s"$ns.protocol")
       .orElse(defaults.flatMap(_.protocol))
 
@@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging {
       keyStore,
       keyStorePassword,
       keyPassword,
+      keyStoreType,
+      needClientAuth,
       trustStore,
       trustStorePassword,
+      trustStoreType,
       protocol,
       enabledAlgorithms)
   }
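
For illustration only (not part of the commit), the new options correspond to `spark.ssl.*` properties read by `SSLOptions.parse`, e.g. in spark-defaults.conf; the paths and passwords below are placeholders, and per-module namespaces such as `spark.ssl.fs.*` follow the same pattern via `SecurityManager.getSSLOptions` (next file):

  # Illustrative values only (not from the commit)
  spark.ssl.enabled               true
  spark.ssl.keyStore              /path/to/keystore.jks
  spark.ssl.keyStorePassword      kspass
  spark.ssl.keyPassword           kpass
  spark.ssl.keyStoreType          JKS
  spark.ssl.needClientAuth        true
  spark.ssl.trustStore            /path/to/truststore.jks
  spark.ssl.trustStorePassword    tspass
  spark.ssl.trustStoreType        JKS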

core/src/main/scala/org/apache/spark/SecurityManager.scala (8 additions, 8 deletions)

@@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   // the default SSL configuration - it will be used by all communication layers unless overwritten
   private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
 
-  // SSL configuration for different communication layers - they can override the default
-  // configuration at a specified namespace. The namespace *must* start with spark.ssl.
-  val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
-  val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
-
-  logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
-  logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
-
+  // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
+  val fileServerSSLOptions = getSSLOptions("fs")
   val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
     val trustStoreManagers =
       for (trustStore <- fileServerSSLOptions.trustStore) yield {
@@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
     (None, None)
   }
 
+  def getSSLOptions(module: String): SSLOptions = {
+    val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
+    logDebug(s"Created SSL options for $module: $opts")
+    opts
+  }
+
   /**
    * Split a comma separated String, filter out any empty items, and return a Set of strings
    */

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala (1 addition, 2 deletions)

@@ -41,8 +41,7 @@ private[deploy] object DeployMessages {
       worker: RpcEndpointRef,
       cores: Int,
       memory: Int,
-      webUiPort: Int,
-      publicAddress: String)
+      workerWebUiUrl: String)
     extends DeployMessage {
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
