From c214766bae6c6d90881fb7d77a8e8ad9a64243fe Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Tue, 29 Aug 2023 13:34:31 +0530 Subject: [PATCH 1/2] chore: Map external_source response to the nested output proto field In order to appropriately organize the proto, ensuring the mapping of external API responses (whether gRPC or HTTP) to the nested fields in the output proto is efficient. Additionally, this approach, while maintaining compatibility with the prior method of associating external responses to predefined fields, avoids the need to introduce supplementary fields in the output proto. --- .../external/grpc/GrpcResponseHandler.java | 13 +- .../external/http/HttpResponseHandler.java | 13 +- .../dagger/core/utils/DescriptorsUtil.java | 47 ++++++ .../grpc/GrpcResponseHandlerTest.java | 153 ++++++++++++++++++ .../http/HttpResponseHandlerTest.java | 114 ++++++++++++- .../core/utils/DescriptorsUtilTest.java | 84 ++++++++++ 6 files changed, 415 insertions(+), 9 deletions(-) create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java index 50ae49b6b..d88b39ca1 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java @@ -14,6 +14,7 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import com.gotocompany.dagger.core.utils.DescriptorsUtil; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import io.grpc.stub.StreamObserver; @@ -111,10 +112,14 @@ private void setField(String key, Object value, int fieldIndex) { } private void setFieldUsingType(String key, Object value, int fieldIndex) { - Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key); - if (fieldDescriptor == null) { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key); - reportAndThrowError(illegalArgumentException); + Descriptors.FieldDescriptor fieldDescriptor = null; + try { + fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key); + if (fieldDescriptor == null) { + throw new IllegalArgumentException("Field Descriptor not found for field: " + key); + } + } catch (RuntimeException exception) { + reportAndThrowError(exception); } TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor); rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value)); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java index 512518f14..64a3abd91 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java @@ -11,6 +11,7 @@ import com.gotocompany.dagger.core.processors.common.OutputMapping; import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry; import com.gotocompany.dagger.core.processors.common.RowManager; +import com.gotocompany.dagger.core.utils.DescriptorsUtil; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import org.apache.flink.streaming.api.functions.async.ResultFuture; @@ -157,10 +158,14 @@ private void setField(String key, Object value, int fieldIndex) { } private void setFieldUsingType(String key, Object value, Integer fieldIndex) { - Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key); - if (fieldDescriptor == null) { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key); - reportAndThrowError(illegalArgumentException); + Descriptors.FieldDescriptor fieldDescriptor = null; + try { + fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key); + if (fieldDescriptor == null) { + throw new IllegalArgumentException("Field Descriptor not found for field: " + key); + } + } catch (RuntimeException exception) { + reportAndThrowError(exception); } TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor); rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value)); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java new file mode 100644 index 000000000..a87a4578e --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java @@ -0,0 +1,47 @@ +package com.gotocompany.dagger.core.utils; + +import com.google.protobuf.Descriptors; + +import java.util.Arrays; + +/** + * Utility class that contains helper methods to get {@link Descriptors} {@link Descriptors.FieldDescriptor}. + */ +public class DescriptorsUtil { + + /** + * Gets FieldDescriptor . + * + * @param descriptor the descriptor + * @param columnName the columnName + * @return the fieldDescriptor + */ + public static Descriptors.FieldDescriptor getFieldDescriptor(Descriptors.Descriptor descriptor, String columnName) { + String[] nestedFields = columnName.split("\\."); + if (nestedFields.length == 1) { + return descriptor.findFieldByName(columnName); + } else { + return getNestedFieldDescriptor(descriptor, nestedFields); + } + } + + /** + * Gets FieldDescriptor . + * + * @param parentDescriptor the descriptor + * @param nestedColumnNames the array of columnNames + * @return the fieldDescriptor + */ + public static Descriptors.FieldDescriptor getNestedFieldDescriptor(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames) { + String childColumnName = nestedColumnNames[0]; + if (nestedColumnNames.length == 1) { + return parentDescriptor.findFieldByName(childColumnName); + } + Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName); + if (childFieldDescriptor == null) { + throw new IllegalArgumentException(String.format("Field Descriptor not found for field: %s in the proto of %s", childColumnName, parentDescriptor.getFullName())); + } + Descriptors.Descriptor childDescriptor = childFieldDescriptor.getMessageType(); + return getNestedFieldDescriptor(childDescriptor, Arrays.copyOfRange(nestedColumnNames, 1, nestedColumnNames.length)); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java index f7adda604..f25d3166f 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java @@ -287,4 +287,157 @@ public void shouldDetectProperComplexBodyAndHandleResponseIfRetainResponseTypeIs verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); } + @Test + public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsFalse() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address")); + outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name")); + + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + grpcSourceConfig = new GrpcSourceConfigBuilder() + .setEndpoint("localhost") + .setServicePort(8000) + .setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest") + .setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse") + .setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod") + .setRequestPattern("{\"key\": \"%s\"}") + .setRequestVariables("customer_id") + .setOutputMapping(outputMapping) + .createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(false); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + grpcResponseHandler.onNext(message); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsTrue() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address")); + outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name")); + + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + grpcSourceConfig = new GrpcSourceConfigBuilder() + .setEndpoint("localhost") + .setServicePort(8000) + .setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest") + .setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse") + .setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod") + .setRequestPattern("{\"key\": \"%s\"}") + .setRequestVariables("customer_id") + .setOutputMapping(outputMapping) + .createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(true); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + grpcResponseHandler.onNext(message); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldThrowErrorWhenNestedFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver_pickup_location.invalid_address", new OutputMapping("$.driver_pickup_location.address")); + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(false); + + outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message)); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture()); + assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor.getValue().getMessage()); + + ArgumentCaptor exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture()); + assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor2.getValue().getMessage()); + } + @Test + public void shouldThrowErrorWhenNestedParentFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver-pickup-location.address", new OutputMapping("$.driver_pickup_location.address")); + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(false); + + outputColumnNames = Arrays.asList("driver-pickup-location.address"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message)); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture()); + assertEquals("Field Descriptor not found for field: driver-pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", exceptionCaptor.getValue().getMessage()); + + ArgumentCaptor exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture()); + assertEquals("Field Descriptor not found for field: driver-pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", exceptionCaptor2.getValue().getMessage()); + } + } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java index e98f94b1e..b1731ec74 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java @@ -60,7 +60,7 @@ public void setup() { initMocks(this); descriptor = TestSurgeFactorLogMessage.getDescriptor(); outputColumnNames = Collections.singletonList("value"); - inputColumnNames = new String[] {"order_id", "customer_id", "driver_id"}; + inputColumnNames = new String[]{"order_id", "customer_id", "driver_id"}; outputMapping = new HashMap<>(); headers = new HashMap<>(); headers.put("content-type", "application/json"); @@ -521,4 +521,116 @@ public void shouldHandleAnySuccessResponseCodeOtherThan200() { verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); } + + @Test + public void shouldPopulateSingleResultFromHttpCallInInputRowForNestedColumnSetRetainTypeFalse() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer_profile.email", new OutputMapping("$.email")); + outputMapping.put("customer_profile.name", new OutputMapping("$.name")); + outputColumnNames = Arrays.asList("customer_profile.email", "customer_profile.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldPopulateSingleResultFromHttpCallInInputRowForNestedColumnSetRetainTypeTrue() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer_profile.email", new OutputMapping("$.email")); + outputMapping.put("customer_profile.name", new OutputMapping("$.name")); + outputColumnNames = Arrays.asList("customer_profile.email", "customer_profile.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", true); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldThrowNullPointerExceptionWhenNestedFieldIsNotPresentInOutputDescriptor() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer_profile.invalid_email", new OutputMapping("$.email")); + outputColumnNames = Arrays.asList("customer_profile.invalid_email"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + httpResponseHandler.startTimer(); + assertThrows(NullPointerException.class, + () -> httpResponseHandler.onCompleted(response)); + verify(resultFuture, times(1)).completeExceptionally(any(RuntimeException.class)); + } + + @Test + public void shouldThrowExceptionWhenNestedParentFieldIsNotPresentInOutputDescriptor() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer-profile.email", new OutputMapping("$.email")); + outputColumnNames = Arrays.asList("customer-profile.email"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + httpResponseHandler.startTimer(); + + assertThrows(NullPointerException.class, + () -> httpResponseHandler.onCompleted(response)); + verify(resultFuture, times(1)).completeExceptionally(any(RuntimeException.class)); + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java new file mode 100644 index 000000000..1fe3bd5af --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java @@ -0,0 +1,84 @@ +package com.gotocompany.dagger.core.utils; + +import com.google.protobuf.Descriptors; +import com.gotocompany.dagger.consumer.TestCustomerLogMessage; +import com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class DescriptorsUtilTest { + + @Test + public void shouldGetFieldDescriptor() { + String fieldName = "customer_id"; + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, fieldName); + assertNotNull(fieldDescriptor); + assertEquals("customer_id", fieldDescriptor.getName()); + } + + @Test + public void shouldRunGetNestedFieldDescriptor() { + String fieldName = "customer_profile.customer_id"; + Descriptors.Descriptor descriptor = TestEnrichedBookingLogMessage.getDescriptor(); + Descriptors.FieldDescriptor fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, fieldName); + assertNotNull(fieldDescriptor); + assertEquals("customer_id", fieldDescriptor.getName()); + } + + @Test + public void shouldRunGetNestedFieldColumnsDescriptor() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String[] nestedFieldNames = {"customer_profile", "customer_id"}; + Descriptors.FieldDescriptor fieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, nestedFieldNames); + assertNotNull(fieldDescriptor); + assertEquals("customer_id", fieldDescriptor.getName()); + } + + @Test + public void shouldGiveNullForInvalidFieldFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + String nonExistentField = "customer-id"; + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, nonExistentField); + assertNull(nonExistentFieldDescriptor); + } + + @Test + public void shouldGiveNullForInvalidNestedFieldDescriptor() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String fieldName = "customer_profile.customer-id"; + Descriptors.FieldDescriptor invalidFieldDescriptor = DescriptorsUtil.getFieldDescriptor(parentDescriptor, fieldName); + assertNull(invalidFieldDescriptor); + } + + @Test + public void shouldGiveNullForInvlidNestedFieldColumnsDescriptor() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String[] invalidNestedFieldNames = {"customer_profile", "customer-id"}; + Descriptors.FieldDescriptor invalidFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, invalidNestedFieldNames); + assertNull(invalidFieldDescriptor); + } + + @Test + public void shouldThrowExceptionWhenNestedColumnDoesNotExists() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String invalidNestedFieldNames = "booking_log.driver_pickup-location.name"; + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> DescriptorsUtil.getFieldDescriptor(parentDescriptor, invalidNestedFieldNames)); + assertEquals("Field Descriptor not found for field: driver_pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", + exception.getMessage()); + } + @Test + public void shouldThrowExceptionWhenNestedColumnGetFieldDescriptorDoesNotExists() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String[] invalidNestedFieldNames = {"booking_log", "driver_pickup-location", "name"}; + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, invalidNestedFieldNames)); + assertEquals("Field Descriptor not found for field: driver_pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", + exception.getMessage()); + } +} From 317f91da397f8d350836538bee35341f350be1cc Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Thu, 7 Sep 2023 13:15:36 +0530 Subject: [PATCH 2/2] fix: adding null check for the DescriptorsUtil function --- .../dagger/core/utils/DescriptorsUtil.java | 21 ++++++-- .../grpc/GrpcResponseHandlerTest.java | 35 ------------- .../core/utils/DescriptorsUtilTest.java | 51 +++++++++++++------ 3 files changed, 52 insertions(+), 55 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java index a87a4578e..8b3caa589 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java @@ -1,22 +1,31 @@ package com.gotocompany.dagger.core.utils; import com.google.protobuf.Descriptors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; +import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.MESSAGE; + /** * Utility class that contains helper methods to get {@link Descriptors} {@link Descriptors.FieldDescriptor}. */ public class DescriptorsUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(DescriptorsUtil.class.getName()); + /** * Gets FieldDescriptor . * * @param descriptor the descriptor - * @param columnName the columnName + * @param columnName the columnName * @return the fieldDescriptor */ public static Descriptors.FieldDescriptor getFieldDescriptor(Descriptors.Descriptor descriptor, String columnName) { + if (descriptor == null || columnName == null) { + return null; + } String[] nestedFields = columnName.split("\\."); if (nestedFields.length == 1) { return descriptor.findFieldByName(columnName); @@ -28,18 +37,22 @@ public static Descriptors.FieldDescriptor getFieldDescriptor(Descriptors.Descrip /** * Gets FieldDescriptor . * - * @param parentDescriptor the descriptor + * @param parentDescriptor the descriptor * @param nestedColumnNames the array of columnNames * @return the fieldDescriptor */ public static Descriptors.FieldDescriptor getNestedFieldDescriptor(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames) { + if (parentDescriptor == null || nestedColumnNames == null || nestedColumnNames.length == 0) { + return null; + } String childColumnName = nestedColumnNames[0]; if (nestedColumnNames.length == 1) { return parentDescriptor.findFieldByName(childColumnName); } Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName); - if (childFieldDescriptor == null) { - throw new IllegalArgumentException(String.format("Field Descriptor not found for field: %s in the proto of %s", childColumnName, parentDescriptor.getFullName())); + if (childFieldDescriptor == null || childFieldDescriptor.getJavaType() != MESSAGE) { + LOGGER.info(String.format("Either the Field Descriptor for the field '%s' is missing in the proto '%s', or the Field Descriptor is not of the MESSAGE type.", childColumnName, parentDescriptor.getFullName())); + return null; } Descriptors.Descriptor childDescriptor = childFieldDescriptor.getMessageType(); return getNestedFieldDescriptor(childDescriptor, Arrays.copyOfRange(nestedColumnNames, 1, nestedColumnNames.length)); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java index f25d3166f..60c883f04 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java @@ -405,39 +405,4 @@ public void shouldThrowErrorWhenNestedFieldIsNotPresentInOutputDescriptor() thro verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture()); assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor2.getValue().getMessage()); } - @Test - public void shouldThrowErrorWhenNestedParentFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException { - descriptor = TestBookingLogMessage.getDescriptor(); - outputMapping.put("driver-pickup-location.address", new OutputMapping("$.driver_pickup_location.address")); - TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); - TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); - - grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig(); - grpcSourceConfig.setRetainResponseType(false); - - outputColumnNames = Arrays.asList("driver-pickup-location.address"); - columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - - DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); - GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); - - Row resultStreamData = new Row(2); - Row outputData = new Row(2); - outputData.setField(0, "Indonesia"); - outputData.setField(1, "GojekTech"); - resultStreamData.setField(0, inputData); - resultStreamData.setField(1, outputData); - - grpcResponseHandler.startTimer(); - assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message)); - - ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class); - verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture()); - assertEquals("Field Descriptor not found for field: driver-pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", exceptionCaptor.getValue().getMessage()); - - ArgumentCaptor exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class); - verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture()); - assertEquals("Field Descriptor not found for field: driver-pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", exceptionCaptor2.getValue().getMessage()); - } - } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java index 1fe3bd5af..03844f07f 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java @@ -6,7 +6,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -39,6 +38,24 @@ public void shouldRunGetNestedFieldColumnsDescriptor() { assertEquals("customer_id", fieldDescriptor.getName()); } + @Test + public void shouldGiveNullForEmptyFieldFieldDescriptor() { + String nonExistentField = "customer-id"; + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(null, nonExistentField); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForNullColumnFieldFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, null); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForEmptyColumnFieldFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, ""); + assertNull(nonExistentFieldDescriptor); + } @Test public void shouldGiveNullForInvalidFieldFieldDescriptor() { Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); @@ -55,8 +72,9 @@ public void shouldGiveNullForInvalidNestedFieldDescriptor() { assertNull(invalidFieldDescriptor); } + @Test - public void shouldGiveNullForInvlidNestedFieldColumnsDescriptor() { + public void shouldGiveNullForInvalidNestedFieldColumnsDescriptor() { Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); String[] invalidNestedFieldNames = {"customer_profile", "customer-id"}; Descriptors.FieldDescriptor invalidFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, invalidNestedFieldNames); @@ -64,21 +82,22 @@ public void shouldGiveNullForInvlidNestedFieldColumnsDescriptor() { } @Test - public void shouldThrowExceptionWhenNestedColumnDoesNotExists() { - Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); - String invalidNestedFieldNames = "booking_log.driver_pickup-location.name"; - IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, - () -> DescriptorsUtil.getFieldDescriptor(parentDescriptor, invalidNestedFieldNames)); - assertEquals("Field Descriptor not found for field: driver_pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", - exception.getMessage()); + public void shouldGiveNullForNullNestedFieldDescriptor() { + String[] nonExistentField = new String[]{"customer-id"}; + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(null, nonExistentField); + assertNull(nonExistentFieldDescriptor); } + @Test - public void shouldThrowExceptionWhenNestedColumnGetFieldDescriptorDoesNotExists() { - Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); - String[] invalidNestedFieldNames = {"booking_log", "driver_pickup-location", "name"}; - IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, - () -> DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, invalidNestedFieldNames)); - assertEquals("Field Descriptor not found for field: driver_pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", - exception.getMessage()); + public void shouldGiveNullForNullColumnNestedFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(descriptor, null); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForEmptyColumnNestedFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(descriptor, new String[]{}); + assertNull(nonExistentFieldDescriptor); } }