Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2981,8 +2981,6 @@ protected Integer run() throws Exception {

@Test
public void testEarlyUnsubscribeDuringExecutionViaToObservable() {
final AtomicBoolean hystrixThreadStartedExecuting = new AtomicBoolean(false);

class AsyncCommand extends HystrixCommand<Boolean> {

public AsyncCommand() {
Expand All @@ -2991,9 +2989,8 @@ public AsyncCommand() {

@Override
protected Boolean run() {
hystrixThreadStartedExecuting.set(true);
try {
Thread.sleep(100);
Thread.sleep(500);
return true;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
Expand Down Expand Up @@ -3035,18 +3032,17 @@ public void onNext(Boolean b) {
Thread.sleep(10);
s.unsubscribe();
assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed());
assertFalse(cmd.isExecutionComplete());
assertEquals(hystrixThreadStartedExecuting.get(), cmd.isExecutedInThread());
assertEquals(null, cmd.getFailedExecutionException());
assertNull(cmd.getExecutionException());
System.out.println("Execution time : " + cmd.getExecutionTimeInMilliseconds());
assertTrue(cmd.getExecutionTimeInMilliseconds() > -1);
assertFalse(cmd.isSuccessfulExecution());
assertCommandExecutionEvents(cmd, HystrixEventType.CANCELLED);
assertEquals(0, cmd.metrics.getCurrentConcurrentExecutionCount());
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertSaneHystrixRequestLog(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
Expand All @@ -3055,8 +3051,6 @@ public void onNext(Boolean b) {

@Test
public void testEarlyUnsubscribeDuringExecutionViaObserve() {
final AtomicBoolean hystrixThreadStartedExecuting = new AtomicBoolean(false);

class AsyncCommand extends HystrixCommand<Boolean> {

public AsyncCommand() {
Expand All @@ -3066,8 +3060,7 @@ public AsyncCommand() {
@Override
protected Boolean run() {
try {
hystrixThreadStartedExecuting.set(true);
Thread.sleep(100);
Thread.sleep(500);
return true;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
Expand Down Expand Up @@ -3109,17 +3102,16 @@ public void onNext(Boolean b) {
Thread.sleep(10);
s.unsubscribe();
assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed());
assertFalse(cmd.isExecutionComplete());
assertEquals(hystrixThreadStartedExecuting.get(), cmd.isExecutedInThread());
assertEquals(null, cmd.getFailedExecutionException());
assertNull(cmd.getExecutionException());
assertTrue(cmd.getExecutionTimeInMilliseconds() > -1);
assertFalse(cmd.isSuccessfulExecution());
assertCommandExecutionEvents(cmd, HystrixEventType.CANCELLED);
assertEquals(0, cmd.metrics.getCurrentConcurrentExecutionCount());
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertSaneHystrixRequestLog(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
Expand All @@ -3128,8 +3120,6 @@ public void onNext(Boolean b) {

@Test
public void testEarlyUnsubscribeDuringFallback() {
final AtomicBoolean hystrixThreadStartedExecuting = new AtomicBoolean(false);

class AsyncCommand extends HystrixCommand<Boolean> {

public AsyncCommand() {
Expand All @@ -3144,8 +3134,7 @@ protected Boolean run() {
@Override
protected Boolean getFallback() {
try {
hystrixThreadStartedExecuting.set(true);
Thread.sleep(100);
Thread.sleep(500);
return false;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
Expand Down Expand Up @@ -3192,7 +3181,6 @@ public void onNext(Boolean b) {
assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed());
assertEquals(0, cmd.metrics.getCurrentConcurrentExecutionCount());
assertFalse(cmd.isExecutionComplete());
assertEquals(hystrixThreadStartedExecuting.get(), cmd.isExecutedInThread());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
Expand Down Expand Up @@ -3266,8 +3254,8 @@ public void onNext(Boolean b) {

try {
fromCacheSubscription.unsubscribe();
assertTrue(fromCacheLatch.await(200, TimeUnit.MILLISECONDS));
assertTrue(originalLatch.await(200, TimeUnit.MILLISECONDS));
assertTrue(fromCacheLatch.await(600, TimeUnit.MILLISECONDS));
assertTrue(originalLatch.await(600, TimeUnit.MILLISECONDS));
assertEquals("Number of execution semaphores in use (original)", 0, original.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use (original)", 0, original.getFallbackSemaphore().getNumberOfPermitsUsed());
assertTrue(original.isExecutionComplete());
Expand Down Expand Up @@ -3369,8 +3357,8 @@ public void onNext(Boolean b) {
try {
Thread.sleep(10);
originalSubscription.unsubscribe();
assertTrue(originalLatch.await(200, TimeUnit.MILLISECONDS));
assertTrue(fromCacheLatch.await(200, TimeUnit.MILLISECONDS));
assertTrue(originalLatch.await(600, TimeUnit.MILLISECONDS));
assertTrue(fromCacheLatch.await(600, TimeUnit.MILLISECONDS));
assertEquals("Number of execution semaphores in use (original)", 0, original.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use (original)", 0, original.getFallbackSemaphore().getNumberOfPermitsUsed());
assertFalse(original.isExecutionComplete());
Expand Down Expand Up @@ -3503,9 +3491,11 @@ public void onNext(Boolean b) {
originalSubscription.unsubscribe();
//fromCache1Subscription.unsubscribe();
fromCache2Subscription.unsubscribe();
assertTrue(originalLatch.await(200, TimeUnit.MILLISECONDS));
assertTrue(fromCache1Latch.await(200, TimeUnit.MILLISECONDS));
assertTrue(fromCache2Latch.await(200, TimeUnit.MILLISECONDS));
assertTrue(originalLatch.await(600, TimeUnit.MILLISECONDS));
assertTrue(fromCache1Latch.await(600, TimeUnit.MILLISECONDS));
assertTrue(fromCache2Latch.await(600, TimeUnit.MILLISECONDS));
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());

assertEquals("Number of execution semaphores in use (original)", 0, original.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use (original)", 0, original.getFallbackSemaphore().getNumberOfPermitsUsed());
assertFalse(original.isExecutionComplete());
Expand Down Expand Up @@ -3545,7 +3535,6 @@ public void onNext(Boolean b) {
assertNull(fromCache2Value.get());
assertEquals(0, fromCache2.metrics.getCurrentConcurrentExecutionCount());

System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertSaneHystrixRequestLog(3);
} catch (InterruptedException ex) {
ex.printStackTrace();
Expand Down Expand Up @@ -3649,6 +3638,8 @@ public void onNext(Boolean b) {
assertTrue(originalLatch.await(200, TimeUnit.MILLISECONDS));
assertTrue(fromCache1Latch.await(200, TimeUnit.MILLISECONDS));
assertTrue(fromCache2Latch.await(200, TimeUnit.MILLISECONDS));
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());

assertEquals("Number of execution semaphores in use (original)", 0, original.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use (original)", 0, original.getFallbackSemaphore().getNumberOfPermitsUsed());
assertFalse(original.isExecutionComplete());
Expand Down Expand Up @@ -3689,7 +3680,6 @@ public void onNext(Boolean b) {
assertFalse(fromCache2.isSuccessfulExecution());
assertEquals(0, fromCache2.metrics.getCurrentConcurrentExecutionCount());

System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertSaneHystrixRequestLog(3);
} catch (InterruptedException ex) {
ex.printStackTrace();
Expand Down Expand Up @@ -5189,7 +5179,7 @@ public AsyncCacheableCommand(String arg) {
@Override
protected Boolean run() {
try {
Thread.sleep(100);
Thread.sleep(500);
return true;
} catch (InterruptedException ex) {
cancelled.set(true);
Expand Down