Skip to content

Commit 910ee48

Browse files
author
Matt Jacobs
committed
Made HystrixUtilizationStream support sharing
1 parent c86931b commit 910ee48

File tree

4 files changed

+401
-14
lines changed

4 files changed

+401
-14
lines changed

hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,26 @@ protected static String convertToJson(HystrixUtilization utilization) throws IOE
111111
return jsonString.getBuffer().toString();
112112
}
113113

114+
/**
115+
* @deprecated Not for public use. This prevents stream-sharing. Please use {@link HystrixUtilizationStream#observe()}
116+
* @param delay interval between data emissions
117+
* @return sampled utilization as Java object, taken on a timer
118+
*/
119+
@Deprecated //deprecated as of 1.5.4
114120
public Observable<HystrixUtilization> observe(int delay) {
115121
return streamGenerator.call(delay);
116122
}
117123

124+
/**
125+
* @deprecated Not for public use. This prevents stream-sharing. Please use {@link #observeJson()}
126+
* @param delay interval between data emissions
127+
* @return sampled utilization as JSON string, taken on a timer
128+
*/
118129
public Observable<String> observeJson(int delay) {
119130
return streamGenerator.call(delay).map(convertToJsonFunc);
120131
}
132+
133+
public Observable<String> observeJson() {
134+
return HystrixUtilizationStream.getInstance().observe().map(convertToJsonFunc);
135+
}
121136
}

hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
class EventStream implements Supplier<Observable<Payload>> {
3535

3636
private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500;
37-
private final static int UTILIZATION_DATA_INTERVAL_IN_MS = 500;
3837

3938
private final Observable<Payload> source;
4039
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
@@ -69,7 +68,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {
6968
.map(SerialHystrixMetric::toPayload);
7069
break;
7170
case UTILIZATION_STREAM:
72-
source = new HystrixUtilizationStream(UTILIZATION_DATA_INTERVAL_IN_MS)
71+
source = HystrixUtilizationStream.getInstance()
7372
.observe()
7473
.map(SerialHystrixUtilization::toBytes)
7574
.map(SerialHystrixMetric::toPayload);

hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,47 +20,80 @@
2020
import com.netflix.hystrix.HystrixThreadPoolKey;
2121
import com.netflix.hystrix.HystrixThreadPoolMetrics;
2222
import rx.Observable;
23-
import rx.functions.Func0;
23+
import rx.functions.Action0;
2424
import rx.functions.Func1;
2525

2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930

3031
/**
3132
* This class samples current Hystrix utilization of resources and exposes that as a stream
3233
*/
3334
public class HystrixUtilizationStream {
34-
3535
private final int intervalInMilliseconds;
36-
private final Observable<Long> timer;
36+
private final Observable<HystrixUtilization> allUtilizationStream;
37+
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
3738

39+
/**
40+
* @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
41+
* @param intervalInMilliseconds milliseconds between data emissions
42+
*/
43+
@Deprecated //deprecated in 1.5.4.
3844
public HystrixUtilizationStream(final int intervalInMilliseconds) {
3945
this.intervalInMilliseconds = intervalInMilliseconds;
40-
this.timer = Observable.defer(new Func0<Observable<Long>>() {
41-
@Override
42-
public Observable<Long> call() {
43-
return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS);
44-
}
45-
});
46+
this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
47+
.map(getAllUtilization)
48+
.doOnSubscribe(new Action0() {
49+
@Override
50+
public void call() {
51+
isSourceCurrentlySubscribed.set(true);
52+
}
53+
})
54+
.doOnUnsubscribe(new Action0() {
55+
@Override
56+
public void call() {
57+
isSourceCurrentlySubscribed.set(false);
58+
}
59+
})
60+
.share()
61+
.onBackpressureDrop();
4662
}
4763

64+
private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(500);
65+
66+
public static HystrixUtilizationStream getInstance() {
67+
return INSTANCE;
68+
}
69+
70+
static HystrixUtilizationStream getNonSingletonInstanceOnlyUsedInUnitTests(int delayInMs) {
71+
return new HystrixUtilizationStream(delayInMs);
72+
}
73+
74+
/**
75+
* Return a ref-counted stream that will only do work when at least one subscriber is present
76+
*/
4877
public Observable<HystrixUtilization> observe() {
49-
return timer.map(getAllUtilization);
78+
return allUtilizationStream;
5079
}
5180

5281
public Observable<Map<HystrixCommandKey, HystrixCommandUtilization>> observeCommandUtilization() {
53-
return timer.map(getAllCommandUtilization);
82+
return allUtilizationStream.map(getOnlyCommandUtilization);
5483
}
5584

5685
public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>> observeThreadPoolUtilization() {
57-
return timer.map(getAllThreadPoolUtilization);
86+
return allUtilizationStream.map(getOnlyThreadPoolUtilization);
5887
}
5988

6089
public int getIntervalInMilliseconds() {
6190
return this.intervalInMilliseconds;
6291
}
6392

93+
public boolean isSourceCurrentlySubscribed() {
94+
return isSourceCurrentlySubscribed.get();
95+
}
96+
6497
private static HystrixCommandUtilization sampleCommandUtilization(HystrixCommandMetrics commandMetrics) {
6598
return HystrixCommandUtilization.sample(commandMetrics);
6699
}
@@ -105,4 +138,20 @@ public HystrixUtilization call(Long timestamp) {
105138
);
106139
}
107140
};
141+
142+
private static final Func1<HystrixUtilization, Map<HystrixCommandKey, HystrixCommandUtilization>> getOnlyCommandUtilization =
143+
new Func1<HystrixUtilization, Map<HystrixCommandKey, HystrixCommandUtilization>>() {
144+
@Override
145+
public Map<HystrixCommandKey, HystrixCommandUtilization> call(HystrixUtilization hystrixUtilization) {
146+
return hystrixUtilization.getCommandUtilizationMap();
147+
}
148+
};
149+
150+
private static final Func1<HystrixUtilization, Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>> getOnlyThreadPoolUtilization =
151+
new Func1<HystrixUtilization, Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>>() {
152+
@Override
153+
public Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization> call(HystrixUtilization hystrixUtilization) {
154+
return hystrixUtilization.getThreadPoolUtilizationMap();
155+
}
156+
};
108157
}

0 commit comments

Comments
 (0)