Skip to content

Commit 4e08f1a

Browse files
authored
Merge pull request Netflix#1396 from mattrjacobs/unsubscribe-before-subscribe
If command is unsubscribed before any work starts, just return Observable.never()
2 parents 487ee3a + 5b1e4d6 commit 4e08f1a

File tree

3 files changed

+79
-0
lines changed

3 files changed

+79
-0
lines changed

hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,9 @@ public void call() {
402402
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
403403
@Override
404404
public Observable<R> call() {
405+
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
406+
return Observable.never();
407+
}
405408
return applyHystrixSemantics(_cmd);
406409
}
407410
};

hystrix-core/src/test/java/com/netflix/hystrix/CommonHystrixCommandTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,10 @@ C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult execu
727727
return getCommand(isolationStrategy, executionResult, FallbackResult.UNIMPLEMENTED);
728728
}
729729

730+
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency) {
731+
return getCommand(isolationStrategy, executionResult, executionLatency, FallbackResult.UNIMPLEMENTED);
732+
}
733+
730734
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, FallbackResult fallbackResult) {
731735
return getCommand(isolationStrategy, executionResult, 0, fallbackResult);
732736
}

hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import rx.functions.Action1;
4242
import rx.functions.Func0;
4343
import rx.functions.Func1;
44+
import rx.functions.Func2;
4445
import rx.observers.TestSubscriber;
4546
import rx.schedulers.Schedulers;
4647

@@ -3868,6 +3869,77 @@ public void onNext(Integer i) {
38683869
assertCommandExecutionEvents(cmd, HystrixEventType.SUCCESS);
38693870
}
38703871

3872+
@Test
3873+
public void testUnsubscribeBeforeSubscribe() throws Exception {
3874+
//this may happen in Observable chain, so Hystrix should make sure that command never executes/allocates in this situation
3875+
Observable<String> error = Observable.error(new RuntimeException("foo"));
3876+
HystrixCommand<Integer> cmd = getCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.SUCCESS, 100);
3877+
Observable<Integer> cmdResult = cmd.toObservable()
3878+
.doOnNext(new Action1<Integer>() {
3879+
@Override
3880+
public void call(Integer integer) {
3881+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + integer);
3882+
}
3883+
})
3884+
.doOnError(new Action1<Throwable>() {
3885+
@Override
3886+
public void call(Throwable ex) {
3887+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + ex);
3888+
}
3889+
})
3890+
.doOnCompleted(new Action0() {
3891+
@Override
3892+
public void call() {
3893+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted");
3894+
}
3895+
})
3896+
.doOnSubscribe(new Action0() {
3897+
@Override
3898+
public void call() {
3899+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnSubscribe");
3900+
}
3901+
})
3902+
.doOnUnsubscribe(new Action0() {
3903+
@Override
3904+
public void call() {
3905+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnUnsubscribe");
3906+
}
3907+
});
3908+
3909+
//the zip operator will subscribe to each observable. there is a race between the error of the first
3910+
//zipped observable terminating the zip and the subscription to the command's observable
3911+
Observable<String> zipped = Observable.zip(error, cmdResult, new Func2<String, Integer, String>() {
3912+
@Override
3913+
public String call(String s, Integer integer) {
3914+
return s + integer;
3915+
}
3916+
});
3917+
3918+
final CountDownLatch latch = new CountDownLatch(1);
3919+
3920+
zipped.subscribe(new Subscriber<String>() {
3921+
@Override
3922+
public void onCompleted() {
3923+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
3924+
latch.countDown();
3925+
}
3926+
3927+
@Override
3928+
public void onError(Throwable e) {
3929+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e);
3930+
latch.countDown();
3931+
}
3932+
3933+
@Override
3934+
public void onNext(String s) {
3935+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnNext : " + s);
3936+
}
3937+
});
3938+
3939+
latch.await(1000, TimeUnit.MILLISECONDS);
3940+
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
3941+
}
3942+
38713943
/**
38723944
*********************** THREAD-ISOLATED Execution Hook Tests **************************************
38733945
*/

0 commit comments

Comments
 (0)