Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ private[spark] class AppClient(
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")

private val receiveAndReplyMaxPoolSize = conf.getInt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider adding a JIRA to follow up and document this new param. Also might be good to have a comment explaining why 3 is the default (at first read through I'm not sure why it is).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping @zsxwing could chime in and help me understand the use cases for this. I just arbitrarily chose 3 because it didn't seem to me like these messages would come in a flurry and need many threads, but I don't know for sure. Maybe a configurable max is also not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a configurable max is not necessary and a small thread pool size would be enough. There are only two requests using it: RequestExecutors and KillExecutors. These messages are used in dynamic allocation. The request rate is very slow.

"spark.appclient.receiveAndReply.maxThreads", 3)

private val receiveAndReplyThreadPool = new ThreadPoolExecutor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use org.apache.spark.util.ThreadUtils.newDaemonCachedThreadPool instead.

1,
receiveAndReplyMaxPoolSize,
60L, TimeUnit.SECONDS,
new SynchronousQueue[Runnable](),
ThreadUtils.namedThreadFactory("appclient-receive-and-reply-threadpool"))

override def onStart(): Unit = {
try {
registerWithMaster(1)
Expand Down Expand Up @@ -200,15 +210,37 @@ private[spark] class AppClient(

case r: RequestExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](r))
case Some(m) =>
receiveAndReplyThreadPool.execute(new Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code (the new runnable sending reply) seems to be duplicated a few times, maybe factor it out into a helper function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I was trying to put these 2 calls into something like Utils.tryWithSafeFinally with blocks as arguments but the call ended up looking a little confusing as to what was happening.

maybe just a regular function like this would be better, although a little less flexible

private def receiveAndReplyAsync[T](masterRef: RpcEndpointRef, context: RpcCallContext,
                                     msg: T): Unit = {
  // execute ask and reply in thread pool
  ..

override def run(): Unit = {
try {
context.reply(m.askWithRetry[Boolean](r))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(t) =>
context.sendFailure(t)
}
}
})
case None =>
logWarning("Attempted to request executors before registering with Master.")
context.reply(false)
}

case k: KillExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](k))
case Some(m) =>
receiveAndReplyThreadPool.execute(new Runnable {
override def run(): Unit = {
try {
context.reply(m.askWithRetry[Boolean](k))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(t) =>
context.sendFailure(t)
}
}
})
case None =>
logWarning("Attempted to kill executors before registering with Master.")
context.reply(false)
Expand Down Expand Up @@ -252,6 +284,7 @@ private[spark] class AppClient(
registrationRetryThread.shutdownNow()
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
receiveAndReplyThreadPool.shutdownNow()
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.client

import org.apache.spark._
import org.apache.spark.deploy.{Command, ApplicationDescription}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, Master}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._

import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import scala.concurrent.duration._

/**
* End-to-end tests for application client in standalone mode.
*/
class AppClientSuite
extends SparkFunSuite
with LocalSparkContext
with BeforeAndAfterAll {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can put these lines in the same line


private val numWorkers = 2
private val conf = new SparkConf()
private val securityManager = new SecurityManager(conf)

private var masterRpcEnv: RpcEnv = null
private var workerRpcEnvs: Seq[RpcEnv] = null
private var master: Master = null
private var workers: Seq[Worker] = null

/**
* Start the local cluster.
* Note: local-cluster mode is insufficient because we want a reference to the Master.
*/
override def beforeAll(): Unit = {
super.beforeAll()
masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
workerRpcEnvs = (0 until numWorkers).map { i =>
RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
}
master = makeMaster()
workers = makeWorkers(10, 2048)
// Wait until all workers register with master successfully
eventually(timeout(60.seconds), interval(10.millis)) {
assert(getMasterState.workers.size === numWorkers)
}
}

override def afterAll(): Unit = {
workerRpcEnvs.foreach(_.shutdown())
masterRpcEnv.shutdown()
workers.foreach(_.stop())
master.stop()
workerRpcEnvs = null
masterRpcEnv = null
workers = null
master = null
super.afterAll()
}

test("interface methods of AppClient using local Master") {
val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)

ci.client.start()

// Client should connect with one Master which registers the application
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection")
assert(apps.size === 1, "master should have 1 registered app")
}

// Send message to Master to request Executors, verify request by change in executor limit
val numExecutorsRequested = 1
assert( ci.client.requestTotalExecutors(numExecutorsRequested) )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: redundant space


eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
}

// Send request to kill executor, verify request was made
assert {
val apps = getApplications()
val executorId: String = apps.head.executors.head._2.fullId
ci.client.killExecutors(Seq(executorId))
}

// Issue stop command for Client to disconnect from Master
ci.client.stop()

// Verify Client is marked dead and unregistered from Master
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(ci.listener.deadReasonList.size === 1, "client should have been marked dead")
assert(apps.isEmpty, "master should have 0 registered apps")
}
}

test("request from AppClient before initialized with master") {
val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to tell if the client fails to connect to the master? It does log an error, but if I wanted to check to see if the AppClient endpoint was registered after calling start(), there doesn't seem to be a way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the private volatile var registered lets us check if we suggested at registering with a master is that what you were looking for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I was trying to think of making a test for AppClient in the case of an unreachable Master. The rpc env logs an exception right away, but the only way to tell from outside the AppClient is to set a listener with a connected callback, then poll to see if it ever gets hit. Maybe this isn't really an issue in practice though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only way to tell from outside the AppClient is to set a listener with a connected callback, then poll to see if it ever gets hit. Maybe this isn't really an issue in practice though.

I think that's fine.


// requests to master should fail immediately
assert(ci.client.requestTotalExecutors(3) === false)
}

// ===============================
// | Utility methods for testing |
// ===============================

/** Return a SparkConf for applications that want to talk to our Master. */
private def appConf: SparkConf = {
new SparkConf()
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set("spark.executor.memory", "256m")
}

/** Make a master to which our application will send executor requests. */
private def makeMaster(): Master = {
val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
master
}

/** Make a few workers that talk to our master. */
private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
}

/** Get the Master state */
private def getMasterState: MasterStateResponse = {
master.self.askWithRetry[MasterStateResponse](RequestMasterState)
}

/** Get the applictions that are active from Master */
private def getApplications(): Seq[ApplicationInfo] = {
getMasterState.activeApps
}

/** Application Listener to collect events */
private class AppClientCollector extends AppClientListener with Logging {
val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String]
@volatile var disconnectedCount: Int = 0
val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String]

def connected(id: String): Unit = {
connectedIdList += id
}

def disconnected(): Unit = {
disconnectedCount += 1
}

def dead(reason: String): Unit = {
deadReasonList += reason
}

def executorAdded(id: String, workerId: String, hostPort: String,
cores: Int, memory: Int): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indention

execAddedList += id
}

def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
execRemovedList += id
}
}

/** Create AppClient and supporting objects */
private class AppClientInst(masterUrl: String) {
val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, securityManager)
private val cmd = new Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"),
List(), Map(), Seq(), Seq(), Seq())
private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored")
val listener = new AppClientCollector
val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf)
}

}