diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java index 50ae49b6b..d88b39ca1 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandler.java @@ -14,6 +14,7 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import com.gotocompany.dagger.core.utils.DescriptorsUtil; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import io.grpc.stub.StreamObserver; @@ -111,10 +112,14 @@ private void setField(String key, Object value, int fieldIndex) { } private void setFieldUsingType(String key, Object value, int fieldIndex) { - Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key); - if (fieldDescriptor == null) { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key); - reportAndThrowError(illegalArgumentException); + Descriptors.FieldDescriptor fieldDescriptor = null; + try { + fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key); + if (fieldDescriptor == null) { + throw new IllegalArgumentException("Field Descriptor not found for field: " + key); + } + } catch (RuntimeException exception) { + reportAndThrowError(exception); } TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor); rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value)); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java index 512518f14..64a3abd91 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandler.java @@ -11,6 +11,7 @@ import com.gotocompany.dagger.core.processors.common.OutputMapping; import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry; import com.gotocompany.dagger.core.processors.common.RowManager; +import com.gotocompany.dagger.core.utils.DescriptorsUtil; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import org.apache.flink.streaming.api.functions.async.ResultFuture; @@ -157,10 +158,14 @@ private void setField(String key, Object value, int fieldIndex) { } private void setFieldUsingType(String key, Object value, Integer fieldIndex) { - Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key); - if (fieldDescriptor == null) { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key); - reportAndThrowError(illegalArgumentException); + Descriptors.FieldDescriptor fieldDescriptor = null; + try { + fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key); + if (fieldDescriptor == null) { + throw new IllegalArgumentException("Field Descriptor not found for field: " + key); + } + } catch (RuntimeException exception) { + reportAndThrowError(exception); } TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor); rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value)); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java new file mode 100644 index 000000000..8b3caa589 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/DescriptorsUtil.java @@ -0,0 +1,60 @@ +package com.gotocompany.dagger.core.utils; + +import com.google.protobuf.Descriptors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.MESSAGE; + +/** + * Utility class that contains helper methods to get {@link Descriptors} {@link Descriptors.FieldDescriptor}. + */ +public class DescriptorsUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(DescriptorsUtil.class.getName()); + + /** + * Gets FieldDescriptor . + * + * @param descriptor the descriptor + * @param columnName the columnName + * @return the fieldDescriptor + */ + public static Descriptors.FieldDescriptor getFieldDescriptor(Descriptors.Descriptor descriptor, String columnName) { + if (descriptor == null || columnName == null) { + return null; + } + String[] nestedFields = columnName.split("\\."); + if (nestedFields.length == 1) { + return descriptor.findFieldByName(columnName); + } else { + return getNestedFieldDescriptor(descriptor, nestedFields); + } + } + + /** + * Gets FieldDescriptor . + * + * @param parentDescriptor the descriptor + * @param nestedColumnNames the array of columnNames + * @return the fieldDescriptor + */ + public static Descriptors.FieldDescriptor getNestedFieldDescriptor(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames) { + if (parentDescriptor == null || nestedColumnNames == null || nestedColumnNames.length == 0) { + return null; + } + String childColumnName = nestedColumnNames[0]; + if (nestedColumnNames.length == 1) { + return parentDescriptor.findFieldByName(childColumnName); + } + Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName); + if (childFieldDescriptor == null || childFieldDescriptor.getJavaType() != MESSAGE) { + LOGGER.info(String.format("Either the Field Descriptor for the field '%s' is missing in the proto '%s', or the Field Descriptor is not of the MESSAGE type.", childColumnName, parentDescriptor.getFullName())); + return null; + } + Descriptors.Descriptor childDescriptor = childFieldDescriptor.getMessageType(); + return getNestedFieldDescriptor(childDescriptor, Arrays.copyOfRange(nestedColumnNames, 1, nestedColumnNames.length)); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java index f7adda604..60c883f04 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcResponseHandlerTest.java @@ -287,4 +287,122 @@ public void shouldDetectProperComplexBodyAndHandleResponseIfRetainResponseTypeIs verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); } + @Test + public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsFalse() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address")); + outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name")); + + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + grpcSourceConfig = new GrpcSourceConfigBuilder() + .setEndpoint("localhost") + .setServicePort(8000) + .setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest") + .setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse") + .setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod") + .setRequestPattern("{\"key\": \"%s\"}") + .setRequestVariables("customer_id") + .setOutputMapping(outputMapping) + .createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(false); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + grpcResponseHandler.onNext(message); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsTrue() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address")); + outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name")); + + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + grpcSourceConfig = new GrpcSourceConfigBuilder() + .setEndpoint("localhost") + .setServicePort(8000) + .setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest") + .setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse") + .setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod") + .setRequestPattern("{\"key\": \"%s\"}") + .setRequestVariables("customer_id") + .setOutputMapping(outputMapping) + .createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(true); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + grpcResponseHandler.onNext(message); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldThrowErrorWhenNestedFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException { + descriptor = TestBookingLogMessage.getDescriptor(); + outputMapping.put("driver_pickup_location.invalid_address", new OutputMapping("$.driver_pickup_location.address")); + TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build(); + TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build(); + + grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig(); + grpcSourceConfig.setRetainResponseType(false); + + outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + + DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray()); + GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "Indonesia"); + outputData.setField(1, "GojekTech"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + + grpcResponseHandler.startTimer(); + assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message)); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture()); + assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor.getValue().getMessage()); + + ArgumentCaptor exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture()); + assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor2.getValue().getMessage()); + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java index e98f94b1e..b1731ec74 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/http/HttpResponseHandlerTest.java @@ -60,7 +60,7 @@ public void setup() { initMocks(this); descriptor = TestSurgeFactorLogMessage.getDescriptor(); outputColumnNames = Collections.singletonList("value"); - inputColumnNames = new String[] {"order_id", "customer_id", "driver_id"}; + inputColumnNames = new String[]{"order_id", "customer_id", "driver_id"}; outputMapping = new HashMap<>(); headers = new HashMap<>(); headers.put("content-type", "application/json"); @@ -521,4 +521,116 @@ public void shouldHandleAnySuccessResponseCodeOtherThan200() { verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); } + + @Test + public void shouldPopulateSingleResultFromHttpCallInInputRowForNestedColumnSetRetainTypeFalse() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer_profile.email", new OutputMapping("$.email")); + outputMapping.put("customer_profile.name", new OutputMapping("$.name")); + outputColumnNames = Arrays.asList("customer_profile.email", "customer_profile.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldPopulateSingleResultFromHttpCallInInputRowForNestedColumnSetRetainTypeTrue() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer_profile.email", new OutputMapping("$.email")); + outputMapping.put("customer_profile.name", new OutputMapping("$.name")); + outputColumnNames = Arrays.asList("customer_profile.email", "customer_profile.name"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", true); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } + + @Test + public void shouldThrowNullPointerExceptionWhenNestedFieldIsNotPresentInOutputDescriptor() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer_profile.invalid_email", new OutputMapping("$.email")); + outputColumnNames = Arrays.asList("customer_profile.invalid_email"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + httpResponseHandler.startTimer(); + assertThrows(NullPointerException.class, + () -> httpResponseHandler.onCompleted(response)); + verify(resultFuture, times(1)).completeExceptionally(any(RuntimeException.class)); + } + + @Test + public void shouldThrowExceptionWhenNestedParentFieldIsNotPresentInOutputDescriptor() { + descriptor = com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage.getDescriptor(); + outputMapping.put("customer-profile.email", new OutputMapping("$.email")); + outputColumnNames = Arrays.asList("customer-profile.email"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, "test_email@go-jek.com"); + outputData.setField(1, "test_name"); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(200); + when(response.getResponseBody()).thenReturn("{\n" + + " \"email\": \"test_email@go-jek.com\",\n" + + " \"name\": \"test_name\"\n" + + "}"); + + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, new HashSet(), meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + + httpResponseHandler.startTimer(); + + assertThrows(NullPointerException.class, + () -> httpResponseHandler.onCompleted(response)); + verify(resultFuture, times(1)).completeExceptionally(any(RuntimeException.class)); + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java new file mode 100644 index 000000000..03844f07f --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/DescriptorsUtilTest.java @@ -0,0 +1,103 @@ +package com.gotocompany.dagger.core.utils; + +import com.google.protobuf.Descriptors; +import com.gotocompany.dagger.consumer.TestCustomerLogMessage; +import com.gotocompany.dagger.consumer.TestEnrichedBookingLogMessage; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class DescriptorsUtilTest { + + @Test + public void shouldGetFieldDescriptor() { + String fieldName = "customer_id"; + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, fieldName); + assertNotNull(fieldDescriptor); + assertEquals("customer_id", fieldDescriptor.getName()); + } + + @Test + public void shouldRunGetNestedFieldDescriptor() { + String fieldName = "customer_profile.customer_id"; + Descriptors.Descriptor descriptor = TestEnrichedBookingLogMessage.getDescriptor(); + Descriptors.FieldDescriptor fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, fieldName); + assertNotNull(fieldDescriptor); + assertEquals("customer_id", fieldDescriptor.getName()); + } + + @Test + public void shouldRunGetNestedFieldColumnsDescriptor() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String[] nestedFieldNames = {"customer_profile", "customer_id"}; + Descriptors.FieldDescriptor fieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, nestedFieldNames); + assertNotNull(fieldDescriptor); + assertEquals("customer_id", fieldDescriptor.getName()); + } + + @Test + public void shouldGiveNullForEmptyFieldFieldDescriptor() { + String nonExistentField = "customer-id"; + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(null, nonExistentField); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForNullColumnFieldFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, null); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForEmptyColumnFieldFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, ""); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForInvalidFieldFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + String nonExistentField = "customer-id"; + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, nonExistentField); + assertNull(nonExistentFieldDescriptor); + } + + @Test + public void shouldGiveNullForInvalidNestedFieldDescriptor() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String fieldName = "customer_profile.customer-id"; + Descriptors.FieldDescriptor invalidFieldDescriptor = DescriptorsUtil.getFieldDescriptor(parentDescriptor, fieldName); + assertNull(invalidFieldDescriptor); + } + + + @Test + public void shouldGiveNullForInvalidNestedFieldColumnsDescriptor() { + Descriptors.Descriptor parentDescriptor = TestEnrichedBookingLogMessage.getDescriptor(); + String[] invalidNestedFieldNames = {"customer_profile", "customer-id"}; + Descriptors.FieldDescriptor invalidFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(parentDescriptor, invalidNestedFieldNames); + assertNull(invalidFieldDescriptor); + } + + @Test + public void shouldGiveNullForNullNestedFieldDescriptor() { + String[] nonExistentField = new String[]{"customer-id"}; + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(null, nonExistentField); + assertNull(nonExistentFieldDescriptor); + } + + @Test + public void shouldGiveNullForNullColumnNestedFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(descriptor, null); + assertNull(nonExistentFieldDescriptor); + } + @Test + public void shouldGiveNullForEmptyColumnNestedFieldDescriptor() { + Descriptors.Descriptor descriptor = TestCustomerLogMessage.getDescriptor(); + Descriptors.FieldDescriptor nonExistentFieldDescriptor = DescriptorsUtil.getNestedFieldDescriptor(descriptor, new String[]{}); + assertNull(nonExistentFieldDescriptor); + } +}