Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.ExecutionResult;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.metric.HystrixRequestEvents;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import rx.Observable;

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -66,69 +61,19 @@ public static String convertRequestToJson(HystrixRequestEvents request) throws I

private static void writeRequestAsJson(JsonGenerator json, HystrixRequestEvents request) throws IOException {
json.writeStartArray();
Map<CommandAndCacheKey, Integer> cachingDetector = new HashMap<CommandAndCacheKey, Integer>();
List<HystrixInvokableInfo<?>> nonCachedExecutions = new ArrayList<HystrixInvokableInfo<?>>(request.getExecutions().size());
for (HystrixInvokableInfo<?> execution: request.getExecutions()) {
if (execution.getPublicCacheKey() != null) {
//eligible for caching - might be the initial, or might be from cache
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), execution.getPublicCacheKey());
Integer count = cachingDetector.get(key);
if (count != null) {
//key already seen
cachingDetector.put(key, count + 1);
} else {
//key not seen yet
cachingDetector.put(key, 0);
}
}
if (!execution.isResponseFromCache()) {
nonCachedExecutions.add(execution);
}
}

Map<ExecutionSignature, List<Integer>> commandDeduper = new HashMap<ExecutionSignature, List<Integer>>();
for (HystrixInvokableInfo<?> execution: nonCachedExecutions) {
int cachedCount = 0;
String cacheKey = null;
if (execution.getPublicCacheKey() != null) {
cacheKey = execution.getPublicCacheKey();
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), cacheKey);
cachedCount = cachingDetector.get(key);
}
ExecutionSignature signature;
HystrixCollapserKey collapserKey = execution.getOriginatingCollapserKey();
int collapserBatchCount = execution.getNumberCollapsed();
if (cachedCount > 0) {
//this has a RESPONSE_FROM_CACHE and needs to get split off
signature = ExecutionSignature.from(execution, cacheKey, cachedCount);
} else {
//nothing cached from this, can collapse further
signature = ExecutionSignature.from(execution);
}
List<Integer> currentLatencyList = commandDeduper.get(signature);
if (currentLatencyList != null) {
currentLatencyList.add(execution.getExecutionTimeInMilliseconds());
} else {
List<Integer> newLatencyList = new ArrayList<Integer>();
newLatencyList.add(execution.getExecutionTimeInMilliseconds());
commandDeduper.put(signature, newLatencyList);
}
}

for (Map.Entry<ExecutionSignature, List<Integer>> entry: commandDeduper.entrySet()) {
ExecutionSignature executionSignature = entry.getKey();
List<Integer> latencies = entry.getValue();
convertExecutionToJson(json, executionSignature, latencies);
for (Map.Entry<HystrixRequestEvents.ExecutionSignature, List<Integer>> entry: request.getExecutionsMappedToLatencies().entrySet()) {
convertExecutionToJson(json, entry.getKey(), entry.getValue());
}

json.writeEndArray();
}

