diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java index a47be40cf..236a67b1f 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java @@ -15,24 +15,13 @@ */ package com.netflix.hystrix.contrib.sample.stream; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; -import com.netflix.hystrix.HystrixCollapserKey; -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.config.HystrixCollapserConfiguration; -import com.netflix.hystrix.config.HystrixCommandConfiguration; import com.netflix.hystrix.config.HystrixConfiguration; -import com.netflix.hystrix.config.HystrixConfigurationStream; -import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; import rx.Observable; import rx.functions.Func1; import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** @@ -62,23 +51,18 @@ public class HystrixConfigSseServlet extends HystrixSampleSseServlet>() { - @Override - public Observable call(Integer delay) { - return new HystrixConfigurationStream(delay).observe(); - } - }); + this.jsonStream = new HystrixConfigurationJsonStream(); } /* package-private */ HystrixConfigSseServlet(Func1> createStream) { - super(createStream); + this.jsonStream = new HystrixConfigurationJsonStream(createStream); } @Override @@ -106,106 +90,14 @@ protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - private void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { - json.writeObjectFieldStart(key.name()); - json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); - json.writeStringField("groupKey", commandConfig.getGroupKey().name()); - json.writeObjectFieldStart("execution"); - HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); - json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); - json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); - json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); - json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); - json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); - json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); - json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); - json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); - json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); - json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); - json.writeEndObject(); - json.writeObjectFieldStart("metrics"); - HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); - json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); - json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); - json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); - json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - json.writeObjectFieldStart("circuitBreaker"); - HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); - json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); - json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); - json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); - json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); - json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); - json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); - json.writeEndObject(); - json.writeEndObject(); - } - - private void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { - json.writeObjectFieldStart(threadPoolKey.name()); - json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); - json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); - json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); - json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); - json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - } - - private void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { - json.writeObjectFieldStart(collapserKey.name()); - json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); - json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); - json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); - json.writeObjectFieldStart("metrics"); - HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); - json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); - json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); - json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - json.writeEndObject(); + @Override + protected Observable getStream(int delay) { + return jsonStream.observe(delay); } @Override protected String convertToString(HystrixConfiguration config) throws IOException { - StringWriter jsonString = new StringWriter(); - JsonGenerator json = jsonFactory.createGenerator(jsonString); - - json.writeStartObject(); - json.writeStringField("type", "HystrixConfig"); - json.writeObjectFieldStart("commands"); - for (Map.Entry entry: config.getCommandConfig().entrySet()) { - final HystrixCommandKey key = entry.getKey(); - final HystrixCommandConfiguration commandConfig = entry.getValue(); - writeCommandConfigJson(json, key, commandConfig); - - } - json.writeEndObject(); - - json.writeObjectFieldStart("threadpools"); - for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { - final HystrixThreadPoolKey threadPoolKey = entry.getKey(); - final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); - writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); - } - json.writeEndObject(); - - json.writeObjectFieldStart("collapsers"); - for (Map.Entry entry: config.getCollapserConfig().entrySet()) { - final HystrixCollapserKey collapserKey = entry.getKey(); - final HystrixCollapserConfiguration collapserConfig = entry.getValue(); - writeCollapserConfigJson(json, collapserKey, collapserConfig); - } - json.writeEndObject(); - json.writeEndObject(); - json.close(); - - return jsonString.getBuffer().toString(); + return HystrixConfigurationJsonStream.convertToString(config); } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java new file mode 100644 index 000000000..1a2aab252 --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java @@ -0,0 +1,181 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.sample.stream; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCollapserConfiguration; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import rx.Observable; +import rx.functions.Func1; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; + +/** + * Links HystrixConfigurationStream and JSON encoding. This may be consumed in a variety of ways: + * -such as- + *

    + *
  • {@link HystrixConfigSseServlet} for mapping a specific URL to this data as an SSE stream + *
  • Consumer of your choice that wants control over where to embed this stream + *
+ * + */ +public class HystrixConfigurationJsonStream { + + private static final JsonFactory jsonFactory = new JsonFactory(); + private final Func1> streamGenerator; + + public HystrixConfigurationJsonStream() { + this.streamGenerator = new Func1>() { + @Override + public Observable call(Integer delay) { + return new HystrixConfigurationStream(delay).observe(); + } + }; + } + + public HystrixConfigurationJsonStream(Func1> streamGenerator) { + this.streamGenerator = streamGenerator; + } + + private static final Func1 convertToJson = new Func1() { + @Override + public String call(HystrixConfiguration hystrixConfiguration) { + try { + return convertToString(hystrixConfiguration); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + }; + + private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); + json.writeStringField("groupKey", commandConfig.getGroupKey().name()); + json.writeObjectFieldStart("execution"); + HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); + json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); + json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); + json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); + json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); + json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); + json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); + json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); + json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); + json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); + json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); + json.writeEndObject(); + json.writeObjectFieldStart("metrics"); + HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); + json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeObjectFieldStart("circuitBreaker"); + HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); + json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); + json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); + json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); + json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); + json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); + json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); + json.writeEndObject(); + json.writeEndObject(); + } + + private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); + json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); + json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); + json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + } + + private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { + json.writeObjectFieldStart(collapserKey.name()); + json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); + json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); + json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); + json.writeObjectFieldStart("metrics"); + HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeEndObject(); + } + + public static String convertToString(HystrixConfiguration config) throws IOException { + StringWriter jsonString = new StringWriter(); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + json.writeStartObject(); + json.writeStringField("type", "HystrixConfig"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandConfiguration commandConfig = entry.getValue(); + writeCommandConfigJson(json, key, commandConfig); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); + writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); + } + json.writeEndObject(); + + json.writeObjectFieldStart("collapsers"); + for (Map.Entry entry: config.getCollapserConfig().entrySet()) { + final HystrixCollapserKey collapserKey = entry.getKey(); + final HystrixCollapserConfiguration collapserConfig = entry.getValue(); + writeCollapserConfigJson(json, collapserKey, collapserConfig); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + return jsonString.getBuffer().toString(); + } + + public Observable observe(int delay) { + return streamGenerator.call(delay); + } + + public Observable observeJson(int delay) { + return streamGenerator.call(delay).map(convertToJson); + } +} diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java index 1d39bea80..7b3fe2257 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java @@ -20,7 +20,6 @@ import rx.Observable; import rx.Subscriber; import rx.Subscription; -import rx.functions.Func1; import rx.schedulers.Schedulers; import javax.servlet.ServletException; @@ -42,12 +41,6 @@ public abstract class HystrixSampleSseServlet extends HttpServlet { private static final String DELAY_REQ_PARAM_NAME = "delay"; - private final Func1> createStream; - - protected HystrixSampleSseServlet(Func1> createStream) { - this.createStream = createStream; - } - abstract int getDefaultDelayInMilliseconds(); abstract int getMaxNumberConcurrentConnectionsAllowed(); @@ -58,6 +51,8 @@ protected HystrixSampleSseServlet(Func1> createS protected abstract void decrementCurrentConcurrentConnections(); + protected abstract Observable getStream(int delay); + protected abstract String convertToString(SampleData sampleData) throws IOException; /** @@ -139,7 +134,7 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse final PrintWriter writer = response.getWriter(); - Observable sampledStream = createStream.call(delay); + Observable sampledStream = getStream(delay); //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java new file mode 100644 index 000000000..dd17f41d2 --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java @@ -0,0 +1,121 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.sample.stream; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; +import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; +import rx.Observable; +import rx.functions.Func1; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; + +/** + * Links HystrixUtilizationStream and JSON encoding. This may be consumed in a variety of ways: + * -such as- + *

    + *
  • {@link HystrixUtilizationSseServlet} for mapping a specific URL to this data as an SSE stream + *
  • Consumer of your choice that wants control over where to embed this stream + *
+ * + */ +public class HystrixUtilizationJsonStream { + private final Func1> streamGenerator; + + private static final JsonFactory jsonFactory = new JsonFactory(); + + private static final Func1 convertToJsonFunc = new Func1() { + @Override + public String call(HystrixUtilization utilization) { + try { + return convertToJson(utilization); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + }; + + public HystrixUtilizationJsonStream() { + this.streamGenerator = new Func1>() { + @Override + public Observable call(Integer delay) { + return new HystrixUtilizationStream(delay).observe(); + } + }; + } + + public HystrixUtilizationJsonStream(Func1> streamGenerator) { + this.streamGenerator = streamGenerator; + } + + private static void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); + json.writeEndObject(); + } + + private static void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); + json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); + json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); + json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); + json.writeEndObject(); + } + + protected static String convertToJson(HystrixUtilization utilization) throws IOException { + StringWriter jsonString = new StringWriter(); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + json.writeStartObject(); + json.writeStringField("type", "HystrixUtilization"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandUtilization commandUtilization = entry.getValue(); + writeCommandUtilizationJson(json, key, commandUtilization); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); + writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + return jsonString.getBuffer().toString(); + } + + public Observable observe(int delay) { + return streamGenerator.call(delay); + } + + public Observable observeJson(int delay) { + return streamGenerator.call(delay).map(convertToJsonFunc); + } +} diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java index 84e8624da..22dee84d8 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java @@ -15,22 +15,13 @@ */ package com.netflix.hystrix.contrib.sample.stream; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; -import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; import com.netflix.hystrix.metric.sample.HystrixUtilization; -import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; import rx.Observable; import rx.functions.Func1; import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** @@ -60,7 +51,7 @@ public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet>() { - @Override - public Observable call(Integer delay) { - return new HystrixUtilizationStream(delay).observe(); - } - }); + this.jsonStream = new HystrixUtilizationJsonStream(); + } /* package-private */ HystrixUtilizationSseServlet(Func1> createStream) { - super(createStream); + this.jsonStream = new HystrixUtilizationJsonStream(createStream); } @Override @@ -105,48 +92,14 @@ protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - private void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { - json.writeObjectFieldStart(key.name()); - json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); - json.writeEndObject(); - } - - private void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { - json.writeObjectFieldStart(threadPoolKey.name()); - json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); - json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); - json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); - json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); - json.writeEndObject(); + @Override + protected Observable getStream(int delay) { + return jsonStream.observe(delay); } @Override protected String convertToString(HystrixUtilization utilization) throws IOException { - StringWriter jsonString = new StringWriter(); - JsonGenerator json = jsonFactory.createGenerator(jsonString); - - json.writeStartObject(); - json.writeStringField("type", "HystrixUtilization"); - json.writeObjectFieldStart("commands"); - for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { - final HystrixCommandKey key = entry.getKey(); - final HystrixCommandUtilization commandUtilization = entry.getValue(); - writeCommandUtilizationJson(json, key, commandUtilization); - - } - json.writeEndObject(); - - json.writeObjectFieldStart("threadpools"); - for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { - final HystrixThreadPoolKey threadPoolKey = entry.getKey(); - final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); - writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); - } - json.writeEndObject(); - json.writeEndObject(); - json.close(); - - return jsonString.getBuffer().toString(); + return HystrixUtilizationJsonStream.convertToJson(utilization); } }