diff --git a/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 803d999fe8..950498e880 100644 --- a/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -54,9 +54,11 @@ public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedule public final void testUnSubscribeForScheduler() throws InterruptedException { final AtomicInteger countReceived = new AtomicInteger(); final AtomicInteger countGenerated = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch subscribed = new CountDownLatch(1); + final CountDownLatch unsubscribed = new CountDownLatch(1); - Observable.interval(50, TimeUnit.MILLISECONDS) + TestScheduler testScheduler = Schedulers.test(); + Observable.interval(50, TimeUnit.MILLISECONDS, testScheduler) .map(new Func1() { @Override public Long call(Long aLong) { @@ -64,6 +66,12 @@ public Long call(Long aLong) { return aLong; } }) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.countDown(); + } + }) .subscribeOn(getScheduler()) .observeOn(getScheduler()) .subscribe(new Subscriber() { @@ -79,19 +87,18 @@ public void onError(Throwable e) { @Override public void onNext(Long args) { + System.out.println("==> Received " + args); if (countReceived.incrementAndGet() == 2) { unsubscribe(); - latch.countDown(); + unsubscribed.countDown(); } - System.out.println("==> Received " + args); } }); - latch.await(1000, TimeUnit.MILLISECONDS); - - System.out.println("----------- it thinks it is finished ------------------ "); - Thread.sleep(100); - + subscribed.await(1000, TimeUnit.MILLISECONDS); + testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + unsubscribed.await(1000, TimeUnit.MILLISECONDS); + testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); assertEquals(2, countGenerated.get()); }