Skip to content

Commit 5639d82

Browse files
author
Adriaan Moors
committed
Merge pull request scala#569 from phaller/wip-sip14-fixes-new
SIP-14: clean ups and fixes by @viktorklang
2 parents a69bdda + 2a36246 commit 5639d82

File tree

10 files changed

+201
-83
lines changed

10 files changed

+201
-83
lines changed

src/library/scala/collection/parallel/Tasks.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ trait ExecutionContextTasks extends Tasks {
541541

542542
// this part is a hack which allows switching
543543
val driver: Tasks = executionContext match {
544-
case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match {
544+
case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match {
545545
case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp)
546546
case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
547547
case _ => ???

src/library/scala/concurrent/ConcurrentPackageObject.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
package scala.concurrent
1010

11-
import java.util.concurrent.{ Executors, ExecutorService, ThreadFactory }
11+
import java.util.concurrent.{ Executors, Executor, ThreadFactory }
1212
import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread }
1313
import scala.concurrent.util.Duration
1414
import language.implicitConversions
@@ -19,7 +19,7 @@ import language.implicitConversions
1919
abstract class ConcurrentPackageObject {
2020
/** A global execution environment for executing lightweight tasks.
2121
*/
22-
lazy val defaultExecutionContext = new impl.ExecutionContextImpl(null)
22+
lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
2323

2424
val currentExecutionContext = new ThreadLocal[ExecutionContext]
2525

src/library/scala/concurrent/ExecutionContext.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,13 @@ object ExecutionContext {
4747

4848
/** Creates an `ExecutionContext` from the given `ExecutorService`.
4949
*/
50-
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = new impl.ExecutionContextImpl(e, reporter)
50+
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
51+
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
5152

5253
/** Creates an `ExecutionContext` from the given `Executor`.
5354
*/
54-
def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = new impl.ExecutionContextImpl(e, reporter)
55+
def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
56+
impl.ExecutionContextImpl.fromExecutor(e, reporter)
5557

5658
def defaultReporter: Throwable => Unit = {
5759
// re-throwing `Error`s here causes an exception handling test to fail.

src/library/scala/concurrent/Future.scala

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ trait Future[+T] extends Awaitable[T] {
9696
*
9797
* $multipleCallbacks
9898
*/
99-
def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
100-
case Left(t) => // do nothing
101-
case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
99+
def onSuccess[U](pf: PartialFunction[T, U]): Unit = onComplete {
100+
case Right(v) if pf isDefinedAt v => pf(v)
101+
case _ =>
102102
}
103103

104104
/** When this future is completed with a failure (i.e. with a throwable),
@@ -113,9 +113,9 @@ trait Future[+T] extends Awaitable[T] {
113113
*
114114
* $multipleCallbacks
115115
*/
116-
def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
117-
case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
118-
case Right(v) => // do nothing
116+
def onFailure[U](callback: PartialFunction[Throwable, U]): Unit = onComplete {
117+
case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
118+
case _ =>
119119
}
120120

121121
/** When this future is completed, either through an exception, or a value,
@@ -126,7 +126,7 @@ trait Future[+T] extends Awaitable[T] {
126126
*
127127
* $multipleCallbacks
128128
*/
129-
def onComplete[U](func: Either[Throwable, T] => U): this.type
129+
def onComplete[U](func: Either[Throwable, T] => U): Unit
130130

131131

132132
/* Miscellaneous */
@@ -169,7 +169,7 @@ trait Future[+T] extends Awaitable[T] {
169169

170170
onComplete {
171171
case Left(t) => p success t
172-
case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v))
172+
case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
173173
}
174174

175175
p.future
@@ -184,7 +184,36 @@ trait Future[+T] extends Awaitable[T] {
184184
*/
185185
def foreach[U](f: T => U): Unit = onComplete {
186186
case Right(r) => f(r)
187-
case Left(_) => // do nothing
187+
case _ => // do nothing
188+
}
189+
190+
/** Creates a new future by applying the 's' function to the successful result of
191+
* this future, or the 'f' function to the failed result. If there is any non-fatal
192+
* exception thrown when 's' or 'f' is applied, that exception will be propagated
193+
* to the resulting future.
194+
*
195+
* @param s function that transforms a successful result of the receiver into a
196+
* successful result of the returned future
197+
* @param f function that transforms a failure of the receiver into a failure of
198+
* the returned future
199+
* @return a future that will be completed with the transformed value
200+
*/
201+
def transform[S](s: T => S, f: Throwable => Throwable): Future[S] = {
202+
val p = Promise[S]()
203+
204+
onComplete {
205+
case result =>
206+
try {
207+
result match {
208+
case Left(t) => p failure f(t)
209+
case Right(r) => p success s(r)
210+
}
211+
} catch {
212+
case NonFatal(t) => p failure t
213+
}
214+
}
215+
216+
p.future
188217
}
189218

190219
/** Creates a new future by applying a function to the successful result of
@@ -193,14 +222,17 @@ trait Future[+T] extends Awaitable[T] {
193222
*
194223
* $forComprehensionExamples
195224
*/
196-
def map[S](f: T => S): Future[S] = {
225+
def map[S](f: T => S): Future[S] = { // transform(f, identity)
197226
val p = Promise[S]()
198227

199228
onComplete {
200-
case Left(t) => p failure t
201-
case Right(v) =>
202-
try p success f(v)
203-
catch {
229+
case result =>
230+
try {
231+
result match {
232+
case Right(r) => p success f(r)
233+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
234+
}
235+
} catch {
204236
case NonFatal(t) => p failure t
205237
}
206238
}
@@ -219,11 +251,11 @@ trait Future[+T] extends Awaitable[T] {
219251
val p = Promise[S]()
220252

221253
onComplete {
222-
case Left(t) => p failure t
254+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
223255
case Right(v) =>
224256
try {
225257
f(v) onComplete {
226-
case Left(t) => p failure t
258+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
227259
case Right(v) => p success v
228260
}
229261
} catch {
@@ -254,7 +286,7 @@ trait Future[+T] extends Awaitable[T] {
254286
val p = Promise[T]()
255287

256288
onComplete {
257-
case Left(t) => p failure t
289+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, T]]
258290
case Right(v) =>
259291
try {
260292
if (pred(v)) p success v
@@ -303,7 +335,7 @@ trait Future[+T] extends Awaitable[T] {
303335
val p = Promise[S]()
304336

305337
onComplete {
306-
case Left(t) => p failure t
338+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
307339
case Right(v) =>
308340
try {
309341
if (pf.isDefinedAt(v)) p success pf(v)
@@ -384,7 +416,7 @@ trait Future[+T] extends Awaitable[T] {
384416
val p = Promise[(T, U)]()
385417

386418
this onComplete {
387-
case Left(t) => p failure t
419+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, (T, U)]]
388420
case Right(r) =>
389421
that onSuccess {
390422
case r2 => p success ((r, r2))
@@ -431,7 +463,7 @@ trait Future[+T] extends Awaitable[T] {
431463
val p = Promise[S]()
432464

433465
onComplete {
434-
case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]]
466+
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
435467
case Right(t) =>
436468
p complete (try {
437469
Right(boxedType(tag.erasure).cast(t).asInstanceOf[S])
@@ -470,9 +502,7 @@ trait Future[+T] extends Awaitable[T] {
470502
val p = Promise[T]()
471503

472504
onComplete {
473-
case r =>
474-
try if (pf isDefinedAt r) pf(r)
475-
finally p complete r
505+
case r => try if (pf isDefinedAt r) pf(r) finally p complete r
476506
}
477507

478508
p.future
@@ -493,11 +523,7 @@ trait Future[+T] extends Awaitable[T] {
493523
*/
494524
def either[U >: T](that: Future[U]): Future[U] = {
495525
val p = Promise[U]()
496-
497-
val completePromise: PartialFunction[Either[Throwable, U], _] = {
498-
case Left(t) => p tryFailure t
499-
case Right(v) => p trySuccess v
500-
}
526+
val completePromise: PartialFunction[Either[Throwable, U], _] = { case result => p tryComplete result }
501527

502528
this onComplete completePromise
503529
that onComplete completePromise

src/library/scala/concurrent/impl/ExecutionContextImpl.scala

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,20 @@ package scala.concurrent.impl
1010

1111

1212

13-
import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory }
13+
import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
14+
import java.util.Collection
1415
import scala.concurrent.forkjoin._
1516
import scala.concurrent.{ ExecutionContext, Awaitable }
1617
import scala.concurrent.util.Duration
1718

1819

1920

20-
private[scala] class ExecutionContextImpl(es: AnyRef, reporter: Throwable => Unit = ExecutionContext.defaultReporter)
21-
extends ExecutionContext with Executor {
22-
import ExecutionContextImpl._
23-
24-
val executorService: AnyRef = if (es eq null) getExecutorService else es
21+
private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
22+
23+
val executor: Executor = es match {
24+
case null => createExecutorService
25+
case some => some
26+
}
2527

2628
// to ensure that the current execution context thread local is properly set
2729
def executorsThreadFactory = new ThreadFactory {
@@ -42,64 +44,76 @@ extends ExecutionContext with Executor {
4244
}
4345
}
4446

45-
def getExecutorService: AnyRef =
46-
if (scala.util.Properties.isJavaAtLeast("1.6")) {
47-
val vendor = scala.util.Properties.javaVmVendor
48-
if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple"))
49-
new ForkJoinPool(
50-
Runtime.getRuntime.availableProcessors(),
47+
def createExecutorService: ExecutorService = try { new ForkJoinPool(
48+
Runtime.getRuntime.availableProcessors(), //FIXME from config
5149
forkJoinPoolThreadFactory,
52-
null,
53-
false)
54-
else
55-
Executors.newCachedThreadPool(executorsThreadFactory)
56-
} else Executors.newCachedThreadPool(executorsThreadFactory)
50+
null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does
51+
true) //FIXME I really think this should be async...
52+
} catch {
53+
case NonFatal(t) =>
54+
System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
55+
t.printStackTrace(System.err)
56+
Executors.newCachedThreadPool(executorsThreadFactory)
57+
}
5758

58-
def execute(runnable: Runnable): Unit = executorService match {
59+
def execute(runnable: Runnable): Unit = executor match {
5960
case fj: ForkJoinPool =>
6061
Thread.currentThread match {
6162
case fjw: ForkJoinWorkerThread if fjw.getPool eq fj =>
62-
val fjtask = runnable match {
63+
(runnable match {
6364
case fjt: ForkJoinTask[_] => fjt
6465
case _ => ForkJoinTask.adapt(runnable)
65-
}
66-
fjtask.fork
67-
case _ =>
68-
fj.execute(runnable)
66+
}).fork
67+
case _ => fj.execute(runnable)
6968
}
70-
case executor: Executor =>
71-
executor execute runnable
69+
case generic => generic execute runnable
7270
}
7371

7472
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
7573
Future.releaseStack(this)
7674

77-
executorService match {
75+
executor match {
7876
case fj: ForkJoinPool =>
7977
var result: T = null.asInstanceOf[T]
80-
val managedBlocker = new ForkJoinPool.ManagedBlocker {
78+
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
8179
@volatile var isdone = false
82-
def block() = {
83-
result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
80+
def block(): Boolean = {
81+
result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
8482
isdone = true
8583
true
8684
}
8785
def isReleasable = isdone
88-
}
89-
ForkJoinPool.managedBlock(managedBlocker)
86+
})
9087
result
9188
case _ =>
9289
awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
9390
}
9491
}
9592

9693
def reportFailure(t: Throwable) = reporter(t)
97-
9894
}
9995

10096

10197
private[concurrent] object ExecutionContextImpl {
102-
98+
99+
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
100+
def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
101+
new ExecutionContextImpl(es, reporter) with ExecutorService {
102+
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
103+
override def execute(command: Runnable) = executor.execute(command)
104+
override def shutdown() { asExecutorService.shutdown() }
105+
override def shutdownNow() = asExecutorService.shutdownNow()
106+
override def isShutdown = asExecutorService.isShutdown
107+
override def isTerminated = asExecutorService.isTerminated
108+
override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
109+
override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
110+
override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
111+
override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
112+
override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
113+
override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
114+
override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
115+
override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
116+
}
103117
}
104118

105119

src/library/scala/concurrent/impl/Future.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ private[concurrent] object Future {
6565
promise.future
6666
}
6767

68+
private[impl] val throwableId: Throwable => Throwable = identity _
69+
6870
// an optimization for batching futures
6971
// TODO we should replace this with a public queue,
7072
// so that it can be stolen from

src/library/scala/concurrent/impl/Promise.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,14 @@ object Promise {
112112
}
113113
}
114114

115-
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
115+
def onComplete[U](func: Either[Throwable, T] => U): Unit = {
116116
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
117117
def dispatchOrAddCallback(): Unit =
118118
getState match {
119119
case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]]))
120120
case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback()
121121
}
122122
dispatchOrAddCallback()
123-
this
124123
}
125124

126125
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
@@ -144,10 +143,9 @@ object Promise {
144143

145144
def tryComplete(value: Either[Throwable, T]): Boolean = false
146145

147-
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
148-
val completedAs = value.get
146+
def onComplete[U](func: Either[Throwable, T] => U): Unit = {
147+
val completedAs = value.get // Avoid closing over "this"
149148
Future.dispatchFuture(executor, () => func(completedAs))
150-
this
151149
}
152150

153151
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this

0 commit comments

Comments
 (0)