Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);

/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
Expand Down Expand Up @@ -601,6 +601,7 @@ public void call() {
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,58 @@
package com.netflix.hystrix;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
import rx.Subscriber;
import rx.Subscription;

/**
* Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold.
* <p>
* It will then allow single retries after a defined sleepWindow until the execution succeeds at which point it will again close the circuit and allow executions again.
* The default (and only) implementation will then allow a single retry after a defined sleepWindow until the execution
* succeeds at which point it will again close the circuit and allow executions again.
*/
public interface HystrixCircuitBreaker {

/**
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.
* <p>
* This takes into account the half-open logic which allows some requests through when determining if it should be closed again.
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does
* not modify any internal state, and takes into account the half-open logic which allows some requests through
* after the circuit has been opened
*
* @return boolean whether a request should be permitted
*/
public boolean allowRequest();
boolean allowRequest();

/**
* Whether the circuit is currently open (tripped).
*
* @return boolean state of circuit breaker
*/
public boolean isOpen();
boolean isOpen();

/**
* Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
/* package */void markSuccess();
void markSuccess();

/**
* Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markNonSuccess();

/**
* Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
* state.
*/
boolean attemptExecution();

/**
* @ExcludeFromJavadoc
* @ThreadSafe
*/
public static class Factory {
class Factory {
// String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

Expand Down Expand Up @@ -114,102 +128,161 @@ public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
}
}


/**
* The default production implementation of {@link HystrixCircuitBreaker}.
*
* @ExcludeFromJavadoc
* @ThreadSafe
*/
/* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;

/* track whether this circuit is open/closed at any given point in time (default to false==closed) */
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
enum Status {
CLOSED, OPEN, HALF_OPEN;
}

/* when the circuit was marked open or was last allowed to try a 'singleTest' */
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;

//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}

private Subscription subscribeToStream() {
/*
* This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
*/
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}

@Override
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
//win the thread race to reset metrics
//Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
//and all other metric consumers are unaffected by the reset
metrics.resetStream();
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}

@Override
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}

@Override
public boolean isOpen() {
if (properties.circuitBreakerForceOpen().get()) {
return true;
}
if (properties.circuitBreakerForceClosed().get()) {
return false;
}
return circuitOpened.get() >= 0;
}

@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}

public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
if (circuitOpened.get() == -1) {
return true;
} else {
if (status.get().equals(Status.HALF_OPEN)) {
return false;
} else {
return isAfterSleepWindow();
}
}
return false;
}

@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}

// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}

// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}

if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
}
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
return false;
}
}
}

}

/**
Expand All @@ -234,6 +307,15 @@ public void markSuccess() {

}

@Override
public void markNonSuccess() {

}

@Override
public boolean attemptExecution() {
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ public int getCurrentConcurrentExecutionCount() {
}
}

/* package-private */ HealthCountsStream getHealthCountsStream() {
return healthCountsStream;
}

/**
* Retrieve a snapshot of total requests, error count and error percentage.
*
Expand Down
Loading