private static void convertExecutionToJson(JsonGenerator json, ExecutionSignature executionSignature, List<Integer> latencies) throws IOException {
private static void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List<Integer> latencies) throws IOException {
json.writeStartObject();
json.writeStringField("name", executionSignature.commandName);
json.writeStringField("name", executionSignature.getCommandName());
json.writeArrayFieldStart("events");
ExecutionResult.EventCounts eventCounts = executionSignature.eventCounts;
ExecutionResult.EventCounts eventCounts = executionSignature.getEventCounts();
for (HystrixEventType eventType: HystrixEventType.values()) {
if (!eventType.equals(HystrixEventType.COLLAPSED)) {
if (eventCounts.contains(eventType)) {
Expand All @@ -150,99 +95,15 @@ private static void convertExecutionToJson(JsonGenerator json, ExecutionSignatur
json.writeNumber(latency);
}
json.writeEndArray();
if (executionSignature.cachedCount > 0) {
json.writeNumberField("cached", executionSignature.cachedCount);
if (executionSignature.getCachedCount() > 0) {
json.writeNumberField("cached", executionSignature.getCachedCount());
}
if (executionSignature.eventCounts.contains(HystrixEventType.COLLAPSED)) {
if (executionSignature.getEventCounts().contains(HystrixEventType.COLLAPSED)) {
json.writeObjectFieldStart("collapsed");
json.writeStringField("name", executionSignature.collapserKey.name());
json.writeNumberField("count", executionSignature.collapserBatchSize);
json.writeStringField("name", executionSignature.getCollapserKey().name());
json.writeNumberField("count", executionSignature.getCollapserBatchSize());
json.writeEndObject();
}
json.writeEndObject();
}

private static class CommandAndCacheKey {
private final String commandName;
private final String cacheKey;

public CommandAndCacheKey(String commandName, String cacheKey) {
this.commandName = commandName;
this.cacheKey = cacheKey;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CommandAndCacheKey that = (CommandAndCacheKey) o;

if (!commandName.equals(that.commandName)) return false;
return cacheKey.equals(that.cacheKey);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + cacheKey.hashCode();
return result;
}

@Override
public String toString() {
return "CommandAndCacheKey{" +
"commandName='" + commandName + '\'' +
", cacheKey='" + cacheKey + '\'' +
'}';
}
}

private static class ExecutionSignature {
private final String commandName;
private final ExecutionResult.EventCounts eventCounts;
private final String cacheKey;
private final int cachedCount;
private final HystrixCollapserKey collapserKey;
private final int collapserBatchSize;

private ExecutionSignature(HystrixCommandKey commandKey, ExecutionResult.EventCounts eventCounts, String cacheKey, int cachedCount, HystrixCollapserKey collapserKey, int collapserBatchSize) {
this.commandName = commandKey.name();
this.eventCounts = eventCounts;
this.cacheKey = cacheKey;
this.cachedCount = cachedCount;
this.collapserKey = collapserKey;
this.collapserBatchSize = collapserBatchSize;
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), null, 0, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution, String cacheKey, int cachedCount) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), cacheKey, cachedCount, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ExecutionSignature that = (ExecutionSignature) o;

if (!commandName.equals(that.commandName)) return false;
if (!eventCounts.equals(that.eventCounts)) return false;
return !(cacheKey != null ? !cacheKey.equals(that.cacheKey) : that.cacheKey != null);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + eventCounts.hashCode();
result = 31 * result + (cacheKey != null ? cacheKey.hashCode() : 0);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
*/
package com.netflix.hystrix.metric;

import com.netflix.hystrix.ExecutionResult;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixInvokableInfo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HystrixRequestEvents {

private final Collection<HystrixInvokableInfo<?>> executions;

public HystrixRequestEvents(Collection<HystrixInvokableInfo<?>> executions) {
Expand All @@ -30,4 +36,162 @@ public HystrixRequestEvents(Collection<HystrixInvokableInfo<?>> executions) {
public Collection<HystrixInvokableInfo<?>> getExecutions() {
return executions;
}

public Map<ExecutionSignature, List<Integer>> getExecutionsMappedToLatencies() {
Map<CommandAndCacheKey, Integer> cachingDetector = new HashMap<CommandAndCacheKey, Integer>();
List<HystrixInvokableInfo<?>> nonCachedExecutions = new ArrayList<HystrixInvokableInfo<?>>(executions.size());
for (HystrixInvokableInfo<?> execution: executions) {
if (execution.getPublicCacheKey() != null) {
//eligible for caching - might be the initial, or might be from cache
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), execution.getPublicCacheKey());
Integer count = cachingDetector.get(key);
if (count != null) {
//key already seen
cachingDetector.put(key, count + 1);
} else {
//key not seen yet
cachingDetector.put(key, 0);
}
}
if (!execution.isResponseFromCache()) {
nonCachedExecutions.add(execution);
}
}

Map<ExecutionSignature, List<Integer>> commandDeduper = new HashMap<ExecutionSignature, List<Integer>>();
for (HystrixInvokableInfo<?> execution: nonCachedExecutions) {
int cachedCount = 0;
String cacheKey = null;
if (execution.getPublicCacheKey() != null) {
cacheKey = execution.getPublicCacheKey();
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), cacheKey);
cachedCount = cachingDetector.get(key);
}
ExecutionSignature signature;
HystrixCollapserKey collapserKey = execution.getOriginatingCollapserKey();
int collapserBatchCount = execution.getNumberCollapsed();
if (cachedCount > 0) {
//this has a RESPONSE_FROM_CACHE and needs to get split off
signature = ExecutionSignature.from(execution, cacheKey, cachedCount);
} else {
//nothing cached from this, can collapse further
signature = ExecutionSignature.from(execution);
}
List<Integer> currentLatencyList = commandDeduper.get(signature);
if (currentLatencyList != null) {
currentLatencyList.add(execution.getExecutionTimeInMilliseconds());
} else {
List<Integer> newLatencyList = new ArrayList<Integer>();
newLatencyList.add(execution.getExecutionTimeInMilliseconds());
commandDeduper.put(signature, newLatencyList);
}
}

return commandDeduper;
}

private static class CommandAndCacheKey {
private final String commandName;
private final String cacheKey;

public CommandAndCacheKey(String commandName, String cacheKey) {
this.commandName = commandName;
this.cacheKey = cacheKey;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CommandAndCacheKey that = (CommandAndCacheKey) o;

if (!commandName.equals(that.commandName)) return false;
return cacheKey.equals(that.cacheKey);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + cacheKey.hashCode();
return result;
}

@Override
public String toString() {
return "CommandAndCacheKey{" +
"commandName='" + commandName + '\'' +
", cacheKey='" + cacheKey + '\'' +
'}';
}
}

public static class ExecutionSignature {
private final String commandName;
private final ExecutionResult.EventCounts eventCounts;
private final String cacheKey;
private final int cachedCount;
private final HystrixCollapserKey collapserKey;
private final int collapserBatchSize;

private ExecutionSignature(HystrixCommandKey commandKey, ExecutionResult.EventCounts eventCounts, String cacheKey, int cachedCount, HystrixCollapserKey collapserKey, int collapserBatchSize) {
this.commandName = commandKey.name();
this.eventCounts = eventCounts;
this.cacheKey = cacheKey;
this.cachedCount = cachedCount;
this.collapserKey = collapserKey;
this.collapserBatchSize = collapserBatchSize;
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), null, 0, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution, String cacheKey, int cachedCount) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), cacheKey, cachedCount, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ExecutionSignature that = (ExecutionSignature) o;

if (!commandName.equals(that.commandName)) return false;
if (!eventCounts.equals(that.eventCounts)) return false;
return !(cacheKey != null ? !cacheKey.equals(that.cacheKey) : that.cacheKey != null);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + eventCounts.hashCode();
result = 31 * result + (cacheKey != null ? cacheKey.hashCode() : 0);
return result;
}

public String getCommandName() {
return commandName;
}

public ExecutionResult.EventCounts getEventCounts() {
return eventCounts;
}

public int getCachedCount() {
return cachedCount;
}


public HystrixCollapserKey getCollapserKey() {
return collapserKey;
}

public int getCollapserBatchSize() {
return collapserBatchSize;
}
}
}