Skip to content

Commit 8b5f05a

Browse files
author
aleksandar
committed
Add execution context implementation to akka futures.
1 parent 7993ec0 commit 8b5f05a

File tree

3 files changed

+163
-32
lines changed

3 files changed

+163
-32
lines changed

src/library/scala/concurrent/ExecutionContext.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
19
package scala.concurrent
210

311

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
9+
package scala.concurrent.akka
10+
11+
12+
13+
import java.util.concurrent.{Callable, ExecutorService}
14+
import scala.concurrent.{ExecutionContext, resolver, Awaitable}
15+
import scala.util.Duration
16+
import scala.collection.mutable.Stack
17+
18+
19+
20+
class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext {
21+
22+
def execute(runnable: Runnable): Unit = executorService execute runnable
23+
24+
def execute[U](body: () => U): Unit = execute(new Runnable {
25+
def run() = body()
26+
})
27+
28+
def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this)
29+
30+
def future[T](body: =>T): Future[T] = {
31+
val p = promise[T]
32+
33+
dispatchFuture {
34+
() =>
35+
p complete {
36+
try {
37+
Right(body)
38+
} catch {
39+
case e => resolver(e)
40+
}
41+
}
42+
}
43+
44+
p.future
45+
}
46+
47+
/** Only callable from the tasks running on the same execution context. */
48+
def blockingCall[T](body: Awaitable[T]): T = {
49+
releaseStack()
50+
51+
// TODO see what to do with timeout
52+
body.await(Duration.fromNanos(0))(CanAwaitEvidence)
53+
}
54+
55+
// an optimization for batching futures
56+
// TODO we should replace this with a public queue,
57+
// so that it can be stolen from
58+
// OR: a push to the local task queue should be so cheap that this is
59+
// not even needed, but stealing is still possible
60+
private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
61+
62+
private def releaseStack(): Unit =
63+
_taskStack.get match {
64+
case stack if (stack ne null) && stack.nonEmpty =>
65+
val tasks = stack.elems
66+
stack.clear()
67+
_taskStack.remove()
68+
dispatchFuture(() => _taskStack.get.elems = tasks, true)
69+
case null =>
70+
// do nothing - there is no local batching stack anymore
71+
case _ =>
72+
_taskStack.remove()
73+
}
74+
75+
private[akka] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit =
76+
_taskStack.get match {
77+
case stack if (stack ne null) && !force => stack push task
78+
case _ => this.execute(
79+
new Runnable {
80+
def run() {
81+
try {
82+
val taskStack = Stack[() => Unit](task)
83+
_taskStack set taskStack
84+
while (taskStack.nonEmpty) {
85+
val next = taskStack.pop()
86+
try {
87+
next.apply()
88+
} catch {
89+
case e =>
90+
// TODO catching all and continue isn't good for OOME
91+
e.printStackTrace()
92+
}
93+
}
94+
} finally {
95+
_taskStack.remove()
96+
}
97+
}
98+
}
99+
)
100+
}
101+
102+
}

src/library/scala/concurrent/akka/Promise.scala

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ package scala.concurrent.akka
1212

1313
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
1414
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
15-
import scala.concurrent.{Awaitable, ExecutionContext, resolver, blocking, CanAwait, TimeoutException}
15+
import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException}
1616
import scala.util.continuations._
1717
import scala.util.Duration
1818
import scala.annotation.tailrec
@@ -21,6 +21,8 @@ import scala.annotation.tailrec
2121

2222
trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
2323

24+
def future = this
25+
2426
// TODO refine answer and return types here from Any to type parameters
2527
// then move this up in the hierarchy
2628

