1414package org .asynchttpclient .netty ;
1515
1616import static org .asynchttpclient .util .DateUtils .unpreciseMillisTime ;
17- import static org .asynchttpclient .util .MiscUtils .getCause ;
1817import static io .netty .util .internal .PlatformDependent .*;
1918import io .netty .channel .Channel ;
2019
2120import java .util .concurrent .CancellationException ;
2221import java .util .concurrent .CompletableFuture ;
23- import java .util .concurrent .CountDownLatch ;
2422import java .util .concurrent .ExecutionException ;
23+ import java .util .concurrent .Executor ;
2524import java .util .concurrent .Future ;
2625import java .util .concurrent .TimeUnit ;
2726import java .util .concurrent .TimeoutException ;
2827import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
29- import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
3028
3129import org .asynchttpclient .AsyncHandler ;
30+ import org .asynchttpclient .ListenableFuture ;
3231import org .asynchttpclient .Realm ;
3332import org .asynchttpclient .Request ;
3433import org .asynchttpclient .channel .ChannelPoolPartitioning ;
35- import org .asynchttpclient .future .AbstractListenableFuture ;
3634import org .asynchttpclient .netty .channel .ChannelState ;
3735import org .asynchttpclient .netty .channel .Channels ;
3836import org .asynchttpclient .netty .request .NettyRequest ;
4745 *
4846 * @param <V> the result type
4947 */
50- public final class NettyResponseFuture <V > extends AbstractListenableFuture <V > {
48+ public final class NettyResponseFuture <V > implements ListenableFuture <V > {
5149
5250 private static final Logger LOGGER = LoggerFactory .getLogger (NettyResponseFuture .class );
5351
5452 private static final AtomicIntegerFieldUpdater <NettyResponseFuture <?>> REDIRECT_COUNT_UPDATER = newAtomicIntegerFieldUpdater (NettyResponseFuture .class , "redirectCount" );
5553 private static final AtomicIntegerFieldUpdater <NettyResponseFuture <?>> CURRENT_RETRY_UPDATER = newAtomicIntegerFieldUpdater (NettyResponseFuture .class , "currentRetry" );
56- @ SuppressWarnings ("rawtypes" )
57- // FIXME see https://github.com/netty/netty/pull/4669
58- private static final AtomicReferenceFieldUpdater <NettyResponseFuture , Object > CONTENT_UPDATER = newAtomicReferenceFieldUpdater (NettyResponseFuture .class , "content" );
59- @ SuppressWarnings ("rawtypes" )
60- // FIXME see https://github.com/netty/netty/pull/4669
61- private static final AtomicReferenceFieldUpdater <NettyResponseFuture , ExecutionException > EX_EX_UPDATER = newAtomicReferenceFieldUpdater (NettyResponseFuture .class , "exEx" );
6254
6355 private final long start = unpreciseMillisTime ();
6456 private final ChannelPoolPartitioning connectionPoolPartitioning ;
6557 private final ProxyServer proxyServer ;
6658 private final int maxRetry ;
67- private final CountDownLatch latch = new CountDownLatch ( 1 );
59+ private final CompletableFuture < V > future = new CompletableFuture <>( );
6860
6961 // state mutated from outside the event loop
7062 // TODO check if they are indeed mutated outside the event loop
@@ -89,8 +81,6 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
8981 // volatile where we need CAS ops
9082 private volatile int redirectCount = 0 ;
9183 private volatile int currentRetry = 0 ;
92- private volatile V content ;
93- private volatile ExecutionException exEx ;
9484
9585 // volatile where we don't need CAS ops
9686 private volatile long touch = unpreciseMillisTime ();
@@ -160,40 +150,35 @@ public boolean cancel(boolean force) {
160150 LOGGER .warn ("cancel" , t );
161151 }
162152 }
163- latch . countDown ();
164- runListeners ( );
153+
154+ future . cancel ( false );
165155 return true ;
166156 }
167157
168158 @ Override
169159 public V get () throws InterruptedException , ExecutionException {
170- latch .await ();
171- return getContent ();
160+ return future .get ();
172161 }
173162
174163 @ Override
175164 public V get (long l , TimeUnit tu ) throws InterruptedException , TimeoutException , ExecutionException {
176- if (!latch .await (l , tu ))
177- throw new TimeoutException ();
178- return getContent ();
165+ return future .get (l , tu );
179166 }
180167
181168 private V getContent () throws ExecutionException {
169+ if (future .isDone ()) {
170+ try {
171+ return future .get ();
172+ } catch (InterruptedException e ) {
173+ throw new RuntimeException ("unreachable" , e );
174+ }
175+ }
182176
183- if (isCancelled ())
184- throw new CancellationException ();
185-
186- ExecutionException e = EX_EX_UPDATER .get (this );
187- if (e != null )
188- throw e ;
189-
190- @ SuppressWarnings ("unchecked" )
191- V update = (V ) CONTENT_UPDATER .get (this );
192177 // No more retry
193178 CURRENT_RETRY_UPDATER .set (this , maxRetry );
194179 if (contentProcessedField .getAndSet (this , 1 ) == 0 ) {
195180 try {
196- update = asyncHandler .onCompleted ();
181+ future . complete ( asyncHandler .onCompleted () );
197182 } catch (Throwable ex ) {
198183 if (onThrowableCalledField .getAndSet (this , 1 ) == 0 ) {
199184 try {
@@ -202,15 +187,14 @@ private V getContent() throws ExecutionException {
202187 } catch (Throwable t ) {
203188 LOGGER .debug ("asyncHandler.onThrowable" , t );
204189 }
205- throw new RuntimeException (ex );
206190 } finally {
207191 cancelTimeouts ();
208192 }
209193 }
194+ future .completeExceptionally (ex );
210195 }
211- CONTENT_UPDATER .compareAndSet (this , null , update );
212196 }
213- return update ;
197+ return future . getNow ( null ) ;
214198 }
215199
216200 // org.asynchttpclient.ListenableFuture
@@ -229,22 +213,19 @@ public final void done() {
229213
230214 try {
231215 getContent ();
216+ } catch (ExecutionException ignored ) {
232217
233- } catch (ExecutionException t ) {
234- return ;
235218 } catch (RuntimeException t ) {
236- EX_EX_UPDATER . compareAndSet ( this , null , new ExecutionException ( getCause ( t )) );
237-
238- } finally {
239- latch . countDown () ;
219+ future . completeExceptionally ( t );
220+ } catch ( Throwable t ) {
221+ future . completeExceptionally ( t );
222+ throw t ;
240223 }
241-
242- runListeners ();
243224 }
244225
245226 public final void abort (final Throwable t ) {
246227
247- EX_EX_UPDATER . compareAndSet ( this , null , new ExecutionException ( t ) );
228+ future . completeExceptionally ( t );
248229
249230 if (terminateAndExit ())
250231 return ;
@@ -256,8 +237,6 @@ public final void abort(final Throwable t) {
256237 LOGGER .debug ("asyncHandler.onThrowable" , te );
257238 }
258239 }
259- latch .countDown ();
260- runListeners ();
261240 }
262241
263242 @ Override
@@ -266,22 +245,14 @@ public void touch() {
266245 }
267246
268247 @ Override
269- public CompletableFuture <V > toCompletableFuture () {
270- CompletableFuture <V > completable = new CompletableFuture <>();
271- addListener (new Runnable () {
272- @ Override
273- @ SuppressWarnings ("unchecked" )
274- public void run () {
275- ExecutionException e = EX_EX_UPDATER .get (NettyResponseFuture .this );
276- if (e != null )
277- completable .completeExceptionally (e .getCause ());
278- else
279- completable .complete ((V ) CONTENT_UPDATER .get (NettyResponseFuture .this ));
280- }
281-
282- }, null );
248+ public ListenableFuture <V > addListener (Runnable listener , Executor exec ) {
249+ future .whenCompleteAsync ((r , v ) -> listener .run (), exec );
250+ return this ;
251+ }
283252
284- return completable ;
253+ @ Override
254+ public CompletableFuture <V > toCompletableFuture () {
255+ return future ;
285256 }
286257
287258 // INTERNAL
@@ -498,10 +469,9 @@ public String toString() {
498469 ",\n \t isCancelled=" + isCancelled + //
499470 ",\n \t asyncHandler=" + asyncHandler + //
500471 ",\n \t nettyRequest=" + nettyRequest + //
501- ",\n \t content =" + content + //
472+ ",\n \t future =" + future + //
502473 ",\n \t uri=" + getUri () + //
503474 ",\n \t keepAlive=" + keepAlive + //
504- ",\n \t exEx=" + exEx + //
505475 ",\n \t redirectCount=" + redirectCount + //
506476 ",\n \t timeoutsHolder=" + timeoutsHolder + //
507477 ",\n \t inAuth=" + inAuth + //
0 commit comments