From 5e5f13adf52fa607a1f039e404df4f8e0f31c432 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Thu, 13 Jul 2023 21:08:13 +0530 Subject: [PATCH 1/4] feat: adding configuration to provide exclusion for failOnError with http post processor --- .../external/http/HttpResponseHandler.java | 44 +++++-- .../external/http/HttpSourceConfig.java | 19 ++- .../processors/PostProcessorConfigTest.java | 4 +- .../external/ExternalPostProcessorTest.java | 4 +- .../external/ExternalSourceConfigTest.java | 2 +- .../external/grpc/GrpcSourceConfigTest.java | 4 +- .../external/http/HttpAsyncConnectorTest.java | 52 +++++--- .../http/HttpResponseHandlerTest.java | 116 ++++++++++++++++-- .../external/http/HttpSourceConfigTest.java | 18 +-- .../request/HttpGetRequestHandlerTest.java | 16 +-- .../request/HttpPostRequestHandlerTest.java | 16 +-- .../http/request/HttpRequestFactoryTest.java | 14 +-- docs/docs/advance/post_processor.md | 7 ++ 13 files changed, 236 insertions(+), 80 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java index bbd53f50e..f3b850003 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java @@ -1,5 +1,9 @@ package com.gotocompany.dagger.core.processors.external.http; +import com.google.protobuf.Descriptors; +import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; +import com.gotocompany.dagger.common.serde.typehandler.TypeHandler; +import com.gotocompany.dagger.common.serde.typehandler.TypeHandlerFactory; import com.gotocompany.dagger.core.exception.HttpFailureException; import com.gotocompany.dagger.core.metrics.aspects.ExternalSourceAspects; import com.gotocompany.dagger.core.metrics.reporters.ErrorReporter; @@ -7,15 +11,11 @@ import com.gotocompany.dagger.core.processors.common.OutputMapping; import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry; import com.gotocompany.dagger.core.processors.common.RowManager; -import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; -import com.gotocompany.dagger.common.serde.typehandler.TypeHandler; -import com.gotocompany.dagger.common.serde.typehandler.TypeHandlerFactory; -import com.google.protobuf.Descriptors; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; +import io.netty.util.internal.StringUtil; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; - import org.asynchttpclient.AsyncCompletionHandler; import org.asynchttpclient.Response; import org.slf4j.Logger; @@ -23,9 +23,14 @@ import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.Map; +import java.util.Collections; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * The Http response handler. @@ -86,7 +91,8 @@ public Object onCompleted(Response response) { successHandler(response); } else { postResponseTelemetry.validateResponseCode(meterStatsManager, statusCode); - failureHandler("Received status code : " + statusCode); + boolean shouldFailOnError = (httpSourceConfig.isFailOnErrors() ? shouldFailOnError(statusCode) : false); + failureHandler("Received status code : " + statusCode, shouldFailOnError); } return response; } @@ -94,7 +100,7 @@ public Object onCompleted(Response response) { @Override public void onThrowable(Throwable t) { meterStatsManager.markEvent(ExternalSourceAspects.OTHER_ERRORS); - failureHandler(t.getMessage()); + failureHandler(t.getMessage(), httpSourceConfig.isFailOnErrors()); } private void successHandler(Response response) { @@ -123,12 +129,13 @@ private void successHandler(Response response) { * Failure handler. * * @param logMessage the log message + * @param shouldFailOnErrors should fail on error */ - public void failureHandler(String logMessage) { + public void failureHandler(String logMessage, boolean shouldFailOnErrors) { postResponseTelemetry.sendFailureTelemetry(meterStatsManager, startTime); LOGGER.error(logMessage); Exception httpFailureException = new HttpFailureException(logMessage); - if (httpSourceConfig.isFailOnErrors()) { + if (shouldFailOnErrors) { reportAndThrowError(httpFailureException); } else { errorReporter.reportNonFatalException(httpFailureException); @@ -136,6 +143,23 @@ public void failureHandler(String logMessage) { resultFuture.complete(Collections.singleton(rowManager.getAll())); } + private boolean shouldFailOnError(Integer statusCode) { + if (statusCode == 0 || StringUtil.isNullOrEmpty(httpSourceConfig.getExcludeFailOnErrorsCodeRange())) { + return true; + } + return !getFailOnErrorCodeRanges(httpSourceConfig.getExcludeFailOnErrorsCodeRange()).contains(statusCode); + } + + private HashSet getFailOnErrorCodeRanges(String input) { + String[] ranges = input.split(","); + HashSet statusSet = new HashSet(); + Arrays.stream(ranges).forEach(range -> { + List rangeList = Arrays.stream(range.split("-")).map(Integer::parseInt).collect(Collectors.toList()); + IntStream.rangeClosed(rangeList.get(0), rangeList.get(rangeList.size() - 1)).forEach(statusCode -> statusSet.add(statusCode)); + }); + return statusSet; + } + private void setField(String key, Object value, int fieldIndex) { if (!httpSourceConfig.isRetainResponseType() || httpSourceConfig.hasType()) { setFieldUsingType(key, value, fieldIndex); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfig.java index 127996399..a775452fe 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfig.java @@ -26,6 +26,7 @@ public class HttpSourceConfig implements Serializable, SourceConfig { private String streamTimeout; private String connectTimeout; private boolean failOnErrors; + private String excludeFailOnErrorsCodeRange; @SerializedName(value = "type", alternate = {"Type", "TYPE"}) private String type; private String capacity; @@ -49,6 +50,7 @@ public class HttpSourceConfig implements Serializable, SourceConfig { * @param streamTimeout the stream timeout * @param connectTimeout the connect timeout * @param failOnErrors the fail on errors + * @param excludeFailOnErrorsCodeRange the exclude fail on errors code range * @param type the type * @param capacity the capacity * @param headers the static headers @@ -56,7 +58,7 @@ public class HttpSourceConfig implements Serializable, SourceConfig { * @param metricId the metric id * @param retainResponseType the retain response type */ - public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map headers, Map outputMapping, String metricId, boolean retainResponseType) { + public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String excludeFailOnErrorsCodeRange, String type, String capacity, Map headers, Map outputMapping, String metricId, boolean retainResponseType) { this.endpoint = endpoint; this.endpointVariables = endpointVariables; this.verb = verb; @@ -67,6 +69,7 @@ public HttpSourceConfig(String endpoint, String endpointVariables, String verb, this.streamTimeout = streamTimeout; this.connectTimeout = connectTimeout; this.failOnErrors = failOnErrors; + this.excludeFailOnErrorsCodeRange = excludeFailOnErrorsCodeRange; this.type = type; this.capacity = capacity; this.headers = headers; @@ -162,6 +165,16 @@ public boolean isFailOnErrors() { return failOnErrors; } + /** + * Gets failOnErrorsCodeRange Variable. + * + * @return the failOnErrorsCodeRange Variable + */ + public String getExcludeFailOnErrorsCodeRange() { + return excludeFailOnErrorsCodeRange; + } + + @Override public String getMetricId() { return metricId; @@ -245,11 +258,11 @@ public boolean equals(Object o) { return false; } HttpSourceConfig that = (HttpSourceConfig) o; - return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId); + return failOnErrors == that.failOnErrors && excludeFailOnErrorsCodeRange == that.excludeFailOnErrorsCodeRange && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId); } @Override public int hashCode() { - return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType); + return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, excludeFailOnErrorsCodeRange, type, capacity, headers, outputMapping, metricId, retainResponseType); } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/PostProcessorConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/PostProcessorConfigTest.java index 18c15b94a..d00366485 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/PostProcessorConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/PostProcessorConfigTest.java @@ -64,7 +64,7 @@ public void shouldReturnHttpExternalSourceConfig() { outputMapping = new OutputMapping("$.data.tensor.values[0]"); outputMappings.put("surge_factor", outputMapping); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, null, headerMap, outputMappings, null, false); assertEquals(httpSourceConfig, defaultPostProcessorConfig.getExternalSource().getHttpConfig().get(0)); } @@ -120,7 +120,7 @@ public void shouldBeEmptyWhenNoneOfTheConfigsExist() { @Test public void shouldNotBeEmptyWhenExternalSourceHasHttpConfigExist() { ArrayList http = new ArrayList<>(); - http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false)); + http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, null, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false)); ArrayList es = new ArrayList<>(); ArrayList pg = new ArrayList<>(); ExternalSourceConfig externalSourceConfig = new ExternalSourceConfig(http, es, pg, new ArrayList<>()); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalPostProcessorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalPostProcessorTest.java index d213408de..779ea14e3 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalPostProcessorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalPostProcessorTest.java @@ -69,7 +69,7 @@ public void setup() { HashMap httpColumnNames = new HashMap<>(); httpColumnNames.put("http_field_1", new OutputMapping("")); httpColumnNames.put("http_field_2", new OutputMapping("")); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false); HashMap esOutputMapping = new HashMap<>(); esOutputMapping.put("es_field_1", new OutputMapping("")); EsSourceConfig esSourceConfig = new EsSourceConfig("host", "port", "", "", "endpointPattern", @@ -132,7 +132,7 @@ public void shouldProcessWithRightConfiguration() { outputMapping.put("order_id", new OutputMapping("path")); List httpSourceConfigs = new ArrayList<>(); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false); httpSourceConfigs.add(httpSourceConfig); List esSourceConfigs = new ArrayList<>(); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalSourceConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalSourceConfigTest.java index b8adff9f2..0ab669d30 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalSourceConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/ExternalSourceConfigTest.java @@ -29,7 +29,7 @@ public void setUp() { HashMap httpOutputMapping = new HashMap<>(); httpOutputMapping.put("http_field_1", new OutputMapping("")); httpOutputMapping.put("http_field_2", new OutputMapping("")); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false); http = new ArrayList<>(); http.add(httpSourceConfig); es = new ArrayList<>(); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java index 4b60c3f55..b54c48c15 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java @@ -119,13 +119,13 @@ public void hasTypeShouldBeTrueWhenTypeIsPresent() { @Test public void hasTypeShouldBeFalseWhenTypeIsNull() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, "", null, "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } @Test public void hasTypeShouldBeFalseWhenTypeIsEmpty() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java index e58768318..74dfa14e7 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java @@ -96,7 +96,7 @@ public void setUp() { externalMetricConfig = new ExternalMetricConfig("metricId-http-01", shutDownPeriod, telemetryEnabled); defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", - "customer_id", "", "", "123", "234", false, httpConfigType, "345", + "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); } @@ -186,7 +186,7 @@ public void shouldCompleteExceptionallyIfOutputDescriptorNotFound() throws Excep public void shouldCompleteExceptionallyWhenRequestVariableIsInvalid() throws Exception { when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); String invalidRequestVariable = "invalid_variable"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", invalidRequestVariable, "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", invalidRequestVariable, "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); @@ -209,7 +209,7 @@ public void shouldCompleteExceptionallyWhenRequestVariableIsInvalid() throws Exc @Test public void shouldCompleteExceptionallyWhenEndpointVariableIsEmptyAndRequiredInPattern() throws Exception { String emptyRequestVariable = ""; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", emptyRequestVariable, "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", emptyRequestVariable, "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -226,7 +226,7 @@ public void shouldCompleteExceptionallyWhenEndpointVariableIsEmptyAndRequiredInP @Test public void shouldEnrichWhenEndpointVariableIsEmptyAndNotRequiredInPattern() throws Exception { String emptyRequestVariable = ""; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"static\"}", emptyRequestVariable, "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"static\"}", emptyRequestVariable, "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); @@ -246,7 +246,7 @@ public void shouldEnrichWhenEndpointVariableIsEmptyAndNotRequiredInPattern() thr @Test public void shouldCompleteExceptionallyWhenEndpointPatternIsInvalid() throws Exception { String invalidRequestPattern = "{\"key\": \"%\"}"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", invalidRequestPattern, "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", invalidRequestPattern, "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -261,7 +261,7 @@ public void shouldCompleteExceptionallyWhenEndpointPatternIsInvalid() throws Exc @Test public void shouldGetDescriptorFromOutputProtoIfTypeNotGiven() throws Exception { - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, null, "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); @@ -276,7 +276,7 @@ public void shouldGetDescriptorFromOutputProtoIfTypeNotGiven() throws Exception @Test public void shouldGetDescriptorFromTypeIfGiven() throws Exception { - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "TestBookingLogMessage", "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "TestBookingLogMessage", "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); @@ -292,7 +292,7 @@ public void shouldGetDescriptorFromTypeIfGiven() throws Exception { @Test public void shouldCompleteExceptionallyWhenEndpointPatternIsIncompatible() throws Exception { String invalidRequestPattern = "{\"key\": \"%d\"}"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", invalidRequestPattern, "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", invalidRequestPattern, "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -343,7 +343,7 @@ public void shouldAddDynamicHeaders() throws Exception { when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); HttpSourceConfig dynamicHeaderHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", - "customer_id", "{\"X_KEY\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", + "customer_id", "{\"X_KEY\": \"%s\"}", "customer_id", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(dynamicHeaderHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); @@ -361,7 +361,7 @@ public void shouldNotAddDynamicHeaders() throws Exception { when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); HttpSourceConfig dynamicHeaderHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", - "customer_id", "{\"X_KEY\": \"%s\"}", "customer_ids", "123", "234", false, httpConfigType, "345", + "customer_id", "{\"X_KEY\": \"%s\"}", "customer_ids", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(dynamicHeaderHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.open(flinkConfiguration); @@ -376,7 +376,7 @@ public void shouldNotAddDynamicHeaders() throws Exception { public void shouldCompleteExceptionallyWhenHeaderVariableIsInvalid() throws Exception { when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); String invalidHeaderVariable = "invalid_variable"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "{\"key\": \"%s\"}", invalidHeaderVariable, "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "{\"key\": \"%s\"}", invalidHeaderVariable, "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); @@ -399,7 +399,7 @@ public void shouldCompleteExceptionallyWhenHeaderVariableIsInvalid() throws Exce @Test public void shouldCompleteExceptionallyWhenHeaderPatternIsIncompatible() throws Exception { String invalidHeaderPattern = "{\"key\": \"%d\"}"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", invalidHeaderPattern, "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", invalidHeaderPattern, "customer_id", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -414,7 +414,17 @@ public void shouldCompleteExceptionallyWhenHeaderPatternIsIncompatible() throws @Test public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrue() throws Exception { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + + httpAsyncConnector.timeout(streamData, resultFuture); + + verify(resultFuture, times(1)).completeExceptionally(any(TimeoutException.class)); + } + + @Test + public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrueWithFailOnErrorCodeRange() throws Exception { + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "400-600", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.timeout(streamData, resultFuture); @@ -424,7 +434,17 @@ public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrue() throws Exception @Test public void shouldReportFatalInTimeoutIfFailOnErrorIsTrue() throws Exception { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + + httpAsyncConnector.timeout(streamData, resultFuture); + + verify(errorReporter, times(1)).reportFatalException(any(TimeoutException.class)); + } + + @Test + public void shouldReportFatalInTimeoutIfFailOnErrorIsTrueWithFailOnErrorCodeRange() throws Exception { + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "401-600", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.timeout(streamData, resultFuture); @@ -434,7 +454,7 @@ public void shouldReportFatalInTimeoutIfFailOnErrorIsTrue() throws Exception { @Test public void shouldReportNonFatalInTimeoutIfFailOnErrorIsFalse() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.timeout(streamData, resultFuture); @@ -454,7 +474,7 @@ public void shouldPassTheInputWithRowSizeCorrespondingToColumnNamesInTimeoutIfFa public void shouldThrowExceptionIfUnsupportedHttpVerbProvided() throws Exception { when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "PATCH", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "PATCH", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.open(flinkConfiguration); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java index f0c3b9c7d..e21344946 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java @@ -74,7 +74,7 @@ public void setup() { streamData.setField(1, new Row(2)); rowManager = new RowManager(streamData); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); } @Test @@ -148,7 +148,7 @@ public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIsOtherThan5XXAnd4XX() @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs404() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(404); @@ -167,7 +167,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs404() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs4XXOtherThan404() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(400); @@ -185,7 +185,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs4XXOtherThan404() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs5XX() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(502); @@ -203,7 +203,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs5XX() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIsOtherThan5XXAnd4XX() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(302); @@ -237,7 +237,7 @@ public void shouldPassInputIfFailOnErrorFalseAndOnThrowable() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowable() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Throwable throwable = new Throwable("throwable message"); @@ -253,12 +253,102 @@ public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowable() { verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); } + @Test + public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowableWithExcludeFailOnErrorsCodeRange() { + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "404-499", httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + Throwable throwable = new Throwable("throwable message"); + + httpResponseHandler.startTimer(); + httpResponseHandler.onThrowable(throwable); + + verify(resultFuture).completeExceptionally(any(RuntimeException.class)); + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass(HttpFailureException.class); + verify(errorReporter, times(1)).reportFatalException(failureCaptor.capture()); + assertEquals("throwable message", failureCaptor.getValue().getMessage()); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.OTHER_ERRORS); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.TOTAL_FAILED_REQUESTS); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + } + + @Test + public void shouldFailForAnyNone2xxIfFailOnErrorsTrueWithNullExcludeFailOnErrorsCodeRange() { + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + when(response.getStatusCode()).thenReturn(400); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass(HttpFailureException.class); + verify(resultFuture, times(1)).completeExceptionally(failureCaptor.capture()); + assertEquals("Received status code : 400", failureCaptor.getValue().getMessage()); + verify(errorReporter, times(1)).reportFatalException(any(HttpFailureException.class)); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.FAILURE_CODE_4XX); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.TOTAL_FAILED_REQUESTS); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + } + + @Test + public void shouldFailForAnyNone2xxIfFailOnErrorsTrueWithEmptyExcludeFailOnErrorsCodeRange() { + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + when(response.getStatusCode()).thenReturn(400); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass(HttpFailureException.class); + verify(resultFuture, times(1)).completeExceptionally(failureCaptor.capture()); + assertEquals("Received status code : 400", failureCaptor.getValue().getMessage()); + verify(errorReporter, times(1)).reportFatalException(any(HttpFailureException.class)); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.FAILURE_CODE_4XX); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.TOTAL_FAILED_REQUESTS); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + } + + @Test + public void shouldPassForAnyNone2xxInsideOfFailOnErrorsCodeRangeIfFailOnErrorsTrue() { + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "400,404-499", httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + when(response.getStatusCode()).thenReturn(400); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + verify(resultFuture, times(1)).complete(Collections.singleton(streamData)); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.FAILURE_CODE_4XX); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.TOTAL_FAILED_REQUESTS); + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass(HttpFailureException.class); + verify(errorReporter, times(1)).reportNonFatalException(failureCaptor.capture()); + assertEquals("Received status code : 400", failureCaptor.getValue().getMessage()); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + } + + @Test + public void shouldFailForAnyNone2xxOutsideOfFailOnErrorsCodeRangeIfFailOnErrorsTrue() { + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "400,404-499", httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + when(response.getStatusCode()).thenReturn(502); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass(HttpFailureException.class); + verify(resultFuture, times(1)).completeExceptionally(failureCaptor.capture()); + assertEquals("Received status code : 502", failureCaptor.getValue().getMessage()); + verify(errorReporter, times(1)).reportFatalException(any(HttpFailureException.class)); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.FAILURE_CODE_5XX); + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.TOTAL_FAILED_REQUESTS); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + } + @Test public void shouldPopulateSingleResultFromHttpCallInInputRow() { outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -284,7 +374,7 @@ public void shouldPopulateMultipleResultsFromHttpCallInInputRow() { outputMapping.put("s2_id_level", new OutputMapping("$.prediction")); outputColumnNames = Arrays.asList("surge_factor", "s2_id_level"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -313,7 +403,7 @@ public void shouldThrowExceptionIfFieldNotFoundInFieldDescriptorWhenTypeIsPassed outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(response.getStatusCode()).thenReturn(200); when(response.getResponseBody()).thenReturn("{\n" + " \"surge\": 0.732\n" @@ -331,7 +421,7 @@ public void shouldThrowExceptionIfPathIsWrongIfFailOnErrorsTrue() { outputMapping.put("surge_factor", new OutputMapping("invalidPath")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -356,7 +446,7 @@ public void shouldPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTypeIs outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", true); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", null, "345", headers, outputMapping, "metricId_02", true); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -381,7 +471,7 @@ public void shouldNotPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTyp outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, null, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -406,7 +496,7 @@ public void shouldHandleAnySuccessResponseCodeOtherThan200() { outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfigTest.java index 03ca7611b..520a31962 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpSourceConfigTest.java @@ -27,6 +27,7 @@ public class HttpSourceConfigTest { private String headerVariables; private String connectTimeout; private boolean failOnErrors; + private String failOnErrorsCodeRange; private String type; private String capacity; private String metricId; @@ -49,11 +50,12 @@ public void setup() { headerVariables = "customer_id"; connectTimeout = "234"; failOnErrors = false; + failOnErrorsCodeRange = ""; type = "InputProtoMessage"; capacity = "345"; metricId = "metricId-http-01"; retainResponseType = false; - defaultHttpSourceConfig = new HttpSourceConfig(endpoint, endpointVariable, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headerMap, outputMappings, metricId, retainResponseType); + defaultHttpSourceConfig = new HttpSourceConfig(endpoint, endpointVariable, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, failOnErrorsCodeRange, type, capacity, headerMap, outputMappings, metricId, retainResponseType); } @Test @@ -113,13 +115,13 @@ public void hasTypeShouldBeTrueWhenTypeIsPresent() { @Test public void hasTypeShouldBeFalseWhenTypeIsNull() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, null, null, "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } @Test public void hasTypeShouldBeFalseWhenTypeIsEmpty() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, null, "", "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } @@ -153,7 +155,7 @@ public void shouldValidate() { @Test public void shouldThrowExceptionIfAllFieldsMissing() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig(null, null, null, null, requestVariables, null, null, null, null, false, null, capacity, null, null, metricId, retainResponseType); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig(null, null, null, null, requestVariables, null, null, null, null, false, null, null, capacity, null, null, metricId, retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> httpSourceConfig.validateFields()); assertEquals("Missing required fields: [endpoint, streamTimeout, requestPattern, verb, connectTimeout, outputMapping]", exception.getMessage()); @@ -162,7 +164,7 @@ public void shouldThrowExceptionIfAllFieldsMissing() { @Test public void shouldThrowExceptionIfSomeFieldsMissing() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", "", "post", "body", requestVariables, null, null, null, null, false, null, capacity, null, null, "metricId_01", retainResponseType); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", "", "post", "body", requestVariables, null, null, null, null, false, null, null, capacity, null, null, "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> httpSourceConfig.validateFields()); assertEquals("Missing required fields: [streamTimeout, connectTimeout, outputMapping]", exception.getMessage()); @@ -176,7 +178,7 @@ public void shouldThrowExceptionIfFieldsOfNestedObjectsAreMissing() { outputMappings.put("field", outputMappingWithNullField); defaultHttpSourceConfig = new HttpSourceConfig("http://localhost", "", - "post", "request_body", requestVariables, "", "", "4000", "1000", false, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); + "post", "request_body", requestVariables, "", "", "4000", "1000", false, null, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> defaultHttpSourceConfig.validateFields()); assertEquals("Missing required fields: [path]", exception.getMessage()); @@ -191,7 +193,7 @@ public void shouldThrowExceptionIfRequestPatternIsEmpty() { outputMappings.put("field", outputMappingWithNullField); defaultHttpSourceConfig = new HttpSourceConfig("http://localhost", "", - "post", "", requestVariables, "", "", "4000", "1000", false, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); + "post", "", requestVariables, "", "", "4000", "1000", false, null, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> defaultHttpSourceConfig.validateFields()); assertEquals("Missing required fields: [requestPattern]", exception.getMessage()); @@ -226,7 +228,7 @@ public void shouldReturnMandatoryFields() { @Test public void shouldValidateWhenOutputMappingIsEmpty() { - defaultHttpSourceConfig = new HttpSourceConfig(endpoint, endpointVariable, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headerMap, new HashMap<>(), "metricId_01", retainResponseType); + defaultHttpSourceConfig = new HttpSourceConfig(endpoint, endpointVariable, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, failOnErrorsCodeRange, type, capacity, headerMap, new HashMap<>(), "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> defaultHttpSourceConfig.validateFields()); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java index 53ccb99b3..d4db23a22 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java @@ -40,14 +40,14 @@ public void setup() { @Test public void shouldReturnTrueForGetVerbOnCanCreate() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "", "type", "345", new HashMap<>(), null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); assertTrue(httpGetRequestBuilder.canCreate()); } @Test public void shouldReturnFalseForVerbOtherThanGetOnCanBuild() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "", "type", "345", new HashMap<>(), null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); assertFalse(httpGetRequestBuilder.canCreate()); } @@ -55,7 +55,7 @@ public void shouldReturnFalseForVerbOtherThanGetOnCanBuild() { @Test public void shouldBuildGetRequest() { when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "", "", "123", "234", false, "", "type", "345", new HashMap<>(), null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); assertEquals(request, httpGetRequestBuilder.create()); } @@ -64,7 +64,7 @@ public void shouldBuildGetRequest() { public void shouldBuildGetRequestWithOnlyDynamicHeader() { when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); when(request.addHeader("header_key", "1")).thenReturn(request); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "", "type", "345", new HashMap<>(), null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); httpGetRequestBuilder.create(); verify(request, times(1)).addHeader(anyString(), anyString()); @@ -77,7 +77,7 @@ public void shouldBuildGetRequestWithDynamicAndStaticHeader() { when(request.addHeader("header_key", "1")).thenReturn(request); HashMap staticHeader = new HashMap(); staticHeader.put("static", "2"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "", "type", "345", staticHeader, null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); httpGetRequestBuilder.create(); verify(request, times(2)).addHeader(anyString(), anyString()); @@ -90,7 +90,7 @@ public void shouldBuildGetRequestWithMultipleDynamicAndStaticHeaders() { when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); HashMap staticHeader = new HashMap(); staticHeader.put("static", "3"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%s\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%s\"}", "1,2", "123", "234", false, "", "type", "345", staticHeader, null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); httpGetRequestBuilder.create(); verify(request, times(3)).addHeader(anyString(), anyString()); @@ -107,7 +107,7 @@ public void shouldThrowErrorIfHeaderVariablesAreIncompatible() { ArrayList incompatibleHeaderVariablesValues = new ArrayList<>(); incompatibleHeaderVariablesValues.add("test1"); incompatibleHeaderVariablesValues.add("test12"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%d\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%d\"}", "1,2", "123", "234", false, "", "type", "345", staticHeader, null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), incompatibleHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); InvalidConfigurationException exception = assertThrows(InvalidConfigurationException.class, () -> httpGetRequestBuilder.create()); assertEquals("pattern config '{\"header_key_1\": \"%s\",\"header_key_2\": \"%d\"}' is incompatible with the variable config '1,2'", exception.getMessage()); @@ -118,7 +118,7 @@ public void shouldThrowErrorIfHeaderHeaderPatternIsInvalid() { when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); HashMap staticHeader = new HashMap(); staticHeader.put("static", "3"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%p\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%p\"}", "1,2", "123", "234", false, "", "type", "345", staticHeader, null, "metricId_01", false); HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); InvalidConfigurationException exception = assertThrows(InvalidConfigurationException.class, () -> httpGetRequestBuilder.create()); assertEquals("pattern config '{\"header_key_1\": \"%s\",\"header_key_2\": \"%p\"}' is invalid", exception.getMessage()); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java index c6dc3ee88..9302a2c83 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java @@ -41,14 +41,14 @@ public void setup() { @Test public void shouldReturnTrueForPostVerbOnCanCreate() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", null, "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", null, "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); assertTrue(httpPostRequestBuilder.canCreate()); } @Test public void shouldReturnFalseForVerbOtherThanPostOnCanBuild() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "GET", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); assertFalse(httpPostRequestBuilder.canCreate()); } @@ -57,7 +57,7 @@ public void shouldReturnFalseForVerbOtherThanPostOnCanBuild() { public void shouldBuildPostRequestWithoutHeader() { when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); assertEquals(request, httpPostRequestBuilder.create()); } @@ -66,7 +66,7 @@ public void shouldBuildPostRequestWithoutHeader() { public void shouldBuildPostRequestWithOnlyDynamicHeader() { when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); httpPostRequestBuilder.create(); verify(request, times(1)).addHeader(anyString(), anyString()); @@ -79,7 +79,7 @@ public void shouldBuildPostRequestWithDynamicAndStaticHeader() { when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); HashMap staticHeader = new HashMap(); staticHeader.put("static", "2"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, null, "type", "345", staticHeader, null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); httpPostRequestBuilder.create(); verify(request, times(2)).addHeader(anyString(), anyString()); @@ -93,7 +93,7 @@ public void shouldBuildPostRequestWithMultipleDynamicAndStaticHeaders() { when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); HashMap staticHeader = new HashMap(); staticHeader.put("static", "3"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%s\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%s\"}", "1,2", "123", "234", false, null, "type", "345", staticHeader, null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); httpPostRequestBuilder.create(); verify(request, times(3)).addHeader(anyString(), anyString()); @@ -111,7 +111,7 @@ public void shouldThrowErrorIfHeaderVariablesAreIncompatible() { ArrayList incompatibleHeaderVariablesValues = new ArrayList<>(); incompatibleHeaderVariablesValues.add("test1"); incompatibleHeaderVariablesValues.add("test12"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%d\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%d\"}", "1,2", "123", "234", false, null, "type", "345", staticHeader, null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), incompatibleHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); InvalidConfigurationException exception = assertThrows(InvalidConfigurationException.class, () -> httpPostRequestBuilder.create()); assertEquals("pattern config '{\"header_key_1\": \"%s\",\"header_key_2\": \"%d\"}' is incompatible with the variable config '1,2'", exception.getMessage()); @@ -123,7 +123,7 @@ public void shouldThrowErrorIfHeaderHeaderPatternIsInvalid() { when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); HashMap staticHeader = new HashMap(); staticHeader.put("static", "3"); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%p\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%p\"}", "1,2", "123", "234", false, null, "type", "345", staticHeader, null, "metricId_01", false); HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray(), endpointVariablesValues.toArray()); InvalidConfigurationException exception = assertThrows(InvalidConfigurationException.class, () -> httpPostRequestBuilder.create()); assertEquals("pattern config '{\"header_key_1\": \"%s\",\"header_key_2\": \"%p\"}' is invalid", exception.getMessage()); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java index 07b8ca7b6..f055f0cc2 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java @@ -41,7 +41,7 @@ public void setup() { @Test public void shouldReturnPostRequestOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", null, "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", null, "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); when(request.setBody("{\"key\": \"123456\"}")).thenReturn(request); HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); @@ -53,7 +53,7 @@ public void shouldReturnPostRequestOnTheBasisOfConfiguration() { @Test public void shouldReturnPostRequestWithMultiEndpointVariablesOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s/%s", "exp, 222", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s/%s", "exp, 222", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.preparePost("http://localhost:8080/test/exp/222")).thenReturn(request); when(request.setBody("{\"key\": \"123456\"}")).thenReturn(request); endpointVariablesValues.add("exp"); @@ -67,7 +67,7 @@ public void shouldReturnPostRequestWithMultiEndpointVariablesOnTheBasisOfConfigu @Test public void shouldReturnGetRequestOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", null, "GET", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", null, "GET", "/key/%s", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray()); @@ -78,7 +78,7 @@ public void shouldReturnGetRequestOnTheBasisOfConfiguration() { @Test public void shouldReturnGetRequestWithMultiEndpointVariablesOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s/%s", "123, 332", "GET", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s/%s", "123, 332", "GET", "/key/%s", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); endpointVariablesValues.add("123"); endpointVariablesValues.add("332"); @@ -90,7 +90,7 @@ public void shouldReturnGetRequestWithMultiEndpointVariablesOnTheBasisOfConfigur @Test public void shouldReturnPutRequestOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s", "123", "PUT", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s", "123", "PUT", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.preparePut("http://localhost:8080/test/123")).thenReturn(request); when(request.setBody("{\"key\": \"123456\"}")).thenReturn(request); endpointVariablesValues.add("123"); @@ -103,7 +103,7 @@ public void shouldReturnPutRequestOnTheBasisOfConfiguration() { @Test public void shouldReturnPutRequestWithMultiEndpointVariablesOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s/abc/%s", "123, 321, asd", "PUT", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test/%s/abc/%s", "123, 321, asd", "PUT", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.preparePut("http://localhost:8080/test/123/abc/asd")).thenReturn(request); when(request.setBody("{\"key\": \"123456\"}")).thenReturn(request); endpointVariablesValues.add("123"); @@ -118,7 +118,7 @@ public void shouldReturnPutRequestWithMultiEndpointVariablesOnTheBasisOfConfigur @Test public void shouldThrowExceptionForUnsupportedHttpVerb() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "PATCH", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "PATCH", "/key/%s", "1", "", "", "123", "234", false, null, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); assertThrows(InvalidHttpVerbException.class, () -> HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray(), endpointVariablesValues.toArray())); } diff --git a/docs/docs/advance/post_processor.md b/docs/docs/advance/post_processor.md index 5c11f05a7..3c1f8aa5c 100644 --- a/docs/docs/advance/post_processor.md +++ b/docs/docs/advance/post_processor.md @@ -291,6 +291,13 @@ A flag for deciding whether the job should fail on encountering errors(timeout a - Type: `optional` - Default value: `false` +##### `exclude_fail_on_errors_code_range` + +Defines the exclusion range of HTTP status codes for which job should not fail if `fail_on_errors` is true. + +- Example value: `400,404-499` +- Type: `optional` + ##### `capacity` This parameter(Async I/O capacity) defines how many max asynchronous requests may be in progress at the same time. From e0f61fbf4c30f8a88105c213f19cb183d473b225 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Sun, 30 Jul 2023 23:32:47 +0530 Subject: [PATCH 2/4] fix(dagger-core): renamed exclusion function and added printStackTrace --- .../core/processors/external/http/HttpResponseHandler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java index f3b850003..f308f7f0b 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java @@ -99,6 +99,7 @@ public Object onCompleted(Response response) { @Override public void onThrowable(Throwable t) { + t.printStackTrace(); meterStatsManager.markEvent(ExternalSourceAspects.OTHER_ERRORS); failureHandler(t.getMessage(), httpSourceConfig.isFailOnErrors()); } @@ -147,10 +148,10 @@ private boolean shouldFailOnError(Integer statusCode) { if (statusCode == 0 || StringUtil.isNullOrEmpty(httpSourceConfig.getExcludeFailOnErrorsCodeRange())) { return true; } - return !getFailOnErrorCodeRanges(httpSourceConfig.getExcludeFailOnErrorsCodeRange()).contains(statusCode); + return !getExcludeFailOnErrorCodes(httpSourceConfig.getExcludeFailOnErrorsCodeRange()).contains(statusCode); } - private HashSet getFailOnErrorCodeRanges(String input) { + private HashSet getExcludeFailOnErrorCodes(String input) { String[] ranges = input.split(","); HashSet statusSet = new HashSet(); Arrays.stream(ranges).forEach(range -> { From d77573251bae44f7936babb249eebc8289dc4102 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Fri, 4 Aug 2023 10:54:40 +0530 Subject: [PATCH 3/4] fix: move failOnErrorsExclusionSet out-side of response handler --- .../external/http/HttpAsyncConnector.java | 26 ++++++++- .../external/http/HttpResponseHandler.java | 53 +++++++---------- .../external/http/HttpAsyncConnectorTest.java | 36 ++++++++++-- .../http/HttpResponseHandlerTest.java | 58 ++++++++++--------- 4 files changed, 107 insertions(+), 66 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java index fc14d0999..aba0daae9 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java @@ -12,6 +12,7 @@ import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.core.processors.external.AsyncConnector; import com.gotocompany.dagger.core.processors.external.ExternalMetricConfig; +import io.netty.util.internal.StringUtil; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; @@ -20,6 +21,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import static org.asynchttpclient.Dsl.asyncHttpClient; import static org.asynchttpclient.Dsl.config; @@ -31,6 +39,7 @@ public class HttpAsyncConnector extends AsyncConnector { private static final Logger LOGGER = LoggerFactory.getLogger(HttpAsyncConnector.class.getName()); private AsyncHttpClient httpClient; private HttpSourceConfig httpSourceConfig; + private Set failOnErrorsExclusionSet; /** * Instantiates a new Http async connector with specified http client. @@ -104,7 +113,7 @@ protected void process(Row input, ResultFuture resultFuture) { } BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues, endpointVariablesValues); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(), + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getFailOnErrorsExclusionSet(httpSourceConfig.getExcludeFailOnErrorsCodeRange()), getMeterStatsManager(), rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry()); httpResponseHandler.startTimer(); request.execute(httpResponseHandler); @@ -114,4 +123,19 @@ protected void process(Row input, ResultFuture resultFuture) { } } + + protected Set getFailOnErrorsExclusionSet(String excludeFailOnErrorsCodeRange) { + if (failOnErrorsExclusionSet == null) { + failOnErrorsExclusionSet = new HashSet(); + if (StringUtil.isNullOrEmpty(excludeFailOnErrorsCodeRange)) { + return failOnErrorsExclusionSet; + } + String[] ranges = excludeFailOnErrorsCodeRange.split(","); + Arrays.stream(ranges).forEach(range -> { + List rangeList = Arrays.stream(range.split("-")).map(Integer::parseInt).collect(Collectors.toList()); + IntStream.rangeClosed(rangeList.get(0), rangeList.get(rangeList.size() - 1)).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode)); + }); + } + return failOnErrorsExclusionSet; + } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java index f308f7f0b..512518f14 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java @@ -13,7 +13,6 @@ import com.gotocompany.dagger.core.processors.common.RowManager; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; -import io.netty.util.internal.StringUtil; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; import org.asynchttpclient.AsyncCompletionHandler; @@ -25,12 +24,8 @@ import java.util.ArrayList; import java.util.Map; import java.util.Collections; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; +import java.util.Set; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * The Http response handler. @@ -44,6 +39,7 @@ public class HttpResponseHandler extends AsyncCompletionHandler { private Descriptors.Descriptor descriptor; private ResultFuture resultFuture; private HttpSourceConfig httpSourceConfig; + private Set failOnErrorsExclusionSet; private MeterStatsManager meterStatsManager; private Instant startTime; private ErrorReporter errorReporter; @@ -53,20 +49,22 @@ public class HttpResponseHandler extends AsyncCompletionHandler { /** * Instantiates a new Http response handler. * - * @param httpSourceConfig the http source config - * @param meterStatsManager the meter stats manager - * @param rowManager the row manager - * @param columnNameManager the column name manager - * @param descriptor the descriptor - * @param resultFuture the result future - * @param errorReporter the error reporter - * @param postResponseTelemetry the post response telemetry + * @param httpSourceConfig the http source config + * @param failOnErrorsExclusionSet the fail on error exclusion set + * @param meterStatsManager the meter stats manager + * @param rowManager the row manager + * @param columnNameManager the column name manager + * @param descriptor the descriptor + * @param resultFuture the result future + * @param errorReporter the error reporter + * @param postResponseTelemetry the post response telemetry */ - public HttpResponseHandler(HttpSourceConfig httpSourceConfig, MeterStatsManager meterStatsManager, RowManager rowManager, + public HttpResponseHandler(HttpSourceConfig httpSourceConfig, Set failOnErrorsExclusionSet, MeterStatsManager meterStatsManager, RowManager rowManager, ColumnNameManager columnNameManager, Descriptors.Descriptor descriptor, ResultFuture resultFuture, ErrorReporter errorReporter, PostResponseTelemetry postResponseTelemetry) { this.httpSourceConfig = httpSourceConfig; + this.failOnErrorsExclusionSet = failOnErrorsExclusionSet; this.meterStatsManager = meterStatsManager; this.rowManager = rowManager; this.columnNameManager = columnNameManager; @@ -91,8 +89,7 @@ public Object onCompleted(Response response) { successHandler(response); } else { postResponseTelemetry.validateResponseCode(meterStatsManager, statusCode); - boolean shouldFailOnError = (httpSourceConfig.isFailOnErrors() ? shouldFailOnError(statusCode) : false); - failureHandler("Received status code : " + statusCode, shouldFailOnError); + failureHandler("Received status code : " + statusCode, statusCode); } return response; } @@ -101,7 +98,7 @@ public Object onCompleted(Response response) { public void onThrowable(Throwable t) { t.printStackTrace(); meterStatsManager.markEvent(ExternalSourceAspects.OTHER_ERRORS); - failureHandler(t.getMessage(), httpSourceConfig.isFailOnErrors()); + failureHandler(t.getMessage(), 0); } private void successHandler(Response response) { @@ -130,13 +127,13 @@ private void successHandler(Response response) { * Failure handler. * * @param logMessage the log message - * @param shouldFailOnErrors should fail on error + * @param statusCode the status code */ - public void failureHandler(String logMessage, boolean shouldFailOnErrors) { + public void failureHandler(String logMessage, Integer statusCode) { postResponseTelemetry.sendFailureTelemetry(meterStatsManager, startTime); LOGGER.error(logMessage); Exception httpFailureException = new HttpFailureException(logMessage); - if (shouldFailOnErrors) { + if (shouldFailOnError(statusCode)) { reportAndThrowError(httpFailureException); } else { errorReporter.reportNonFatalException(httpFailureException); @@ -145,20 +142,10 @@ public void failureHandler(String logMessage, boolean shouldFailOnErrors) { } private boolean shouldFailOnError(Integer statusCode) { - if (statusCode == 0 || StringUtil.isNullOrEmpty(httpSourceConfig.getExcludeFailOnErrorsCodeRange())) { + if (httpSourceConfig.isFailOnErrors() && (statusCode == 0 || !failOnErrorsExclusionSet.contains(statusCode))) { return true; } - return !getExcludeFailOnErrorCodes(httpSourceConfig.getExcludeFailOnErrorsCodeRange()).contains(statusCode); - } - - private HashSet getExcludeFailOnErrorCodes(String input) { - String[] ranges = input.split(","); - HashSet statusSet = new HashSet(); - Arrays.stream(ranges).forEach(range -> { - List rangeList = Arrays.stream(range.split("-")).map(Integer::parseInt).collect(Collectors.toList()); - IntStream.rangeClosed(rangeList.get(0), rangeList.get(rangeList.size() - 1)).forEach(statusCode -> statusSet.add(statusCode)); - }); - return statusSet; + return false; } private void setField(String key, Object value, int fieldIndex) { diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java index 74dfa14e7..db162ef31 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java @@ -26,10 +26,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeoutException; import static org.junit.Assert.*; @@ -139,6 +136,33 @@ public void shouldRegisterStatsManagerInOpen() throws Exception { verify(meterStatsManager, times(1)).register("source_metricId", "HTTP.metricId-http-01", ExternalSourceAspects.values()); } + @Test + public void shouldReturnEmptySetIfFailOnErrorsExclusionCodeRangeNULL() throws Exception { + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(null); + assertTrue(failOnErrorsExclusionSet.isEmpty()); + } + + @Test + public void shouldReturnEmptySetIfFailOnErrorsExclusionCodeRangeEmpty() throws Exception { + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(""); + assertTrue(failOnErrorsExclusionSet.isEmpty()); + } + + @Test + public void shouldReturnSetIfFailOnErrorsExclusionCodeRangeProvided() throws Exception { + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet("400,410-499"); + assertTrue(failOnErrorsExclusionSet.contains(400)); + assertFalse(failOnErrorsExclusionSet.contains(401)); + assertFalse(failOnErrorsExclusionSet.contains(409)); + assertTrue(failOnErrorsExclusionSet.contains(410)); + assertTrue(failOnErrorsExclusionSet.contains(429)); + assertTrue(failOnErrorsExclusionSet.contains(499)); + assertTrue(failOnErrorsExclusionSet.size() == 91); + } + @Test public void shouldInitializeDescriptorManagerInOpen() throws Exception { when(schemaConfig.getStencilClientOrchestrator()).thenReturn(stencilClientOrchestrator); @@ -423,7 +447,7 @@ public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrue() throws Exception } @Test - public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrueWithFailOnErrorCodeRange() throws Exception { + public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrueWithExcludeFailOnErrorCodeRange() throws Exception { HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "400-600", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); @@ -443,7 +467,7 @@ public void shouldReportFatalInTimeoutIfFailOnErrorIsTrue() throws Exception { } @Test - public void shouldReportFatalInTimeoutIfFailOnErrorIsTrueWithFailOnErrorCodeRange() throws Exception { + public void shouldReportFatalInTimeoutIfFailOnErrorIsTrueWithExcludeFailOnErrorCodeRange() throws Exception { HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "401-600", httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java index e21344946..e98f94b1e 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java @@ -20,10 +20,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import java.util.*; +import java.util.stream.IntStream; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -79,7 +77,7 @@ public void setup() { @Test public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIs404() { - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(404); httpResponseHandler.startTimer(); @@ -96,7 +94,7 @@ public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIs404() { @Test public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIs4XXOtherThan404() { - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(402); httpResponseHandler.startTimer(); @@ -113,7 +111,7 @@ public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIs4XXOtherThan404() { @Test public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIs5XX() { - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(502); httpResponseHandler.startTimer(); @@ -131,7 +129,7 @@ public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIs5XX() { @Test public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIsOtherThan5XXAnd4XX() { - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(302); httpResponseHandler.startTimer(); @@ -149,7 +147,7 @@ public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIsOtherThan5XXAnd4XX() @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs404() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(404); httpResponseHandler.startTimer(); @@ -168,7 +166,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs404() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs4XXOtherThan404() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(400); httpResponseHandler.startTimer(); @@ -186,7 +184,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs4XXOtherThan404() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs5XX() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(502); httpResponseHandler.startTimer(); @@ -204,7 +202,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs5XX() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIsOtherThan5XXAnd4XX() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(302); httpResponseHandler.startTimer(); @@ -221,7 +219,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIsOtherThan5XXAnd4XX() @Test public void shouldPassInputIfFailOnErrorFalseAndOnThrowable() { - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Throwable throwable = new Throwable("throwable message"); httpResponseHandler.startTimer(); @@ -238,7 +236,7 @@ public void shouldPassInputIfFailOnErrorFalseAndOnThrowable() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowable() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Throwable throwable = new Throwable("throwable message"); httpResponseHandler.startTimer(); @@ -256,7 +254,9 @@ public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowable() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowableWithExcludeFailOnErrorsCodeRange() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "404-499", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HashSet failOnErrorsExclusionSet = new HashSet(); + IntStream.rangeClosed(404, 499).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode)); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, failOnErrorsExclusionSet, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Throwable throwable = new Throwable("throwable message"); httpResponseHandler.startTimer(); @@ -274,7 +274,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowableWithExcludeFailOnErro @Test public void shouldFailForAnyNone2xxIfFailOnErrorsTrueWithNullExcludeFailOnErrorsCodeRange() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(400); httpResponseHandler.startTimer(); @@ -292,7 +292,7 @@ public void shouldFailForAnyNone2xxIfFailOnErrorsTrueWithNullExcludeFailOnErrors @Test public void shouldFailForAnyNone2xxIfFailOnErrorsTrueWithEmptyExcludeFailOnErrorsCodeRange() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(400); httpResponseHandler.startTimer(); @@ -310,7 +310,10 @@ public void shouldFailForAnyNone2xxIfFailOnErrorsTrueWithEmptyExcludeFailOnError @Test public void shouldPassForAnyNone2xxInsideOfFailOnErrorsCodeRangeIfFailOnErrorsTrue() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "400,404-499", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HashSet failOnErrorsExclusionSet = new HashSet(); + IntStream.rangeClosed(404, 499).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode)); + failOnErrorsExclusionSet.add(400); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, failOnErrorsExclusionSet, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(400); httpResponseHandler.startTimer(); @@ -328,7 +331,10 @@ public void shouldPassForAnyNone2xxInsideOfFailOnErrorsCodeRangeIfFailOnErrorsTr @Test public void shouldFailForAnyNone2xxOutsideOfFailOnErrorsCodeRangeIfFailOnErrorsTrue() { httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "400,404-499", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HashSet failOnErrorsExclusionSet = new HashSet(); + IntStream.rangeClosed(404, 499).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode)); + failOnErrorsExclusionSet.add(400); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, failOnErrorsExclusionSet, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(502); httpResponseHandler.startTimer(); @@ -349,7 +355,7 @@ public void shouldPopulateSingleResultFromHttpCallInInputRow() { outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); outputData.setField(0, 0.732f); @@ -375,7 +381,7 @@ public void shouldPopulateMultipleResultsFromHttpCallInInputRow() { outputColumnNames = Arrays.asList("surge_factor", "s2_id_level"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); outputData.setField(0, 0.732f); @@ -408,7 +414,7 @@ public void shouldThrowExceptionIfFieldNotFoundInFieldDescriptorWhenTypeIsPassed when(response.getResponseBody()).thenReturn("{\n" + " \"surge\": 0.732\n" + "}"); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); httpResponseHandler.startTimer(); assertThrows(NullPointerException.class, @@ -422,7 +428,7 @@ public void shouldThrowExceptionIfPathIsWrongIfFailOnErrorsTrue() { outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, "", httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); outputData.setField(0, 0.732f); @@ -447,7 +453,7 @@ public void shouldPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTypeIs outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "", null, "345", headers, outputMapping, "metricId_02", true); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); outputData.setField(0, 0.732); @@ -472,7 +478,7 @@ public void shouldNotPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTyp outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, null, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); outputData.setField(0, 0.732f); @@ -497,7 +503,7 @@ public void shouldHandleAnySuccessResponseCodeOtherThan200() { outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); outputData.setField(0, 0.732f); From c489b75c434cf26596e0018dd2b8316649b754db Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Fri, 4 Aug 2023 16:30:30 +0530 Subject: [PATCH 4/4] fix: set failOnErrorsExclusionSet from open --- .../external/http/HttpAsyncConnector.java | 24 ++++++++++++------- .../external/http/HttpAsyncConnectorTest.java | 18 +++++++++++--- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java index aba0daae9..f7b1592cd 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnector.java @@ -13,6 +13,7 @@ import com.gotocompany.dagger.core.processors.external.AsyncConnector; import com.gotocompany.dagger.core.processors.external.ExternalMetricConfig; import io.netty.util.internal.StringUtil; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; @@ -89,6 +90,12 @@ protected void createClient() { } } + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + setFailOnErrorsExclusionSet(httpSourceConfig.getExcludeFailOnErrorsCodeRange()); + } + @Override public void close() throws Exception { httpClient.close(); @@ -111,9 +118,8 @@ protected void process(Row input, ResultFuture resultFuture) { if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) { return; } - BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues, endpointVariablesValues); - HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getFailOnErrorsExclusionSet(httpSourceConfig.getExcludeFailOnErrorsCodeRange()), getMeterStatsManager(), + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getFailOnErrorsExclusionSet(), getMeterStatsManager(), rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry()); httpResponseHandler.startTimer(); request.execute(httpResponseHandler); @@ -124,18 +130,18 @@ protected void process(Row input, ResultFuture resultFuture) { } - protected Set getFailOnErrorsExclusionSet(String excludeFailOnErrorsCodeRange) { - if (failOnErrorsExclusionSet == null) { - failOnErrorsExclusionSet = new HashSet(); - if (StringUtil.isNullOrEmpty(excludeFailOnErrorsCodeRange)) { - return failOnErrorsExclusionSet; - } + protected Set getFailOnErrorsExclusionSet() { + return failOnErrorsExclusionSet; + } + + private void setFailOnErrorsExclusionSet(String excludeFailOnErrorsCodeRange) { + failOnErrorsExclusionSet = new HashSet(); + if (!StringUtil.isNullOrEmpty(excludeFailOnErrorsCodeRange)) { String[] ranges = excludeFailOnErrorsCodeRange.split(","); Arrays.stream(ranges).forEach(range -> { List rangeList = Arrays.stream(range.split("-")).map(Integer::parseInt).collect(Collectors.toList()); IntStream.rangeClosed(rangeList.get(0), rangeList.get(rangeList.size() - 1)).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode)); }); } - return failOnErrorsExclusionSet; } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java index db162ef31..88e3cc139 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpAsyncConnectorTest.java @@ -139,21 +139,33 @@ public void shouldRegisterStatsManagerInOpen() throws Exception { @Test public void shouldReturnEmptySetIfFailOnErrorsExclusionCodeRangeNULL() throws Exception { HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); - Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(null); + + httpAsyncConnector.open(flinkConfiguration); + + Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(); assertTrue(failOnErrorsExclusionSet.isEmpty()); } @Test public void shouldReturnEmptySetIfFailOnErrorsExclusionCodeRangeEmpty() throws Exception { HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); - Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(""); + + httpAsyncConnector.open(flinkConfiguration); + + Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(); assertTrue(failOnErrorsExclusionSet.isEmpty()); } @Test public void shouldReturnSetIfFailOnErrorsExclusionCodeRangeProvided() throws Exception { + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", + "customer_id", "", "", "123", "234", true, "400,410-499", httpConfigType, "345", + headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); - Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet("400,410-499"); + + httpAsyncConnector.open(flinkConfiguration); + + Set failOnErrorsExclusionSet = httpAsyncConnector.getFailOnErrorsExclusionSet(); assertTrue(failOnErrorsExclusionSet.contains(400)); assertFalse(failOnErrorsExclusionSet.contains(401)); assertFalse(failOnErrorsExclusionSet.contains(409));