Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fix memory leak where TimerReference was not getting cleared on lost-…
…race request-cache insert
  • Loading branch information
Matt Jacobs committed May 13, 2016
commit 77e9860c51498cc8e7d44fb69d18ae9de8e366d2
46 changes: 25 additions & 21 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
Expand Down Expand Up @@ -95,7 +94,6 @@ protected static enum TimedOutStatus {
protected final AtomicReference<Reference<TimerListener>> timeoutTimer = new AtomicReference<Reference<TimerListener>>();

protected final AtomicBoolean commandStarted = new AtomicBoolean();
protected volatile boolean executionStarted = false;
protected volatile boolean isExecutionComplete = false;

/*
Expand Down Expand Up @@ -361,8 +359,7 @@ public Observable<R> toObservable() {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
}

final long startTimestamp = System.currentTimeMillis();
commandStartTimestamp = startTimestamp;
commandStartTimestamp = System.currentTimeMillis();

if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
Expand Down Expand Up @@ -449,21 +446,7 @@ public Observable<R> call() {
.doOnUnsubscribe(unsubscribeCommandCleanup); // perform cleanup once
}

private void handleCommandEnd() {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}

long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
ExecutionResult cancelled = executionResultAtTimeOfCancellation;
if (cancelled == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, executionStarted);
} else {
metrics.markCommandDone(cancelled, commandKey, threadPoolKey, executionStarted);
}
}

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
Expand Down Expand Up @@ -601,7 +584,7 @@ private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionStarted = true;
executionResult = executionResult.setExecutionOccurred();
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
Expand Down Expand Up @@ -639,7 +622,7 @@ public Boolean call() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionStarted = true;
executionResult = executionResult.setExecutionOccurred();
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
Expand Down Expand Up @@ -849,17 +832,38 @@ public void call() {
}

private void cleanUpAfterResponseFromCache() {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}

final long latency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult
.addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE)
.markUserThreadCompletion(latency)
.setNotExecutedInThread();
ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
.markUserThreadCompletion(latency);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, executionStarted);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey);
eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
}

private void handleCommandEnd() {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}

long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
ExecutionResult cancelled = executionResultAtTimeOfCancellation;
if (cancelled == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
} else {
metrics.markCommandDone(cancelled, commandKey, threadPoolKey);
}
}

private Observable<R> handleSemaphoreRejectionViaFallback() {
Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
executionResult = executionResult.setExecutionException(semaphoreRejectionException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ private static boolean didExecutionOccur(HystrixEventType eventType) {
}
}

public ExecutionResult setExecutionOccurred() {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, true, isExecutedInThread, collapserKey);
}

public ExecutionResult setExecutionLatency(int executionLatency) {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, isExecutedInThread, collapserKey);
Expand Down Expand Up @@ -273,14 +278,14 @@ public ExecutionResult markUserThreadCompletion(long userThreadLatency) {
public ExecutionResult addEvent(HystrixEventType eventType) {
return new ExecutionResult(eventCounts.plus(eventType), startTimestamp, executionLatency,
userThreadLatency, failedExecutionException, executionException,
executionOccurred ? executionOccurred : didExecutionOccur(eventType), isExecutedInThread, collapserKey);
executionOccurred, isExecutedInThread, collapserKey);
}

public ExecutionResult addEvent(int executionLatency, HystrixEventType eventType) {
if (startTimestamp >= 0 && !isResponseRejected()) {
return new ExecutionResult(eventCounts.plus(eventType), startTimestamp, executionLatency,
userThreadLatency, failedExecutionException, executionException,
executionOccurred ? executionOccurred : didExecutionOccur(eventType), isExecutedInThread, collapserKey);
executionOccurred, isExecutedInThread, collapserKey);
} else {
return addEvent(eventType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ public void call() {
completionLogicRun.set(true);
}
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ public int getCurrentConcurrentExecutionCount() {
HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
}

/* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
/* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
if (executionStarted) {
if (executionResult.executionOccurred()) {
concurrentExecutionCount.decrementAndGet();
}
}
Expand Down