@@ -79,6 +79,9 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
7979 final Func1 <? super T , ? extends R > elementSelector ;
8080 final Subscriber <? super GroupedObservable <K , R >> child ;
8181
82+ // We should not call `unsubscribe()` until `groups.isEmpty() && child.isUnsubscribed()` is true.
83+ // Use `WIP_FOR_UNSUBSCRIBE_UPDATER` to monitor these statuses and call `unsubscribe()` properly.
84+ // Should check both when `child.unsubscribe` is called and any group is removed.
8285 @ SuppressWarnings ("rawtypes" )
8386 static final AtomicIntegerFieldUpdater <GroupBySubscriber > WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "wipForUnsubscribe" );
8487 volatile int wipForUnsubscribe = 1 ;
@@ -124,7 +127,13 @@ public Observer<T> getObserver() {
124127 private static final NotificationLite <Object > nl = NotificationLite .instance ();
125128
126129 volatile int completionEmitted ;
127- volatile int terminated ;
130+
131+ private static final int UNTERMINATED = 0 ;
132+ private static final int TERMINATED_WITH_COMPLETED = 1 ;
133+ private static final int TERMINATED_WITH_ERROR = 2 ;
134+
135+ // Must be one of `UNTERMINATED`, `TERMINATED_WITH_COMPLETED`, `TERMINATED_WITH_ERROR`
136+ volatile int terminated = UNTERMINATED ;
128137
129138 @ SuppressWarnings ("rawtypes" )
130139 static final AtomicIntegerFieldUpdater <GroupBySubscriber > COMPLETION_EMITTED_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "completionEmitted" );
@@ -139,8 +148,6 @@ public Observer<T> getObserver() {
139148 @ SuppressWarnings ("rawtypes" )
140149 static final AtomicLongFieldUpdater <GroupBySubscriber > BUFFERED_COUNT = AtomicLongFieldUpdater .newUpdater (GroupBySubscriber .class , "bufferedCount" );
141150
142- volatile boolean errorEmitted = false ;
143-
144151 @ Override
145152 public void onStart () {
146153 REQUESTED .set (this , MAX_QUEUE_SIZE );
@@ -149,7 +156,7 @@ public void onStart() {
149156
150157 @ Override
151158 public void onCompleted () {
152- if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
159+ if (TERMINATED_UPDATER .compareAndSet (this , UNTERMINATED , TERMINATED_WITH_COMPLETED )) {
153160 // if we receive onCompleted from our parent we onComplete children
154161 // for each group check if it is ready to accept more events if so pass the oncomplete through else buffer it.
155162 for (GroupState <K , T > group : groups .values ()) {
@@ -168,9 +175,7 @@ public void onCompleted() {
168175
169176 @ Override
170177 public void onError (Throwable e ) {
171- if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
172- errorEmitted = true ;
173-
178+ if (TERMINATED_UPDATER .compareAndSet (this , UNTERMINATED , TERMINATED_WITH_ERROR )) {
174179 // It's safe to access all groups and emit the error.
175180 // onNext and onError are in sequence so no group will be created in the loop.
176181 for (GroupState <K , T > group : groups .values ()) {
@@ -390,18 +395,16 @@ private void drainIfPossible(GroupState<K, T> groupState) {
390395 }
391396
392397 private void completeInner () {
398+ // A group is removed, so check if we need to call `unsubscribe`
393399 if (WIP_FOR_UNSUBSCRIBE_UPDATER .decrementAndGet (this ) == 0 ) {
400+ // It means `groups.isEmpty() && child.isUnsubscribed()` is true
394401 unsubscribe ();
395402 }
396- // if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
397- if (groups .isEmpty () && ( terminated == 1 || child . isUnsubscribed ()) ) {
403+ // if we have no outstanding groups (all completed or unsubscribe) and terminated on outer
404+ if (groups .isEmpty () && terminated == TERMINATED_WITH_COMPLETED ) {
398405 // completionEmitted ensures we only emit onCompleted once
399406 if (COMPLETION_EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
400-
401- if (child .isUnsubscribed ()) {
402- // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
403- unsubscribe ();
404- } else if (!errorEmitted ) {
407+ if (!child .isUnsubscribed ()) {
405408 child .onCompleted ();
406409 }
407410 }
0 commit comments