Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Moved non-core data streams into hystrix-data-streams
  • Loading branch information
Matt Jacobs committed Jun 22, 2016
commit 151a4a08310a3e1dcc29e5d00f53d8933f1ec4ee
2 changes: 1 addition & 1 deletion hystrix-contrib/hystrix-metrics-event-stream/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies {
compile project(':hystrix-core')
compile 'com.fasterxml.jackson.core:jackson-core:2.5.2'
compile project(':hystrix-data-stream')
provided 'javax.servlet:servlet-api:2.5'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-all:1.9.5'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
* the code below may reference a HystrixEventType that does not exist in hystrix-core. If this happens,
* a j.l.NoSuchFieldError occurs. Since this data is not being generated by hystrix-core, it's safe to count it as 0
* and we should log an error to get users to update their dependency set.
*
* @deprecated Prefer {@link com.netflix.hystrix.metric.consumer.HystrixDashboardStream}
*/
@Deprecated //since 1.5.4
public class HystrixMetricsPoller {

static final Logger logger = LoggerFactory.getLogger(HystrixMetricsPoller.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.contrib.sample.stream.HystrixSampleSseServlet;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import com.netflix.hystrix.metric.serial.SerialHystrixDashboardData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -51,186 +48,47 @@
* </servlet-mapping>
* } </pre>
*/
public class HystrixMetricsStreamServlet extends HttpServlet {
public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet<HystrixDashboardStream.DashboardData> {

private static final long serialVersionUID = -7548505095303313237L;

private static final Logger logger = LoggerFactory.getLogger(HystrixMetricsStreamServlet.class);

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.maxConcurrentConnections", 5);
private static DynamicIntProperty defaultMetricListenerQueueSize = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.defaultMetricListenerQueueSize", 1000);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

private static volatile boolean isDestroyed = false;

/**
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
* shutdown, perhaps from some other serverlet's destroy() method.
*/
public static void shutdown() {
isDestroyed = true;
public HystrixMetricsStreamServlet() {
super(HystrixDashboardStream.getInstance().observe());
}

@Override
public void init() throws ServletException {
isDestroyed = false;

/* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
}

/**
* Handle incoming GETs
*/

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (isDestroyed) {
response.sendError(503, "Service has been shut down.");
} else {
handleRequest(request, response);
}
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

/**
* Handle servlet being undeployed by gracefully releasing connections so poller threads stop.
*/

@Override
public void destroy() {
/* set marker so the loops can break out */
isDestroyed = true;
super.destroy();
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

/**
* - maintain an open connection with the client
* - on initial connection send latest data of each requested event type
* - subsequently send all changes for each requested event type
*
* @param request
* @param response
* @throws javax.servlet.ServletException
* @throws java.io.IOException
*/
private void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
/* ensure we aren't allowing more connections than we want */
int numberConnections = concurrentConnections.incrementAndGet();
HystrixMetricsPoller poller = null;
try {
if (numberConnections > maxConcurrentConnections.get()) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxConcurrentConnections.get());
} else {

int delay = 500;
try {
String d = request.getParameter("delay");
if (d != null) {
delay = Math.max(Integer.parseInt(d), 1);
}
} catch (Exception e) {
// ignore if it's not a number
}

/* initialize response */
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

int queueSize = defaultMetricListenerQueueSize.get();

MetricJsonListener jsonListener = new MetricJsonListener(queueSize);
poller = new HystrixMetricsPoller(jsonListener, delay);
// start polling and it will write directly to the output stream
poller.start();
logger.debug("Starting poller");

// we will use a "single-writer" approach where the Servlet thread does all the writing
// by fetching JSON messages from the MetricJsonListener to write them to the output
try {
while (poller.isRunning() && !isDestroyed) {
List<String> jsonMessages = jsonListener.getJsonMetrics();
if (jsonMessages.isEmpty()) {
// https://github.com/Netflix/Hystrix/issues/85 hystrix.stream holds connection open if no metrics
// we send a ping to test the connection so that we'll get an IOException if the client has disconnected
response.getWriter().println("ping: \n");
} else {
for (String json : jsonMessages) {
response.getWriter().println("data: " + json + "\n");
}
}

/* shortcut breaking out of loop if we have been destroyed */
if(isDestroyed) {
break;
}

// after outputting all the messages we will flush the stream
response.flushBuffer();

// explicitly check for client disconnect - PrintWriter does not throw exceptions
if (response.getWriter().checkError()) {
throw new IOException("io error");
}

// now wait the 'delay' time
Thread.sleep(delay);
}
} catch (InterruptedException e) {
poller.shutdown();
logger.debug("InterruptedException. Will stop polling.");
Thread.currentThread().interrupt();
} catch (IOException e) {
poller.shutdown();
// debug instead of error as we expect to get these whenever a client disconnects or network issue occurs
logger.debug("IOException while trying to write (generally caused by client disconnecting). Will stop polling.", e);
} catch (Exception e) {
poller.shutdown();
logger.error("Failed to write Hystrix metrics. Will stop polling.", e);
}
logger.debug("Stopping Turbine stream to connection");
}
} catch (Exception e) {
logger.error("Error initializing servlet for metrics event stream.", e);
} finally {
concurrentConnections.decrementAndGet();
if (poller != null) {
poller.shutdown();
}
}
@Override
protected int incrementAndGetCurrentConcurrentConnections() {
return concurrentConnections.incrementAndGet();
}

/**
* This will be called from another thread so needs to be thread-safe.
* @ThreadSafe
*/
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {

/**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p>
* This is a safety check against a runaway poller causing memory leaks.
*/
private LinkedBlockingQueue<String> jsonMetrics;

public MetricJsonListener(int queueSize) {
jsonMetrics = new LinkedBlockingQueue<String>(queueSize);
}

/**
* Store JSON messages in a queue.
*/
@Override
public void handleJsonMetric(String json) {
jsonMetrics.add(json);
}
@Override
protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

/**
* Get all JSON messages in the queue.
*
* @return
*/
public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics);
return metrics;
}
@Override
protected String convertToString(HystrixDashboardStream.DashboardData dashboardData) throws IOException {
return SerialHystrixDashboardData.toJsonString(dashboardData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ public HystrixConfigSseServlet() {
}

@Override
int getMaxNumberConcurrentConnectionsAllowed() {
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
int getNumberCurrentConnections() {
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ protected HystrixSampleSseServlet(Observable<SampleData> sampleStream, int pause
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
}

abstract int getMaxNumberConcurrentConnectionsAllowed();
protected abstract int getMaxNumberConcurrentConnectionsAllowed();

abstract int getNumberCurrentConnections();
protected abstract int getNumberCurrentConnections();

protected abstract int incrementAndGetCurrentConcurrentConnections();

Expand Down Expand Up @@ -131,8 +131,6 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse

final PrintWriter writer = response.getWriter();

//Observable<SampleData> sampledStream = getStream();

//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
sampleSubscription = sampleStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public HystrixUtilizationSseServlet() {
}

@Override
int getMaxNumberConcurrentConnectionsAllowed() {
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
int getNumberCurrentConnections() {
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,70 @@
*/
package com.netflix.hystrix.contrib.metrics.eventstream;

import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import rx.functions.Func1;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class HystrixMetricsStreamServletUnitTest {

@Mock HttpServletRequest mockReq;
@Mock HttpServletResponse mockResp;
@Mock HystrixDashboardStream.DashboardData mockDashboard;
@Mock PrintWriter mockPrintWriter;

HystrixMetricsStreamServlet servlet;

private final Observable<HystrixDashboardStream.DashboardData> streamOfOnNexts =
Observable.interval(100, TimeUnit.MILLISECONDS).map(new Func1<Long, HystrixDashboardStream.DashboardData>() {
@Override
public HystrixDashboardStream.DashboardData call(Long timestamp) {
return mockDashboard;
}
});


@Before
public void init() {
MockitoAnnotations.initMocks(this);
when(mockReq.getMethod()).thenReturn("GET");
}

@After
public void tearDown() {
servlet.destroy();
servlet.shutdown();
}

@Test
public void shutdownServletShouldRejectRequests() throws ServletException, IOException {
servlet = new HystrixMetricsStreamServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {

final HystrixMetricsStreamServlet servlet = new HystrixMetricsStreamServlet();
servlet.shutdown();
}

final HttpServletResponse response = mock(HttpServletResponse.class);
servlet.doGet(mock(HttpServletRequest.class), response);
servlet.shutdown();

verify(response).sendError(503, "Service has been shut down.");
servlet.service(mockReq, mockResp);

verify(mockResp).sendError(503, "Service has been shut down.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void call(Subscriber<? super HystrixConfiguration> subscriber) {
@Before
public void init() {
MockitoAnnotations.initMocks(this);

}

@After
Expand All @@ -111,7 +110,6 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc
servlet.doGet(mockReq, mockResp);

verify(mockResp).sendError(503, "Service has been shut down.");

}

@Test
Expand Down
Loading