@@ -75,7 +77,7 @@ object Promise {
7577
*/
7678
sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
7779

78-
case class Pending[T](listeners: List[Either[Throwable, T] Unit] = Nil) extends FState[T] {
80+
case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] {
7981
def value: Option[Either[Throwable, T]] = None
8082
}
8183

@@ -89,8 +91,9 @@ object Promise {
8991

9092
private val emptyPendingValue = Pending[Nothing](Nil)
9193

92-
/* default promise implementation */
93-
abstract class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
94+
/** Default promise implementation.
95+
*/
96+
class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] {
9497
self =>
9598

9699
updater.set(this, Promise.EmptyPending())
@@ -102,7 +105,13 @@ object Promise {
102105
val ms = NANOSECONDS.toMillis(waitTimeNanos)
103106
val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec
104107
val start = System.nanoTime()
105-
try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException }
108+
try {
109+
synchronized {
110+
while (value.isEmpty) wait(ms, ns)
111+
}
112+
} catch {
113+
case e: InterruptedException =>
114+
}
106115

107116
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
108117
} else
@@ -133,80 +142,92 @@ object Promise {
133142
@inline
134143
protected final def getState: FState[T] = updater.get(this)
135144

136-
/*
137145
def tryComplete(value: Either[Throwable, T]): Boolean = {
138-
val callbacks: List[Either[Throwable, T] => Unit] = {
146+
val callbacks: List[Either[Throwable, T] => Any] = {
139147
try {
140148
@tailrec
141-
def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
149+
def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = {
142150
getState match {
143151
case cur @ Pending(listeners) =>
144-
if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
145-
else tryComplete(v)
152+
if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
153+
else tryComplete(v)
146154
case _ => null
147155
}
148156
}
149157
tryComplete(resolve(value))
150158
} finally {
151-
synchronized { notifyAll() } //Notify any evil blockers
159+
synchronized { notifyAll() } // notify any blockers from `tryAwait`
152160
}
153161
}
154162

155163
callbacks match {
156164
case null => false
157165
case cs if cs.isEmpty => true
158-
case cs => Future.dispatchTask(() => cs.foreach(f => notifyCompleted(f, value))); true
166+
case cs =>
167+
executor dispatchFuture {
168+
() => cs.foreach(f => notifyCompleted(f, value))
169+
}
170+
true
159171
}
160172
}
161173

162-
def onComplete(func: Either[Throwable, T] => Unit): this.type = {
163-
@tailrec //Returns whether the future has already been completed or not
174+
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
175+
@tailrec // Returns whether the future has already been completed or not
164176
def tryAddCallback(): Boolean = {
165177
val cur = getState
166178
cur match {
167179
case _: Success[_] | _: Failure[_] => true
168180
case p: Pending[_] =>
169-
val pt = p.asInstanceOf[Pending[T]]
170-
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
181+
val pt = p.asInstanceOf[Pending[T]]
182+
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
171183
}
172184
}
173185

174186
if (tryAddCallback()) {
175187
val result = value.get
176-
Future.dispatchTask(() => notifyCompleted(func, result))
188+
executor dispatchFuture {
189+
() => notifyCompleted(func, result)
190+
}
177191
}
178192

179193
this
180194
}
181195

182-
private final def notifyCompleted(func: Either[Throwable, T] => Unit, result: Either[Throwable, T]) {
183-
try { func(result) } catch { case e => logError("Future onComplete-callback raised an exception", e) }
196+
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
197+
// TODO see what to do about logging
198+
//try {
199+
func(result)
200+
//} catch {
201+
// case e => logError("Future onComplete-callback raised an exception", e)
202+
//}
184203
}
185-
*/
186204
}
187205

188-
/*
189-
/**
190-
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
191-
* a Future-composition but you already have a value to contribute.
206+
/** An already completed Future is given its result at creation.
207+
*
208+
* Useful in Future-composition when a value to contribute is already available.
192209
*/
193-
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
210+
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContextImpl) extends Promise[T] {
194211
val value = Some(resolve(suppliedValue))
195-
212+
196213
def tryComplete(value: Either[Throwable, T]): Boolean = false
197-
def onComplete(func: Either[Throwable, T] => Unit): this.type = {
214+
215+
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
198216
val completedAs = value.get
199-
Future dispatchTask (() => func(completedAs))
217+
executor dispatchFuture {
218+
() => func(completedAs)
219+
}
200220
this
201221
}
202-
203-
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
204-
def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
222+
223+
private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
224+
225+
def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
205226
case Left(e) => throw e
206227
case Right(r) => r
207228
}
208229
}
209-
*/
230+
210231
}
211232

212233

0 commit comments

Comments
 (0)