
Commit c836fb9

Merge remote-tracking branch 'apache/master' into netMem-9104
2 parents 5ad7a6a + 2cef1bb commit c836fb9

File tree

62 files changed (+1512, -764 lines)


R/pkg/R/mllib.R

Lines changed: 3 additions & 3 deletions
@@ -92,9 +92,9 @@ setMethod("summary", signature(x = "PipelineModel"),
           function(x, ...) {
             features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
                                     "getModelFeatures", x@model)
-            weights <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                   "getModelWeights", x@model)
-            coefficients <- as.matrix(unlist(weights))
+            coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                        "getModelCoefficients", x@model)
+            coefficients <- as.matrix(unlist(coefficients))
             colnames(coefficients) <- c("Estimate")
             rownames(coefficients) <- unlist(features)
             return(list(coefficients = coefficients))
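
On the JVM side, summary() now calls a getModelCoefficients wrapper instead of getModelWeights. That wrapper is not shown in this commit; the following Scala sketch is a hypothetical illustration of what such a helper could return to SparkR (the object name, model type, and intercept-first ordering are assumptions, not taken from this diff):

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.regression.LinearRegressionModel

// Hypothetical sketch only; the real SparkRWrappers implementation is not
// part of this diff.
object ModelCoefficientsSketch {
  def getModelCoefficients(model: PipelineModel): Array[Double] =
    model.stages.last match {
      case m: LinearRegressionModel =>
        // Intercept first, then one entry per feature, matching the row
        // labels that summary() attaches on the R side.
        Array(m.intercept) ++ m.coefficients.toArray
      case other =>
        throw new UnsupportedOperationException(
          s"No coefficients for ${other.getClass.getSimpleName}")
    }
}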

core/pom.xml

Lines changed: 1 addition & 5 deletions
@@ -262,7 +262,7 @@
     <dependency>
       <groupId>org.tachyonproject</groupId>
       <artifactId>tachyon-client</artifactId>
-      <version>0.7.1</version>
+      <version>0.8.1</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -284,10 +284,6 @@
           <groupId>org.tachyonproject</groupId>
           <artifactId>tachyon-underfs-glusterfs</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>org.tachyonproject</groupId>
-          <artifactId>tachyon-underfs-s3</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
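
One way to check what the bumped Tachyon client pulls in after removing the tachyon-underfs-s3 exclusion (a standard Maven command, not part of this commit):

mvn -pl core dependency:tree -Dincludes=org.tachyonproject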

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 5 additions & 2 deletions
@@ -252,7 +252,8 @@ object SparkEnv extends Logging {

     // Create the ActorSystem for Akka and get the port it binds to.
     val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
+    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+      clientMode = !isDriver)
     val actorSystem: ActorSystem =
       if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
         rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
@@ -262,9 +263,11 @@ object SparkEnv extends Logging {
       }

     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // In the non-driver case, the RPC env's address may be null since it may not be listening
+    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else {
+    } else if (rpcEnv.address != null) {
       conf.set("spark.executor.port", rpcEnv.address.port.toString)
     }

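The new clientMode flag means an executor-side RpcEnv may connect out without ever binding a listener, so rpcEnv.address can be null; the guard above only records spark.executor.port when an address exists. A minimal Scala sketch of the same pattern (hypothetical helper; assumes code under the org.apache.spark package, since RpcEnv is private[spark]):

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

// Hypothetical helper illustrating the null-address guard added above.
def createClientModeEnv(conf: SparkConf, hostname: String, port: Int): RpcEnv = {
  val securityManager = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create("sparkExecutor", hostname, port, conf,
    securityManager, clientMode = true)
  // A client-mode env may not listen for incoming connections, so its
  // address can be null; publish the port only when one was actually bound.
  Option(rpcEnv.address).foreach(addr =>
    conf.set("spark.executor.port", addr.port.toString))
  rpcEnv
}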

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 11 deletions
@@ -45,8 +45,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv)
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

-  Utils.checkHostPort(hostPort, "Expected hostport")
-
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None

@@ -80,9 +78,8 @@ private[spark] class CoarseGrainedExecutorBackend(
   }

   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor =>
+    case RegisteredExecutor(hostname) =>
       logInfo("Successfully registered with driver")
-      val (hostname, _) = Utils.parseHostPort(hostPort)
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

     case RegisterExecutorFailed(message) =>
@@ -163,7 +160,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       hostname,
       port,
       executorConf,
-      new SecurityManager(executorConf))
+      new SecurityManager(executorConf),
+      clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
     val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
       Seq[(String, String)](("spark.app.id", appId))
@@ -188,12 +186,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val env = SparkEnv.createExecutorEnv(
       driverConf, executorId, hostname, port, cores, isLocal = false)

-    // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
-    val boundPort = env.conf.getInt("spark.executor.port", 0)
-    assert(boundPort != 0)
-
-    // Start the CoarseGrainedExecutorBackend endpoint.
-    val sparkHostPort = hostname + ":" + boundPort
+    // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
+    // connections (e.g., if it's using akka). Otherwise, the executor is running in
+    // client mode only, and does not accept incoming connections.
+    val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
+      hostname + ":" + port
+    }.orNull
     env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
       env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
     workerUrl.foreach { url =>
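
The replacement computes host:port only when spark.executor.port was recorded, and passes null otherwise. A self-contained sketch of that getOption/map/orNull idiom (the helper name is illustrative):

import org.apache.spark.SparkConf

// Produce "host:port" when a port was recorded, or null in client mode.
def executorHostPort(conf: SparkConf, hostname: String): String =
  conf.getOption("spark.executor.port").map(port => hostname + ":" + port).orNull

val conf = new SparkConf(false)
assert(executorHostPort(conf, "worker-1") == null)           // no port recorded
conf.set("spark.executor.port", "7077")
assert(executorHostPort(conf, "worker-1") == "worker-1:7077")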

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 5 additions & 3 deletions
@@ -43,9 +43,10 @@ private[spark] object RpcEnv {
       host: String,
       port: Int,
       conf: SparkConf,
-      securityManager: SecurityManager): RpcEnv = {
+      securityManager: SecurityManager,
+      clientMode: Boolean = false): RpcEnv = {
     // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
-    val config = RpcEnvConfig(conf, name, host, port, securityManager)
+    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
     getRpcEnvFactory(conf).create(config)
   }
 }
@@ -139,4 +140,5 @@ private[spark] case class RpcEnvConfig(
     name: String,
     host: String,
     port: Int,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    clientMode: Boolean)
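
Because clientMode defaults to false, existing RpcEnv.create call sites compile and behave as before; only callers that opt in get a connect-only env. Hypothetical call sites (again assuming org.apache.spark scope, since RpcEnv is private[spark]):

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

// Illustrative only; names and ports are placeholders.
def demo(conf: SparkConf, host: String): Unit = {
  val sm = new SecurityManager(conf)
  // Pre-existing caller: clientMode defaults to false, so nothing changes.
  val serverEnv = RpcEnv.create("driver", host, 7077, conf, sm)
  // New-style caller: a connect-only env that need not bind a listener.
  val clientEnv = RpcEnv.create("fetcher", host, 0, conf, sm, clientMode = true)
  clientEnv.shutdown()
  serverEnv.shutdown()
}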

core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   private var stopped = false

   def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
-    val addr = new RpcEndpointAddress(nettyEnv.address.host, nettyEnv.address.port, name)
+    val addr = RpcEndpointAddress(nettyEnv.address, name)
     val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
     synchronized {
       if (stopped) {
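
The new call site relies on an apply overload of RpcEndpointAddress that accepts an RpcAddress directly. The real definitions live elsewhere in the tree; this sketch only reconstructs the shape of that convenience overload:

// Reconstructed shape, not the actual Spark definitions.
case class RpcAddress(host: String, port: Int)

case class RpcEndpointAddress(host: String, port: Int, name: String)

object RpcEndpointAddress {
  // Lets callers pass an RpcAddress instead of unpacking host and port.
  def apply(addr: RpcAddress, name: String): RpcEndpointAddress =
    RpcEndpointAddress(addr.host, addr.port, name)
}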
