File tree Expand file tree Collapse file tree 2 files changed +23
-2
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +23
-2
lines changed Original file line number Diff line number Diff line change @@ -68,8 +68,8 @@ public void onError(Throwable e) {
6868
6969 @ Override
7070 public void onNext (T i ) {
71- if (!isUnsubscribed ()) {
72- boolean stop = ++ count > = limit ;
71+ if (!isUnsubscribed () && count ++ < limit ) {
72+ boolean stop = count = = limit ;
7373 child .onNext (i );
7474 if (stop && !completed ) {
7575 completed = true ;
Original file line number Diff line number Diff line change 3232import rx .functions .*;
3333import rx .observers .*;
3434import rx .schedulers .Schedulers ;
35+ import rx .subjects .PublishSubject ;
3536
3637public class OperatorTakeTest {
3738
@@ -417,4 +418,24 @@ public void onNext(Integer t) {
417418 ts .assertError (TestException .class );
418419 ts .assertNotCompleted ();
419420 }
421+
422+ @ Test
423+ public void testReentrantTake () {
424+ final PublishSubject <Integer > source = PublishSubject .create ();
425+
426+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
427+
428+ source .take (1 ).doOnNext (new Action1 <Integer >() {
429+ @ Override
430+ public void call (Integer v ) {
431+ source .onNext (2 );
432+ }
433+ }).subscribe (ts );
434+
435+ source .onNext (1 );
436+
437+ ts .assertValue (1 );
438+ ts .assertNoErrors ();
439+ ts .assertCompleted ();
440+ }
420441}
You can’t perform that action at this time.
0 commit comments