1616package rx .observers ;
1717
1818import java .util .Arrays ;
19- import java .util .concurrent .atomic .AtomicBoolean ;
19+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2020
2121import rx .Subscriber ;
2222import rx .exceptions .CompositeException ;
6060public class SafeSubscriber <T > extends Subscriber <T > {
6161
6262 private final Subscriber <? super T > actual ;
63- private final AtomicBoolean isFinished = new AtomicBoolean (false );
63+ /** Terminal state indication if not zero. */
64+ volatile int done ;
65+ @ SuppressWarnings ("rawtypes" )
66+ static final AtomicIntegerFieldUpdater <SafeSubscriber > DONE_UPDATER
67+ = AtomicIntegerFieldUpdater .newUpdater (SafeSubscriber .class , "done" );
6468
6569 public SafeSubscriber (Subscriber <? super T > actual ) {
6670 super (actual );
@@ -69,7 +73,7 @@ public SafeSubscriber(Subscriber<? super T> actual) {
6973
7074 @ Override
7175 public void onCompleted () {
72- if (isFinished . compareAndSet ( false , true ) ) {
76+ if (DONE_UPDATER . getAndSet ( this , 1 ) == 0 ) {
7377 try {
7478 actual .onCompleted ();
7579 } catch (Throwable e ) {
@@ -90,15 +94,15 @@ public void onError(Throwable e) {
9094 // we handle here instead of another method so we don't add stacks to the frame
9195 // which can prevent it from being able to handle StackOverflow
9296 Exceptions .throwIfFatal (e );
93- if (isFinished . compareAndSet ( false , true ) ) {
97+ if (DONE_UPDATER . getAndSet ( this , 1 ) == 0 ) {
9498 _onError (e );
9599 }
96100 }
97101
98102 @ Override
99103 public void onNext (T args ) {
100104 try {
101- if (! isFinished . get () ) {
105+ if (done == 0 ) {
102106 actual .onNext (args );
103107 }
104108 } catch (Throwable e ) {
0 commit comments