Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Made HystrixConfigurationStream support sharing
  • Loading branch information
Matt Jacobs committed Jun 21, 2016
commit 8f304c8935d77421d24d0daa2e11c05be6fdfd29
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import com.netflix.hystrix.config.HystrixThreadPoolConfiguration;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import rx.Observable;
import rx.functions.Func1;

Expand Down Expand Up @@ -171,11 +172,27 @@ public static String convertToString(HystrixConfiguration config) throws IOExcep
return jsonString.getBuffer().toString();
}

/**
* @deprecated Not for public use. This prevents stream-sharing. Please use {@link HystrixConfigurationStream#observe()}
* @param delay interval between data emissions
* @return sampled utilization as Java object, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<HystrixConfiguration> observe(int delay) {
return streamGenerator.call(delay);
}

/**
* @deprecated Not for public use. This prevents stream-sharing. Please use {@link #observeJson()}
* @param delay interval between data emissions
* @return sampled utilization as JSON string, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<String> observeJson(int delay) {
return streamGenerator.call(delay).map(convertToJson);
}

public Observable<String> observeJson() {
return HystrixConfigurationStream.getInstance().observe().map(convertToJson);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import java.util.function.Supplier;

class EventStream implements Supplier<Observable<Payload>> {

private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500;

private final Observable<Payload> source;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

Expand All @@ -56,7 +53,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {

switch (eventStreamEnum) {
case CONFIG_STREAM:
source = new HystrixConfigurationStream(CONFIGURATION_DATA_INTERVAL_IN_MS)
source = HystrixConfigurationStream.getInstance()
.observe()
.map(SerialHystrixConfiguration::toBytes)
.map(SerialHystrixMetric::toPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,85 @@
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Action0;
import rx.functions.Func1;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class samples current Hystrix configuration and exposes that as a stream
*/
public class HystrixConfigurationStream {

private final int intervalInMilliseconds;
private final Observable<Long> timer;

private final Observable<HystrixConfiguration> allConfigurationStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

/**
* @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
* @param intervalInMilliseconds milliseconds between data emissions
*/
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds;
this.timer = Observable.defer(new Func0<Observable<Long>>() {
@Override
public Observable<Long> call() {
return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS);
}
});
this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
.map(getAllConfig)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}

private static final HystrixConfigurationStream INSTANCE = new HystrixConfigurationStream(500);

public static HystrixConfigurationStream getInstance() {
return INSTANCE;
}

static HystrixConfigurationStream getNonSingletonInstanceOnlyUsedInUnitTests(int delayInMs) {
return new HystrixConfigurationStream(delayInMs);
}

/**
* Return a ref-counted stream that will only do work when at least one subscriber is present
*/
public Observable<HystrixConfiguration> observe() {
return timer.map(getAllConfig);
return allConfigurationStream;
}

public Observable<Map<HystrixCommandKey, HystrixCommandConfiguration>> observeCommandConfiguration() {
return timer.map(getAllCommandConfig);
return allConfigurationStream.map(getOnlyCommandConfig);
}

public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> observeThreadPoolConfiguration() {
return timer.map(getAllThreadPoolConfig);
return allConfigurationStream.map(getOnlyThreadPoolConfig);
}

public Observable<Map<HystrixCollapserKey, HystrixCollapserConfiguration>> observeCollapserConfiguration() {
return timer.map(getAllCollapserConfig);
return allConfigurationStream.map(getOnlyCollapserConfig);
}

public int getIntervalInMilliseconds() {
return this.intervalInMilliseconds;
}

public boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}

private static HystrixCommandConfiguration sampleCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,
HystrixCommandGroupKey groupKey, HystrixCommandProperties commandProperties) {
return HystrixCommandConfiguration.sample(commandKey, threadPoolKey, groupKey, commandProperties);
Expand Down Expand Up @@ -136,4 +170,28 @@ public HystrixConfiguration call(Long timestamp) {
);
}
};

private static final Func1<HystrixConfiguration, Map<HystrixCommandKey, HystrixCommandConfiguration>> getOnlyCommandConfig =
new Func1<HystrixConfiguration, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
@Override
public Map<HystrixCommandKey, HystrixCommandConfiguration> call(HystrixConfiguration hystrixConfiguration) {
return hystrixConfiguration.getCommandConfig();
}
};

private static final Func1<HystrixConfiguration, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getOnlyThreadPoolConfig =
new Func1<HystrixConfiguration, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() {
@Override
public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(HystrixConfiguration hystrixConfiguration) {
return hystrixConfiguration.getThreadPoolConfig();
}
};

private static final Func1<HystrixConfiguration, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getOnlyCollapserConfig =
new Func1<HystrixConfiguration, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() {
@Override
public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(HystrixConfiguration hystrixConfiguration) {
return hystrixConfiguration.getCollapserConfig();
}
};
}
Loading