1616
1717package rx .observables ;
1818
19- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2019import java .util .concurrent .atomic .AtomicLong ;
2120
2221import rx .Observable .OnSubscribe ;
@@ -321,14 +320,9 @@ private static class SubscriptionProducer<S, T>
321320 private final SyncOnSubscribe <S , T > parent ;
322321 private boolean onNextCalled ;
323322 private boolean hasTerminated ;
324-
323+
325324 private S state ;
326325
327- volatile int isUnsubscribed ;
328- @ SuppressWarnings ("rawtypes" )
329- static final AtomicIntegerFieldUpdater <SubscriptionProducer > IS_UNSUBSCRIBED =
330- AtomicIntegerFieldUpdater .newUpdater (SubscriptionProducer .class , "isUnsubscribed" );
331-
332326 private SubscriptionProducer (final Subscriber <? super T > subscriber , SyncOnSubscribe <S , T > parent , S state ) {
333327 this .actualSubscriber = subscriber ;
334328 this .parent = parent ;
@@ -337,14 +331,39 @@ private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubsc
337331
338332 @ Override
339333 public boolean isUnsubscribed () {
340- return isUnsubscribed != 0 ;
334+ return get () < 0L ;
341335 }
342336
343337 @ Override
344338 public void unsubscribe () {
345- IS_UNSUBSCRIBED .compareAndSet (this , 0 , 1 );
346- if (get () == 0L )
347- parent .onUnsubscribe (state );
339+ while (true ) {
340+ long requestCount = get ();
341+ if (compareAndSet (0L , -1L )) {
342+ doUnsubscribe ();
343+ return ;
344+ }
345+ else if (compareAndSet (requestCount , -2L ))
346+ // the loop is iterating concurrently
347+ // need to check if requestCount == -1
348+ // and unsub if so after loop iteration
349+ return ;
350+ }
351+ }
352+
353+ private boolean tryUnsubscribe () {
354+ // only one thread at a time can iterate over request count
355+ // therefore the requestCount atomic cannot be decrement concurrently here
356+ // safe to set to -1 atomically (since this check can only be done by 1 thread)
357+ if (hasTerminated || get () < -1 ) {
358+ set (-1 );
359+ doUnsubscribe ();
360+ return true ;
361+ }
362+ return false ;
363+ }
364+
365+ private void doUnsubscribe () {
366+ parent .onUnsubscribe (state );
348367 }
349368
350369 @ Override
@@ -358,71 +377,60 @@ public void request(long n) {
358377 }
359378 }
360379
361- void fastpath () {
380+ private void fastpath () {
362381 final SyncOnSubscribe <S , T > p = parent ;
363382 Subscriber <? super T > a = actualSubscriber ;
364383
365- if (isUnsubscribed ()) {
366- p .onUnsubscribe (state );
367- return ;
368- }
369-
370384 for (;;) {
371385 try {
372386 onNextCalled = false ;
373387 nextIteration (p );
374388 } catch (Throwable ex ) {
375- handleThrownError (p , a , state , ex );
389+ handleThrownError (a , ex );
376390 return ;
377391 }
378- if (hasTerminated || isUnsubscribed ()) {
379- p .onUnsubscribe (state );
392+ if (tryUnsubscribe ()) {
380393 return ;
381394 }
382395 }
383396 }
384397
385- private void handleThrownError (final SyncOnSubscribe < S , T > p , Subscriber <? super T > a , S st , Throwable ex ) {
398+ private void handleThrownError (Subscriber <? super T > a , Throwable ex ) {
386399 if (hasTerminated ) {
387400 RxJavaPlugins .getInstance ().getErrorHandler ().handleError (ex );
388401 } else {
389402 hasTerminated = true ;
390403 a .onError (ex );
391- p . onUnsubscribe ( st );
404+ unsubscribe ( );
392405 }
393406 }
394407
395- void slowPath (long n ) {
408+ private void slowPath (long n ) {
396409 final SyncOnSubscribe <S , T > p = parent ;
397410 Subscriber <? super T > a = actualSubscriber ;
398411 long numRequested = n ;
399412 for (;;) {
400- if (isUnsubscribed ()) {
401- p .onUnsubscribe (state );
402- return ;
403- }
404413 long numRemaining = numRequested ;
405414 do {
406415 try {
407416 onNextCalled = false ;
408417 nextIteration (p );
409418 } catch (Throwable ex ) {
410- handleThrownError (p , a , state , ex );
419+ handleThrownError (a , ex );
411420 return ;
412421 }
413- if (hasTerminated || isUnsubscribed ()) {
414- p .onUnsubscribe (state );
422+ if (tryUnsubscribe ()) {
415423 return ;
416424 }
417425 if (onNextCalled )
418426 numRemaining --;
419427 } while (numRemaining != 0L );
420-
421428 numRequested = addAndGet (-numRequested );
422- if (numRequested == 0L ) {
429+ if (numRequested <= 0L )
423430 break ;
424- }
425431 }
432+ // catches cases where unsubscribe is called before decrementing atomic request count
433+ tryUnsubscribe ();
426434 }
427435
428436 private void nextIteration (final SyncOnSubscribe <S , T > parent ) {
0 commit comments