Skip to content

Commit d65c23b

Browse files
committed
Merge remote branch 'origin/execution-context' into topic/stash
2 parents a872663 + 51a930f commit d65c23b

File tree

12 files changed

+958
-228
lines changed

12 files changed

+958
-228
lines changed

src/library/scala/concurrent/Awaitable.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@ package scala.concurrent
1111

1212

1313
import scala.annotation.implicitNotFound
14-
import scala.util.Timeout
14+
import scala.util.Duration
1515

1616

1717

1818
trait Awaitable[+T] {
19-
@implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.")
20-
def await(timeout: Timeout)(implicit canblock: CanBlock): T
19+
@implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.")
20+
def await(atMost: Duration)(implicit canawait: CanAwait): T
2121
}
2222

2323

2424

25-
Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
19
package scala.concurrent
210

311

412

513
import java.util.concurrent.{ Executors, Future => JFuture, Callable }
614
import scala.util.{ Duration, Timeout }
715
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
16+
import scala.collection.generic.CanBuildFrom
817

918

1019

1120
trait ExecutionContext {
1221

13-
protected implicit object CanBlockEvidence extends CanBlock
22+
protected implicit object CanAwaitEvidence extends CanAwait
1423

1524
def execute(runnable: Runnable): Unit
1625

@@ -22,12 +31,47 @@ trait ExecutionContext {
2231

2332
def future[T](body: => T): Future[T]
2433

25-
/** Only callable from the tasks running on the same execution context. */
26-
def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T
34+
def blocking[T](atMost: Duration)(body: =>T): T
35+
36+
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
37+
38+
def futureUtilities: FutureUtilities = FutureUtilitiesImpl
2739

2840
}
2941

3042

31-
sealed trait CanBlock
43+
sealed trait CanAwait
44+
45+
46+
trait FutureUtilities {
47+
48+
def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
49+
val builder = cbf(futures)
50+
val p: Promise[Coll[T]] = promise[Coll[T]]
51+
52+
if (futures.size == 1) futures.head onComplete {
53+
case Left(t) => p failure t
54+
case Right(v) => builder += v
55+
p success builder.result
56+
} else {
57+
val restFutures = all(futures.tail)
58+
futures.head onComplete {
59+
case Left(t) => p failure t
60+
case Right(v) => builder += v
61+
restFutures onComplete {
62+
case Left(t) => p failure t
63+
case Right(vs) => for (v <- vs) builder += v
64+
p success builder.result
65+
}
66+
}
67+
}
68+
69+
p.future
70+
}
71+
72+
}
3273

3374

75+
object FutureUtilitiesImpl extends FutureUtilities {
76+
}
77+

0 commit comments

Comments
 (0)