Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
97523e0
[SPARK-6980] Akka ask timeout description refactored to RPC layer
BryanCutler May 15, 2015
78a2c0a
[SPARK-6980] Using RpcTimeout.awaitResult for future in AppClient now
BryanCutler May 16, 2015
5b59a44
[SPARK-6980] Added some RpcTimeout unit tests
BryanCutler May 16, 2015
49f9f04
[SPARK-6980] Minor cleanup and scala style fix
BryanCutler May 16, 2015
23d2f26
[SPARK-6980] Fixed await result not being handled by RpcTimeout
BryanCutler May 18, 2015
a294569
[SPARK-6980] Added creation of RpcTimeout with Seq of property keys
BryanCutler May 19, 2015
f74064d
Retrieving properties from property list using iterator and while loo…
May 21, 2015
0ee5642
Changing the loop condition to halt at the first match in the propert…
May 21, 2015
4be3a8d
Modifying loop condition to find property match
May 24, 2015
b7fb99f
Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
BryanCutler May 24, 2015
c07d05c
Merge branch 'master' into configTimeout-6980-tmp
BryanCutler Jun 3, 2015
235919b
[SPARK-6980] Resolved conflicts after master merge
BryanCutler Jun 3, 2015
2f94095
[SPARK-6980] Added addMessageIfTimeout for when a Future is completed…
BryanCutler Jun 4, 2015
1607a5f
[SPARK-6980] Changed addMessageIfTimeout to PartialFunction, cleanup …
BryanCutler Jun 8, 2015
4351c48
[SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
BryanCutler Jun 10, 2015
7774d56
[SPARK-6980] Cleaned up UT imports
BryanCutler Jun 11, 2015
995d196
[SPARK-6980] Cleaned up import ordering, comments, spacing from PR fe…
BryanCutler Jun 11, 2015
d3754d1
[SPARK-6980] Added akkaConf to prevent dead letter logging
BryanCutler Jun 11, 2015
08f5afc
[SPARK-6980] Added UT for constructing RpcTimeout with default value
BryanCutler Jun 11, 2015
1b9beab
[SPARK-6980] Cleaned up import ordering
BryanCutler Jun 12, 2015
2206b4d
[SPARK-6980] Added unit test for ask then immediate awaitReply
BryanCutler Jun 12, 2015
1517721
[SPARK-6980] RpcTimeout object scope should be private[spark]
BryanCutler Jun 15, 2015
1394de6
[SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
BryanCutler Jun 15, 2015
c6cfd33
[SPARK-6980] Changed UT ask message timeout to explicitly intercept a…
BryanCutler Jun 23, 2015
b05d449
[SPARK-6980] Changed constructor to use val duration instead of gette…
BryanCutler Jun 23, 2015
fa6ed82
[SPARK-6980] Had to increase timeout on positive test case because a …
BryanCutler Jun 23, 2015
fadaf6f
[SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTim…
BryanCutler Jun 24, 2015
218aa50
[SPARK-6980] Corrected issues from feedback
BryanCutler Jun 24, 2015
039afed
[SPARK-6980] Corrected import organization
BryanCutler Jun 24, 2015
be11c4e
Merge branch 'master' into configTimeout-6980
BryanCutler Jun 24, 2015
7636189
[SPARK-6980] Fixed call to askWithReply in DAGScheduler to use RpcTim…
BryanCutler Jun 26, 2015
3a168c7
[SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
BryanCutler Jun 26, 2015
3d8b1ff
[SPARK-6980] Cleaned up imports in AkkaRpcEnvSuite
BryanCutler Jun 26, 2015
287059a
[SPARK-6980] Removed extra import in AkkaRpcEnvSuite
BryanCutler Jun 26, 2015
7f4d78e
[SPARK-6980] Fixed scala style checks
BryanCutler Jun 26, 2015
6a1c50d
[SPARK-6980] Minor cleanup of test case
BryanCutler Jun 27, 2015
4e89c75
[SPARK-6980] Missed one usage of deprecated RpcUtils.askTimeout in Ya…
BryanCutler Jun 30, 2015
dbd5f73
[SPARK-6980] Changed RpcUtils askRpcTimeout and lookupRpcTimeout scop…
BryanCutler Jul 1, 2015
7bb70f1
Merge branch 'master' into configTimeout-6980
BryanCutler Jul 1, 2015
06afa53
[SPARK-6980] RpcTimeout class extends Serializable, was causing error…
BryanCutler Jul 2, 2015
46c8d48
[SPARK-6980] Changed RpcEnvSuite test to never reply instead of just …
BryanCutler Jul 2, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-6980] Akka ask timeout description refactored to RPC layer
  • Loading branch information
BryanCutler committed May 15, 2015
commit 97523e05c97dc0ea58465aad748357d8f2fc14f9
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout.duration)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)
val statusResponse = timeout.awaitResult(statusFuture)
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ private[spark] class AppClient(
if (actor != null) {
try {
val timeout = RpcUtils.askTimeout(conf)
val future = actor.ask(StopAppClient)(timeout)
Await.result(future, timeout)
val future = actor.ask(StopAppClient)(timeout.duration)
// TODO(bryanc) - RpcTimeout use awaitResult ???
Await.result(future, timeout.duration)
} catch {
case e: TimeoutException =>
logInfo("Stop request to Master timed out; it may already be shut down.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
Expand Down Expand Up @@ -940,8 +939,8 @@ private[deploy] object Master extends Logging {
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
val timeout = RpcUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
val portsRequest = actor.ask(BoundPortsRequest)(timeout.duration)
val portsResponse = timeout.awaitResult(portsRequest).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
Expand All @@ -38,8 +37,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
val stateFuture = (master ? RequestMasterState)(timeout.duration).mapTo[MasterStateResponse]
val state = timeout.awaitResult(stateFuture)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
Expand All @@ -36,8 +35,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private val timeout = parent.timeout

def getMasterState: MasterStateResponse = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
Await.result(stateFuture, timeout)
val stateFuture = (master ? RequestMasterState)(timeout.duration).mapTo[MasterStateResponse]
timeout.awaitResult(stateFuture)
}

override def renderJson(request: HttpServletRequest): JValue = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.deploy.worker.ui

import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
Expand All @@ -36,14 +35,14 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
private val timeout = parent.timeout

override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
val stateFuture = (workerActor ? RequestWorkerState)(timeout.duration).mapTo[WorkerStateResponse]
val workerState = timeout.awaitResult(stateFuture)
JsonProtocol.writeWorkerState(workerState)
}

def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
val stateFuture = (workerActor ? RequestWorkerState)(timeout.duration).mapTo[WorkerStateResponse]
val workerState = timeout.awaitResult(stateFuture)

val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
val runningExecutors = workerState.executors
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.spark.rpc

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.reflect.ClassTag

import org.apache.spark.util.RpcUtils
Expand Down Expand Up @@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
Expand Down Expand Up @@ -91,15 +90,15 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
while (attempts < maxRetries) {
attempts += 1
try {
val future = ask[T](message, timeout)
val result = Await.result(future, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("Actor returned null")
}
Expand All @@ -110,7 +109,10 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}
Thread.sleep(retryWaitMs)

if (attempts < maxRetries) {
Thread.sleep(retryWaitMs)
}
}

throw new SparkException(
Expand Down
70 changes: 69 additions & 1 deletion core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.spark.rpc

import java.net.URI
import java.util.concurrent.TimeoutException

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps

Expand Down Expand Up @@ -94,7 +97,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout.duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to use timeout.awaitResult here, so we get the better exception msg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope - I must have missed that one, it's fixed now.

}

/**
Expand Down Expand Up @@ -182,3 +185,68 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}


/**
* Associates a timeout with a configuration property so that a TimeoutException can be
* traced back to the controlling property.
* @param timeout timeout duration in seconds
* @param description description to be displayed in a timeout exception
*/
private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {

/** Get the timeout duration */
def duration: FiniteDuration = timeout
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary or should we just use val duration: FiniteDuration in the constructor like val conf: String?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think you should just change the constructor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that sounds good. One other minor question @squito , the constructor has the parameter conf: String while apply has conf: SparkConf and timeoutProp: String which seems a little unclear. I think it might be better to stay consistent and have the constructor say timeoutProp instead of conf, like this

class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)

what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I agree that would be more consistent, good point


/** Get the description text amended to a TimeoutException by this timeout (see amend). */
def message: String = description

/** Builds a new TimeoutException whose message is the original's plus this timeout's description. */
def amend(te: TimeoutException): TimeoutException = {
  val detailedMessage = s"${te.getMessage()} $description"
  new TimeoutException(detailedMessage)
}

/**
 * Blocks on `future` for at most this timeout's duration; if the wait times out,
 * rethrows the TimeoutException with the controlling-property description appended.
 */
def awaitResult[T](future: Future[T]): T =
  try {
    Await.result(future, duration)
  } catch {
    // Only TimeoutException is amended; all other failures propagate untouched.
    case te: TimeoutException => throw amend(te)
  }

// TODO(bryanc) wrap Await.ready also
}

object RpcTimeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[spark]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private[this] val messagePrefix = "This timeout is controlled by "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of putting messagePrefix in each of the apply methods, what if its just used by RpcTimeout itself when building its description? Then the second arg to the RpctTimeout constructor could just be the property, not the full description.

The reason I'm asking is because in the tests, where you directly construct an RpcTimeout, the messagePrefix is missing, so if you look at the exception its kinda weird. And I think that the real msgs are missing a "." before the message prefix, so it would be nice to look at in the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point about the message in the tests. Way back when I first made the unit test for AkkaUtils the config was being read using conf.getTimeAsSeconds like here and if I tried to configure the tests in the millisecond range, it wouldn't work. Because of that I couldn't create a RpcTimeout with apply and just created a simple message without the messagePrefix.

I don't think this is the case anymore and maybe we should use conf.getTimeAsMs to read the property values? I think it would then automatically convert seconds to milliseconds, and I could fix the tests to use this instead and get the proper message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is related to whether or not we get the time as seconds or millis. I'm thinking that we still leave the FiniteDuration in the constructor, but just change that second param to the constructor to be just the name of the conf, not the full description. eg. :

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5149cf1..e0de397 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -197,19 +197,16 @@ private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
  * Associates a timeout with a description so that when a TimeoutException occurs, additional
  * context about the timeout can be amended to the exception message.
  * @param timeout timeout duration in seconds
- * @param description description to be displayed in a timeout exception
+ * @param conf the configuration parameter that controls this timeout
  */
-private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
+private[spark] class RpcTimeout(timeout: FiniteDuration, val conf: String) {

   /** Get the timeout duration */
   def duration: FiniteDuration = timeout

-  /** Get the message associated with this timeout */
-  def message: String = description
-
   /** Amends the standard message of TimeoutException to include the description */
   private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
-    new RpcTimeoutException(te.getMessage() + " " + description, te)
+    new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + conf, te)
   }

(and corresponding changes to the apply methods.)

I do kinda wish that the values were read w/ conf.getTimeAsMs to allow finer grained resolution, but (a) I'd be nervous about changing any default and (b) in any case that should just be a separate issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact we could also mark the constructor as private[rpc] to be clear that normal use should really go through the RpcTimeout.apply methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you're saying. I kept the messagePrefix separated so that if there was ever a need to create a RpcTimeout without a conf, it would still be possible and the message could be fit accordingly. But not a big deal, I'll put in the above patch. Right now it's always tied to a conf, so it does make sense.

Quick Scala question: would it be better to make the constructor use val timeout: FiniteDuration and remove that getter method too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/**
 * Lookup the timeout property in the configuration and create
 * a RpcTimeout with the property key in the description.
 * @param conf configuration properties containing the timeout
 * @param timeoutProp property key for the timeout in seconds
 * @throws NoSuchElementException if property is not set
 */
def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
  // Use the explicit `.seconds` call rather than the postfix `seconds` operator:
  // postfix operators are discouraged and depend on `scala.language.postfixOps`.
  val timeout = conf.getTimeAsSeconds(timeoutProp).seconds
  new RpcTimeout(timeout, messagePrefix + timeoutProp)
}

/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @param defaultValue default timeout value in seconds if property not found
*/
def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be conf.getTimeAsMs instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned above, I don't think so -- lets stick w/ the current behavior of using conf.getTimeAsSeconds for now.

new RpcTimeout(timeout, messagePrefix + timeoutProp)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka
import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand Down Expand Up @@ -212,7 +211,7 @@ private[spark] class AkkaRpcEnv private[akka] (

override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
import actorSystem.dispatcher
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though this doesn't follow the same pattern of Await.result, can we catch the timeout here too? (I'm not 100% sure if its possible ...)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel there are two ways :

  1. If the return type changes from Future[RpcEndpointRef] to RpcEndpointRef then only the application of Await.result is possible else it will not be possible.
  2. Create one more overloaded function of awaitResult in RpcTimeout whose return type wraps a Future over RpcEndpointRef while returning the result.

In my opinion 2nd one is the best solution as the previous one will require modifications where ever the usages are.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout is a little trickier for Futures. From what I understand, creating the future is non-blocking, so we can't just call Await.result. If the Future isn't fulfilled within the timeout, its marked as failed, then sometime later if the future is acted upon, the TimeoutException is thrown.

I think we might be able to use the andThen method described here: http://doc.akka.io/docs/akka/snapshot/scala/futures.html. We would need to add a function to RpcTimeout that takes in a Future[T], then applies andThen to it which checks if the future completed with failure, then returns a Future[T]. The only problem I see here is that if the andThen amends a TimeoutException message, then awaitResult is called, the message could be amended twice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what you are describing sounds reasonable. We certainly don't want to change the return type or await on the result here.

You could potentially fix the doubly-appended msg by throwing a subclass of TimeoutException, which then you could check for before you amend a msg. However, this is really begging the question -- if there is one timeout passed in here, and another is passed to an Await.result later, how is akka using them both? I suppose it will timeout on whichever one is shorter? I don't see anything very definitive in the docs, perhaps we should confirm on akka-user. That would inform how we should do the error handling.

(Honestly I thiink it will also be fine to not stress too much about this case, it may not be worth it.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to do some research on this and see what our options are while still keeping the same return type.

map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
}

Expand Down Expand Up @@ -293,9 +292,9 @@ private[akka] class AkkaRpcEndpointRef(
actorRef ! AkkaMessage(message, false)
}

override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
import scala.concurrent.ExecutionContext.Implicits.global
actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing here about catching the timeout

case msg @ AkkaMessage(message, reply) =>
if (reply) {
logError(s"Receive $msg but the sender cannot reply")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.storage

import scala.concurrent.{Await, Future}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.rpc.RpcEndpointRef
Expand Down Expand Up @@ -105,7 +105,7 @@ class BlockManagerMaster(
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
}
if (blocking) {
Await.result(future, timeout)
timeout.awaitResult(future)
}
}

Expand All @@ -117,7 +117,7 @@ class BlockManagerMaster(
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
}
if (blocking) {
Await.result(future, timeout)
timeout.awaitResult(future)
}
}

