Skip to content

Commit 90e0964

Browse files
committed
Merge pull request scala#2508 from viktorklang/wip-removing-synchronization-from-DefaultPromise-2.10-√
Reimplementing blocking awaiting for Futures so only the blockers have t...
2 parents f81a4f9 + 73d494d commit 90e0964

File tree

2 files changed

+46
-43
lines changed

2 files changed

+46
-43
lines changed

bincompat-forward.whitelist.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,10 @@ filter {
467467
{
468468
matchName="scala.concurrent.forkjoin.ForkJoinPool.helpJoinOnce"
469469
problemName=IncompatibleResultTypeProblem
470+
},
471+
{
472+
matchName="scala.concurrent.impl.Promise$CompletionLatch"
473+
problemName=MissingClassProblem
470474
}
471475
]
472476
}

src/library/scala/concurrent/impl/Promise.scala

Lines changed: 42 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88

99
package scala.concurrent.impl
1010

11-
import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
11+
import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException, blocking }
12+
import scala.concurrent.Future.InternalCallbackExecutor
1213
import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS }
1314
import scala.annotation.tailrec
1415
import scala.util.control.NonFatal
1516
import scala.util.{ Try, Success, Failure }
17+
import java.io.ObjectInputStream
18+
import java.util.concurrent.locks.AbstractQueuedSynchronizer
1619

1720
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
1821
def future: this.type = this
@@ -52,70 +55,69 @@ private[concurrent] object Promise {
5255
case e: Error => Failure(new ExecutionException("Boxed Error", e))
5356
case t => Failure(t)
5457
}
58+
59+
/*
60+
* Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
61+
* Written by Doug Lea with assistance from members of JCP JSR-166
62+
* Expert Group and released to the public domain, as explained at
63+
* http://creativecommons.org/publicdomain/zero/1.0/
64+
*/
65+
private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) {
66+
override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1
67+
override protected def tryReleaseShared(ignore: Int): Boolean = {
68+
setState(1)
69+
true
70+
}
71+
override def apply(ignored: Try[T]): Unit = releaseShared(1)
72+
}
73+
5574

5675
/** Default promise implementation.
5776
*/
5877
class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
5978
updateState(null, Nil) // Start at "No callbacks"
6079

61-
protected final def tryAwait(atMost: Duration): Boolean = {
62-
@tailrec
63-
def awaitUnsafe(deadline: Deadline, nextWait: FiniteDuration): Boolean = {
64-
if (!isCompleted && nextWait > Duration.Zero) {
65-
val ms = nextWait.toMillis
66-
val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec
67-
68-
synchronized { if (!isCompleted) wait(ms, ns) }
69-
70-
awaitUnsafe(deadline, deadline.timeLeft)
71-
} else
72-
isCompleted
73-
}
74-
@tailrec
75-
def awaitUnbounded(): Boolean = {
76-
if (isCompleted) true
77-
else {
78-
synchronized { if (!isCompleted) wait() }
79-
awaitUnbounded()
80-
}
81-
}
82-
80+
protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
8381
import Duration.Undefined
82+
import scala.concurrent.Future.InternalCallbackExecutor
8483
atMost match {
85-
case u if u eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
86-
case Duration.Inf => awaitUnbounded
87-
case Duration.MinusInf => isCompleted
88-
case f: FiniteDuration => if (f > Duration.Zero) awaitUnsafe(f.fromNow, f) else isCompleted
84+
case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
85+
case Duration.Inf =>
86+
val l = new CompletionLatch[T]()
87+
onComplete(l)(InternalCallbackExecutor)
88+
l.acquireSharedInterruptibly(1)
89+
case Duration.MinusInf => // Drop out
90+
case f: FiniteDuration =>
91+
if (f > Duration.Zero) {
92+
val l = new CompletionLatch[T]()
93+
onComplete(l)(InternalCallbackExecutor)
94+
l.tryAcquireSharedNanos(1, f.toNanos)
95+
}
8996
}
90-
}
97+
98+
isCompleted
99+
} else true // Already completed
91100

92101
@throws(classOf[TimeoutException])
93102
@throws(classOf[InterruptedException])
94103
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
95-
if (isCompleted || tryAwait(atMost)) this
104+
if (tryAwait(atMost)) this
96105
else throw new TimeoutException("Futures timed out after [" + atMost + "]")
97106

98107
@throws(classOf[Exception])
99108
def result(atMost: Duration)(implicit permit: CanAwait): T =
100-
ready(atMost).value.get match {
101-
case Failure(e) => throw e
102-
case Success(r) => r
103-
}
109+
ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here
104110

105111
def value: Option[Try[T]] = getState match {
106112
case c: Try[_] => Some(c.asInstanceOf[Try[T]])
107113
case _ => None
108114
}
109115

110-
override def isCompleted: Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
111-
case _: Try[_] => true
112-
case _ => false
113-
}
116+
override def isCompleted: Boolean = getState.isInstanceOf[Try[_]]
114117

115118
def tryComplete(value: Try[T]): Boolean = {
116119
val resolved = resolveTry(value)
117-
(try {
118-
@tailrec
120+
@tailrec
119121
def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
120122
getState match {
121123
case raw: List[_] =>
@@ -124,10 +126,7 @@ private[concurrent] object Promise {
124126
case _ => null
125127
}
126128
}
127-
tryComplete(resolved)
128-
} finally {
129-
synchronized { notifyAll() } //Notify any evil blockers
130-
}) match {
129+
tryComplete(resolved) match {
131130
case null => false
132131
case rs if rs.isEmpty => true
133132
case rs => rs.foreach(r => r.executeWithValue(resolved)); true

0 commit comments

Comments
 (0)