Skip to content

Commit 669ee21

Browse files
author
Matt Jacobs
committed
Move HystrixDashboardStream to hystrix-core
1 parent 31cf0fe commit 669ee21

File tree

4 files changed

+117
-48
lines changed

4 files changed

+117
-48
lines changed

hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixRequestEvents;
2323
import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization;
2424
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
25+
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
2526
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
2627
import io.reactivesocket.Payload;
2728
import rx.Observable;

hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import com.netflix.hystrix.HystrixEventType;
2626
import com.netflix.hystrix.HystrixThreadPoolKey;
2727
import com.netflix.hystrix.HystrixThreadPoolMetrics;
28-
import com.netflix.hystrix.contrib.reactivesocket.HystrixDashboardStream;
28+
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
2929
import org.agrona.LangUtil;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.netflix.hystrix.contrib.reactivesocket;
16+
package com.netflix.hystrix.metric.consumer;
1717

1818
import com.netflix.hystrix.HystrixCollapserMetrics;
1919
import com.netflix.hystrix.HystrixCommandMetrics;
2020
import com.netflix.hystrix.HystrixThreadPoolMetrics;
2121
import rx.Observable;
22+
import rx.functions.Action0;
23+
import rx.functions.Func1;
2224

2325
import java.util.Collection;
2426
import java.util.concurrent.TimeUnit;
@@ -32,13 +34,28 @@ public class HystrixDashboardStream {
3234
private HystrixDashboardStream(int delayInMs) {
3335
this.delayInMs = delayInMs;
3436
this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
35-
.map(timestamp -> new DashboardData(
36-
HystrixCommandMetrics.getInstances(),
37-
HystrixThreadPoolMetrics.getInstances(),
38-
HystrixCollapserMetrics.getInstances()
39-
))
40-
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
41-
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
37+
.map(new Func1<Long, DashboardData>() {
38+
@Override
39+
public DashboardData call(Long timestamp) {
40+
return new DashboardData(
41+
HystrixCommandMetrics.getInstances(),
42+
HystrixThreadPoolMetrics.getInstances(),
43+
HystrixCollapserMetrics.getInstances()
44+
);
45+
}
46+
})
47+
.doOnSubscribe(new Action0() {
48+
@Override
49+
public void call() {
50+
isSourceCurrentlySubscribed.set(true);
51+
}
52+
})
53+
.doOnUnsubscribe(new Action0() {
54+
@Override
55+
public void call() {
56+
isSourceCurrentlySubscribed.set(false);
57+
}
58+
})
4259
.share()
4360
.onBackpressureDrop();
4461
}
Lines changed: 90 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.netflix.hystrix.contrib.reactivesocket;
16+
package com.netflix.hystrix.metric.consumer;
1717

18+
import com.hystrix.junit.HystrixRequestContextRule;
1819
import com.netflix.hystrix.HystrixCommand;
20+
import com.netflix.hystrix.HystrixCommandGroupKey;
1921
import com.netflix.hystrix.HystrixCommandKey;
2022
import com.netflix.hystrix.HystrixCommandMetrics;
23+
import com.netflix.hystrix.HystrixEventType;
24+
import com.netflix.hystrix.metric.CommandStreamTest;
2125
import org.junit.Before;
26+
import org.junit.Rule;
2227
import org.junit.Test;
2328
import rx.Observable;
2429
import rx.Subscriber;
2530
import rx.Subscription;
26-
import rx.functions.Actions;
31+
import rx.functions.Action0;
32+
import rx.functions.Func1;
33+
import rx.functions.Func2;
2734
import rx.schedulers.Schedulers;
2835

2936
import java.util.concurrent.CountDownLatch;
@@ -34,9 +41,14 @@
3441
import static org.junit.Assert.assertFalse;
3542
import static org.junit.Assert.assertTrue;
3643

37-
public class HystrixDashboardStreamTest extends HystrixStreamTest {
44+
public class HystrixDashboardStreamTest extends CommandStreamTest {
45+
46+
@Rule
47+
public HystrixRequestContextRule ctx = new HystrixRequestContextRule();
3848

3949
HystrixDashboardStream stream;
50+
private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Dashboard");
51+
private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("DashboardCommand");
4052

4153
@Before
4254
public void init() {
@@ -46,26 +58,37 @@ public void init() {
4658
@Test
4759
public void testStreamHasData() throws Exception {
4860
final AtomicBoolean commandShowsUp = new AtomicBoolean(false);
49-
CountDownLatch latch = new CountDownLatch(1);
61+
final CountDownLatch latch = new CountDownLatch(1);
5062
final int NUM = 10;
5163

5264
for (int i = 0; i < 2; i++) {
53-
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
65+
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
5466
cmd.observe();
5567
}
5668

57-
stream.observe().take(NUM).subscribe(dashboardData -> {
58-
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands");
59-
for (HystrixCommandMetrics metrics: dashboardData.commandMetrics) {
60-
if (metrics.getCommandKey().name().equals("SyntheticBlockingCommand")) {
61-
commandShowsUp.set(true);
69+
stream.observe().take(NUM).subscribe(
70+
new Subscriber<HystrixDashboardStream.DashboardData>() {
71+
@Override
72+
public void onCompleted() {
73+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
74+
latch.countDown();
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e);
80+
latch.countDown();
81+
}
82+
83+
@Override
84+
public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
85+
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands");
86+
for (HystrixCommandMetrics metrics : dashboardData.commandMetrics) {
87+
if (metrics.getCommandKey().equals(commandKey)) {
88+
commandShowsUp.set(true);
89+
}
6290
}
6391
}
64-
},
65-
Actions.empty(),
66-
() -> {
67-
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
68-
latch.countDown();
6992
});
7093

7194
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
@@ -74,15 +97,20 @@ public void testStreamHasData() throws Exception {
7497

7598
@Test
7699
public void testTwoSubscribersOneUnsubscribes() throws Exception {
77-
CountDownLatch latch1 = new CountDownLatch(1);
78-
CountDownLatch latch2 = new CountDownLatch(1);
79-
AtomicInteger payloads1 = new AtomicInteger(0);
80-
AtomicInteger payloads2 = new AtomicInteger(0);
100+
final CountDownLatch latch1 = new CountDownLatch(1);
101+
final CountDownLatch latch2 = new CountDownLatch(1);
102+
final AtomicInteger payloads1 = new AtomicInteger(0);
103+
final AtomicInteger payloads2 = new AtomicInteger(0);
81104

82105
Subscription s1 = stream
83106
.observe()
84107
.take(100)
85-
.doOnUnsubscribe(latch1::countDown)
108+
.doOnUnsubscribe(new Action0() {
109+
@Override
110+
public void call() {
111+
latch1.countDown();
112+
}
113+
})
86114
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
87115
@Override
88116
public void onCompleted() {
@@ -106,7 +134,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
106134
Subscription s2 = stream
107135
.observe()
108136
.take(100)
109-
.doOnUnsubscribe(latch2::countDown)
137+
.doOnUnsubscribe(new Action0() {
138+
@Override
139+
public void call() {
140+
latch2.countDown();
141+
}
142+
})
110143
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
111144
@Override
112145
public void onCompleted() {
@@ -128,7 +161,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
128161
});
129162
//execute 1 command, then unsubscribe from first stream. then execute the rest
130163
for (int i = 0; i < 50; i++) {
131-
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
164+
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
132165
cmd.execute();
133166
if (i == 1) {
134167
s1.unsubscribe();
@@ -145,15 +178,20 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
145178

146179
@Test
147180
public void testTwoSubscribersBothUnsubscribe() throws Exception {
148-
CountDownLatch latch1 = new CountDownLatch(1);
149-
CountDownLatch latch2 = new CountDownLatch(1);
150-
AtomicInteger payloads1 = new AtomicInteger(0);
151-
AtomicInteger payloads2 = new AtomicInteger(0);
181+
final CountDownLatch latch1 = new CountDownLatch(1);
182+
final CountDownLatch latch2 = new CountDownLatch(1);
183+
final AtomicInteger payloads1 = new AtomicInteger(0);
184+
final AtomicInteger payloads2 = new AtomicInteger(0);
152185

153186
Subscription s1 = stream
154187
.observe()
155188
.take(10)
156-
.doOnUnsubscribe(latch1::countDown)
189+
.doOnUnsubscribe(new Action0() {
190+
@Override
191+
public void call() {
192+
latch1.countDown();
193+
}
194+
})
157195
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
158196
@Override
159197
public void onCompleted() {
@@ -177,7 +215,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
177215
Subscription s2 = stream
178216
.observe()
179217
.take(10)
180-
.doOnUnsubscribe(latch2::countDown)
218+
.doOnUnsubscribe(new Action0() {
219+
@Override
220+
public void call() {
221+
latch2.countDown();
222+
}
223+
})
181224
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
182225
@Override
183226
public void onCompleted() {
@@ -199,7 +242,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
199242
});
200243
//execute half the commands, then unsubscribe from both streams, then execute the rest
201244
for (int i = 0; i < 50; i++) {
202-
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
245+
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
203246
cmd.execute();
204247
if (i == 25) {
205248
s1.unsubscribe();
@@ -217,25 +260,33 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
217260

218261
@Test
219262
public void testTwoSubscribersOneSlowOneFast() throws Exception {
220-
CountDownLatch latch = new CountDownLatch(1);
221-
AtomicBoolean foundError = new AtomicBoolean(false);
263+
final CountDownLatch latch = new CountDownLatch(1);
264+
final AtomicBoolean foundError = new AtomicBoolean(false);
222265

223266
Observable<HystrixDashboardStream.DashboardData> fast = stream
224267
.observe()
225268
.observeOn(Schedulers.newThread());
226269
Observable<HystrixDashboardStream.DashboardData> slow = stream
227270
.observe()
228271
.observeOn(Schedulers.newThread())
229-
.map(n -> {
230-
try {
231-
Thread.sleep(100);
232-
return n;
233-
} catch (InterruptedException ex) {
234-
return n;
272+
.map(new Func1<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData>() {
273+
@Override
274+
public HystrixDashboardStream.DashboardData call(HystrixDashboardStream.DashboardData n) {
275+
try {
276+
Thread.sleep(100);
277+
return n;
278+
} catch (InterruptedException ex) {
279+
return n;
280+
}
235281
}
236282
});
237283

238-
Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2);
284+
Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData, Boolean>() {
285+
@Override
286+
public Boolean call(HystrixDashboardStream.DashboardData payload, HystrixDashboardStream.DashboardData payload2) {
287+
return payload == payload2;
288+
}
289+
});
239290

240291
Subscription s1 = checkZippedEqual
241292
.take(10000)
@@ -261,7 +312,7 @@ public void onNext(Boolean b) {
261312
});
262313

263314
for (int i = 0; i < 50; i++) {
264-
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
315+
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
265316
cmd.execute();
266317
}
267318

0 commit comments

Comments
 (0)