1919import java .util .concurrent .ConcurrentHashMap ;
2020import java .util .concurrent .ConcurrentLinkedQueue ;
2121import java .util .concurrent .atomic .AtomicBoolean ;
22+ import java .util .concurrent .atomic .AtomicInteger ;
2223import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2324import java .util .concurrent .atomic .AtomicLong ;
2425import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
3435import rx .functions .Func1 ;
3536import rx .observables .GroupedObservable ;
3637import rx .subjects .Subject ;
38+ import rx .subscriptions .Subscriptions ;
3739
3840/**
3941 * Groups the items emitted by an Observable according to a specified criterion, and emits these
@@ -76,6 +78,10 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
7678 final Func1 <? super T , ? extends R > elementSelector ;
7779 final Subscriber <? super GroupedObservable <K , R >> child ;
7880
81+ @ SuppressWarnings ("rawtypes" )
82+ static final AtomicIntegerFieldUpdater <GroupBySubscriber > WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "wipForUnsubscribe" );
83+ volatile int wipForUnsubscribe = 1 ;
84+
7985 public GroupBySubscriber (
8086 Func1 <? super T , ? extends K > keySelector ,
8187 Func1 <? super T , ? extends R > elementSelector ,
@@ -84,6 +90,16 @@ public GroupBySubscriber(
8490 this .keySelector = keySelector ;
8591 this .elementSelector = elementSelector ;
8692 this .child = child ;
93+ child .add (Subscriptions .create (new Action0 () {
94+
95+ @ Override
96+ public void call () {
97+ if (WIP_FOR_UNSUBSCRIBE_UPDATER .decrementAndGet (self ) == 0 ) {
98+ self .unsubscribe ();
99+ }
100+ }
101+
102+ }));
87103 }
88104
89105 private static class GroupState <K , T > {
@@ -138,7 +154,7 @@ public void onCompleted() {
138154 }
139155
140156 // special case (no groups emitted ... or all unsubscribed)
141- if (groups .size () == 0 ) {
157+ if (groups .isEmpty () ) {
142158 // we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
143159 if (COMPLETION_EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
144160 child .onCompleted ();
@@ -150,8 +166,13 @@ public void onCompleted() {
150166 @ Override
151167 public void onError (Throwable e ) {
152168 if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
153- // we immediately tear everything down if we receive an error
154- child .onError (e );
169+ try {
170+ // we immediately tear everything down if we receive an error
171+ child .onError (e );
172+ } finally {
173+ // We have not chained the subscribers, so need to call it explicitly.
174+ unsubscribe ();
175+ }
155176 }
156177 }
157178
@@ -187,7 +208,9 @@ public void onNext(T t) {
187208 }
188209 group = createNewGroup (key );
189210 }
190- emitItem (group , nl .next (t ));
211+ if (group != null ) {
212+ emitItem (group , nl .next (t ));
213+ }
191214 } catch (Throwable e ) {
192215 onError (OnErrorThrowable .addValueAsLastCause (e , t ));
193216 }
@@ -250,7 +273,17 @@ public void onNext(T t) {
250273 }
251274 });
252275
253- GroupState <K , T > putIfAbsent = groups .putIfAbsent (key , groupState );
276+ GroupState <K , T > putIfAbsent ;
277+ for (;;) {
278+ int wip = wipForUnsubscribe ;
279+ if (wip <= 0 ) {
280+ return null ;
281+ }
282+ if (WIP_FOR_UNSUBSCRIBE_UPDATER .compareAndSet (this , wip , wip + 1 )) {
283+ putIfAbsent = groups .putIfAbsent (key , groupState );
284+ break ;
285+ }
286+ }
254287 if (putIfAbsent != null ) {
255288 // this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
256289 throw new IllegalStateException ("Group already existed while creating a new one" );
@@ -264,7 +297,7 @@ private void cleanupGroup(Object key) {
264297 GroupState <K , T > removed ;
265298 removed = groups .remove (key );
266299 if (removed != null ) {
267- if (removed .buffer .size () > 0 ) {
300+ if (! removed .buffer .isEmpty () ) {
268301 BUFFERED_COUNT .addAndGet (self , -removed .buffer .size ());
269302 }
270303 completeInner ();
@@ -342,16 +375,20 @@ private void drainIfPossible(GroupState<K, T> groupState) {
342375 }
343376
344377 private void completeInner () {
378+ if (WIP_FOR_UNSUBSCRIBE_UPDATER .decrementAndGet (this ) == 0 ) {
379+ unsubscribe ();
380+ }
345381 // if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
346- if (groups .size () == 0 && (terminated == 1 || child .isUnsubscribed ())) {
382+ if (groups .isEmpty () && (terminated == 1 || child .isUnsubscribed ())) {
347383 // completionEmitted ensures we only emit onCompleted once
348384 if (COMPLETION_EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
349385
350386 if (child .isUnsubscribed ()) {
351387 // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
352388 unsubscribe ();
389+ } else {
390+ child .onCompleted ();
353391 }
354- child .onCompleted ();
355392 }
356393 }
357394 }
0 commit comments