Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.core.processors.external.AsyncConnector;
import com.gotocompany.dagger.core.processors.external.ExternalMetricConfig;
import io.netty.util.internal.StringUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

Expand All @@ -20,6 +22,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;

Expand All @@ -31,6 +40,7 @@ public class HttpAsyncConnector extends AsyncConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpAsyncConnector.class.getName());
private AsyncHttpClient httpClient;
private HttpSourceConfig httpSourceConfig;
private Set<Integer> failOnErrorsExclusionSet;

/**
* Instantiates a new Http async connector with specified http client.
Expand Down Expand Up @@ -80,6 +90,12 @@ protected void createClient() {
}
}

@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
setFailOnErrorsExclusionSet(httpSourceConfig.getExcludeFailOnErrorsCodeRange());
}

@Override
public void close() throws Exception {
httpClient.close();
Expand All @@ -102,9 +118,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
return;
}

BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues, endpointVariablesValues);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getFailOnErrorsExclusionSet(), getMeterStatsManager(),
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
httpResponseHandler.startTimer();
request.execute(httpResponseHandler);
Expand All @@ -114,4 +129,19 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
}

}

protected Set<Integer> getFailOnErrorsExclusionSet() {
return failOnErrorsExclusionSet;
}

private void setFailOnErrorsExclusionSet(String excludeFailOnErrorsCodeRange) {
failOnErrorsExclusionSet = new HashSet<Integer>();
if (!StringUtil.isNullOrEmpty(excludeFailOnErrorsCodeRange)) {
String[] ranges = excludeFailOnErrorsCodeRange.split(",");
Arrays.stream(ranges).forEach(range -> {
List<Integer> rangeList = Arrays.stream(range.split("-")).map(Integer::parseInt).collect(Collectors.toList());
IntStream.rangeClosed(rangeList.get(0), rangeList.get(rangeList.size() - 1)).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode));
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package com.gotocompany.dagger.core.processors.external.http;

import com.google.protobuf.Descriptors;
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandler;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandlerFactory;
import com.gotocompany.dagger.core.exception.HttpFailureException;
import com.gotocompany.dagger.core.metrics.aspects.ExternalSourceAspects;
import com.gotocompany.dagger.core.metrics.reporters.ErrorReporter;
import com.gotocompany.dagger.core.processors.ColumnNameManager;
import com.gotocompany.dagger.core.processors.common.OutputMapping;
import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry;
import com.gotocompany.dagger.core.processors.common.RowManager;
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandler;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandlerFactory;
import com.google.protobuf.Descriptors;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Pattern;

/**
Expand All @@ -39,6 +39,7 @@ public class HttpResponseHandler extends AsyncCompletionHandler<Object> {
private Descriptors.Descriptor descriptor;
private ResultFuture<Row> resultFuture;
private HttpSourceConfig httpSourceConfig;
private Set<Integer> failOnErrorsExclusionSet;
private MeterStatsManager meterStatsManager;
private Instant startTime;
private ErrorReporter errorReporter;
Expand All @@ -48,20 +49,22 @@ public class HttpResponseHandler extends AsyncCompletionHandler<Object> {
/**
* Instantiates a new Http response handler.
*
* @param httpSourceConfig the http source config
* @param meterStatsManager the meter stats manager
* @param rowManager the row manager
* @param columnNameManager the column name manager
* @param descriptor the descriptor
* @param resultFuture the result future
* @param errorReporter the error reporter
* @param postResponseTelemetry the post response telemetry
* @param httpSourceConfig the http source config
* @param failOnErrorsExclusionSet the fail on error exclusion set
* @param meterStatsManager the meter stats manager
* @param rowManager the row manager
* @param columnNameManager the column name manager
* @param descriptor the descriptor
* @param resultFuture the result future
* @param errorReporter the error reporter
* @param postResponseTelemetry the post response telemetry
*/
public HttpResponseHandler(HttpSourceConfig httpSourceConfig, MeterStatsManager meterStatsManager, RowManager rowManager,
public HttpResponseHandler(HttpSourceConfig httpSourceConfig, Set<Integer> failOnErrorsExclusionSet, MeterStatsManager meterStatsManager, RowManager rowManager,
ColumnNameManager columnNameManager, Descriptors.Descriptor descriptor, ResultFuture<Row> resultFuture,
ErrorReporter errorReporter, PostResponseTelemetry postResponseTelemetry) {

this.httpSourceConfig = httpSourceConfig;
this.failOnErrorsExclusionSet = failOnErrorsExclusionSet;
this.meterStatsManager = meterStatsManager;
this.rowManager = rowManager;
this.columnNameManager = columnNameManager;
Expand All @@ -86,15 +89,16 @@ public Object onCompleted(Response response) {
successHandler(response);
} else {
postResponseTelemetry.validateResponseCode(meterStatsManager, statusCode);
failureHandler("Received status code : " + statusCode);
failureHandler("Received status code : " + statusCode, statusCode);
}
return response;
}

@Override
public void onThrowable(Throwable t) {
t.printStackTrace();
meterStatsManager.markEvent(ExternalSourceAspects.OTHER_ERRORS);
failureHandler(t.getMessage());
failureHandler(t.getMessage(), 0);
}

private void successHandler(Response response) {
Expand Down Expand Up @@ -123,19 +127,27 @@ private void successHandler(Response response) {
* Failure handler.
*
* @param logMessage the log message
* @param statusCode the status code
*/
public void failureHandler(String logMessage) {
public void failureHandler(String logMessage, Integer statusCode) {
postResponseTelemetry.sendFailureTelemetry(meterStatsManager, startTime);
LOGGER.error(logMessage);
Exception httpFailureException = new HttpFailureException(logMessage);
if (httpSourceConfig.isFailOnErrors()) {
if (shouldFailOnError(statusCode)) {
reportAndThrowError(httpFailureException);
} else {
errorReporter.reportNonFatalException(httpFailureException);
}
resultFuture.complete(Collections.singleton(rowManager.getAll()));
}

private boolean shouldFailOnError(Integer statusCode) {
if (httpSourceConfig.isFailOnErrors() && (statusCode == 0 || !failOnErrorsExclusionSet.contains(statusCode))) {
return true;
}
return false;
}

private void setField(String key, Object value, int fieldIndex) {
if (!httpSourceConfig.isRetainResponseType() || httpSourceConfig.hasType()) {
setFieldUsingType(key, value, fieldIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
private String excludeFailOnErrorsCodeRange;
@SerializedName(value = "type", alternate = {"Type", "TYPE"})
private String type;
private String capacity;
Expand All @@ -49,14 +50,15 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param excludeFailOnErrorsCodeRange the exclude fail on errors code range
* @param type the type
* @param capacity the capacity
* @param headers the static headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param retainResponseType the retain response type
*/
public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String excludeFailOnErrorsCodeRange, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
this.endpoint = endpoint;
this.endpointVariables = endpointVariables;
this.verb = verb;
Expand All @@ -67,6 +69,7 @@ public HttpSourceConfig(String endpoint, String endpointVariables, String verb,
this.streamTimeout = streamTimeout;
this.connectTimeout = connectTimeout;
this.failOnErrors = failOnErrors;
this.excludeFailOnErrorsCodeRange = excludeFailOnErrorsCodeRange;
this.type = type;
this.capacity = capacity;
this.headers = headers;
Expand Down Expand Up @@ -162,6 +165,16 @@ public boolean isFailOnErrors() {
return failOnErrors;
}

/**
* Gets failOnErrorsCodeRange Variable.
*
* @return the failOnErrorsCodeRange Variable
*/
public String getExcludeFailOnErrorsCodeRange() {
return excludeFailOnErrorsCodeRange;
}


@Override
public String getMetricId() {
return metricId;
Expand Down Expand Up @@ -245,11 +258,11 @@ public boolean equals(Object o) {
return false;
}
HttpSourceConfig that = (HttpSourceConfig) o;
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
return failOnErrors == that.failOnErrors && excludeFailOnErrorsCodeRange == that.excludeFailOnErrorsCodeRange && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, excludeFailOnErrorsCodeRange, type, capacity, headers, outputMapping, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shouldReturnHttpExternalSourceConfig() {
outputMapping = new OutputMapping("$.data.tensor.values[0]");
outputMappings.put("surge_factor", outputMapping);

HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, null, headerMap, outputMappings, null, false);

assertEquals(httpSourceConfig, defaultPostProcessorConfig.getExternalSource().getHttpConfig().get(0));
}
Expand Down Expand Up @@ -120,7 +120,7 @@ public void shouldBeEmptyWhenNoneOfTheConfigsExist() {
@Test
public void shouldNotBeEmptyWhenExternalSourceHasHttpConfigExist() {
ArrayList<HttpSourceConfig> http = new ArrayList<>();
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, null, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
ArrayList<EsSourceConfig> es = new ArrayList<>();
ArrayList<PgSourceConfig> pg = new ArrayList<>();
ExternalSourceConfig externalSourceConfig = new ExternalSourceConfig(http, es, pg, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() {
HashMap<String, OutputMapping> httpColumnNames = new HashMap<>();
httpColumnNames.put("http_field_1", new OutputMapping(""));
httpColumnNames.put("http_field_2", new OutputMapping(""));
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false);
HashMap<String, OutputMapping> esOutputMapping = new HashMap<>();
esOutputMapping.put("es_field_1", new OutputMapping(""));
EsSourceConfig esSourceConfig = new EsSourceConfig("host", "port", "", "", "endpointPattern",
Expand Down Expand Up @@ -132,7 +132,7 @@ public void shouldProcessWithRightConfiguration() {
outputMapping.put("order_id", new OutputMapping("path"));

List<HttpSourceConfig> httpSourceConfigs = new ArrayList<>();
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false);
httpSourceConfigs.add(httpSourceConfig);

List<EsSourceConfig> esSourceConfigs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void setUp() {
HashMap<String, OutputMapping> httpOutputMapping = new HashMap<>();
httpOutputMapping.put("http_field_1", new OutputMapping(""));
httpOutputMapping.put("http_field_2", new OutputMapping(""));
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false);
http = new ArrayList<>();
http.add(httpSourceConfig);
es = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ public void hasTypeShouldBeTrueWhenTypeIsPresent() {

@Test
public void hasTypeShouldBeFalseWhenTypeIsNull() {
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, "", null, "", new HashMap<>(), new HashMap<>(), metricId, false);
assertFalse(httpSourceConfig.hasType());
}

@Test
public void hasTypeShouldBeFalseWhenTypeIsEmpty() {
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", "", new HashMap<>(), new HashMap<>(), metricId, false);
assertFalse(httpSourceConfig.hasType());
}

Expand Down
Loading