1818import java .util .ArrayList ;
1919import java .util .List ;
2020import java .util .concurrent .atomic .AtomicReference ;
21+ import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
2122
2223import rx .Subscription ;
2324import rx .exceptions .CompositeException ;
2930 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
3031 */
3132public final class CompositeSubscription implements Subscription {
32-
33- private final AtomicReference <State > state = new AtomicReference <State >();
33+ /** The atomic state updater. */
34+ static final AtomicReferenceFieldUpdater <CompositeSubscription , State > STATE_UPDATER
35+ = AtomicReferenceFieldUpdater .newUpdater (CompositeSubscription .class , State .class , "state" );
36+ /** The subscription state. */
37+ volatile State state ;
3438
3539 /** Empty initial state. */
3640 private static final State CLEAR_STATE ;
@@ -97,43 +101,45 @@ State clear() {
97101 }
98102
99103 public CompositeSubscription () {
100- state .set (CLEAR_STATE );
104+ // this creates only a store-store barrier which is generally faster when
105+ // CompositeSubscriptions are created in a tight loop.
106+ STATE_UPDATER .lazySet (this , CLEAR_STATE );
101107 }
102108
103109 public CompositeSubscription (final Subscription ... subscriptions ) {
104- state . set ( new State (false , subscriptions ));
110+ STATE_UPDATER . lazySet ( this , new State (false , subscriptions ));
105111 }
106112
107113 @ Override
108114 public boolean isUnsubscribed () {
109- return state .get (). isUnsubscribed ;
115+ return state .isUnsubscribed ;
110116 }
111117
112118 public void add (final Subscription s ) {
113119 State oldState ;
114120 State newState ;
115121 do {
116- oldState = state . get () ;
122+ oldState = state ;
117123 if (oldState .isUnsubscribed ) {
118124 s .unsubscribe ();
119125 return ;
120126 } else {
121127 newState = oldState .add (s );
122128 }
123- } while (!state .compareAndSet (oldState , newState ));
129+ } while (!STATE_UPDATER .compareAndSet (this , oldState , newState ));
124130 }
125131
126132 public void remove (final Subscription s ) {
127133 State oldState ;
128134 State newState ;
129135 do {
130- oldState = state . get () ;
136+ oldState = state ;
131137 if (oldState .isUnsubscribed ) {
132138 return ;
133139 } else {
134140 newState = oldState .remove (s );
135141 }
136- } while (!state .compareAndSet (oldState , newState ));
142+ } while (!STATE_UPDATER .compareAndSet (this , oldState , newState ));
137143 // if we removed successfully we then need to call unsubscribe on it
138144 s .unsubscribe ();
139145 }
@@ -142,29 +148,25 @@ public void clear() {
142148 State oldState ;
143149 State newState ;
144150 do {
145- oldState = state . get () ;
151+ oldState = state ;
146152 if (oldState .isUnsubscribed ) {
147153 return ;
148154 } else {
149155 newState = oldState .clear ();
150156 }
151- } while (!state .compareAndSet (oldState , newState ));
157+ } while (!STATE_UPDATER .compareAndSet (this , oldState , newState ));
152158 // if we cleared successfully we then need to call unsubscribe on all previous
153159 unsubscribeFromAll (oldState .subscriptions );
154160 }
155161
156162 @ Override
157163 public void unsubscribe () {
158- State oldState ;
159- State newState ;
160- do {
161- oldState = state .get ();
162- if (oldState .isUnsubscribed ) {
163- return ;
164- } else {
165- newState = oldState .unsubscribe ();
166- }
167- } while (!state .compareAndSet (oldState , newState ));
164+ State oldState = state ;
165+ if (oldState .isUnsubscribed ) {
166+ return ;
167+ }
168+ // intrinsics may make this a single instruction and may prevent concurrent add/remove faster
169+ oldState = STATE_UPDATER .getAndSet (this , oldState .unsubscribe ());
168170 unsubscribeFromAll (oldState .subscriptions );
169171 }
170172
0 commit comments