From c9241ac1a432a3c4eaef3d83a76bf2ccf73c9342 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Sun, 7 May 2017 09:55:29 -0700 Subject: [PATCH] Made circuit-opening happen in background, as health counts stream produces new values * This allows for methods which imply reading the circuit-breaker status (isOpen/allowRequest) to be idempotent * Added new method (attemptExecution), which command internally uses to actually manipulate status --- .../com/netflix/hystrix/AbstractCommand.java | 3 +- .../hystrix/HystrixCircuitBreaker.java | 212 ++++++++++++------ .../hystrix/HystrixCommandMetrics.java | 4 + .../hystrix/HystrixCircuitBreakerTest.java | 31 ++- 4 files changed, 172 insertions(+), 78 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index c9ca2cbac..a93273d90 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -520,7 +520,7 @@ private Observable applyHystrixSemantics(final AbstractCommand _cmd) { executionHook.onStart(_cmd); /* determine if we're allowed to execute */ - if (circuitBreaker.allowRequest()) { + if (circuitBreaker.attemptExecution()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @@ -601,6 +601,7 @@ public void call() { final Func1> handleFallback = new Func1>() { @Override public Observable call(Throwable t) { + circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java index f8047af1e..29e8f7899 
100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java @@ -16,44 +16,58 @@ package com.netflix.hystrix; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; +import rx.Subscriber; +import rx.Subscription; /** * Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold. *

- * It will then allow single retries after a defined sleepWindow until the execution succeeds at which point it will again close the circuit and allow executions again. + * The default (and only) implementation will then allow a single retry each time the defined sleepWindow elapses, until an + * execution succeeds, at which point it will again close the circuit and allow executions again. */ public interface HystrixCircuitBreaker { /** - * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. - *

- * This takes into account the half-open logic which allows some requests through when determining if it should be closed again. + * Answers whether a {@link HystrixCommand} request is allowed to proceed or not. It is idempotent and does + * not modify any internal state, and takes into account the half-open logic which allows some requests through + * after the circuit has been opened. Command execution itself goes through {@link #attemptExecution()}. * * @return boolean whether a request should be permitted */ - public boolean allowRequest(); + boolean allowRequest(); /** * Whether the circuit is currently open (tripped). * * @return boolean state of circuit breaker */ - public boolean isOpen(); + boolean isOpen(); /** * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */ - /* package */void markSuccess(); + void markSuccess(); + + /** + * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. + */ + void markNonSuccess(); + + /** + * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal + * state. Returns true if the command is permitted to execute, false if it must be rejected. + */ + boolean attemptExecution(); /** * @ExcludeFromJavadoc * @ThreadSafe */ - public static class Factory { + class Factory { // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly) private static ConcurrentHashMap circuitBreakersByCommand = new ConcurrentHashMap(); @@ -114,102 +128,161 @@ public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { } } + /** * The default production implementation of {@link HystrixCircuitBreaker}. 
* * @ExcludeFromJavadoc * @ThreadSafe */ - /* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { + /* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; - /* track whether this circuit is open/closed at any given point in time (default to false==closed) */ - private AtomicBoolean circuitOpen = new AtomicBoolean(false); + enum Status { + CLOSED, OPEN, HALF_OPEN; + } - /* when the circuit was marked open or was last allowed to try a 'singleTest' */ - private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); + private final AtomicReference status = new AtomicReference(Status.CLOSED); + private final AtomicLong circuitOpened = new AtomicLong(-1); + private final AtomicReference activeSubscription = new AtomicReference(null); - protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { + protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; + + //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur + Subscription s = subscribeToStream(); + activeSubscription.set(s); + } + + private Subscription subscribeToStream() { + /* + * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream + */ + return metrics.getHealthCountsStream() + .observe() + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(HealthCounts hc) { + // check if we are past the statisticalWindowVolumeThreshold + if (hc.getTotalRequests() < 
properties.circuitBreakerRequestVolumeThreshold().get()) { + // we are not past the minimum volume threshold for the stat window, + // so no change to circuit status. + // if it was CLOSED, it stays CLOSED + // if it was half-open, we need to wait for a successful command execution + // if it was open, we need to wait for sleep window to elapse + } else { + if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { + //we are not past the minimum error threshold for the stat window, + // so no change to circuit status. + // if it was CLOSED, it stays CLOSED + // if it was half-open, we need to wait for a successful command execution + // if it was open, we need to wait for sleep window to elapse + } else { + // our failure rate is too high, we need to set the state to OPEN + if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { + circuitOpened.set(System.currentTimeMillis()); + } + } + } + } + }); } + @Override public void markSuccess() { - if (circuitOpen.get()) { - if (circuitOpen.compareAndSet(true, false)) { - //win the thread race to reset metrics - //Unsubscribe from the current stream to reset the health counts stream. 
This only affects the health counts view, - //and all other metric consumers are unaffected by the reset - metrics.resetStream(); + if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { + //This thread wins the race to close the circuit - it resets the stream to start it over from 0 + metrics.resetStream(); + Subscription previousSubscription = activeSubscription.get(); + if (previousSubscription != null) { + previousSubscription.unsubscribe(); } + Subscription newSubscription = subscribeToStream(); + activeSubscription.set(newSubscription); + circuitOpened.set(-1L); + } + } + + @Override + public void markNonSuccess() { + if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { + //This thread wins the race to re-open the circuit - it resets the start time for the sleep window + circuitOpened.set(System.currentTimeMillis()); + } + } + + @Override + public boolean isOpen() { + if (properties.circuitBreakerForceOpen().get()) { + return true; + } + if (properties.circuitBreakerForceClosed().get()) { + return false; } + return circuitOpened.get() >= 0; } @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { - // properties have asked us to force the circuit open so we will allow NO requests return false; } if (properties.circuitBreakerForceClosed().get()) { - // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior - isOpen(); - // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through return true; } - return !isOpen() || allowSingleTest(); - } - - public boolean allowSingleTest() { - long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); - // 1) if the circuit is open - // 2) and it's been longer than 'sleepWindow' since we opened the circuit - if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { - // We 
push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try. - // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'. - if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { - // if this returns true that means we set the time so we'll return true to allow the singleTest - // if it returned false it means another thread raced us and allowed the singleTest before we did - return true; + if (circuitOpened.get() == -1) { + return true; + } else { + if (status.get().equals(Status.HALF_OPEN)) { + return false; + } else { + return isAfterSleepWindow(); } } - return false; } - @Override - public boolean isOpen() { - if (circuitOpen.get()) { - // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close - return true; - } - - // we're closed, so let's see if errors have made us so we should trip the circuit open - HealthCounts health = metrics.getHealthCounts(); + private boolean isAfterSleepWindow() { + final long circuitOpenTime = circuitOpened.get(); + final long currentTime = System.currentTimeMillis(); + final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); + return currentTime > circuitOpenTime + sleepWindowTime; + } - // check if we are past the statisticalWindowVolumeThreshold - if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { - // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything + @Override + public boolean attemptExecution() { + if (properties.circuitBreakerForceOpen().get()) { return false; } - - if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { - return false; + if 
(properties.circuitBreakerForceClosed().get()) { + return true; + } + if (circuitOpened.get() == -1) { + return true; } else { - // our failure rate is too high, trip the circuit - if (circuitOpen.compareAndSet(false, true)) { - // if the previousValue was false then we want to set the currentTime - circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); - return true; + if (isAfterSleepWindow()) { + if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { + //only the first request after sleep window should execute + return true; + } else { + return false; + } } else { - // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have - // caused another thread to set it to true already even though we were in the process of doing the same - // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open - return true; + return false; } } } - } /** @@ -234,6 +307,15 @@ public void markSuccess() { } + @Override + public void markNonSuccess() { + + } + + @Override + public boolean attemptExecution() { + return true; + } } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommandMetrics.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommandMetrics.java index 797c5e48f..433f0cb1b 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommandMetrics.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommandMetrics.java @@ -343,6 +343,10 @@ public int getCurrentConcurrentExecutionCount() { } } + /* package-private */ HealthCountsStream getHealthCountsStream() { + return healthCountsStream; + } + /** * Retrieve a snapshot of total requests, error count and error percentage. 
* diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java index 067993dd9..b0d0b13f2 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java @@ -93,6 +93,16 @@ public void markSuccess() { // we don't need to do anything since we're going to permanently trip the circuit } + @Override + public void markNonSuccess() { + + } + + @Override + public boolean attemptExecution() { + return !isOpen(); + } + @Override public boolean allowRequest() { return !isOpen(); @@ -123,7 +133,7 @@ public void testTripCircuit() { // this should still allow requests as everything has been successful Thread.sleep(100); - assertTrue(cb.allowRequest()); + //assertTrue(cb.allowRequest()); assertFalse(cb.isOpen()); // fail @@ -298,7 +308,6 @@ public void testTripCircuitOnTimeoutsAboveThreshold() { // this should trip the circuit as the error percentage is above the threshold Thread.sleep(100); - assertFalse(cb.allowRequest()); assertTrue(cb.isOpen()); } catch (Exception e) { e.printStackTrace(); @@ -338,11 +347,11 @@ public void testSingleTestOnOpenCircuitAfterTimeWindow() { Thread.sleep(sleepWindow + 50); // we should now allow 1 request - assertTrue(cb.allowRequest()); + assertTrue(cb.attemptExecution()); // but the circuit should still be open assertTrue(cb.isOpen()); // and further requests are still blocked - assertFalse(cb.allowRequest()); + assertFalse(cb.attemptExecution()); } catch (Exception e) { e.printStackTrace(); @@ -378,7 +387,6 @@ public void testCircuitClosedAfterSuccess() { Thread.sleep(100); System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString()); System.out.println("CircuitBreaker state 1 : " + cmd1.getMetrics().getHealthCounts()); - assertFalse(cb.allowRequest()); assertTrue(cb.isOpen()); // wait 
for sleepWindow to pass @@ -438,16 +446,15 @@ public void testMultipleTimeWindowRetriesBeforeClosingCircuit() { cmd4.execute(); // everything has failed in the test window so we should return false now - System.out.println("!!!! 1 4 failures, circuit will open on recalc"); + System.out.println("!!!! 1: 4 failures, circuit will open on recalc"); Thread.sleep(100); - assertFalse(cb.allowRequest()); assertTrue(cb.isOpen()); // wait for sleepWindow to pass - System.out.println("!!!! 2 Sleep window starting where all commands fail-fast"); + System.out.println("!!!! 2: Sleep window starting where all commands fail-fast"); Thread.sleep(sleepWindow + 50); - System.out.println("!!!! 3 Sleep window over, should allow singleTest()"); + System.out.println("!!!! 3: Sleep window over, should allow singleTest()"); // but the circuit should still be open assertTrue(cb.isOpen()); @@ -455,14 +462,14 @@ public void testMultipleTimeWindowRetriesBeforeClosingCircuit() { // we should now allow 1 request, and upon failure, should not affect the circuit breaker, which should remain open HystrixCommand cmd5 = new FailureCommand(key, 60); Observable asyncResult5 = cmd5.observe(); - System.out.println("!!!! Kicked off the single-test"); + System.out.println("!!!! 4: Kicked off the single-test"); // and further requests are still blocked while the singleTest command is in flight assertFalse(cb.allowRequest()); - System.out.println("!!!! Confirmed that no other requests go out during single-test"); + System.out.println("!!!! 5: Confirmed that no other requests go out during single-test"); asyncResult5.toBlocking().single(); - System.out.println("!!!! SingleTest just completed"); + System.out.println("!!!! 6: SingleTest just completed"); // all requests should still be blocked, because the singleTest failed assertFalse(cb.allowRequest());