Commit 5f8e70d

Merge remote-tracking branch 'upstream/master' into native-ddl

Conflicts:
    sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

2 parents: 0eb1687 + 07f92ef

104 files changed (+2235, -1025 lines)


.github/PULL_REQUEST_TEMPLATE

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 (Please fill in changes proposed in this fix)
 
 
-## How was the this patch tested?
+## How was this patch tested?
 
 (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
 

R/pkg/NAMESPACE

Lines changed: 3 additions & 1 deletion
@@ -13,7 +13,9 @@ export("print.jobj")
 # MLlib integration
 exportMethods("glm",
               "predict",
-              "summary")
+              "summary",
+              "kmeans",
+              "fitted")
 
 # Job group lifecycle management methods
 export("setJobGroup",

R/pkg/R/generics.R

Lines changed: 8 additions & 0 deletions
@@ -1160,3 +1160,11 @@ setGeneric("predict", function(object, ...) { standardGeneric("predict") })
 #' @rdname rbind
 #' @export
 setGeneric("rbind", signature = "...")
+
+#' @rdname kmeans
+#' @export
+setGeneric("kmeans")
+
+#' @rdname fitted
+#' @export
+setGeneric("fitted")

R/pkg/R/mllib.R

Lines changed: 70 additions & 4 deletions
@@ -104,11 +104,11 @@ setMethod("predict", signature(object = "PipelineModel"),
 setMethod("summary", signature(object = "PipelineModel"),
           function(object, ...) {
             modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                    "getModelName", object@model)
+                                     "getModelName", object@model)
             features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                   "getModelFeatures", object@model)
+                                    "getModelFeatures", object@model)
             coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                       "getModelCoefficients", object@model)
+                                        "getModelCoefficients", object@model)
             if (modelName == "LinearRegressionModel") {
               devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
                                                "getModelDevianceResiduals", object@model)
@@ -119,10 +119,76 @@ setMethod("summary", signature(object = "PipelineModel"),
               colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
               rownames(coefficients) <- unlist(features)
               return(list(devianceResiduals = devianceResiduals, coefficients = coefficients))
-            } else {
+            } else if (modelName == "LogisticRegressionModel") {
               coefficients <- as.matrix(unlist(coefficients))
               colnames(coefficients) <- c("Estimate")
               rownames(coefficients) <- unlist(features)
               return(list(coefficients = coefficients))
+            } else if (modelName == "KMeansModel") {
+              modelSize <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                       "getKMeansModelSize", object@model)
+              cluster <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                     "getKMeansCluster", object@model, "classes")
+              k <- unlist(modelSize)[1]
+              size <- unlist(modelSize)[-1]
+              coefficients <- t(matrix(coefficients, ncol = k))
+              colnames(coefficients) <- unlist(features)
+              rownames(coefficients) <- 1:k
+              return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
+            } else {
+              stop(paste("Unsupported model", modelName, sep = " "))
+            }
+          })
+
+#' Fit a k-means model
+#'
+#' Fit a k-means model, similarly to R's kmeans().
+#'
+#' @param x DataFrame for training
+#' @param centers Number of centers
+#' @param iter.max Maximum iteration number
+#' @param algorithm Algorithm choosen to fit the model
+#' @return A fitted k-means model
+#' @rdname kmeans
+#' @export
+#' @examples
+#'\dontrun{
+#' model <- kmeans(x, centers = 2, algorithm="random")
+#'}
+setMethod("kmeans", signature(x = "DataFrame"),
+          function(x, centers, iter.max = 10, algorithm = c("random", "k-means||")) {
+            columnNames <- as.array(colnames(x))
+            algorithm <- match.arg(algorithm)
+            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", x@sdf,
+                                 algorithm, iter.max, centers, columnNames)
+            return(new("PipelineModel", model = model))
+          })
+
+#' Get fitted result from a model
+#'
+#' Get fitted result from a model, similarly to R's fitted().
+#'
+#' @param object A fitted MLlib model
+#' @return DataFrame containing fitted values
+#' @rdname fitted
+#' @export
+#' @examples
+#'\dontrun{
+#' model <- kmeans(trainingData, 2)
+#' fitted.model <- fitted(model)
+#' showDF(fitted.model)
+#'}
+setMethod("fitted", signature(object = "PipelineModel"),
+          function(object, method = c("centers", "classes"), ...) {
+            modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                     "getModelName", object@model)
+
+            if (modelName == "KMeansModel") {
+              method <- match.arg(method)
+              fittedResult <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                          "getKMeansCluster", object@model, method)
+              return(dataFrame(fittedResult))
+            } else {
+              stop(paste("Unsupported model", modelName, sep = " "))
             }
           })
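The SparkR kmeans() wrapper above goes through the JVM backend (SparkRWrappers.fitKMeans) rather than calling spark.ml directly from R. As a rough illustration only, the sketch below shows a roughly equivalent flow on the Scala side using the public spark.ml KMeans estimator; the input data, the VectorAssembler step, and the spark-shell sqlContext are assumptions for this sketch, not part of this commit.

// Illustrative only: a rough Scala-side analogue of the SparkR kmeans()/fitted()
// wrappers above, written against the public spark.ml API. The input DataFrame,
// its column names, and the spark-shell `sqlContext` are assumptions.
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler

val training = sqlContext.createDataFrame(Seq(
  (5.1, 3.5), (4.9, 3.0), (6.2, 3.4), (5.9, 3.0)
)).toDF("sepalLength", "sepalWidth")

// The R wrapper passes the DataFrame's column names; here they are assembled
// explicitly into a single "features" vector column.
val assembler = new VectorAssembler()
  .setInputCols(training.columns)
  .setOutputCol("features")

val kmeans = new KMeans()
  .setK(2)                // centers = 2 in the R API
  .setMaxIter(10)         // iter.max = 10
  .setInitMode("random")  // algorithm = "random"

val model = kmeans.fit(assembler.transform(training))

// transform() appends an integer "prediction" column with the cluster
// assignment, which is what fitted()/predict() expose back to R.
model.transform(assembler.transform(training)).select("prediction").show()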

R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 28 additions & 0 deletions
@@ -113,3 +113,31 @@ test_that("summary works on base GLM models", {
   baseSummary <- summary(baseModel)
   expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
 })
+
+test_that("kmeans", {
+  newIris <- iris
+  newIris$Species <- NULL
+  training <- suppressWarnings(createDataFrame(sqlContext, newIris))
+
+  # Cache the DataFrame here to work around the bug SPARK-13178.
+  cache(training)
+  take(training, 1)
+
+  model <- kmeans(x = training, centers = 2)
+  sample <- take(select(predict(model, training), "prediction"), 1)
+  expect_equal(typeof(sample$prediction), "integer")
+  expect_equal(sample$prediction, 1)
+
+  # Test stats::kmeans is working
+  statsModel <- kmeans(x = newIris, centers = 2)
+  expect_equal(sort(unique(statsModel$cluster)), c(1, 2))
+
+  # Test fitted works on KMeans
+  fitted.model <- fitted(model)
+  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))
+
+  # Test summary works on KMeans
+  summary.model <- summary(model)
+  cluster <- summary.model$cluster
+  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
+})

core/src/main/resources/org/apache/spark/ui/static/historypage-template.html

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@
       <tbody>
       {{#applications}}
         <tr>
-          <td class="rowGroupColumn"><a href="/history/{{id}}/{{num}}/jobs/">{{id}}</a></td>
+          <td class="rowGroupColumn"><span title="{{id}}"><a href="/history/{{id}}/{{num}}/jobs/">{{id}}</a></span></td>
          <td class="rowGroupColumn">{{name}}</td>
          {{#attempts}}
          <td class="attemptIDSpan"><a href="/history/{{id}}/{{attemptId}}/jobs/">{{attemptId}}</a></td>

core/src/main/resources/org/apache/spark/ui/static/historypage.js

Lines changed: 32 additions & 1 deletion
@@ -37,6 +37,22 @@ function formatDuration(milliseconds) {
   return hours.toFixed(1) + " h";
 }
 
+function makeIdNumeric(id) {
+  var strs = id.split("_");
+  if (strs.length < 3) {
+    return id;
+  }
+  var appSeqNum = strs[2];
+  var resl = strs[0] + "_" + strs[1] + "_";
+  var diff = 10 - appSeqNum.length;
+  while (diff > 0) {
+    resl += "0"; // padding 0 before the app sequence number to make sure it has 10 characters
+    diff--;
+  }
+  resl += appSeqNum;
+  return resl;
+}
+
 function formatDate(date) {
   return date.split(".")[0].replace("T", " ");
 }
@@ -62,6 +78,21 @@ jQuery.extend( jQuery.fn.dataTableExt.oSort, {
   }
 } );
 
+jQuery.extend( jQuery.fn.dataTableExt.oSort, {
+    "appid-numeric-pre": function ( a ) {
+        var x = a.match(/title="*(-?[0-9a-zA-Z\-\_]+)/)[1];
+        return makeIdNumeric(x);
+    },
+
+    "appid-numeric-asc": function ( a, b ) {
+        return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+    },
+
+    "appid-numeric-desc": function ( a, b ) {
+        return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+    }
+} );
+
 $(document).ajaxStop($.unblockUI);
 $(document).ajaxStart(function(){
   $.blockUI({ message: '<h3>Loading history summary...</h3>'});
@@ -109,7 +140,7 @@ $(document).ready(function() {
     var selector = "#history-summary-table";
     var conf = {
       "columns": [
-        {name: 'first'},
+        {name: 'first', type: "appid-numeric"},
        {name: 'second'},
        {name: 'third'},
        {name: 'fourth'},
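For reference, the appid-numeric sorter above keys on the zero-padded ID produced by makeIdNumeric(). The sketch below restates that normalization in Scala (a hypothetical helper, not part of this commit), assuming YARN-style IDs of the form application_<clusterTimestamp>_<sequenceNumber>.

// Hypothetical Scala restatement of makeIdNumeric() above: left-pad the
// application sequence number to 10 characters so plain string comparison
// orders "..._7" before "..._12".
def makeIdNumeric(id: String): String = {
  val parts = id.split("_")
  if (parts.length < 3) {
    id
  } else {
    val appSeqNum = parts(2)
    // "0" * n yields the empty string for n <= 0, so long sequence numbers pass through.
    parts(0) + "_" + parts(1) + "_" + ("0" * (10 - appSeqNum.length)) + appSeqNum
  }
}

val ids = Seq("application_1455555555555_12", "application_1455555555555_7")
println(ids.sortBy(makeIdNumeric))  // List(application_..._7, application_..._12)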

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 25 additions & 0 deletions
@@ -503,6 +503,31 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         set("spark.executor.instances", value)
       }
     }
+
+    if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
+      val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
+        "instead use \"yarn\" with specified deploy mode."
+
+      get("spark.master") match {
+        case "yarn-cluster" =>
+          logWarning(warning)
+          set("spark.master", "yarn")
+          set("spark.submit.deployMode", "cluster")
+        case "yarn-client" =>
+          logWarning(warning)
+          set("spark.master", "yarn")
+          set("spark.submit.deployMode", "client")
+        case _ => // Any other unexpected master will be checked when creating scheduler backend.
+      }
+    }
+
+    if (contains("spark.submit.deployMode")) {
+      get("spark.submit.deployMode") match {
+        case "cluster" | "client" =>
+        case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
+          "\"client\".")
+      }
+    }
   }
 
   /**
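In user terms, the check added above rewrites the legacy yarn-client / yarn-cluster master URLs into master "yarn" plus an explicit spark.submit.deployMode, and rejects any other deploy-mode value. A minimal sketch of that translation from the application side (illustrative only; the property names mirror the code above):

// Illustrative only: how a legacy conf maps onto the rewritten form, assuming
// the translation added above runs when the conf is validated at startup.
import org.apache.spark.SparkConf

// Legacy style: the deploy mode is folded into the master URL.
val legacy = new SparkConf()
  .setMaster("yarn-cluster")
  .setAppName("example")

// Equivalent form after the rewrite (logged with a deprecation warning):
val current = new SparkConf()
  .setMaster("yarn")
  .setAppName("example")
  .set("spark.submit.deployMode", "cluster")

// Any value other than "cluster" or "client" for spark.submit.deployMode is
// rejected with a SparkException by the second check above.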

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 12 additions & 18 deletions
@@ -237,6 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def jars: Seq[String] = _jars
   def files: Seq[String] = _files
   def master: String = _conf.get("spark.master")
+  def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
   def appName: String = _conf.get("spark.app.name")
 
   private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
@@ -375,10 +376,8 @@
     }
 
     // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-    // yarn-standalone is deprecated, but still supported
-    if ((master == "yarn-cluster" || master == "yarn-standalone") &&
-        !_conf.contains("spark.yarn.app.id")) {
-      throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
+    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
+      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
         "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
     }
 
@@ -414,7 +413,7 @@
       }
     }
 
-    if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+    if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
 
     // "_jobProgressListener" should be set up before creating SparkEnv because when creating
     // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
@@ -491,7 +490,7 @@
       HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
 
     // Create and start the scheduler
-    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
+    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
     _schedulerBackend = sched
     _taskScheduler = ts
     _dagScheduler = new DAGScheduler(this)
@@ -1590,10 +1589,8 @@
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>
-            // yarn-standalone is deprecated, but still supported
-            if (SparkHadoopUtil.get.isYarnMode() &&
-                (master == "yarn-standalone" || master == "yarn-cluster")) {
-              // In order for this to work in yarn-cluster mode the user must specify the
+            if (master == "yarn" && deployMode == "cluster") {
+              // In order for this to work in yarn cluster mode the user must specify the
               // --addJars option to the client to upload the file into the distributed cache
               // of the AM to make it show up in the current working directory.
               val fileName = new Path(uri.getPath).getName()
@@ -2319,7 +2316,8 @@
    */
   private def createTaskScheduler(
       sc: SparkContext,
-      master: String): (SchedulerBackend, TaskScheduler) = {
+      master: String,
+      deployMode: String): (SchedulerBackend, TaskScheduler) = {
     import SparkMasterRegex._
 
     // When running locally, don't try to re-execute tasks on failure.
@@ -2381,11 +2379,7 @@
        }
        (backend, scheduler)
 
-      case "yarn-standalone" | "yarn-cluster" =>
-        if (master == "yarn-standalone") {
-          logWarning(
-            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
-        }
+      case "yarn" if deployMode == "cluster" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
@@ -2410,7 +2404,7 @@
        scheduler.initialize(backend)
        (backend, scheduler)
 
-      case "yarn-client" =>
+      case "yarn" if deployMode == "client" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
@@ -2451,7 +2445,7 @@
      case zkUrl if zkUrl.startsWith("zk://") =>
        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
-        createTaskScheduler(sc, "mesos://" + zkUrl)
+        createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
 
      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
