1515 */
1616package rx .internal .operators ;
1717
18+ import java .util .Map ;
1819import java .util .Queue ;
1920import java .util .concurrent .ConcurrentHashMap ;
2021import java .util .concurrent .ConcurrentLinkedQueue ;
2122import java .util .concurrent .atomic .AtomicBoolean ;
23+ import java .util .concurrent .atomic .AtomicInteger ;
2224import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2325import java .util .concurrent .atomic .AtomicLong ;
2426import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
3436import rx .functions .Func1 ;
3537import rx .observables .GroupedObservable ;
3638import rx .subjects .Subject ;
39+ import rx .subscriptions .Subscriptions ;
3740
3841/**
3942 * Groups the items emitted by an Observable according to a specified criterion, and emits these
@@ -76,6 +79,13 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
7679 final Func1 <? super T , ? extends R > elementSelector ;
7780 final Subscriber <? super GroupedObservable <K , R >> child ;
7881
82+ // We should not call `unsubscribe()` until `groups.isEmpty() && child.isUnsubscribed()` is true.
83+ // Use `WIP_FOR_UNSUBSCRIBE_UPDATER` to monitor these statuses and call `unsubscribe()` properly.
84+ // Should check both when `child.unsubscribe` is called and any group is removed.
85+ @ SuppressWarnings ("rawtypes" )
86+ static final AtomicIntegerFieldUpdater <GroupBySubscriber > WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "wipForUnsubscribe" );
87+ volatile int wipForUnsubscribe = 1 ;
88+
7989 public GroupBySubscriber (
8090 Func1 <? super T , ? extends K > keySelector ,
8191 Func1 <? super T , ? extends R > elementSelector ,
@@ -84,6 +94,16 @@ public GroupBySubscriber(
8494 this .keySelector = keySelector ;
8595 this .elementSelector = elementSelector ;
8696 this .child = child ;
97+ child .add (Subscriptions .create (new Action0 () {
98+
99+ @ Override
100+ public void call () {
101+ if (WIP_FOR_UNSUBSCRIBE_UPDATER .decrementAndGet (self ) == 0 ) {
102+ self .unsubscribe ();
103+ }
104+ }
105+
106+ }));
87107 }
88108
89109 private static class GroupState <K , T > {
@@ -107,7 +127,13 @@ public Observer<T> getObserver() {
107127 private static final NotificationLite <Object > nl = NotificationLite .instance ();
108128
109129 volatile int completionEmitted ;
110- volatile int terminated ;
130+
131+ private static final int UNTERMINATED = 0 ;
132+ private static final int TERMINATED_WITH_COMPLETED = 1 ;
133+ private static final int TERMINATED_WITH_ERROR = 2 ;
134+
135+ // Must be one of `UNTERMINATED`, `TERMINATED_WITH_COMPLETED`, `TERMINATED_WITH_ERROR`
136+ volatile int terminated = UNTERMINATED ;
111137
112138 @ SuppressWarnings ("rawtypes" )
113139 static final AtomicIntegerFieldUpdater <GroupBySubscriber > COMPLETION_EMITTED_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "completionEmitted" );
@@ -130,15 +156,15 @@ public void onStart() {
130156
131157 @ Override
132158 public void onCompleted () {
133- if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
159+ if (TERMINATED_UPDATER .compareAndSet (this , UNTERMINATED , TERMINATED_WITH_COMPLETED )) {
134160 // if we receive onCompleted from our parent we onComplete children
135161 // for each group check if it is ready to accept more events if so pass the oncomplete through else buffer it.
136162 for (GroupState <K , T > group : groups .values ()) {
137163 emitItem (group , nl .completed ());
138164 }
139165
140166 // special case (no groups emitted ... or all unsubscribed)
141- if (groups .size () == 0 ) {
167+ if (groups .isEmpty () ) {
142168 // we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
143169 if (COMPLETION_EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
144170 child .onCompleted ();
@@ -149,9 +175,19 @@ public void onCompleted() {
149175
150176 @ Override
151177 public void onError (Throwable e ) {
152- if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
153- // we immediately tear everything down if we receive an error
154- child .onError (e );
178+ if (TERMINATED_UPDATER .compareAndSet (this , UNTERMINATED , TERMINATED_WITH_ERROR )) {
179+ // It's safe to access all groups and emit the error.
180+ // onNext and onError are in sequence so no group will be created in the loop.
181+ for (GroupState <K , T > group : groups .values ()) {
182+ emitItem (group , nl .error (e ));
183+ }
184+ try {
185+ // we immediately tear everything down if we receive an error
186+ child .onError (e );
187+ } finally {
188+ // We have not chained the subscribers, so need to call it explicitly.
189+ unsubscribe ();
190+ }
155191 }
156192 }
157193
@@ -187,7 +223,9 @@ public void onNext(T t) {
187223 }
188224 group = createNewGroup (key );
189225 }
190- emitItem (group , nl .next (t ));
226+ if (group != null ) {
227+ emitItem (group , nl .next (t ));
228+ }
191229 } catch (Throwable e ) {
192230 onError (OnErrorThrowable .addValueAsLastCause (e , t ));
193231 }
@@ -236,6 +274,11 @@ public void onCompleted() {
236274 @ Override
237275 public void onError (Throwable e ) {
238276 o .onError (e );
277+ // eagerly cleanup instead of waiting for unsubscribe
278+ if (once .compareAndSet (false , true )) {
279+ // done once per instance, either onComplete or onUnSubscribe
280+ cleanupGroup (key );
281+ }
239282 }
240283
241284 @ Override
@@ -250,7 +293,17 @@ public void onNext(T t) {
250293 }
251294 });
252295
253- GroupState <K , T > putIfAbsent = groups .putIfAbsent (key , groupState );
296+ GroupState <K , T > putIfAbsent ;
297+ for (;;) {
298+ int wip = wipForUnsubscribe ;
299+ if (wip <= 0 ) {
300+ return null ;
301+ }
302+ if (WIP_FOR_UNSUBSCRIBE_UPDATER .compareAndSet (this , wip , wip + 1 )) {
303+ putIfAbsent = groups .putIfAbsent (key , groupState );
304+ break ;
305+ }
306+ }
254307 if (putIfAbsent != null ) {
255308 // this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
256309 throw new IllegalStateException ("Group already existed while creating a new one" );
@@ -264,7 +317,7 @@ private void cleanupGroup(Object key) {
264317 GroupState <K , T > removed ;
265318 removed = groups .remove (key );
266319 if (removed != null ) {
267- if (removed .buffer .size () > 0 ) {
320+ if (! removed .buffer .isEmpty () ) {
268321 BUFFERED_COUNT .addAndGet (self , -removed .buffer .size ());
269322 }
270323 completeInner ();
@@ -342,15 +395,14 @@ private void drainIfPossible(GroupState<K, T> groupState) {
342395 }
343396
344397 private void completeInner () {
345- // if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
346- if (groups .size () == 0 && (terminated == 1 || child .isUnsubscribed ())) {
398+ // A group is removed, so check if we need to call `unsubscribe`
399+ if (WIP_FOR_UNSUBSCRIBE_UPDATER .decrementAndGet (this ) == 0 ) {
400+ // It means `groups.isEmpty() && child.isUnsubscribed()` is true
401+ unsubscribe ();
402+ } else if (groups .isEmpty () && terminated == TERMINATED_WITH_COMPLETED ) {
403+ // if we have no outstanding groups (all completed or unsubscribe) and terminated on outer
347404 // completionEmitted ensures we only emit onCompleted once
348405 if (COMPLETION_EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
349-
350- if (child .isUnsubscribed ()) {
351- // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
352- unsubscribe ();
353- }
354406 child .onCompleted ();
355407 }
356408 }
0 commit comments