Expand All @@ -131,7 +131,7 @@ class BlockManagerMaster(
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
}
if (blocking) {
Await.result(future, timeout)
timeout.awaitResult(future)
}
}

Expand Down Expand Up @@ -169,7 +169,7 @@ class BlockManagerMaster(
val response = driverEndpoint.
askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
val result = Await.result(Future.sequence(futures), timeout)
val result = timeout.awaitResult(Future.sequence(futures))
if (result == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
Expand All @@ -192,7 +192,7 @@ class BlockManagerMaster(
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
Await.result(future, timeout)
timeout.awaitResult(future)
}

/** Stop the driver endpoint, called only on the Spark driver node */
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.spark.util

import org.apache.spark.rpc.RpcTimeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ordering

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
Expand Down Expand Up @@ -147,7 +147,7 @@ private[spark] object AkkaUtils extends Logging {
def askWithReply[T](
message: Any,
actor: ActorRef,
timeout: FiniteDuration): T = {
timeout: RpcTimeout): T = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest reverting the changes to AkkaUtils. These methods you changed won't be called any more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I disagree. No harm in making these changes. If we're really certain AkkaUtils wont' be used anymore, then we'll delete it ... but in the meantime, these changes are here just in case.

Maybe it wasn't worth the effort in the first place (my fault!) but I dont' see the harm ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with the changes. Not a big deal.

askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
}

Expand All @@ -160,7 +160,7 @@ private[spark] object AkkaUtils extends Logging {
actor: ActorRef,
maxAttempts: Int,
retryInterval: Long,
timeout: FiniteDuration): T = {
timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
if (actor == null) {
throw new SparkException(s"Error sending message [message = $message]" +
Expand All @@ -171,8 +171,8 @@ private[spark] object AkkaUtils extends Logging {
while (attempts < maxAttempts) {
attempts += 1
try {
val future = actor.ask(message)(timeout)
val result = Await.result(future, timeout)
val future = actor.ask(message)(timeout.duration)
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("Actor returned null")
}
Expand Down Expand Up @@ -200,7 +200,7 @@ private[spark] object AkkaUtils extends Logging {
val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
}

def makeExecutorRef(
Expand All @@ -214,7 +214,7 @@ private[spark] object AkkaUtils extends Logging {
val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
}

def protocol(actorSystem: ActorSystem): String = {
Expand Down
Loading