1616
1717package rx .observables ;
1818
19- import static org .junit .Assert .assertEquals ;
20- import static org .junit .Assert .assertFalse ;
21- import static org .junit .Assert .assertNull ;
22- import static org .junit .Assert .assertTrue ;
23- import static org .mockito .Matchers .any ;
24- import static org .mockito .Matchers .isA ;
25- import static org .mockito .Mockito .inOrder ;
26- import static org .mockito .Mockito .mock ;
27- import static org .mockito .Mockito .never ;
28- import static org .mockito .Mockito .times ;
29- import static org .mockito .Mockito .verify ;
30-
31- import java .util .ArrayList ;
32- import java .util .Arrays ;
33- import java .util .Iterator ;
34- import java .util .List ;
35- import java .util .Map ;
36- import java .util .concurrent .BrokenBarrierException ;
37- import java .util .concurrent .Callable ;
38- import java .util .concurrent .ConcurrentHashMap ;
39- import java .util .concurrent .CountDownLatch ;
40- import java .util .concurrent .CyclicBarrier ;
41- import java .util .concurrent .ExecutionException ;
42- import java .util .concurrent .ExecutorService ;
43- import java .util .concurrent .Executors ;
44- import java .util .concurrent .Future ;
45- import java .util .concurrent .TimeUnit ;
46- import java .util .concurrent .atomic .AtomicBoolean ;
47- import java .util .concurrent .atomic .AtomicInteger ;
48- import java .util .concurrent .atomic .AtomicReference ;
19+ import static org .junit .Assert .*;
20+ import static org .mockito .Matchers .*;
21+ import static org .mockito .Mockito .*;
22+
23+ import java .util .*;
24+ import java .util .concurrent .*;
25+ import java .util .concurrent .atomic .*;
4926
5027import org .junit .Test ;
51- import org .mockito .InOrder ;
52- import org .mockito .Matchers ;
53- import org .mockito .Mockito ;
28+ import org .mockito .*;
5429
30+ import rx .*;
5531import rx .Observable ;
56- import rx .Observable .OnSubscribe ;
57- import rx .Observable .Operator ;
32+ import rx .Observable .*;
5833import rx .Observer ;
59- import rx .Producer ;
60- import rx .Subscriber ;
6134import rx .exceptions .TestException ;
62- import rx .functions .Action0 ;
63- import rx .functions .Action1 ;
64- import rx .functions .Action2 ;
65- import rx .functions .Func0 ;
66- import rx .functions .Func2 ;
35+ import rx .functions .*;
6736import rx .observers .TestSubscriber ;
68- import rx .schedulers .Schedulers ;
69- import rx .schedulers .TestScheduler ;
37+ import rx .schedulers .*;
7038
7139/**
7240 * Test if SyncOnSubscribe adheres to the usual unsubscription and backpressure contracts.
@@ -489,6 +457,16 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
489457 verify (onUnSubscribe , times (1 )).call (any (Integer .class ));
490458 }
491459
460+ @ Test
461+ public void testConcurrentRequestsLoop () throws InterruptedException {
462+ for (int i = 0 ; i < 100 ; i ++) {
463+ if (i % 10 == 0 ) {
464+ System .out .println ("testConcurrentRequestsLoop >> " + i );
465+ }
466+ testConcurrentRequests ();
467+ }
468+ }
469+
492470 @ Test
493471 public void testConcurrentRequests () throws InterruptedException {
494472 final int count1 = 1000 ;
@@ -514,12 +492,20 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
514492 l2 .countDown ();
515493 // wait until the 2nd request returns then proceed
516494 try {
517- if (!l1 .await (1 , TimeUnit .SECONDS ))
518- throw new IllegalStateException ();
519- } catch (InterruptedException e ) {}
495+ if (!l1 .await (2 , TimeUnit .SECONDS )) {
496+ observer .onError (new TimeoutException ());
497+ return state + 1 ;
498+ }
499+ } catch (InterruptedException e ) {
500+ observer .onError (e );
501+ return state + 1 ;
502+ }
520503 observer .onNext (state );
521- if (state == finalCount )
504+
505+ if (state == finalCount ) {
522506 observer .onCompleted ();
507+ }
508+
523509 return state + 1 ;
524510 }},
525511 onUnSubscribe );
@@ -532,10 +518,9 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
532518 Observable .create (os ).subscribeOn (Schedulers .newThread ()).subscribe (ts );
533519
534520 // wait until the first request has started processing
535- try {
536- if (!l2 .await (1 , TimeUnit .SECONDS ))
537- throw new IllegalStateException ();
538- } catch (InterruptedException e ) {}
521+ if (!l2 .await (2 , TimeUnit .SECONDS )) {
522+ fail ("SyncOnSubscribe failed to countDown in time" );
523+ }
539524 // make a concurrent request, this should return
540525 ts .requestMore (count2 );
541526 // unblock the 1st thread to proceed fulfilling requests
0 commit comments