Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public void call() {
}
}));
try {
//don't block or propagate CancellationException if already unsubscribed
if (subscriber.isUnsubscribed()) {
return;
}
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
subscriber.onNext(value);
subscriber.onCompleted();
Expand All @@ -71,6 +75,10 @@ public void call() {
// since it's already subscribed.
// If the Future is canceled in other place, CancellationException will be still
// passed to the final Subscriber.
if (subscriber.isUnsubscribed()) {
//refuse to emit onError if already unsubscribed
return;
}
subscriber.onError(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@
*/
package rx.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OnSubscribeToObservableFutureTest {

Expand Down Expand Up @@ -68,4 +77,64 @@ public void testFailure() throws Exception {
verify(o, times(1)).onError(e);
verify(future, times(1)).cancel(true);
}

@Test
public void testCancelledBeforeSubscribe() throws Exception {
Future<Object> future = mock(Future.class);
CancellationException e = new CancellationException("unit test synthetic cancellation");
when(future.get()).thenThrow(e);
Observer<Object> o = mock(Observer.class);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
testSubscriber.unsubscribe();
Subscription sub = Observable.from(future).subscribe(testSubscriber);
assertEquals(0, testSubscriber.getOnErrorEvents().size());
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
}

@Test
public void testCancellationDuringFutureGet() throws Exception {
Future<Object> future = new Future<Object>() {
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isDone = new AtomicBoolean(false);

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
isCancelled.compareAndSet(false, true);
return true;
}

@Override
public boolean isCancelled() {
return isCancelled.get();
}

@Override
public boolean isDone() {
return isCancelled() || isDone.get();
}

@Override
public Object get() throws InterruptedException, ExecutionException {
Thread.sleep(500);
isDone.compareAndSet(false, true);
return "foo";
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
};

Observer<Object> o = mock(Observer.class);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
Observable<Object> futureObservable = Observable.from(future);
Subscription sub = futureObservable.subscribeOn(Schedulers.computation()).subscribe(testSubscriber);
sub.unsubscribe();
assertEquals(0, testSubscriber.getOnErrorEvents().size());
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
assertEquals(0, testSubscriber.getOnNextEvents().size());
}
}